001/*
002 * Copyright 2025 devteam@scivicslab.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing,
011 * software distributed under the License is distributed on an
012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied.  See the License for the
014 * specific language governing permissions and limitations
015 * under the License.
016 */
017
018package com.scivicslab.actoriac.log;
019
020import java.nio.file.Path;
021import java.sql.*;
022import java.time.LocalDateTime;
023import java.util.ArrayList;
024import java.util.List;
025
026/**
027 * Read-only H2 log reader for querying workflow logs.
028 *
029 * <p>This class provides access to the H2 log database for querying logs.
030 * Uses AUTO_SERVER=TRUE to connect to the H2 server started by the writer process,
031 * allowing concurrent access while actor-IaC is writing logs.</p>
032 *
033 * @author devteam@scivicslab.com
034 */
035public class H2LogReader implements AutoCloseable {
036
037    private final Connection connection;
038    private final boolean ownsConnection;
039
040    /**
041     * Opens the log database for reading.
042     *
043     * @param dbPath path to the database file (without extension)
044     * @throws SQLException if database connection fails
045     */
046    public H2LogReader(Path dbPath) throws SQLException {
047        // Connect to the database using AUTO_SERVER=TRUE.
048        // This allows connecting to the server started by H2LogStore or log-server,
049        // or starting a new server if none exists.
050        String url = "jdbc:h2:" + dbPath.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
051        this.connection = DriverManager.getConnection(url);
052        this.ownsConnection = true;
053    }
054
055    /**
056     * Opens a remote log database via H2 TCP server.
057     *
058     * <p>Connects to an H2 log server started with the {@code log-server} command.</p>
059     *
060     * @param host H2 server hostname (typically "localhost")
061     * @param port H2 server TCP port
062     * @param dbPath database path on the server
063     * @throws SQLException if database connection fails
064     */
065    public H2LogReader(String host, int port, String dbPath) throws SQLException {
066        String url = "jdbc:h2:tcp://" + host + ":" + port + "/" + dbPath;
067        this.connection = DriverManager.getConnection(url);
068        this.ownsConnection = true;
069    }
070
071    /**
072     * Creates a reader using an existing connection.
073     *
074     * <p>Used internally by H2LogStore to delegate read operations.
075     * The connection will NOT be closed when this reader is closed.</p>
076     *
077     * @param connection the database connection to use
078     */
079    public H2LogReader(Connection connection) {
080        this.connection = connection;
081        this.ownsConnection = false;
082    }
083
084    /**
085     * Gets logs filtered by node ID.
086     *
087     * @param sessionId the session ID
088     * @param nodeId the node ID to filter by
089     * @return list of log entries for the specified node
090     */
091    public List<LogEntry> getLogsByNode(long sessionId, String nodeId) {
092        List<LogEntry> entries = new ArrayList<>();
093        try (PreparedStatement ps = connection.prepareStatement(
094                "SELECT * FROM logs WHERE session_id = ? AND node_id = ? ORDER BY timestamp")) {
095            ps.setLong(1, sessionId);
096            ps.setString(2, nodeId);
097            try (ResultSet rs = ps.executeQuery()) {
098                while (rs.next()) {
099                    entries.add(mapLogEntry(rs));
100                }
101            }
102        } catch (SQLException e) {
103            throw new RuntimeException("Failed to query logs", e);
104        }
105        return entries;
106    }
107
108    /**
109     * Gets logs filtered by minimum log level.
110     *
111     * @param sessionId the session ID
112     * @param minLevel the minimum log level to include
113     * @return list of log entries at or above the specified level
114     */
115    public List<LogEntry> getLogsByLevel(long sessionId, LogLevel minLevel) {
116        List<LogEntry> entries = new ArrayList<>();
117        try (PreparedStatement ps = connection.prepareStatement(
118                "SELECT * FROM logs WHERE session_id = ? AND level IN (?, ?, ?, ?) ORDER BY timestamp")) {
119            ps.setLong(1, sessionId);
120            int idx = 2;
121            for (LogLevel level : LogLevel.values()) {
122                if (level.isAtLeast(minLevel)) {
123                    ps.setString(idx++, level.name());
124                } else {
125                    ps.setString(idx++, "NONE");
126                }
127            }
128            try (ResultSet rs = ps.executeQuery()) {
129                while (rs.next()) {
130                    entries.add(mapLogEntry(rs));
131                }
132            }
133        } catch (SQLException e) {
134            throw new RuntimeException("Failed to query logs", e);
135        }
136        return entries;
137    }
138
139    /**
140     * Gets a summary of the specified session.
141     *
142     * @param sessionId the session ID
143     * @return the session summary, or null if not found
144     */
145    public SessionSummary getSummary(long sessionId) {
146        try {
147            String workflowName = null;
148            String overlayName = null;
149            String inventoryName = null;
150            LocalDateTime startedAt = null;
151            LocalDateTime endedAt = null;
152            int nodeCount = 0;
153            SessionStatus status = SessionStatus.RUNNING;
154
155            // Execution context
156            String cwd = null;
157            String gitCommit = null;
158            String gitBranch = null;
159            String commandLine = null;
160            String actorIacVersion = null;
161            String actorIacCommit = null;
162
163            try (PreparedStatement ps = connection.prepareStatement(
164                    "SELECT * FROM sessions WHERE id = ?")) {
165                ps.setLong(1, sessionId);
166                try (ResultSet rs = ps.executeQuery()) {
167                    if (rs.next()) {
168                        workflowName = rs.getString("workflow_name");
169                        overlayName = rs.getString("overlay_name");
170                        inventoryName = rs.getString("inventory_name");
171                        Timestamp ts = rs.getTimestamp("started_at");
172                        startedAt = ts != null ? ts.toLocalDateTime() : null;
173                        ts = rs.getTimestamp("ended_at");
174                        endedAt = ts != null ? ts.toLocalDateTime() : null;
175                        nodeCount = rs.getInt("node_count");
176                        String statusStr = rs.getString("status");
177                        if (statusStr != null) {
178                            status = SessionStatus.valueOf(statusStr);
179                        }
180
181                        // Read execution context (may be null for older sessions)
182                        cwd = getStringOrNull(rs, "cwd");
183                        gitCommit = getStringOrNull(rs, "git_commit");
184                        gitBranch = getStringOrNull(rs, "git_branch");
185                        commandLine = getStringOrNull(rs, "command_line");
186                        actorIacVersion = getStringOrNull(rs, "actoriac_version");
187                        actorIacCommit = getStringOrNull(rs, "actoriac_commit");
188                    } else {
189                        return null;
190                    }
191                }
192            }
193
194            int successCount = 0;
195            int failedCount = 0;
196            List<String> failedNodes = new ArrayList<>();
197
198            try (PreparedStatement ps = connection.prepareStatement(
199                    "SELECT node_id, status FROM node_results WHERE session_id = ?")) {
200                ps.setLong(1, sessionId);
201                try (ResultSet rs = ps.executeQuery()) {
202                    while (rs.next()) {
203                        String nodeStatus = rs.getString("status");
204                        if ("SUCCESS".equals(nodeStatus)) {
205                            successCount++;
206                        } else {
207                            failedCount++;
208                            failedNodes.add(rs.getString("node_id"));
209                        }
210                    }
211                }
212            }
213
214            // Use actual node count from node_results if available
215            int actualNodeCount = successCount + failedCount;
216            if (actualNodeCount > 0) {
217                nodeCount = actualNodeCount;
218            }
219
220            int totalLogEntries = 0;
221            int errorCount = 0;
222
223            try (PreparedStatement ps = connection.prepareStatement(
224                    "SELECT COUNT(*) as total, SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) as errors " +
225                    "FROM logs WHERE session_id = ?")) {
226                ps.setLong(1, sessionId);
227                try (ResultSet rs = ps.executeQuery()) {
228                    if (rs.next()) {
229                        totalLogEntries = rs.getInt("total");
230                        errorCount = rs.getInt("errors");
231                    }
232                }
233            }
234
235            return new SessionSummary(sessionId, workflowName, overlayName, inventoryName,
236                    startedAt, endedAt, nodeCount, status, successCount, failedCount,
237                    failedNodes, totalLogEntries, errorCount,
238                    cwd, gitCommit, gitBranch, commandLine, actorIacVersion, actorIacCommit);
239
240        } catch (SQLException e) {
241            throw new RuntimeException("Failed to get session summary", e);
242        }
243    }
244
245    /**
246     * Safely gets a string column value, returning null if the column doesn't exist.
247     */
248    private String getStringOrNull(ResultSet rs, String columnName) {
249        try {
250            return rs.getString(columnName);
251        } catch (SQLException e) {
252            // Column may not exist in older databases
253            return null;
254        }
255    }
256
257    /**
258     * Gets the latest session ID.
259     *
260     * @return the latest session ID, or -1 if no sessions exist
261     */
262    public long getLatestSessionId() {
263        try (Statement stmt = connection.createStatement();
264             ResultSet rs = stmt.executeQuery("SELECT MAX(id) FROM sessions")) {
265            if (rs.next()) {
266                long id = rs.getLong(1);
267                return rs.wasNull() ? -1 : id;
268            }
269        } catch (SQLException e) {
270            throw new RuntimeException("Failed to get latest session", e);
271        }
272        return -1;
273    }
274
275    /**
276     * Lists recent sessions.
277     *
278     * @param limit maximum number of sessions to return
279     * @return list of session summaries, ordered by start time descending
280     */
281    public List<SessionSummary> listSessions(int limit) {
282        List<SessionSummary> sessions = new ArrayList<>();
283        try (PreparedStatement ps = connection.prepareStatement(
284                "SELECT id FROM sessions ORDER BY started_at DESC LIMIT ?")) {
285            ps.setInt(1, limit);
286            try (ResultSet rs = ps.executeQuery()) {
287                while (rs.next()) {
288                    sessions.add(getSummary(rs.getLong("id")));
289                }
290            }
291        } catch (SQLException e) {
292            throw new RuntimeException("Failed to list sessions", e);
293        }
294        return sessions;
295    }
296
297    /**
298     * Lists sessions filtered by criteria.
299     *
300     * @param workflowName filter by workflow name (null to skip)
301     * @param overlayName filter by overlay name (null to skip)
302     * @param inventoryName filter by inventory name (null to skip)
303     * @param startedAfter filter by start time (null to skip)
304     * @param limit maximum number of sessions to return
305     * @return list of session summaries matching the criteria
306     */
307    public List<SessionSummary> listSessionsFiltered(String workflowName, String overlayName,
308                                                      String inventoryName, LocalDateTime startedAfter,
309                                                      int limit) {
310        return listSessionsFiltered(workflowName, overlayName, inventoryName, startedAfter, null, limit);
311    }
312
313    /**
314     * Lists sessions filtered by criteria including end time.
315     *
316     * @param workflowName filter by workflow name (null to skip)
317     * @param overlayName filter by overlay name (null to skip)
318     * @param inventoryName filter by inventory name (null to skip)
319     * @param startedAfter filter by start time (null to skip)
320     * @param endedAfter filter by end time (null to skip)
321     * @param limit maximum number of sessions to return
322     * @return list of session summaries matching the criteria
323     */
324    public List<SessionSummary> listSessionsFiltered(String workflowName, String overlayName,
325                                                      String inventoryName, LocalDateTime startedAfter,
326                                                      LocalDateTime endedAfter, int limit) {
327        List<SessionSummary> sessions = new ArrayList<>();
328        StringBuilder sql = new StringBuilder("SELECT id FROM sessions WHERE 1=1");
329
330        if (workflowName != null) {
331            sql.append(" AND workflow_name = ?");
332        }
333        if (overlayName != null) {
334            sql.append(" AND overlay_name = ?");
335        }
336        if (inventoryName != null) {
337            sql.append(" AND inventory_name = ?");
338        }
339        if (startedAfter != null) {
340            sql.append(" AND started_at >= ?");
341        }
342        if (endedAfter != null) {
343            sql.append(" AND ended_at >= ?");
344        }
345        sql.append(" ORDER BY started_at DESC LIMIT ?");
346
347        try (PreparedStatement ps = connection.prepareStatement(sql.toString())) {
348            int idx = 1;
349            if (workflowName != null) {
350                ps.setString(idx++, workflowName);
351            }
352            if (overlayName != null) {
353                ps.setString(idx++, overlayName);
354            }
355            if (inventoryName != null) {
356                ps.setString(idx++, inventoryName);
357            }
358            if (startedAfter != null) {
359                ps.setTimestamp(idx++, Timestamp.valueOf(startedAfter));
360            }
361            if (endedAfter != null) {
362                ps.setTimestamp(idx++, Timestamp.valueOf(endedAfter));
363            }
364            ps.setInt(idx, limit);
365
366            try (ResultSet rs = ps.executeQuery()) {
367                while (rs.next()) {
368                    sessions.add(getSummary(rs.getLong("id")));
369                }
370            }
371        } catch (SQLException e) {
372            throw new RuntimeException("Failed to list sessions", e);
373        }
374        return sessions;
375    }
376
377    /**
378     * Lists sessions filtered by workflow name.
379     *
380     * @param workflowName the workflow name to filter by
381     * @param limit maximum number of sessions to return
382     * @return list of session summaries for the specified workflow
383     */
384    public List<SessionSummary> listSessionsByWorkflow(String workflowName, int limit) {
385        return listSessionsFiltered(workflowName, null, null, null, limit);
386    }
387
388    /**
389     * Lists sessions filtered by overlay name.
390     *
391     * @param overlayName the overlay name to filter by
392     * @param limit maximum number of sessions to return
393     * @return list of session summaries for the specified overlay
394     */
395    public List<SessionSummary> listSessionsByOverlay(String overlayName, int limit) {
396        return listSessionsFiltered(null, overlayName, null, null, limit);
397    }
398
399    /**
400     * Lists sessions filtered by inventory name.
401     *
402     * @param inventoryName the inventory name to filter by
403     * @param limit maximum number of sessions to return
404     * @return list of session summaries for the specified inventory
405     */
406    public List<SessionSummary> listSessionsByInventory(String inventoryName, int limit) {
407        return listSessionsFiltered(null, null, inventoryName, null, limit);
408    }
409
410    /**
411     * Lists sessions started after the specified time.
412     *
413     * @param startedAfter only include sessions started after this time
414     * @param limit maximum number of sessions to return
415     * @return list of session summaries started after the specified time
416     */
417    public List<SessionSummary> listSessionsAfter(LocalDateTime startedAfter, int limit) {
418        return listSessionsFiltered(null, null, null, startedAfter, limit);
419    }
420
421    /**
422     * Information about a node in a session.
423     *
424     * @param nodeId the node identifier
425     * @param status the node status (SUCCESS, FAILED, or null if not yet recorded)
426     * @param logCount the number of log entries for this node
427     */
428    public record NodeInfo(String nodeId, String status, int logCount) {}
429
430    /**
431     * Gets all nodes that participated in a session.
432     *
433     * <p>Returns only nodes that have results recorded in node_results table,
434     * which represents actual workflow target nodes (not internal actors like
435     * cli, nodeGroup, etc.).</p>
436     *
437     * @param sessionId the session ID
438     * @return list of node information, ordered by node ID
439     */
440    public List<NodeInfo> getNodesInSession(long sessionId) {
441        List<NodeInfo> nodes = new ArrayList<>();
442        String sql = """
443            SELECT nr.node_id,
444                   nr.status,
445                   COUNT(l.id) as log_count
446            FROM node_results nr
447            LEFT JOIN logs l ON nr.session_id = l.session_id AND nr.node_id = l.node_id
448            WHERE nr.session_id = ?
449            GROUP BY nr.node_id, nr.status
450            ORDER BY nr.node_id
451            """;
452
453        try (PreparedStatement ps = connection.prepareStatement(sql)) {
454            ps.setLong(1, sessionId);
455            try (ResultSet rs = ps.executeQuery()) {
456                while (rs.next()) {
457                    nodes.add(new NodeInfo(
458                            rs.getString("node_id"),
459                            rs.getString("status"),
460                            rs.getInt("log_count")
461                    ));
462                }
463            }
464        } catch (SQLException e) {
465            throw new RuntimeException("Failed to get nodes in session", e);
466        }
467        return nodes;
468    }
469
470    private LogEntry mapLogEntry(ResultSet rs) throws SQLException {
471        Timestamp ts = rs.getTimestamp("timestamp");
472        Integer exitCode = rs.getObject("exit_code") != null ? rs.getInt("exit_code") : null;
473        Long durationMs = rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null;
474        String levelStr = rs.getString("level");
475        LogLevel level = levelStr != null ? LogLevel.valueOf(levelStr) : LogLevel.INFO;
476
477        return new LogEntry(
478                rs.getLong("id"),
479                rs.getLong("session_id"),
480                ts != null ? ts.toLocalDateTime() : null,
481                rs.getString("node_id"),
482                rs.getString("label"),
483                rs.getString("action_name"),
484                level,
485                rs.getString("message"),
486                exitCode,
487                durationMs
488        );
489    }
490
491    @Override
492    public void close() throws SQLException {
493        if (ownsConnection) {
494            connection.close();
495        }
496    }
497}