Skip to content

Commit

Permalink
New runtime logics seem sound, ready for merge and test
Browse files Browse the repository at this point in the history
  • Loading branch information
fvilla committed Nov 6, 2024
1 parent 8e37cc9 commit daa843b
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ default String getServiceName() {
* @return the ID of the task running in the runtime, which must be identical to the observation URN and
* will be sent to the scope with the resolution result message.
*/
String resolve(long id, ContextScope scope);
Future<Observation> resolve(long id, ContextScope scope);

/**
* Retrieve any assets from the knowledge graph in the digital twin matching a given class and some query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@ public class ResolutionRequest {

private Observation observation;
private Observable observable;
// private boolean startResolution;
private String agentName; // for provenance when needed. Agents are identified by name
private List<ResolutionConstraint> resolutionConstraints = new ArrayList<>();
private long observationId;

public long getObservationId() {
return observationId;
}

public void setObservationId(long observationId) {
this.observationId = observationId;
}

public Observation getObservation() {
return observation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.integratedmodelling.klab.api.engine.Engine;
import org.integratedmodelling.klab.api.engine.distribution.Distribution;
import org.integratedmodelling.klab.api.identities.UserIdentity;
import org.integratedmodelling.klab.api.knowledge.observation.Observation;
import org.integratedmodelling.klab.api.lang.kactors.beans.ActionStatistics;
import org.integratedmodelling.klab.api.lang.kactors.beans.TestStatistics;
import org.integratedmodelling.klab.api.lang.kim.KlabDocument;
Expand All @@ -18,6 +19,7 @@
import java.net.URI;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* Messages exchanged between scopes using {@link Channel#send(Object...)}. They will be sent through the
Expand Down Expand Up @@ -228,8 +230,9 @@ enum MessageType {
/**
* Resolver event messages
*/
ResolutionSuccessful(Queue.Events, Long.class),
ResolutionAborted(Queue.Events, Long.class),
ResolutionSuccessful(Queue.Events, Observation.class),
ResolutionAborted(Queue.Events, Observation.class),
ResolutionStarted(Queue.Events, Observation.class),


/**
Expand Down Expand Up @@ -269,6 +272,8 @@ private MessageType(Queue queue, Class<?> payloadClass) {
*/
interface Match {

Match when(Predicate<Message> predicate);

/**
* This is called to ensure that the matcher remains active after the first match. The default is
* false.
Expand Down Expand Up @@ -297,6 +302,8 @@ interface Match {
boolean isPersistent();

Object getPayloadMatch();

Predicate<Message> getMessagePredicate();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class MatchImpl implements Message.Match {

private Set<Message.MessageClass> applicableClasses = EnumSet.noneOf(Message.MessageClass.class);
private Set<Message.MessageType> applicableTypes = EnumSet.noneOf(Message.MessageType.class);
private Set<Message.Queue> applicableQueues = EnumSet.noneOf(Message.Queue.class);
private Predicate<Message> messagePredicate;
private Consumer<Message> messageConsumer;
private Object payloadMatch;
boolean persistent = false;

@Override
public Message.Match when(Predicate<Message> predicate) {
this.messagePredicate = predicate;
return this;
}

@Override
public Message.Match persistent(boolean persistent) {
this.persistent = persistent;
Expand Down Expand Up @@ -57,6 +65,11 @@ public Object getPayloadMatch() {
return payloadMatch;
}

@Override
public Predicate<Message> getMessagePredicate() {
return messagePredicate;
}

public static MatchImpl create(Object... args) {
var ret = new MatchImpl();
if (args != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import org.integratedmodelling.common.knowledge.IntelligentMap;
import org.integratedmodelling.common.utils.Utils;
import org.integratedmodelling.klab.api.identities.Identity;
import org.integratedmodelling.klab.api.identities.UserIdentity;
Expand All @@ -26,7 +25,6 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* A channel instrumented for messaging, containing the AMQP connections and channels for all the subscribed
Expand Down Expand Up @@ -445,6 +443,12 @@ private boolean matchApplies(Message.Match match, Message message) {
}
}

if (match.getMessagePredicate() != null) {
if (!match.getMessagePredicate().test(message)) {
return false;
}
}

if (match.getPayloadMatch() != null) {
if (!match.getPayloadMatch().equals(message.getPayload(Object.class))) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,32 @@ public long submit(Observation observation, ContextScope scope) {
}

@Override
public String resolve(long id, ContextScope scope) {
return client.withScope(scope).get(ServicesAPI.RUNTIME.START_RESOLUTION, String.class, "id", id);
public Future<Observation> resolve(long id, ContextScope scope) {

/*
Set up the task to track the messages. We do this before invoking the method so it's guaranteed to
not return before we can notice.
*/
var ret = scope.trackMessages(Message.match(Message.MessageClass.ObservationLifecycle,
Message.MessageType.ResolutionAborted, Message.MessageType.ResolutionSuccessful).when((message) -> message.getPayload(Observation.class).getId() == id)
, (message) -> {
var observation = message.getPayload(Observation.class);
if (message.getMessageType() == Message.MessageType.ResolutionSuccessful) {
scope.info("Resolution of " + observation + " successful with coverage " + observation.getResolvedCoverage());
return message.getPayload(Observation.class);
}
scope.info("Resolution of " + observation + " failed");
return observation;
});

var request = new ResolutionRequest();
request.setResolutionConstraints(scope.getResolutionConstraints());
request.setObservationId(id);

// this returns the URN of the observation/task or null - we can ignore it at this stage.
client.withScope(scope).post(ServicesAPI.RUNTIME.START_RESOLUTION, request, String.class, "id", id);

return ret;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,7 @@ public Future<Observation> observe(Observation observation) {
long taskId = runtime.submit(observation, this);

if (taskId != Observation.UNASSIGNED_ID) {
var ret = trackMessages(Message.match(Message.MessageClass.ObservationLifecycle,
Message.MessageType.ResolutionAborted, Message.MessageType.ResolutionSuccessful,
observation.getUrn()), (message) -> {
if (message.getMessageType() == Message.MessageType.ResolutionSuccessful) {
info("Resolution of " + observation + " successful with coverage " + observation.getResolvedCoverage());
return message.getPayload(Observation.class);
}
info("Resolution of " + observation + " failed");
return observation;
});
runtime.resolve(taskId, this);
return ret;
return runtime.resolve(taskId, this);
}

// failure: this just returns the unresolved observation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,6 @@ public class ServiceContextScope extends ServiceSessionScope implements ContextS

private Observation observer;
private Observation contextObservation;
// private Set<String> resolutionScenarios = new LinkedHashSet<>();
// private Scale geometry = Scale.empty();
// @Deprecated
// private String resolutionNamespace;
// @Deprecated
// private String resolutionProject;
// @Deprecated
// private Map<Observable, Observation> catalog;
// @Deprecated
// private Map<String, Observable> namedCatalog = new HashMap<>();
// @Deprecated
// private Map<Concept, Concept> contextualizedPredicates = new HashMap<>();
private URL url;
private DigitalTwin digitalTwin;
// FIXME there's also parentScope (generic) and I'm not sure these should be duplicated
Expand Down Expand Up @@ -212,49 +200,9 @@ public Future<Observation> observe(Observation observation) {
return null;
}

return observe(observation, null);

}

private Future<Observation> observe(Observation observation, Activity parentActivity) {

// TODO FIXME this must just call the 2 new methods in the runtime and return the result.

// root-level activity when user is the agent. Inside resolution the activity may have children
var activity = digitalTwin.knowledgeGraph().activity(digitalTwin.knowledgeGraph().user(), this,
observation, Activity.Type.INSTANTIATION, parentActivity);

var id = activity.run(this);

// create task before resolution starts so we guarantee a response
var ret = new CompletableFuture<Observation>();

final var runtime = getService(RuntimeService.class);
final var resolver = getService(Resolver.class);

// start virtual resolution thread. This should be everything we need.
Thread.ofVirtual().start(() -> {
try {
var dataflow = resolver.resolve(observation, this);
if (dataflow != null) {
if (!dataflow.isEmpty()) {
/* TODO return value */
ret.complete(runtime.runDataflow(dataflow, this));
}
activity.success(this, observation, dataflow);
} else {
activity.fail(this, observation);
}
send(Message.MessageClass.ObservationLifecycle, Message.MessageType.ResolutionSuccessful, id);
} catch (Throwable t) {
activity.fail(this, observation, t);
ret.completeExceptionally(t);
send(Message.MessageClass.ObservationLifecycle, Message.MessageType.ResolutionAborted, id);
}
});

return ret;

var runtime = getService(RuntimeService.class);
long taskId = runtime.submit(observation, this);
return runtime.resolve(taskId, this);
}

private void finalizeObservation(Observation observation, Dataflow<Observation> dataflow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ public class RuntimeServerController {
throw new KlabInternalErrorException("Unexpected implementation of request authorization");
}

@PostMapping(ServicesAPI.RUNTIME.START_RESOLUTION)
public @ResponseBody String startResolution(ResolutionRequest request, Principal principal) {
if (principal instanceof EngineAuthorization authorization) {
var contextScope =
authorization.getScope(ContextScope.class).withResolutionConstraints(request.getResolutionConstraints().toArray(new ResolutionConstraint[0]));
if (contextScope instanceof ServiceContextScope serviceContextScope) {
var observation = serviceContextScope.getObservation(request.getObservationId());
runtimeService.klabService().resolve(observation.getId(), serviceContextScope);
return observation.getUrn();
}
}
throw new KlabInternalErrorException("Unexpected implementation of request authorization");
}

@GetMapping(ServicesAPI.RUNTIME.GET_SESSION_INFO)
public @ResponseBody List<SessionInfo> getSessionInfo(Principal principal) {
if (principal instanceof EngineAuthorization authorization) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public static ExecutionSequence compile(List<Pair<Actuator, Integer>> pairs,
}

public boolean run() {

for (var operationGroup : sequence) {
// groups are sequential; grouped items are parallel. Empty groups are currently possible although
// they should be filtered out, but we leave them for completeness for now as they don't really
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.io.File;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -260,7 +261,8 @@ public String registerContext(ContextScope contextScope) {
serviceContextScope.setId(
serviceContextScope.getParentScope().getId() + "." + Utils.Names.shortUUID());
getScopeManager().registerScope(serviceContextScope, capabilities(contextScope).getBrokerURI());
serviceContextScope.setDigitalTwin(new DigitalTwinImpl(this, contextScope, getMainKnowledgeGraph()));
serviceContextScope.setDigitalTwin(new DigitalTwinImpl(this, contextScope,
getMainKnowledgeGraph()));

if (serviceContextScope.getServices(RuntimeService.class).isEmpty()) {
// add self as the runtime service, which is needed by the slave scopes
Expand Down Expand Up @@ -309,51 +311,44 @@ public long submit(Observation observation, ContextScope scope) {
}

@Override
public String resolve(long id, ContextScope scope) {
public Future<Observation> resolve(long id, ContextScope scope) {

if (scope instanceof ServiceContextScope serviceContextScope) {

// TODO all the logic that's now in ServiceContextScope must be here. Messages should be
// clearly orchestrated and documented across the DT. There should be a task ID token
// coming from the request/client in the scope, so that task structure can be reconstructed
// and monitored. The task ID must be in every message.
var digitalTwin = getDigitalTwin(scope);
var resolver = serviceContextScope.getService(Resolver.class);
var observation = serviceContextScope.getObservation(id);
var digitalTwin = getDigitalTwin(scope);
var activity = digitalTwin.knowledgeGraph().activity(digitalTwin.knowledgeGraph().user(), scope,
observation, Activity.Type.RESOLUTION, null);

final var ret = new CompletableFuture<Observation>();

Thread.ofVirtual().start(() -> {
try {
var result = observation;
scope.send(Message.MessageClass.ObservationLifecycle,
Message.MessageType.ResolutionStarted, result);
var dataflow = resolver.resolve(observation, scope);
if (dataflow != null) {
if (!dataflow.isEmpty()) {
result = runDataflow(dataflow, scope);
ret.complete(result);
}
activity.success(scope, result, dataflow);
} else {
activity.fail(scope, observation);
}
scope.send(Message.MessageClass.ObservationLifecycle,
Message.MessageType.ResolutionSuccessful, id);
} catch (Throwable t) {
ret.completeExceptionally(t);
activity.fail(scope, observation, t);
scope.send(Message.MessageClass.ObservationLifecycle,
Message.MessageType.ResolutionAborted, id);
}
});

// // root-level activity when user is the agent. Inside resolution the activity may have children
// var activity = digitalTwin.knowledgeGraph().activity(digitalTwin.knowledgeGraph().user(), this,
// observation, Activity.Type.INSTANTIATION, parentActivity);
//
// var id = activity.run(this);
//
// // create task before resolution starts so we guarantee a response
// var ret = newMessageTrackingTask(EnumSet.of(Message.MessageType.ResolutionAborted,
// Message.MessageType.ResolutionSuccessful), Observation.class, id);
//
// final var runtime = getService(org.integratedmodelling.klab.api.services.RuntimeService.class);
// final var resolver = getService(Resolver.class);
//
// // start virtual resolution thread. This should be everything we need.
// Thread.ofVirtual().start(() -> {
// try {
// var dataflow = resolver.resolve(observation, this);
// if (dataflow != null) {
// if (!dataflow.isEmpty()) {
// /* TODO return value */
// runtime.runDataflow(dataflow, this);
// }
// activity.success(this, observation, dataflow);
// } else {
// activity.fail(this, observation);
// }
// send(Message.MessageClass.ObservationLifecycle, Message.MessageType.ResolutionSuccessful, id);
// } catch (Throwable t) {
// activity.fail(this, observation, t);
// send(Message.MessageClass.ObservationLifecycle, Message.MessageType.ResolutionAborted, id);
// }
// });
//
// return ret;
return ret;
}

throw new KlabInternalErrorException("Digital twin is inaccessible because of unexpected scope " +
Expand Down

0 comments on commit daa843b

Please sign in to comment.