001/*
002 * Copyright 2025 devteam@scivicslab.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing,
011 * software distributed under the License is distributed on an
012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied.  See the License for the
014 * specific language governing permissions and limitations
015 * under the License.
016 */
017
018package com.scivicslab.actoriac.cli;
019
020import java.io.File;
021import java.io.IOException;
022import java.nio.file.Files;
023import java.nio.file.Path;
024import java.sql.*;
025import java.util.*;
026import java.util.concurrent.Callable;
027import java.util.stream.Stream;
028
029import com.scivicslab.actoriac.log.H2LogStore;
030
031import picocli.CommandLine.Command;
032import picocli.CommandLine.Option;
033import picocli.CommandLine.Parameters;
034
035/**
036 * CLI subcommand to merge scattered log databases into a single database.
037 *
038 * <p>Before the log-server feature, each workflow run would create its own
039 * separate database file. This command consolidates them into one database.</p>
040 *
041 * <h2>Usage Examples</h2>
042 * <pre>
043 * # Scan a directory for .mv.db files and merge them
044 * actor-iac merge-logs --scan ./workflows --target ./logs/merged
045 *
046 * # Merge specific database files
047 * actor-iac merge-logs --target ./logs/merged ./db1 ./db2 ./db3
048 *
049 * # Dry-run to see what would be merged
050 * actor-iac merge-logs --scan ./workflows --target ./logs/merged --dry-run
051 * </pre>
052 *
053 * @author devteam@scivicslab.com
054 * @since 2.10.0
055 */
056@Command(
057    name = "log-merge",
058    mixinStandardHelpOptions = true,
059    versionProvider = VersionProvider.class,
060    description = "Merge scattered log databases into a single database."
061)
062public class MergeLogsCLI implements Callable<Integer> {
063
064    @Option(
065        names = {"--target"},
066        required = true,
067        description = "Target database file path (without .mv.db extension)"
068    )
069    private File targetDb;
070
071    @Option(
072        names = {"--scan"},
073        description = "Directory to scan for .mv.db files (recursive)"
074    )
075    private File scanDir;
076
077    @Parameters(
078        paramLabel = "SOURCE",
079        description = "Source database files to merge (without .mv.db extension)",
080        arity = "0..*"
081    )
082    private List<File> sourceDbs = new ArrayList<>();
083
084    @Option(
085        names = {"--dry-run"},
086        description = "Show what would be merged without actually merging"
087    )
088    private boolean dryRun;
089
090    @Option(
091        names = {"-v", "--verbose"},
092        description = "Enable verbose output"
093    )
094    private boolean verbose;
095
096    @Option(
097        names = {"--skip-duplicates"},
098        description = "Skip sessions that already exist in target (based on workflow_name and started_at)",
099        defaultValue = "true"
100    )
101    private boolean skipDuplicates;
102
103    /** Statistics for reporting */
104    private int totalSessions = 0;
105    private int totalLogs = 0;
106    private int totalNodeResults = 0;
107    private int skippedSessions = 0;
108
109    @Override
110    public Integer call() {
111        // Collect source databases
112        List<File> allSources = collectSourceDatabases();
113        if (allSources.isEmpty()) {
114            System.err.println("No source databases found.");
115            System.err.println("Use --scan <dir> to scan for databases, or specify source files directly.");
116            return 1;
117        }
118
119        // Filter out target from sources if present
120        String targetPath = targetDb.getAbsolutePath();
121        allSources.removeIf(f -> f.getAbsolutePath().equals(targetPath));
122
123        if (allSources.isEmpty()) {
124            System.err.println("No source databases to merge (target was the only database found).");
125            return 1;
126        }
127
128        System.out.println("=".repeat(60));
129        System.out.println("Log Database Merge");
130        System.out.println("=".repeat(60));
131        System.out.println("Target: " + targetDb.getAbsolutePath() + ".mv.db");
132        System.out.println("Sources: " + allSources.size() + " database(s)");
133        if (verbose) {
134            for (File source : allSources) {
135                System.out.println("  - " + source.getAbsolutePath() + ".mv.db");
136            }
137        }
138        System.out.println("-".repeat(60));
139
140        if (dryRun) {
141            System.out.println("[DRY-RUN MODE - No changes will be made]");
142            System.out.println();
143            return dryRunAnalysis(allSources);
144        }
145
146        try {
147            return performMerge(allSources);
148        } catch (SQLException e) {
149            System.err.println("Database error: " + e.getMessage());
150            if (verbose) {
151                e.printStackTrace();
152            }
153            return 1;
154        }
155    }
156
157    /**
158     * Collects all source database files from scan directory and explicit parameters.
159     */
160    private List<File> collectSourceDatabases() {
161        List<File> sources = new ArrayList<>();
162
163        // Add explicitly specified sources
164        for (File source : sourceDbs) {
165            File dbFile = new File(source.getAbsolutePath() + ".mv.db");
166            if (dbFile.exists()) {
167                sources.add(source);
168            } else {
169                System.err.println("Warning: Database not found: " + dbFile.getAbsolutePath());
170            }
171        }
172
173        // Scan directory for .mv.db files
174        if (scanDir != null) {
175            if (!scanDir.isDirectory()) {
176                System.err.println("Warning: Not a directory: " + scanDir);
177            } else {
178                try (Stream<Path> paths = Files.walk(scanDir.toPath())) {
179                    paths.filter(Files::isRegularFile)
180                         .filter(p -> p.toString().endsWith(".mv.db"))
181                         .map(p -> {
182                             // Remove .mv.db extension
183                             String path = p.toString();
184                             return new File(path.substring(0, path.length() - 6));
185                         })
186                         .filter(f -> !sources.contains(f))
187                         .forEach(sources::add);
188                } catch (IOException e) {
189                    System.err.println("Warning: Failed to scan directory: " + e.getMessage());
190                }
191            }
192        }
193
194        return sources;
195    }
196
197    /**
198     * Performs dry-run analysis showing what would be merged.
199     */
200    private int dryRunAnalysis(List<File> sources) {
201        int totalSes = 0;
202        int totalLog = 0;
203        int totalNr = 0;
204
205        for (File source : sources) {
206            try (Connection conn = openDatabase(source)) {
207                // Skip databases without sessions table
208                if (!tableExists(conn, "sessions")) {
209                    System.out.printf("%-50s (empty - no sessions table)%n",
210                        truncate(source.getName(), 50));
211                    continue;
212                }
213
214                int sessions = countRows(conn, "sessions");
215                int logs = tableExists(conn, "logs") ? countRows(conn, "logs") : 0;
216                int nodeResults = tableExists(conn, "node_results") ? countRows(conn, "node_results") : 0;
217
218                totalSes += sessions;
219                totalLog += logs;
220                totalNr += nodeResults;
221
222                System.out.printf("%-50s sessions: %4d  logs: %6d  node_results: %4d%n",
223                    truncate(source.getName(), 50), sessions, logs, nodeResults);
224
225            } catch (SQLException e) {
226                System.err.println("Error reading " + source + ": " + e.getMessage());
227            }
228        }
229
230        System.out.println("-".repeat(60));
231        System.out.printf("%-50s sessions: %4d  logs: %6d  node_results: %4d%n",
232            "TOTAL", totalSes, totalLog, totalNr);
233        System.out.println("=".repeat(60));
234
235        return 0;
236    }
237
238    /**
239     * Performs the actual merge operation.
240     */
241    private int performMerge(List<File> sources) throws SQLException {
242        // Open/create target database
243        try (Connection targetConn = openDatabase(targetDb)) {
244            initializeSchema(targetConn);
245
246            // Load existing sessions for duplicate detection
247            Set<String> existingSessions = skipDuplicates ? loadExistingSessions(targetConn) : Set.of();
248
249            for (File source : sources) {
250                System.out.println("Merging: " + source.getName() + ".mv.db");
251                try (Connection sourceConn = openDatabase(source)) {
252                    mergeDatabase(sourceConn, targetConn, existingSessions, source.getName());
253                } catch (SQLException e) {
254                    System.err.println("  Error: " + e.getMessage());
255                    if (verbose) {
256                        e.printStackTrace();
257                    }
258                }
259            }
260
261            targetConn.commit();
262        }
263
264        // Print summary
265        System.out.println("-".repeat(60));
266        System.out.println("Merge completed:");
267        System.out.println("  Sessions merged:     " + totalSessions);
268        System.out.println("  Sessions skipped:    " + skippedSessions + " (duplicates)");
269        System.out.println("  Log entries merged:  " + totalLogs);
270        System.out.println("  Node results merged: " + totalNodeResults);
271        System.out.println("=".repeat(60));
272
273        return 0;
274    }
275
276    /**
277     * Opens a database connection.
278     */
279    private Connection openDatabase(File dbPath) throws SQLException {
280        String url = "jdbc:h2:" + dbPath.getAbsolutePath();
281        Connection conn = DriverManager.getConnection(url);
282        conn.setAutoCommit(false);
283        return conn;
284    }
285
286    /**
287     * Initializes the target database schema.
288     */
289    private void initializeSchema(Connection conn) throws SQLException {
290        H2LogStore.initSchema(conn);
291
292        // Add source_db column for merge tracking (specific to merge-logs)
293        try (Statement stmt = conn.createStatement()) {
294            stmt.execute("ALTER TABLE sessions ADD COLUMN IF NOT EXISTS source_db VARCHAR(255)");
295        } catch (SQLException e) {
296            // Column might already exist
297        }
298
299        conn.commit();
300    }
301
302    /**
303     * Loads existing session keys for duplicate detection.
304     */
305    private Set<String> loadExistingSessions(Connection conn) throws SQLException {
306        Set<String> existing = new HashSet<>();
307        try (Statement stmt = conn.createStatement();
308             ResultSet rs = stmt.executeQuery("SELECT workflow_name, started_at FROM sessions")) {
309            while (rs.next()) {
310                String key = makeSessionKey(rs.getString("workflow_name"), rs.getTimestamp("started_at"));
311                existing.add(key);
312            }
313        }
314        return existing;
315    }
316
317    /**
318     * Creates a unique key for duplicate detection.
319     */
320    private String makeSessionKey(String workflowName, Timestamp startedAt) {
321        return (workflowName != null ? workflowName : "") + "|" +
322               (startedAt != null ? startedAt.toString() : "");
323    }
324
325    /**
326     * Checks if a table exists in the database.
327     */
328    private boolean tableExists(Connection conn, String tableName) {
329        // Use a simple query to check if table exists - more reliable than metadata
330        try (Statement stmt = conn.createStatement()) {
331            stmt.executeQuery("SELECT 1 FROM " + tableName + " WHERE 1=0");
332            return true;
333        } catch (SQLException e) {
334            // Table doesn't exist
335            return false;
336        }
337    }
338
339    /**
340     * Merges one source database into the target.
341     */
342    private void mergeDatabase(Connection source, Connection target,
343                               Set<String> existingSessions, String sourceName) throws SQLException {
344        // Check if sessions table exists
345        if (!tableExists(source, "sessions")) {
346            if (verbose) {
347                System.out.println("  Skipping (no sessions table)");
348            }
349            return;
350        }
351
352        // Read all sessions from source
353        try (Statement stmt = source.createStatement();
354             ResultSet rs = stmt.executeQuery("SELECT * FROM sessions ORDER BY id")) {
355
356            while (rs.next()) {
357                long oldSessionId = rs.getLong("id");
358                String workflowName = rs.getString("workflow_name");
359                Timestamp startedAt = rs.getTimestamp("started_at");
360
361                // Check for duplicates
362                String sessionKey = makeSessionKey(workflowName, startedAt);
363                if (existingSessions.contains(sessionKey)) {
364                    if (verbose) {
365                        System.out.println("  Skipping duplicate session: " + workflowName + " at " + startedAt);
366                    }
367                    skippedSessions++;
368                    continue;
369                }
370
371                // Insert session into target and get new ID
372                long newSessionId = insertSession(target, rs, sourceName);
373                existingSessions.add(sessionKey);
374                totalSessions++;
375
376                // Copy logs for this session
377                int logCount = copyLogs(source, target, oldSessionId, newSessionId);
378                totalLogs += logCount;
379
380                // Copy node_results for this session
381                int nrCount = copyNodeResults(source, target, oldSessionId, newSessionId);
382                totalNodeResults += nrCount;
383
384                if (verbose) {
385                    System.out.printf("  Session %d -> %d: %s (%d logs, %d node_results)%n",
386                        oldSessionId, newSessionId, workflowName, logCount, nrCount);
387                }
388            }
389        }
390    }
391
392    /**
393     * Inserts a session into the target database.
394     */
395    private long insertSession(Connection target, ResultSet rs, String sourceName) throws SQLException {
396        String sql = """
397            INSERT INTO sessions (started_at, ended_at, workflow_name, overlay_name,
398                                  inventory_name, node_count, status, source_db)
399            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
400            """;
401        try (PreparedStatement ps = target.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
402            ps.setTimestamp(1, rs.getTimestamp("started_at"));
403            ps.setTimestamp(2, rs.getTimestamp("ended_at"));
404            ps.setString(3, rs.getString("workflow_name"));
405            ps.setString(4, rs.getString("overlay_name"));
406            ps.setString(5, rs.getString("inventory_name"));
407            ps.setInt(6, rs.getInt("node_count"));
408            ps.setString(7, rs.getString("status"));
409            ps.setString(8, sourceName);
410            ps.executeUpdate();
411
412            try (ResultSet keys = ps.getGeneratedKeys()) {
413                if (keys.next()) {
414                    return keys.getLong(1);
415                }
416            }
417        }
418        throw new SQLException("Failed to get generated session ID");
419    }
420
421    /**
422     * Copies logs from source to target with new session ID.
423     */
424    private int copyLogs(Connection source, Connection target,
425                         long oldSessionId, long newSessionId) throws SQLException {
426        int count = 0;
427        String selectSql = "SELECT * FROM logs WHERE session_id = ?";
428        String insertSql = """
429            INSERT INTO logs (session_id, timestamp, node_id, label, action_name,
430                             level, message, exit_code, duration_ms)
431            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
432            """;
433
434        try (PreparedStatement selectPs = source.prepareStatement(selectSql);
435             PreparedStatement insertPs = target.prepareStatement(insertSql)) {
436            selectPs.setLong(1, oldSessionId);
437
438            try (ResultSet rs = selectPs.executeQuery()) {
439                while (rs.next()) {
440                    insertPs.setLong(1, newSessionId);
441                    insertPs.setTimestamp(2, rs.getTimestamp("timestamp"));
442                    insertPs.setString(3, rs.getString("node_id"));
443                    insertPs.setString(4, rs.getString("label"));
444                    insertPs.setString(5, rs.getString("action_name"));
445                    insertPs.setString(6, rs.getString("level"));
446                    insertPs.setString(7, rs.getString("message"));
447
448                    int exitCode = rs.getInt("exit_code");
449                    if (rs.wasNull()) {
450                        insertPs.setNull(8, Types.INTEGER);
451                    } else {
452                        insertPs.setInt(8, exitCode);
453                    }
454
455                    long durationMs = rs.getLong("duration_ms");
456                    if (rs.wasNull()) {
457                        insertPs.setNull(9, Types.BIGINT);
458                    } else {
459                        insertPs.setLong(9, durationMs);
460                    }
461
462                    insertPs.executeUpdate();
463                    count++;
464                }
465            }
466        }
467        return count;
468    }
469
470    /**
471     * Copies node_results from source to target with new session ID.
472     */
473    private int copyNodeResults(Connection source, Connection target,
474                                long oldSessionId, long newSessionId) throws SQLException {
475        int count = 0;
476        String selectSql = "SELECT * FROM node_results WHERE session_id = ?";
477        String insertSql = """
478            INSERT INTO node_results (session_id, node_id, status, reason)
479            VALUES (?, ?, ?, ?)
480            """;
481
482        try (PreparedStatement selectPs = source.prepareStatement(selectSql);
483             PreparedStatement insertPs = target.prepareStatement(insertSql)) {
484            selectPs.setLong(1, oldSessionId);
485
486            try (ResultSet rs = selectPs.executeQuery()) {
487                while (rs.next()) {
488                    insertPs.setLong(1, newSessionId);
489                    insertPs.setString(2, rs.getString("node_id"));
490                    insertPs.setString(3, rs.getString("status"));
491                    insertPs.setString(4, rs.getString("reason"));
492                    insertPs.executeUpdate();
493                    count++;
494                }
495            }
496        }
497        return count;
498    }
499
500    /**
501     * Counts rows in a table.
502     */
503    private int countRows(Connection conn, String table) throws SQLException {
504        try (Statement stmt = conn.createStatement();
505             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table)) {
506            if (rs.next()) {
507                return rs.getInt(1);
508            }
509        }
510        return 0;
511    }
512
513    /**
514     * Truncates a string to max length with ellipsis.
515     */
516    private static String truncate(String s, int maxLen) {
517        if (s == null || s.length() <= maxLen) {
518            return s;
519        }
520        return s.substring(0, maxLen - 3) + "...";
521    }
522}