/*
 * Copyright 2025 devteam@scivicslab.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.scivicslab.actoriac;

import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.json.JSONObject;

import com.scivicslab.actoriac.log.DistributedLogStore;
import com.scivicslab.actoriac.log.LogLevel;
import com.scivicslab.actoriac.log.SessionStatus;
import com.scivicslab.pojoactor.core.Action;
import com.scivicslab.pojoactor.core.ActionResult;
import com.scivicslab.pojoactor.workflow.IIActorRef;
import com.scivicslab.pojoactor.workflow.IIActorSystem;

/**
 * Actor wrapper for DistributedLogStore.
 *
 * <p>This actor centralizes all database writes for logging. It should be created
 * under ROOT and used by all accumulator actors in the system.</p>
 *
 * <p>To ensure logs are written without blocking workflow execution, this actor
 * should be called using a dedicated ManagedThreadPool with 1 thread:</p>
 *
 * <pre>{@code
 * // Setup
 * system.addManagedThreadPool(1);  // index 1 for DB writes
 * ExecutorService dbPool = system.getManagedThreadPool(1);
 *
 * // Usage from accumulator
 * logStoreActor.tell(store -> store.log(...), dbPool);
 * }</pre>
 *
 * <p>Note that the {@code @Action} methods declared in this class submit the
 * write to the DB executor and then wait for it to finish before returning, so
 * a caller of those actions blocks until the store call has completed. If the
 * calling thread is interrupted while waiting, the action reports failure and
 * the thread's interrupt status is restored.</p>
 *
 * <h2>Actor Tree Position</h2>
 * <pre>
 * ROOT
 * ├── logStore              &lt;-- this actor
 * ├── accumulator           (system-level)
 * └── nodeGroup
 *     ├── accumulator       (workflow-level)
 *     └── node-*
 * </pre>
 *
 * <h2>Supported Actions</h2>
 * <ul>
 * <li>{@code log} - Log a message with level</li>
 * <li>{@code logAction} - Log an action result</li>
 * <li>{@code startSession} - Start a new workflow session</li>
 * <li>{@code endSession} - End a workflow session</li>
 * <li>{@code markNodeSuccess} - Mark a node as succeeded</li>
 * <li>{@code markNodeFailed} - Mark a node as failed</li>
 * </ul>
 *
 * @author devteam@scivicslab.com
 * @since 2.13.0
 */
public class LogStoreIIAR extends IIActorRef<DistributedLogStore> {

    private static final Logger logger = Logger.getLogger(LogStoreIIAR.class.getName());

    /**
     * The dedicated executor service for DB writes.
     * Using a single-threaded pool ensures writes are serialized.
     */
    private final ExecutorService dbExecutor;

    /**
     * Constructs a new LogStoreIIAR.
     *
     * @param actorName the name of this actor (typically "logStore")
     * @param logStore the DistributedLogStore implementation
     * @param system the actor system
     * @param dbExecutor the dedicated executor service for DB writes (should be single-threaded)
     */
    public LogStoreIIAR(String actorName, DistributedLogStore logStore,
                        IIActorSystem system, ExecutorService dbExecutor) {
        super(actorName, logStore, system);
        this.dbExecutor = dbExecutor;
    }

    // ========================================================================
    // Actions
    // ========================================================================

    /**
     * Logs a message with level.
     *
     * <p>Waits for the write to complete on the DB executor before returning.</p>
     *
     * @param arg JSON with sessionId, nodeId, level, message; {@code level} must be
     *            the name of a {@link LogLevel} constant
     * @return a successful ActionResult ("Logged"), or a failed one carrying the
     *         error message if parsing or the write fails
     */
    @Action("log")
    public ActionResult log(String arg) {
        try {
            JSONObject json = new JSONObject(arg);
            long sessionId = json.getLong("sessionId");
            String nodeId = json.getString("nodeId");
            String levelStr = json.getString("level");
            String message = json.getString("message");

            LogLevel level = LogLevel.valueOf(levelStr);

            this.tell(store -> store.log(sessionId, nodeId, level, message), dbExecutor).get();

            return new ActionResult(true, "Logged");
        } catch (Exception e) {
            return failure("log", e);
        }
    }

    /**
     * Logs an action result.
     *
     * <p>Waits for the write to complete on the DB executor before returning.</p>
     *
     * @param arg JSON with sessionId, nodeId, label, actionName, exitCode, durationMs, output
     * @return a successful ActionResult ("Action logged"), or a failed one carrying
     *         the error message if parsing or the write fails
     */
    @Action("logAction")
    public ActionResult logAction(String arg) {
        try {
            JSONObject json = new JSONObject(arg);
            long sessionId = json.getLong("sessionId");
            String nodeId = json.getString("nodeId");
            String label = json.getString("label");
            String action = json.getString("actionName");
            int exitCode = json.getInt("exitCode");
            long durationMs = json.getLong("durationMs");
            String output = json.getString("output");

            this.tell(store -> store.logAction(sessionId, nodeId, label, action, exitCode, durationMs, output),
                      dbExecutor).get();

            return new ActionResult(true, "Action logged");
        } catch (Exception e) {
            return failure("logAction", e);
        }
    }

    /**
     * Starts a new workflow session.
     *
     * <p>{@code overlayName} and {@code inventoryName} are optional; when absent
     * they are passed to the store as {@code null}.</p>
     *
     * @param arg JSON with workflowName, overlayName, inventoryName, nodeCount
     * @return ActionResult whose result text is the new session ID on success, or
     *         a failed one carrying the error message
     */
    @Action("startSession")
    public ActionResult startSession(String arg) {
        try {
            JSONObject json = new JSONObject(arg);
            String workflowName = json.getString("workflowName");
            String overlayName = json.optString("overlayName", null);
            String inventoryName = json.optString("inventoryName", null);
            int nodeCount = json.getInt("nodeCount");

            long sessionId = this.ask(store ->
                store.startSession(workflowName, overlayName, inventoryName, nodeCount),
                dbExecutor).get();

            return new ActionResult(true, String.valueOf(sessionId));
        } catch (Exception e) {
            return failure("startSession", e);
        }
    }

    /**
     * Ends a workflow session.
     *
     * <p>Waits for the write to complete on the DB executor before returning.</p>
     *
     * @param arg JSON with sessionId, status; {@code status} must be the name of a
     *            {@link SessionStatus} constant
     * @return a successful ActionResult ("Session ended"), or a failed one carrying
     *         the error message if parsing or the write fails
     */
    @Action("endSession")
    public ActionResult endSession(String arg) {
        try {
            JSONObject json = new JSONObject(arg);
            long sessionId = json.getLong("sessionId");
            String statusStr = json.getString("status");

            SessionStatus status = SessionStatus.valueOf(statusStr);

            this.tell(store -> store.endSession(sessionId, status), dbExecutor).get();

            return new ActionResult(true, "Session ended");
        } catch (Exception e) {
            return failure("endSession", e);
        }
    }

    /**
     * Marks a node as succeeded.
     *
     * <p>Waits for the write to complete on the DB executor before returning.</p>
     *
     * @param arg JSON with sessionId, nodeId
     * @return a successful ActionResult ("Node marked as success"), or a failed one
     *         carrying the error message if parsing or the write fails
     */
    @Action("markNodeSuccess")
    public ActionResult markNodeSuccess(String arg) {
        try {
            JSONObject json = new JSONObject(arg);
            long sessionId = json.getLong("sessionId");
            String nodeId = json.getString("nodeId");

            this.tell(store -> store.markNodeSuccess(sessionId, nodeId), dbExecutor).get();

            return new ActionResult(true, "Node marked as success");
        } catch (Exception e) {
            return failure("markNodeSuccess", e);
        }
    }

    /**
     * Marks a node as failed.
     *
     * <p>Waits for the write to complete on the DB executor before returning.</p>
     *
     * @param arg JSON with sessionId, nodeId, reason
     * @return a successful ActionResult ("Node marked as failed"), or a failed one
     *         carrying the error message if parsing or the write fails
     */
    @Action("markNodeFailed")
    public ActionResult markNodeFailed(String arg) {
        try {
            JSONObject json = new JSONObject(arg);
            long sessionId = json.getLong("sessionId");
            String nodeId = json.getString("nodeId");
            String reason = json.getString("reason");

            this.tell(store -> store.markNodeFailed(sessionId, nodeId, reason), dbExecutor).get();

            return new ActionResult(true, "Node marked as failed");
        } catch (Exception e) {
            return failure("markNodeFailed", e);
        }
    }

    /**
     * Gets the dedicated executor service for DB writes.
     *
     * @return the DB executor service
     */
    public ExecutorService getDbExecutor() {
        return dbExecutor;
    }

    // ========================================================================
    // Helpers
    // ========================================================================

    /**
     * Converts an exception raised while handling an action into a failed ActionResult.
     *
     * <p>The exception is logged at SEVERE level. If it is an
     * {@link InterruptedException}, the current thread's interrupt status is
     * re-asserted: the broad {@code catch (Exception)} in each action would
     * otherwise consume the interruption and hide it from the caller.</p>
     *
     * @param actionName the action being handled, used in the log message
     * @param e the exception that was caught
     * @return a failed ActionResult whose text is {@code "Error: "} followed by the exception message
     */
    private ActionResult failure(String actionName, Exception e) {
        if (e instanceof InterruptedException) {
            // Waiting on the DB write was interrupted; keep the signal visible upstream.
            Thread.currentThread().interrupt();
        }
        logger.log(Level.SEVERE, "Error in " + actionName, e);
        return new ActionResult(false, "Error: " + e.getMessage());
    }
}