001/*
002 * Copyright 2025 devteam@scivicslab.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing,
011 * software distributed under the License is distributed on an
012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied.  See the License for the
014 * specific language governing permissions and limitations
015 * under the License.
016 */
017
018package com.scivicslab.actoriac;
019
020import java.util.concurrent.ExecutorService;
021import java.util.logging.Level;
022import java.util.logging.Logger;
023
024import org.json.JSONObject;
025
026import com.scivicslab.actoriac.log.DistributedLogStore;
027import com.scivicslab.actoriac.log.LogLevel;
028import com.scivicslab.actoriac.log.SessionStatus;
029import com.scivicslab.pojoactor.core.Action;
030import com.scivicslab.pojoactor.core.ActionResult;
031import com.scivicslab.pojoactor.workflow.IIActorRef;
032import com.scivicslab.pojoactor.workflow.IIActorSystem;
033
034/**
035 * Actor wrapper for DistributedLogStore.
036 *
037 * <p>This actor centralizes all database writes for logging. It should be created
038 * under ROOT and used by all accumulator actors in the system.</p>
039 *
040 * <p>To ensure logs are written without blocking workflow execution, this actor
041 * should be called using a dedicated ManagedThreadPool with 1 thread:</p>
042 *
043 * <pre>{@code
044 * // Setup
045 * system.addManagedThreadPool(1);  // index 1 for DB writes
046 * ExecutorService dbPool = system.getManagedThreadPool(1);
047 *
048 * // Usage from accumulator
049 * logStoreActor.tell(store -> store.log(...), dbPool);
050 * }</pre>
051 *
052 * <h2>Actor Tree Position</h2>
053 * <pre>
054 * ROOT
055 * ├── logStore              &lt;-- this actor
056 * ├── accumulator           (system-level)
057 * └── nodeGroup
058 *     ├── accumulator       (workflow-level)
059 *     └── node-*
060 * </pre>
061 *
062 * <h2>Supported Actions</h2>
063 * <ul>
064 *   <li>{@code log} - Log a message with level</li>
065 *   <li>{@code logAction} - Log an action result</li>
066 *   <li>{@code startSession} - Start a new workflow session</li>
067 *   <li>{@code endSession} - End a workflow session</li>
068 *   <li>{@code markNodeSuccess} - Mark a node as succeeded</li>
069 *   <li>{@code markNodeFailed} - Mark a node as failed</li>
070 * </ul>
071 *
072 * @author devteam@scivicslab.com
073 * @since 2.13.0
074 */
075public class LogStoreIIAR extends IIActorRef<DistributedLogStore> {
076
077    private static final Logger logger = Logger.getLogger(LogStoreIIAR.class.getName());
078
079    /**
080     * The dedicated executor service for DB writes.
081     * Using a single-threaded pool ensures writes are serialized.
082     */
083    private final ExecutorService dbExecutor;
084
085    /**
086     * Constructs a new LogStoreIIAR.
087     *
088     * @param actorName the name of this actor (typically "logStore")
089     * @param logStore the DistributedLogStore implementation
090     * @param system the actor system
091     * @param dbExecutor the dedicated executor service for DB writes (should be single-threaded)
092     */
093    public LogStoreIIAR(String actorName, DistributedLogStore logStore,
094                        IIActorSystem system, ExecutorService dbExecutor) {
095        super(actorName, logStore, system);
096        this.dbExecutor = dbExecutor;
097    }
098
099    // ========================================================================
100    // Actions
101    // ========================================================================
102
103    /**
104     * Logs a message with level.
105     *
106     * @param arg JSON with sessionId, nodeId, level, message
107     * @return ActionResult
108     */
109    @Action("log")
110    public ActionResult log(String arg) {
111        try {
112            JSONObject json = new JSONObject(arg);
113            long sessionId = json.getLong("sessionId");
114            String nodeId = json.getString("nodeId");
115            String levelStr = json.getString("level");
116            String message = json.getString("message");
117
118            LogLevel level = LogLevel.valueOf(levelStr);
119
120            this.tell(store -> store.log(sessionId, nodeId, level, message), dbExecutor).get();
121
122            return new ActionResult(true, "Logged");
123        } catch (Exception e) {
124            logger.log(Level.SEVERE, "Error in log", e);
125            return new ActionResult(false, "Error: " + e.getMessage());
126        }
127    }
128
129    /**
130     * Logs an action result.
131     *
132     * @param arg JSON with sessionId, nodeId, label, actionName, exitCode, durationMs, output
133     * @return ActionResult
134     */
135    @Action("logAction")
136    public ActionResult logAction(String arg) {
137        try {
138            JSONObject json = new JSONObject(arg);
139            long sessionId = json.getLong("sessionId");
140            String nodeId = json.getString("nodeId");
141            String label = json.getString("label");
142            String action = json.getString("actionName");
143            int exitCode = json.getInt("exitCode");
144            long durationMs = json.getLong("durationMs");
145            String output = json.getString("output");
146
147            this.tell(store -> store.logAction(sessionId, nodeId, label, action, exitCode, durationMs, output),
148                      dbExecutor).get();
149
150            return new ActionResult(true, "Action logged");
151        } catch (Exception e) {
152            logger.log(Level.SEVERE, "Error in logAction", e);
153            return new ActionResult(false, "Error: " + e.getMessage());
154        }
155    }
156
157    /**
158     * Starts a new workflow session.
159     *
160     * @param arg JSON with workflowName, overlayName, inventoryName, nodeCount
161     * @return ActionResult with session ID
162     */
163    @Action("startSession")
164    public ActionResult startSession(String arg) {
165        try {
166            JSONObject json = new JSONObject(arg);
167            String workflowName = json.getString("workflowName");
168            String overlayName = json.optString("overlayName", null);
169            String inventoryName = json.optString("inventoryName", null);
170            int nodeCount = json.getInt("nodeCount");
171
172            long sessionId = this.ask(store ->
173                store.startSession(workflowName, overlayName, inventoryName, nodeCount),
174                dbExecutor).get();
175
176            return new ActionResult(true, String.valueOf(sessionId));
177        } catch (Exception e) {
178            logger.log(Level.SEVERE, "Error in startSession", e);
179            return new ActionResult(false, "Error: " + e.getMessage());
180        }
181    }
182
183    /**
184     * Ends a workflow session.
185     *
186     * @param arg JSON with sessionId, status
187     * @return ActionResult
188     */
189    @Action("endSession")
190    public ActionResult endSession(String arg) {
191        try {
192            JSONObject json = new JSONObject(arg);
193            long sessionId = json.getLong("sessionId");
194            String statusStr = json.getString("status");
195
196            SessionStatus status = SessionStatus.valueOf(statusStr);
197
198            this.tell(store -> store.endSession(sessionId, status), dbExecutor).get();
199
200            return new ActionResult(true, "Session ended");
201        } catch (Exception e) {
202            logger.log(Level.SEVERE, "Error in endSession", e);
203            return new ActionResult(false, "Error: " + e.getMessage());
204        }
205    }
206
207    /**
208     * Marks a node as succeeded.
209     *
210     * @param arg JSON with sessionId, nodeId
211     * @return ActionResult
212     */
213    @Action("markNodeSuccess")
214    public ActionResult markNodeSuccess(String arg) {
215        try {
216            JSONObject json = new JSONObject(arg);
217            long sessionId = json.getLong("sessionId");
218            String nodeId = json.getString("nodeId");
219
220            this.tell(store -> store.markNodeSuccess(sessionId, nodeId), dbExecutor).get();
221
222            return new ActionResult(true, "Node marked as success");
223        } catch (Exception e) {
224            logger.log(Level.SEVERE, "Error in markNodeSuccess", e);
225            return new ActionResult(false, "Error: " + e.getMessage());
226        }
227    }
228
229    /**
230     * Marks a node as failed.
231     *
232     * @param arg JSON with sessionId, nodeId, reason
233     * @return ActionResult
234     */
235    @Action("markNodeFailed")
236    public ActionResult markNodeFailed(String arg) {
237        try {
238            JSONObject json = new JSONObject(arg);
239            long sessionId = json.getLong("sessionId");
240            String nodeId = json.getString("nodeId");
241            String reason = json.getString("reason");
242
243            this.tell(store -> store.markNodeFailed(sessionId, nodeId, reason), dbExecutor).get();
244
245            return new ActionResult(true, "Node marked as failed");
246        } catch (Exception e) {
247            logger.log(Level.SEVERE, "Error in markNodeFailed", e);
248            return new ActionResult(false, "Error: " + e.getMessage());
249        }
250    }
251
252    /**
253     * Gets the dedicated executor service for DB writes.
254     *
255     * @return the DB executor service
256     */
257    public ExecutorService getDbExecutor() {
258        return dbExecutor;
259    }
260}