001/* 002 * Copyright 2025 devteam@scivicslab.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, 011 * software distributed under the License is distributed on an 012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the 014 * specific language governing permissions and limitations 015 * under the License. 016 */ 017 018package com.scivicslab.actoriac.accumulator; 019 020import java.util.concurrent.ExecutorService; 021import java.util.concurrent.atomic.AtomicInteger; 022 023import com.scivicslab.actoriac.log.DistributedLogStore; 024import com.scivicslab.pojoactor.core.ActorRef; 025import com.scivicslab.pojoactor.core.accumulator.Accumulator; 026 027/** 028 * Accumulator that writes output to an H2 database via DistributedLogStore. 029 * 030 * <p>This accumulator writes all output to the H2 database asynchronously. 031 * It uses a dedicated executor to avoid blocking workflow execution.</p> 032 * 033 * <h2>Usage</h2> 034 * <pre>{@code 035 * DatabaseAccumulator dbAcc = new DatabaseAccumulator(logStoreActor, dbExecutor, sessionId); 036 * dbAcc.add("node-1", "stdout", "command output"); 037 * dbAcc.add("workflow", "cowsay", renderedCowsayArt); 038 * }</pre> 039 * 040 * @author devteam@scivicslab.com 041 * @since 2.12.0 042 */ 043public class DatabaseAccumulator implements Accumulator { 044 045 private final ActorRef<DistributedLogStore> logStoreActor; 046 private final ExecutorService dbExecutor; 047 private final long sessionId; 048 private final AtomicInteger count = new AtomicInteger(0); 049 050 /** 051 * Constructs a DatabaseAccumulator. 052 * 053 * @param logStoreActor the actor reference for the distributed log store 054 * @param dbExecutor the executor service for async DB writes 055 * @param sessionId the session ID for this workflow execution 056 */ 057 public DatabaseAccumulator(ActorRef<DistributedLogStore> logStoreActor, 058 ExecutorService dbExecutor, 059 long sessionId) { 060 this.logStoreActor = logStoreActor; 061 this.dbExecutor = dbExecutor; 062 this.sessionId = sessionId; 063 } 064 065 @Override 066 public void add(String source, String type, String data) { 067 if (logStoreActor == null || sessionId < 0) { 068 count.incrementAndGet(); 069 return; 070 } 071 072 if (data == null || data.isEmpty()) { 073 count.incrementAndGet(); 074 return; 075 } 076 077 // Format output with fixed-width source prefix on each line 078 String formattedData = formatOutput(source, data); 079 080 // Fire-and-forget: don't wait for DB write to complete 081 logStoreActor.tell( 082 store -> store.logAction(sessionId, source, type, "output", 0, 0L, formattedData), 083 dbExecutor 084 ); 085 count.incrementAndGet(); 086 } 087 088 /** 089 * Formats the output with a fixed-width source prefix on each line. 090 * 091 * @param source the source identifier (e.g., "node-web-01", "cli") 092 * @param data the output data (may contain multiple lines) 093 * @return the formatted output string with prefix on each line 094 */ 095 private String formatOutput(String source, String data) { 096 String prefix = formatPrefix(source); 097 StringBuilder sb = new StringBuilder(); 098 099 String[] lines = data.split("\n", -1); 100 for (int i = 0; i < lines.length; i++) { 101 sb.append(prefix).append(lines[i]); 102 if (i < lines.length - 1) { 103 sb.append("\n"); 104 } 105 } 106 107 return sb.toString(); 108 } 109 110 /** 111 * Creates a prefix from the source name. 112 * 113 * @param source the source identifier 114 * @return formatted prefix like "[node-web-01] " 115 */ 116 private String formatPrefix(String source) { 117 String src = (source != null) ? source : ""; 118 return "[" + src + "] "; 119 } 120 121 @Override 122 public String getSummary() { 123 return "DatabaseAccumulator: " + count.get() + " entries written to database (session " + sessionId + ")"; 124 } 125 126 @Override 127 public int getCount() { 128 return count.get(); 129 } 130 131 @Override 132 public void clear() { 133 count.set(0); 134 } 135}