001/* 002 * Copyright 2025 devteam@scivics-lab.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, 011 * software distributed under the License is distributed on an 012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the 014 * specific language governing permissions and limitations 015 * under the License. 016 */ 017 018package com.scivicslab.actoriac.log; 019 020import java.nio.file.Path; 021import java.sql.*; 022import java.time.LocalDateTime; 023import java.util.ArrayList; 024import java.util.List; 025 026/** 027 * Read-only H2 log reader for querying workflow logs. 028 * 029 * <p>This class provides access to the H2 log database for querying logs. 030 * Uses AUTO_SERVER=TRUE to connect to the H2 server started by the writer process, 031 * allowing concurrent access while actor-IaC is writing logs.</p> 032 * 033 * @author devteam@scivics-lab.com 034 */ 035public class H2LogReader implements AutoCloseable { 036 037 private final Connection connection; 038 039 /** 040 * Opens the log database for reading. 041 * 042 * @param dbPath path to the database file (without extension) 043 * @throws SQLException if database connection fails 044 */ 045 public H2LogReader(Path dbPath) throws SQLException { 046 // Connect to the database. 047 // If H2LogStore is running with AUTO_SERVER=TRUE, H2 will automatically 048 // detect this and connect to the server process via TCP. 049 // FILE_LOCK=FS allows multiple readers even without AUTO_SERVER. 050 String url = "jdbc:h2:" + dbPath.toAbsolutePath().toString() 051 + ";FILE_LOCK=FS;OPEN_NEW=FALSE"; 052 this.connection = DriverManager.getConnection(url); 053 } 054 055 /** 056 * Opens a remote log database via H2 TCP server. 057 * 058 * <p>Connects to an H2 log server started with the {@code log-server} command.</p> 059 * 060 * @param host H2 server hostname (typically "localhost") 061 * @param port H2 server TCP port 062 * @param dbPath database path on the server 063 * @throws SQLException if database connection fails 064 */ 065 public H2LogReader(String host, int port, String dbPath) throws SQLException { 066 String url = "jdbc:h2:tcp://" + host + ":" + port + "/" + dbPath; 067 this.connection = DriverManager.getConnection(url); 068 } 069 070 /** 071 * Gets logs filtered by node ID. 072 * 073 * @param sessionId the session ID 074 * @param nodeId the node ID to filter by 075 * @return list of log entries for the specified node 076 */ 077 public List<LogEntry> getLogsByNode(long sessionId, String nodeId) { 078 List<LogEntry> entries = new ArrayList<>(); 079 try (PreparedStatement ps = connection.prepareStatement( 080 "SELECT * FROM logs WHERE session_id = ? AND node_id = ? ORDER BY timestamp")) { 081 ps.setLong(1, sessionId); 082 ps.setString(2, nodeId); 083 try (ResultSet rs = ps.executeQuery()) { 084 while (rs.next()) { 085 entries.add(mapLogEntry(rs)); 086 } 087 } 088 } catch (SQLException e) { 089 throw new RuntimeException("Failed to query logs", e); 090 } 091 return entries; 092 } 093 094 /** 095 * Gets logs filtered by minimum log level. 096 * 097 * @param sessionId the session ID 098 * @param minLevel the minimum log level to include 099 * @return list of log entries at or above the specified level 100 */ 101 public List<LogEntry> getLogsByLevel(long sessionId, LogLevel minLevel) { 102 List<LogEntry> entries = new ArrayList<>(); 103 try (PreparedStatement ps = connection.prepareStatement( 104 "SELECT * FROM logs WHERE session_id = ? AND level IN (?, ?, ?, ?) ORDER BY timestamp")) { 105 ps.setLong(1, sessionId); 106 int idx = 2; 107 for (LogLevel level : LogLevel.values()) { 108 if (level.isAtLeast(minLevel)) { 109 ps.setString(idx++, level.name()); 110 } else { 111 ps.setString(idx++, "NONE"); 112 } 113 } 114 try (ResultSet rs = ps.executeQuery()) { 115 while (rs.next()) { 116 entries.add(mapLogEntry(rs)); 117 } 118 } 119 } catch (SQLException e) { 120 throw new RuntimeException("Failed to query logs", e); 121 } 122 return entries; 123 } 124 125 /** 126 * Gets a summary of the specified session. 127 * 128 * @param sessionId the session ID 129 * @return the session summary, or null if not found 130 */ 131 public SessionSummary getSummary(long sessionId) { 132 try { 133 String workflowName = null; 134 String overlayName = null; 135 String inventoryName = null; 136 LocalDateTime startedAt = null; 137 LocalDateTime endedAt = null; 138 int nodeCount = 0; 139 SessionStatus status = SessionStatus.RUNNING; 140 141 try (PreparedStatement ps = connection.prepareStatement( 142 "SELECT * FROM sessions WHERE id = ?")) { 143 ps.setLong(1, sessionId); 144 try (ResultSet rs = ps.executeQuery()) { 145 if (rs.next()) { 146 workflowName = rs.getString("workflow_name"); 147 overlayName = rs.getString("overlay_name"); 148 inventoryName = rs.getString("inventory_name"); 149 Timestamp ts = rs.getTimestamp("started_at"); 150 startedAt = ts != null ? ts.toLocalDateTime() : null; 151 ts = rs.getTimestamp("ended_at"); 152 endedAt = ts != null ? ts.toLocalDateTime() : null; 153 nodeCount = rs.getInt("node_count"); 154 String statusStr = rs.getString("status"); 155 if (statusStr != null) { 156 status = SessionStatus.valueOf(statusStr); 157 } 158 } else { 159 return null; 160 } 161 } 162 } 163 164 int successCount = 0; 165 int failedCount = 0; 166 List<String> failedNodes = new ArrayList<>(); 167 168 try (PreparedStatement ps = connection.prepareStatement( 169 "SELECT node_id, status FROM node_results WHERE session_id = ?")) { 170 ps.setLong(1, sessionId); 171 try (ResultSet rs = ps.executeQuery()) { 172 while (rs.next()) { 173 String nodeStatus = rs.getString("status"); 174 if ("SUCCESS".equals(nodeStatus)) { 175 successCount++; 176 } else { 177 failedCount++; 178 failedNodes.add(rs.getString("node_id")); 179 } 180 } 181 } 182 } 183 184 int totalLogEntries = 0; 185 int errorCount = 0; 186 187 try (PreparedStatement ps = connection.prepareStatement( 188 "SELECT COUNT(*) as total, SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) as errors " + 189 "FROM logs WHERE session_id = ?")) { 190 ps.setLong(1, sessionId); 191 try (ResultSet rs = ps.executeQuery()) { 192 if (rs.next()) { 193 totalLogEntries = rs.getInt("total"); 194 errorCount = rs.getInt("errors"); 195 } 196 } 197 } 198 199 return new SessionSummary(sessionId, workflowName, overlayName, inventoryName, 200 startedAt, endedAt, nodeCount, status, successCount, failedCount, 201 failedNodes, totalLogEntries, errorCount); 202 203 } catch (SQLException e) { 204 throw new RuntimeException("Failed to get session summary", e); 205 } 206 } 207 208 /** 209 * Gets the latest session ID. 210 * 211 * @return the latest session ID, or -1 if no sessions exist 212 */ 213 public long getLatestSessionId() { 214 try (Statement stmt = connection.createStatement(); 215 ResultSet rs = stmt.executeQuery("SELECT MAX(id) FROM sessions")) { 216 if (rs.next()) { 217 long id = rs.getLong(1); 218 return rs.wasNull() ? -1 : id; 219 } 220 } catch (SQLException e) { 221 throw new RuntimeException("Failed to get latest session", e); 222 } 223 return -1; 224 } 225 226 /** 227 * Lists recent sessions. 228 * 229 * @param limit maximum number of sessions to return 230 * @return list of session summaries, ordered by start time descending 231 */ 232 public List<SessionSummary> listSessions(int limit) { 233 List<SessionSummary> sessions = new ArrayList<>(); 234 try (PreparedStatement ps = connection.prepareStatement( 235 "SELECT id FROM sessions ORDER BY started_at DESC LIMIT ?")) { 236 ps.setInt(1, limit); 237 try (ResultSet rs = ps.executeQuery()) { 238 while (rs.next()) { 239 sessions.add(getSummary(rs.getLong("id"))); 240 } 241 } 242 } catch (SQLException e) { 243 throw new RuntimeException("Failed to list sessions", e); 244 } 245 return sessions; 246 } 247 248 /** 249 * Lists sessions filtered by criteria. 250 * 251 * @param workflowName filter by workflow name (null to skip) 252 * @param overlayName filter by overlay name (null to skip) 253 * @param inventoryName filter by inventory name (null to skip) 254 * @param startedAfter filter by start time (null to skip) 255 * @param limit maximum number of sessions to return 256 * @return list of session summaries matching the criteria 257 */ 258 public List<SessionSummary> listSessionsFiltered(String workflowName, String overlayName, 259 String inventoryName, LocalDateTime startedAfter, 260 int limit) { 261 return listSessionsFiltered(workflowName, overlayName, inventoryName, startedAfter, null, limit); 262 } 263 264 /** 265 * Lists sessions filtered by criteria including end time. 266 * 267 * @param workflowName filter by workflow name (null to skip) 268 * @param overlayName filter by overlay name (null to skip) 269 * @param inventoryName filter by inventory name (null to skip) 270 * @param startedAfter filter by start time (null to skip) 271 * @param endedAfter filter by end time (null to skip) 272 * @param limit maximum number of sessions to return 273 * @return list of session summaries matching the criteria 274 */ 275 public List<SessionSummary> listSessionsFiltered(String workflowName, String overlayName, 276 String inventoryName, LocalDateTime startedAfter, 277 LocalDateTime endedAfter, int limit) { 278 List<SessionSummary> sessions = new ArrayList<>(); 279 StringBuilder sql = new StringBuilder("SELECT id FROM sessions WHERE 1=1"); 280 281 if (workflowName != null) { 282 sql.append(" AND workflow_name = ?"); 283 } 284 if (overlayName != null) { 285 sql.append(" AND overlay_name = ?"); 286 } 287 if (inventoryName != null) { 288 sql.append(" AND inventory_name = ?"); 289 } 290 if (startedAfter != null) { 291 sql.append(" AND started_at >= ?"); 292 } 293 if (endedAfter != null) { 294 sql.append(" AND ended_at >= ?"); 295 } 296 sql.append(" ORDER BY started_at DESC LIMIT ?"); 297 298 try (PreparedStatement ps = connection.prepareStatement(sql.toString())) { 299 int idx = 1; 300 if (workflowName != null) { 301 ps.setString(idx++, workflowName); 302 } 303 if (overlayName != null) { 304 ps.setString(idx++, overlayName); 305 } 306 if (inventoryName != null) { 307 ps.setString(idx++, inventoryName); 308 } 309 if (startedAfter != null) { 310 ps.setTimestamp(idx++, Timestamp.valueOf(startedAfter)); 311 } 312 if (endedAfter != null) { 313 ps.setTimestamp(idx++, Timestamp.valueOf(endedAfter)); 314 } 315 ps.setInt(idx, limit); 316 317 try (ResultSet rs = ps.executeQuery()) { 318 while (rs.next()) { 319 sessions.add(getSummary(rs.getLong("id"))); 320 } 321 } 322 } catch (SQLException e) { 323 throw new RuntimeException("Failed to list sessions", e); 324 } 325 return sessions; 326 } 327 328 /** 329 * Lists sessions filtered by workflow name. 330 * 331 * @param workflowName the workflow name to filter by 332 * @param limit maximum number of sessions to return 333 * @return list of session summaries for the specified workflow 334 */ 335 public List<SessionSummary> listSessionsByWorkflow(String workflowName, int limit) { 336 return listSessionsFiltered(workflowName, null, null, null, limit); 337 } 338 339 /** 340 * Lists sessions filtered by overlay name. 341 * 342 * @param overlayName the overlay name to filter by 343 * @param limit maximum number of sessions to return 344 * @return list of session summaries for the specified overlay 345 */ 346 public List<SessionSummary> listSessionsByOverlay(String overlayName, int limit) { 347 return listSessionsFiltered(null, overlayName, null, null, limit); 348 } 349 350 /** 351 * Lists sessions filtered by inventory name. 352 * 353 * @param inventoryName the inventory name to filter by 354 * @param limit maximum number of sessions to return 355 * @return list of session summaries for the specified inventory 356 */ 357 public List<SessionSummary> listSessionsByInventory(String inventoryName, int limit) { 358 return listSessionsFiltered(null, null, inventoryName, null, limit); 359 } 360 361 /** 362 * Lists sessions started after the specified time. 363 * 364 * @param startedAfter only include sessions started after this time 365 * @param limit maximum number of sessions to return 366 * @return list of session summaries started after the specified time 367 */ 368 public List<SessionSummary> listSessionsAfter(LocalDateTime startedAfter, int limit) { 369 return listSessionsFiltered(null, null, null, startedAfter, limit); 370 } 371 372 /** 373 * Information about a node in a session. 374 * 375 * @param nodeId the node identifier 376 * @param status the node status (SUCCESS, FAILED, or null if not yet recorded) 377 * @param logCount the number of log entries for this node 378 */ 379 public record NodeInfo(String nodeId, String status, int logCount) {} 380 381 /** 382 * Gets all nodes that participated in a session. 383 * 384 * @param sessionId the session ID 385 * @return list of node information, ordered by node ID 386 */ 387 public List<NodeInfo> getNodesInSession(long sessionId) { 388 List<NodeInfo> nodes = new ArrayList<>(); 389 String sql = """ 390 SELECT l.node_id, 391 nr.status, 392 COUNT(l.id) as log_count 393 FROM logs l 394 LEFT JOIN node_results nr ON l.session_id = nr.session_id AND l.node_id = nr.node_id 395 WHERE l.session_id = ? 396 GROUP BY l.node_id, nr.status 397 ORDER BY l.node_id 398 """; 399 400 try (PreparedStatement ps = connection.prepareStatement(sql)) { 401 ps.setLong(1, sessionId); 402 try (ResultSet rs = ps.executeQuery()) { 403 while (rs.next()) { 404 nodes.add(new NodeInfo( 405 rs.getString("node_id"), 406 rs.getString("status"), 407 rs.getInt("log_count") 408 )); 409 } 410 } 411 } catch (SQLException e) { 412 throw new RuntimeException("Failed to get nodes in session", e); 413 } 414 return nodes; 415 } 416 417 private LogEntry mapLogEntry(ResultSet rs) throws SQLException { 418 Timestamp ts = rs.getTimestamp("timestamp"); 419 Integer exitCode = rs.getObject("exit_code") != null ? rs.getInt("exit_code") : null; 420 Long durationMs = rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null; 421 String levelStr = rs.getString("level"); 422 LogLevel level = levelStr != null ? LogLevel.valueOf(levelStr) : LogLevel.INFO; 423 424 return new LogEntry( 425 rs.getLong("id"), 426 rs.getLong("session_id"), 427 ts != null ? ts.toLocalDateTime() : null, 428 rs.getString("node_id"), 429 rs.getString("vertex_name"), 430 rs.getString("action_name"), 431 level, 432 rs.getString("message"), 433 exitCode, 434 durationMs 435 ); 436 } 437 438 @Override 439 public void close() throws SQLException { 440 connection.close(); 441 } 442}