/*
 * Copyright 2025 devteam@scivics-lab.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.scivicslab.actoriac.cli;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Stream;

import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

/**
 * CLI subcommand to merge scattered log databases into a single database.
 *
 * <p>Before the log-server feature, each workflow run would create its own
 * separate database file. This command consolidates them into one database.</p>
 *
 * <h2>Usage Examples</h2>
 * <pre>
 * # Scan a directory for .mv.db files and merge them
 * actor-iac log-merge --scan ./workflows --target ./logs/merged
 *
 * # Merge specific database files
 * actor-iac log-merge --target ./logs/merged ./db1 ./db2 ./db3
 *
 * # Dry-run to see what would be merged
 * actor-iac log-merge --scan ./workflows --target ./logs/merged --dry-run
 * </pre>
 *
 * @author devteam@scivics-lab.com
 * @since 2.10.0
 */
@Command(
    name = "log-merge",
    mixinStandardHelpOptions = true,
    version = "actor-IaC log-merge 2.10.0",
    description = "Merge scattered log databases into a single database."
)
public class MergeLogsCLI implements Callable<Integer> {

    /** File suffix H2 appends to MV-store database files. */
    private static final String DB_SUFFIX = ".mv.db";

    @Option(
        names = {"--target"},
        required = true,
        description = "Target database file path (without .mv.db extension)"
    )
    private File targetDb;

    @Option(
        names = {"--scan"},
        description = "Directory to scan for .mv.db files (recursive)"
    )
    private File scanDir;

    @Parameters(
        paramLabel = "SOURCE",
        description = "Source database files to merge (without .mv.db extension)",
        arity = "0..*"
    )
    private List<File> sourceDbs = new ArrayList<>();

    @Option(
        names = {"--dry-run"},
        description = "Show what would be merged without actually merging"
    )
    private boolean dryRun;

    @Option(
        names = {"-v", "--verbose"},
        description = "Enable verbose output"
    )
    private boolean verbose;

    @Option(
        names = {"--skip-duplicates"},
        description = "Skip sessions that already exist in target (based on workflow_name and started_at)",
        defaultValue = "true"
    )
    private boolean skipDuplicates;

    /** Statistics for the final report (mutated while merging). */
    private int totalSessions = 0;
    private int totalLogs = 0;
    private int totalNodeResults = 0;
    private int skippedSessions = 0;

    /**
     * Entry point: collects sources, then either dry-runs or performs the merge.
     *
     * @return {@code 0} on success, {@code 1} on error
     */
    @Override
    public Integer call() {
        // Collect source databases from --scan and positional parameters
        List<File> allSources = collectSourceDatabases();
        if (allSources.isEmpty()) {
            System.err.println("No source databases found.");
            System.err.println("Use --scan <dir> to scan for databases, or specify source files directly.");
            return 1;
        }

        // Never merge the target into itself
        String targetPath = targetDb.getAbsolutePath();
        allSources.removeIf(f -> f.getAbsolutePath().equals(targetPath));

        if (allSources.isEmpty()) {
            System.err.println("No source databases to merge (target was the only database found).");
            return 1;
        }

        System.out.println("=".repeat(60));
        System.out.println("Log Database Merge");
        System.out.println("=".repeat(60));
        System.out.println("Target:  " + targetDb.getAbsolutePath() + DB_SUFFIX);
        System.out.println("Sources: " + allSources.size() + " database(s)");
        if (verbose) {
            for (File source : allSources) {
                System.out.println("  - " + source.getAbsolutePath() + DB_SUFFIX);
            }
        }
        System.out.println("-".repeat(60));

        if (dryRun) {
            System.out.println("[DRY-RUN MODE - No changes will be made]");
            System.out.println();
            return dryRunAnalysis(allSources);
        }

        try {
            return performMerge(allSources);
        } catch (SQLException e) {
            System.err.println("Database error: " + e.getMessage());
            if (verbose) {
                e.printStackTrace();
            }
            return 1;
        }
    }

    /**
     * Collects all source database files from the scan directory and explicit parameters.
     *
     * <p>De-duplicates by absolute path so that the same database given both
     * explicitly (possibly as a relative path) and via {@code --scan} is merged
     * only once.</p>
     *
     * @return list of database files (without the {@code .mv.db} suffix)
     */
    private List<File> collectSourceDatabases() {
        List<File> sources = new ArrayList<>();
        Set<String> seenPaths = new HashSet<>();  // absolute paths, for de-duplication

        // Explicitly specified sources must already exist on disk
        for (File source : sourceDbs) {
            File dbFile = new File(source.getAbsolutePath() + DB_SUFFIX);
            if (!dbFile.exists()) {
                System.err.println("Warning: Database not found: " + dbFile.getAbsolutePath());
            } else if (seenPaths.add(source.getAbsolutePath())) {
                sources.add(source);
            }
        }

        // Recursively scan the directory for .mv.db files
        if (scanDir != null) {
            if (!scanDir.isDirectory()) {
                System.err.println("Warning: Not a directory: " + scanDir);
            } else {
                try (Stream<Path> paths = Files.walk(scanDir.toPath())) {
                    paths.filter(Files::isRegularFile)
                         .filter(p -> p.toString().endsWith(DB_SUFFIX))
                         .map(p -> {
                             // Strip the .mv.db suffix: H2 JDBC URLs take the base path
                             String path = p.toString();
                             return new File(path.substring(0, path.length() - DB_SUFFIX.length()));
                         })
                         .filter(f -> seenPaths.add(f.getAbsolutePath()))
                         .forEach(sources::add);
                } catch (IOException e) {
                    System.err.println("Warning: Failed to scan directory: " + e.getMessage());
                }
            }
        }

        return sources;
    }

    /**
     * Performs dry-run analysis showing what would be merged, without writing anything.
     *
     * @param sources databases to inspect
     * @return always {@code 0}; unreadable sources are reported and skipped
     */
    private int dryRunAnalysis(List<File> sources) {
        int totalSes = 0;
        int totalLog = 0;
        int totalNr = 0;

        for (File source : sources) {
            try (Connection conn = openDatabase(source, true)) {
                // Databases without a sessions table contribute nothing
                if (!tableExists(conn, "sessions")) {
                    System.out.printf("%-50s (empty - no sessions table)%n",
                            truncate(source.getName(), 50));
                    continue;
                }

                int sessions = countRows(conn, "sessions");
                int logs = tableExists(conn, "logs") ? countRows(conn, "logs") : 0;
                int nodeResults = tableExists(conn, "node_results") ? countRows(conn, "node_results") : 0;

                totalSes += sessions;
                totalLog += logs;
                totalNr += nodeResults;

                System.out.printf("%-50s sessions: %4d  logs: %6d  node_results: %4d%n",
                        truncate(source.getName(), 50), sessions, logs, nodeResults);

            } catch (SQLException e) {
                System.err.println("Error reading " + source + ": " + e.getMessage());
            }
        }

        System.out.println("-".repeat(60));
        System.out.printf("%-50s sessions: %4d  logs: %6d  node_results: %4d%n",
                "TOTAL", totalSes, totalLog, totalNr);
        System.out.println("=".repeat(60));

        return 0;
    }

    /**
     * Performs the actual merge operation.
     *
     * <p>Each source is merged in its own transaction: on success it is committed
     * immediately, on failure it is rolled back in full so the target never ends
     * up with a half-merged source (e.g. a session whose logs were only partially
     * copied). Statistics counters are restored when a source is rolled back.</p>
     *
     * @param sources databases to merge into {@link #targetDb}
     * @return always {@code 0}; per-source errors are reported and skipped
     * @throws SQLException if the target database cannot be opened or initialized
     */
    private int performMerge(List<File> sources) throws SQLException {
        // Open/create target database
        try (Connection targetConn = openDatabase(targetDb, false)) {
            initializeSchema(targetConn);

            // Mutable set: duplicate checks also cover sessions merged in this run.
            // (Must not be Set.of() — keys are added as sessions are merged.)
            Set<String> existingSessions = skipDuplicates
                    ? loadExistingSessions(targetConn)
                    : new HashSet<>();

            for (File source : sources) {
                System.out.println("Merging: " + source.getName() + DB_SUFFIX);

                // Snapshot stats so a rolled-back source does not inflate the report
                int sesBefore = totalSessions;
                int logBefore = totalLogs;
                int nrBefore = totalNodeResults;
                int skipBefore = skippedSessions;

                try (Connection sourceConn = openDatabase(source, true)) {
                    Set<String> addedKeys =
                            mergeDatabase(sourceConn, targetConn, existingSessions, source.getName());
                    // Commit per source so earlier successes survive later failures
                    targetConn.commit();
                    existingSessions.addAll(addedKeys);
                } catch (SQLException e) {
                    totalSessions = sesBefore;
                    totalLogs = logBefore;
                    totalNodeResults = nrBefore;
                    skippedSessions = skipBefore;
                    try {
                        // Discard everything from the failed source
                        targetConn.rollback();
                    } catch (SQLException rollbackEx) {
                        System.err.println("  Rollback failed: " + rollbackEx.getMessage());
                    }
                    System.err.println("  Error: " + e.getMessage());
                    if (verbose) {
                        e.printStackTrace();
                    }
                }
            }
        }

        // Print summary
        System.out.println("-".repeat(60));
        System.out.println("Merge completed:");
        System.out.println("  Sessions merged:     " + totalSessions);
        System.out.println("  Sessions skipped:    " + skippedSessions + " (duplicates)");
        System.out.println("  Log entries merged:  " + totalLogs);
        System.out.println("  Node results merged: " + totalNodeResults);
        System.out.println("=".repeat(60));

        return 0;
    }

    /**
     * Opens a database connection with auto-commit disabled.
     *
     * @param dbPath    database path without the {@code .mv.db} suffix
     * @param mustExist if {@code true}, fail instead of letting H2 create a new
     *                  empty database file (used for source databases)
     * @return open connection; the caller is responsible for closing it
     * @throws SQLException if the connection cannot be established
     */
    private Connection openDatabase(File dbPath, boolean mustExist) throws SQLException {
        String url = "jdbc:h2:" + dbPath.getAbsolutePath() + (mustExist ? ";IFEXISTS=TRUE" : "");
        Connection conn = DriverManager.getConnection(url);
        conn.setAutoCommit(false);
        return conn;
    }

    /**
     * Initializes the target database schema (idempotent: uses IF NOT EXISTS).
     *
     * @param conn open target connection
     * @throws SQLException on DDL failure
     */
    private void initializeSchema(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("""
                CREATE TABLE IF NOT EXISTS sessions (
                    id IDENTITY PRIMARY KEY,
                    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    ended_at TIMESTAMP,
                    workflow_name VARCHAR(255),
                    overlay_name VARCHAR(255),
                    inventory_name VARCHAR(255),
                    node_count INT,
                    status VARCHAR(20) DEFAULT 'RUNNING',
                    source_db VARCHAR(255)
                )
                """);

            stmt.execute("""
                CREATE TABLE IF NOT EXISTS logs (
                    id IDENTITY PRIMARY KEY,
                    session_id BIGINT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    node_id VARCHAR(255) NOT NULL,
                    vertex_name CLOB,
                    action_name CLOB,
                    level VARCHAR(10) NOT NULL,
                    message CLOB,
                    exit_code INT,
                    duration_ms BIGINT,
                    FOREIGN KEY (session_id) REFERENCES sessions(id)
                )
                """);

            stmt.execute("""
                CREATE TABLE IF NOT EXISTS node_results (
                    id IDENTITY PRIMARY KEY,
                    session_id BIGINT,
                    node_id VARCHAR(255) NOT NULL,
                    status VARCHAR(20) NOT NULL,
                    reason VARCHAR(1000),
                    FOREIGN KEY (session_id) REFERENCES sessions(id)
                )
                """);

            // Migration for pre-merge databases that lack the source_db column
            try {
                stmt.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS source_db VARCHAR(255)");
            } catch (SQLException ignored) {
                // Defensive: older H2 versions may not support IF NOT EXISTS here
            }

            // Indexes used by the log-server queries
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_session ON logs(session_id)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at)");
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_workflow ON sessions(workflow_name)");
        }
        conn.commit();
    }

    /**
     * Loads existing session keys from the target for duplicate detection.
     *
     * @param conn open target connection
     * @return mutable set of {@code workflow_name|started_at} keys
     * @throws SQLException on query failure
     */
    private Set<String> loadExistingSessions(Connection conn) throws SQLException {
        Set<String> existing = new HashSet<>();
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT workflow_name, started_at FROM sessions")) {
            while (rs.next()) {
                existing.add(makeSessionKey(rs.getString("workflow_name"),
                                            rs.getTimestamp("started_at")));
            }
        }
        return existing;
    }

    /**
     * Creates a unique key for duplicate detection; null parts become empty strings.
     */
    private String makeSessionKey(String workflowName, Timestamp startedAt) {
        return (workflowName != null ? workflowName : "") + "|" +
               (startedAt != null ? startedAt.toString() : "");
    }

    /**
     * Checks if a table exists in the database.
     *
     * <p>Probes with a zero-row query instead of JDBC metadata, which is more
     * reliable across H2 versions/case settings. Table names are internal
     * constants, never user input, so concatenation is safe here.</p>
     */
    private boolean tableExists(Connection conn, String tableName) {
        try (Statement stmt = conn.createStatement()) {
            stmt.executeQuery("SELECT 1 FROM " + tableName + " WHERE 1=0");
            return true;
        } catch (SQLException e) {
            // Query failed -> table doesn't exist
            return false;
        }
    }

    /**
     * Merges one source database into the target (no commit; caller commits).
     *
     * @param source           open source connection
     * @param target           open target connection
     * @param existingSessions keys already committed to the target; only consulted
     *                         when {@code --skip-duplicates} is enabled and never
     *                         mutated here (the caller adds the returned keys after
     *                         a successful commit, so a rollback stays consistent)
     * @param sourceName       recorded in the target's {@code source_db} column
     * @return keys of sessions inserted from this source
     * @throws SQLException on read or insert failure
     */
    private Set<String> mergeDatabase(Connection source, Connection target,
            Set<String> existingSessions, String sourceName) throws SQLException {
        Set<String> addedKeys = new HashSet<>();

        // Nothing to do for databases without a sessions table
        if (!tableExists(source, "sessions")) {
            if (verbose) {
                System.out.println("  Skipping (no sessions table)");
            }
            return addedKeys;
        }

        try (Statement stmt = source.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM sessions ORDER BY id")) {

            while (rs.next()) {
                long oldSessionId = rs.getLong("id");
                String workflowName = rs.getString("workflow_name");
                Timestamp startedAt = rs.getTimestamp("started_at");

                // Skip duplicates against both the target and this in-flight source
                String sessionKey = makeSessionKey(workflowName, startedAt);
                if (skipDuplicates
                        && (existingSessions.contains(sessionKey) || addedKeys.contains(sessionKey))) {
                    if (verbose) {
                        System.out.println("  Skipping duplicate session: " + workflowName + " at " + startedAt);
                    }
                    skippedSessions++;
                    continue;
                }

                // Insert session into target; children are re-keyed to the new ID
                long newSessionId = insertSession(target, rs, sourceName);
                addedKeys.add(sessionKey);
                totalSessions++;

                int logCount = copyLogs(source, target, oldSessionId, newSessionId);
                totalLogs += logCount;

                int nrCount = copyNodeResults(source, target, oldSessionId, newSessionId);
                totalNodeResults += nrCount;

                if (verbose) {
                    System.out.printf("  Session %d -> %d: %s (%d logs, %d node_results)%n",
                            oldSessionId, newSessionId, workflowName, logCount, nrCount);
                }
            }
        }
        return addedKeys;
    }

    /**
     * Inserts a session row into the target database.
     *
     * @param target     open target connection
     * @param rs         source result set positioned on the session to copy
     * @param sourceName stored in {@code source_db} for provenance
     * @return the generated session ID in the target
     * @throws SQLException on insert failure or if no generated key is returned
     */
    private long insertSession(Connection target, ResultSet rs, String sourceName) throws SQLException {
        String sql = """
            INSERT INTO sessions (started_at, ended_at, workflow_name, overlay_name,
                                  inventory_name, node_count, status, source_db)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """;
        try (PreparedStatement ps = target.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
            ps.setTimestamp(1, rs.getTimestamp("started_at"));
            ps.setTimestamp(2, rs.getTimestamp("ended_at"));
            ps.setString(3, rs.getString("workflow_name"));
            ps.setString(4, rs.getString("overlay_name"));
            ps.setString(5, rs.getString("inventory_name"));
            ps.setInt(6, rs.getInt("node_count"));
            ps.setString(7, rs.getString("status"));
            ps.setString(8, sourceName);
            ps.executeUpdate();

            try (ResultSet keys = ps.getGeneratedKeys()) {
                if (keys.next()) {
                    return keys.getLong(1);
                }
            }
        }
        throw new SQLException("Failed to get generated session ID");
    }

    /**
     * Copies logs from source to target, rewriting the session ID.
     *
     * @return number of log rows copied
     * @throws SQLException on read or insert failure
     */
    private int copyLogs(Connection source, Connection target,
            long oldSessionId, long newSessionId) throws SQLException {
        int count = 0;
        String selectSql = "SELECT * FROM logs WHERE session_id = ?";
        String insertSql = """
            INSERT INTO logs (session_id, timestamp, node_id, vertex_name, action_name,
                              level, message, exit_code, duration_ms)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """;

        try (PreparedStatement selectPs = source.prepareStatement(selectSql);
             PreparedStatement insertPs = target.prepareStatement(insertSql)) {
            selectPs.setLong(1, oldSessionId);

            try (ResultSet rs = selectPs.executeQuery()) {
                while (rs.next()) {
                    insertPs.setLong(1, newSessionId);
                    insertPs.setTimestamp(2, rs.getTimestamp("timestamp"));
                    insertPs.setString(3, rs.getString("node_id"));
                    insertPs.setString(4, rs.getString("vertex_name"));
                    insertPs.setString(5, rs.getString("action_name"));
                    insertPs.setString(6, rs.getString("level"));
                    insertPs.setString(7, rs.getString("message"));

                    // Preserve SQL NULL for nullable numeric columns
                    int exitCode = rs.getInt("exit_code");
                    if (rs.wasNull()) {
                        insertPs.setNull(8, Types.INTEGER);
                    } else {
                        insertPs.setInt(8, exitCode);
                    }

                    long durationMs = rs.getLong("duration_ms");
                    if (rs.wasNull()) {
                        insertPs.setNull(9, Types.BIGINT);
                    } else {
                        insertPs.setLong(9, durationMs);
                    }

                    insertPs.executeUpdate();
                    count++;
                }
            }
        }
        return count;
    }

    /**
     * Copies node_results from source to target, rewriting the session ID.
     *
     * @return number of node_result rows copied
     * @throws SQLException on read or insert failure
     */
    private int copyNodeResults(Connection source, Connection target,
            long oldSessionId, long newSessionId) throws SQLException {
        int count = 0;
        String selectSql = "SELECT * FROM node_results WHERE session_id = ?";
        String insertSql = """
            INSERT INTO node_results (session_id, node_id, status, reason)
            VALUES (?, ?, ?, ?)
            """;

        try (PreparedStatement selectPs = source.prepareStatement(selectSql);
             PreparedStatement insertPs = target.prepareStatement(insertSql)) {
            selectPs.setLong(1, oldSessionId);

            try (ResultSet rs = selectPs.executeQuery()) {
                while (rs.next()) {
                    insertPs.setLong(1, newSessionId);
                    insertPs.setString(2, rs.getString("node_id"));
                    insertPs.setString(3, rs.getString("status"));
                    insertPs.setString(4, rs.getString("reason"));
                    insertPs.executeUpdate();
                    count++;
                }
            }
        }
        return count;
    }

    /**
     * Counts rows in a table. Table names are internal constants, never user input.
     *
     * @throws SQLException on query failure
     */
    private int countRows(Connection conn, String table) throws SQLException {
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table)) {
            if (rs.next()) {
                return rs.getInt(1);
            }
        }
        return 0;
    }

    /**
     * Truncates a string to {@code maxLen} characters, appending an ellipsis.
     * Safe for null input and for {@code maxLen <= 3} (plain cut, no ellipsis).
     */
    private static String truncate(String s, int maxLen) {
        if (s == null || s.length() <= maxLen) {
            return s;
        }
        if (maxLen <= 3) {
            return s.substring(0, Math.max(0, maxLen));
        }
        return s.substring(0, maxLen - 3) + "...";
    }
}