Skip to content

Commit

Permalink
Almost there
Browse files Browse the repository at this point in the history
  • Loading branch information
fvilla committed Nov 5, 2024
1 parent 74ef367 commit c2d715b
Show file tree
Hide file tree
Showing 14 changed files with 484 additions and 268 deletions.
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
/*
* This file is part of k.LAB.
*
* k.LAB is free software: you can redistribute it and/or modify it under the terms of the Affero
* GNU General Public License as published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* A copy of the GNU Affero General Public License is distributed in the root directory of the k.LAB
* distribution (LICENSE.txt). If this cannot be found see <http://www.gnu.org/licenses/>.
*
* Copyright (C) 2007-2018 integratedmodelling.org and any authors mentioned in author tags. All
* rights reserved.
*/
package org.integratedmodelling.klab.api.identities;

import org.integratedmodelling.klab.api.scope.ContextScope;

/**
* The Interface ITaskIdentity.
*
* @author ferdinando.villa
* @version $Id: $Id
*/
public interface TaskIdentity extends RuntimeIdentity {

/** Constant <code>type</code> */
Type type = Type.TASK;

/**
* All tasks happen in a context, which may be null for root tasks (which create the root
* context).
*
* @return
*/
ContextScope getScope();
}
///*
// * This file is part of k.LAB.
// *
// * k.LAB is free software: you can redistribute it and/or modify it under the terms of the Affero
// * GNU General Public License as published by the Free Software Foundation, either version 3 of the
// * License, or (at your option) any later version.
// *
// * A copy of the GNU Affero General Public License is distributed in the root directory of the k.LAB
// * distribution (LICENSE.txt). If this cannot be found see <http://www.gnu.org/licenses/>.
// *
// * Copyright (C) 2007-2018 integratedmodelling.org and any authors mentioned in author tags. All
// * rights reserved.
// */
//package org.integratedmodelling.klab.api.identities;
//
//import org.integratedmodelling.klab.api.scope.ContextScope;
//
///**
// * The Interface ITaskIdentity.
// *
// * @author ferdinando.villa
// * @version $Id: $Id
// */
//public interface TaskIdentity extends RuntimeIdentity {
//
// /** Constant <code>type</code> */
// Type type = Type.TASK;
//
// /**
// * All tasks happen in a context, which may be null for root tasks (which create the root
// * context).
// *
// * @return
// */
// ContextScope getScope();
//}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.integratedmodelling.klab.api.services.resolver.ResolutionConstraint;
import org.integratedmodelling.klab.api.services.runtime.Dataflow;
import org.integratedmodelling.klab.api.services.runtime.Report;
import org.integratedmodelling.klab.api.services.runtime.Task;
import org.integratedmodelling.klab.api.utils.Utils;

import java.net.URL;
Expand Down Expand Up @@ -165,35 +164,21 @@ default Type getType() {
ContextScope connect(URL remoteContext);

/**
* Submit an observation to the digital twin for resolution. If no context observation or observer is
* present in the scope, the arguments must fully specify a non-collective <em>substantial</em> with its
* own geometry. Otherwise, the geometry of reference is the observation geometry of the observer if the
* observation is of a collective substantial, and that of the context observation, which must be present,
* if the observation is of a dependent. If the observation is a non-collective relationship, the scope
* must be the result of a {@link #between(Observation, Observation)} call to specify the source and
* target.
* <p>
* If the passed observation contains an observation geometry and the semantics is of an agent, the
* runtime will build an observation suitable to be an observer. At the API level, no automatic linking is
* done in the DT except for what the scope defines, so setting it as the observer for other observations
* must be done through the scope using the with- methods.
* <p>
* After this is called, resolution will started in the runtime service chosen at context creation, using
* any {@link ResolutionConstraint}s set into the scope. The ID tracked by the returned {@link Task} is
* the ID of the observation. If resolution fails, the task will fail and the observation will be returned
* in an unresolved state, recorded but invisible to the DT unless unresolved observations are queried.
* Unresolved observations are those that the resolver wasn't able to explain with a model: this only
* applies to dependents (qualities and processes) as substantials can be simply acknowledged as long as
* their scale is given. If any dependents are unresolved, the context will be in an inconsistent state.
* Submit an observation to the digital twin and start its resolution in this scope. Returns a future for
* the resolved (or unresolved in case of failure) observation. The {@link Observation#isResolved()}
* method should be checked after the future is complete. The scope will be notified of all events related
* to the resolution, with messages that will carry a task ID equal to the URN of the observation or
* derived from it so that what is happening can be reconstructed at the client side.
*
* @param observation an unresolved observation to be resolved by the runtime and added to the digital
* twin. The
* {@link
* org.integratedmodelling.klab.api.digitaltwin.DigitalTwin#createObservation(Scope,
* Object...)} method can be used to construct it from existing knowledge.
* @return a {@link Future} producing the resolved observation when resolution is finished.
* @return a {@link Future} producing the resolved observation when resolution is finished. If resolution
* has failed, the observation in the future will be unresolved.
*/
Task<Observation> observe(Observation observation);
Future<Observation> observe(Observation observation);

/**
* Return all observations affected by the passed one in this scope, either through model dependencies or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ default String getServiceName() {
* @param contextScope
* @return
*/
Coverage runDataflow(Dataflow<Observation> dataflow, ContextScope contextScope);
Observation runDataflow(Dataflow<Observation> dataflow, ContextScope contextScope);

/**
* Submit the ID of a valid observation to invoke the resolver, build a dataflow and run it to obtain the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
import org.integratedmodelling.klab.api.services.KlabService;
import org.integratedmodelling.klab.api.services.Reasoner;
import org.integratedmodelling.klab.api.services.resources.ResourceSet;
import org.integratedmodelling.klab.api.services.runtime.impl.MatchImpl;
import org.integratedmodelling.klab.api.services.runtime.impl.MessageImpl;
import org.integratedmodelling.klab.api.services.runtime.impl.ScopeOptions;

import java.io.Serializable;
import java.net.URI;
import java.util.Set;
import java.util.function.Consumer;

/**
Expand All @@ -33,8 +35,6 @@
*/
public interface Message extends Serializable {

public static Message NO_RESPONSE = null;

/**
* Different session/digital twin queues that a channel can subscribe to. The client should ask for the
* desired queues when creating a session or context scope, and verify what it got from the runtime after
Expand All @@ -56,20 +56,6 @@ enum Queue {
None
}

// @Deprecated
// enum ForwardingPolicy {
// /**
// * Message was created locally and will be forwarded to paired scopes
// */
// Forward,
// /**
// * Message was forwarded from a paired scope and will not be further forwarded. This should
// be the
// * default policy for newly created messages.
// */
// DoNotForward
// }

/**
* Message class. Ugly type name makes life easier.
* TODO add enumset of all acceptable messageTypes and validate messages
Expand Down Expand Up @@ -173,7 +159,6 @@ enum MessageClass {
private MessageClass(MessageType... messageTypes) {
this.messageTypes = messageTypes == null ? new MessageType[]{} : messageTypes;
}

}

/**
Expand Down Expand Up @@ -276,10 +261,58 @@ private MessageType(Queue queue, Class<?> payloadClass) {
this.queue = queue;
this.payloadClass = payloadClass;
}
}

/**
* Matcher that can be used to match messages and specify actions to be taken upon match. The details can
* be opaque: filtering conditions are specified in the match() function that produces it.
*/
interface Match {

/**
* This is called to ensure that the matcher remains active after the first match. The default is
* false.
*
* @param persistent
*/
Match persistent(boolean persistent);

/**
* Specify a message consumer to invoke when a matching message arrives. If persistency is false
* (default), the matcher is then removed from the
*
* @param consumer
* @return
*/
Match thenDo(Consumer<Message> consumer);

Set<MessageClass> getApplicableClasses();

Set<MessageType> getApplicableTypes();

Set<Queue> getApplicableQueues();

Consumer<Message> getMessageConsumer();

boolean isPersistent();

Object getPayloadMatch();
}

// ForwardingPolicy getForwardingPolicy();
/**
* Return a match for a message to which an action can be attached. The arguments are used to select the
* incoming message. This can be used in scopes to monitor queues.
*
* @param matchingArguments one or more {@link MessageType}, {@link MessageClass} (all are in OR) and/or
* payload to compare. A {@link Queue} can be passed to select a specific message
* queue if the MessageClass does not already resolve it. A
* <code>Predicate<Message></code> can also be passed although it will slow the
* matching down and shouldn't be the only criterion.
* @return a new Match object to use in any function that supports it.
*/
static Match match(Object... matchingArguments) {
return MatchImpl.create(matchingArguments);
}

/**
* Unique ID for each message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,24 @@ void connectToService(KlabService.ServiceCapabilities capabilities, UserIdentity
Consumer<Message> consumer);

/**
* naah
* Install one or more message matchers to react to messages received and sent through the managed
* queues.
*
* @param matchers
* @return
* @param <T>
*/
<T> Future<T> trackMessages(Function<Message, T>... matchers);
void trackMessages(Message.Match... matchers);

/**
* Install a message matcher and a supplier that turns the matching message into an object, and return a
* future for the supplied object.
*
* @param match
* @param supplier
* @param <T>
* @return
*/
<T> Future<T> trackMessages(Message.Match match, Function<Message, T> supplier);

/**
* True if {@link #connectToService(KlabService.ServiceCapabilities, UserIdentity, Consumer)} has been
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package org.integratedmodelling.klab.api.services.runtime;

import org.integratedmodelling.klab.api.scope.ContextScope;

import java.util.concurrent.Future;

/**
* A task is a future for an object that exists in a
* {@link org.integratedmodelling.klab.api.scope.ContextScope}, exposes a tracking URN for the object of
* interest and admits k.LAB-aware listeners for any messages contextualized to it or any sub-tasks.
*
* @param <T>
*/
public interface Task<T> extends Future<T> {

ContextScope getScope();

String getUrn();

}
//package org.integratedmodelling.klab.api.services.runtime;
//
//import org.integratedmodelling.klab.api.scope.ContextScope;
//
//import java.util.concurrent.Future;
//
///**
// * A task is a future for an object that exists in a
// * {@link org.integratedmodelling.klab.api.scope.ContextScope}, exposes a tracking URN for the object of
// * interest and admits k.LAB-aware listeners for any messages contextualized to it or any sub-tasks.
// *
// * @param <T>
// */
//public interface Task<T> extends Future<T> {
//
// ContextScope getScope();
//
// String getUrn();
//
//}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.integratedmodelling.klab.api.services.runtime.impl;

import org.integratedmodelling.klab.api.services.runtime.Message;

import java.util.EnumSet;
import java.util.Set;
import java.util.function.Consumer;

public class MatchImpl implements Message.Match {

private Set<Message.MessageClass> applicableClasses = EnumSet.noneOf(Message.MessageClass.class);
private Set<Message.MessageType> applicableTypes = EnumSet.noneOf(Message.MessageType.class);
private Set<Message.Queue> applicableQueues = EnumSet.noneOf(Message.Queue.class);
private Consumer<Message> messageConsumer;
private Object payloadMatch;
boolean persistent = false;

@Override
public Message.Match persistent(boolean persistent) {
this.persistent = persistent;
return this;
}

@Override
public Message.Match thenDo(Consumer<Message> consumer) {
this.messageConsumer = consumer;
return this;
}

@Override
public Set<Message.MessageClass> getApplicableClasses() {
return applicableClasses;
}

@Override
public Set<Message.MessageType> getApplicableTypes() {
return applicableTypes;
}

@Override
public Set<Message.Queue> getApplicableQueues() {
return applicableQueues;
}

@Override
public Consumer<Message> getMessageConsumer() {
return messageConsumer;
}

@Override
public boolean isPersistent() {
return persistent;
}

@Override
public Object getPayloadMatch() {
return payloadMatch;
}

public static MatchImpl create(Object... args) {
var ret = new MatchImpl();
if (args != null) {
for (var arg : args) {
if (arg instanceof Message.MessageType type) {
ret.applicableTypes.add(type);
} else if (arg instanceof Message.MessageClass type) {
ret.applicableClasses.add(type);
} else if (arg instanceof Message.Queue queue) {
ret.applicableQueues.add(queue);
} else {
ret.payloadMatch = arg;
}
}
}
return ret;
}
}
Loading

0 comments on commit c2d715b

Please sign in to comment.