001/* 002 * Copyright 2025 devteam@scivicslab.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, 011 * software distributed under the License is distributed on an 012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the 014 * specific language governing permissions and limitations 015 * under the License. 016 */ 017 018package com.scivicslab.actoriac.log; 019 020import java.nio.file.Path; 021import java.sql.*; 022import java.time.LocalDateTime; 023import java.util.ArrayList; 024import java.util.List; 025 026/** 027 * Read-only H2 log reader for querying workflow logs. 028 * 029 * <p>This class provides access to the H2 log database for querying logs. 030 * Uses AUTO_SERVER=TRUE to connect to the H2 server started by the writer process, 031 * allowing concurrent access while actor-IaC is writing logs.</p> 032 * 033 * @author devteam@scivicslab.com 034 */ 035public class H2LogReader implements AutoCloseable { 036 037 private final Connection connection; 038 private final boolean ownsConnection; 039 040 /** 041 * Opens the log database for reading. 042 * 043 * @param dbPath path to the database file (without extension) 044 * @throws SQLException if database connection fails 045 */ 046 public H2LogReader(Path dbPath) throws SQLException { 047 // Connect to the database using AUTO_SERVER=TRUE. 048 // This allows connecting to the server started by H2LogStore or log-server, 049 // or starting a new server if none exists. 050 String url = "jdbc:h2:" + dbPath.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE"; 051 this.connection = DriverManager.getConnection(url); 052 this.ownsConnection = true; 053 } 054 055 /** 056 * Opens a remote log database via H2 TCP server. 057 * 058 * <p>Connects to an H2 log server started with the {@code log-server} command.</p> 059 * 060 * @param host H2 server hostname (typically "localhost") 061 * @param port H2 server TCP port 062 * @param dbPath database path on the server 063 * @throws SQLException if database connection fails 064 */ 065 public H2LogReader(String host, int port, String dbPath) throws SQLException { 066 String url = "jdbc:h2:tcp://" + host + ":" + port + "/" + dbPath; 067 this.connection = DriverManager.getConnection(url); 068 this.ownsConnection = true; 069 } 070 071 /** 072 * Creates a reader using an existing connection. 073 * 074 * <p>Used internally by H2LogStore to delegate read operations. 075 * The connection will NOT be closed when this reader is closed.</p> 076 * 077 * @param connection the database connection to use 078 */ 079 public H2LogReader(Connection connection) { 080 this.connection = connection; 081 this.ownsConnection = false; 082 } 083 084 /** 085 * Gets logs filtered by node ID. 086 * 087 * @param sessionId the session ID 088 * @param nodeId the node ID to filter by 089 * @return list of log entries for the specified node 090 */ 091 public List<LogEntry> getLogsByNode(long sessionId, String nodeId) { 092 List<LogEntry> entries = new ArrayList<>(); 093 try (PreparedStatement ps = connection.prepareStatement( 094 "SELECT * FROM logs WHERE session_id = ? AND node_id = ? ORDER BY timestamp")) { 095 ps.setLong(1, sessionId); 096 ps.setString(2, nodeId); 097 try (ResultSet rs = ps.executeQuery()) { 098 while (rs.next()) { 099 entries.add(mapLogEntry(rs)); 100 } 101 } 102 } catch (SQLException e) { 103 throw new RuntimeException("Failed to query logs", e); 104 } 105 return entries; 106 } 107 108 /** 109 * Gets logs filtered by minimum log level. 110 * 111 * @param sessionId the session ID 112 * @param minLevel the minimum log level to include 113 * @return list of log entries at or above the specified level 114 */ 115 public List<LogEntry> getLogsByLevel(long sessionId, LogLevel minLevel) { 116 List<LogEntry> entries = new ArrayList<>(); 117 try (PreparedStatement ps = connection.prepareStatement( 118 "SELECT * FROM logs WHERE session_id = ? AND level IN (?, ?, ?, ?) ORDER BY timestamp")) { 119 ps.setLong(1, sessionId); 120 int idx = 2; 121 for (LogLevel level : LogLevel.values()) { 122 if (level.isAtLeast(minLevel)) { 123 ps.setString(idx++, level.name()); 124 } else { 125 ps.setString(idx++, "NONE"); 126 } 127 } 128 try (ResultSet rs = ps.executeQuery()) { 129 while (rs.next()) { 130 entries.add(mapLogEntry(rs)); 131 } 132 } 133 } catch (SQLException e) { 134 throw new RuntimeException("Failed to query logs", e); 135 } 136 return entries; 137 } 138 139 /** 140 * Gets a summary of the specified session. 141 * 142 * @param sessionId the session ID 143 * @return the session summary, or null if not found 144 */ 145 public SessionSummary getSummary(long sessionId) { 146 try { 147 String workflowName = null; 148 String overlayName = null; 149 String inventoryName = null; 150 LocalDateTime startedAt = null; 151 LocalDateTime endedAt = null; 152 int nodeCount = 0; 153 SessionStatus status = SessionStatus.RUNNING; 154 155 // Execution context 156 String cwd = null; 157 String gitCommit = null; 158 String gitBranch = null; 159 String commandLine = null; 160 String actorIacVersion = null; 161 String actorIacCommit = null; 162 163 try (PreparedStatement ps = connection.prepareStatement( 164 "SELECT * FROM sessions WHERE id = ?")) { 165 ps.setLong(1, sessionId); 166 try (ResultSet rs = ps.executeQuery()) { 167 if (rs.next()) { 168 workflowName = rs.getString("workflow_name"); 169 overlayName = rs.getString("overlay_name"); 170 inventoryName = rs.getString("inventory_name"); 171 Timestamp ts = rs.getTimestamp("started_at"); 172 startedAt = ts != null ? ts.toLocalDateTime() : null; 173 ts = rs.getTimestamp("ended_at"); 174 endedAt = ts != null ? ts.toLocalDateTime() : null; 175 nodeCount = rs.getInt("node_count"); 176 String statusStr = rs.getString("status"); 177 if (statusStr != null) { 178 status = SessionStatus.valueOf(statusStr); 179 } 180 181 // Read execution context (may be null for older sessions) 182 cwd = getStringOrNull(rs, "cwd"); 183 gitCommit = getStringOrNull(rs, "git_commit"); 184 gitBranch = getStringOrNull(rs, "git_branch"); 185 commandLine = getStringOrNull(rs, "command_line"); 186 actorIacVersion = getStringOrNull(rs, "actoriac_version"); 187 actorIacCommit = getStringOrNull(rs, "actoriac_commit"); 188 } else { 189 return null; 190 } 191 } 192 } 193 194 int successCount = 0; 195 int failedCount = 0; 196 List<String> failedNodes = new ArrayList<>(); 197 198 try (PreparedStatement ps = connection.prepareStatement( 199 "SELECT node_id, status FROM node_results WHERE session_id = ?")) { 200 ps.setLong(1, sessionId); 201 try (ResultSet rs = ps.executeQuery()) { 202 while (rs.next()) { 203 String nodeStatus = rs.getString("status"); 204 if ("SUCCESS".equals(nodeStatus)) { 205 successCount++; 206 } else { 207 failedCount++; 208 failedNodes.add(rs.getString("node_id")); 209 } 210 } 211 } 212 } 213 214 // Use actual node count from node_results if available 215 int actualNodeCount = successCount + failedCount; 216 if (actualNodeCount > 0) { 217 nodeCount = actualNodeCount; 218 } 219 220 int totalLogEntries = 0; 221 int errorCount = 0; 222 223 try (PreparedStatement ps = connection.prepareStatement( 224 "SELECT COUNT(*) as total, SUM(CASE WHEN level = 'ERROR' THEN 1 ELSE 0 END) as errors " + 225 "FROM logs WHERE session_id = ?")) { 226 ps.setLong(1, sessionId); 227 try (ResultSet rs = ps.executeQuery()) { 228 if (rs.next()) { 229 totalLogEntries = rs.getInt("total"); 230 errorCount = rs.getInt("errors"); 231 } 232 } 233 } 234 235 return new SessionSummary(sessionId, workflowName, overlayName, inventoryName, 236 startedAt, endedAt, nodeCount, status, successCount, failedCount, 237 failedNodes, totalLogEntries, errorCount, 238 cwd, gitCommit, gitBranch, commandLine, actorIacVersion, actorIacCommit); 239 240 } catch (SQLException e) { 241 throw new RuntimeException("Failed to get session summary", e); 242 } 243 } 244 245 /** 246 * Safely gets a string column value, returning null if the column doesn't exist. 247 */ 248 private String getStringOrNull(ResultSet rs, String columnName) { 249 try { 250 return rs.getString(columnName); 251 } catch (SQLException e) { 252 // Column may not exist in older databases 253 return null; 254 } 255 } 256 257 /** 258 * Gets the latest session ID. 259 * 260 * @return the latest session ID, or -1 if no sessions exist 261 */ 262 public long getLatestSessionId() { 263 try (Statement stmt = connection.createStatement(); 264 ResultSet rs = stmt.executeQuery("SELECT MAX(id) FROM sessions")) { 265 if (rs.next()) { 266 long id = rs.getLong(1); 267 return rs.wasNull() ? -1 : id; 268 } 269 } catch (SQLException e) { 270 throw new RuntimeException("Failed to get latest session", e); 271 } 272 return -1; 273 } 274 275 /** 276 * Lists recent sessions. 277 * 278 * @param limit maximum number of sessions to return 279 * @return list of session summaries, ordered by start time descending 280 */ 281 public List<SessionSummary> listSessions(int limit) { 282 List<SessionSummary> sessions = new ArrayList<>(); 283 try (PreparedStatement ps = connection.prepareStatement( 284 "SELECT id FROM sessions ORDER BY started_at DESC LIMIT ?")) { 285 ps.setInt(1, limit); 286 try (ResultSet rs = ps.executeQuery()) { 287 while (rs.next()) { 288 sessions.add(getSummary(rs.getLong("id"))); 289 } 290 } 291 } catch (SQLException e) { 292 throw new RuntimeException("Failed to list sessions", e); 293 } 294 return sessions; 295 } 296 297 /** 298 * Lists sessions filtered by criteria. 299 * 300 * @param workflowName filter by workflow name (null to skip) 301 * @param overlayName filter by overlay name (null to skip) 302 * @param inventoryName filter by inventory name (null to skip) 303 * @param startedAfter filter by start time (null to skip) 304 * @param limit maximum number of sessions to return 305 * @return list of session summaries matching the criteria 306 */ 307 public List<SessionSummary> listSessionsFiltered(String workflowName, String overlayName, 308 String inventoryName, LocalDateTime startedAfter, 309 int limit) { 310 return listSessionsFiltered(workflowName, overlayName, inventoryName, startedAfter, null, limit); 311 } 312 313 /** 314 * Lists sessions filtered by criteria including end time. 315 * 316 * @param workflowName filter by workflow name (null to skip) 317 * @param overlayName filter by overlay name (null to skip) 318 * @param inventoryName filter by inventory name (null to skip) 319 * @param startedAfter filter by start time (null to skip) 320 * @param endedAfter filter by end time (null to skip) 321 * @param limit maximum number of sessions to return 322 * @return list of session summaries matching the criteria 323 */ 324 public List<SessionSummary> listSessionsFiltered(String workflowName, String overlayName, 325 String inventoryName, LocalDateTime startedAfter, 326 LocalDateTime endedAfter, int limit) { 327 List<SessionSummary> sessions = new ArrayList<>(); 328 StringBuilder sql = new StringBuilder("SELECT id FROM sessions WHERE 1=1"); 329 330 if (workflowName != null) { 331 sql.append(" AND workflow_name = ?"); 332 } 333 if (overlayName != null) { 334 sql.append(" AND overlay_name = ?"); 335 } 336 if (inventoryName != null) { 337 sql.append(" AND inventory_name = ?"); 338 } 339 if (startedAfter != null) { 340 sql.append(" AND started_at >= ?"); 341 } 342 if (endedAfter != null) { 343 sql.append(" AND ended_at >= ?"); 344 } 345 sql.append(" ORDER BY started_at DESC LIMIT ?"); 346 347 try (PreparedStatement ps = connection.prepareStatement(sql.toString())) { 348 int idx = 1; 349 if (workflowName != null) { 350 ps.setString(idx++, workflowName); 351 } 352 if (overlayName != null) { 353 ps.setString(idx++, overlayName); 354 } 355 if (inventoryName != null) { 356 ps.setString(idx++, inventoryName); 357 } 358 if (startedAfter != null) { 359 ps.setTimestamp(idx++, Timestamp.valueOf(startedAfter)); 360 } 361 if (endedAfter != null) { 362 ps.setTimestamp(idx++, Timestamp.valueOf(endedAfter)); 363 } 364 ps.setInt(idx, limit); 365 366 try (ResultSet rs = ps.executeQuery()) { 367 while (rs.next()) { 368 sessions.add(getSummary(rs.getLong("id"))); 369 } 370 } 371 } catch (SQLException e) { 372 throw new RuntimeException("Failed to list sessions", e); 373 } 374 return sessions; 375 } 376 377 /** 378 * Lists sessions filtered by workflow name. 379 * 380 * @param workflowName the workflow name to filter by 381 * @param limit maximum number of sessions to return 382 * @return list of session summaries for the specified workflow 383 */ 384 public List<SessionSummary> listSessionsByWorkflow(String workflowName, int limit) { 385 return listSessionsFiltered(workflowName, null, null, null, limit); 386 } 387 388 /** 389 * Lists sessions filtered by overlay name. 390 * 391 * @param overlayName the overlay name to filter by 392 * @param limit maximum number of sessions to return 393 * @return list of session summaries for the specified overlay 394 */ 395 public List<SessionSummary> listSessionsByOverlay(String overlayName, int limit) { 396 return listSessionsFiltered(null, overlayName, null, null, limit); 397 } 398 399 /** 400 * Lists sessions filtered by inventory name. 401 * 402 * @param inventoryName the inventory name to filter by 403 * @param limit maximum number of sessions to return 404 * @return list of session summaries for the specified inventory 405 */ 406 public List<SessionSummary> listSessionsByInventory(String inventoryName, int limit) { 407 return listSessionsFiltered(null, null, inventoryName, null, limit); 408 } 409 410 /** 411 * Lists sessions started after the specified time. 412 * 413 * @param startedAfter only include sessions started after this time 414 * @param limit maximum number of sessions to return 415 * @return list of session summaries started after the specified time 416 */ 417 public List<SessionSummary> listSessionsAfter(LocalDateTime startedAfter, int limit) { 418 return listSessionsFiltered(null, null, null, startedAfter, limit); 419 } 420 421 /** 422 * Information about a node in a session. 423 * 424 * @param nodeId the node identifier 425 * @param status the node status (SUCCESS, FAILED, or null if not yet recorded) 426 * @param logCount the number of log entries for this node 427 */ 428 public record NodeInfo(String nodeId, String status, int logCount) {} 429 430 /** 431 * Gets all nodes that participated in a session. 432 * 433 * <p>Returns only nodes that have results recorded in node_results table, 434 * which represents actual workflow target nodes (not internal actors like 435 * cli, nodeGroup, etc.).</p> 436 * 437 * @param sessionId the session ID 438 * @return list of node information, ordered by node ID 439 */ 440 public List<NodeInfo> getNodesInSession(long sessionId) { 441 List<NodeInfo> nodes = new ArrayList<>(); 442 String sql = """ 443 SELECT nr.node_id, 444 nr.status, 445 COUNT(l.id) as log_count 446 FROM node_results nr 447 LEFT JOIN logs l ON nr.session_id = l.session_id AND nr.node_id = l.node_id 448 WHERE nr.session_id = ? 449 GROUP BY nr.node_id, nr.status 450 ORDER BY nr.node_id 451 """; 452 453 try (PreparedStatement ps = connection.prepareStatement(sql)) { 454 ps.setLong(1, sessionId); 455 try (ResultSet rs = ps.executeQuery()) { 456 while (rs.next()) { 457 nodes.add(new NodeInfo( 458 rs.getString("node_id"), 459 rs.getString("status"), 460 rs.getInt("log_count") 461 )); 462 } 463 } 464 } catch (SQLException e) { 465 throw new RuntimeException("Failed to get nodes in session", e); 466 } 467 return nodes; 468 } 469 470 private LogEntry mapLogEntry(ResultSet rs) throws SQLException { 471 Timestamp ts = rs.getTimestamp("timestamp"); 472 Integer exitCode = rs.getObject("exit_code") != null ? rs.getInt("exit_code") : null; 473 Long durationMs = rs.getObject("duration_ms") != null ? rs.getLong("duration_ms") : null; 474 String levelStr = rs.getString("level"); 475 LogLevel level = levelStr != null ? LogLevel.valueOf(levelStr) : LogLevel.INFO; 476 477 return new LogEntry( 478 rs.getLong("id"), 479 rs.getLong("session_id"), 480 ts != null ? ts.toLocalDateTime() : null, 481 rs.getString("node_id"), 482 rs.getString("label"), 483 rs.getString("action_name"), 484 level, 485 rs.getString("message"), 486 exitCode, 487 durationMs 488 ); 489 } 490 491 @Override 492 public void close() throws SQLException { 493 if (ownsConnection) { 494 connection.close(); 495 } 496 } 497}