001/* 002 * Copyright 2025 devteam@scivicslab.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, 011 * software distributed under the License is distributed on an 012 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the 014 * specific language governing permissions and limitations 015 * under the License. 016 */ 017 018package com.scivicslab.actoriac; 019 020import java.io.File; 021import java.io.FileInputStream; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InputStream; 025import java.util.concurrent.ExecutionException; 026import java.util.logging.Level; 027import java.util.logging.Logger; 028 029import org.json.JSONArray; 030import org.json.JSONObject; 031 032import com.scivicslab.pojoactor.core.Action; 033import com.scivicslab.pojoactor.core.ActionResult; 034import com.scivicslab.pojoactor.workflow.IIActorRef; 035import com.scivicslab.pojoactor.workflow.IIActorSystem; 036 037import static com.scivicslab.pojoactor.core.ActionArgs.getFirst; 038 039/** 040 * Interpreter-interfaced actor reference for {@link NodeInterpreter} instances. 041 * 042 * <p>This class provides a concrete implementation of {@link IIActorRef} 043 * specifically for {@link NodeInterpreter} objects. It handles action invocations 044 * by name, supporting both workflow execution actions (inherited from Interpreter) 045 * and infrastructure actions (SSH command execution).</p> 046 * 047 * <p><strong>Supported actions:</strong></p> 048 * <p><em>Workflow actions (from Interpreter):</em></p> 049 * <ul> 050 * <li>{@code execCode} - Executes the loaded workflow code</li> 051 * <li>{@code readYaml} - Reads a YAML workflow definition from a file path</li> 052 * <li>{@code readJson} - Reads a JSON workflow definition from a file path</li> 053 * <li>{@code readXml} - Reads an XML workflow definition from a file path</li> 054 * <li>{@code reset} - Resets the interpreter state</li> 055 * </ul> 056 * <p><em>Infrastructure actions (Node-specific):</em></p> 057 * <ul> 058 * <li>{@code executeCommand} - Executes a command and reports to accumulator (default)</li> 059 * <li>{@code executeCommandQuiet} - Executes a command without reporting</li> 060 * <li>{@code executeSudoCommand} - Executes sudo command and reports to accumulator (default)</li> 061 * <li>{@code executeSudoCommandQuiet} - Executes sudo command without reporting</li> 062 * </ul> 063 * 064 * @author devteam@scivicslab.com 065 */ 066public class NodeIIAR extends IIActorRef<NodeInterpreter> { 067 068 Logger logger = null; 069 070 /** 071 * Constructs a new NodeIIAR with the specified actor name and node interpreter object. 072 * 073 * @param actorName the name of this actor 074 * @param object the {@link NodeInterpreter} instance managed by this actor reference 075 */ 076 public NodeIIAR(String actorName, NodeInterpreter object) { 077 super(actorName, object); 078 logger = Logger.getLogger(actorName); 079 } 080 081 /** 082 * Constructs a new NodeIIAR with the specified actor name, node interpreter object, 083 * and actor system. 084 * 085 * @param actorName the name of this actor 086 * @param object the {@link NodeInterpreter} instance managed by this actor reference 087 * @param system the actor system managing this actor 088 */ 089 public NodeIIAR(String actorName, NodeInterpreter object, IIActorSystem system) { 090 super(actorName, object, system); 091 logger = Logger.getLogger(actorName); 092 093 // Set the selfActorRef in the Interpreter (NodeInterpreter extends Interpreter) 094 object.setSelfActorRef(this); 095 } 096 097 // ======================================================================== 098 // Workflow Actions 099 // ======================================================================== 100 101 @Action("execCode") 102 public ActionResult execCode(String args) { 103 try { 104 return this.ask(n -> n.execCode()).get(); 105 } catch (InterruptedException | ExecutionException e) { 106 return handleException(e); 107 } 108 } 109 110 @Action("readYaml") 111 public ActionResult readYaml(String args) { 112 String arg = getFirst(args); 113 try { 114 String overlayPath = this.object.getOverlayDir(); 115 if (overlayPath != null) { 116 java.nio.file.Path yamlPath = java.nio.file.Path.of(arg); 117 java.nio.file.Path overlayDir = java.nio.file.Path.of(overlayPath); 118 this.tell(n -> { 119 try { 120 n.readYaml(yamlPath, overlayDir); 121 } catch (IOException e) { 122 throw new RuntimeException(e); 123 } 124 }).get(); 125 return new ActionResult(true, "YAML loaded with overlay: " + overlayPath); 126 } else { 127 try (InputStream input = new FileInputStream(new File(arg))) { 128 this.tell(n -> n.readYaml(input)).get(); 129 return new ActionResult(true, "YAML loaded successfully"); 130 } 131 } 132 } catch (FileNotFoundException e) { 133 logger.log(Level.SEVERE, String.format("file not found: %s", arg), e); 134 return new ActionResult(false, "File not found: " + arg); 135 } catch (IOException e) { 136 logger.log(Level.SEVERE, String.format("IOException: %s", arg), e); 137 return new ActionResult(false, "IO error: " + arg); 138 } catch (InterruptedException | ExecutionException e) { 139 return handleException(e); 140 } catch (RuntimeException e) { 141 if (e.getCause() instanceof IOException) { 142 logger.log(Level.SEVERE, String.format("IOException: %s", arg), e.getCause()); 143 return new ActionResult(false, "IO error: " + arg); 144 } 145 throw e; 146 } 147 } 148 149 @Action("readJson") 150 public ActionResult readJson(String args) { 151 String arg = getFirst(args); 152 try (InputStream input = new FileInputStream(new File(arg))) { 153 this.tell(n -> { 154 try { 155 n.readJson(input); 156 } catch (IOException e) { 157 throw new RuntimeException(e); 158 } 159 }).get(); 160 return new ActionResult(true, "JSON loaded successfully"); 161 } catch (FileNotFoundException e) { 162 logger.log(Level.SEVERE, String.format("file not found: %s", arg), e); 163 return new ActionResult(false, "File not found: " + arg); 164 } catch (IOException e) { 165 logger.log(Level.SEVERE, String.format("IOException: %s", arg), e); 166 return new ActionResult(false, "IO error: " + arg); 167 } catch (InterruptedException | ExecutionException e) { 168 return handleException(e); 169 } 170 } 171 172 @Action("readXml") 173 public ActionResult readXml(String args) { 174 String arg = getFirst(args); 175 try (InputStream input = new FileInputStream(new File(arg))) { 176 this.tell(n -> { 177 try { 178 n.readXml(input); 179 } catch (Exception e) { 180 throw new RuntimeException(e); 181 } 182 }).get(); 183 return new ActionResult(true, "XML loaded successfully"); 184 } catch (FileNotFoundException e) { 185 logger.log(Level.SEVERE, String.format("file not found: %s", arg), e); 186 return new ActionResult(false, "File not found: " + arg); 187 } catch (Exception e) { 188 logger.log(Level.SEVERE, String.format("Exception: %s", arg), e); 189 return new ActionResult(false, "Error: " + arg); 190 } 191 } 192 193 @Action("reset") 194 public ActionResult reset(String args) { 195 try { 196 this.tell(n -> n.reset()).get(); 197 return new ActionResult(true, "Interpreter reset successfully"); 198 } catch (InterruptedException | ExecutionException e) { 199 return handleException(e); 200 } 201 } 202 203 @Action("runUntilEnd") 204 public ActionResult runUntilEnd(String args) { 205 try { 206 int maxIterations = parseMaxIterations(args, 10000); 207 return this.ask(n -> n.runUntilEnd(maxIterations)).get(); 208 } catch (InterruptedException | ExecutionException e) { 209 return handleException(e); 210 } 211 } 212 213 @Action("call") 214 public ActionResult call(String args) { 215 try { 216 JSONArray callArgs = new JSONArray(args); 217 String callWorkflowFile = callArgs.getString(0); 218 return this.ask(n -> n.call(callWorkflowFile)).get(); 219 } catch (InterruptedException | ExecutionException e) { 220 return handleException(e); 221 } 222 } 223 224 @Action("runWorkflow") 225 public ActionResult runWorkflow(String args) { 226 try { 227 JSONArray runArgs = new JSONArray(args); 228 String runWorkflowFile = runArgs.getString(0); 229 int runMaxIterations = runArgs.length() > 1 ? runArgs.getInt(1) : 10000; 230 logger.fine(String.format("Running workflow: %s (maxIterations=%d)", runWorkflowFile, runMaxIterations)); 231 ActionResult result = this.object.runWorkflow(runWorkflowFile, runMaxIterations); 232 logger.fine(String.format("Workflow completed: success=%s, result=%s", result.isSuccess(), result.getResult())); 233 return result; 234 } catch (Exception e) { 235 return handleException(e); 236 } 237 } 238 239 @Action("apply") 240 public ActionResult apply(String args) { 241 try { 242 return this.ask(n -> n.apply(args)).get(); 243 } catch (InterruptedException | ExecutionException e) { 244 return handleException(e); 245 } 246 } 247 248 // ======================================================================== 249 // Command Execution Actions 250 // ======================================================================== 251 252 @Action("executeCommand") 253 public ActionResult executeCommand(String args) { 254 try { 255 String command = extractCommandFromArgs(args); 256 String nodeName = this.getName(); 257 258 Node.OutputCallback callback = createOutputCallback(nodeName); 259 260 Node.CommandResult result = this.ask(n -> { 261 try { 262 return n.executeCommand(command, callback); 263 } catch (IOException e) { 264 throw new RuntimeException(e); 265 } 266 }).get(); 267 268 reportToAccumulator(result); 269 return new ActionResult(result.isSuccess(), combineOutput(result)); 270 } catch (InterruptedException | ExecutionException e) { 271 return handleException(e); 272 } 273 } 274 275 @Action("executeCommandQuiet") 276 public ActionResult executeCommandQuiet(String args) { 277 try { 278 String command = extractCommandFromArgs(args); 279 Node.CommandResult result = this.ask(n -> { 280 try { 281 return n.executeCommand(command); 282 } catch (IOException e) { 283 throw new RuntimeException(e); 284 } 285 }).get(); 286 287 return new ActionResult(result.isSuccess(), 288 String.format("exitCode=%d, stdout='%s', stderr='%s'", 289 result.getExitCode(), result.getStdout(), result.getStderr())); 290 } catch (InterruptedException | ExecutionException e) { 291 return handleException(e); 292 } 293 } 294 295 @Action("executeSudoCommand") 296 public ActionResult executeSudoCommand(String args) { 297 try { 298 String command = extractCommandFromArgs(args); 299 String nodeName = this.getName(); 300 301 Node.OutputCallback callback = createOutputCallback(nodeName); 302 303 Node.CommandResult result = this.ask(n -> { 304 try { 305 return n.executeSudoCommand(command, callback); 306 } catch (IOException e) { 307 throw new RuntimeException(e); 308 } 309 }).get(); 310 311 reportToAccumulator(result); 312 return new ActionResult(result.isSuccess(), combineOutput(result)); 313 } catch (ExecutionException e) { 314 // Check if this is a SUDO_PASSWORD error 315 Throwable cause = e.getCause(); 316 if (cause != null && cause.getMessage() != null && 317 cause.getMessage().contains("SUDO_PASSWORD environment variable is not set")) { 318 String hostname = this.object.getHostname(); 319 String errorMessage = "%" + hostname + ": [FAIL] SUDO_PASSWORD not set"; 320 reportOutputToMultiplexer(this.getName(), errorMessage); 321 return new ActionResult(false, errorMessage); 322 } 323 return handleException(e); 324 } catch (InterruptedException e) { 325 return handleException(e); 326 } 327 } 328 329 @Action("executeSudoCommandQuiet") 330 public ActionResult executeSudoCommandQuiet(String args) { 331 try { 332 String command = extractCommandFromArgs(args); 333 Node.CommandResult result = this.ask(n -> { 334 try { 335 return n.executeSudoCommand(command); 336 } catch (IOException e) { 337 throw new RuntimeException(e); 338 } 339 }).get(); 340 341 return new ActionResult(result.isSuccess(), 342 String.format("exitCode=%d, stdout='%s', stderr='%s'", 343 result.getExitCode(), result.getStdout(), result.getStderr())); 344 } catch (InterruptedException | ExecutionException e) { 345 return handleException(e); 346 } 347 } 348 349 // ======================================================================== 350 // Utility Actions 351 // ======================================================================== 352 353 @Action("sleep") 354 public ActionResult sleep(String args) { 355 try { 356 long millis = Long.parseLong(getFirst(args)); 357 Thread.sleep(millis); 358 return new ActionResult(true, "Slept for " + millis + "ms"); 359 } catch (NumberFormatException e) { 360 logger.log(Level.SEVERE, String.format("Invalid sleep duration: %s", args), e); 361 return new ActionResult(false, "Invalid sleep duration: " + args); 362 } catch (InterruptedException e) { 363 return new ActionResult(false, "Sleep interrupted: " + e.getMessage()); 364 } 365 } 366 367 @Action("print") 368 public ActionResult print(String args) { 369 String text = getFirst(args); 370 System.out.println(text); 371 return new ActionResult(true, "Printed: " + text); 372 } 373 374 @Action("doNothing") 375 public ActionResult doNothing(String args) { 376 return new ActionResult(true, getFirst(args)); 377 } 378 379 // ======================================================================== 380 // Document Workflow Actions 381 // ======================================================================== 382 383 @Action("detectDocumentChanges") 384 public ActionResult detectDocumentChanges(String args) { 385 try { 386 return this.ask(n -> { 387 try { 388 String docListPath = extractCommandFromArgs(args); 389 return n.detectDocumentChanges(docListPath); 390 } catch (IOException e) { 391 return new ActionResult(false, "Error detecting changes: " + e.getMessage()); 392 } 393 }).get(); 394 } catch (InterruptedException | ExecutionException e) { 395 return handleException(e); 396 } 397 } 398 399 @Action("cloneChangedDocuments") 400 public ActionResult cloneChangedDocuments(String args) { 401 try { 402 return this.ask(n -> { 403 try { 404 String docListPath = extractCommandFromArgs(args); 405 return n.cloneChangedDocuments(docListPath); 406 } catch (IOException e) { 407 return new ActionResult(false, "Error cloning documents: " + e.getMessage()); 408 } 409 }).get(); 410 } catch (InterruptedException | ExecutionException e) { 411 return handleException(e); 412 } 413 } 414 415 @Action("buildChangedDocuments") 416 public ActionResult buildChangedDocuments(String args) { 417 try { 418 return this.ask(n -> { 419 try { 420 String docListPath = extractCommandFromArgs(args); 421 return n.buildChangedDocuments(docListPath); 422 } catch (IOException e) { 423 return new ActionResult(false, "Error building documents: " + e.getMessage()); 424 } 425 }).get(); 426 } catch (InterruptedException | ExecutionException e) { 427 return handleException(e); 428 } 429 } 430 431 @Action("deployChangedDocuments") 432 public ActionResult deployChangedDocuments(String args) { 433 try { 434 return this.ask(n -> { 435 try { 436 String docListPath = extractCommandFromArgs(args); 437 return n.deployChangedDocuments(docListPath); 438 } catch (IOException e) { 439 return new ActionResult(false, "Error deploying documents: " + e.getMessage()); 440 } 441 }).get(); 442 } catch (InterruptedException | ExecutionException e) { 443 return handleException(e); 444 } 445 } 446 447 // ======================================================================== 448 // JSON State Output Actions 449 // ======================================================================== 450 451 /** 452 * Outputs JSON State at the given path in pretty JSON format via outputMultiplexer. 453 * 454 * @param args the path to output (from JSON array) 455 * @return ActionResult with the formatted JSON 456 */ 457 @Action("printJson") 458 public ActionResult printJson(String args) { 459 String path = getFirst(args); 460 String formatted = toStringOfJson(path); 461 sendToMultiplexer(formatted); 462 return new ActionResult(true, formatted); 463 } 464 465 /** 466 * Outputs JSON State at the given path in YAML format via outputMultiplexer. 467 * 468 * @param args the path to output (from JSON array) 469 * @return ActionResult with the formatted YAML 470 */ 471 @Action("printYaml") 472 public ActionResult printYaml(String args) { 473 String path = getFirst(args); 474 logger.info(String.format("printYaml called: path='%s'", path)); 475 String formatted = toStringOfYaml(path); 476 logger.info(String.format("printYaml output length: %d", formatted.length())); 477 sendToMultiplexer(formatted); 478 return new ActionResult(true, formatted); 479 } 480 481 /** 482 * Sends formatted output to the outputMultiplexer, line by line. 483 */ 484 private void sendToMultiplexer(String formatted) { 485 IIActorSystem sys = (IIActorSystem) this.system(); 486 IIActorRef<?> multiplexer = sys.getIIActor("outputMultiplexer"); 487 if (multiplexer == null) { 488 return; 489 } 490 491 String nodeName = this.getName(); 492 for (String line : formatted.split("\n")) { 493 JSONObject arg = new JSONObject(); 494 arg.put("source", nodeName); 495 arg.put("type", "stdout"); 496 arg.put("data", line); 497 multiplexer.callByActionName("add", arg.toString()); 498 } 499 } 500 501 // ======================================================================== 502 // Helper Methods 503 // ======================================================================== 504 505 /** 506 * Handles exceptions and returns an appropriate ActionResult. 507 */ 508 private ActionResult handleException(Exception e) { 509 String message; 510 if (e instanceof ExecutionException) { 511 message = extractRootCauseMessage((ExecutionException) e); 512 } else { 513 message = e.getMessage(); 514 } 515 logger.warning(String.format("%s: %s", this.getName(), message)); 516 return new ActionResult(false, message); 517 } 518 519 private int parseMaxIterations(String arg, int defaultValue) { 520 if (arg != null && !arg.isEmpty() && !arg.equals("[]")) { 521 try { 522 JSONArray args = new JSONArray(arg); 523 if (args.length() > 0) { 524 return args.getInt(0); 525 } 526 } catch (Exception e) { 527 // Use default if parsing fails 528 } 529 } 530 return defaultValue; 531 } 532 533 /** 534 * Creates an OutputCallback that forwards output to the multiplexer accumulator. 535 */ 536 private Node.OutputCallback createOutputCallback(String nodeName) { 537 IIActorSystem sys = (IIActorSystem) this.system(); 538 IIActorRef<?> multiplexer = sys.getIIActor("outputMultiplexer"); 539 540 if (multiplexer == null) { 541 return null; 542 } 543 544 return new Node.OutputCallback() { 545 @Override 546 public void onStdout(String line) { 547 JSONObject arg = new JSONObject(); 548 arg.put("source", nodeName); 549 arg.put("type", "stdout"); 550 arg.put("data", line); 551 multiplexer.callByActionName("add", arg.toString()); 552 } 553 554 @Override 555 public void onStderr(String line) { 556 JSONObject arg = new JSONObject(); 557 arg.put("source", nodeName); 558 arg.put("type", "stderr"); 559 arg.put("data", line); 560 multiplexer.callByActionName("add", arg.toString()); 561 } 562 }; 563 } 564 565 /** 566 * Reports command result to the multiplexer accumulator actor if available. 567 */ 568 private void reportToAccumulator(Node.CommandResult result) { 569 IIActorSystem sys = (IIActorSystem) this.system(); 570 IIActorRef<?> multiplexer = sys.getIIActor("outputMultiplexer"); 571 if (multiplexer != null) { 572 JSONObject reportArg = new JSONObject(); 573 reportArg.put("source", this.getName()); 574 reportArg.put("type", this.object.getCurrentTransitionYaml()); 575 reportArg.put("data", combineOutput(result)); 576 multiplexer.callByActionName("add", reportArg.toString()); 577 } 578 } 579 580 /** 581 * Reports a message to the multiplexer accumulator. 582 */ 583 private void reportOutputToMultiplexer(String nodeName, String message) { 584 IIActorSystem sys = (IIActorSystem) this.system(); 585 IIActorRef<?> multiplexer = sys.getIIActor("outputMultiplexer"); 586 if (multiplexer != null) { 587 JSONObject reportArg = new JSONObject(); 588 reportArg.put("source", nodeName); 589 reportArg.put("type", "error"); 590 reportArg.put("data", message); 591 multiplexer.callByActionName("add", reportArg.toString()); 592 } 593 } 594 595 /** 596 * Combines stdout and stderr into a single output string. 597 */ 598 private String combineOutput(Node.CommandResult result) { 599 String output = result.getStdout().trim(); 600 String stderr = result.getStderr().trim(); 601 if (!stderr.isEmpty()) { 602 output = output.isEmpty() ? stderr : output + "\n[stderr]\n" + stderr; 603 } 604 return output; 605 } 606 607 /** 608 * Extracts a meaningful error message from an ExecutionException. 609 */ 610 private String extractRootCauseMessage(ExecutionException e) { 611 Throwable cause = e.getCause(); 612 Throwable current = cause; 613 while (current != null) { 614 if (current instanceof java.io.IOException) { 615 return current.getMessage(); 616 } 617 current = current.getCause(); 618 } 619 return cause != null ? cause.getMessage() : e.getMessage(); 620 } 621 622 /** 623 * Extracts a command string from JSON array arguments. 624 */ 625 private String extractCommandFromArgs(String arg) { 626 try { 627 JSONArray jsonArray = new JSONArray(arg); 628 if (jsonArray.length() == 0) { 629 throw new IllegalArgumentException("Command arguments cannot be empty"); 630 } 631 return jsonArray.getString(0); 632 } catch (Exception e) { 633 throw new IllegalArgumentException( 634 "Invalid command argument format. Expected JSON array with command string: " + arg, e); 635 } 636 } 637}