Skip to content

Commit

Permalink
Knowledge graph transactions almost OK
Browse files Browse the repository at this point in the history
  • Loading branch information
fvilla committed Nov 14, 2024
1 parent 74aac67 commit 4c39cf7
Show file tree
Hide file tree
Showing 6 changed files with 697 additions and 455 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ enum Relationship {
HAS_DATAFLOW,
HAS_PROVENANCE,
HAS_ACTIVITY,
HAS_CHILD
HAS_CHILD,
TRIGGERED;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ public <T> Collection<T> collect(Class<? extends T> cls) {

/**
* An agent must be defined in resolution constraints during resolution. This extracts it and returns it,
* also throwing a {@link org.integratedmodelling.klab.api.exceptions.KlabInternalErrorException} if the
* agent is not defined.
* If the agent is not defined, null is returned and the user agent should be assumed.
*
* @param scope
* @return
Expand All @@ -197,14 +196,12 @@ static Agent getAgent(ContextScope scope) {
}
}
}
throw new KlabInternalErrorException("Resolution constraint in scope " + scope.getId() + " do not " +
"contain an agent");
return null;
}

/**
* An activity must be defined in resolution constraints during resolution. This extracts it and returns
* it, also throwing a {@link org.integratedmodelling.klab.api.exceptions.KlabInternalErrorException} if
* the activity is not defined.
* it. If no activity is defined, a default activity can be created according to context.
*
* @param scope
* @return
Expand All @@ -219,8 +216,7 @@ static Activity getActivity(ContextScope scope) {
}
}
}
throw new KlabInternalErrorException("Resolution constraint in scope " + scope.getId() + " do not " +
"contain an agent");
return null;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,13 @@ public long submit(Observation observation, ContextScope scope) {

if (scope instanceof ServiceContextScope serviceContextScope) {
var digitalTwin = getDigitalTwin(scope);
var parentActivity = getInstantiationActivity(observation, scope);
var parentActivity = getInitializationActivity(observation, scope);
var agent = getAgent(scope);

/**
* The initial activity should be in the scope; if not, we're observing at the
* root DT level and we get the context initialization activity as parent.
*/
var instantiation = digitalTwin.knowledgeGraph().operation(agent, parentActivity,
Activity.Type.INSTANTIATION, observation);

Expand All @@ -307,16 +312,29 @@ public long submit(Observation observation, ContextScope scope) {
}

private Agent getAgent(ContextScope scope) {
// TODO
return null;

var ret = Provenance.getAgent(scope);
if (ret != null) {
return ret;
}
if (scope instanceof ServiceContextScope serviceContextScope) {
// assume the user is the agent
return serviceContextScope.getDigitalTwin().knowledgeGraph().user();
}
throw new KlabIllegalStateException("Cannot determine the requesting agent from scope");
}

private Activity getInstantiationActivity(Observation observation, ContextScope scope) {
/**
* TODO THE SCOPE NEEDS TO KNOW WHO AND WHY THIS IS BEING CREATED - CHECK PREVIOUS CODE
* Scope should contain the task ID that identifies the activity being run.
*/
return null;
private Activity getInitializationActivity(Observation observation, ContextScope scope) {
var ret = Provenance.getActivity(scope);
if (ret != null) {
return ret;
}
var activities = getDigitalTwin(scope).knowledgeGraph().get(scope, Activity.class,
Activity.Type.INITIALIZATION);
if (activities.size() == 1) {
return activities.getFirst();
}
throw new KlabInternalErrorException("cannot locate the context initialization activity");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

public abstract class AbstractKnowledgeGraph implements KnowledgeGraph {


protected static int MAX_CACHED_OBSERVATIONS = 400;

protected ContextScope scope;
Expand All @@ -39,102 +39,7 @@ public RuntimeAsset load(Long key) throws Exception {
}
});

/**
* A provenance-linked "transaction" that can be committed or rolled back by reporting failure or success.
* The related activity becomes part of the graph in any case and success/failure is recorded with it.
* Everything else stored or linked is rolled back in case of failure.
*/
public class OperationImpl implements Operation {

private ActivityImpl activity;
private AgentImpl agent;
// FIXME ACH no this must be in the Neo4j implementation
private Transaction transaction;
private Scope.Status outcome;
private Throwable exception;

/**
* What should be passed: an agent that will own the activity; the current context scope for graph
* operations; the activity type
* <p>
* What can be passed: another operation so that its activity becomes the parent of the one we create
* here AND the transaction isn't finished upon execution; content for the activity such as
* description
* <p>
* The store/link methods use the same on the database, under the transaction we opened.
* <p>
* Each ExecutorOperation must include a previously built Operation; only the wrapping one should
* commit/rollback.
*
* @param arguments
*/
public OperationImpl(Object... arguments) {

// select arguments and put them where they belong

// validate arguments and complain loudly if anything is missing. Must have agent and activity

// create and commit the activity record as a node, possibly linked to a parent
// activity.

// open the transaction for the remaining operations
}

@Override
public Agent getAgent() {
return this.agent;
}

@Override
public Activity getActivity() {
return this.activity;
}

@Override
public long store(RuntimeAsset asset, Object... additionalProperties) {
// FIXME NO use transactional version!
return AbstractKnowledgeGraph.this.store(asset, scope, additionalProperties);
}

@Override
public void link(RuntimeAsset source, RuntimeAsset destination,
DigitalTwin.Relationship relationship, Object... additionalProperties) {
// FIXME NO use transactional version!
AbstractKnowledgeGraph.this.link(source, destination, relationship, scope,
additionalProperties);
}

@Override
public Operation success(ContextScope scope, Object... assets) {
this.outcome = Scope.Status.FINISHED;
// updates as needed (activity end, observation resolved if type == resolution, context timestamp
return this;
}

@Override
public Operation fail(ContextScope scope, Object... assets) {
// rollback; update activity end and context timestamp only, if we have an error or throwable
// update activity
this.outcome = Scope.Status.ABORTED;
return null;
}

@Override
public void close() throws IOException {
// TODO commit or rollback based on status after success() or fail(). If none has been
// called, status is null and this is an internal error, logged with the activity
if (outcome == null) {
// Log an internal failure (no success or failure, should not happen)
transaction.rollback();
} else if (outcome == Scope.Status.FINISHED) {
transaction.commit();
} else if (outcome == Scope.Status.ABORTED) {
transaction.rollback();
}
}
}

protected abstract RuntimeAsset getContextNode();
// protected abstract RuntimeAsset getContextNode();

/**
* Return a RuntimeAsset representing the overall dataflow related to the scope, so that it can be used
Expand All @@ -146,6 +51,8 @@ public void close() throws IOException {
*/
protected abstract RuntimeAsset getDataflowNode();

protected abstract long nextKey();

/**
* Return a RuntimeAsset representing the overall provenance related to the scope, so that it can be used
* for linking using the other CRUD methods.
Expand All @@ -164,7 +71,7 @@ public void close() throws IOException {
* @param <T>
* @return
*/
protected abstract <T extends RuntimeAsset> T retrieve(long key, Class<T> assetClass, Scope scope);
protected abstract <T extends RuntimeAsset> T retrieve(Object key, Class<T> assetClass, Scope scope);

/**
* Store the passed asset, return its unique long ID.
Expand Down Expand Up @@ -261,9 +168,4 @@ protected Map<String, Object> asParameters(Object asset, Object... additionalPar
return ret;
}

@Override
public Operation operation(Agent agent, Activity parentActivity, Activity.Type activityType,
Object... data) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,18 @@
* A local, embedded, persistent k.LAB-instrumented, configurable Neo4j database. To work with the f'ing
* community edition the database must be a singleton within the service, containing data for all contexts.
*/
public class KnowledgeGraphNeo4JEmbedded extends KnowledgeGraphNeo4j implements KnowledgeGraph {
public class KnowledgeGraphNeo4JEmbedded extends KnowledgeGraphNeo4j implements KnowledgeGraph {

private static final String DEFAULT_DATABASE_NAME = "klab";
private DatabaseManagementService managementService;
private GraphDatabaseService graphDb;
// private SessionFactory sessionFactory;
private boolean online = true;
// we use a session per context in normal usage, established through contextualize() which also sets up
// the root nodes for the context
// private Session contextSession_ = null;
// private GraphMapping.ContextMapping rootNode;
// private Driver driver;

private KnowledgeGraphNeo4JEmbedded(KnowledgeGraphNeo4JEmbedded parent, ContextScope scope) {
this.managementService = parent.managementService;
this.graphDb = parent.graphDb;
// this.sessionFactory = parent.sessionFactory;
this.online = parent.online;
// if (this.online) {
// this.contextSession_ = sessionFactory.openSession();
this.scope = scope;
// }
this.scope = scope;
this.driver = parent.driver;
}

Expand All @@ -69,12 +59,6 @@ public KnowledgeGraphNeo4JEmbedded(Path directory) {

this.graphDb = managementService.database(DEFAULT_DATABASE_NAME);

// this.sessionFactory = new SessionFactory(
// new Configuration.Builder()
// .encryptionLevel("DISABLED")
// .uri("bolt://localhost:7687").build(),
// this.getClass().getPackageName());

// TODO this could just reimplement query() to use the DB directly and not expose the
// connectors, losing debugging access outside the application
this.driver = GraphDatabase.driver("bolt://localhost:7687");
Expand Down Expand Up @@ -102,16 +86,16 @@ private void configureDatabase() {

// TODO all the needed indices

// IndexDefinition usernamesIndex;
// try ( Transaction tx = graphDb.beginTx() )
// {
// Schema schema = tx.schema();
// usernamesIndex = schema.indexFor(Label.label( "User" ) )
// .on( "username" )
// .withName( "usernames" )
// .create();
// tx.commit();
// }
// IndexDefinition usernamesIndex;
// try ( Transaction tx = graphDb.beginTx() )
// {
// Schema schema = tx.schema();
// usernamesIndex = schema.indexFor(Label.label( "User" ) )
// .on( "username" )
// .withName( "usernames" )
// .create();
// tx.commit();
// }
}

@Override
Expand All @@ -129,15 +113,15 @@ public KnowledgeGraph contextualize(ContextScope scope) {
}

var ret = new KnowledgeGraphNeo4JEmbedded(this, scope);
// ret.rootNode = node;

ret.initializeContext();

return ret;
}

@Override
public <T extends RuntimeAsset> List<T> get(RuntimeAsset source, DigitalTwin.Relationship linkType, Class<T> resultClass) {
public <T extends RuntimeAsset> List<T> get(RuntimeAsset source, DigitalTwin.Relationship linkType,
Class<T> resultClass) {
return List.of();
}

Expand Down
Loading

0 comments on commit 4c39cf7

Please sign in to comment.