001/*
002 * Copyright 2025 devteam@scivics-lab.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing,
011 * software distributed under the License is distributed on an
012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied.  See the License for the
014 * specific language governing permissions and limitations
015 * under the License.
016 */
017
018package com.scivicslab.actoriac.log;
019
import java.nio.file.Path;
import java.sql.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
029
030/**
031 * H2 Database implementation of DistributedLogStore.
032 *
033 * <p>Uses H2 embedded database for storing logs from distributed workflow
034 * execution. Supports concurrent access from multiple threads and provides
035 * efficient querying by node, level, and time.</p>
036 *
037 * <p>Features:</p>
038 * <ul>
039 *   <li>Pure Java - no native dependencies</li>
040 *   <li>Single file storage (.mv.db)</li>
041 *   <li>Asynchronous batch writing for performance</li>
042 *   <li>SQL-based querying</li>
043 * </ul>
044 *
045 * @author devteam@scivics-lab.com
046 */
047public class H2LogStore implements DistributedLogStore {
048
049    private static final Logger LOG = Logger.getLogger(H2LogStore.class.getName());
050
051    private final Connection connection;
052    private final BlockingQueue<LogTask> writeQueue;
053    private final Thread writerThread;
054    private final AtomicBoolean running;
055    private static final int BATCH_SIZE = 100;
056    private static final int DEFAULT_TCP_PORT = 29090;
057
058    /**
059     * Creates an H2LogStore with the specified database path.
060     *
061     * @param dbPath path to the database file (without extension)
062     * @throws SQLException if database connection fails
063     */
064    public H2LogStore(Path dbPath) throws SQLException {
065        // AUTO_SERVER=TRUE allows multiple processes to access the same DB
066        // First process starts embedded server, others connect via TCP
067        String url = "jdbc:h2:" + dbPath.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
068        this.connection = DriverManager.getConnection(url);
069        this.writeQueue = new LinkedBlockingQueue<>();
070        this.running = new AtomicBoolean(true);
071
072        initSchema();
073
074        // Start async writer thread
075        this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer");
076        this.writerThread.setDaemon(true);
077        this.writerThread.start();
078    }
079
080    /**
081     * Creates an in-memory H2LogStore (for testing).
082     *
083     * @throws SQLException if database connection fails
084     */
085    public H2LogStore() throws SQLException {
086        // Use unique DB name per instance to avoid test interference
087        String url = "jdbc:h2:mem:testdb_" + System.nanoTime() + ";DB_CLOSE_DELAY=-1";
088        this.connection = DriverManager.getConnection(url);
089        this.writeQueue = new LinkedBlockingQueue<>();
090        this.running = new AtomicBoolean(true);
091
092        initSchema();
093
094        this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer");
095        this.writerThread.setDaemon(true);
096        this.writerThread.start();
097    }
098
099    /**
100     * Creates an H2LogStore connected to a remote TCP server.
101     *
102     * <p>The server should be started using the {@code log-server} command
103     * before connecting. Schema is expected to be initialized by the server.</p>
104     *
105     * @param host H2 server hostname (typically "localhost")
106     * @param port H2 server TCP port
107     * @param dbPath database path on the server
108     * @throws SQLException if connection fails
109     */
110    public H2LogStore(String host, int port, String dbPath) throws SQLException {
111        String url = "jdbc:h2:tcp://" + host + ":" + port + "/" + dbPath;
112        this.connection = DriverManager.getConnection(url);
113        this.writeQueue = new LinkedBlockingQueue<>();
114        this.running = new AtomicBoolean(true);
115
116        // Schema should already be initialized by the server
117        verifySchema();
118
119        this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer");
120        this.writerThread.setDaemon(true);
121        this.writerThread.start();
122    }
123
124    /**
125     * Factory method that attempts TCP connection with fallback to embedded mode.
126     *
127     * <p>If a log server address is specified and reachable, connects via TCP.
128     * Otherwise, falls back to embedded mode with AUTO_SERVER=TRUE.</p>
129     *
130     * @param logServer server address in "host:port" format (may be null)
131     * @param embeddedPath path for embedded database fallback
132     * @return H2LogStore instance
133     * @throws SQLException if both TCP and embedded connections fail
134     */
135    public static H2LogStore createWithFallback(String logServer, Path embeddedPath)
136            throws SQLException {
137        if (logServer != null && !logServer.isBlank()) {
138            try {
139                String[] parts = logServer.split(":");
140                String host = parts[0];
141                int port = parts.length > 1 ? Integer.parseInt(parts[1]) : DEFAULT_TCP_PORT;
142
143                // Use the embedded path as the database name on the server
144                String dbPath = embeddedPath.toAbsolutePath().toString();
145
146                H2LogStore store = new H2LogStore(host, port, dbPath);
147                LOG.info("Connected to log server at " + logServer);
148                return store;
149            } catch (SQLException e) {
150                LOG.warning("Failed to connect to log server '" + logServer +
151                           "', falling back to embedded mode: " + e.getMessage());
152            } catch (NumberFormatException e) {
153                LOG.warning("Invalid log server port in '" + logServer +
154                           "', falling back to embedded mode");
155            }
156        }
157        return new H2LogStore(embeddedPath);
158    }
159
160    /**
161     * Verifies the database schema exists (for TCP connections).
162     *
163     * @throws SQLException if schema is not initialized
164     */
165    private void verifySchema() throws SQLException {
166        try (Statement stmt = connection.createStatement()) {
167            // Simple check - if this fails, schema doesn't exist
168            stmt.execute("SELECT 1 FROM sessions WHERE 1=0");
169            stmt.execute("SELECT 1 FROM logs WHERE 1=0");
170            stmt.execute("SELECT 1 FROM node_results WHERE 1=0");
171        } catch (SQLException e) {
172            throw new SQLException("Database schema not initialized. " +
173                    "Ensure log-server has been started at least once.", e);
174        }
175    }
176
177    private void initSchema() throws SQLException {
178        try (Statement stmt = connection.createStatement()) {
179            // Sessions table
180            stmt.execute("""
181                CREATE TABLE IF NOT EXISTS sessions (
182                    id IDENTITY PRIMARY KEY,
183                    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
184                    ended_at TIMESTAMP,
185                    workflow_name VARCHAR(255),
186                    overlay_name VARCHAR(255),
187                    inventory_name VARCHAR(255),
188                    node_count INT,
189                    status VARCHAR(20) DEFAULT 'RUNNING'
190                )
191                """);
192
193            // Add columns if they don't exist (migration for existing databases)
194            try {
195                stmt.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS overlay_name VARCHAR(255)");
196                stmt.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS inventory_name VARCHAR(255)");
197            } catch (SQLException e) {
198                // Ignore if columns already exist
199            }
200
201            // Logs table
202            // vertex_name and action_name use CLOB to store long YAML snippets
203            stmt.execute("""
204                CREATE TABLE IF NOT EXISTS logs (
205                    id IDENTITY PRIMARY KEY,
206                    session_id BIGINT,
207                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
208                    node_id VARCHAR(255) NOT NULL,
209                    vertex_name CLOB,
210                    action_name CLOB,
211                    level VARCHAR(10) NOT NULL,
212                    message CLOB,
213                    exit_code INT,
214                    duration_ms BIGINT,
215                    FOREIGN KEY (session_id) REFERENCES sessions(id)
216                )
217                """);
218
219            // Node results table
220            stmt.execute("""
221                CREATE TABLE IF NOT EXISTS node_results (
222                    id IDENTITY PRIMARY KEY,
223                    session_id BIGINT,
224                    node_id VARCHAR(255) NOT NULL,
225                    status VARCHAR(20) NOT NULL,
226                    reason VARCHAR(1000),
227                    FOREIGN KEY (session_id) REFERENCES sessions(id),
228                    UNIQUE (session_id, node_id)
229                )
230                """);
231
232            // Indexes
233            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_session ON logs(session_id)");
234            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_node ON logs(node_id)");
235            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level)");
236            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp)");
237            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_workflow ON sessions(workflow_name)");
238            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_overlay ON sessions(overlay_name)");
239            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_inventory ON sessions(inventory_name)");
240            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at)");
241        }
242    }
243
244    private void writerLoop() {
245        List<LogTask> batch = new ArrayList<>(BATCH_SIZE);
246        while (running.get() || !writeQueue.isEmpty()) {
247            try {
248                LogTask task = writeQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
249                if (task != null) {
250                    batch.add(task);
251                    writeQueue.drainTo(batch, BATCH_SIZE - 1);
252                    processBatch(batch);
253                    batch.clear();
254                }
255            } catch (InterruptedException e) {
256                Thread.currentThread().interrupt();
257                break;
258            }
259        }
260        // Process remaining
261        if (!batch.isEmpty()) {
262            processBatch(batch);
263        }
264    }
265
266    private void processBatch(List<LogTask> batch) {
267        try {
268            connection.setAutoCommit(false);
269            for (LogTask task : batch) {
270                task.execute(connection);
271            }
272            connection.commit();
273            connection.setAutoCommit(true);
274        } catch (SQLException e) {
275            try {
276                connection.rollback();
277                connection.setAutoCommit(true);
278            } catch (SQLException ex) {
279                // Ignore rollback errors
280            }
281            e.printStackTrace();
282        }
283    }
284
285    @Override
286    public long startSession(String workflowName, int nodeCount) {
287        return startSession(workflowName, null, null, nodeCount);
288    }
289
290    @Override
291    public long startSession(String workflowName, String overlayName, String inventoryName, int nodeCount) {
292        try (PreparedStatement ps = connection.prepareStatement(
293                "INSERT INTO sessions (workflow_name, overlay_name, inventory_name, node_count) VALUES (?, ?, ?, ?)",
294                Statement.RETURN_GENERATED_KEYS)) {
295            ps.setString(1, workflowName);
296            ps.setString(2, overlayName);
297            ps.setString(3, inventoryName);
298            ps.setInt(4, nodeCount);
299            ps.executeUpdate();
300            try (ResultSet rs = ps.getGeneratedKeys()) {
301                if (rs.next()) {
302                    return rs.getLong(1);
303                }
304            }
305        } catch (SQLException e) {
306            throw new RuntimeException("Failed to start session", e);
307        }
308        return -1;
309    }
310
311    @Override
312    public void log(long sessionId, String nodeId, LogLevel level, String message) {
313        log(sessionId, nodeId, null, level, message);
314    }
315
316    @Override
317    public void log(long sessionId, String nodeId, String vertexName, LogLevel level, String message) {
318        writeQueue.offer(new LogTask.InsertLog(sessionId, nodeId, vertexName, null, level, message, null, null));
319    }
320
321    @Override
322    public void logAction(long sessionId, String nodeId, String vertexName,
323                          String actionName, int exitCode, long durationMs, String output) {
324        LogLevel level = exitCode == 0 ? LogLevel.INFO : LogLevel.ERROR;
325        writeQueue.offer(new LogTask.InsertLog(sessionId, nodeId, vertexName, actionName, level, output, exitCode, durationMs));
326    }
327
328    @Override
329    public void markNodeSuccess(long sessionId, String nodeId) {
330        writeQueue.offer(new LogTask.UpdateNodeResult(sessionId, nodeId, "SUCCESS", null));
331    }
332
333    @Override
334    public void markNodeFailed(long sessionId, String nodeId, String reason) {
335        writeQueue.offer(new LogTask.UpdateNodeResult(sessionId, nodeId, "FAILED", reason));
336    }
337
338    @Override
339    public void endSession(long sessionId, SessionStatus status) {
340        // Flush pending writes
341        flushWrites();
342
343        try (PreparedStatement ps = connection.prepareStatement(
344                "UPDATE sessions SET ended_at = CURRENT_TIMESTAMP, status = ? WHERE id = ?")) {
345            ps.setString(1, status.name());
346            ps.setLong(2, sessionId);
347            ps.executeUpdate();
348        } catch (SQLException e) {
349            throw new RuntimeException("Failed to end session", e);
350        }
351    }
352
353    private void flushWrites() {
354        // Wait for queue to drain
355        while (!writeQueue.isEmpty()) {
356            try {
357                Thread.sleep(10);
358            } catch (InterruptedException e) {
359                Thread.currentThread().interrupt();
360                break;
361            }
362        }
363    }
364
365    @Override
366    public List<LogEntry> getLogsByNode(long sessionId, String nodeId) {
367        List<LogEntry> entries = new ArrayList<>();
368        try (PreparedStatement ps = connection.prepareStatement(
369                "SELECT * FROM logs WHERE session_id = ? AND node_id = ? ORDER BY timestamp")) {
370            ps.setLong(1, sessionId);
371            ps.setString(2, nodeId);
372            try (ResultSet rs = ps.executeQuery()) {
373                while (rs.next()) {
374                    entries.add(mapLogEntry(rs));
375                }
376            }
377        } catch (SQLException e) {
378            throw new RuntimeException("Failed to query logs", e);
379        }
380        return entries;
381    }
382
383    @Override
384    public List<LogEntry> getLogsByLevel(long sessionId, LogLevel minLevel) {
385        List<LogEntry> entries = new ArrayList<>();
386        try (PreparedStatement ps = connection.prepareStatement(
387                "SELECT * FROM logs WHERE session_id = ? AND level IN (?, ?, ?, ?) ORDER BY timestamp")) {
388            ps.setLong(1, sessionId);
389            int idx = 2;
390            for (LogLevel level : LogLevel.values()) {
391                if (level.isAtLeast(minLevel)) {
392                    ps.setString(idx++, level.name());
393                } else {
394                    ps.setString(idx++, "NONE"); // Placeholder
395                }
396            }
397            try (ResultSet rs = ps.executeQuery()) {
398                while (rs.next()) {
399                    entries.add(mapLogEntry(rs));
400                }
401            }
402        } catch (SQLException e) {
403            throw new RuntimeException("Failed to query logs", e);
404        }
405        return entries;
406    }
407
408    @Override
409    public SessionSummary getSummary(long sessionId) {
410        try {
411            // Get session info
412            String workflowName = null;
413            String overlayName = null;
414            String inventoryName = null;
415            LocalDateTime startedAt = null;
416            LocalDateTime endedAt = null;
417            int nodeCount = 0;
418            SessionStatus status = SessionStatus.RUNNING;
419
420            try (PreparedStatement ps = connection.prepareStatement(
421                    "SELECT * FROM sessions WHERE id = ?")) {
422                ps.setLong(1, sessionId);
423                try (ResultSet rs = ps.executeQuery()) {
424                    if (rs.next()) {
425                        workflowName = rs.getString("workflow_name");
426                        overlayName = rs.getString("overlay_name");
427                        inventoryName = rs.getString("inventory_name");
428                        Timestamp ts = rs.getTimestamp("started_at");
429                        startedAt = ts != null ? ts.toLocalDateTime() : null;
430                        ts = rs.getTimestamp("ended_at");
431                        endedAt = ts != null ? ts.toLocalDateTime() : null;
432                        nodeCount = rs.getInt("node_count");
433                        String statusStr = rs.getString("status");
434                        if (statusStr != null) {
435                            status = SessionStatus.valueOf(statusStr);
436                        }
437                    }
438                }
439            }
440
441            // Get node results
442            int successCount = 0;
443            int failedCount = 0;
444            List<String> failedNodes = new ArrayList<>();
445
446            try (PreparedStatement ps = connection.prepareStatement(
447                    "SELECT node_id, status FROM node_results WHERE session_id = ?")) {
448                ps.setLong(1, sessionId);
449                try (ResultSet rs = ps.executeQuery()) {
450                    while (rs.next()) {
451                        String nodeStatus = rs.getString("status");
452                        if ("SUCCESS".equals(nodeStatus)) {
453                            successCount++;
454                        } else {
455                            failedCount++;
456                            failedNodes.add(rs.getString("node_id"));
457                        }
458                    }
459                }
460            }
461
462            // Get log counts
463            int totalLogEntries = 0;
464            int errorCount = 0;
465
466            try (PreparedStatement ps = connection.prepareStatement(
467                    "SELECT COUNT(*) as total, SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) as errors " +
468                    "FROM logs WHERE session_id = ?")) {
469                ps.setLong(1, sessionId);
470                try (ResultSet rs = ps.executeQuery()) {
471                    if (rs.next()) {
472                        totalLogEntries = rs.getInt("total");
473                        errorCount = rs.getInt("errors");
474                    }
475                }
476            }
477
478            return new SessionSummary(sessionId, workflowName, overlayName, inventoryName,
479                    startedAt, endedAt, nodeCount, status, successCount, failedCount,
480                    failedNodes, totalLogEntries, errorCount);
481
482        } catch (SQLException e) {
483            throw new RuntimeException("Failed to get session summary", e);
484        }
485    }
486
487    @Override
488    public long getLatestSessionId() {
489        try (Statement stmt = connection.createStatement();
490             ResultSet rs = stmt.executeQuery("SELECT MAX(id) FROM sessions")) {
491            if (rs.next()) {
492                return rs.getLong(1);
493            }
494        } catch (SQLException e) {
495            throw new RuntimeException("Failed to get latest session", e);
496        }
497        return -1;
498    }
499
500    @Override
501    public List<SessionSummary> listSessions(int limit) {
502        List<SessionSummary> sessions = new ArrayList<>();
503        try (PreparedStatement ps = connection.prepareStatement(
504                "SELECT id FROM sessions ORDER BY started_at DESC LIMIT ?")) {
505            ps.setInt(1, limit);
506            try (ResultSet rs = ps.executeQuery()) {
507                while (rs.next()) {
508                    sessions.add(getSummary(rs.getLong("id")));
509                }
510            }
511        } catch (SQLException e) {
512            throw new RuntimeException("Failed to list sessions", e);
513        }
514        return sessions;
515    }
516
517    private LogEntry mapLogEntry(ResultSet rs) throws SQLException {
518        Timestamp ts = rs.getTimestamp("timestamp");
519        Integer exitCode = rs.getObject("exit_code") != null ? rs.getInt("exit_code") : null;
520        Long durationMs = rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null;
521        String levelStr = rs.getString("level");
522        LogLevel level = levelStr != null ? LogLevel.valueOf(levelStr) : LogLevel.INFO;
523
524        return new LogEntry(
525                rs.getLong("id"),
526                rs.getLong("session_id"),
527                ts != null ? ts.toLocalDateTime() : null,
528                rs.getString("node_id"),
529                rs.getString("vertex_name"),
530                rs.getString("action_name"),
531                level,
532                rs.getString("message"),
533                exitCode,
534                durationMs
535        );
536    }
537
538    @Override
539    public void close() throws Exception {
540        running.set(false);
541        writerThread.interrupt();
542        try {
543            writerThread.join(5000);
544        } catch (InterruptedException e) {
545            Thread.currentThread().interrupt();
546        }
547        connection.close();
548    }
549
550    // Internal task classes for async writing
551    private interface LogTask {
552        void execute(Connection conn) throws SQLException;
553
554        record InsertLog(long sessionId, String nodeId, String vertexName, String actionName,
555                         LogLevel level, String message, Integer exitCode, Long durationMs) implements LogTask {
556            @Override
557            public void execute(Connection conn) throws SQLException {
558                try (PreparedStatement ps = conn.prepareStatement(
559                        "INSERT INTO logs (session_id, node_id, vertex_name, action_name, level, message, exit_code, duration_ms) " +
560                        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) {
561                    ps.setLong(1, sessionId);
562                    ps.setString(2, nodeId);
563                    ps.setString(3, vertexName);
564                    ps.setString(4, actionName);
565                    ps.setString(5, level.name());
566                    ps.setString(6, message);
567                    if (exitCode != null) {
568                        ps.setInt(7, exitCode);
569                    } else {
570                        ps.setNull(7, Types.INTEGER);
571                    }
572                    if (durationMs != null) {
573                        ps.setLong(8, durationMs);
574                    } else {
575                        ps.setNull(8, Types.BIGINT);
576                    }
577                    ps.executeUpdate();
578                }
579            }
580        }
581
582        record UpdateNodeResult(long sessionId, String nodeId, String status, String reason) implements LogTask {
583            @Override
584            public void execute(Connection conn) throws SQLException {
585                try (PreparedStatement ps = conn.prepareStatement(
586                        "MERGE INTO node_results (session_id, node_id, status, reason) KEY (session_id, node_id) VALUES (?, ?, ?, ?)")) {
587                    ps.setLong(1, sessionId);
588                    ps.setString(2, nodeId);
589                    ps.setString(3, status);
590                    ps.setString(4, reason);
591                    ps.executeUpdate();
592                }
593            }
594        }
595    }
596}