001/*
002 * Copyright 2025 devteam@scivics-lab.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing,
011 * software distributed under the License is distributed on an
012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied.  See the License for the
014 * specific language governing permissions and limitations
015 * under the License.
016 */
017
018package com.scivicslab.actoriac.cli;
019
020import java.io.File;
021import java.io.IOException;
022import java.nio.file.Files;
023import java.nio.file.Path;
024import java.sql.*;
025import java.util.*;
026import java.util.concurrent.Callable;
027import java.util.stream.Stream;
028
029import picocli.CommandLine.Command;
030import picocli.CommandLine.Option;
031import picocli.CommandLine.Parameters;
032
033/**
034 * CLI subcommand to merge scattered log databases into a single database.
035 *
036 * <p>Before the log-server feature, each workflow run would create its own
037 * separate database file. This command consolidates them into one database.</p>
038 *
039 * <h2>Usage Examples</h2>
040 * <pre>
041 * # Scan a directory for .mv.db files and merge them
042 * actor-iac merge-logs --scan ./workflows --target ./logs/merged
043 *
044 * # Merge specific database files
045 * actor-iac merge-logs --target ./logs/merged ./db1 ./db2 ./db3
046 *
047 * # Dry-run to see what would be merged
048 * actor-iac merge-logs --scan ./workflows --target ./logs/merged --dry-run
049 * </pre>
050 *
051 * @author devteam@scivics-lab.com
052 * @since 2.10.0
053 */
054@Command(
055    name = "log-merge",
056    mixinStandardHelpOptions = true,
057    version = "actor-IaC log-merge 2.10.0",
058    description = "Merge scattered log databases into a single database."
059)
060public class MergeLogsCLI implements Callable<Integer> {
061
062    @Option(
063        names = {"--target"},
064        required = true,
065        description = "Target database file path (without .mv.db extension)"
066    )
067    private File targetDb;
068
069    @Option(
070        names = {"--scan"},
071        description = "Directory to scan for .mv.db files (recursive)"
072    )
073    private File scanDir;
074
075    @Parameters(
076        paramLabel = "SOURCE",
077        description = "Source database files to merge (without .mv.db extension)",
078        arity = "0..*"
079    )
080    private List<File> sourceDbs = new ArrayList<>();
081
082    @Option(
083        names = {"--dry-run"},
084        description = "Show what would be merged without actually merging"
085    )
086    private boolean dryRun;
087
088    @Option(
089        names = {"-v", "--verbose"},
090        description = "Enable verbose output"
091    )
092    private boolean verbose;
093
094    @Option(
095        names = {"--skip-duplicates"},
096        description = "Skip sessions that already exist in target (based on workflow_name and started_at)",
097        defaultValue = "true"
098    )
099    private boolean skipDuplicates;
100
101    /** Statistics for reporting */
102    private int totalSessions = 0;
103    private int totalLogs = 0;
104    private int totalNodeResults = 0;
105    private int skippedSessions = 0;
106
107    @Override
108    public Integer call() {
109        // Collect source databases
110        List<File> allSources = collectSourceDatabases();
111        if (allSources.isEmpty()) {
112            System.err.println("No source databases found.");
113            System.err.println("Use --scan <dir> to scan for databases, or specify source files directly.");
114            return 1;
115        }
116
117        // Filter out target from sources if present
118        String targetPath = targetDb.getAbsolutePath();
119        allSources.removeIf(f -> f.getAbsolutePath().equals(targetPath));
120
121        if (allSources.isEmpty()) {
122            System.err.println("No source databases to merge (target was the only database found).");
123            return 1;
124        }
125
126        System.out.println("=".repeat(60));
127        System.out.println("Log Database Merge");
128        System.out.println("=".repeat(60));
129        System.out.println("Target: " + targetDb.getAbsolutePath() + ".mv.db");
130        System.out.println("Sources: " + allSources.size() + " database(s)");
131        if (verbose) {
132            for (File source : allSources) {
133                System.out.println("  - " + source.getAbsolutePath() + ".mv.db");
134            }
135        }
136        System.out.println("-".repeat(60));
137
138        if (dryRun) {
139            System.out.println("[DRY-RUN MODE - No changes will be made]");
140            System.out.println();
141            return dryRunAnalysis(allSources);
142        }
143
144        try {
145            return performMerge(allSources);
146        } catch (SQLException e) {
147            System.err.println("Database error: " + e.getMessage());
148            if (verbose) {
149                e.printStackTrace();
150            }
151            return 1;
152        }
153    }
154
155    /**
156     * Collects all source database files from scan directory and explicit parameters.
157     */
158    private List<File> collectSourceDatabases() {
159        List<File> sources = new ArrayList<>();
160
161        // Add explicitly specified sources
162        for (File source : sourceDbs) {
163            File dbFile = new File(source.getAbsolutePath() + ".mv.db");
164            if (dbFile.exists()) {
165                sources.add(source);
166            } else {
167                System.err.println("Warning: Database not found: " + dbFile.getAbsolutePath());
168            }
169        }
170
171        // Scan directory for .mv.db files
172        if (scanDir != null) {
173            if (!scanDir.isDirectory()) {
174                System.err.println("Warning: Not a directory: " + scanDir);
175            } else {
176                try (Stream<Path> paths = Files.walk(scanDir.toPath())) {
177                    paths.filter(Files::isRegularFile)
178                         .filter(p -> p.toString().endsWith(".mv.db"))
179                         .map(p -> {
180                             // Remove .mv.db extension
181                             String path = p.toString();
182                             return new File(path.substring(0, path.length() - 6));
183                         })
184                         .filter(f -> !sources.contains(f))
185                         .forEach(sources::add);
186                } catch (IOException e) {
187                    System.err.println("Warning: Failed to scan directory: " + e.getMessage());
188                }
189            }
190        }
191
192        return sources;
193    }
194
195    /**
196     * Performs dry-run analysis showing what would be merged.
197     */
198    private int dryRunAnalysis(List<File> sources) {
199        int totalSes = 0;
200        int totalLog = 0;
201        int totalNr = 0;
202
203        for (File source : sources) {
204            try (Connection conn = openDatabase(source)) {
205                // Skip databases without sessions table
206                if (!tableExists(conn, "sessions")) {
207                    System.out.printf("%-50s (empty - no sessions table)%n",
208                        truncate(source.getName(), 50));
209                    continue;
210                }
211
212                int sessions = countRows(conn, "sessions");
213                int logs = tableExists(conn, "logs") ? countRows(conn, "logs") : 0;
214                int nodeResults = tableExists(conn, "node_results") ? countRows(conn, "node_results") : 0;
215
216                totalSes += sessions;
217                totalLog += logs;
218                totalNr += nodeResults;
219
220                System.out.printf("%-50s sessions: %4d  logs: %6d  node_results: %4d%n",
221                    truncate(source.getName(), 50), sessions, logs, nodeResults);
222
223            } catch (SQLException e) {
224                System.err.println("Error reading " + source + ": " + e.getMessage());
225            }
226        }
227
228        System.out.println("-".repeat(60));
229        System.out.printf("%-50s sessions: %4d  logs: %6d  node_results: %4d%n",
230            "TOTAL", totalSes, totalLog, totalNr);
231        System.out.println("=".repeat(60));
232
233        return 0;
234    }
235
236    /**
237     * Performs the actual merge operation.
238     */
239    private int performMerge(List<File> sources) throws SQLException {
240        // Open/create target database
241        try (Connection targetConn = openDatabase(targetDb)) {
242            initializeSchema(targetConn);
243
244            // Load existing sessions for duplicate detection
245            Set<String> existingSessions = skipDuplicates ? loadExistingSessions(targetConn) : Set.of();
246
247            for (File source : sources) {
248                System.out.println("Merging: " + source.getName() + ".mv.db");
249                try (Connection sourceConn = openDatabase(source)) {
250                    mergeDatabase(sourceConn, targetConn, existingSessions, source.getName());
251                } catch (SQLException e) {
252                    System.err.println("  Error: " + e.getMessage());
253                    if (verbose) {
254                        e.printStackTrace();
255                    }
256                }
257            }
258
259            targetConn.commit();
260        }
261
262        // Print summary
263        System.out.println("-".repeat(60));
264        System.out.println("Merge completed:");
265        System.out.println("  Sessions merged:     " + totalSessions);
266        System.out.println("  Sessions skipped:    " + skippedSessions + " (duplicates)");
267        System.out.println("  Log entries merged:  " + totalLogs);
268        System.out.println("  Node results merged: " + totalNodeResults);
269        System.out.println("=".repeat(60));
270
271        return 0;
272    }
273
274    /**
275     * Opens a database connection.
276     */
277    private Connection openDatabase(File dbPath) throws SQLException {
278        String url = "jdbc:h2:" + dbPath.getAbsolutePath();
279        Connection conn = DriverManager.getConnection(url);
280        conn.setAutoCommit(false);
281        return conn;
282    }
283
284    /**
285     * Initializes the target database schema.
286     */
287    private void initializeSchema(Connection conn) throws SQLException {
288        try (Statement stmt = conn.createStatement()) {
289            stmt.execute("""
290                CREATE TABLE IF NOT EXISTS sessions (
291                    id IDENTITY PRIMARY KEY,
292                    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
293                    ended_at TIMESTAMP,
294                    workflow_name VARCHAR(255),
295                    overlay_name VARCHAR(255),
296                    inventory_name VARCHAR(255),
297                    node_count INT,
298                    status VARCHAR(20) DEFAULT 'RUNNING',
299                    source_db VARCHAR(255)
300                )
301                """);
302
303            stmt.execute("""
304                CREATE TABLE IF NOT EXISTS logs (
305                    id IDENTITY PRIMARY KEY,
306                    session_id BIGINT,
307                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
308                    node_id VARCHAR(255) NOT NULL,
309                    vertex_name CLOB,
310                    action_name CLOB,
311                    level VARCHAR(10) NOT NULL,
312                    message CLOB,
313                    exit_code INT,
314                    duration_ms BIGINT,
315                    FOREIGN KEY (session_id) REFERENCES sessions(id)
316                )
317                """);
318
319            stmt.execute("""
320                CREATE TABLE IF NOT EXISTS node_results (
321                    id IDENTITY PRIMARY KEY,
322                    session_id BIGINT,
323                    node_id VARCHAR(255) NOT NULL,
324                    status VARCHAR(20) NOT NULL,
325                    reason VARCHAR(1000),
326                    FOREIGN KEY (session_id) REFERENCES sessions(id)
327                )
328                """);
329
330            // Add source_db column if it doesn't exist (migration)
331            try {
332                stmt.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS source_db VARCHAR(255)");
333            } catch (SQLException e) {
334                // Column might already exist
335            }
336
337            // Create indexes
338            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_session ON logs(session_id)");
339            stmt.execute("CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp)");
340            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at)");
341            stmt.execute("CREATE INDEX IF NOT EXISTS idx_sessions_workflow ON sessions(workflow_name)");
342        }
343        conn.commit();
344    }
345
346    /**
347     * Loads existing session keys for duplicate detection.
348     */
349    private Set<String> loadExistingSessions(Connection conn) throws SQLException {
350        Set<String> existing = new HashSet<>();
351        try (Statement stmt = conn.createStatement();
352             ResultSet rs = stmt.executeQuery("SELECT workflow_name, started_at FROM sessions")) {
353            while (rs.next()) {
354                String key = makeSessionKey(rs.getString("workflow_name"), rs.getTimestamp("started_at"));
355                existing.add(key);
356            }
357        }
358        return existing;
359    }
360
361    /**
362     * Creates a unique key for duplicate detection.
363     */
364    private String makeSessionKey(String workflowName, Timestamp startedAt) {
365        return (workflowName != null ? workflowName : "") + "|" +
366               (startedAt != null ? startedAt.toString() : "");
367    }
368
369    /**
370     * Checks if a table exists in the database.
371     */
372    private boolean tableExists(Connection conn, String tableName) {
373        // Use a simple query to check if table exists - more reliable than metadata
374        try (Statement stmt = conn.createStatement()) {
375            stmt.executeQuery("SELECT 1 FROM " + tableName + " WHERE 1=0");
376            return true;
377        } catch (SQLException e) {
378            // Table doesn't exist
379            return false;
380        }
381    }
382
383    /**
384     * Merges one source database into the target.
385     */
386    private void mergeDatabase(Connection source, Connection target,
387                               Set<String> existingSessions, String sourceName) throws SQLException {
388        // Check if sessions table exists
389        if (!tableExists(source, "sessions")) {
390            if (verbose) {
391                System.out.println("  Skipping (no sessions table)");
392            }
393            return;
394        }
395
396        // Read all sessions from source
397        try (Statement stmt = source.createStatement();
398             ResultSet rs = stmt.executeQuery("SELECT * FROM sessions ORDER BY id")) {
399
400            while (rs.next()) {
401                long oldSessionId = rs.getLong("id");
402                String workflowName = rs.getString("workflow_name");
403                Timestamp startedAt = rs.getTimestamp("started_at");
404
405                // Check for duplicates
406                String sessionKey = makeSessionKey(workflowName, startedAt);
407                if (existingSessions.contains(sessionKey)) {
408                    if (verbose) {
409                        System.out.println("  Skipping duplicate session: " + workflowName + " at " + startedAt);
410                    }
411                    skippedSessions++;
412                    continue;
413                }
414
415                // Insert session into target and get new ID
416                long newSessionId = insertSession(target, rs, sourceName);
417                existingSessions.add(sessionKey);
418                totalSessions++;
419
420                // Copy logs for this session
421                int logCount = copyLogs(source, target, oldSessionId, newSessionId);
422                totalLogs += logCount;
423
424                // Copy node_results for this session
425                int nrCount = copyNodeResults(source, target, oldSessionId, newSessionId);
426                totalNodeResults += nrCount;
427
428                if (verbose) {
429                    System.out.printf("  Session %d -> %d: %s (%d logs, %d node_results)%n",
430                        oldSessionId, newSessionId, workflowName, logCount, nrCount);
431                }
432            }
433        }
434    }
435
436    /**
437     * Inserts a session into the target database.
438     */
439    private long insertSession(Connection target, ResultSet rs, String sourceName) throws SQLException {
440        String sql = """
441            INSERT INTO sessions (started_at, ended_at, workflow_name, overlay_name,
442                                  inventory_name, node_count, status, source_db)
443            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
444            """;
445        try (PreparedStatement ps = target.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
446            ps.setTimestamp(1, rs.getTimestamp("started_at"));
447            ps.setTimestamp(2, rs.getTimestamp("ended_at"));
448            ps.setString(3, rs.getString("workflow_name"));
449            ps.setString(4, rs.getString("overlay_name"));
450            ps.setString(5, rs.getString("inventory_name"));
451            ps.setInt(6, rs.getInt("node_count"));
452            ps.setString(7, rs.getString("status"));
453            ps.setString(8, sourceName);
454            ps.executeUpdate();
455
456            try (ResultSet keys = ps.getGeneratedKeys()) {
457                if (keys.next()) {
458                    return keys.getLong(1);
459                }
460            }
461        }
462        throw new SQLException("Failed to get generated session ID");
463    }
464
465    /**
466     * Copies logs from source to target with new session ID.
467     */
468    private int copyLogs(Connection source, Connection target,
469                         long oldSessionId, long newSessionId) throws SQLException {
470        int count = 0;
471        String selectSql = "SELECT * FROM logs WHERE session_id = ?";
472        String insertSql = """
473            INSERT INTO logs (session_id, timestamp, node_id, vertex_name, action_name,
474                             level, message, exit_code, duration_ms)
475            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
476            """;
477
478        try (PreparedStatement selectPs = source.prepareStatement(selectSql);
479             PreparedStatement insertPs = target.prepareStatement(insertSql)) {
480            selectPs.setLong(1, oldSessionId);
481
482            try (ResultSet rs = selectPs.executeQuery()) {
483                while (rs.next()) {
484                    insertPs.setLong(1, newSessionId);
485                    insertPs.setTimestamp(2, rs.getTimestamp("timestamp"));
486                    insertPs.setString(3, rs.getString("node_id"));
487                    insertPs.setString(4, rs.getString("vertex_name"));
488                    insertPs.setString(5, rs.getString("action_name"));
489                    insertPs.setString(6, rs.getString("level"));
490                    insertPs.setString(7, rs.getString("message"));
491
492                    int exitCode = rs.getInt("exit_code");
493                    if (rs.wasNull()) {
494                        insertPs.setNull(8, Types.INTEGER);
495                    } else {
496                        insertPs.setInt(8, exitCode);
497                    }
498
499                    long durationMs = rs.getLong("duration_ms");
500                    if (rs.wasNull()) {
501                        insertPs.setNull(9, Types.BIGINT);
502                    } else {
503                        insertPs.setLong(9, durationMs);
504                    }
505
506                    insertPs.executeUpdate();
507                    count++;
508                }
509            }
510        }
511        return count;
512    }
513
514    /**
515     * Copies node_results from source to target with new session ID.
516     */
517    private int copyNodeResults(Connection source, Connection target,
518                                long oldSessionId, long newSessionId) throws SQLException {
519        int count = 0;
520        String selectSql = "SELECT * FROM node_results WHERE session_id = ?";
521        String insertSql = """
522            INSERT INTO node_results (session_id, node_id, status, reason)
523            VALUES (?, ?, ?, ?)
524            """;
525
526        try (PreparedStatement selectPs = source.prepareStatement(selectSql);
527             PreparedStatement insertPs = target.prepareStatement(insertSql)) {
528            selectPs.setLong(1, oldSessionId);
529
530            try (ResultSet rs = selectPs.executeQuery()) {
531                while (rs.next()) {
532                    insertPs.setLong(1, newSessionId);
533                    insertPs.setString(2, rs.getString("node_id"));
534                    insertPs.setString(3, rs.getString("status"));
535                    insertPs.setString(4, rs.getString("reason"));
536                    insertPs.executeUpdate();
537                    count++;
538                }
539            }
540        }
541        return count;
542    }
543
544    /**
545     * Counts rows in a table.
546     */
547    private int countRows(Connection conn, String table) throws SQLException {
548        try (Statement stmt = conn.createStatement();
549             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table)) {
550            if (rs.next()) {
551                return rs.getInt(1);
552            }
553        }
554        return 0;
555    }
556
557    /**
558     * Truncates a string to max length with ellipsis.
559     */
560    private static String truncate(String s, int maxLen) {
561        if (s == null || s.length() <= maxLen) {
562            return s;
563        }
564        return s.substring(0, maxLen - 3) + "...";
565    }
566}