Skip to content

Commit

Permalink
Test with remote AMQP broker OK; more articulation in KG
Browse files Browse the repository at this point in the history
  • Loading branch information
fvilla committed Nov 26, 2024
1 parent d8934a1 commit f06b6b6
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ public Collection<Message.Queue> setupMessagingQueues(String scopeId,
var message = Utils.Json.parseObject(new String(delivery.getBody(),
StandardCharsets.UTF_8), Message.class);

System.out.println("DIO PESCHIERE " + message);

// if there is a consumer installed fo this queue, run it. Then if it returns
// continue, continue, else stop
var consumers = queueConsumers.get(queueId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ExecutionSequence {
// TODO check if this should be a RuntimeAsset or even an Observation.
private Object currentExecutionContext;
private Map<Actuator, KnowledgeGraph.Operation> operations = new HashMap<>();
private Throwable cause;

public ExecutionSequence(KnowledgeGraph.Operation contextualization, Dataflow<Observation> dataflow,
ComponentRegistry componentRegistry, ServiceContextScope contextScope) {
Expand Down Expand Up @@ -130,6 +131,7 @@ public boolean run() {
return false;
}
} catch (InterruptedException e) {
this.cause = e;
scope.error(e);
}
}
Expand Down Expand Up @@ -266,6 +268,7 @@ private void compile(Actuator actuator) {
setExecutionContext(context == null ? observation : context);
return true;
} catch (Exception e) {
cause = e;
scope.error(e /* TODO tracing parameters */);
}
return true;
Expand All @@ -278,6 +281,7 @@ private void compile(Actuator actuator) {
setExecutionContext(context == null ? observation : context);
return true;
} catch (Exception e) {
cause = e;
scope.error(e /* TODO tracing parameters */);
}
return true;
Expand All @@ -300,7 +304,7 @@ public boolean run() {
for (var executor : executors) {
if (!executor.get()) {
if (operation != null) {
operation.fail(scope, observation);
operation.fail(scope, observation, cause);
}
return false;
}
Expand Down Expand Up @@ -437,4 +441,8 @@ private void loadGraph(Actuator rootActuator, Graph<Actuator, DefaultEdge> depen
}
}
}

public Throwable getCause() {
return cause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public long submit(Observation observation, ContextScope scope) {
return ret;

} catch (Throwable t) {
instantiation.fail(scope, observation);
instantiation.fail(scope, observation, t);
}
}

Expand Down Expand Up @@ -419,7 +419,7 @@ this will commit all resources at close()
ret.complete(result);
contextualization.success(scope, dataflow, result);
} catch (Throwable t) {
contextualization.fail(scope, dataflow, result);
contextualization.fail(scope, dataflow, result, t);
ret.completeExceptionally(t);
}
}
Expand Down Expand Up @@ -461,7 +461,7 @@ public Observation runDataflow(Dataflow<Observation> dataflow, ContextScope cont
executionSequence.compile(rootActuator);
if (!executionSequence.isEmpty()) {
if (!executionSequence.run()) {
contextualization.fail(contextScope, dataflow.getTarget());
contextualization.fail(contextScope, dataflow.getTarget(), executionSequence.getCause());
return Observation.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ protected Map<String, Object> asParameters(Object asset, Object... additionalPar
ret.put("urn", observation.getUrn());
ret.put("semantictype", SemanticType.fundamentalType(
observation.getObservable().getSemantics().getType()).name());
ret.put("semantics", observation.getObservable().getUrn());
ret.put("semantics", observation.getObservable().getSemantics().getUrn());
ret.put("observable", observation.getObservable().getUrn());
ret.put("id", observation.getId());
}
case Agent agent -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.integratedmodelling.klab.api.services.Reasoner;
import org.integratedmodelling.klab.api.services.resolver.Coverage;
import org.integratedmodelling.klab.api.services.runtime.Actuator;
import org.integratedmodelling.klab.api.services.runtime.Dataflow;
import org.integratedmodelling.klab.api.services.runtime.objects.ContextInfo;
import org.integratedmodelling.klab.api.services.runtime.objects.SessionInfo;
import org.integratedmodelling.klab.api.utils.Utils;
Expand Down Expand Up @@ -514,11 +513,10 @@ protected <T> List<T> adapt(EagerResult query, Class<T> cls, Scope scope) {

instance.setUrn(node.get("urn").asString());
instance.setName(node.get("name").asString());
instance.setObservable(reasoner.resolveObservable(node.get("semantics").asString()));
instance.setObservable(reasoner.resolveObservable(node.get("observable").asString()));
instance.setResolved(node.get("resolved").asBoolean());
instance.setId(node.get("id").asLong());

// SHIT, THE GEOMETRY - geometry, metadata etc
var gResult = query("MATCH (o:Observation)-[:HAS_GEOMETRY]->(g:Geometry) WHERE o.id" +
" = $id RETURN g", Map.of("id", node.get("id").asLong()), scope);

Expand Down Expand Up @@ -548,6 +546,7 @@ protected <T> List<T> adapt(EagerResult query, Class<T> cls, Scope scope) {
// TODO
ret.add((T) instance);
} else if (Geometry.class.isAssignableFrom(cls)) {
// TODO use a cache storing scales
ret.add((T) Geometry.create(node.get("definition").asString()));
}
}
Expand Down Expand Up @@ -1037,7 +1036,7 @@ private List<Observation> getObservation(ContextScope scope, Object... queriable
if (queriables != null) {
for (var parameter : queriables) {
if (parameter instanceof Observable observable) {
queryParameters.put("semantics", observable.getUrn());
queryParameters.put("semantics", observable.getSemantics().getUrn());
query.append("MATCH (o:Observation {semantics: $semantics}");
} else if (parameter instanceof Activity rootActivity) {
} else if (parameter instanceof Long id) {
Expand Down

0 comments on commit f06b6b6

Please sign in to comment.