Skip to content

Commit

Permalink
Moving on
Browse files Browse the repository at this point in the history
  • Loading branch information
Ferdinando Villa committed Nov 14, 2024
1 parent 4c39cf7 commit 3a24fcb
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ public class KnowledgeGraphNeo4JClient extends KnowledgeGraphNeo4j implements Kn
// TODO connect to a DB and run a driver


@Override
public Operation operation(Agent agent, Activity parentActivity, Activity.Type activityType, Object... data) {
return null;
}

@Override
public KnowledgeGraph contextualize(ContextScope scope) {
return null;
Expand All @@ -41,11 +36,6 @@ public <T extends RuntimeAsset> List<T> get(RuntimeAsset source, DigitalTwin.Rel
return List.of();
}

// @Override
// public boolean canDistribute() {
// return false;
// }

@Override
public KnowledgeGraph merge(URL remoteDigitalTwinURL) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

import org.integratedmodelling.common.logging.Logging;
import org.integratedmodelling.common.runtime.ActuatorImpl;
import org.integratedmodelling.klab.api.data.KnowledgeGraph;
import org.integratedmodelling.klab.api.data.RuntimeAsset;
import org.integratedmodelling.klab.api.digitaltwin.DigitalTwin;
import org.integratedmodelling.klab.api.exceptions.KlabIllegalArgumentException;
import org.integratedmodelling.klab.api.exceptions.KlabIllegalStateException;
import org.integratedmodelling.klab.api.exceptions.KlabInternalErrorException;
import org.integratedmodelling.klab.api.exceptions.KlabUnimplementedException;
import org.integratedmodelling.klab.api.geometry.Geometry;
import org.integratedmodelling.klab.api.knowledge.Observable;
import org.integratedmodelling.klab.api.knowledge.observation.Observation;
Expand All @@ -34,8 +32,8 @@
import org.neo4j.driver.*;

import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

/**
* TODO check spatial queries: https://www.lyonwj.com/blog/neo4j-spatial-procedures-congressional-boundaries
Expand All @@ -50,7 +48,7 @@ public abstract class KnowledgeGraphNeo4j extends AbstractKnowledgeGraph {
protected Agent user;
protected Agent klab;
protected String rootContextId;
// private RuntimeAsset contextNode;
// private RuntimeAsset contextNode;
private RuntimeAsset dataflowNode;
private RuntimeAsset provenanceNode;

Expand All @@ -60,7 +58,7 @@ interface Queries {
String REMOVE_CONTEXT = "match (n:Context {id: $contextId})-[*]-(c) detach delete n,c";
String FIND_CONTEXT = "MATCH (ctx:Context {id: $contextId}) RETURN ctx";
// String FIND_BY_ID = "MATCH (n) WHERE id(n) = $id RETURN n";
String FIND_BY_PROPERTY = "MATCH (n:{type}) WHERE n.{property} = $value RETURN n";
// String FIND_BY_PROPERTY = "MATCH (n:{type}) WHERE n.{property} = $value RETURN n";
// retrieve ID as records().getFirst().get(keys().getFirst()) ?
String CREATE_WITH_PROPERTIES = "CREATE (n:{type}) SET n = $properties RETURN n";
String UPDATE_PROPERTIES = "MATCH (n:{type}) WHERE n.id = $id SET n += $properties";
Expand Down Expand Up @@ -89,8 +87,9 @@ interface Queries {
+ "(prov)-[:HAS_CHILD]->(creation)"};
String GET_AGENT_BY_NAME = "match (ctx:Context {id: $contextId})-->(prov:Provenance)-[:HAS_AGENT]->" +
"(a:Agent {name: $agentName}) RETURN a";
String LINK_ASSETS = "match (n:{fromLabel}), (c:{toLabel}) WHERE n.{fromKeyProperty} = $fromKey AND" +
" c.{toKeyProperty} = $toKey CREATE (n)-[r:{relationshipLabel}]->(c) return r";
// String LINK_ASSETS = "match (n:{fromLabel}), (c:{toLabel}) WHERE n.{fromKeyProperty} =
// $fromKey AND" +
// " c.{toKeyProperty} = $toKey CREATE (n)-[r:{relationshipLabel}]->(c) return r";
}

/**
Expand All @@ -107,6 +106,7 @@ public class OperationImpl implements Operation {
private Scope.Status outcome;
private Throwable exception;
private Object[] assets;
private OperationImpl parent;

@Override
public Agent getAgent() {
Expand Down Expand Up @@ -150,25 +150,29 @@ public Operation fail(ContextScope scope, Object... assets) {
public void close() throws IOException {
// TODO commit or rollback based on status after success() or fail(). If none has been
// called, status is null and this is an internal error, logged with the activity
if (outcome == null) {
// Log an internal failure (no success or failure, should not happen)
transaction.rollback();
} else if (outcome == Scope.Status.FINISHED) {
transaction.commit();

if (assets != null) {
for (var asset : assets) {
if (asset instanceof Observation observation) {
update(observation, scope, "resolved", true, "coverage",
observation.getResolvedCoverage());
} else if (asset instanceof Dataflow<?> dataflow) {
storeDataflow(dataflow, scope, activity);

if (parent == null) {

if (outcome == null) {
// Log an internal failure (no success or failure, should not happen)
transaction.rollback();
} else if (outcome == Scope.Status.FINISHED) {
transaction.commit();

if (assets != null) {
for (var asset : assets) {
if (asset instanceof Observation observation) {
update(observation, scope, "resolved", true, "coverage",
observation.getResolvedCoverage());
} else if (asset instanceof Dataflow<?> dataflow) {
storeDataflow(dataflow, scope, activity);
}
}
}
}

} else if (outcome == Scope.Status.ABORTED) {
transaction.rollback();
} else if (outcome == Scope.Status.ABORTED) {
transaction.rollback();
}
}

update(this.activity, scope, "end", System.currentTimeMillis());
Expand All @@ -184,7 +188,6 @@ public Operation operation(Agent agent, Activity parentActivity, Activity.Type a
throw new KlabInternalErrorException("Knowledge graph operation: agent or activity is null");
}


// create and commit the activity record as a node, possibly linked to a parent
// activity.

Expand All @@ -206,17 +209,19 @@ public Operation operation(Agent agent, Activity parentActivity, Activity.Type a
for (var dat : data) {
if (dat instanceof String description) {
ret.activity.setDescription(description);
} /*else if (dat instanceof Observation) {
ret.activity.set
}*/
} else if (dat instanceof OperationImpl operation) {
ret.parent = operation;
}
}
}

KnowledgeGraphNeo4j.this.store(activity, scope);
KnowledgeGraphNeo4j.this.link(parentActivity, activity, DigitalTwin.Relationship.TRIGGERED, scope);

// open transaction
ret.transaction = driver.session().beginTransaction();
// open transaction if we are the root operation. We only commit within it.
ret.transaction = ret.parent == null
? driver.session().beginTransaction(TransactionConfig.builder().withTimeout(Duration.ZERO).build())
: ret.parent.transaction;

return ret;
}
Expand Down Expand Up @@ -275,36 +280,41 @@ protected void initializeContext() {

}

// /*
// establish IDs for the main nodes and create the respective RuntimeAssets
// */
// final var contextNodeId = getInternalId("MATCH (n:Context {id: $contextId}) return n.id", Map.of(
// "contextId", scope.getId()), scope);
// final var dataflowNodeId = getInternalId("MATCH (n:Dataflow {id: $contextId}) return n.id", Map.of(
// "contextId", scope.getId() + ".DATAFLOW"), scope);
// final var provenanceNodeId = getInternalId("MATCH (n:Provenance {id: $contextId}) return id(n)",
// Map.of(
// "contextId", scope.getId() + ".PROVENANCE"), scope);
//
// if (contextNodeId == Observation.UNASSIGNED_ID || provenanceNodeId == Observation.UNASSIGNED_ID || dataflowNodeId == Observation.UNASSIGNED_ID) {
// throw new KlabInternalErrorException("knowledge graph: contextual nodes are not present");
// }
// /*
// establish IDs for the main nodes and create the respective RuntimeAssets
// */
// final var contextNodeId = getInternalId("MATCH (n:Context {id: $contextId}) return n.id",
// Map.of(
// "contextId", scope.getId()), scope);
// final var dataflowNodeId = getInternalId("MATCH (n:Dataflow {id: $contextId}) return n
// .id", Map.of(
// "contextId", scope.getId() + ".DATAFLOW"), scope);
// final var provenanceNodeId = getInternalId("MATCH (n:Provenance {id: $contextId}) return
// id(n)",
// Map.of(
// "contextId", scope.getId() + ".PROVENANCE"), scope);
//
// if (contextNodeId == Observation.UNASSIGNED_ID || provenanceNodeId == Observation
// .UNASSIGNED_ID || dataflowNodeId == Observation.UNASSIGNED_ID) {
// throw new KlabInternalErrorException("knowledge graph: contextual nodes are not
// present");
// }

final var dataflowNodeId = nextKey();
final var provenanceNodeId = nextKey();

// this.contextNode = new RuntimeAsset() {
// @Override
// public long getId() {
// return contextNodeId;
// }
//
// @Override
// public Type classify() {
// // check - scope isn't a runtime asset
// return Type.ARTIFACT;
// }
// };
// this.contextNode = new RuntimeAsset() {
// @Override
// public long getId() {
// return contextNodeId;
// }
//
// @Override
// public Type classify() {
// // check - scope isn't a runtime asset
// return Type.ARTIFACT;
// }
// };

this.dataflowNode = new RuntimeAsset() {
@Override
Expand Down Expand Up @@ -348,21 +358,21 @@ public void deleteContext() {
query(Queries.REMOVE_CONTEXT, Map.of("contextId", scope.getId()), scope);
}

// /**
// * Return the internal long ID corresponding to the single result of a query
// *
// * @param query
// * @param parameters
// * @param scope
// * @return
// */
// protected Object getId(String query, Map<String, Object> parameters, Scope scope) {
// var result = query(query, parameters, scope);
// if (result != null && result.records().size() == 1) {
// return result.records().getFirst().get(result.keys().getFirst()).asLong();
// }
// return Observation.UNASSIGNED_ID;
// }
// /**
// * Return the internal long ID corresponding to the single result of a query
// *
// * @param query
// * @param parameters
// * @param scope
// * @return
// */
// protected Object getId(String query, Map<String, Object> parameters, Scope scope) {
// var result = query(query, parameters, scope);
// if (result != null && result.records().size() == 1) {
// return result.records().getFirst().get(result.keys().getFirst()).asLong();
// }
// return Observation.UNASSIGNED_ID;
// }

/**
* @param query
Expand Down Expand Up @@ -576,9 +586,9 @@ protected void link(Transaction transaction, RuntimeAsset source, RuntimeAsset d

private String matchAsset(RuntimeAsset asset, String name, String queryVariable) {
var ret = switch (asset) {
case Activity activity -> "id(" + name + ") = $" + queryVariable;
case Activity activity -> name + ".id = $" + queryVariable;
case Observation observation -> name + ".id = $" + queryVariable;
case Actuator actuator -> "id(" + name + ") = $" + queryVariable;
case Actuator actuator -> name + ".internalId = $" + queryVariable;
case Agent agent -> name + ".name = $" + queryVariable;
default -> null;
};
Expand Down Expand Up @@ -933,6 +943,7 @@ public void update(RuntimeAsset observation, ContextScope scope, Object... param
@Override
protected synchronized long nextKey() {
var ret = -1L;
var lastActivity = System.currentTimeMillis();
var result = query("MATCH (n:Statistics) return n.id", Map.of(), scope);
if (result != null) {

Expand All @@ -942,8 +953,9 @@ protected synchronized long nextKey() {
} else {
var id = result.records().getFirst().get(result.keys().getFirst()).asLong();
ret = id + 1;
query("MATCH (n:Statistics) WHERE n.id = $id SET n.id = $nextId", Map.of("id", id, "nextId"
, ret), scope);
query("MATCH (n:Statistics) WHERE n.id = $id SET n.id = $nextId, n.lastActivity = " +
"$lastActivity", Map.of("id", id, "nextId"
, ret, "lastActivity", lastActivity), scope);
}
}
return ret;
Expand Down

0 comments on commit 3a24fcb

Please sign in to comment.