001/*
002 * Copyright 2025 devteam@scivicslab.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing,
011 * software distributed under the License is distributed on an
012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied.  See the License for the
014 * specific language governing permissions and limitations
015 * under the License.
016 */
017
018package com.scivicslab.actoriac.accumulator;
019
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.atomic.AtomicInteger;
022
023import com.scivicslab.actoriac.log.DistributedLogStore;
024import com.scivicslab.pojoactor.core.ActorRef;
025import com.scivicslab.pojoactor.core.accumulator.Accumulator;
026
027/**
028 * Accumulator that writes output to an H2 database via DistributedLogStore.
029 *
030 * <p>This accumulator writes all output to the H2 database asynchronously.
031 * It uses a dedicated executor to avoid blocking workflow execution.</p>
032 *
033 * <h2>Usage</h2>
034 * <pre>{@code
035 * DatabaseAccumulator dbAcc = new DatabaseAccumulator(logStoreActor, dbExecutor, sessionId);
036 * dbAcc.add("node-1", "stdout", "command output");
037 * dbAcc.add("workflow", "cowsay", renderedCowsayArt);
038 * }</pre>
039 *
040 * @author devteam@scivicslab.com
041 * @since 2.12.0
042 */
043public class DatabaseAccumulator implements Accumulator {
044
045    private final ActorRef<DistributedLogStore> logStoreActor;
046    private final ExecutorService dbExecutor;
047    private final long sessionId;
048    private final AtomicInteger count = new AtomicInteger(0);
049
050    /**
051     * Constructs a DatabaseAccumulator.
052     *
053     * @param logStoreActor the actor reference for the distributed log store
054     * @param dbExecutor the executor service for async DB writes
055     * @param sessionId the session ID for this workflow execution
056     */
057    public DatabaseAccumulator(ActorRef<DistributedLogStore> logStoreActor,
058                               ExecutorService dbExecutor,
059                               long sessionId) {
060        this.logStoreActor = logStoreActor;
061        this.dbExecutor = dbExecutor;
062        this.sessionId = sessionId;
063    }
064
065    @Override
066    public void add(String source, String type, String data) {
067        if (logStoreActor == null || sessionId < 0) {
068            count.incrementAndGet();
069            return;
070        }
071
072        if (data == null || data.isEmpty()) {
073            count.incrementAndGet();
074            return;
075        }
076
077        // Format output with fixed-width source prefix on each line
078        String formattedData = formatOutput(source, data);
079
080        // Fire-and-forget: don't wait for DB write to complete
081        logStoreActor.tell(
082            store -> store.logAction(sessionId, source, type, "output", 0, 0L, formattedData),
083            dbExecutor
084        );
085        count.incrementAndGet();
086    }
087
088    /**
089     * Formats the output with a fixed-width source prefix on each line.
090     *
091     * @param source the source identifier (e.g., "node-web-01", "cli")
092     * @param data the output data (may contain multiple lines)
093     * @return the formatted output string with prefix on each line
094     */
095    private String formatOutput(String source, String data) {
096        String prefix = formatPrefix(source);
097        StringBuilder sb = new StringBuilder();
098
099        String[] lines = data.split("\n", -1);
100        for (int i = 0; i < lines.length; i++) {
101            sb.append(prefix).append(lines[i]);
102            if (i < lines.length - 1) {
103                sb.append("\n");
104            }
105        }
106
107        return sb.toString();
108    }
109
110    /**
111     * Creates a prefix from the source name.
112     *
113     * @param source the source identifier
114     * @return formatted prefix like "[node-web-01] "
115     */
116    private String formatPrefix(String source) {
117        String src = (source != null) ? source : "";
118        return "[" + src + "] ";
119    }
120
121    @Override
122    public String getSummary() {
123        return "DatabaseAccumulator: " + count.get() + " entries written to database (session " + sessionId + ")";
124    }
125
126    @Override
127    public int getCount() {
128        return count.get();
129    }
130
131    @Override
132    public void clear() {
133        count.set(0);
134    }
135}