From 73786e090031912af227434f3d06f12144d25209 Mon Sep 17 00:00:00 2001 From: Ferdinando Villa Date: Sat, 16 Nov 2024 18:09:30 +0100 Subject: [PATCH] Moving to store actuators --- .../klab/api/data/KnowledgeGraph.java | 114 +--- .../klab/api/provenance/Activity.java | 7 - .../api/provenance/impl/ActivityImpl.java | 10 - .../common/runtime/ActuatorImpl.java | 14 +- .../services/resolver/ResolverService.java | 563 ------------------ .../services/runtime/ExecutionSequence.java | 20 +- .../klab/services/runtime/RuntimeService.java | 24 +- .../runtime/neo4j/KnowledgeGraphNeo4j.java | 265 ++------- 8 files changed, 84 insertions(+), 933 deletions(-) diff --git a/klab.core.api/src/main/java/org/integratedmodelling/klab/api/data/KnowledgeGraph.java b/klab.core.api/src/main/java/org/integratedmodelling/klab/api/data/KnowledgeGraph.java index 2a273babb..bd747607d 100644 --- a/klab.core.api/src/main/java/org/integratedmodelling/klab/api/data/KnowledgeGraph.java +++ b/klab.core.api/src/main/java/org/integratedmodelling/klab/api/data/KnowledgeGraph.java @@ -54,6 +54,16 @@ interface Operation extends Closeable { */ Activity getActivity(); + /** + * Create a child operation using the same transaction and representing a new activity, which will be + * linked as a subordinate to the current one. Pass anything that can affect the child activity, at + * minimum an Activity.Type and a description. + * + * @param activityData + * @return + */ + Operation createChild(Object... activityData); + /** * Store the passed asset, return its unique long ID. * @@ -89,76 +99,6 @@ void linkToRootNode(RuntimeAsset destination, DigitalTwin.Relationship relationship, Object... additionalProperties); - - // - // /** - // * Run the operation as configured and return the ID of the last object created or - // modified, or - // * {@link Observation#UNASSIGNED_ID} if the operation failed or was wrongly defined. - // * - // * @return a valid ID or {@link Observation#UNASSIGNED_ID} - // * @deprecated just run each operation and rollback if unsuccessful - // */ - // long run(ContextScope scope); - // - // /** - // * Add the passed runtime asset, which must not be part of the graph already. - // *

- // * Sets the current link target to the created object. - // * - // * @param observation - // * @return - // * @deprecated use store - // */ - // Operation add(RuntimeAsset observation); - // - // /** - // * Sets an existing asset as the target for future links or updates called on the - // operation. If - // * properties are passed that differ from the existing ones, an update is performed at - // * {@link #run(ContextScope)} and the appropriate provenance records are inserted in the - // graph. - // * - // * @param source - // * @return - // * @deprecated shouldn't be necessary, use update on the main KG - // */ - // Operation set(RuntimeAsset source, Object... properties); - // - // /** - // * Link the last asset referenced in the call chain to the passed asset, which must exist - // . Use the - // * additional parameters to specify the link, which should include a link type unless - // that can be - // * inferred unambiguously, and can include PODs for properties as needed. Creates an - // outgoing - // * connection from the current asset to the passed one. - // *

- // * When returning, the target is still set as before the link call, so that various link - // calls can be - // * chained. - // * - // * @param assetFrom - // * @param assetTo - // * @param linkData - // * @return - // * @deprecated use link above, remove this - // */ - // Operation link(RuntimeAsset assetFrom, RuntimeAsset assetTo, DigitalTwin.Relationship - // relationship, - // Object... linkData); - // - // /** - // * Link the passed asset directly to the root object of reference - provenance, context - // or dataflow. - // * - // * @param asset - // * @param linkData - // * @return - // * @deprecated - // */ - // Operation rootLink(RuntimeAsset asset, Object... linkData); - /** * Call after run() when the activity has finished without errors to ensure that all info in the * knowledge graph is up to date from a successful run. @@ -179,40 +119,6 @@ void linkToRootNode(RuntimeAsset destination, Operation fail(ContextScope scope, Object... assets); } - // /** - // * Create a new operation to be run on the graph, resulting in assets being created or modified, - // with - // * recording of provenance information. As a result, even an atomic operation may add several - // assets and - // * relationships. Unless the operation is empty, exactly one Activity node is always created at - // run(). - // *

- // * If no parameters are passed, the first target must be set manually on the returned operation. - // Otherwise - // * the target will be set according to which parameters are passed and their existence in the - // graph. No - // * change is made to the graph until {@link Operation#run(ContextScope)} is called. - // *

- // * Obtaining and running an operation (i.e. creating a new Activity in the knowledge graph) will - // also - // * reset the idle time counter to 0 for those scopes whose expiration is IDLE_TIME. Operations - // may be - // * invoked on the scope by the API user, but also triggered by applications, behaviors etc. - // *

- // * When the operation has run, call success() or fail() to update the knowledge graph with any - // IDs and - // * states. - // * - // * @param agent the agent that will own the activity created - // * @param scope the specific scope, whose observer and context will determine the links made - // * @param targets any additional parameters. A string will be interpreted as the description of the - // * activity generated. An activity will be interpreted as the parent for the activity - // * generated. - // * @return a graph modification operation description ready for run() - // * @deprecated - // */ - // Operation activity(Agent agent, ContextScope scope, Object... targets); - /** * Create a new operation with a new activity and a transaction, which can be committed or rolled back * after using it to define the graph. diff --git a/klab.core.api/src/main/java/org/integratedmodelling/klab/api/provenance/Activity.java b/klab.core.api/src/main/java/org/integratedmodelling/klab/api/provenance/Activity.java index 0db548f36..8ca8bfd4f 100644 --- a/klab.core.api/src/main/java/org/integratedmodelling/klab/api/provenance/Activity.java +++ b/klab.core.api/src/main/java/org/integratedmodelling/klab/api/provenance/Activity.java @@ -80,13 +80,6 @@ enum Outcome { */ long getCredits(); - /** - * The activity, if any, that represent a super-activity of this one - * - * @return - */ - Activity getParent(); - /** * Logs each time that the action was executed (in lieu of having an action per each execution). Empty for * any action that wasn't called by the scheduler. If not empty the first time could be initialization or diff --git a/klab.core.api/src/main/java/org/integratedmodelling/klab/api/provenance/impl/ActivityImpl.java b/klab.core.api/src/main/java/org/integratedmodelling/klab/api/provenance/impl/ActivityImpl.java index e6060aa70..f5e01c168 100644 --- a/klab.core.api/src/main/java/org/integratedmodelling/klab/api/provenance/impl/ActivityImpl.java +++ b/klab.core.api/src/main/java/org/integratedmodelling/klab/api/provenance/impl/ActivityImpl.java @@ -16,7 +16,6 @@ public class ActivityImpl extends ProvenanceNodeImpl implements Activity { private String description; private long id; private String taskId; - private Activity parent; private Outcome outcome; private String stackTrace; @@ -89,15 +88,6 @@ public void setId(long id) { this.id = id; } - @Override - public Activity getParent() { - return parent; - } - - public void setParent(Activity parent) { - this.parent = parent; - } - /** * Non-API: the ID path through which the activity hierarchy can be reconstructed. When an activity is * included in a message, the taskId gets in there. diff --git a/klab.core.common/src/main/java/org/integratedmodelling/common/runtime/ActuatorImpl.java b/klab.core.common/src/main/java/org/integratedmodelling/common/runtime/ActuatorImpl.java index 29df17208..175f15d4e 100644 --- a/klab.core.common/src/main/java/org/integratedmodelling/common/runtime/ActuatorImpl.java +++ b/klab.core.common/src/main/java/org/integratedmodelling/common/runtime/ActuatorImpl.java @@ -1,6 +1,7 @@ package org.integratedmodelling.common.runtime; import org.integratedmodelling.klab.api.collections.Parameters; +import org.integratedmodelling.klab.api.data.KnowledgeGraph; import org.integratedmodelling.klab.api.geometry.Geometry; import org.integratedmodelling.klab.api.knowledge.Artifact; import org.integratedmodelling.klab.api.knowledge.Observable; @@ -28,7 +29,8 @@ public class ActuatorImpl implements Actuator { private Actuator.Type actuatorType; private long internalId; // ID within the graph, can't be the same as the observation private double resolvedCoverage; - private Activity activity; + + private transient KnowledgeGraph.Operation operation; @Override public long getId() { @@ -142,16 +144,18 @@ public void setInternalId(long internalId) { * * @return */ - public Activity getActivity() { - return activity; + public KnowledgeGraph.Operation getOperation() { + return operation; } - public void setActivity(Activity activity) { - this.activity = activity; + public void setOperation(KnowledgeGraph.Operation operation) { + this.operation = operation; } @Override public String toString() { return "ActuatorImpl{ " + this.id + "}"; } + + } diff --git a/klab.services.resolver/src/main/java/org/integratedmodelling/klab/services/resolver/ResolverService.java b/klab.services.resolver/src/main/java/org/integratedmodelling/klab/services/resolver/ResolverService.java index 47bb8e161..b0f996378 100644 --- a/klab.services.resolver/src/main/java/org/integratedmodelling/klab/services/resolver/ResolverService.java +++ b/klab.services.resolver/src/main/java/org/integratedmodelling/klab/services/resolver/ResolverService.java @@ -138,482 +138,6 @@ public String serviceId() { return configuration.getServiceId(); } - // /** - // * Top-level resolution, resolve and return an independent resolution graph. This creates a new - // resolution - // * graph which will contain any observations that were already resolved within the context - // observation in - // * the scope, if any. - // * - // * @param scope - // * @return - // */ - // public Resolution computeResolution(Observation observation, ContextScope scope) { - // - // var resolutionGeometry = scope.getObservationGeometry(observation); - // - // if (resolutionGeometry == null || resolutionGeometry.isEmpty()) { - // return ResolutionImpl.empty(observation, scope); - // } - // - // var scale = Scale.create(resolutionGeometry, scope); - // - // ResolutionImpl ret = new ResolutionImpl(observation.getObservable(), scale, scope); - // var coverage = resolveObservation(observation, scale, scope, ret, null); - // - // if (!coverage.isRelevant()) { - // ret.setEmpty(); - // } - // - // return ret; - // - // } - - // /** - // * Top-level resolution, resolve and return an independent resolution graph. This creates a new - // resolution - // * graph which will contain any observations that were already resolved within the context - // observation in - // * the scope, if any. - // * - // * @param knowledge - // * @param scope - // * @return - // */ - // public Resolution computeResolution(Resolvable knowledge, ContextScope scope) { - // - // Geometry resolutionGeometry = Geometry.EMPTY; - // - // Resolvable observable = switch (knowledge) { - // case Concept concept -> Observable.promote(concept); - // case Model model -> model.getObservables().get(0); - // case Observable obs -> obs; - // case Observation observation -> { - // resolutionGeometry = observation.getGeometry(); - // yield observation.getObservable(); - // } - // default -> null; - // }; - // - // if (observable == null) { - // // FIXME this should just set the resolution to an error state and return it - // throw new KlabIllegalStateException("knowledge " + knowledge + " is not resolvable"); - // } - // - // if (scope.getContextObservation() != null) { - // resolutionGeometry = scope.getContextObservation().getGeometry(); - // } else if (resolutionGeometry.isEmpty() && scope.getObserver() != null) { - // resolutionGeometry = scope.getObserver().getObserverGeometry(); - // } - // - // var scale = Scale.create(resolutionGeometry); - // - // ResolutionImpl ret = new ResolutionImpl(observable, scale, scope); - // if (knowledge instanceof Model) { - // resolveModel((Model) knowledge, observable, scale, - // scope.withResolutionNamespace(((Model) knowledge).getNamespace()), - // ret); - // } else if (observable instanceof Observable obs) { - // resolveObservable(obs, scale, scope, ret, null); - // } // TODO the rest - // - // return ret; - // - // } - - // private Coverage resolveObservation(Observation observation, Scale scale, ContextScope scope, - // ResolutionImpl parent, Model parentModel) { - // - // var observable = observation.getObservable(); - // Coverage ret = Coverage.create(scale, 0.0); - // - // // observation may have been resolved already. Also it could be being resolved from upstream, - // and - // // infinite recursion is fun but helps nobody. - // if (observation.isResolved() || parent.checkResolving(observable)) { - // return Coverage.universal(); - // } - // - // for (ObservationStrategy strategy : - // scope.getService(Reasoner.class).computeObservationStrategies(observation, scope)) { - // // this merges any useful strategy and returns the coverage - // ResolutionImpl resolution = resolveStrategy(strategy, scale, scope, parent, parentModel); - // ret = ret.merge(resolution.getCoverage(), LogicalConnector.UNION); - // if (ret.getGain() < MINIMUM_WORTHWHILE_CONTRIBUTION) { - // continue; - // } - // if (ret.isRelevant()) { - // // merge the resolution with the parent resolution - // parent.merge(parentModel, resolution, ResolutionType.DIRECT); - // if (parent.getCoverage().isComplete()) { - // break; - // } - // } - // } - // - // return ret; - // } - - // - // /** - // * We always resolve an observable first. This only reports coverage as it does not directly - // create a - // * resolution graph; this is done when resolving a model, which creates a graph and merges it - // with the - // * parent graph if successful. - // * - // * @param observable - // * @param parent - // * @return - // */ - // private Coverage resolveObservable(Observable observable, Scale scale, ContextScope scope, - // ResolutionImpl parent, Model parentModel) { - // - // /** - // * Make graph merging parent Set coverage to scale, 0; Strategies/models: foreach model: - // * resolve to new graph for same observable and add coverage; merge(union) if gain is - // * significant break when models are finished or coverage is complete if graph.coverage is - // * sufficient, merge into parent at parent model (root if null) return coverage is - // * sufficient - // */ - // Coverage ret = Coverage.create(scale, 0.0); - // - // // infinite recursion is nice but wastes time - // if (parent.checkResolving(observable)) { - // return Coverage.universal(); - // } - // - // // this returns an existing observation (resolved or not) or a new one with the unresolved ID - // Observation observation = requireObservation(observable, scope); - // - // if (observation.isResolved()) { - // // we have it: TODO must be in the resolution graph? - // return Coverage.universal(); - // } else if (observation.getId() >= 0) { - // return Coverage.empty(); - // } - // - // // see what the reasoner thinks of this observable - // for (ObservationStrategy strategy : - // scope.getService(Reasoner.class).computeObservationStrategies( - // observation, - // scope)) { - // // this merges any useful strategy and returns the coverage - // ResolutionImpl resolution = resolveStrategy(strategy, scale, scope, parent, parentModel); - // ret = ret.merge(resolution.getCoverage(), LogicalConnector.UNION); - // if (ret.getGain() < MINIMUM_WORTHWHILE_CONTRIBUTION) { - // continue; - // } - // if (ret.isRelevant()) { - // // merge the resolution with the parent resolution - // parent.merge(parentModel, resolution, ResolutionType.DIRECT); - // if (parent.getCoverage().isComplete()) { - // break; - // } - // } - // } - // - // return ret; - // } - - // /** - // * If the runtime contains the observation, return it (in resolved or unresolved status but with - // a valid - // * ID). Otherwise create one in the geometry that the scope implies, with the unresolved ID, and - // return it - // * for submission to the knowledge graph. - // * - // * @param observable - // * @param scope - // * @return a non-null observation - // */ - // private Observation requireObservation(Observable observable, ContextScope scope) { - // var ret = scope.query(Observation.class, observable); - // if (ret.isEmpty()) { - // - // var newObs = DigitalTwin.createObservation(scope, observable); - // if (SemanticType.isSubstantial(observable.getSemantics().getType())) { - // // TODO determine the right geometry and add it - // - // } - // var id = scope.getService(RuntimeService.class).submit(newObs, scope, false); - // if (id >= 0) { - // ret = scope.query(Observation.class, observable); - // } - // } - // - // if (ret.isEmpty()) { - // throw new KlabInternalErrorException("Observation of " + observable.getUrn() + " couldn't - // be " + - // "instantiated"); - // } - // - // return ret.getFirst(); - // } - - // /** - // * Resolve a single observation strategy; if the resolution succeeds, merge the resolution with the - // * parent. - // * - // * @param strategy - // * @param scale - // * @param scope - // * @param parent - // * @param parentModel - // * @return - // */ - // private ResolutionImpl resolveStrategy(ObservationStrategy strategy, Scale scale, - // ContextScope scope, - // ResolutionImpl parent, - // Model parentModel) { - // - // var coverage = Coverage.create(scale, 0.0); - // ResolutionImpl ret = null; - // - // for (var operation : strategy.getOperations()) { - // switch (operation.getType()) { - // case RESOLVE -> { - // /* - // Additional resolution for a different observable, have the runtime produce the - // observation, if resolved we're done, otherwise invoke resolution recursively - // */ - // ret = new ResolutionImpl(operation.getObservable(), scale, scope, - // parent); - // // TODO have the runtime create the observation - // // TODO resolve it and merge the resolution - // } - // case OBSERVE -> { - // - // /* - // Find models and compile them in, merge resolutions until satisfied. We pass the - // scale - // through scope constraints. - // */ - // List constraints = new ArrayList<>(); - // constraints.add(ResolutionConstraint.of( - // ResolutionConstraint.Type.Geometry, - // scale.as(Geometry.class))); - // if (parentModel != null) { - // constraints.add(ResolutionConstraint.of( - // ResolutionConstraint.Type.ResolutionNamespace, - // parentModel.getNamespace())); - // constraints.add(ResolutionConstraint.of( - // ResolutionConstraint.Type.ResolutionProject, - // parentModel.getProjectName())); - // } - // - // scope = scope.withResolutionConstraints(constraints.toArray - // (ResolutionConstraint[]::new)); - // - // ret = new ResolutionImpl(operation.getObservable(), scale, scope, parent); - // - // for (Model model : queryModels(operation.getObservable(), scope, scale)) { - // - // ResolutionImpl resolution = resolveModel(model, operation.getObservable(), - // scale, - // scope.withResolutionConstraints( - // ResolutionConstraint.of( - // ResolutionConstraint.Type.ResolutionNamespace, - // model.getNamespace()), - // ResolutionConstraint.of( - // ResolutionConstraint.Type.ResolutionProject, - // model.getProjectName())), - // parent); - // coverage = coverage.merge(resolution.getCoverage(), LogicalConnector.UNION); - // if (coverage.getGain() < MINIMUM_WORTHWHILE_CONTRIBUTION) { - // continue; - // } - // // merge the model at root level within the local resolution - // resolution.merge(model, coverage, operation.getObservable(), ResolutionType - // .DIRECT); - // if (coverage.isRelevant()) { - // // merge the resolution with the parent resolution - // ret.merge(parentModel, resolution, ResolutionType.DIRECT); - // if (parent.getCoverage().isComplete()) { - // break; - // } - // } - // } - // - // } - // case APPLY -> { - // // resolve the contextualizers merging the necessary resource set, coverage is - // // unchanged unless contextualizers are not available - // } - // } - // - // // add any deferrals to the compiled strategy node - // if (!ret.isEmpty()) { - // for (var deferral : operation.getContextualStrategies()) { - // - // } - // } - // } - // - // return ret; - // - // // ResolutionImpl ret = new ResolutionImpl(strategy.getOriginalObservable(), scale, - // scope, - // // parent); - // // - // // for (Pair - // // operation : - // // strategy) { - // // switch (operation.getFirst()) { - // // case OBSERVE -> { - // // for (Model model : queryModels(operation.getSecond().observable(), scope, - // // scale)) { - // // ResolutionImpl resolution = resolveModel(model, strategy - // // .getOriginalObservable(), - // // scale, - // // scope.withResolutionNamespace(model.getNamespace()), parent); - // // coverage = coverage.merge(resolution.getCoverage(), - // LogicalConnector.UNION); - // // if (coverage.getGain() < MINIMUM_WORTHWHILE_CONTRIBUTION) { - // // continue; - // // } - // // // merge the model at root level within the local resolution - // // resolution.merge(model, coverage, strategy.getOriginalObservable(), - // // ResolutionType.DIRECT); - // // if (coverage.isRelevant()) { - // // // merge the resolution with the parent resolution - // // ret.merge(parentModel, resolution, ResolutionType.DIRECT); - // // if (parent.getCoverage().isComplete()) { - // // break; - // // } - // // } - // // } - // // } - // // case RESOLVE -> { - // // - // // } - // // case APPLY -> { - // // } - // // case CONCRETIZE -> { - // // // TODO deprecated? - // // } - // // } - // // } - // // - // // return ret; - // // return null; - // } - - // /** - // * Parent is for the observable, model gets added if it contributes, then its dependencies - // * - // * @param model - // * @param scale - // * @param scope - // * @param parent - // * @return - // */ - // private ResolutionImpl resolveModel(Model model, Resolvable observable, Scale scale, ContextScope - // scope, - // ResolutionImpl parent) { - // - // ResolutionImpl ret = new ResolutionImpl(observable, scale, scope, parent); - // Coverage coverage = Coverage.create(scale, 1.0); - // if (!model.getCoverage().isEmpty()) { - // coverage = coverage.merge(model.getCoverage(), LogicalConnector.INTERSECTION); - // } - // for (Observable dependency : model.getDependencies()) { - // - // /** - // * TODO NOW - the scope must be adjusted for the observable based on the dependent - // * or substantial character - // */ - // - // Coverage depcoverage = resolveObservable(dependency, scale, scope, parent, model); - // coverage = coverage.merge(depcoverage, LogicalConnector.INTERSECTION); - // if (coverage.isEmpty()) { - // break; - // } - // } - // return ret.withCoverage(coverage); - // } - - // @Override - - // - // @SuppressWarnings("unchecked") - // public T resolveKnowledge(String urn, Scope scope) { - // - // Knowledge ret = null; - // - // var resources = scope.getService(ResourcesService.class); - // var reasoner = scope.getService(Reasoner.class); - // - // switch (Urn.classify(urn)) { - //// case KIM_OBJECT: - //// ResourceSet set = resources.resolve(urn, scope); - //// if (set.getResults().size() == 1) { - //// ret = loadKnowledge(set, scope); - //// } - //// break; - // case OBSERVABLE: - // ret = reasoner.resolveObservable(urn); - // break; - // case RESOURCE: - // // var resource = resources.resolveResource(urn, scope); - // // TODO make a ModelImpl that observes this. - // break; - // case REMOTE_URL: - // case UNKNOWN: - // scope.error("cannot resolve URN '" + urn + "' to observable knowledge"); - // break; - // } - // return (T) ret; - // } - - // /** - // * Return the first resource in results, or null. - // * - // * @param set - // * @param scope - // * @return - // */ - // private Knowledge loadKnowledge(ResourceSet set, Scope scope) { - // List result = loadResourceSet(set, scope); - // return result.size() > 0 ? result.get(0) : null; - // } - - // /** - // * Load all the knowledge in the set from the respective services in scope, including resolving - // components - // * if any. - // * - // * @param set - // * @param scope - // * @return - // */ - // private List loadResourceSet(ResourceSet set, Scope scope) { - // List ret = new ArrayList<>(); - // for (Resource namespace : set.getNamespaces()) { - // loadNamespace(namespace, scope); - // } - // for (Resource result : set.getResults()) { - // switch (result.getKnowledgeClass()) { - // // case INSTANCE: - // // Instance instance = instances.get(result.getResourceUrn()); - // // if (instance != null) { - // // ret.add(instance); - // // } - // // break; - // case MODEL: - // Model model = models.get(result.getResourceUrn()); - // if (model != null) { - // ret.add(model); - // } - // break; - // default: - // break; - // } - // } - // return ret; - // } - private List loadNamespace(KimNamespace namespace, Scope scope) { List ret = new ArrayList<>(); @@ -625,66 +149,6 @@ private List loadNamespace(KimNamespace namespace, Scope scope) { return ret; } - // private Observation loadInstance(KimInstance statement, Scope scope) { - // - // var reasoner = scope.getService(Reasoner.class); - ////// - ////// InstanceImpl instance = new InstanceImpl(); - ////// instance.setNamespace(statement.getNamespace()); - ////// instance.getAnnotations().addAll(statement.getAnnotations()); - ////// instance.setObservable(reasoner.declareObservable(statement.getObservable()).builder - // (scope).as(DescriptionType.ACKNOWLEDGEMENT).build()); - ////// instance.setUrn(statement.getNamespace() + "." + statement.getName()); - ////// instance.setMetadata(statement.getMetadata()); - ////// instance.setScale(createScaleFromBehavior(statement.getBehavior(), scope)); - ////// - ////// for (KimObservable state : statement.getStates()) { - ////// instance.getStates().add(reasoner.declareObservable(state)); - ////// } - ////// for (var child : statement.getChildren()) { - ////// instance.getChildren().add(loadInstance( child, scope)); - ////// } - //// - //// return instance; - // return null; - // } - - // private Scale createScaleFromBehavior(KimBehavior behavior, Scope scope) { - // - // Scale scale = null; - // List> extents = new ArrayList<>(); - // var languageService = Configuration.INSTANCE.getService(Language.class); - // - // if (behavior != null) { - // for (ServiceCall call : behavior.getExtentFunctions()) { - // var ext = languageService.execute(call, scope, Object.class); - // if (ext instanceof Scale) { - // scale = (Scale) ext; - // } else if (ext instanceof Geometry) { - // scale = Scale.create((Geometry) ext); - // } else if (ext instanceof Extent) { - // extents.add((Extent) ext); - // } else { - // throw new KlabIllegalStateException("the call to " + call.getUrn() + " did not - // produce a " + - // "scale or " + - // "an extent"); - // } - // } - // } - // if (scale != null) { - // for (Extent extent : extents) { - // scale = scale.mergeExtent(extent); - // } - // } else if (!extents.isEmpty()) { - // scale = Scale.create(extents); - // } else { - // scale = Scale.empty(); - // } - // - // return scale; - // } - private Model loadModel(KimModel statement, Scope scope) { var reasoner = scope.getService(Reasoner.class); @@ -719,33 +183,6 @@ private Model loadModel(KimModel statement, Scope scope) { return model; } - // /** - // * Query all the resource servers available in the scope to find the models that can observe the - // passed - // * observable. The result should be ranked in decreasing order of fit to the context and the - // * RESOLUTION_SCORE ranking should be in their metadata. - // * - // * @param observable - // * @param scope - // * @return - // */ - // @Override - // public List queryModels(Observable observable, ContextScope scope, Scale scale) { - // - // var prioritizer = new PrioritizerImpl(scope, scale); - // - // System.out.println("QUERYING MODELS FOR " + observable); - // - // // FIXME use virtual threads & join() to obtain a synchronized list of ResourceSet, then - // // use a merging strategy to get models one by one in their latest release - // var resources = scope.getService(ResourcesService.class); - // - // ResourceSet models = resources.queryModels(observable, scope); - // var ret = new ArrayList(KnowledgeRepository.INSTANCE.ingest(models, scope, Model.class)); - // ret.sort(prioritizer); - // return ret; - // } - @Override public boolean scopesAreReactive() { return false; diff --git a/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/ExecutionSequence.java b/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/ExecutionSequence.java index 467153434..64bb36836 100644 --- a/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/ExecutionSequence.java +++ b/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/ExecutionSequence.java @@ -1,8 +1,10 @@ package org.integratedmodelling.klab.services.runtime; +import org.integratedmodelling.common.runtime.ActuatorImpl; import org.integratedmodelling.klab.api.Klab; import org.integratedmodelling.klab.api.collections.Pair; import org.integratedmodelling.klab.api.collections.Parameters; +import org.integratedmodelling.klab.api.data.KnowledgeGraph; import org.integratedmodelling.klab.api.data.Storage; import org.integratedmodelling.klab.api.digitaltwin.DigitalTwin; import org.integratedmodelling.klab.api.geometry.Geometry; @@ -14,7 +16,6 @@ import org.integratedmodelling.klab.api.lang.ServiceCall; import org.integratedmodelling.klab.api.scope.ContextScope; import org.integratedmodelling.klab.api.scope.Scope; -import org.integratedmodelling.klab.api.services.CoreLibrary; import org.integratedmodelling.klab.api.services.Language; import org.integratedmodelling.klab.api.services.RuntimeService; import org.integratedmodelling.klab.api.services.runtime.Actuator; @@ -136,9 +137,15 @@ class ExecutorOperation { private final Observation observation; protected List> executors = new ArrayList<>(); private boolean scalar; + private KnowledgeGraph.Operation operation; public ExecutorOperation(Actuator actuator) { this.id = actuator.getId(); + if (actuator instanceof ActuatorImpl actuator1) { + this.operation = actuator1.getOperation(); + // remove to avoid leaks after using the actuator as a shuttle + actuator1.setOperation(null); + } this.observation = scope.getObservation(this.id); compile(actuator); } @@ -154,7 +161,7 @@ private void compile(Actuator actuator) { var preset = RuntimeService.CoreFunctor.classify(call); if (preset != null) { - switch(preset) { + switch (preset) { case URN_RESOLVER -> { } case URN_INSTANTIATOR -> { @@ -286,13 +293,18 @@ public boolean run() { long start = System.currentTimeMillis(); for (var executor : executors) { if (!executor.get()) { + if (operation != null) { + operation.fail(scope, observation); + } return false; } } long time = System.currentTimeMillis() - start; - scope.getDigitalTwin().knowledgeGraph().update(observation, scope, "msInitialization" - , time, "resolved", true, "coverage", resolvedCoverage); + + if (operation != null) { + operation.success(scope, observation, resolvedCoverage); + } return true; } diff --git a/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/RuntimeService.java b/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/RuntimeService.java index 61447cf1e..c56b318ea 100644 --- a/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/RuntimeService.java +++ b/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/RuntimeService.java @@ -295,7 +295,8 @@ public long submit(Observation observation, ContextScope scope) { try (instantiation) { var ret = instantiation.store(observation); - instantiation.link(instantiation.getActivity(), observation, DigitalTwin.Relationship.CREATED); + instantiation.link(instantiation.getActivity(), observation, + DigitalTwin.Relationship.CREATED); if (scope.getContextObservation() != null) { instantiation.link(scope.getContextObservation(), observation, DigitalTwin.Relationship.HAS_CHILD); @@ -454,8 +455,8 @@ public Observation runDataflow(Dataflow dataflow, ContextScope cont for (var rootActuator : dataflow.getComputation()) { ExecutionSequence executionSequence = ExecutionSequence.compile(sortComputation(rootActuator, dataflow, - contextScope), (dataflow instanceof DataflowImpl dataflow1 ? - dataflow1.getResolvedCoverage() : 1.0), + contextScope, contextualization), (dataflow instanceof DataflowImpl dataflow1 ? + dataflow1.getResolvedCoverage() : 1.0), (ServiceContextScope) contextScope, digitalTwin, getComponentRegistry()); if (!executionSequence.isEmpty()) { if (!executionSequence.run()) { @@ -484,23 +485,27 @@ private DigitalTwin getDigitalTwin(ContextScope contextScope) { "implementation"); } - private Graph computeActuatorOrder(Actuator rootActuator, ContextScope scope) { + private Graph computeActuatorOrder(Actuator rootActuator, ContextScope scope, + KnowledgeGraph.Operation contextualization) { Graph dependencyGraph = new DefaultDirectedGraph<>(DefaultEdge.class); Map cache = new HashMap<>(); - loadGraph(rootActuator, dependencyGraph, cache); + loadGraph(rootActuator, dependencyGraph, cache, contextualization); // keep the actuators that do nothing so we can tag their observation as resolved return dependencyGraph; } private void loadGraph(Actuator rootActuator, Graph dependencyGraph, Map cache) { + Actuator> cache, KnowledgeGraph.Operation contextualization) { + + var childContextualization = contextualization.createChild(rootActuator, "Contextualization of " + rootActuator); + cache.put(rootActuator.getId(), rootActuator); dependencyGraph.addVertex(rootActuator); for (Actuator child : rootActuator.getChildren()) { if (child.getActuatorType() == Actuator.Type.REFERENCE) { dependencyGraph.addEdge(cache.get(child.getId()), rootActuator); } else { - loadGraph(child, dependencyGraph, cache); + loadGraph(child, dependencyGraph, cache, childContextualization); dependencyGraph.addEdge(child, rootActuator); } } @@ -572,12 +577,13 @@ public List getSessionInfo(Scope scope) { */ private List> sortComputation(Actuator rootActuator, Dataflow dataflow, - ContextScope scope) { + ContextScope scope, + KnowledgeGraph.Operation contextualization) { List> ret = new ArrayList<>(); int executionOrder = 0; Map branch = new HashMap<>(); Set group = new HashSet<>(); - var dependencyGraph = computeActuatorOrder(rootActuator, scope); + var dependencyGraph = computeActuatorOrder(rootActuator, scope, contextualization); for (var nextActuator : ImmutableList.copyOf(new TopologicalOrderIterator<>(dependencyGraph))) { if (nextActuator.getActuatorType() != Actuator.Type.REFERENCE) { ret.add(Pair.of(nextActuator, (executionOrder = checkExecutionOrder diff --git a/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/neo4j/KnowledgeGraphNeo4j.java b/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/neo4j/KnowledgeGraphNeo4j.java index 2a5df8d4e..eb6d270fe 100644 --- a/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/neo4j/KnowledgeGraphNeo4j.java +++ b/klab.services.runtime/src/main/java/org/integratedmodelling/klab/services/runtime/neo4j/KnowledgeGraphNeo4j.java @@ -101,6 +101,7 @@ public class OperationImpl implements Operation { private Throwable exception; private Object[] assets; private OperationImpl parent; + private Actuator actuator; @Override public Agent getAgent() { @@ -112,6 +113,38 @@ public Activity getActivity() { return this.activity; } + @Override + public Operation createChild(Object... activityData) { + + var activity = new ActivityImpl(); + var ret = new OperationImpl(); + + ret.agent = agent; + ret.transaction = transaction; + ret.parent = this; + + if (activityData != null) { + for (Object o : activityData) { + if (o instanceof ActivityImpl a) { + activity = a; + } else if (o instanceof Activity.Type type) { + activity.setType(type); + } else if (o instanceof String description) { + activity.setDescription(description); + } else if (o instanceof Agent agent) { + ret.agent = agent; + } else if (o instanceof ActuatorImpl actuator) { + ret.actuator = actuator; + actuator.setOperation(ret); + } + } + } + + ret.activity = activity; + + return ret; + } + @Override public long store(RuntimeAsset asset, Object... additionalProperties) { return KnowledgeGraphNeo4j.this.store(transaction, asset, scope, additionalProperties); @@ -217,7 +250,6 @@ public Operation operation(Agent agent, Activity parentActivity, Activity.Type a // open the transaction for the remaining operations var activity = new ActivityImpl(); - activity.setParent(parentActivity); activity.setType(activityType); activity.setStart(System.currentTimeMillis()); activity.setName(activityType.name()); @@ -261,7 +293,7 @@ public Operation operation(Agent agent, Activity parentActivity, Activity.Type a protected EagerResult query(String query, Map parameters, Scope scope) { if (isOnline()) { try { - System.out.printf("\nQUERY " + query + "\n WITH " + parameters); + // System.out.printf("\nQUERY " + query + "\n WITH " + parameters); return driver.executableQuery(query).withParameters(parameters).execute(); } catch (Throwable t) { if (scope != null) { @@ -754,235 +786,6 @@ private void setId(RuntimeAsset asset, long id) { } } - // @Override - // protected long runOperation(OperationImplObsolete operation, ContextScope scope) { - // - // operation.registerAsset(scope, "id", scope.getId()); - // - // /* - // TODO use a transaction for the whole sequence! Also use the store/retrieve/link methods and - // have them take a transaction object, defaulting to null -> atomic operations - // First pass defines the activity. If we have one in the targets, that is the parent activity to - // link it to. - // */ - // - // List created = new ArrayList<>(); - // List plans = new ArrayList<>(); - // - // long ret = Observation.UNASSIGNED_ID; - // for (var step : operation.getSteps()) { - // switch (step.type()) { - // case CREATE -> { - // - // for (var target : step.targets()) { - // - // var type = getLabel(target); - // var props = asParameters(target); - // var result = query( - // Queries.CREATE_WITH_PROPERTIES.replace("{type}", type), - // Map.of("properties", asParameters(target)), scope); - // if (result != null && result.records().size() == 1) { - // ret = result.records().getFirst().get(result.keys().getFirst()).asLong(); - // if (target instanceof ObservationImpl observation) { - // - // observation.setId(ret); - // created.add(ret); - // observation.setUrn(scope.getId() + "." + ret); - // props.put("id", ret); - // props.put("urn", observation.getUrn()); - // // TODO generate the IDs internally and skip this - // query( - // Queries.UPDATE_PROPERTIES.replace("{type}", type), - // Map.of("id", ret, "properties", props), scope); - // - // // TODO store spatial and temporal boundaries or ideally the geometry - // // as is - // // using neo4j-spatial, hoping it appears on maven central - // var geometry = encodeGeometry(scope.getObservationGeometry - // (observation)); - // var geoRecord = query("MATCH (g:Geometry {definition: $definition}) " + - // "RETURN g", - // Map.of("definition", geometry), scope); - // - // if (geoRecord.records().isEmpty()) { - // query("MATCH (o:Observation {id: $observationId}) CREATE " + - // "(g:Geometry " + - // "{definition: $definition}), (o) - // -[:HAS_GEOMETRY]->" + - // "(g)", - // Map.of("observationId", ret, "definition", geometry), - // scope); - // } else { - // query("MATCH (o:Observation {id: $observationId}), (g:Geometry " + - // "{definition: $definition}) CREATE (o)" + - // "-[:HAS_GEOMETRY]->(g)", - // Map.of("observationId", ret, "definition", geometry), - // scope); - // } - // - // } else if (target instanceof ActuatorImpl actuator) { - // - // // TODO generate the ID and skip the update query - // actuator.setId(ret); - // created.add(ret); - // props.put("id", ret); - // - // query( - // Queries.UPDATE_PROPERTIES.replace("{type}", type), - // Map.of("id", ret, "properties", props), scope); - // } else if (target instanceof ActivityImpl activity) { - // - // // TODO generate the ID and skip the update query - // activity.setId(ret); - // props.put("id", ret); - // query( - // Queries.UPDATE_PROPERTIES.replace("{type}", type), - // Map.of("id", ret, "properties", props), scope); - // } else if (target instanceof PlanImpl plan) { - // - // // TODO generate the ID and skip the update query - // plan.setId(ret); - // plans.add(ret); - // props.put("id", ret); - // query( - // Queries.UPDATE_PROPERTIES.replace("{type}", type), - // Map.of("id", ret, "properties", props), scope); - // } - // - // operation.registerAsset(target, "id", ret); - // } - // } - // } - // case MODIFY -> { - // // TODO - do we need this here? maybe with scheduling - for now it's only at - // // finalization - // throw new KlabUnimplementedException("target setting or graph modification"); - // } - // case LINK -> { - // - // DigitalTwin.Relationship relationship = null; - // var props = new HashMap(); - // for (int i = 0; i < step.parameters().length; i++) { - // var arg = step.parameters()[i]; - // if (arg instanceof DigitalTwin.Relationship dr) { - // relationship = dr; - // } else { - // props.put(arg.toString(), step.parameters()[++i]); - // } - // } - // - // if (relationship != null && step.targets().size() == 2) { - // - // var query = Queries.LINK_ASSETS - // .replace("{relationshipLabel}", relationship.name()) - // .replace("{fromLabel}", getLabel(step.targets().getFirst())) - // .replace("{toLabel}", getLabel(step.targets().getLast())) - // .replace( - // "{fromKeyProperty}", - // operation.getAssetKeyProperty(step.targets().getFirst())) - // .replace( - // "{toKeyProperty}", - // operation.getAssetKeyProperty(step.targets().getLast())); - // - // query(query, Map.of("fromKey", operation.getAssetKey(step.targets().getFirst()), - // "toKey", operation.getAssetKey(step.targets().getLast())), scope); - // } - // } - // case ROOT_LINK -> { - // - // if (step.targets().getFirst() instanceof Observation observation) { - // - // var query = Queries.LINK_ASSETS - // .replace("{relationshipLabel}", DigitalTwin.Relationship.HAS_CHILD - // .name()) - // .replace("{fromLabel}", "Context") - // .replace("{toLabel}", "Observation") - // .replace("{fromKeyProperty}", "id") - // .replace("{toKeyProperty}", "id"); - // - // query(query, Map.of("fromKey", rootContextId, "toKey", observation.getId()), - // scope); - // - // } else if (step.targets().getFirst() instanceof Actuator actuator) { - // - // var query = Queries.LINK_ASSETS - // .replace("{relationshipLabel}", DigitalTwin.Relationship.HAS_CHILD - // .name()) - // .replace("{fromLabel}", "Dataflow") - // .replace("{toLabel}", "Actuator") - // .replace("{fromKeyProperty}", "id") - // .replace("{toKeyProperty}", "id"); - // - // query( - // query, - // Map.of("fromKey", rootContextId + ".DATAFLOW", "toKey", actuator - // .getId()), - // scope); - // - // } else if (step.targets().getFirst() instanceof Activity activity) { - // - // var query = Queries.LINK_ASSETS - // .replace("{relationshipLabel}", DigitalTwin.Relationship.HAS_CHILD - // .name()) - // .replace("{fromLabel}", "Provenance") - // .replace("{toLabel}", "Activity") - // .replace("{fromKeyProperty}", "id") - // .replace("{toKeyProperty}", "id"); - // - // query( - // query, - // Map.of("fromKey", rootContextId + ".PROVENANCE", "toKey", - // activity.getId()) - // , scope); - // - // } else { - // throw new KlabInternalErrorException("unexpected root link request"); - // } - // } - // } - // } - // - // /* - // Link created assets to the activity - // */ - // for (long asset : created) { - // query( - // "match (n:Activity), (c) WHERE id(n) = $fromId AND id(c) = $toId CREATE (n)" + - // "-[r:CREATED]->(c) return r", - // Map.of("fromId", operation.getActivity().getId(), "toId", asset), scope); - // } - // - // /* - // Link any plans to the activity (should be one at most) - // */ - // for (long plan : plans) { - // query( - // "match (n:Activity), (c:Plan) WHERE id(n) = $fromId AND id(c) = $toId CREATE (n)" + - // "-[r:HAS_PLAN]->(c) return r", - // Map.of("fromId", operation.getActivity().getId(), "toId", plan), scope); - // } - // - // // link the activity to the agent - // query( - // "match (n:Activity), (c:Agent) WHERE id(n) = $fromId AND c.name = $agentName CREATE - // (n)" + - // "-[r:BY_AGENT]->(c) return r", - // Map.of( - // "fromId", operation.getActivity().getId(), "agentName", - // operation.getAgent().getName()), scope); - // - // return ret; - // } - - // @Override - // protected RuntimeAsset getContextNode() { - // if (scope == null) { - // throw new KlabIllegalStateException("Access to context node in a non-contexual knowledge - // graph"); - // } - // return contextNode; - // } - @Override protected RuntimeAsset getDataflowNode() {