/*
 * Copyright 2025 devteam@scivicslab.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.scivicslab.actoriac.cli;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.*;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.stream.Stream;

import com.scivicslab.actoriac.log.H2LogStore;

import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

/**
 * CLI subcommand to merge scattered log databases into a single database.
 *
 * <p>Before the log-server feature, each workflow run would create its own
 * separate database file. This command consolidates them into one database.</p>
 *
 * <h2>Usage Examples</h2>
 * <pre>
 * # Scan a directory for .mv.db files and merge them
 * actor-iac log-merge --scan ./workflows --target ./logs/merged
 *
 * # Merge specific database files
 * actor-iac log-merge --target ./logs/merged ./db1 ./db2 ./db3
 *
 * # Dry-run to see what would be merged
 * actor-iac log-merge --scan ./workflows --target ./logs/merged --dry-run
 * </pre>
 *
 * @author devteam@scivicslab.com
 * @since 2.10.0
 */
@Command(
    name = "log-merge",
    mixinStandardHelpOptions = true,
    versionProvider = VersionProvider.class,
    description = "Merge scattered log databases into a single database."
)
public class MergeLogsCLI implements Callable<Integer> {

    @Option(
        names = {"--target"},
        required = true,
        description = "Target database file path (without .mv.db extension)"
    )
    private File targetDb;

    @Option(
        names = {"--scan"},
        description = "Directory to scan for .mv.db files (recursive)"
    )
    private File scanDir;

    @Parameters(
        paramLabel = "SOURCE",
        description = "Source database files to merge (without .mv.db extension)",
        arity = "0..*"
    )
    private List<File> sourceDbs = new ArrayList<>();

    @Option(
        names = {"--dry-run"},
        description = "Show what would be merged without actually merging"
    )
    private boolean dryRun;

    @Option(
        names = {"-v", "--verbose"},
        description = "Enable verbose output"
    )
    private boolean verbose;

    @Option(
        names = {"--skip-duplicates"},
        description = "Skip sessions that already exist in target (based on workflow_name and started_at)",
        defaultValue = "true"
    )
    private boolean skipDuplicates;

    /** Statistics for reporting. Accumulated across all merged sources. */
    private int totalSessions = 0;
    private int totalLogs = 0;
    private int totalNodeResults = 0;
    private int skippedSessions = 0;

    /**
     * Entry point for the subcommand.
     *
     * @return 0 on success, 1 when no sources are found or a database error occurs
     */
    @Override
    public Integer call() {
        // Collect source databases from --scan and positional parameters
        List<File> allSources = collectSourceDatabases();
        if (allSources.isEmpty()) {
            System.err.println("No source databases found.");
            System.err.println("Use --scan <dir> to scan for databases, or specify source files directly.");
            return 1;
        }

        // Filter out target from sources if present (a scan could pick it up)
        String targetPath = targetDb.getAbsolutePath();
        allSources.removeIf(f -> f.getAbsolutePath().equals(targetPath));

        if (allSources.isEmpty()) {
            System.err.println("No source databases to merge (target was the only database found).");
            return 1;
        }

        System.out.println("=".repeat(60));
        System.out.println("Log Database Merge");
        System.out.println("=".repeat(60));
        System.out.println("Target:  " + targetDb.getAbsolutePath() + ".mv.db");
        System.out.println("Sources: " + allSources.size() + " database(s)");
        if (verbose) {
            for (File source : allSources) {
                System.out.println("  - " + source.getAbsolutePath() + ".mv.db");
            }
        }
        System.out.println("-".repeat(60));

        if (dryRun) {
            System.out.println("[DRY-RUN MODE - No changes will be made]");
            System.out.println();
            return dryRunAnalysis(allSources);
        }

        try {
            return performMerge(allSources);
        } catch (SQLException e) {
            System.err.println("Database error: " + e.getMessage());
            if (verbose) {
                e.printStackTrace();
            }
            return 1;
        }
    }

    /**
     * Collects all source database files from the scan directory and explicit parameters.
     *
     * <p>Explicit sources that do not exist on disk produce a warning and are dropped.
     * Scanned paths are returned without the {@code .mv.db} extension so they can be
     * passed directly to the H2 JDBC URL.</p>
     *
     * @return de-duplicated list of database paths (without extension); may be empty
     */
    private List<File> collectSourceDatabases() {
        List<File> sources = new ArrayList<>();

        // Add explicitly specified sources, verifying the backing file exists
        for (File source : sourceDbs) {
            File dbFile = new File(source.getAbsolutePath() + ".mv.db");
            if (dbFile.exists()) {
                sources.add(source);
            } else {
                System.err.println("Warning: Database not found: " + dbFile.getAbsolutePath());
            }
        }

        // Recursively scan the directory for .mv.db files
        if (scanDir != null) {
            if (!scanDir.isDirectory()) {
                System.err.println("Warning: Not a directory: " + scanDir);
            } else {
                try (Stream<Path> paths = Files.walk(scanDir.toPath())) {
                    paths.filter(Files::isRegularFile)
                         .filter(p -> p.toString().endsWith(".mv.db"))
                         .map(p -> {
                             // Strip the ".mv.db" suffix (6 chars) — H2 URLs take the bare path
                             String path = p.toString();
                             return new File(path.substring(0, path.length() - 6));
                         })
                         .filter(f -> !sources.contains(f))
                         .forEach(sources::add);
                } catch (IOException e) {
                    System.err.println("Warning: Failed to scan directory: " + e.getMessage());
                }
            }
        }

        return sources;
    }

    /**
     * Performs dry-run analysis showing what would be merged, without writing anything.
     *
     * @param sources source database paths (without extension)
     * @return always 0; per-source read errors are reported but do not abort
     */
    private int dryRunAnalysis(List<File> sources) {
        int totalSes = 0;
        int totalLog = 0;
        int totalNr = 0;

        for (File source : sources) {
            try (Connection conn = openDatabase(source)) {
                // Skip databases without a sessions table (e.g. empty or foreign DBs)
                if (!tableExists(conn, "sessions")) {
                    System.out.printf("%-50s (empty - no sessions table)%n",
                        truncate(source.getName(), 50));
                    continue;
                }

                int sessions = countRows(conn, "sessions");
                int logs = tableExists(conn, "logs") ? countRows(conn, "logs") : 0;
                int nodeResults = tableExists(conn, "node_results") ? countRows(conn, "node_results") : 0;

                totalSes += sessions;
                totalLog += logs;
                totalNr += nodeResults;

                System.out.printf("%-50s sessions: %4d  logs: %6d  node_results: %4d%n",
                    truncate(source.getName(), 50), sessions, logs, nodeResults);

            } catch (SQLException e) {
                System.err.println("Error reading " + source + ": " + e.getMessage());
            }
        }

        System.out.println("-".repeat(60));
        System.out.printf("%-50s sessions: %4d  logs: %6d  node_results: %4d%n",
            "TOTAL", totalSes, totalLog, totalNr);
        System.out.println("=".repeat(60));

        return 0;
    }

    /**
     * Performs the actual merge operation into the target database.
     *
     * <p>Per-source failures are reported and skipped; a single commit is issued
     * after all sources have been processed.</p>
     *
     * @param sources source database paths (without extension)
     * @return always 0 (per-source errors are non-fatal)
     * @throws SQLException if the target database cannot be opened, initialized, or committed
     */
    private int performMerge(List<File> sources) throws SQLException {
        // Open/create target database
        try (Connection targetConn = openDatabase(targetDb)) {
            initializeSchema(targetConn);

            // Load existing sessions for duplicate detection.
            // BUGFIX: the fallback must be a MUTABLE set — mergeDatabase() adds every
            // merged session's key to it, and Set.of() would throw
            // UnsupportedOperationException on the first insert when --skip-duplicates=false.
            Set<String> existingSessions =
                skipDuplicates ? loadExistingSessions(targetConn) : new HashSet<>();

            for (File source : sources) {
                System.out.println("Merging: " + source.getName() + ".mv.db");
                try (Connection sourceConn = openDatabase(source)) {
                    mergeDatabase(sourceConn, targetConn, existingSessions, source.getName());
                } catch (SQLException e) {
                    System.err.println("  Error: " + e.getMessage());
                    if (verbose) {
                        e.printStackTrace();
                    }
                }
            }

            targetConn.commit();
        }

        // Print summary
        System.out.println("-".repeat(60));
        System.out.println("Merge completed:");
        System.out.println("  Sessions merged:     " + totalSessions);
        System.out.println("  Sessions skipped:    " + skippedSessions + " (duplicates)");
        System.out.println("  Log entries merged:  " + totalLogs);
        System.out.println("  Node results merged: " + totalNodeResults);
        System.out.println("=".repeat(60));

        return 0;
    }

    /**
     * Opens an H2 database connection with auto-commit disabled.
     *
     * <p>Note: H2 creates the database file if it does not exist, which is the
     * desired behavior for the target (sources are existence-checked upstream).</p>
     *
     * @param dbPath database path without the {@code .mv.db} extension
     * @return open connection in manual-commit mode
     * @throws SQLException if the connection cannot be established
     */
    private Connection openDatabase(File dbPath) throws SQLException {
        String url = "jdbc:h2:" + dbPath.getAbsolutePath();
        Connection conn = DriverManager.getConnection(url);
        conn.setAutoCommit(false);
        return conn;
    }

    /**
     * Initializes the target database schema.
     *
     * @param conn target connection; committed after schema setup
     * @throws SQLException if schema initialization or commit fails
     */
    private void initializeSchema(Connection conn) throws SQLException {
        H2LogStore.initSchema(conn);

        // Add source_db column for merge tracking (specific to merge-logs)
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS source_db VARCHAR(255)");
        } catch (SQLException ignored) {
            // Best-effort: column might already exist on older H2 versions that
            // don't support IF NOT EXISTS; the merge works either way.
        }

        conn.commit();
    }

    /**
     * Loads existing session keys from the target for duplicate detection.
     *
     * @param conn target connection
     * @return mutable set of {@code workflow_name|started_at} keys
     * @throws SQLException if the sessions table cannot be read
     */
    private Set<String> loadExistingSessions(Connection conn) throws SQLException {
        Set<String> existing = new HashSet<>();
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT workflow_name, started_at FROM sessions")) {
            while (rs.next()) {
                String key = makeSessionKey(rs.getString("workflow_name"), rs.getTimestamp("started_at"));
                existing.add(key);
            }
        }
        return existing;
    }

    /**
     * Creates a unique key for duplicate detection; null components become "".
     *
     * @param workflowName workflow name, may be null
     * @param startedAt session start timestamp, may be null
     * @return {@code workflowName|startedAt} key
     */
    private String makeSessionKey(String workflowName, Timestamp startedAt) {
        return (workflowName != null ? workflowName : "") + "|" +
               (startedAt != null ? startedAt.toString() : "");
    }

    /**
     * Checks if a table exists in the database.
     *
     * @param conn connection to probe
     * @param tableName table name (internal caller-supplied constants only — not user input)
     * @return true if a trivial SELECT against the table succeeds
     */
    private boolean tableExists(Connection conn, String tableName) {
        // Use a simple query to check if table exists - more reliable than metadata
        try (Statement stmt = conn.createStatement()) {
            stmt.executeQuery("SELECT 1 FROM " + tableName + " WHERE 1=0");
            return true;
        } catch (SQLException e) {
            // Table doesn't exist
            return false;
        }
    }

    /**
     * Merges one source database into the target.
     *
     * <p>Every merged session's key is recorded in {@code existingSessions} so that
     * duplicates across multiple sources are also caught; the duplicate <em>check</em>
     * is only applied when {@code --skip-duplicates} is enabled, preserving the
     * "merge everything" semantics when it is disabled.</p>
     *
     * @param source open source connection
     * @param target open target connection (not committed here)
     * @param existingSessions mutable set of session keys already present in the target
     * @param sourceName source database name, stored in the {@code source_db} column
     * @throws SQLException if reading the source sessions fails
     */
    private void mergeDatabase(Connection source, Connection target,
                               Set<String> existingSessions, String sourceName) throws SQLException {
        // Check if sessions table exists
        if (!tableExists(source, "sessions")) {
            if (verbose) {
                System.out.println("  Skipping (no sessions table)");
            }
            return;
        }

        // Read all sessions from source in stable order
        try (Statement stmt = source.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM sessions ORDER BY id")) {

            while (rs.next()) {
                long oldSessionId = rs.getLong("id");
                String workflowName = rs.getString("workflow_name");
                Timestamp startedAt = rs.getTimestamp("started_at");

                // Check for duplicates only when the user asked for it
                String sessionKey = makeSessionKey(workflowName, startedAt);
                if (skipDuplicates && existingSessions.contains(sessionKey)) {
                    if (verbose) {
                        System.out.println("  Skipping duplicate session: " + workflowName + " at " + startedAt);
                    }
                    skippedSessions++;
                    continue;
                }

                // Insert session into target and get new ID
                long newSessionId = insertSession(target, rs, sourceName);
                existingSessions.add(sessionKey);
                totalSessions++;

                // Copy logs for this session, remapping the session ID
                int logCount = copyLogs(source, target, oldSessionId, newSessionId);
                totalLogs += logCount;

                // Copy node_results for this session, remapping the session ID
                int nrCount = copyNodeResults(source, target, oldSessionId, newSessionId);
                totalNodeResults += nrCount;

                if (verbose) {
                    System.out.printf("  Session %d -> %d: %s (%d logs, %d node_results)%n",
                        oldSessionId, newSessionId, workflowName, logCount, nrCount);
                }
            }
        }
    }

    /**
     * Inserts a session row into the target database.
     *
     * @param target target connection
     * @param rs source result set positioned on the session row to copy
     * @param sourceName recorded in the {@code source_db} column for traceability
     * @return the generated session ID in the target
     * @throws SQLException if the insert fails or no generated key is returned
     */
    private long insertSession(Connection target, ResultSet rs, String sourceName) throws SQLException {
        String sql = """
            INSERT INTO sessions (started_at, ended_at, workflow_name, overlay_name,
                                  inventory_name, node_count, status, source_db)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """;
        try (PreparedStatement ps = target.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
            ps.setTimestamp(1, rs.getTimestamp("started_at"));
            ps.setTimestamp(2, rs.getTimestamp("ended_at"));
            ps.setString(3, rs.getString("workflow_name"));
            ps.setString(4, rs.getString("overlay_name"));
            ps.setString(5, rs.getString("inventory_name"));
            ps.setInt(6, rs.getInt("node_count"));
            ps.setString(7, rs.getString("status"));
            ps.setString(8, sourceName);
            ps.executeUpdate();

            try (ResultSet keys = ps.getGeneratedKeys()) {
                if (keys.next()) {
                    return keys.getLong(1);
                }
            }
        }
        throw new SQLException("Failed to get generated session ID");
    }

    /**
     * Copies logs from source to target with the new session ID.
     *
     * @param source open source connection
     * @param target open target connection
     * @param oldSessionId session ID in the source database
     * @param newSessionId remapped session ID in the target database
     * @return number of log rows copied
     * @throws SQLException if reading or writing log rows fails
     */
    private int copyLogs(Connection source, Connection target,
                         long oldSessionId, long newSessionId) throws SQLException {
        int count = 0;
        String selectSql = "SELECT * FROM logs WHERE session_id = ?";
        String insertSql = """
            INSERT INTO logs (session_id, timestamp, node_id, label, action_name,
                              level, message, exit_code, duration_ms)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """;

        try (PreparedStatement selectPs = source.prepareStatement(selectSql);
             PreparedStatement insertPs = target.prepareStatement(insertSql)) {
            selectPs.setLong(1, oldSessionId);

            try (ResultSet rs = selectPs.executeQuery()) {
                while (rs.next()) {
                    insertPs.setLong(1, newSessionId);
                    insertPs.setTimestamp(2, rs.getTimestamp("timestamp"));
                    insertPs.setString(3, rs.getString("node_id"));
                    insertPs.setString(4, rs.getString("label"));
                    insertPs.setString(5, rs.getString("action_name"));
                    insertPs.setString(6, rs.getString("level"));
                    insertPs.setString(7, rs.getString("message"));

                    // exit_code and duration_ms are nullable — preserve SQL NULL exactly
                    int exitCode = rs.getInt("exit_code");
                    if (rs.wasNull()) {
                        insertPs.setNull(8, Types.INTEGER);
                    } else {
                        insertPs.setInt(8, exitCode);
                    }

                    long durationMs = rs.getLong("duration_ms");
                    if (rs.wasNull()) {
                        insertPs.setNull(9, Types.BIGINT);
                    } else {
                        insertPs.setLong(9, durationMs);
                    }

                    insertPs.executeUpdate();
                    count++;
                }
            }
        }
        return count;
    }

    /**
     * Copies node_results from source to target with the new session ID.
     *
     * @param source open source connection
     * @param target open target connection
     * @param oldSessionId session ID in the source database
     * @param newSessionId remapped session ID in the target database
     * @return number of node_result rows copied
     * @throws SQLException if reading or writing rows fails
     */
    private int copyNodeResults(Connection source, Connection target,
                                long oldSessionId, long newSessionId) throws SQLException {
        int count = 0;
        String selectSql = "SELECT * FROM node_results WHERE session_id = ?";
        String insertSql = """
            INSERT INTO node_results (session_id, node_id, status, reason)
            VALUES (?, ?, ?, ?)
            """;

        try (PreparedStatement selectPs = source.prepareStatement(selectSql);
             PreparedStatement insertPs = target.prepareStatement(insertSql)) {
            selectPs.setLong(1, oldSessionId);

            try (ResultSet rs = selectPs.executeQuery()) {
                while (rs.next()) {
                    insertPs.setLong(1, newSessionId);
                    insertPs.setString(2, rs.getString("node_id"));
                    insertPs.setString(3, rs.getString("status"));
                    insertPs.setString(4, rs.getString("reason"));
                    insertPs.executeUpdate();
                    count++;
                }
            }
        }
        return count;
    }

    /**
     * Counts rows in a table.
     *
     * @param conn connection to query
     * @param table table name (internal constants only — not user input)
     * @return row count, or 0 if the query returns no result
     * @throws SQLException if the query fails
     */
    private int countRows(Connection conn, String table) throws SQLException {
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table)) {
            if (rs.next()) {
                return rs.getInt(1);
            }
        }
        return 0;
    }

    /**
     * Truncates a string to max length with an ellipsis.
     *
     * @param s string to truncate, may be null
     * @param maxLen maximum length; callers pass values >= 3
     * @return the string unchanged if it fits (or is null), otherwise truncated with "..."
     */
    private static String truncate(String s, int maxLen) {
        if (s == null || s.length() <= maxLen) {
            return s;
        }
        return s.substring(0, maxLen - 3) + "...";
    }
}