Skip to content

Commit

Permalink
Agent-based observation workflow is functional
Browse files Browse the repository at this point in the history
  • Loading branch information
fvilla committed Nov 14, 2023
1 parent 076c109 commit 44f1ef3
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 47 deletions.
13 changes: 4 additions & 9 deletions kcli/src/main/java/org/integratedmodelling/kcli/Context.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.integratedmodelling.kcli;

import java.io.PrintWriter;

import org.integratedmodelling.kcli.engine.Engine;
import org.integratedmodelling.kcli.engine.Geometries;
import org.integratedmodelling.klab.Version;
Expand All @@ -10,14 +8,11 @@
import org.integratedmodelling.klab.api.scope.SessionScope;
import org.integratedmodelling.klab.api.utils.Utils;
import org.integratedmodelling.klab.utils.NameGenerator;

import picocli.CommandLine.Command;
import picocli.CommandLine.*;
import picocli.CommandLine.Help.Ansi;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;
import picocli.CommandLine.ParentCommand;
import picocli.CommandLine.Spec;

import java.io.PrintWriter;

@Command(name = "context", mixinStandardHelpOptions = true, version = Version.CURRENT, description = {
"Commands to create, access and manipulate contexts.",
Expand Down Expand Up @@ -130,7 +125,7 @@ public static class Observe implements Runnable {
public void run() {

PrintWriter out = commandSpec.commandLine().getOut();
ContextScope ctx = context == null ? Engine.INSTANCE.getCurrentContext(false)
ContextScope ctx = context == null ? Engine.INSTANCE.getCurrentContext(true)
: Engine.INSTANCE.getContext(context);

if (within != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,56 @@
package org.integratedmodelling.klab.runtime.kactors.messages;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

import org.integratedmodelling.klab.api.scope.Scope.Status;
import org.integratedmodelling.klab.api.services.runtime.kactors.VM;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

/**
* Superclass for messages to/from actor. The general pattern is that whenever an action has
* finished with a status, the actor will send the same message that has triggered it after
* completing it with a status and any other needed information so that the scope can be updated at
* the receiving end.
*
* @author Ferd
* Superclass for messages to/from actor. The general pattern is that whenever an action has finished with a
* status, the actor will send the same message that has triggered it after completing it with a status and
* any other needed information so that the scope can be updated at the receiving end.
*
* @author Ferd
*/
public abstract class AgentMessage implements Serializable, VM.AgentMessage {

private static final long serialVersionUID = 721530303478254820L;
private static AtomicLong nextId = new AtomicLong(0L);

/**
* Send a FINAL response, which will remove the response handler at the calling scope.
*
* @param status
* @param data
* @return
*/
public AgentResponse response(Status status, Object... data) {
AgentResponse ret = new AgentResponse();
ret.setStatus(status);
ret.setId(id);
ret.setRemoveHandler(true);
if (data != null) {
for (int i = 0; i < data.length; i++) {
ret.getData().put(data[i].toString(), data[++i]);
}
}
return ret;
}

/**
* Send a NON-FINAL response, leaving the response handler in place to handle the final one. Ensure that a
* final response is sent in all possible situations.
*
* @param status
* @param data
* @return
*/
public AgentResponse statusResponse(Status status, Object... data) {
AgentResponse ret = new AgentResponse();
ret.setStatus(status);
ret.setId(id);
ret.setRemoveHandler(false);
if (data != null) {
for (int i = 0; i < data.length; i++) {
ret.getData().put(data[i].toString(), data[++i]);
Expand All @@ -43,12 +70,12 @@ public void setStatus(Status status) {
this.status = status;
}

public long getId() {
return id;
}
public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

public void setId(long id) {
this.id = id;
}

}
Original file line number Diff line number Diff line change
@@ -1,23 +1,41 @@
package org.integratedmodelling.klab.runtime.kactors.messages;

import java.io.Serializable;

import org.integratedmodelling.klab.api.collections.Parameters;
import org.integratedmodelling.klab.api.scope.Scope.Status;
import org.integratedmodelling.klab.api.services.runtime.kactors.VM;

import java.io.Serializable;

/**
* Message sent back from an agent to the calling scope to communicate action status. For messages that can be
* repeated, call {@link #setRemoveHandler(boolean)} before sending back the response, otherwise the calling
* code will remove the response handler.
*/
public class AgentResponse implements Serializable, VM.AgentMessage {

private static final long serialVersionUID = 7014141315769313725L;

// constants for often-used data FIXME/CHECK
public static final String RESULT = "result";
public static final String ERROR = "error";

private long id;
private Parameters<String> data = Parameters.create();
private Status status;

public boolean isRemoveHandler() {
return removeHandler;
}

public void setRemoveHandler(boolean removeHandler) {
this.removeHandler = removeHandler;
}

/*
default behavior for responses is to remove the response handler after the first call.
*/
private boolean removeHandler = true;

public long getId() {
return id;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.integratedmodelling.klab.services.actors;

import org.integratedmodelling.klab.api.geometry.Geometry;
import org.integratedmodelling.klab.api.knowledge.Instance;
import org.integratedmodelling.klab.api.knowledge.Knowledge;
import org.integratedmodelling.klab.api.knowledge.observation.Observation;
import org.integratedmodelling.klab.api.scope.ContextScope;
Expand Down Expand Up @@ -58,7 +59,7 @@ protected void observe(ReActorContext rctx, Observe message) {

var resolver = scope.getService(Resolver.class);

scope.send(message.response(Status.STARTED));
scope.send(message.statusResponse(Status.STARTED));

try {

Expand All @@ -69,19 +70,24 @@ protected void observe(ReActorContext rctx, Observe message) {
return;
}

ContextScope resolutionScope = message.getScope();
if (resolvable instanceof Instance instance) {
resolutionScope = resolutionScope.withGeometry(instance.getScale());
}

/*
* Build the dataflow in the scope
*/
var resolution = resolver.resolve(resolvable, message.getScope());
var resolution = resolver.resolve(resolvable, resolutionScope);

if (resolution.getCoverage().isRelevant()) {

Dataflow<Observation> dataflow = resolver.compile(resolvable, resolution, message.getScope());
Dataflow<Observation> dataflow = resolver.compile(resolvable, resolution, resolutionScope);

/*
* Run the dataflow
*/
result = scope.getService(RuntimeService.class).run(dataflow, message.getScope()).get();
result = scope.getService(RuntimeService.class).run(dataflow, resolutionScope).get();

/*
* TODO adjust overall geometry and catalog
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
package org.integratedmodelling.klab.services.scope;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import io.reacted.core.messages.reactors.ReActorStop;
import org.integratedmodelling.klab.Logging;
import org.integratedmodelling.klab.api.collections.Pair;
import org.integratedmodelling.klab.api.collections.Parameters;
Expand All @@ -30,7 +19,13 @@
import org.integratedmodelling.klab.services.actors.messages.user.CreateApplication;
import org.integratedmodelling.klab.services.actors.messages.user.CreateSession;

import io.reacted.core.messages.reactors.ReActorStop;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* Implementations must fill in the getService() strategy. This is a scope that
Expand Down Expand Up @@ -221,7 +216,9 @@ public void post(Consumer<Message> responseHandler, Object... message) {
if (handler != null) {
executor.execute(() -> {
handler.getSecond().accept(handler.getFirst(), (AgentResponse) message[0]);
responseHandlers.remove(((AgentResponse) message[0]).getId());
if (((AgentResponse)message[0]).isRemoveHandler()) {
responseHandlers.remove(((AgentResponse) message[0]).getId());
}
});
}
return;
Expand Down

0 comments on commit 44f1ef3

Please sign in to comment.