Skip to content

Commit

Permalink
Dataflow stored more or less OK
Browse files Browse the repository at this point in the history
  • Loading branch information
Ferdinando Villa committed Nov 24, 2024
1 parent 75efd25 commit 59f09b0
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ enum Relationship {
EMERGED_FROM,
HAS_OBSERVER,
HAS_PLAN,
RESOLVED_BY,
BY_AGENT,
CREATED,
HAS_DATAFLOW,
HAS_PROVENANCE,
HAS_ACTIVITY,
HAS_CHILD,
TRIGGERED;
TRIGGERED,
CONTEXTUALIZED;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class ActuatorImpl implements Actuator {
private Actuator.Type actuatorType;
private long internalId; // ID within the graph, can't be the same as the observation
private double resolvedCoverage;

private transient KnowledgeGraph.Operation operation;
//
// private transient KnowledgeGraph.Operation operation;

@Override
public long getId() {
Expand Down Expand Up @@ -137,20 +137,20 @@ public void setResolvedCoverage(double resolvedCoverage) {
public void setInternalId(long internalId) {
this.internalId = internalId;
}

/**
* Non-API: the actuator carries the activity it represents, which it transfers to the scope of execution
* so that provenance can be reconstructed.
*
* @return
*/
public KnowledgeGraph.Operation getOperation() {
return operation;
}

public void setOperation(KnowledgeGraph.Operation operation) {
this.operation = operation;
}
//
// /**
// * Non-API: the actuator carries the activity it represents, which it transfers to the scope of execution
// * so that provenance can be reconstructed.
// *
// * @return
// */
// public KnowledgeGraph.Operation getOperation() {
// return operation;
// }
//
// public void setOperation(KnowledgeGraph.Operation operation) {
// this.operation = operation;
// }

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,17 @@ public class ResolverService extends BaseService implements Resolver {
*/
private static double MINIMUM_WORTHWHILE_CONTRIBUTION = 0.15;

/*
* The knowledge repository. Models and instances are built and kept in the resolver upon input
* from the resource services. For now we keep everything in memory.
*
* FIXME the URNs should include the version number after @ to ensure version matching instead
* of only storing the latest version
*
* Version of the latest loaded object is kept for everything, including namespaces
*/
// Map<String, Version> urnToVersion = Collections.synchronizedMap(new HashMap<>());
// Map<String, Model> models = Collections.synchronizedMap(new HashMap<>());
// Map<String, Observation> instances = Collections.synchronizedMap(new HashMap<>());
// Parameters<String> defines = Parameters.createSynchronized();
private String hardwareSignature = Utils.Names.getHardwareId();
private final String hardwareSignature = Utils.Names.getHardwareId();
private ResolverConfiguration configuration;

// OBVIOUSLY temporary - when all done, merge its methods with this and remove the porker and the old
// dirt.
private ResolutionCompiler resolutionCompiler = new ResolutionCompiler();
private final ResolutionCompiler resolutionCompiler = new ResolutionCompiler();

public ResolverService(AbstractServiceDelegatingScope scope, ServiceStartupOptions options) {
super(scope, Type.RESOLVER, options);
// setProvideScopesAutomatically(true);
ServiceConfiguration.INSTANCE.setMainService(this);
readConfiguration(options);
KnowledgeRepository.INSTANCE.setProcessor(KlabAsset.KnowledgeClass.NAMESPACE, (ns) -> {
return loadNamespace((KimNamespace) ns, scope);
});
KnowledgeRepository.INSTANCE.setProcessor(KlabAsset.KnowledgeClass.NAMESPACE,
(ns) -> loadNamespace((KimNamespace) ns, scope));
}

private void readConfiguration(ServiceStartupOptions options) {
Expand Down Expand Up @@ -172,7 +155,8 @@ private Model loadModel(KimModel statement, Scope scope) {
// TODO any literal value must be added first

for (var resourceUrn : statement.getResourceUrns()) {
// FIXME this should be one multi-resource contextualizable
// FIXME when >1 this should be one multi-resource contextualizable
// TODO use static builders instead of polymorphic constructors
model.getComputation().add(new ContextualizableImpl(resourceUrn));
}
model.getComputation().addAll(statement.getContextualization());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.integratedmodelling.klab.services.runtime;

import com.google.common.collect.ImmutableList;
import org.integratedmodelling.common.runtime.ActuatorImpl;
import org.integratedmodelling.common.runtime.DataflowImpl;
import org.integratedmodelling.klab.api.Klab;
import org.integratedmodelling.klab.api.collections.Pair;
import org.integratedmodelling.klab.api.collections.Parameters;
Expand All @@ -14,22 +16,27 @@
import org.integratedmodelling.klab.api.knowledge.observation.scale.space.Space;
import org.integratedmodelling.klab.api.knowledge.observation.scale.time.Time;
import org.integratedmodelling.klab.api.lang.ServiceCall;
import org.integratedmodelling.klab.api.provenance.Activity;
import org.integratedmodelling.klab.api.scope.ContextScope;
import org.integratedmodelling.klab.api.scope.Scope;
import org.integratedmodelling.klab.api.services.Language;
import org.integratedmodelling.klab.api.services.RuntimeService;
import org.integratedmodelling.klab.api.services.runtime.Actuator;
import org.integratedmodelling.klab.api.services.runtime.Dataflow;
import org.integratedmodelling.klab.components.ComponentRegistry;
import org.integratedmodelling.klab.configuration.ServiceConfiguration;
import org.integratedmodelling.klab.runtime.storage.BooleanStorage;
import org.integratedmodelling.klab.runtime.storage.DoubleStorage;
import org.integratedmodelling.klab.runtime.storage.FloatStorage;
import org.integratedmodelling.klab.runtime.storage.KeyedStorage;
import org.integratedmodelling.klab.services.scopes.ServiceContextScope;
import org.jgrapht.Graph;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.ojalgo.concurrent.Parallelism;

import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -43,26 +50,33 @@ public class ExecutionSequence {

private final ServiceContextScope scope;
private final DigitalTwin digitalTwin;
private final Language languageService;
private final ComponentRegistry componentRegistry;
private final double resolvedCoverage;
private final KnowledgeGraph.Operation contextualization;
private final Dataflow<Observation> dataflow;
private List<List<ExecutorOperation>> sequence = new ArrayList<>();
private boolean empty;
// the context for the next operation. Starts at the observation and doesn't normally change but
// implementations
// may change it when they return a non-null, non-POD object.
// TODO check if this should be a RuntimeAsset or even an Observation.
private Object currentExecutionContext;
private Map<Actuator, KnowledgeGraph.Operation> operations = new HashMap<>();

private ExecutionSequence(List<Pair<Actuator, Integer>> pairs, double resolvedCoverage,
ServiceContextScope contextScope,
DigitalTwin digitalTwin, ComponentRegistry componentRegistry) {

public ExecutionSequence(KnowledgeGraph.Operation contextualization, Dataflow<Observation> dataflow,
ComponentRegistry componentRegistry, ServiceContextScope contextScope) {
this.scope = contextScope;
this.digitalTwin = digitalTwin;
this.languageService = ServiceConfiguration.INSTANCE.getService(Language.class);
this.contextualization = contextualization;
this.resolvedCoverage = dataflow instanceof DataflowImpl dataflow1 ?
dataflow1.getResolvedCoverage() : 1.0;
this.componentRegistry = componentRegistry;
this.resolvedCoverage = resolvedCoverage;
this.dataflow = dataflow;
this.digitalTwin = contextScope.getDigitalTwin();
}

public boolean compile(Actuator rootActuator) {

var pairs = sortComputation(rootActuator);
List<ExecutorOperation> current = null;
int currentGroup = -1;
for (var pair : pairs) {
Expand All @@ -78,15 +92,10 @@ private ExecutionSequence(List<Pair<Actuator, Integer>> pairs, double resolvedCo

if (current != null) {
sequence.add(current);
return true;
}

}

public static ExecutionSequence compile(List<Pair<Actuator, Integer>> pairs,
double resolvedCoverage, ServiceContextScope contextScope,
DigitalTwin digitalTwin,
ComponentRegistry componentRegistry) {
return new ExecutionSequence(pairs, resolvedCoverage, contextScope, digitalTwin, componentRegistry);
return false;
}

public boolean run() {
Expand Down Expand Up @@ -142,11 +151,7 @@ class ExecutorOperation {

public ExecutorOperation(Actuator actuator) {
this.id = actuator.getId();
if (actuator instanceof ActuatorImpl actuator1) {
this.operation = actuator1.getOperation();
// remove to avoid leaks after using the actuator as a shuttle
actuator1.setOperation(null);
}
this.operation = operations.get(actuator);
this.observation = scope.getObservation(this.id);
compile(actuator);
}
Expand Down Expand Up @@ -345,4 +350,91 @@ public ExecutionSequence runActuator(Actuator actuator) {
return this;
}


/**
* Establish the order of execution and the possible parallelism. Each root actuator should be sorted by
* dependency and appended in order to the result list along with its order of execution. Successive roots
* can refer to the previous roots but they must be executed sequentially.
* <p>
* The DigitalTwin is asked to register the actuator in the scope and prepare the environment and state
* for its execution, including defining its contextualization scale in context.
*
* @return
*/
private List<Pair<Actuator, Integer>> sortComputation(Actuator rootActuator) {
List<Pair<Actuator, Integer>> ret = new ArrayList<>();
int executionOrder = 0;
Map<Long, Actuator> branch = new HashMap<>();
Set<Actuator> group = new HashSet<>();
var dependencyGraph = computeActuatorOrder(rootActuator);
for (var nextActuator : ImmutableList.copyOf(new TopologicalOrderIterator<>(dependencyGraph))) {
if (nextActuator.getActuatorType() != Actuator.Type.REFERENCE) {
ret.add(Pair.of(nextActuator, (executionOrder = checkExecutionOrder
(executionOrder, nextActuator, dependencyGraph, group))));
}
}
return ret;
}

/**
* If the actuator depends on any in the currentGroup, empty the group and increment the order; otherwise,
* add it to the group and return the same order.
*
* @param executionOrder
* @param current
* @param dependencyGraph
* @param currentGroup
* @return
*/
private int checkExecutionOrder(int executionOrder, Actuator current,
Graph<Actuator, DefaultEdge> dependencyGraph,
Set<Actuator> currentGroup) {
boolean dependency = false;
for (Actuator previous : currentGroup) {
for (var edge : dependencyGraph.incomingEdgesOf(current)) {
if (currentGroup.contains(dependencyGraph.getEdgeSource(edge))) {
dependency = true;
break;
}
}
}

if (dependency) {
currentGroup.clear();
return executionOrder + 1;
}

currentGroup.add(current);

return executionOrder;
}


private Graph<Actuator, DefaultEdge> computeActuatorOrder(Actuator rootActuator) {
Graph<Actuator, DefaultEdge> dependencyGraph = new DefaultDirectedGraph<>(DefaultEdge.class);
Map<Long, Actuator> cache = new HashMap<>();
loadGraph(rootActuator, dependencyGraph, cache, this.contextualization);
// keep the actuators that do nothing so we can tag their observation as resolved
return dependencyGraph;
}


private void loadGraph(Actuator rootActuator, Graph<Actuator, DefaultEdge> dependencyGraph, Map<Long,
Actuator> cache, KnowledgeGraph.Operation contextualization) {

var childContextualization = contextualization.createChild(rootActuator,
"Contextualization of " + rootActuator, Activity.Type.CONTEXTUALIZATION);
operations.put(rootActuator, childContextualization);

cache.put(rootActuator.getId(), rootActuator);
dependencyGraph.addVertex(rootActuator);
for (Actuator child : rootActuator.getChildren()) {
if (child.getActuatorType() == Actuator.Type.REFERENCE) {
dependencyGraph.addEdge(cache.get(child.getId()), rootActuator);
} else {
loadGraph(child, dependencyGraph, cache, childContextualization);
dependencyGraph.addEdge(child, rootActuator);
}
}
}
}
Loading

0 comments on commit 59f09b0

Please sign in to comment.