001/*
002 * Copyright 2025 devteam@scivicslab.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing,
011 * software distributed under the License is distributed on an
012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied.  See the License for the
014 * specific language governing permissions and limitations
015 * under the License.
016 */
017
018package com.scivicslab.actoriac.log;
019
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.sql.*;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
035
036/**
037 * H2 Database implementation of DistributedLogStore.
038 *
039 * <p>Uses H2 embedded database for storing logs from distributed workflow
040 * execution. Supports concurrent access from multiple threads and provides
041 * efficient querying by node, level, and time.</p>
042 *
043 * <p>Features:</p>
044 * <ul>
045 *   <li>Pure Java - no native dependencies</li>
046 *   <li>Single file storage (.mv.db)</li>
047 *   <li>Asynchronous batch writing for performance</li>
048 *   <li>SQL-based querying</li>
049 * </ul>
050 *
051 * @author devteam@scivicslab.com
052 */
053public class H2LogStore implements DistributedLogStore {
054
055    private static final Logger LOG = Logger.getLogger(H2LogStore.class.getName());
056    private static final DateTimeFormatter ISO_FORMATTER =
057            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX");
058    private static final ZoneId SYSTEM_ZONE = ZoneId.systemDefault();
059
060    private final Connection connection;
061    private final H2LogReader reader;
062    private final BlockingQueue<LogTask> writeQueue;
063    private final Thread writerThread;
064    private final AtomicBoolean running;
065    private static final int BATCH_SIZE = 100;
066
067    /**
068     * Optional text log file writer.
069     * When set, log entries are also written to this text file.
070     */
071    private PrintWriter textLogWriter;
072
073    /**
074     * Creates an H2LogStore with the specified database path.
075     *
076     * @param dbPath path to the database file (without extension)
077     * @throws SQLException if database connection fails
078     */
079    public H2LogStore(Path dbPath) throws SQLException {
080        // AUTO_SERVER=TRUE allows multiple processes to access the same DB
081        // First process starts embedded server, others connect via TCP
082        String url = "jdbc:h2:" + dbPath.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
083        this.connection = DriverManager.getConnection(url);
084        this.reader = new H2LogReader(connection);
085        this.writeQueue = new LinkedBlockingQueue<>();
086        this.running = new AtomicBoolean(true);
087
088        initSchema();
089
090        // Start async writer thread
091        this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer");
092        this.writerThread.setDaemon(true);
093        this.writerThread.start();
094    }
095
096    /**
097     * Creates an in-memory H2LogStore (for testing).
098     *
099     * @throws SQLException if database connection fails
100     */
101    public H2LogStore() throws SQLException {
102        // Use unique DB name per instance to avoid test interference
103        String url = "jdbc:h2:mem:testdb_" + System.nanoTime() + ";DB_CLOSE_DELAY=-1";
104        this.connection = DriverManager.getConnection(url);
105        this.reader = new H2LogReader(connection);
106        this.writeQueue = new LinkedBlockingQueue<>();
107        this.running = new AtomicBoolean(true);
108
109        initSchema();
110
111        this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer");
112        this.writerThread.setDaemon(true);
113        this.writerThread.start();
114    }
115
116    /**
117     * Gets the database connection for read-only operations.
118     *
119     * <p>This connection is shared and should NOT be closed by the caller.
120     * The connection will be closed when the H2LogStore is closed.</p>
121     *
122     * @return the JDBC connection
123     */
124    @Override
125    public Connection getConnection() {
126        return this.connection;
127    }
128
129    /**
130     * Sets the text log file for additional text-based logging.
131     *
132     * <p>When a text log file is set, all log entries written to the database
133     * are also appended to this text file in a human-readable format.</p>
134     *
135     * @param textLogPath path to the text log file
136     * @throws IOException if the file cannot be opened for writing
137     */
138    public void setTextLogFile(Path textLogPath) throws IOException {
139        if (this.textLogWriter != null) {
140            this.textLogWriter.close();
141        }
142        this.textLogWriter = new PrintWriter(new BufferedWriter(new FileWriter(textLogPath.toFile(), true)), true);
143        LOG.info("Text logging enabled: " + textLogPath);
144    }
145
146    /**
147     * Disables text file logging.
148     */
149    public void disableTextLog() {
150        if (this.textLogWriter != null) {
151            this.textLogWriter.close();
152            this.textLogWriter = null;
153        }
154    }
155
156    private void initSchema() throws SQLException {
157        initSchema(connection);
158    }
159
160    /**
161     * Initializes the log database schema on the given connection.
162     *
163     * <p>This method can be called from external components (e.g., log server)
164     * to ensure consistent schema across all database access points.</p>
165     *
166     * @param conn the database connection
167     * @throws SQLException if schema initialization fails
168     */
169    public static void initSchema(Connection conn) throws SQLException {
170        try (Statement stmt = conn.createStatement()) {
171            // Sessions table
172            stmt.execute("""
173                CREATE TABLE IF NOT EXISTS sessions (
174                    id IDENTITY PRIMARY KEY,
175                    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
176                    ended_at TIMESTAMP,
177                    workflow_name VARCHAR(255),
178                    overlay_name VARCHAR(255),
179                    inventory_name VARCHAR(255),
180                    node_count INT,
181                    status VARCHAR(20) DEFAULT 'RUNNING',
182                    cwd VARCHAR(1000),
183                    git_commit VARCHAR(50),
184                    git_branch VARCHAR(255),
185                    command_line VARCHAR(2000),
186                    actoriac_version VARCHAR(50),
187                    actoriac_commit VARCHAR(50)
188                )
189                """);
190
191            // Logs table
192            // label and action_name use CLOB to store long YAML snippets
193            stmt.execute("""
194                CREATE TABLE IF NOT EXISTS logs (
195                    id IDENTITY PRIMARY KEY,
196                    session_id BIGINT,
197                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
198                    node_id VARCHAR(255) NOT NULL,
199                    label CLOB,
200                    action_name CLOB,
201                    level VARCHAR(10) NOT NULL,
202                    message CLOB,
203                    exit_code INT,
204                    duration_ms BIGINT,
205                    FOREIGN KEY (session_id) REFERENCES sessions(id)
206                )
207                """);
208
209            // Node results table
210            stmt.execute("""
211                CREATE TABLE IF NOT EXISTS node_results (
212                    id IDENTITY PRIMARY KEY,
213                    session_id BIGINT,
214                    node_id VARCHAR(255) NOT NULL,
215                    status VARCHAR(20) NOT NULL,
216                    reason VARCHAR(1000),
217                    FOREIGN KEY (session_id) REFERENCES sessions(id),
218                    UNIQUE (session_id, node_id)
219                )
220                """);
221
222            // Indexes
223            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_session ON logs(session_id)");
224            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_node ON logs(node_id)");
225            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level)");
226            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp)");
227            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_workflow ON sessions(workflow_name)");
228            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_overlay ON sessions(overlay_name)");
229            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_inventory ON sessions(inventory_name)");
230            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at)");
231        }
232    }
233
234    private void writerLoop() {
235        List<LogTask> batch = new ArrayList<>(BATCH_SIZE);
236        while (running.get() || !writeQueue.isEmpty()) {
237            try {
238                LogTask task = writeQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
239                if (task != null) {
240                    batch.add(task);
241                    writeQueue.drainTo(batch, BATCH_SIZE - 1);
242                    processBatch(batch);
243                    batch.clear();
244                }
245            } catch (InterruptedException e) {
246                Thread.currentThread().interrupt();
247                break;
248            }
249        }
250        // Process remaining
251        if (!batch.isEmpty()) {
252            processBatch(batch);
253        }
254    }
255
256    private void processBatch(List<LogTask> batch) {
257        try {
258            connection.setAutoCommit(false);
259            for (LogTask task : batch) {
260                task.execute(connection);
261            }
262            connection.commit();
263            connection.setAutoCommit(true);
264        } catch (SQLException e) {
265            try {
266                connection.rollback();
267                connection.setAutoCommit(true);
268            } catch (SQLException ex) {
269                // Ignore rollback errors
270            }
271            e.printStackTrace();
272        }
273    }
274
275    @Override
276    public long startSession(String workflowName, int nodeCount) {
277        return startSession(workflowName, null, null, nodeCount);
278    }
279
280    @Override
281    public long startSession(String workflowName, String overlayName, String inventoryName, int nodeCount) {
282        return startSession(workflowName, overlayName, inventoryName, nodeCount,
283                            null, null, null, null, null, null);
284    }
285
286    @Override
287    public long startSession(String workflowName, String overlayName, String inventoryName, int nodeCount,
288                             String cwd, String gitCommit, String gitBranch,
289                             String commandLine, String actorIacVersion, String actorIacCommit) {
290        try (PreparedStatement ps = connection.prepareStatement(
291                "INSERT INTO sessions (workflow_name, overlay_name, inventory_name, node_count, " +
292                "cwd, git_commit, git_branch, command_line, actoriac_version, actoriac_commit) " +
293                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
294                Statement.RETURN_GENERATED_KEYS)) {
295            ps.setString(1, workflowName);
296            ps.setString(2, overlayName);
297            ps.setString(3, inventoryName);
298            ps.setInt(4, nodeCount);
299            ps.setString(5, cwd);
300            ps.setString(6, gitCommit);
301            ps.setString(7, gitBranch);
302            ps.setString(8, commandLine);
303            ps.setString(9, actorIacVersion);
304            ps.setString(10, actorIacCommit);
305            ps.executeUpdate();
306            try (ResultSet rs = ps.getGeneratedKeys()) {
307                if (rs.next()) {
308                    return rs.getLong(1);
309                }
310            }
311        } catch (SQLException e) {
312            throw new RuntimeException("Failed to start session", e);
313        }
314        return -1;
315    }
316
317    @Override
318    public void log(long sessionId, String nodeId, LogLevel level, String message) {
319        log(sessionId, nodeId, null, level, message);
320    }
321
322    @Override
323    public void log(long sessionId, String nodeId, String label, LogLevel level, String message) {
324        writeQueue.offer(new LogTask.InsertLog(sessionId, nodeId, label, null, level, message, null, null));
325        writeToTextLog(nodeId, label, level, message);
326    }
327
328    @Override
329    public void logAction(long sessionId, String nodeId, String label,
330                          String actionName, int exitCode, long durationMs, String output) {
331        LogLevel level = parseLogLevelFromLabel(label, exitCode);
332        writeQueue.offer(new LogTask.InsertLog(sessionId, nodeId, label, actionName, level, output, exitCode, durationMs));
333        writeToTextLog(nodeId, label, level, output);
334    }
335
336    /**
337     * Parses log level from the label parameter.
338     *
339     * <p>Maps java.util.logging levels to LogLevel:</p>
340     * <ul>
341     *   <li>SEVERE → ERROR</li>
342     *   <li>WARNING → WARN</li>
343     *   <li>INFO → INFO</li>
344     *   <li>CONFIG, FINE, FINER, FINEST → DEBUG</li>
345     * </ul>
346     */
347    private LogLevel parseLogLevelFromLabel(String label, int exitCode) {
348        if (label != null && label.startsWith("log-")) {
349            String levelName = label.substring(4).toUpperCase();
350            return switch (levelName) {
351                case "SEVERE" -> LogLevel.ERROR;
352                case "WARNING" -> LogLevel.WARN;
353                case "INFO" -> LogLevel.INFO;
354                case "CONFIG", "FINE", "FINER", "FINEST" -> LogLevel.DEBUG;
355                default -> exitCode == 0 ? LogLevel.INFO : LogLevel.ERROR;
356            };
357        }
358        return exitCode == 0 ? LogLevel.INFO : LogLevel.ERROR;
359    }
360
361    /**
362     * Writes a log entry to the text log file if enabled.
363     *
364     * @param nodeId the node identifier
365     * @param label the workflow label (may be null)
366     * @param level the log level
367     * @param message the log message
368     */
369    private void writeToTextLog(String nodeId, String label, LogLevel level, String message) {
370        if (textLogWriter == null) {
371            return;
372        }
373        String timestamp = LocalDateTime.now().atZone(SYSTEM_ZONE).format(ISO_FORMATTER);
374        String labelPart = label != null ? " [" + label + "]" : "";
375        textLogWriter.printf("%s %s %s%s %s%n", timestamp, level, nodeId, labelPart, message);
376    }
377
378    @Override
379    public void markNodeSuccess(long sessionId, String nodeId) {
380        writeQueue.offer(new LogTask.UpdateNodeResult(sessionId, nodeId, "SUCCESS", null));
381    }
382
383    @Override
384    public void markNodeFailed(long sessionId, String nodeId, String reason) {
385        writeQueue.offer(new LogTask.UpdateNodeResult(sessionId, nodeId, "FAILED", reason));
386    }
387
388    @Override
389    public void endSession(long sessionId, SessionStatus status) {
390        // Flush pending writes
391        flushWrites();
392
393        try (PreparedStatement ps = connection.prepareStatement(
394                "UPDATE sessions SET ended_at = CURRENT_TIMESTAMP, status = ? WHERE id = ?")) {
395            ps.setString(1, status.name());
396            ps.setLong(2, sessionId);
397            ps.executeUpdate();
398        } catch (SQLException e) {
399            throw new RuntimeException("Failed to end session", e);
400        }
401    }
402
403    private void flushWrites() {
404        // Wait for queue to drain
405        while (!writeQueue.isEmpty()) {
406            try {
407                Thread.sleep(10);
408            } catch (InterruptedException e) {
409                Thread.currentThread().interrupt();
410                break;
411            }
412        }
413    }
414
415    @Override
416    public List<LogEntry> getLogsByNode(long sessionId, String nodeId) {
417        return reader.getLogsByNode(sessionId, nodeId);
418    }
419
420    @Override
421    public List<LogEntry> getLogsByLevel(long sessionId, LogLevel minLevel) {
422        return reader.getLogsByLevel(sessionId, minLevel);
423    }
424
425    @Override
426    public SessionSummary getSummary(long sessionId) {
427        return reader.getSummary(sessionId);
428    }
429
430    @Override
431    public long getLatestSessionId() {
432        return reader.getLatestSessionId();
433    }
434
435    @Override
436    public List<SessionSummary> listSessions(int limit) {
437        return reader.listSessions(limit);
438    }
439
440    @Override
441    public void close() throws Exception {
442        running.set(false);
443        writerThread.interrupt();
444        try {
445            writerThread.join(5000);
446        } catch (InterruptedException e) {
447            Thread.currentThread().interrupt();
448        }
449        if (textLogWriter != null) {
450            textLogWriter.close();
451            textLogWriter = null;
452        }
453        connection.close();
454    }
455
456    // Internal task classes for async writing
457    private interface LogTask {
458        void execute(Connection conn) throws SQLException;
459
460        record InsertLog(long sessionId, String nodeId, String label, String actionName,
461                         LogLevel level, String message, Integer exitCode, Long durationMs) implements LogTask {
462            @Override
463            public void execute(Connection conn) throws SQLException {
464                try (PreparedStatement ps = conn.prepareStatement(
465                        "INSERT INTO logs (session_id, node_id, label, action_name, level, message, exit_code, duration_ms) " +
466                        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) {
467                    ps.setLong(1, sessionId);
468                    ps.setString(2, nodeId);
469                    ps.setString(3, label);
470                    ps.setString(4, actionName);
471                    ps.setString(5, level.name());
472                    ps.setString(6, message);
473                    if (exitCode != null) {
474                        ps.setInt(7, exitCode);
475                    } else {
476                        ps.setNull(7, Types.INTEGER);
477                    }
478                    if (durationMs != null) {
479                        ps.setLong(8, durationMs);
480                    } else {
481                        ps.setNull(8, Types.BIGINT);
482                    }
483                    ps.executeUpdate();
484                }
485            }
486        }
487
488        record UpdateNodeResult(long sessionId, String nodeId, String status, String reason) implements LogTask {
489            @Override
490            public void execute(Connection conn) throws SQLException {
491                try (PreparedStatement ps = conn.prepareStatement(
492                        "MERGE INTO node_results (session_id, node_id, status, reason) KEY (session_id, node_id) VALUES (?, ?, ?, ?)")) {
493                    ps.setLong(1, sessionId);
494                    ps.setString(2, nodeId);
495                    ps.setString(3, status);
496                    ps.setString(4, reason);
497                    ps.executeUpdate();
498                }
499            }
500        }
501    }
502}