/*
 * Copyright 2025 devteam@scivics-lab.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.scivicslab.actoriac.log;

import java.nio.file.Path;
import java.sql.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * H2 Database implementation of {@link DistributedLogStore}.
 *
 * <p>Uses the H2 embedded database for storing logs from distributed workflow
 * execution. Supports concurrent access from multiple threads and provides
 * efficient querying by node, level, and time.</p>
 *
 * <p>Features:</p>
 * <ul>
 *   <li>Pure Java - no native dependencies</li>
 *   <li>Single-file storage (.mv.db)</li>
 *   <li>Asynchronous batch writing for performance</li>
 *   <li>SQL-based querying</li>
 * </ul>
 *
 * @author devteam@scivics-lab.com
 */
public class H2LogStore implements DistributedLogStore {

    private static final Logger LOG = Logger.getLogger(H2LogStore.class.getName());

    private final Connection connection;
    private final BlockingQueue<LogTask> writeQueue;
    private final Thread writerThread;
    private final AtomicBoolean running;

    private static final int BATCH_SIZE = 100;
    private static final int DEFAULT_TCP_PORT = 29090;

    /**
     * Creates an H2LogStore with the specified database path.
     *
     * @param dbPath path to the database file (without extension)
     * @throws SQLException if database connection fails
     */
    public H2LogStore(Path dbPath) throws SQLException {
        // AUTO_SERVER=TRUE allows multiple processes to access the same DB:
        // the first process starts an embedded server, others connect via TCP.
        String url = "jdbc:h2:" + dbPath.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
        this.connection = DriverManager.getConnection(url);
        this.writeQueue = new LinkedBlockingQueue<>();
        this.running = new AtomicBoolean(true);

        initSchema();

        // Start async writer thread
        this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer");
        this.writerThread.setDaemon(true);
        this.writerThread.start();
    }

    /**
     * Creates an in-memory H2LogStore (for testing).
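     *
     * <p>A minimal usage sketch. The {@code SUCCESS} constant on
     * {@code SessionStatus} is assumed here; only {@code RUNNING} appears in
     * this file.</p>
     * <pre>{@code
     * try (H2LogStore store = new H2LogStore()) {
     *     long sessionId = store.startSession("demo-workflow", 1);
     *     store.log(sessionId, "node-1", LogLevel.INFO, "hello");
     *     store.markNodeSuccess(sessionId, "node-1");
     *     store.endSession(sessionId, SessionStatus.SUCCESS);
     * }
     * }</pre>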
     *
     * @throws SQLException if database connection fails
     */
    public H2LogStore() throws SQLException {
        // Use a unique DB name per instance to avoid test interference
        String url = "jdbc:h2:mem:testdb_" + System.nanoTime() + ";DB_CLOSE_DELAY=-1";
        this.connection = DriverManager.getConnection(url);
        this.writeQueue = new LinkedBlockingQueue<>();
        this.running = new AtomicBoolean(true);

        initSchema();

        this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer");
        this.writerThread.setDaemon(true);
        this.writerThread.start();
    }

    /**
     * Creates an H2LogStore connected to a remote TCP server.
     *
     * <p>The server should be started using the {@code log-server} command
     * before connecting. The schema is expected to be initialized by the server.</p>
     *
     * @param host H2 server hostname (typically "localhost")
     * @param port H2 server TCP port
     * @param dbPath database path on the server
     * @throws SQLException if connection fails
     */
    public H2LogStore(String host, int port, String dbPath) throws SQLException {
        String url = "jdbc:h2:tcp://" + host + ":" + port + "/" + dbPath;
        this.connection = DriverManager.getConnection(url);
        this.writeQueue = new LinkedBlockingQueue<>();
        this.running = new AtomicBoolean(true);

        // Schema should already be initialized by the server
        verifySchema();

        this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer");
        this.writerThread.setDaemon(true);
        this.writerThread.start();
    }

    /**
     * Factory method that attempts a TCP connection with fallback to embedded mode.
     *
     * <p>If a log server address is specified and reachable, connects via TCP.
     * Otherwise, falls back to embedded mode with AUTO_SERVER=TRUE.</p>
     *
     * @param logServer server address in "host:port" format (may be null)
     * @param embeddedPath path for embedded database fallback
     * @return H2LogStore instance
     * @throws SQLException if both TCP and embedded connections fail
     */
    public static H2LogStore createWithFallback(String logServer, Path embeddedPath)
            throws SQLException {
        if (logServer != null && !logServer.isBlank()) {
            try {
                String[] parts = logServer.split(":");
                String host = parts[0];
                int port = parts.length > 1 ? Integer.parseInt(parts[1]) : DEFAULT_TCP_PORT;

                // Use the embedded path as the database name on the server
                String dbPath = embeddedPath.toAbsolutePath().toString();

                H2LogStore store = new H2LogStore(host, port, dbPath);
                LOG.info("Connected to log server at " + logServer);
                return store;
            } catch (SQLException e) {
                LOG.warning("Failed to connect to log server '" + logServer +
                        "', falling back to embedded mode: " + e.getMessage());
            } catch (NumberFormatException e) {
                LOG.warning("Invalid log server port in '" + logServer +
                        "', falling back to embedded mode");
            }
        }
        return new H2LogStore(embeddedPath);
    }

    /**
     * Verifies the database schema exists (for TCP connections).
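     *
     * <p>Issues a trivial {@code SELECT} against each required table
     * ({@code sessions}, {@code logs}, {@code node_results}); any failure
     * means the schema has not been created yet.</p>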
     *
     * @throws SQLException if schema is not initialized
     */
    private void verifySchema() throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            // Simple check - if any of these fail, the schema doesn't exist
            stmt.execute("SELECT 1 FROM sessions WHERE 1=0");
            stmt.execute("SELECT 1 FROM logs WHERE 1=0");
            stmt.execute("SELECT 1 FROM node_results WHERE 1=0");
        } catch (SQLException e) {
            throw new SQLException("Database schema not initialized. " +
                    "Ensure log-server has been started at least once.", e);
        }
    }

    private void initSchema() throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            // Sessions table
            stmt.execute("""
                    CREATE TABLE IF NOT EXISTS sessions (
                        id IDENTITY PRIMARY KEY,
                        started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        ended_at TIMESTAMP,
                        workflow_name VARCHAR(255),
                        overlay_name VARCHAR(255),
                        inventory_name VARCHAR(255),
                        node_count INT,
                        status VARCHAR(20) DEFAULT 'RUNNING'
                    )
                    """);

            // Add columns if they don't exist (migration for existing databases)
            try {
                stmt.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS overlay_name VARCHAR(255)");
                stmt.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS inventory_name VARCHAR(255)");
            } catch (SQLException e) {
                // Ignore if columns already exist
            }

            // Logs table
            // vertex_name and action_name use CLOB to store long YAML snippets
            stmt.execute("""
                    CREATE TABLE IF NOT EXISTS logs (
                        id IDENTITY PRIMARY KEY,
                        session_id BIGINT,
                        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        node_id VARCHAR(255) NOT NULL,
                        vertex_name CLOB,
                        action_name CLOB,
                        level VARCHAR(10) NOT NULL,
                        message CLOB,
                        exit_code INT,
                        duration_ms BIGINT,
                        FOREIGN KEY (session_id) REFERENCES sessions(id)
                    )
                    """);

            // Node results table
            stmt.execute("""
                    CREATE TABLE IF NOT EXISTS node_results (
                        id IDENTITY PRIMARY KEY,
                        session_id BIGINT,
                        node_id VARCHAR(255) NOT NULL,
                        status VARCHAR(20) NOT NULL,
                        reason VARCHAR(1000),
                        FOREIGN KEY (session_id) REFERENCES sessions(id),
                        UNIQUE (session_id, node_id)
                    )
                    """);

            // Indexes
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_session ON logs(session_id)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_node ON logs(node_id)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_workflow ON sessions(workflow_name)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_overlay ON sessions(overlay_name)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_inventory ON sessions(inventory_name)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at)");
        }
    }

    private void writerLoop() {
        List<LogTask> batch = new ArrayList<>(BATCH_SIZE);
        while (running.get() || !writeQueue.isEmpty()) {
            try {
                // Block briefly for the first task, then drain up to a full batch
                LogTask task = writeQueue.poll(100, TimeUnit.MILLISECONDS);
                if (task != null) {
                    batch.add(task);
                    writeQueue.drainTo(batch, BATCH_SIZE - 1);
                    processBatch(batch);
                    batch.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        // Process any remaining tasks
        if (!batch.isEmpty()) {
            processBatch(batch);
        }
    }

    private void processBatch(List<LogTask> batch) {
        try {
            connection.setAutoCommit(false);
            for (LogTask task : batch) {
                task.execute(connection);
            }
            connection.commit();
            connection.setAutoCommit(true);
        } catch (SQLException e) {
            try {
                connection.rollback();
                connection.setAutoCommit(true);
            } catch (SQLException ex) {
                // Ignore rollback errors
            }
            // Log and drop the failed batch; the writer loop keeps running
            LOG.log(Level.SEVERE, "Failed to write log batch", e);
        }
    }

    @Override
    public long startSession(String workflowName, int nodeCount) {
        return startSession(workflowName, null, null, nodeCount);
    }

    @Override
    public long startSession(String workflowName, String overlayName, String inventoryName, int nodeCount) {
        try (PreparedStatement ps = connection.prepareStatement(
                "INSERT INTO sessions (workflow_name, overlay_name, inventory_name, node_count) VALUES (?, ?, ?, ?)",
                Statement.RETURN_GENERATED_KEYS)) {
            ps.setString(1, workflowName);
            ps.setString(2, overlayName);
            ps.setString(3, inventoryName);
            ps.setInt(4, nodeCount);
            ps.executeUpdate();
            try (ResultSet rs = ps.getGeneratedKeys()) {
                if (rs.next()) {
                    return rs.getLong(1);
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to start session", e);
        }
        return -1;
    }

    @Override
    public void log(long sessionId, String nodeId, LogLevel level, String message) {
        log(sessionId, nodeId, null, level, message);
    }

    @Override
    public void log(long sessionId, String nodeId, String vertexName, LogLevel level, String message) {
        writeQueue.offer(new LogTask.InsertLog(sessionId, nodeId, vertexName, null, level, message, null, null));
    }

    @Override
    public void logAction(long sessionId, String nodeId, String vertexName,
                          String actionName, int exitCode, long durationMs, String output) {
        LogLevel level = exitCode == 0 ? LogLevel.INFO : LogLevel.ERROR;
        writeQueue.offer(new LogTask.InsertLog(sessionId, nodeId, vertexName, actionName, level, output, exitCode, durationMs));
    }

    @Override
    public void markNodeSuccess(long sessionId, String nodeId) {
        writeQueue.offer(new LogTask.UpdateNodeResult(sessionId, nodeId, "SUCCESS", null));
    }

    @Override
    public void markNodeFailed(long sessionId, String nodeId, String reason) {
        writeQueue.offer(new LogTask.UpdateNodeResult(sessionId, nodeId, "FAILED", reason));
    }

    @Override
    public void endSession(long sessionId, SessionStatus status) {
        // Flush pending writes so the session row is updated after the logs
        flushWrites();

        try (PreparedStatement ps = connection.prepareStatement(
                "UPDATE sessions SET ended_at = CURRENT_TIMESTAMP, status = ? WHERE id = ?")) {
            ps.setString(1, status.name());
            ps.setLong(2, sessionId);
            ps.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to end session", e);
        }
    }

    private void flushWrites() {
        // Wait for the queue to drain; tasks already pulled into the writer's
        // current batch may still be in flight after this returns
        while (!writeQueue.isEmpty()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    @Override
    public List<LogEntry> getLogsByNode(long sessionId, String nodeId) {
        List<LogEntry> entries = new ArrayList<>();
        try (PreparedStatement ps = connection.prepareStatement(
                "SELECT * FROM logs WHERE session_id = ? AND node_id = ? ORDER BY timestamp")) {
            ps.setLong(1, sessionId);
            ps.setString(2, nodeId);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    entries.add(mapLogEntry(rs));
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to query logs", e);
        }
        return entries;
    }

    @Override
    public List<LogEntry> getLogsByLevel(long sessionId, LogLevel minLevel) {
        List<LogEntry> entries = new ArrayList<>();
        try (PreparedStatement ps = connection.prepareStatement(
                "SELECT * FROM logs WHERE session_id = ? AND level IN (?, ?, ?, ?) ORDER BY timestamp")) {
            ps.setLong(1, sessionId);
            // Bind one placeholder per LogLevel value (the IN list assumes
            // four levels); levels below the threshold are bound to "NONE",
            // which never matches a stored level
            int idx = 2;
            for (LogLevel level : LogLevel.values()) {
                if (level.isAtLeast(minLevel)) {
                    ps.setString(idx++, level.name());
                } else {
                    ps.setString(idx++, "NONE"); // Placeholder that matches nothing
                }
            }
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    entries.add(mapLogEntry(rs));
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to query logs", e);
        }
        return entries;
    }

    @Override
    public SessionSummary getSummary(long sessionId) {
        try {
            // Get session info
            String workflowName = null;
            String overlayName = null;
            String inventoryName = null;
            LocalDateTime startedAt = null;
            LocalDateTime endedAt = null;
            int nodeCount = 0;
            SessionStatus status = SessionStatus.RUNNING;

            try (PreparedStatement ps = connection.prepareStatement(
                    "SELECT * FROM sessions WHERE id = ?")) {
                ps.setLong(1, sessionId);
                try (ResultSet rs = ps.executeQuery()) {
                    if (rs.next()) {
                        workflowName = rs.getString("workflow_name");
                        overlayName = rs.getString("overlay_name");
                        inventoryName = rs.getString("inventory_name");
                        Timestamp ts = rs.getTimestamp("started_at");
                        startedAt = ts != null ? ts.toLocalDateTime() : null;
                        ts = rs.getTimestamp("ended_at");
                        endedAt = ts != null ? ts.toLocalDateTime() : null;
                        nodeCount = rs.getInt("node_count");
                        String statusStr = rs.getString("status");
                        if (statusStr != null) {
                            status = SessionStatus.valueOf(statusStr);
                        }
                    }
                }
            }

            // Get node results
            int successCount = 0;
            int failedCount = 0;
            List<String> failedNodes = new ArrayList<>();

            try (PreparedStatement ps = connection.prepareStatement(
                    "SELECT node_id, status FROM node_results WHERE session_id = ?")) {
                ps.setLong(1, sessionId);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        String nodeStatus = rs.getString("status");
                        if ("SUCCESS".equals(nodeStatus)) {
                            successCount++;
                        } else {
                            failedCount++;
                            failedNodes.add(rs.getString("node_id"));
                        }
                    }
                }
            }

            // Get log counts
            int totalLogEntries = 0;
            int errorCount = 0;

            try (PreparedStatement ps = connection.prepareStatement(
                    "SELECT COUNT(*) as total, SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) as errors " +
                    "FROM logs WHERE session_id = ?")) {
                ps.setLong(1, sessionId);
                try (ResultSet rs = ps.executeQuery()) {
                    if (rs.next()) {
                        totalLogEntries = rs.getInt("total");
                        errorCount = rs.getInt("errors");
                    }
                }
            }

            return new SessionSummary(sessionId, workflowName, overlayName, inventoryName,
                    startedAt, endedAt, nodeCount, status, successCount, failedCount,
                    failedNodes, totalLogEntries, errorCount);

        } catch (SQLException e) {
            throw new RuntimeException("Failed to get session summary", e);
        }
    }

    @Override
    public long getLatestSessionId() {
        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT MAX(id) FROM sessions")) {
            if (rs.next()) {
                return rs.getLong(1);
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to get latest session", e);
        }
        return -1;
    }

    @Override
    public List<SessionSummary> listSessions(int limit) {
        List<SessionSummary> sessions = new ArrayList<>();
        try (PreparedStatement ps = connection.prepareStatement(
                "SELECT id FROM sessions ORDER BY started_at DESC LIMIT ?")) {
            ps.setInt(1, limit);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    sessions.add(getSummary(rs.getLong("id")));
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to list sessions", e);
        }
        return sessions;
    }

    private LogEntry mapLogEntry(ResultSet rs) throws SQLException {
        Timestamp ts = rs.getTimestamp("timestamp");
        Integer exitCode = rs.getObject("exit_code") != null ? rs.getInt("exit_code") : null;
        Long durationMs = rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null;
        String levelStr = rs.getString("level");
        LogLevel level = levelStr != null ? LogLevel.valueOf(levelStr) : LogLevel.INFO;

        return new LogEntry(
                rs.getLong("id"),
                rs.getLong("session_id"),
                ts != null ? ts.toLocalDateTime() : null,
                rs.getString("node_id"),
                rs.getString("vertex_name"),
                rs.getString("action_name"),
                level,
                rs.getString("message"),
                exitCode,
                durationMs
        );
    }

    @Override
    public void close() throws Exception {
        running.set(false);
        writerThread.interrupt();
        try {
            writerThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        connection.close();
    }

    // Internal task classes for async writing
    private interface LogTask {
        void execute(Connection conn) throws SQLException;

        record InsertLog(long sessionId, String nodeId, String vertexName, String actionName,
                         LogLevel level, String message, Integer exitCode, Long durationMs) implements LogTask {
            @Override
            public void execute(Connection conn) throws SQLException {
                try (PreparedStatement ps = conn.prepareStatement(
                        "INSERT INTO logs (session_id, node_id, vertex_name, action_name, level, message, exit_code, duration_ms) " +
                        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) {
                    ps.setLong(1, sessionId);
                    ps.setString(2, nodeId);
                    ps.setString(3, vertexName);
                    ps.setString(4, actionName);
                    ps.setString(5, level.name());
                    ps.setString(6, message);
                    if (exitCode != null) {
                        ps.setInt(7, exitCode);
                    } else {
                        ps.setNull(7, Types.INTEGER);
                    }
                    if (durationMs != null) {
                        ps.setLong(8, durationMs);
                    } else {
                        ps.setNull(8, Types.BIGINT);
                    }
                    ps.executeUpdate();
                }
            }
        }

        record UpdateNodeResult(long sessionId, String nodeId, String status, String reason) implements LogTask {
            @Override
            public void execute(Connection conn) throws SQLException {
                // MERGE performs an upsert keyed on (session_id, node_id)
                try (PreparedStatement ps = conn.prepareStatement(
                        "MERGE INTO node_results (session_id, node_id, status, reason) KEY (session_id, node_id) VALUES (?, ?, ?, ?)")) {
                    ps.setLong(1, sessionId);
                    ps.setString(2, nodeId);
                    ps.setString(3, status);
                    ps.setString(4, reason);
                    ps.executeUpdate();
                }
            }
        }
    }
}