Skip to content

Commit

Permalink
Better messaging and activity recording
Browse files Browse the repository at this point in the history
  • Loading branch information
fvilla committed Nov 28, 2024
1 parent 62d26ad commit f281665
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.integratedmodelling.klab.api.knowledge.SemanticType;
import org.integratedmodelling.klab.api.knowledge.observation.scale.time.Time;
import org.integratedmodelling.klab.api.knowledge.observation.scale.time.TimeInstant;
import org.integratedmodelling.klab.api.services.KlabService;

/**
* Activity (process). Primary processes produce artifacts. Secondary processes (after creation) may modify
Expand All @@ -41,8 +42,16 @@ default RuntimeAsset.Type classify() {
return RuntimeAsset.Type.ACTIVITY;
}

String getServiceId();

String getServiceName();

KlabService.Type getServiceType();

String getDataflow();

enum Type {
INITIALIZATION, RESOLUTION, CONTEXTUALIZATION, INSTANTIATION
INITIALIZATION, RESOLUTION, CONTEXTUALIZATION, INSTANTIATION, EXECUTION
}

enum Outcome {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.integratedmodelling.klab.api.provenance.impl;

import org.integratedmodelling.klab.api.provenance.Activity;
import org.integratedmodelling.klab.api.services.KlabService;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -18,6 +19,10 @@ public class ActivityImpl extends ProvenanceNodeImpl implements Activity {
private String taskId;
private Outcome outcome;
private String stackTrace;
private String serviceId;
private String serviceName;
private KlabService.Type serviceType;
private String dataflow;

@Override
public long getStart() {
Expand Down Expand Up @@ -120,6 +125,42 @@ public void setStackTrace(String stackTrace) {
this.stackTrace = stackTrace;
}

@Override
public String getServiceId() {
return serviceId;
}

public void setServiceId(String serviceId) {
this.serviceId = serviceId;
}

@Override
public String getServiceName() {
return serviceName;
}

public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}

@Override
public KlabService.Type getServiceType() {
return serviceType;
}

public void setServiceType(KlabService.Type serviceType) {
this.serviceType = serviceType;
}

@Override
public String getDataflow() {
return dataflow;
}

public void setDataflow(String dataflow) {
this.dataflow = dataflow;
}

@Override
public String toString() {
return "ActivityImpl{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ public void installQueueConsumer(String queueId, Consumer<Message> consumer) {
queueConsumers.computeIfAbsent(queueId, k -> new ArrayList<>()).add(consumer);
}

protected void closeMessaging() {
if (this.channel_ != null) {
try {
this.channel_.close();
this.connection.close();
} catch (Exception e) {
error("Error closing messaging channel", e);
}
}
}

protected Channel getOrCreateChannel(Message.Queue queue) {

// if (!queueNames.containsKey(queue)) {
Expand Down Expand Up @@ -169,7 +180,7 @@ public Collection<Message.Queue> setupMessaging(String brokerUrl, String scopeId
this.connection = this.connectionFactory.newConnection();
return setupMessagingQueues(scopeId, queuesHeader);
} catch (Throwable t) {
error(t);
error("Error connecting to broker: no messaging available", t);
return EnumSet.noneOf(Message.Queue.class);
}
}
Expand All @@ -196,7 +207,8 @@ public Collection<Message.Queue> setupMessagingQueues(String scopeId,
for (var queue : queuesHeader) {
try {
String queueId = scopeId + "." + queue.name().toLowerCase();
getOrCreateChannel(queue).queueDeclare(queueId, true, false, false, Map.of());
getOrCreateChannel(queue).queueDeclare(queueId, true, false, true /* TODO LINK TO SCOPE
PERSISTENCE */, Map.of());
this.queueNames.put(queue, queueId);
ret.add(queue);
} catch (Throwable e) {
Expand Down Expand Up @@ -313,9 +325,7 @@ public Collection<Message.Queue> setupMessagingQueues(String scopeId,
info(this.getClass().getCanonicalName() + " scope connected to queues "
+ ret +
" through broker " + connectionFactory.getHost() + (receiver ? " (R)" : "") + (sender ?
" (T)" : ""));
} else {
info("CHE CAZZO, connection factory is null for " + this.getClass().getCanonicalName());
" (T)" : ""));
}

return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
import org.integratedmodelling.klab.api.services.runtime.Dataflow;

import java.io.PrintWriter;
import java.io.StringWriter;

/**
* Encode a dataflow into a k.LAB-compatible serialized form.
*/
public class DataflowEncoder {

private final Dataflow<Observation> dataflow;
private final Dataflow<?> dataflow;
private final ContextScope scope;

public DataflowEncoder(Dataflow<Observation> dataflow, ContextScope scope) {
public DataflowEncoder(Dataflow<?> dataflow, ContextScope scope) {
this.dataflow = dataflow;
this.scope = scope;
}
Expand Down Expand Up @@ -93,4 +94,14 @@ private String sanitize(String name) {
// return ret;
}

@Override
public String toString() {
try (var writer = new StringWriter()) {
var printWriter = new PrintWriter(writer);
encode(printWriter);
return writer.toString();
} catch (Exception e) {
}
return "Error encoding dataflow";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public <T extends KlabService> Collection<T> getServices(Class<T> serviceClass)

@Override
public void close() {
closeMessaging();
var runtime = getService(RuntimeService.class);
if (runtime != null) {
runtime.releaseSession(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ public void close() {
}
}

closeMessaging();

var runtime = getService(RuntimeService.class);
if (runtime instanceof BaseService baseService) {
baseService.getScopeManager().releaseScope(this.getId());
Expand Down
10 changes: 0 additions & 10 deletions klab.core.services/src/main/resources/logback-spring.xml

This file was deleted.

9 changes: 9 additions & 0 deletions klab.services.runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@
<version>5.20.0</version>
</dependency>
-->

<!-- this will enable logging across the board -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.11</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ This will commit or rollback at close()
dataflow = resolver.resolve(observation, scope);
if (dataflow != null) {
resolution.success(scope, result, dataflow,
"Resolution of observation _" + observation.getUrn() + "_ of **" + observation.getObservable().getUrn() + "**");
"Resolution of observation _" + observation.getUrn() + "_ of **" + observation.getObservable().getUrn() + "**", resolver);
scope.send(Message.MessageClass.ObservationLifecycle,
Message.MessageType.ResolutionSuccessful, result);
resolutionActivity = resolution.getActivity();
Expand All @@ -410,7 +410,7 @@ this will commit all resources at close()
*/
var contextualization =
digitalTwin.knowledgeGraph().operation(digitalTwin.knowledgeGraph().klab(),
resolutionActivity, Activity.Type.CONTEXTUALIZATION);
resolutionActivity, Activity.Type.EXECUTION, dataflow);

try (contextualization) {
// TODO contextualization gets its own activities to use in operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ protected Map<String, Object> asParameters(Object asset, Object... additionalPar
ret.put("type", activity.getType().name());
ret.put("name", activity.getName());
ret.put("id", activity.getId());
ret.put("serviceId", activity.getServiceId());
ret.put("serviceName", activity.getServiceName());
ret.put("serviceType", activity.getServiceType() == null ? null : activity.getServiceType().name());
ret.put("dataflow", activity.getDataflow());
ret.put("outcome", activity.getOutcome() == null ? null : activity.getOutcome().name());
ret.put("stackTrace", activity.getStackTrace());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.integratedmodelling.common.logging.Logging;
import org.integratedmodelling.common.runtime.ActuatorImpl;
import org.integratedmodelling.common.services.client.resolver.DataflowEncoder;
import org.integratedmodelling.klab.api.data.RuntimeAsset;
import org.integratedmodelling.klab.api.digitaltwin.DigitalTwin;
import org.integratedmodelling.klab.api.exceptions.KlabIllegalArgumentException;
Expand All @@ -23,9 +24,11 @@
import org.integratedmodelling.klab.api.scope.Scope;
import org.integratedmodelling.klab.api.scope.SessionScope;
import org.integratedmodelling.klab.api.scope.UserScope;
import org.integratedmodelling.klab.api.services.KlabService;
import org.integratedmodelling.klab.api.services.Reasoner;
import org.integratedmodelling.klab.api.services.resolver.Coverage;
import org.integratedmodelling.klab.api.services.runtime.Actuator;
import org.integratedmodelling.klab.api.services.runtime.Dataflow;
import org.integratedmodelling.klab.api.services.runtime.objects.ContextInfo;
import org.integratedmodelling.klab.api.services.runtime.objects.SessionInfo;
import org.integratedmodelling.klab.api.utils.Utils;
Expand Down Expand Up @@ -330,8 +333,13 @@ public Operation operation(Agent agent, Activity parentActivity, Activity.Type a
ret.activity.setDescription(description);
} else if (dat instanceof OperationImpl operation) {
ret.parent = operation;
} // TODO this should also get an Actuator and link it as plan to the activity & parent +
// set up for later
} else if (dat instanceof KlabService service) {
activity.setServiceId(service.serviceId());
activity.setServiceName(service.getServiceName());
activity.setServiceType(KlabService.Type.classify(service));
} else if (dat instanceof Dataflow<?> dataflow) {
activity.setDataflow(new DataflowEncoder(dataflow, scope).toString());
}
}
}

Expand Down Expand Up @@ -532,8 +540,13 @@ protected <T> List<T> adapt(EagerResult query, Class<T> cls, Scope scope) {
instance.setStart(node.get("start").asLong());
instance.setEnd(node.get("end").asLong());
instance.setName(node.get("name").asString());
instance.setServiceName(node.get("serviceName").isNull()? null : node.get("serviceName").asString());
instance.setServiceId(node.get("serviceId").isNull() ? null : node.get("serviceId").asString());
instance.setServiceType(node.get("serviceType").isNull() ? null :
KlabService.Type.valueOf(node.get("serviceType").asString()));
instance.setDataflow(node.get("dataflow").isNull() ? null : node.get("dataflow").asString());
instance.setType(Activity.Type.valueOf(instance.getName()));
instance.setDescription(node.get("description") == null ? "No description" : node.get(
instance.setDescription(node.get("description").isNull() ? "No description" : node.get(
"description").asString());
instance.setId(node.get("id").asLong());
ret.add((T) instance);
Expand Down Expand Up @@ -975,7 +988,6 @@ public <T extends RuntimeAsset> List<T> get(ContextScope scope, Class<T> resultC

private List<Activity> getActivity(ContextScope scope, Object... queriables) {


Map<String, Object> queryParameters = new LinkedHashMap<>();
var query = new StringBuilder(getScopeQuery(scope, queryParameters) + "-[:HAS_PROVENANCE]->" +
"(:Provenance)-[:HAS_CHILD]->");
Expand Down

0 comments on commit f281665

Please sign in to comment.