From 44f1ef36708174e02111a2ab913fa747bb45f5c8 Mon Sep 17 00:00:00 2001 From: Ferdinando Villa Date: Tue, 14 Nov 2023 12:28:15 +0100 Subject: [PATCH] Agent-based observation workflow is functional --- .../org/integratedmodelling/kcli/Context.java | 13 ++-- .../kactors/messages/AgentMessage.java | 59 ++++++++++++++----- .../kactors/messages/AgentResponse.java | 26 ++++++-- .../klab/services/actors/ContextAgent.java | 14 +++-- .../klab/services/scope/EngineScope.java | 25 ++++---- 5 files changed, 90 insertions(+), 47 deletions(-) diff --git a/kcli/src/main/java/org/integratedmodelling/kcli/Context.java b/kcli/src/main/java/org/integratedmodelling/kcli/Context.java index e707524b1..7b13afefb 100644 --- a/kcli/src/main/java/org/integratedmodelling/kcli/Context.java +++ b/kcli/src/main/java/org/integratedmodelling/kcli/Context.java @@ -1,7 +1,5 @@ package org.integratedmodelling.kcli; -import java.io.PrintWriter; - import org.integratedmodelling.kcli.engine.Engine; import org.integratedmodelling.kcli.engine.Geometries; import org.integratedmodelling.klab.Version; @@ -10,14 +8,11 @@ import org.integratedmodelling.klab.api.scope.SessionScope; import org.integratedmodelling.klab.api.utils.Utils; import org.integratedmodelling.klab.utils.NameGenerator; - -import picocli.CommandLine.Command; +import picocli.CommandLine.*; import picocli.CommandLine.Help.Ansi; import picocli.CommandLine.Model.CommandSpec; -import picocli.CommandLine.Option; -import picocli.CommandLine.Parameters; -import picocli.CommandLine.ParentCommand; -import picocli.CommandLine.Spec; + +import java.io.PrintWriter; @Command(name = "context", mixinStandardHelpOptions = true, version = Version.CURRENT, description = { "Commands to create, access and manipulate contexts.", @@ -130,7 +125,7 @@ public static class Observe implements Runnable { public void run() { PrintWriter out = commandSpec.commandLine().getOut(); - ContextScope ctx = context == null ? Engine.INSTANCE.getCurrentContext(false) + ContextScope ctx = context == null ? Engine.INSTANCE.getCurrentContext(true) : Engine.INSTANCE.getContext(context); if (within != null) { diff --git a/klab.services.core/src/main/java/org/integratedmodelling/klab/runtime/kactors/messages/AgentMessage.java b/klab.services.core/src/main/java/org/integratedmodelling/klab/runtime/kactors/messages/AgentMessage.java index c065821da..45af48caf 100644 --- a/klab.services.core/src/main/java/org/integratedmodelling/klab/runtime/kactors/messages/AgentMessage.java +++ b/klab.services.core/src/main/java/org/integratedmodelling/klab/runtime/kactors/messages/AgentMessage.java @@ -1,29 +1,56 @@ package org.integratedmodelling.klab.runtime.kactors.messages; -import java.io.Serializable; -import java.util.concurrent.atomic.AtomicLong; - import org.integratedmodelling.klab.api.scope.Scope.Status; import org.integratedmodelling.klab.api.services.runtime.kactors.VM; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + /** - * Superclass for messages to/from actor. The general pattern is that whenever an action has - * finished with a status, the actor will send the same message that has triggered it after - * completing it with a status and any other needed information so that the scope can be updated at - * the receiving end. - * - * @author Ferd + * Superclass for messages to/from actor. The general pattern is that whenever an action has finished with a + * status, the actor will send the same message that has triggered it after completing it with a status and + * any other needed information so that the scope can be updated at the receiving end. * + * @author Ferd */ public abstract class AgentMessage implements Serializable, VM.AgentMessage { private static final long serialVersionUID = 721530303478254820L; private static AtomicLong nextId = new AtomicLong(0L); + /** + * Send a FINAL response, which will remove the response handler at the calling scope. + * + * @param status + * @param data + * @return + */ public AgentResponse response(Status status, Object... data) { AgentResponse ret = new AgentResponse(); ret.setStatus(status); ret.setId(id); + ret.setRemoveHandler(true); + if (data != null) { + for (int i = 0; i < data.length; i++) { + ret.getData().put(data[i].toString(), data[++i]); + } + } + return ret; + } + + /** + * Send a NON-FINAL response, leaving the response handler in place to handle the final one. Ensure that a + * final response is sent in all possible situations. + * + * @param status + * @param data + * @return + */ + public AgentResponse statusResponse(Status status, Object... data) { + AgentResponse ret = new AgentResponse(); + ret.setStatus(status); + ret.setId(id); + ret.setRemoveHandler(false); if (data != null) { for (int i = 0; i < data.length; i++) { ret.getData().put(data[i].toString(), data[++i]); @@ -43,12 +70,12 @@ public void setStatus(Status status) { this.status = status; } - public long getId() { - return id; - } + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } - public void setId(long id) { - this.id = id; - } - } diff --git a/klab.services.core/src/main/java/org/integratedmodelling/klab/runtime/kactors/messages/AgentResponse.java b/klab.services.core/src/main/java/org/integratedmodelling/klab/runtime/kactors/messages/AgentResponse.java index e67c185a7..62182a3af 100644 --- a/klab.services.core/src/main/java/org/integratedmodelling/klab/runtime/kactors/messages/AgentResponse.java +++ b/klab.services.core/src/main/java/org/integratedmodelling/klab/runtime/kactors/messages/AgentResponse.java @@ -1,23 +1,41 @@ package org.integratedmodelling.klab.runtime.kactors.messages; -import java.io.Serializable; - import org.integratedmodelling.klab.api.collections.Parameters; import org.integratedmodelling.klab.api.scope.Scope.Status; import org.integratedmodelling.klab.api.services.runtime.kactors.VM; +import java.io.Serializable; + +/** + * Message sent back from an agent to the calling scope to communicate action status. For messages that can be + * repeated, call {@link #setRemoveHandler(boolean)} before sending back the response, otherwise the calling + * code will remove the response handler. + */ public class AgentResponse implements Serializable, VM.AgentMessage { private static final long serialVersionUID = 7014141315769313725L; - + // constants for often-used data FIXME/CHECK public static final String RESULT = "result"; public static final String ERROR = "error"; - + private long id; private Parameters data = Parameters.create(); private Status status; + public boolean isRemoveHandler() { + return removeHandler; + } + + public void setRemoveHandler(boolean removeHandler) { + this.removeHandler = removeHandler; + } + + /* + default behavior for responses is to remove the response handler after the first call. + */ + private boolean removeHandler = true; + public long getId() { return id; } diff --git a/klab.services.engine/src/main/java/org/integratedmodelling/klab/services/actors/ContextAgent.java b/klab.services.engine/src/main/java/org/integratedmodelling/klab/services/actors/ContextAgent.java index e5350290f..1cbc3097c 100644 --- a/klab.services.engine/src/main/java/org/integratedmodelling/klab/services/actors/ContextAgent.java +++ b/klab.services.engine/src/main/java/org/integratedmodelling/klab/services/actors/ContextAgent.java @@ -1,6 +1,7 @@ package org.integratedmodelling.klab.services.actors; import org.integratedmodelling.klab.api.geometry.Geometry; +import org.integratedmodelling.klab.api.knowledge.Instance; import org.integratedmodelling.klab.api.knowledge.Knowledge; import org.integratedmodelling.klab.api.knowledge.observation.Observation; import org.integratedmodelling.klab.api.scope.ContextScope; @@ -58,7 +59,7 @@ protected void observe(ReActorContext rctx, Observe message) { var resolver = scope.getService(Resolver.class); - scope.send(message.response(Status.STARTED)); + scope.send(message.statusResponse(Status.STARTED)); try { @@ -69,19 +70,24 @@ protected void observe(ReActorContext rctx, Observe message) { return; } + ContextScope resolutionScope = message.getScope(); + if (resolvable instanceof Instance instance) { + resolutionScope = resolutionScope.withGeometry(instance.getScale()); + } + /* * Build the dataflow in the scope */ - var resolution = resolver.resolve(resolvable, message.getScope()); + var resolution = resolver.resolve(resolvable, resolutionScope); if (resolution.getCoverage().isRelevant()) { - Dataflow dataflow = resolver.compile(resolvable, resolution, message.getScope()); + Dataflow dataflow = resolver.compile(resolvable, resolution, resolutionScope); /* * Run the dataflow */ - result = scope.getService(RuntimeService.class).run(dataflow, message.getScope()).get(); + result = scope.getService(RuntimeService.class).run(dataflow, resolutionScope).get(); /* * TODO adjust overall geometry and catalog diff --git a/klab.services.engine/src/main/java/org/integratedmodelling/klab/services/scope/EngineScope.java b/klab.services.engine/src/main/java/org/integratedmodelling/klab/services/scope/EngineScope.java index 9e85e8eed..8aa1fae61 100644 --- a/klab.services.engine/src/main/java/org/integratedmodelling/klab/services/scope/EngineScope.java +++ b/klab.services.engine/src/main/java/org/integratedmodelling/klab/services/scope/EngineScope.java @@ -1,17 +1,6 @@ package org.integratedmodelling.klab.services.scope; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - +import io.reacted.core.messages.reactors.ReActorStop; import org.integratedmodelling.klab.Logging; import org.integratedmodelling.klab.api.collections.Pair; import org.integratedmodelling.klab.api.collections.Parameters; @@ -30,7 +19,13 @@ import org.integratedmodelling.klab.services.actors.messages.user.CreateApplication; import org.integratedmodelling.klab.services.actors.messages.user.CreateSession; -import io.reacted.core.messages.reactors.ReActorStop; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; +import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * Implementations must fill in the getService() strategy. This is a scope that @@ -221,7 +216,9 @@ public void post(Consumer responseHandler, Object... message) { if (handler != null) { executor.execute(() -> { handler.getSecond().accept(handler.getFirst(), (AgentResponse) message[0]); - responseHandlers.remove(((AgentResponse) message[0]).getId()); + if (((AgentResponse)message[0]).isRemoveHandler()) { + responseHandlers.remove(((AgentResponse) message[0]).getId()); + } }); } return;