/*
 * Copyright 2025 devteam@scivicslab.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.scivicslab.actoriac.accumulator;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import com.scivicslab.pojoactor.core.accumulator.Accumulator;

/**
 * Multiplexer accumulator that forwards output to multiple downstream accumulators.
 *
 * <p>This accumulator receives all output from Node/NodeGroup actors and forwards
 * it to configured downstream accumulators (console, file, database). This ensures
 * that all output destinations receive identical content.</p>
 *
 * <h2>Architecture</h2>
 * <pre>
 * Node/NodeGroup Actors
 *        │
 *        │ all output (cowsay, stdout, stderr)
 *        ▼
 * MultiplexerAccumulator
 *        │
 *        ├─→ ConsoleAccumulator  → System.out
 *        ├─→ FileAccumulator     → text file
 *        └─→ DatabaseAccumulator → H2 database
 * </pre>
 *
 * <h2>Usage</h2>
 * <pre>{@code
 * MultiplexerAccumulator multiplexer = new MultiplexerAccumulator();
 * multiplexer.addTarget(new ConsoleAccumulator());
 * multiplexer.addTarget(new FileAccumulator(logFile));
 * multiplexer.addTarget(new DatabaseAccumulator(logStore, sessionId));
 *
 * // All output goes through the multiplexer
 * multiplexer.add("node-1", "stdout", "command output...");
 * multiplexer.add("workflow", "cowsay", cowsayOutput);
 * }</pre>
 *
 * <h2>Thread safety</h2>
 * <p>The target list is a {@link CopyOnWriteArrayList} and the entry counter is an
 * {@link AtomicInteger}, so targets may be added or removed while other threads are
 * forwarding data. Each forwarding pass iterates over the list as it was when the
 * pass started. Whether an individual downstream accumulator tolerates concurrent
 * calls is up to that accumulator; this class adds no locking around them.</p>
 *
 * @author devteam@scivicslab.com
 * @since 2.12.0
 */
public class MultiplexerAccumulator implements Accumulator {

    /**
     * Downstream accumulators, in registration order. Copy-on-write so that
     * iteration in {@link #add}, {@link #clear} and {@link #getSummary} never
     * observes a concurrent structural modification.
     */
    private final List<Accumulator> targets = new CopyOnWriteArrayList<>();

    /** Number of {@link #add} calls since construction or the last {@link #clear}. */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Constructs an empty MultiplexerAccumulator.
     * Use {@link #addTarget(Accumulator)} to add downstream accumulators.
     */
    public MultiplexerAccumulator() {
    }

    /**
     * Adds a downstream accumulator target.
     *
     * <p>All data added to this multiplexer will be forwarded to this target.
     * A {@code null} target is ignored.</p>
     *
     * @param target the downstream accumulator to add
     */
    public void addTarget(Accumulator target) {
        if (target != null) {
            targets.add(target);
        }
    }

    /**
     * Removes a downstream accumulator target.
     *
     * @param target the downstream accumulator to remove
     * @return true if the target was removed, false if it was not found
     */
    public boolean removeTarget(Accumulator target) {
        return targets.remove(target);
    }

    /**
     * Returns the number of downstream targets.
     *
     * @return the number of targets
     */
    public int getTargetCount() {
        return targets.size();
    }

    /**
     * Forwards one entry to every registered target.
     *
     * <p>A failure in one target is reported on {@code System.err} and does not
     * prevent the remaining targets from receiving the entry. The entry counter
     * is incremented once per call, regardless of how many targets succeeded.</p>
     *
     * @param source the origin of the data, passed through unchanged
     * @param type the kind of data, passed through unchanged
     * @param data the content to forward
     */
    @Override
    public void add(String source, String type, String data) {
        for (Accumulator target : targets) {
            try {
                target.add(source, type, data);
            } catch (Exception e) {
                // Log but don't fail - other targets should still receive data
                System.err.println("Warning: Failed to write to accumulator target: " + e.getMessage());
            }
        }
        count.incrementAndGet();
    }

    /**
     * Builds a multi-line summary: a header line with the forwarded entry count
     * and the number of targets, followed by one line per target containing that
     * target's own summary.
     *
     * @return the summary text, each line terminated by a newline
     */
    @Override
    public String getSummary() {
        // Work from one snapshot so the target count in the header always matches
        // the number of "Target N" lines, even if targets change concurrently.
        List<Accumulator> snapshot = new ArrayList<>(targets);

        StringBuilder sb = new StringBuilder();
        sb.append("MultiplexerAccumulator: ").append(count.get()).append(" entries forwarded to ")
          .append(snapshot.size()).append(" targets\n");

        int position = 1;
        for (Accumulator target : snapshot) {
            sb.append("  Target ").append(position).append(": ").append(target.getSummary()).append("\n");
            position++;
        }
        return sb.toString();
    }

    /**
     * Returns the number of entries forwarded through this multiplexer.
     *
     * @return the number of {@link #add} calls since construction or the last {@link #clear}
     */
    @Override
    public int getCount() {
        return count.get();
    }

    /**
     * Resets the entry counter to zero and clears every registered target.
     *
     * <p>The targets themselves stay registered. An exception thrown by a
     * target's {@code clear()} propagates to the caller.</p>
     */
    @Override
    public void clear() {
        count.set(0);
        for (Accumulator target : targets) {
            target.clear();
        }
    }
}