Skip to content

Commit

Permalink
Fixes, better handling of queues, logging
Browse files Browse the repository at this point in the history
  • Loading branch information
fvilla committed Nov 28, 2024
1 parent f281665 commit 1ee2155
Show file tree
Hide file tree
Showing 15 changed files with 86 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

import org.integratedmodelling.klab.api.collections.Parameters;
import org.integratedmodelling.klab.api.exceptions.KlabServiceAccessException;
import org.integratedmodelling.klab.api.identities.Identity;
import org.integratedmodelling.klab.api.lang.kactors.KActorsBehavior.Ref;
import org.integratedmodelling.klab.api.services.KlabService;
import org.integratedmodelling.klab.api.services.ResourcesService;
import org.integratedmodelling.klab.api.services.runtime.Channel;

/**
Expand Down Expand Up @@ -52,9 +49,9 @@
public interface Scope extends Channel {

enum Expiration {
IDLE_TIME,
AT_CLOSE,
EXPLICIT
IDLE_TIMEOUT,
SERVICE_SHUTDOWN,
EXPLICIT_ACTION
}

enum Status {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@
import java.util.List;

/**
* Request for the runtime to add an observation to the knowledge graph and optionally resolve it. The
* resolution constraints come from the scope, creating a new content scope at server side from the root
* context scope pointed to by the scope token.
* Request for the runtime to add an observation to the knowledge graph. The resolution constraints come from
* the scope, creating a new content scope at server side from the root context scope pointed to by the scope
* token. The observation becomes an observer if the role == OBSERVER, and the knowledge graph has the faculty
* of providing a default observer if a regular observation is made in a scope that doesn't have one.
*/
public class ResolutionRequest {

public enum Role {
OBSERVATION,
OBSERVER
}

private Observation observation;
private Observable observable;
private String agentName; // for provenance when needed. Agents are identified by name
private List<ResolutionConstraint> resolutionConstraints = new ArrayList<>();
private long observationId;
private Role role = Role.OBSERVATION;

public long getObservationId() {
return observationId;
Expand Down Expand Up @@ -51,14 +58,6 @@ public Observable getObservable() {
public void setObservable(Observable observable) {
this.observable = observable;
}
//
// public boolean isStartResolution() {
// return startResolution;
// }
//
// public void setStartResolution(boolean startResolution) {
// this.startResolution = startResolution;
// }

public String getAgentName() {
return agentName;
Expand All @@ -67,5 +66,13 @@ public String getAgentName() {
public void setAgentName(String agentName) {
this.agentName = agentName;
}

public Role getRole() {
return role;
}

public void setRole(Role role) {
this.role = role;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ enum MessageType {
ResolutionAborted(Queue.Events, Observation.class),
ResolutionStarted(Queue.Events, Observation.class),

ContextClosed(Queue.Events, String.class),

/**
* Engine status has changed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
package org.integratedmodelling.common.authentication.scope;

import org.integratedmodelling.klab.api.collections.Parameters;
import org.integratedmodelling.klab.api.exceptions.KlabIllegalArgumentException;
import org.integratedmodelling.klab.api.exceptions.KlabResourceAccessException;
import org.integratedmodelling.klab.api.identities.Identity;
import org.integratedmodelling.klab.api.lang.kactors.KActorsBehavior;
import org.integratedmodelling.klab.api.scope.Scope;
import org.integratedmodelling.klab.api.services.*;
import org.integratedmodelling.klab.api.services.runtime.Channel;
import org.integratedmodelling.klab.api.services.runtime.Message;

import java.io.IOException;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* An abstract scope delegating all communication to an externally supplied Channel. Provides the basic API to
Expand All @@ -25,7 +20,7 @@ public abstract class AbstractDelegatingScope implements Scope {
Parameters<String> data = Parameters.create();
Status status = Status.EMPTY;
Scope parentScope;
private Expiration expiration = Expiration.AT_CLOSE;
private Expiration expiration = Expiration.SERVICE_SHUTDOWN;

public AbstractDelegatingScope(Channel delegateChannel) {
this.delegateChannel = delegateChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
public abstract class AbstractReactiveScopeImpl extends MessagingChannelImpl implements ReactiveScope {

protected KActorsBehavior.Ref agent;
protected Expiration expiration = Expiration.AT_CLOSE;
protected Expiration expiration = Expiration.SERVICE_SHUTDOWN;

public AbstractReactiveScopeImpl(Identity identity, boolean isSender, boolean isReceiver) {
super(identity, isSender, isReceiver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ public void installQueueConsumer(String queueId, Consumer<Message> consumer) {
protected void closeMessaging() {
if (this.channel_ != null) {
try {
this.channel_.close();
this.connection.close();
for (var queue : queueNames.values()) {
this.channel_.queueDelete(queue);
}
queueNames.clear();
} catch (Exception e) {
error("Error closing messaging channel", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
* A singleton that ingests {@link ResourceSet}s intelligently and keeps tabs on loaded knowledge, caching
* documents to minimize network transfer. It can be configured with callbacks to extract derived knowledge
* assets from documents upon loading, also handling versions.
* <p>
* TODO when used with locked services, should subscribe to resource events and reload any changed namespace
* when changes are notified.
*/
public enum KnowledgeRepository {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class KimSymbolDefinitionImpl extends KimStatementImpl implements KimSymb
private String urn;
private String defineClass;
private Object value;
private boolean defaulted;

@Override
public String getUrn() {
Expand Down Expand Up @@ -59,4 +60,12 @@ public void setName(String name) {
public void visit(Visitor visitor) {

}

public boolean isDefaulted() {
return defaulted;
}

public void setDefaulted(boolean defaulted) {
this.defaulted = defaulted;
}
}
16 changes: 16 additions & 0 deletions klab.core.services/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,22 @@
<version>0.13.0</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.20.0</version>
</dependency>

<!-- hns ttps://mvnrepository.com/artifact/org.springframework.security/spring-security-core -->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ public ScopeManager getScopeManager() {
return _scopeManager;
}


/**
* The service secret is a legitimate API key for the service, only known to clients that can read it
* because they are sharing the filesystem. These clients can access the service by just stating their
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ private UserIdentity createUserIdentity(EngineAuthorization engineAuthorization)
return ret;
}

public synchronized <T extends Scope> List<T> getScopes(Scope.Type type, Class<T> scopeClass) {
List<T> ret = new ArrayList<>();
for (var scope : scopes.values()) {
if (scope.getType() == type) {
ret.add((T)scope);
}
}
return ret;
}

public ServiceUserScope getOrCreateUserScope(EngineAuthorization authorization) {

var ret = scopes.get(authorization.getUsername());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ public void setDigitalTwin(DigitalTwin digitalTwin) {
@Override
public void close() {

// TODO when we're not in a runtime, we should not touch the digital twin (which is null) and
// we MUST call closeContext on all the other services we have paired with

digitalTwin.dispose();

// Call close() on all closeables in our dataset, including AutoCloseable if any.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ private KlabStatement adaptDefine(DefineSyntax define, KimNamespace namespace) {
ret.setLength(define.getCodeLength());
ret.setNamespace(namespace.getUrn());
ret.setProjectName(namespace.getProjectName());
ret.setDefaulted(define.isDefaulted());
ret.setDocumentClass(KlabAsset.KnowledgeClass.NAMESPACE);
ret.setValue(adaptValue(define.getValue(), namespace.getUrn(), namespace.getProjectName(),
KlabAsset.KnowledgeClass.NAMESPACE));
Expand Down
9 changes: 0 additions & 9 deletions klab.services.runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,6 @@
<version>5.20.0</version>
</dependency>
-->

<!-- this will enable logging across the board -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.11</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package org.integratedmodelling.klab.services.runtime;

import com.google.common.collect.ImmutableList;
import org.apache.qpid.server.SystemLauncher;
import org.integratedmodelling.common.authentication.scope.AbstractServiceDelegatingScope;
import org.integratedmodelling.common.logging.Logging;
import org.integratedmodelling.common.runtime.DataflowImpl;
import org.integratedmodelling.common.services.RuntimeCapabilitiesImpl;
import org.integratedmodelling.klab.api.collections.Pair;
import org.integratedmodelling.klab.api.data.KnowledgeGraph;
import org.integratedmodelling.klab.api.data.RuntimeAsset;
import org.integratedmodelling.klab.api.digitaltwin.DigitalTwin;
Expand All @@ -20,16 +18,13 @@
import org.integratedmodelling.klab.api.provenance.Activity;
import org.integratedmodelling.klab.api.provenance.Agent;
import org.integratedmodelling.klab.api.provenance.Provenance;
import org.integratedmodelling.klab.api.provenance.impl.ActivityImpl;
import org.integratedmodelling.klab.api.scope.ContextScope;
import org.integratedmodelling.klab.api.scope.Scope;
import org.integratedmodelling.klab.api.scope.SessionScope;
import org.integratedmodelling.klab.api.services.Reasoner;
import org.integratedmodelling.klab.api.services.Resolver;
import org.integratedmodelling.klab.api.services.ResourcesService;
import org.integratedmodelling.klab.api.services.resolver.Coverage;
import org.integratedmodelling.klab.api.services.resources.ResourceSet;
import org.integratedmodelling.klab.api.services.runtime.Actuator;
import org.integratedmodelling.klab.api.services.runtime.Dataflow;
import org.integratedmodelling.klab.api.services.runtime.Message;
import org.integratedmodelling.klab.api.services.runtime.Notification;
Expand All @@ -44,10 +39,6 @@
import org.integratedmodelling.klab.services.scopes.ServiceSessionScope;
import org.integratedmodelling.klab.services.scopes.messaging.EmbeddedBroker;
import org.integratedmodelling.klab.utilities.Utils;
import org.jgrapht.Graph;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.traverse.TopologicalOrderIterator;

import java.io.File;
import java.util.*;
Expand Down Expand Up @@ -142,6 +133,18 @@ public boolean operationalizeService() {
@Override
public boolean shutdown() {

/**
* Close every scope that's scheduled for closing at service shutdown
*/
for (var scope : getScopeManager().getScopes(Scope.Type.CONTEXT, ContextScope.class)) {
if (scope instanceof ServiceContextScope serviceContextScope && serviceContextScope.getExpiration() == Scope.Expiration.SERVICE_SHUTDOWN) {
scope.send(Message.MessageClass.SessionLifecycle, Message.MessageType.ContextClosed,
scope.getId());
scope.close();
Logging.INSTANCE.info("Context " + scope.getId() + " closed upon service shutdown");
}
}

serviceScope().send(Message.MessageClass.ServiceLifecycle, Message.MessageType.ServiceUnavailable,
capabilities(serviceScope()));
if (systemLauncher != null) {
Expand Down Expand Up @@ -291,7 +294,7 @@ public long submit(Observation observation, ContextScope scope) {
* root DT level and we get the context initialization activity as parent.
*/
var instantiation = digitalTwin.knowledgeGraph().operation(agent, parentActivity,
Activity.Type.INSTANTIATION, observation);
Activity.Type.INSTANTIATION, observation, this);

try (instantiation) {

Expand Down Expand Up @@ -372,7 +375,7 @@ public Future<Observation> resolve(long id, ContextScope scope) {
This will commit or rollback at close()
*/
var resolution = digitalTwin.knowledgeGraph().operation(digitalTwin.knowledgeGraph().klab()
, parentActivity, Activity.Type.RESOLUTION);
, parentActivity, Activity.Type.RESOLUTION, resolver);

try (resolution) {
result = observation;
Expand Down Expand Up @@ -410,7 +413,7 @@ this will commit all resources at close()
*/
var contextualization =
digitalTwin.knowledgeGraph().operation(digitalTwin.knowledgeGraph().klab(),
resolutionActivity, Activity.Type.EXECUTION, dataflow);
resolutionActivity, Activity.Type.EXECUTION, dataflow, this);

try (contextualization) {
// TODO contextualization gets its own activities to use in operations
Expand Down

0 comments on commit 1ee2155

Please sign in to comment.