Skip to content

Commit

Permalink
Clean up KG transactional logics; implementation missing
Browse files Browse the repository at this point in the history
  • Loading branch information
fvilla committed Nov 13, 2024
1 parent 233b588 commit cb49609
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 659 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ interface Operation extends Closeable {

/**
* Link the two passed assets.
*
* <p>
* FIXME remove scope when the other is gone
*
* @param source
Expand All @@ -78,65 +78,74 @@ void link(RuntimeAsset source, RuntimeAsset destination,
DigitalTwin.Relationship relationship, Scope scope,
Object... additionalProperties);


/**
* Run the operation as configured and return the ID of the last object created or modified, or
* {@link Observation#UNASSIGNED_ID} if the operation failed or was wrongly defined.
*
* @return a valid ID or {@link Observation#UNASSIGNED_ID}
* @deprecated just run each operation and rollback if unsuccessful
*/
long run(ContextScope scope);

/**
* Add the passed runtime asset, which must not be part of the graph already.
* <p>
* Sets the current link target to the created object.
*
* @param observation
* @return
* @deprecated use store
*/
Operation add(RuntimeAsset observation);

/**
* Sets an existing asset as the target for future links or updates called on the operation. If
* properties are passed that differ from the existing ones, an update is performed at
* {@link #run(ContextScope)} and the appropriate provenance records are inserted in the graph.
*
* @param source
* @return
* @deprecated shouldn't be necessary, use update on the main KG
*/
Operation set(RuntimeAsset source, Object... properties);

/**
* Link the last asset referenced in the call chain to the passed asset, which must exist. Use the
* additional parameters to specify the link, which should include a link type unless that can be
* inferred unambiguously, and can include PODs for properties as needed. Creates an outgoing
* connection from the current asset to the passed one.
* <p>
* When returning, the target is still set as before the link call, so that various link calls can be
* chained.
*
* @param assetFrom
* @param assetTo
* @param linkData
* @return
* @deprecated use link above, remove this
*/
Operation link(RuntimeAsset assetFrom, RuntimeAsset assetTo, DigitalTwin.Relationship relationship,
Object... linkData);

/**
* Link the passed asset directly to the root object of reference - provenance, context or dataflow.
*
* @param asset
* @param linkData
* @return
* @deprecated
*/
Operation rootLink(RuntimeAsset asset, Object... linkData);
//
// /**
// * Run the operation as configured and return the ID of the last object created or
// modified, or
// * {@link Observation#UNASSIGNED_ID} if the operation failed or was wrongly defined.
// *
// * @return a valid ID or {@link Observation#UNASSIGNED_ID}
// * @deprecated just run each operation and rollback if unsuccessful
// */
// long run(ContextScope scope);
//
// /**
// * Add the passed runtime asset, which must not be part of the graph already.
// * <p>
// * Sets the current link target to the created object.
// *
// * @param observation
// * @return
// * @deprecated use store
// */
// Operation add(RuntimeAsset observation);
//
// /**
// * Sets an existing asset as the target for future links or updates called on the
// operation. If
// * properties are passed that differ from the existing ones, an update is performed at
// * {@link #run(ContextScope)} and the appropriate provenance records are inserted in the
// graph.
// *
// * @param source
// * @return
// * @deprecated shouldn't be necessary, use update on the main KG
// */
// Operation set(RuntimeAsset source, Object... properties);
//
// /**
// * Link the last asset referenced in the call chain to the passed asset, which must exist
// . Use the
// * additional parameters to specify the link, which should include a link type unless
// that can be
// * inferred unambiguously, and can include PODs for properties as needed. Creates an
// outgoing
// * connection from the current asset to the passed one.
// * <p>
// * When returning, the target is still set as before the link call, so that various link
// calls can be
// * chained.
// *
// * @param assetFrom
// * @param assetTo
// * @param linkData
// * @return
// * @deprecated use link above, remove this
// */
// Operation link(RuntimeAsset assetFrom, RuntimeAsset assetTo, DigitalTwin.Relationship
// relationship,
// Object... linkData);
//
// /**
// * Link the passed asset directly to the root object of reference - provenance, context
// or dataflow.
// *
// * @param asset
// * @param linkData
// * @return
// * @deprecated
// */
// Operation rootLink(RuntimeAsset asset, Object... linkData);

/**
* Call after run() when the activity has finished without errors to ensure that all info in the
Expand All @@ -158,30 +167,47 @@ Operation link(RuntimeAsset assetFrom, RuntimeAsset assetTo, DigitalTwin.Relatio
Operation fail(ContextScope scope, Object... assets);
}

// /**
// * Create a new operation to be run on the graph, resulting in assets being created or modified,
// with
// * recording of provenance information. As a result, even an atomic operation may add several
// assets and
// * relationships. Unless the operation is empty, exactly one Activity node is always created at
// run().
// * <p>
// * If no parameters are passed, the first target must be set manually on the returned operation.
// Otherwise
// * the target will be set according to which parameters are passed and their existence in the
// graph. No
// * change is made to the graph until {@link Operation#run(ContextScope)} is called.
// * <p>
// * Obtaining and running an operation (i.e. creating a new Activity in the knowledge graph) will
// also
// * reset the idle time counter to 0 for those scopes whose expiration is IDLE_TIME. Operations
// may be
// * invoked on the scope by the API user, but also triggered by applications, behaviors etc.
// * <p>
// * When the operation has run, call success() or fail() to update the knowledge graph with any
// IDs and
// * states.
// *
// * @param agent the agent that will own the activity created
// * @param scope the specific scope, whose observer and context will determine the links made
// * @param targets any additional parameters. A string will be interpreted as the description of the
// * activity generated. An activity will be interpreted as the parent for the activity
// * generated.
// * @return a graph modification operation description ready for run()
// * @deprecated
// */
// Operation activity(Agent agent, ContextScope scope, Object... targets);

/**
* Create a new operation to be run on the graph, resulting in assets being created or modified, with
* recording of provenance information. As a result, even an atomic operation may add several assets and
* relationships. Unless the operation is empty, exactly one Activity node is always created at run().
* <p>
* If no parameters are passed, the first target must be set manually on the returned operation. Otherwise
* the target will be set according to which parameters are passed and their existence in the graph. No
* change is made to the graph until {@link Operation#run(ContextScope)} is called.
* <p>
* Obtaining and running an operation (i.e. creating a new Activity in the knowledge graph) will also
* reset the idle time counter to 0 for those scopes whose expiration is IDLE_TIME. Operations may be
* invoked on the scope by the API user, but also triggered by applications, behaviors etc.
* <p>
* When the operation has run, call success() or fail() to update the knowledge graph with any IDs and
* states.
* Create a new operation with a new activity and a transaction, which can be committed or rolled back
* after using it to define the graph.
*
* @param agent the agent that will own the activity created
* @param scope the specific scope, whose observer and context will determine the links made
* @param targets any additional parameters. A string will be interpreted as the description of the
* activity generated. An activity will be interpreted as the parent for the activity
* generated.
* @return a graph modification operation description ready for run()
* @return
*/
Operation activity(Agent agent, ContextScope scope, Object... targets);
Operation operation(Agent agent, Activity parentActivity, Activity.Type activityType, Object... data);

/**
* Remove all data relative to the currently contextualized scope. Graph becomes unusable after this is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,23 +389,23 @@ public void close() {
}
}

public long insertIntoKnowledgeGraph(Observation observation, Object... arguments) {

var activity = digitalTwin.knowledgeGraph().activity(Provenance.getAgent(this),
this, Utils.Collections.flatCollection(Activity.Type.INSTANTIATION, arguments).toArray()).add(observation);
if (getContextObservation() != null) {
activity = activity.link(getContextObservation(), observation,
DigitalTwin.Relationship.HAS_CHILD);
} else {
activity = activity.rootLink(observation);
}

if (getObserver() != null) {
activity = activity.link(observation, getObserver(), DigitalTwin.Relationship.HAS_OBSERVER);
}

return activity.run(this);
}
// public long insertIntoKnowledgeGraph(Observation observation, Object... arguments) {
//
// var activity = digitalTwin.knowledgeGraph().activity(Provenance.getAgent(this),
// this, Utils.Collections.flatCollection(Activity.Type.INSTANTIATION, arguments).toArray()).add(observation);
// if (getContextObservation() != null) {
// activity = activity.link(getContextObservation(), observation,
// DigitalTwin.Relationship.HAS_CHILD);
// } else {
// activity = activity.rootLink(observation);
// }
//
// if (getObserver() != null) {
// activity = activity.link(observation, getObserver(), DigitalTwin.Relationship.HAS_OBSERVER);
// }
//
// return activity.run(this);
// }

public Parallelism getParallelism() {
// TODO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ public class RuntimeServerController {
if (contextScope instanceof ServiceContextScope serviceContextScope) {
var agent =
serviceContextScope.getDigitalTwin().knowledgeGraph().requireAgent(resolutionRequest.getAgentName());
return serviceContextScope
.withResolutionConstraints(ResolutionConstraint.of(ResolutionConstraint.Type.Provenance, agent))
.insertIntoKnowledgeGraph(resolutionRequest.getObservation(),
"Observation of " + resolutionRequest.getObservation().getObservable().getUrn() + " made through REST endpoint in runtime " + runtimeService.klabService().serviceId());
var scope = serviceContextScope
.withResolutionConstraints(ResolutionConstraint.of(ResolutionConstraint.Type.Provenance, agent));
return runtimeService.klabService().submit(resolutionRequest.getObservation(), scope);
}
}
throw new KlabInternalErrorException("Unexpected implementation of request authorization");
Expand Down
Loading

0 comments on commit cb49609

Please sign in to comment.