001/*
002 * Copyright 2025 devteam@scivics-lab.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing,
011 * software distributed under the License is distributed on an
012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied.  See the License for the
014 * specific language governing permissions and limitations
015 * under the License.
016 */
017
018package com.scivicslab.actoriac.log;
019
020import java.nio.file.Path;
021import java.sql.*;
022import java.time.LocalDateTime;
023import java.util.ArrayList;
024import java.util.List;
025
026/**
027 * Read-only H2 log reader for querying workflow logs.
028 *
029 * <p>This class provides access to the H2 log database for querying logs.
030 * Uses AUTO_SERVER=TRUE to connect to the H2 server started by the writer process,
031 * allowing concurrent access while actor-IaC is writing logs.</p>
032 *
033 * @author devteam@scivics-lab.com
034 */
035public class H2LogReader implements AutoCloseable {
036
037    private final Connection connection;
038
039    /**
040     * Opens the log database for reading.
041     *
042     * @param dbPath path to the database file (without extension)
043     * @throws SQLException if database connection fails
044     */
045    public H2LogReader(Path dbPath) throws SQLException {
046        // Connect to the database.
047        // If H2LogStore is running with AUTO_SERVER=TRUE, H2 will automatically
048        // detect this and connect to the server process via TCP.
049        // FILE_LOCK=FS allows multiple readers even without AUTO_SERVER.
050        String url = "jdbc:h2:" + dbPath.toAbsolutePath().toString()
051                   + ";FILE_LOCK=FS;OPEN_NEW=FALSE";
052        this.connection = DriverManager.getConnection(url);
053    }
054
055    /**
056     * Opens a remote log database via H2 TCP server.
057     *
058     * <p>Connects to an H2 log server started with the {@code log-server} command.</p>
059     *
060     * @param host H2 server hostname (typically "localhost")
061     * @param port H2 server TCP port
062     * @param dbPath database path on the server
063     * @throws SQLException if database connection fails
064     */
065    public H2LogReader(String host, int port, String dbPath) throws SQLException {
066        String url = "jdbc:h2:tcp://" + host + ":" + port + "/" + dbPath;
067        this.connection = DriverManager.getConnection(url);
068    }
069
070    /**
071     * Gets logs filtered by node ID.
072     *
073     * @param sessionId the session ID
074     * @param nodeId the node ID to filter by
075     * @return list of log entries for the specified node
076     */
077    public List<LogEntry> getLogsByNode(long sessionId, String nodeId) {
078        List<LogEntry> entries = new ArrayList<>();
079        try (PreparedStatement ps = connection.prepareStatement(
080                "SELECT * FROM logs WHERE session_id = ? AND node_id = ? ORDER BY timestamp")) {
081            ps.setLong(1, sessionId);
082            ps.setString(2, nodeId);
083            try (ResultSet rs = ps.executeQuery()) {
084                while (rs.next()) {
085                    entries.add(mapLogEntry(rs));
086                }
087            }
088        } catch (SQLException e) {
089            throw new RuntimeException("Failed to query logs", e);
090        }
091        return entries;
092    }
093
094    /**
095     * Gets logs filtered by minimum log level.
096     *
097     * @param sessionId the session ID
098     * @param minLevel the minimum log level to include
099     * @return list of log entries at or above the specified level
100     */
101    public List<LogEntry> getLogsByLevel(long sessionId, LogLevel minLevel) {
102        List<LogEntry> entries = new ArrayList<>();
103        try (PreparedStatement ps = connection.prepareStatement(
104                "SELECT * FROM logs WHERE session_id = ? AND level IN (?, ?, ?, ?) ORDER BY timestamp")) {
105            ps.setLong(1, sessionId);
106            int idx = 2;
107            for (LogLevel level : LogLevel.values()) {
108                if (level.isAtLeast(minLevel)) {
109                    ps.setString(idx++, level.name());
110                } else {
111                    ps.setString(idx++, "NONE");
112                }
113            }
114            try (ResultSet rs = ps.executeQuery()) {
115                while (rs.next()) {
116                    entries.add(mapLogEntry(rs));
117                }
118            }
119        } catch (SQLException e) {
120            throw new RuntimeException("Failed to query logs", e);
121        }
122        return entries;
123    }
124
125    /**
126     * Gets a summary of the specified session.
127     *
128     * @param sessionId the session ID
129     * @return the session summary, or null if not found
130     */
131    public SessionSummary getSummary(long sessionId) {
132        try {
133            String workflowName = null;
134            String overlayName = null;
135            String inventoryName = null;
136            LocalDateTime startedAt = null;
137            LocalDateTime endedAt = null;
138            int nodeCount = 0;
139            SessionStatus status = SessionStatus.RUNNING;
140
141            try (PreparedStatement ps = connection.prepareStatement(
142                    "SELECT * FROM sessions WHERE id = ?")) {
143                ps.setLong(1, sessionId);
144                try (ResultSet rs = ps.executeQuery()) {
145                    if (rs.next()) {
146                        workflowName = rs.getString("workflow_name");
147                        overlayName = rs.getString("overlay_name");
148                        inventoryName = rs.getString("inventory_name");
149                        Timestamp ts = rs.getTimestamp("started_at");
150                        startedAt = ts != null ? ts.toLocalDateTime() : null;
151                        ts = rs.getTimestamp("ended_at");
152                        endedAt = ts != null ? ts.toLocalDateTime() : null;
153                        nodeCount = rs.getInt("node_count");
154                        String statusStr = rs.getString("status");
155                        if (statusStr != null) {
156                            status = SessionStatus.valueOf(statusStr);
157                        }
158                    } else {
159                        return null;
160                    }
161                }
162            }
163
164            int successCount = 0;
165            int failedCount = 0;
166            List<String> failedNodes = new ArrayList<>();
167
168            try (PreparedStatement ps = connection.prepareStatement(
169                    "SELECT node_id, status FROM node_results WHERE session_id = ?")) {
170                ps.setLong(1, sessionId);
171                try (ResultSet rs = ps.executeQuery()) {
172                    while (rs.next()) {
173                        String nodeStatus = rs.getString("status");
174                        if ("SUCCESS".equals(nodeStatus)) {
175                            successCount++;
176                        } else {
177                            failedCount++;
178                            failedNodes.add(rs.getString("node_id"));
179                        }
180                    }
181                }
182            }
183
184            int totalLogEntries = 0;
185            int errorCount = 0;
186
187            try (PreparedStatement ps = connection.prepareStatement(
188                    "SELECT COUNT(*) as total, SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) as errors " +
189                    "FROM logs WHERE session_id = ?")) {
190                ps.setLong(1, sessionId);
191                try (ResultSet rs = ps.executeQuery()) {
192                    if (rs.next()) {
193                        totalLogEntries = rs.getInt("total");
194                        errorCount = rs.getInt("errors");
195                    }
196                }
197            }
198
199            return new SessionSummary(sessionId, workflowName, overlayName, inventoryName,
200                    startedAt, endedAt, nodeCount, status, successCount, failedCount,
201                    failedNodes, totalLogEntries, errorCount);
202
203        } catch (SQLException e) {
204            throw new RuntimeException("Failed to get session summary", e);
205        }
206    }
207
208    /**
209     * Gets the latest session ID.
210     *
211     * @return the latest session ID, or -1 if no sessions exist
212     */
213    public long getLatestSessionId() {
214        try (Statement stmt = connection.createStatement();
215             ResultSet rs = stmt.executeQuery("SELECT MAX(id) FROM sessions")) {
216            if (rs.next()) {
217                long id = rs.getLong(1);
218                return rs.wasNull() ? -1 : id;
219            }
220        } catch (SQLException e) {
221            throw new RuntimeException("Failed to get latest session", e);
222        }
223        return -1;
224    }
225
226    /**
227     * Lists recent sessions.
228     *
229     * @param limit maximum number of sessions to return
230     * @return list of session summaries, ordered by start time descending
231     */
232    public List<SessionSummary> listSessions(int limit) {
233        List<SessionSummary> sessions = new ArrayList<>();
234        try (PreparedStatement ps = connection.prepareStatement(
235                "SELECT id FROM sessions ORDER BY started_at DESC LIMIT ?")) {
236            ps.setInt(1, limit);
237            try (ResultSet rs = ps.executeQuery()) {
238                while (rs.next()) {
239                    sessions.add(getSummary(rs.getLong("id")));
240                }
241            }
242        } catch (SQLException e) {
243            throw new RuntimeException("Failed to list sessions", e);
244        }
245        return sessions;
246    }
247
248    /**
249     * Lists sessions filtered by criteria.
250     *
251     * @param workflowName filter by workflow name (null to skip)
252     * @param overlayName filter by overlay name (null to skip)
253     * @param inventoryName filter by inventory name (null to skip)
254     * @param startedAfter filter by start time (null to skip)
255     * @param limit maximum number of sessions to return
256     * @return list of session summaries matching the criteria
257     */
258    public List<SessionSummary> listSessionsFiltered(String workflowName, String overlayName,
259                                                      String inventoryName, LocalDateTime startedAfter,
260                                                      int limit) {
261        return listSessionsFiltered(workflowName, overlayName, inventoryName, startedAfter, null, limit);
262    }
263
264    /**
265     * Lists sessions filtered by criteria including end time.
266     *
267     * @param workflowName filter by workflow name (null to skip)
268     * @param overlayName filter by overlay name (null to skip)
269     * @param inventoryName filter by inventory name (null to skip)
270     * @param startedAfter filter by start time (null to skip)
271     * @param endedAfter filter by end time (null to skip)
272     * @param limit maximum number of sessions to return
273     * @return list of session summaries matching the criteria
274     */
275    public List<SessionSummary> listSessionsFiltered(String workflowName, String overlayName,
276                                                      String inventoryName, LocalDateTime startedAfter,
277                                                      LocalDateTime endedAfter, int limit) {
278        List<SessionSummary> sessions = new ArrayList<>();
279        StringBuilder sql = new StringBuilder("SELECT id FROM sessions WHERE 1=1");
280
281        if (workflowName != null) {
282            sql.append(" AND workflow_name = ?");
283        }
284        if (overlayName != null) {
285            sql.append(" AND overlay_name = ?");
286        }
287        if (inventoryName != null) {
288            sql.append(" AND inventory_name = ?");
289        }
290        if (startedAfter != null) {
291            sql.append(" AND started_at >= ?");
292        }
293        if (endedAfter != null) {
294            sql.append(" AND ended_at >= ?");
295        }
296        sql.append(" ORDER BY started_at DESC LIMIT ?");
297
298        try (PreparedStatement ps = connection.prepareStatement(sql.toString())) {
299            int idx = 1;
300            if (workflowName != null) {
301                ps.setString(idx++, workflowName);
302            }
303            if (overlayName != null) {
304                ps.setString(idx++, overlayName);
305            }
306            if (inventoryName != null) {
307                ps.setString(idx++, inventoryName);
308            }
309            if (startedAfter != null) {
310                ps.setTimestamp(idx++, Timestamp.valueOf(startedAfter));
311            }
312            if (endedAfter != null) {
313                ps.setTimestamp(idx++, Timestamp.valueOf(endedAfter));
314            }
315            ps.setInt(idx, limit);
316
317            try (ResultSet rs = ps.executeQuery()) {
318                while (rs.next()) {
319                    sessions.add(getSummary(rs.getLong("id")));
320                }
321            }
322        } catch (SQLException e) {
323            throw new RuntimeException("Failed to list sessions", e);
324        }
325        return sessions;
326    }
327
328    /**
329     * Lists sessions filtered by workflow name.
330     *
331     * @param workflowName the workflow name to filter by
332     * @param limit maximum number of sessions to return
333     * @return list of session summaries for the specified workflow
334     */
335    public List<SessionSummary> listSessionsByWorkflow(String workflowName, int limit) {
336        return listSessionsFiltered(workflowName, null, null, null, limit);
337    }
338
339    /**
340     * Lists sessions filtered by overlay name.
341     *
342     * @param overlayName the overlay name to filter by
343     * @param limit maximum number of sessions to return
344     * @return list of session summaries for the specified overlay
345     */
346    public List<SessionSummary> listSessionsByOverlay(String overlayName, int limit) {
347        return listSessionsFiltered(null, overlayName, null, null, limit);
348    }
349
350    /**
351     * Lists sessions filtered by inventory name.
352     *
353     * @param inventoryName the inventory name to filter by
354     * @param limit maximum number of sessions to return
355     * @return list of session summaries for the specified inventory
356     */
357    public List<SessionSummary> listSessionsByInventory(String inventoryName, int limit) {
358        return listSessionsFiltered(null, null, inventoryName, null, limit);
359    }
360
361    /**
362     * Lists sessions started after the specified time.
363     *
364     * @param startedAfter only include sessions started after this time
365     * @param limit maximum number of sessions to return
366     * @return list of session summaries started after the specified time
367     */
368    public List<SessionSummary> listSessionsAfter(LocalDateTime startedAfter, int limit) {
369        return listSessionsFiltered(null, null, null, startedAfter, limit);
370    }
371
372    /**
373     * Information about a node in a session.
374     *
375     * @param nodeId the node identifier
376     * @param status the node status (SUCCESS, FAILED, or null if not yet recorded)
377     * @param logCount the number of log entries for this node
378     */
379    public record NodeInfo(String nodeId, String status, int logCount) {}
380
381    /**
382     * Gets all nodes that participated in a session.
383     *
384     * @param sessionId the session ID
385     * @return list of node information, ordered by node ID
386     */
387    public List<NodeInfo> getNodesInSession(long sessionId) {
388        List<NodeInfo> nodes = new ArrayList<>();
389        String sql = """
390            SELECT l.node_id,
391                   nr.status,
392                   COUNT(l.id) as log_count
393            FROM logs l
394            LEFT JOIN node_results nr ON l.session_id = nr.session_id AND l.node_id = nr.node_id
395            WHERE l.session_id = ?
396            GROUP BY l.node_id, nr.status
397            ORDER BY l.node_id
398            """;
399
400        try (PreparedStatement ps = connection.prepareStatement(sql)) {
401            ps.setLong(1, sessionId);
402            try (ResultSet rs = ps.executeQuery()) {
403                while (rs.next()) {
404                    nodes.add(new NodeInfo(
405                            rs.getString("node_id"),
406                            rs.getString("status"),
407                            rs.getInt("log_count")
408                    ));
409                }
410            }
411        } catch (SQLException e) {
412            throw new RuntimeException("Failed to get nodes in session", e);
413        }
414        return nodes;
415    }
416
417    private LogEntry mapLogEntry(ResultSet rs) throws SQLException {
418        Timestamp ts = rs.getTimestamp("timestamp");
419        Integer exitCode = rs.getObject("exit_code") != null ? rs.getInt("exit_code") : null;
420        Long durationMs = rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null;
421        String levelStr = rs.getString("level");
422        LogLevel level = levelStr != null ? LogLevel.valueOf(levelStr) : LogLevel.INFO;
423
424        return new LogEntry(
425                rs.getLong("id"),
426                rs.getLong("session_id"),
427                ts != null ? ts.toLocalDateTime() : null,
428                rs.getString("node_id"),
429                rs.getString("vertex_name"),
430                rs.getString("action_name"),
431                level,
432                rs.getString("message"),
433                exitCode,
434                durationMs
435        );
436    }
437
438    @Override
439    public void close() throws SQLException {
440        connection.close();
441    }
442}