001/* 002 * Copyright 2025 devteam@scivicslab.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, 011 * software distributed under the License is distributed on an 012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the 014 * specific language governing permissions and limitations 015 * under the License. 016 */ 017 018package com.scivicslab.actoriac.log; 019 020import java.io.BufferedWriter; 021import java.io.FileWriter; 022import java.io.IOException; 023import java.io.PrintWriter; 024import java.nio.file.Path; 025import java.sql.*; 026import java.time.LocalDateTime; 027import java.time.ZoneId; 028import java.time.format.DateTimeFormatter; 029import java.util.ArrayList; 030import java.util.List; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.LinkedBlockingQueue; 033import java.util.concurrent.atomic.AtomicBoolean; 034import java.util.logging.Logger; 035 036/** 037 * H2 Database implementation of DistributedLogStore. 038 * 039 * <p>Uses H2 embedded database for storing logs from distributed workflow 040 * execution. Supports concurrent access from multiple threads and provides 041 * efficient querying by node, level, and time.</p> 042 * 043 * <p>Features:</p> 044 * <ul> 045 * <li>Pure Java - no native dependencies</li> 046 * <li>Single file storage (.mv.db)</li> 047 * <li>Asynchronous batch writing for performance</li> 048 * <li>SQL-based querying</li> 049 * </ul> 050 * 051 * @author devteam@scivicslab.com 052 */ 053public class H2LogStore implements DistributedLogStore { 054 055 private static final Logger LOG = Logger.getLogger(H2LogStore.class.getName()); 056 private static final DateTimeFormatter ISO_FORMATTER = 057 DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"); 058 private static final ZoneId SYSTEM_ZONE = ZoneId.systemDefault(); 059 060 private final Connection connection; 061 private final H2LogReader reader; 062 private final BlockingQueue<LogTask> writeQueue; 063 private final Thread writerThread; 064 private final AtomicBoolean running; 065 private static final int BATCH_SIZE = 100; 066 067 /** 068 * Optional text log file writer. 069 * When set, log entries are also written to this text file. 070 */ 071 private PrintWriter textLogWriter; 072 073 /** 074 * Creates an H2LogStore with the specified database path. 075 * 076 * @param dbPath path to the database file (without extension) 077 * @throws SQLException if database connection fails 078 */ 079 public H2LogStore(Path dbPath) throws SQLException { 080 // AUTO_SERVER=TRUE allows multiple processes to access the same DB 081 // First process starts embedded server, others connect via TCP 082 String url = "jdbc:h2:" + dbPath.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE"; 083 this.connection = DriverManager.getConnection(url); 084 this.reader = new H2LogReader(connection); 085 this.writeQueue = new LinkedBlockingQueue<>(); 086 this.running = new AtomicBoolean(true); 087 088 initSchema(); 089 090 // Start async writer thread 091 this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer"); 092 this.writerThread.setDaemon(true); 093 this.writerThread.start(); 094 } 095 096 /** 097 * Creates an in-memory H2LogStore (for testing). 098 * 099 * @throws SQLException if database connection fails 100 */ 101 public H2LogStore() throws SQLException { 102 // Use unique DB name per instance to avoid test interference 103 String url = "jdbc:h2:mem:testdb_" + System.nanoTime() + ";DB_CLOSE_DELAY=-1"; 104 this.connection = DriverManager.getConnection(url); 105 this.reader = new H2LogReader(connection); 106 this.writeQueue = new LinkedBlockingQueue<>(); 107 this.running = new AtomicBoolean(true); 108 109 initSchema(); 110 111 this.writerThread = new Thread(this::writerLoop, "H2LogStore-Writer"); 112 this.writerThread.setDaemon(true); 113 this.writerThread.start(); 114 } 115 116 /** 117 * Gets the database connection for read-only operations. 118 * 119 * <p>This connection is shared and should NOT be closed by the caller. 120 * The connection will be closed when the H2LogStore is closed.</p> 121 * 122 * @return the JDBC connection 123 */ 124 @Override 125 public Connection getConnection() { 126 return this.connection; 127 } 128 129 /** 130 * Sets the text log file for additional text-based logging. 131 * 132 * <p>When a text log file is set, all log entries written to the database 133 * are also appended to this text file in a human-readable format.</p> 134 * 135 * @param textLogPath path to the text log file 136 * @throws IOException if the file cannot be opened for writing 137 */ 138 public void setTextLogFile(Path textLogPath) throws IOException { 139 if (this.textLogWriter != null) { 140 this.textLogWriter.close(); 141 } 142 this.textLogWriter = new PrintWriter(new BufferedWriter(new FileWriter(textLogPath.toFile(), true)), true); 143 LOG.info("Text logging enabled: " + textLogPath); 144 } 145 146 /** 147 * Disables text file logging. 148 */ 149 public void disableTextLog() { 150 if (this.textLogWriter != null) { 151 this.textLogWriter.close(); 152 this.textLogWriter = null; 153 } 154 } 155 156 private void initSchema() throws SQLException { 157 initSchema(connection); 158 } 159 160 /** 161 * Initializes the log database schema on the given connection. 162 * 163 * <p>This method can be called from external components (e.g., log server) 164 * to ensure consistent schema across all database access points.</p> 165 * 166 * @param conn the database connection 167 * @throws SQLException if schema initialization fails 168 */ 169 public static void initSchema(Connection conn) throws SQLException { 170 try (Statement stmt = conn.createStatement()) { 171 // Sessions table 172 stmt.execute(""" 173 CREATE TABLE IF NOT EXISTS sessions ( 174 id IDENTITY PRIMARY KEY, 175 started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 176 ended_at TIMESTAMP, 177 workflow_name VARCHAR(255), 178 overlay_name VARCHAR(255), 179 inventory_name VARCHAR(255), 180 node_count INT, 181 status VARCHAR(20) DEFAULT 'RUNNING', 182 cwd VARCHAR(1000), 183 git_commit VARCHAR(50), 184 git_branch VARCHAR(255), 185 command_line VARCHAR(2000), 186 actoriac_version VARCHAR(50), 187 actoriac_commit VARCHAR(50) 188 ) 189 """); 190 191 // Logs table 192 // label and action_name use CLOB to store long YAML snippets 193 stmt.execute(""" 194 CREATE TABLE IF NOT EXISTS logs ( 195 id IDENTITY PRIMARY KEY, 196 session_id BIGINT, 197 timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 198 node_id VARCHAR(255) NOT NULL, 199 label CLOB, 200 action_name CLOB, 201 level VARCHAR(10) NOT NULL, 202 message CLOB, 203 exit_code INT, 204 duration_ms BIGINT, 205 FOREIGN KEY (session_id) REFERENCES sessions(id) 206 ) 207 """); 208 209 // Node results table 210 stmt.execute(""" 211 CREATE TABLE IF NOT EXISTS node_results ( 212 id IDENTITY PRIMARY KEY, 213 session_id BIGINT, 214 node_id VARCHAR(255) NOT NULL, 215 status VARCHAR(20) NOT NULL, 216 reason VARCHAR(1000), 217 FOREIGN KEY (session_id) REFERENCES sessions(id), 218 UNIQUE (session_id, node_id) 219 ) 220 """); 221 222 // Indexes 223 stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_session ON logs(session_id)"); 224 stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_node ON logs(node_id)"); 225 stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level)"); 226 stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp)"); 227 stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_workflow ON sessions(workflow_name)"); 228 stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_overlay ON sessions(overlay_name)"); 229 stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_inventory ON sessions(inventory_name)"); 230 stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at)"); 231 } 232 } 233 234 private void writerLoop() { 235 List<LogTask> batch = new ArrayList<>(BATCH_SIZE); 236 while (running.get() || !writeQueue.isEmpty()) { 237 try { 238 LogTask task = writeQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS); 239 if (task != null) { 240 batch.add(task); 241 writeQueue.drainTo(batch, BATCH_SIZE - 1); 242 processBatch(batch); 243 batch.clear(); 244 } 245 } catch (InterruptedException e) { 246 Thread.currentThread().interrupt(); 247 break; 248 } 249 } 250 // Process remaining 251 if (!batch.isEmpty()) { 252 processBatch(batch); 253 } 254 } 255 256 private void processBatch(List<LogTask> batch) { 257 try { 258 connection.setAutoCommit(false); 259 for (LogTask task : batch) { 260 task.execute(connection); 261 } 262 connection.commit(); 263 connection.setAutoCommit(true); 264 } catch (SQLException e) { 265 try { 266 connection.rollback(); 267 connection.setAutoCommit(true); 268 } catch (SQLException ex) { 269 // Ignore rollback errors 270 } 271 e.printStackTrace(); 272 } 273 } 274 275 @Override 276 public long startSession(String workflowName, int nodeCount) { 277 return startSession(workflowName, null, null, nodeCount); 278 } 279 280 @Override 281 public long startSession(String workflowName, String overlayName, String inventoryName, int nodeCount) { 282 return startSession(workflowName, overlayName, inventoryName, nodeCount, 283 null, null, null, null, null, null); 284 } 285 286 @Override 287 public long startSession(String workflowName, String overlayName, String inventoryName, int nodeCount, 288 String cwd, String gitCommit, String gitBranch, 289 String commandLine, String actorIacVersion, String actorIacCommit) { 290 try (PreparedStatement ps = connection.prepareStatement( 291 "INSERT INTO sessions (workflow_name, overlay_name, inventory_name, node_count, " + 292 "cwd, git_commit, git_branch, command_line, actoriac_version, actoriac_commit) " + 293 "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 294 Statement.RETURN_GENERATED_KEYS)) { 295 ps.setString(1, workflowName); 296 ps.setString(2, overlayName); 297 ps.setString(3, inventoryName); 298 ps.setInt(4, nodeCount); 299 ps.setString(5, cwd); 300 ps.setString(6, gitCommit); 301 ps.setString(7, gitBranch); 302 ps.setString(8, commandLine); 303 ps.setString(9, actorIacVersion); 304 ps.setString(10, actorIacCommit); 305 ps.executeUpdate(); 306 try (ResultSet rs = ps.getGeneratedKeys()) { 307 if (rs.next()) { 308 return rs.getLong(1); 309 } 310 } 311 } catch (SQLException e) { 312 throw new RuntimeException("Failed to start session", e); 313 } 314 return -1; 315 } 316 317 @Override 318 public void log(long sessionId, String nodeId, LogLevel level, String message) { 319 log(sessionId, nodeId, null, level, message); 320 } 321 322 @Override 323 public void log(long sessionId, String nodeId, String label, LogLevel level, String message) { 324 writeQueue.offer(new LogTask.InsertLog(sessionId, nodeId, label, null, level, message, null, null)); 325 writeToTextLog(nodeId, label, level, message); 326 } 327 328 @Override 329 public void logAction(long sessionId, String nodeId, String label, 330 String actionName, int exitCode, long durationMs, String output) { 331 LogLevel level = parseLogLevelFromLabel(label, exitCode); 332 writeQueue.offer(new LogTask.InsertLog(sessionId, nodeId, label, actionName, level, output, exitCode, durationMs)); 333 writeToTextLog(nodeId, label, level, output); 334 } 335 336 /** 337 * Parses log level from the label parameter. 338 * 339 * <p>Maps java.util.logging levels to LogLevel:</p> 340 * <ul> 341 * <li>SEVERE → ERROR</li> 342 * <li>WARNING → WARN</li> 343 * <li>INFO → INFO</li> 344 * <li>CONFIG, FINE, FINER, FINEST → DEBUG</li> 345 * </ul> 346 */ 347 private LogLevel parseLogLevelFromLabel(String label, int exitCode) { 348 if (label != null && label.startsWith("log-")) { 349 String levelName = label.substring(4).toUpperCase(); 350 return switch (levelName) { 351 case "SEVERE" -> LogLevel.ERROR; 352 case "WARNING" -> LogLevel.WARN; 353 case "INFO" -> LogLevel.INFO; 354 case "CONFIG", "FINE", "FINER", "FINEST" -> LogLevel.DEBUG; 355 default -> exitCode == 0 ? LogLevel.INFO : LogLevel.ERROR; 356 }; 357 } 358 return exitCode == 0 ? LogLevel.INFO : LogLevel.ERROR; 359 } 360 361 /** 362 * Writes a log entry to the text log file if enabled. 363 * 364 * @param nodeId the node identifier 365 * @param label the workflow label (may be null) 366 * @param level the log level 367 * @param message the log message 368 */ 369 private void writeToTextLog(String nodeId, String label, LogLevel level, String message) { 370 if (textLogWriter == null) { 371 return; 372 } 373 String timestamp = LocalDateTime.now().atZone(SYSTEM_ZONE).format(ISO_FORMATTER); 374 String labelPart = label != null ? " [" + label + "]" : ""; 375 textLogWriter.printf("%s %s %s%s %s%n", timestamp, level, nodeId, labelPart, message); 376 } 377 378 @Override 379 public void markNodeSuccess(long sessionId, String nodeId) { 380 writeQueue.offer(new LogTask.UpdateNodeResult(sessionId, nodeId, "SUCCESS", null)); 381 } 382 383 @Override 384 public void markNodeFailed(long sessionId, String nodeId, String reason) { 385 writeQueue.offer(new LogTask.UpdateNodeResult(sessionId, nodeId, "FAILED", reason)); 386 } 387 388 @Override 389 public void endSession(long sessionId, SessionStatus status) { 390 // Flush pending writes 391 flushWrites(); 392 393 try (PreparedStatement ps = connection.prepareStatement( 394 "UPDATE sessions SET ended_at = CURRENT_TIMESTAMP, status = ? WHERE id = ?")) { 395 ps.setString(1, status.name()); 396 ps.setLong(2, sessionId); 397 ps.executeUpdate(); 398 } catch (SQLException e) { 399 throw new RuntimeException("Failed to end session", e); 400 } 401 } 402 403 private void flushWrites() { 404 // Wait for queue to drain 405 while (!writeQueue.isEmpty()) { 406 try { 407 Thread.sleep(10); 408 } catch (InterruptedException e) { 409 Thread.currentThread().interrupt(); 410 break; 411 } 412 } 413 } 414 415 @Override 416 public List<LogEntry> getLogsByNode(long sessionId, String nodeId) { 417 return reader.getLogsByNode(sessionId, nodeId); 418 } 419 420 @Override 421 public List<LogEntry> getLogsByLevel(long sessionId, LogLevel minLevel) { 422 return reader.getLogsByLevel(sessionId, minLevel); 423 } 424 425 @Override 426 public SessionSummary getSummary(long sessionId) { 427 return reader.getSummary(sessionId); 428 } 429 430 @Override 431 public long getLatestSessionId() { 432 return reader.getLatestSessionId(); 433 } 434 435 @Override 436 public List<SessionSummary> listSessions(int limit) { 437 return reader.listSessions(limit); 438 } 439 440 @Override 441 public void close() throws Exception { 442 running.set(false); 443 writerThread.interrupt(); 444 try { 445 writerThread.join(5000); 446 } catch (InterruptedException e) { 447 Thread.currentThread().interrupt(); 448 } 449 if (textLogWriter != null) { 450 textLogWriter.close(); 451 textLogWriter = null; 452 } 453 connection.close(); 454 } 455 456 // Internal task classes for async writing 457 private interface LogTask { 458 void execute(Connection conn) throws SQLException; 459 460 record InsertLog(long sessionId, String nodeId, String label, String actionName, 461 LogLevel level, String message, Integer exitCode, Long durationMs) implements LogTask { 462 @Override 463 public void execute(Connection conn) throws SQLException { 464 try (PreparedStatement ps = conn.prepareStatement( 465 "INSERT INTO logs (session_id, node_id, label, action_name, level, message, exit_code, duration_ms) " + 466 "VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { 467 ps.setLong(1, sessionId); 468 ps.setString(2, nodeId); 469 ps.setString(3, label); 470 ps.setString(4, actionName); 471 ps.setString(5, level.name()); 472 ps.setString(6, message); 473 if (exitCode != null) { 474 ps.setInt(7, exitCode); 475 } else { 476 ps.setNull(7, Types.INTEGER); 477 } 478 if (durationMs != null) { 479 ps.setLong(8, durationMs); 480 } else { 481 ps.setNull(8, Types.BIGINT); 482 } 483 ps.executeUpdate(); 484 } 485 } 486 } 487 488 record UpdateNodeResult(long sessionId, String nodeId, String status, String reason) implements LogTask { 489 @Override 490 public void execute(Connection conn) throws SQLException { 491 try (PreparedStatement ps = conn.prepareStatement( 492 "MERGE INTO node_results (session_id, node_id, status, reason) KEY (session_id, node_id) VALUES (?, ?, ?, ?)")) { 493 ps.setLong(1, sessionId); 494 ps.setString(2, nodeId); 495 ps.setString(3, status); 496 ps.setString(4, reason); 497 ps.executeUpdate(); 498 } 499 } 500 } 501 } 502}