Skip to content

Commit

Permalink
AsyncDriver
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle committed Jan 21, 2024
1 parent b636b3b commit 9af0f65
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.vena.bosk.BoskDiagnosticContext;
import io.vena.bosk.BoskDriver;
import io.vena.bosk.DriverFactory;
import io.vena.bosk.Identifier;
import io.vena.bosk.MapValue;
import io.vena.bosk.Reference;
Expand Down Expand Up @@ -39,6 +40,10 @@ public static <RR extends StateTreeNode> BufferingDriver<RR> writingTo(BoskDrive
return new BufferingDriver<>(downstream);
}

public static <RR extends StateTreeNode> DriverFactory<RR> factory() {
return (b,d) -> new BufferingDriver<>(d);
}

@Override
public R initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException {
return downstream.initialRoot(rootType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.vena.bosk.drivers.AbstractDriverTest;
import io.vena.bosk.drivers.state.TestEntity;
import io.vena.bosk.exceptions.InvalidTypeException;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -48,7 +49,7 @@ public interface Refs {
}

@BeforeEach
void setup() throws InvalidTypeException {
void setup() throws InvalidTypeException, IOException, InterruptedException {
setupBosksAndReferences(Bosk::simpleDriver);
bsonPlugin = new BsonPlugin();
formatter = new Formatter(bosk, bsonPlugin);
Expand All @@ -69,6 +70,7 @@ void setup() throws InvalidTypeException {
makeCatalog(refs.doubleNestedCatalog().boundTo(Identifier.from("entity1"), Identifier.from("child1")));
driver.submitReplacement(sideTableRef.then(Identifier.from("child1")),
TestEntity.empty(Identifier.from("sideTableValue"), catalogRef));
driver.flush();
surgeon = new BsonSurgeon(graftPoints);
}

Expand Down
82 changes: 82 additions & 0 deletions bosk-testing/src/main/java/io/vena/bosk/drivers/AsyncDriver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.vena.bosk.drivers;

import io.vena.bosk.Bosk;
import io.vena.bosk.BoskDriver;
import io.vena.bosk.DriverFactory;
import io.vena.bosk.Identifier;
import io.vena.bosk.Reference;
import io.vena.bosk.StateTreeNode;
import io.vena.bosk.exceptions.InvalidTypeException;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static lombok.AccessLevel.PRIVATE;

@RequiredArgsConstructor(access = PRIVATE)
public class AsyncDriver<R extends StateTreeNode> implements BoskDriver<R> {
private final Bosk<R> bosk;
private final BoskDriver<R> downstream;
private final ExecutorService executor = Executors.newSingleThreadExecutor();

public static <RR extends StateTreeNode> DriverFactory<RR> factory() {
return AsyncDriver::new;
}

@Override
public R initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException {
return downstream.initialRoot(rootType);
}

@Override
public <T> void submitReplacement(Reference<T> target, T newValue) {
submitAsyncTask("submitReplacement", () -> downstream.submitReplacement(target, newValue));
}

@Override
public <T> void submitConditionalReplacement(Reference<T> target, T newValue, Reference<Identifier> precondition, Identifier requiredValue) {
submitAsyncTask("submitConditionalReplacement", () -> downstream.submitConditionalReplacement(target, newValue, precondition, requiredValue));
}

@Override
public <T> void submitInitialization(Reference<T> target, T newValue) {
submitAsyncTask("submitInitialization", () -> downstream.submitInitialization(target, newValue));
}

@Override
public <T> void submitDeletion(Reference<T> target) {
submitAsyncTask("submitDeletion", () -> downstream.submitDeletion(target));
}

@Override
public <T> void submitConditionalDeletion(Reference<T> target, Reference<Identifier> precondition, Identifier requiredValue) {
submitAsyncTask("submitConditionalDeletion", () -> downstream.submitConditionalDeletion(target, precondition, requiredValue));
}

@Override
public void flush() throws IOException, InterruptedException {
Semaphore semaphore = new Semaphore(0);
submitAsyncTask("flush", semaphore::release);
semaphore.acquire();
downstream.flush();
}

private void submitAsyncTask(String description, Runnable task) {
LOGGER.debug("Submit {}", description);
var diagnosticAttributes = bosk.diagnosticContext().getAttributes();
executor.submit(()->{
LOGGER.debug("Run {}", description);
try (var __ = bosk.diagnosticContext().withOnly(diagnosticAttributes)) {
task.run();
}
LOGGER.trace("Done {}", description);
});
}

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDriver.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static <RR extends StateTreeNode> DriverFactory<RR> wrap(DriverFactory<RR
DiagnosticScopeDriver.factory(dc -> dc.withAttribute(THREAD_NAME, currentThread().getName())),
ReportingDriver.factory(verifier::incomingUpdate, verifier::incomingFlush),
subject,
BufferingDriver.factory(), // This catches missing flush operations
ReportingDriver.factory(verifier::outgoingUpdate, verifier::outgoingFlush)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.vena.bosk.drivers;

import org.junit.jupiter.api.BeforeEach;

public class AsyncDriverConformanceTest extends DriverConformanceTest {

@BeforeEach
void setupDriverFactory() {
driverFactory = AsyncDriver.factory();
}

}

0 comments on commit 9af0f65

Please sign in to comment.