Skip to content

Commit

Permalink
BoskDiagnosticContext
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle committed Oct 5, 2023
1 parent ebd4fcf commit 016897c
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 49 deletions.
24 changes: 19 additions & 5 deletions bosk-core/src/main/java/io/vena/bosk/Bosk.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vena.bosk;

import io.vena.bosk.BoskDiagnosticContext.DiagnosticScope;
import io.vena.bosk.ReferenceUtils.CatalogRef;
import io.vena.bosk.ReferenceUtils.ListingRef;
import io.vena.bosk.ReferenceUtils.SideTableRef;
Expand All @@ -25,6 +26,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.var;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -76,6 +78,7 @@ public class Bosk<R extends StateTreeNode> {
@Getter private final String name;
@Getter private final Identifier instanceID = Identifier.from(randomUUID().toString());
@Getter private final BoskDriver<R> driver;
@Getter private final BoskDiagnosticContext diagnosticContext = new BoskDiagnosticContext();
private final LocalDriver localDriver;
private final RootRef rootRef;
private final ThreadLocal<R> rootSnapshot = new ThreadLocal<>();
Expand Down Expand Up @@ -361,14 +364,20 @@ private <T> void queueHooks(Reference<T> target, @Nullable R priorRoot) {
* on every matching object that exists in <code>rootForHook</code>.
*/
private <T,S> void triggerQueueingOfHooks(Reference<T> target, @Nullable R priorRoot, R rootForHook, HookRegistration<S> reg) {
MapValue<String> attributes = diagnosticContext.getAttributes();
reg.triggerAction(priorRoot, rootForHook, target, changedRef -> {
LOGGER.debug("Hook: queue {}({}) due to {}", reg.name, changedRef, target);
hookExecutionQueue.addLast(() -> {
try (@SuppressWarnings("unused") ReadContext executionContext = new ReadContext(rootForHook)) {
LOGGER.debug("Hook: RUN {}({})", reg.name, changedRef);
reg.hook.onChanged(changedRef);
} finally {
LOGGER.debug("Hook: end {}({})", reg.name, changedRef);
// We use two nested try statements here so that the "finally" clause runs within the diagnostic scope
try(
@SuppressWarnings("unused") DiagnosticScope foo = diagnosticContext.withOnly(attributes)
) {
try (@SuppressWarnings("unused") ReadContext executionContext = new ReadContext(rootForHook)) {
LOGGER.debug("Hook: RUN {}({})", reg.name, changedRef);
reg.hook.onChanged(changedRef);
} finally {
LOGGER.debug("Hook: end {}({})", reg.name, changedRef);
}
}
});
});
Expand Down Expand Up @@ -817,6 +826,11 @@ public <TT> Reference<Reference<TT>> thenReference(Class<TT> targetClass, Path p
return this.then(Classes.reference(targetClass), path);
}

@Override
public BoskDiagnosticContext diagnosticContext() {
return diagnosticContext;
}

@Override
public <T> T buildReferences(Class<T> refsClass) throws InvalidTypeException {
return ReferenceBuilder.buildReferences(refsClass, Bosk.this);
Expand Down
71 changes: 71 additions & 0 deletions bosk-core/src/main/java/io/vena/bosk/BoskDiagnosticContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.vena.bosk;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* A thread-local set of name-value pairs that propagate all the way from
* submission of a driver update, through all the driver layers,
* to the execution of hooks.
*/
public final class BoskDiagnosticContext {
private final ThreadLocal<MapValue<String>> currentAttributes = ThreadLocal.withInitial(MapValue::empty);

public final class DiagnosticScope implements AutoCloseable {
final MapValue<String> oldAttributes = currentAttributes.get();

DiagnosticScope(MapValue<String> attributes) {
currentAttributes.set(attributes);
}

@Override
public void close() {
currentAttributes.set(oldAttributes);
}
}

/**
* @return the current thread's value of the attribute with the given <code>name</code>,
* or <code>null</code> if no such attribute has been defined.
*/
public @Nullable String getAttribute(String name) {
return currentAttributes.get().get(name);
}

public @NotNull MapValue<String> getAttributes() {
return currentAttributes.get();
}

/**
* Adds a single attribute to the current thread's diagnostic context.
* If the attribute already exists, it will be replaced.
*/
public DiagnosticScope withAttribute(String name, String value) {
return new DiagnosticScope(currentAttributes.get().with(name, value));
}

/**
* Adds attributes to the current thread's diagnostic context.
* If an attribute already exists, it will be replaced.
*/
public DiagnosticScope withAttributes(@NotNull MapValue<String> additionalAttributes) {
return new DiagnosticScope(currentAttributes.get().withAll(additionalAttributes));
}

/**
* Replaces all attributes in the current thread's diagnostic context.
* Existing attributes are removed/replaced.
* <p>
* This is intended for propagating context from one thread to another.
* <p>
* If <code>attributes</code> is null, this is a no-op, and any existing attributes on this thread are retained.
* If ensuring a clean set of attributes is important, pass an empty map instead of null.
*/
public DiagnosticScope withOnly(@Nullable MapValue<String> attributes) {
if (attributes == null) {
return new DiagnosticScope(currentAttributes.get());
} else {
return new DiagnosticScope(attributes);
}
}
}
4 changes: 4 additions & 0 deletions bosk-core/src/main/java/io/vena/bosk/MapValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public MapValue<V> without(String name) {
}
}

public MapValue<V> withAll(Map<String, ? extends V> m) {
return new MapValue<>(contents.plusAll(m));
}

@Override
public String toString() {
return contents.toString();
Expand Down
1 change: 1 addition & 0 deletions bosk-core/src/main/java/io/vena/bosk/RootReference.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ public interface RootReference<R> extends Reference<R> {
<K extends Entity,V> SideTableReference<K,V> thenSideTable(Class<K> keyClass, Class<V> valueClass, Path path) throws InvalidTypeException;
<T> Reference<Reference<T>> thenReference(Class<T> targetClass, Path path) throws InvalidTypeException;

BoskDiagnosticContext diagnosticContext();
}
34 changes: 23 additions & 11 deletions bosk-core/src/main/java/io/vena/bosk/drivers/BufferingDriver.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.vena.bosk.drivers;

import io.vena.bosk.BoskDiagnosticContext;
import io.vena.bosk.BoskDriver;
import io.vena.bosk.Identifier;
import io.vena.bosk.MapValue;
import io.vena.bosk.Reference;
import io.vena.bosk.StateTreeNode;
import io.vena.bosk.exceptions.InvalidTypeException;
Expand All @@ -11,6 +13,7 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.var;

import static lombok.AccessLevel.PROTECTED;

Expand Down Expand Up @@ -44,17 +47,27 @@ public R initialRoot(Type rootType) throws InvalidTypeException, IOException, In

@Override
public <T> void submitReplacement(Reference<T> target, T newValue) {
updateQueue.add(d -> d.submitReplacement(target, newValue));
enqueue(d -> d.submitReplacement(target, newValue), target.root().diagnosticContext());
}

@Override
public <T> void submitInitialization(Reference<T> target, T newValue) {
updateQueue.add(d -> d.submitInitialization(target, newValue));
enqueue(d -> d.submitInitialization(target, newValue), target.root().diagnosticContext());
}

@Override
public <T> void submitDeletion(Reference<T> target) {
updateQueue.add(d -> d.submitDeletion(target));
enqueue(d -> d.submitDeletion(target), target.root().diagnosticContext());
}

@Override
public <T> void submitConditionalReplacement(Reference<T> target, T newValue, Reference<Identifier> precondition, Identifier requiredValue) {
enqueue(d -> d.submitConditionalReplacement(target, newValue, precondition, requiredValue), target.root().diagnosticContext());
}

@Override
public <T> void submitConditionalDeletion(Reference<T> target, Reference<Identifier> precondition, Identifier requiredValue) {
enqueue(d -> d.submitConditionalDeletion(target, precondition, requiredValue), target.root().diagnosticContext());
}

@Override
Expand All @@ -65,14 +78,13 @@ public void flush() throws InterruptedException, IOException {
downstream.flush();
}

@Override
public <T> void submitConditionalReplacement(Reference<T> target, T newValue, Reference<Identifier> precondition, Identifier requiredValue) {
updateQueue.add(d -> d.submitConditionalReplacement(target, newValue, precondition, requiredValue));
}

@Override
public <T> void submitConditionalDeletion(Reference<T> target, Reference<Identifier> precondition, Identifier requiredValue) {
updateQueue.add(d -> d.submitConditionalDeletion(target, precondition, requiredValue));
private void enqueue(Consumer<BoskDriver<R>> action, BoskDiagnosticContext diagnosticContext) {
MapValue<String> capturedAttributes = diagnosticContext.getAttributes();
updateQueue.add(d -> {
try (var __ = diagnosticContext.withOnly(capturedAttributes)) {
action.accept(d);
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.vena.bosk;

import io.vena.bosk.annotations.ReferencePath;
import io.vena.bosk.drivers.AbstractDriverTest;
import io.vena.bosk.drivers.state.TestEntity;
import io.vena.bosk.exceptions.InvalidTypeException;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.var;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Note that context propagation for driver operations is tested by {@link io.vena.bosk.drivers.DriverConformanceTest}.
*/
class BoskDiagnosticContextTest extends AbstractDriverTest {
Refs refs;

public interface Refs {
@ReferencePath("/string") Reference<String> string();
}

@BeforeEach
void setupBosk() throws InvalidTypeException {
bosk = new Bosk<TestEntity>(
BoskDiagnosticContextTest.class.getSimpleName(),
TestEntity.class,
AbstractDriverTest::initialRoot,
Bosk::simpleDriver
);
refs = bosk.buildReferences(Refs.class);
}

@Test
void hookRegistration_propagatesDiagnosticContext() throws IOException, InterruptedException {
Semaphore diagnosticsVerified = new Semaphore(0);
bosk.driver().flush();
try (var __ = bosk.diagnosticContext().withAttribute("attributeName", "attributeValue")) {
bosk.registerHook("contextPropagatesToHook", bosk.rootReference(), ref -> {
assertEquals("attributeValue", bosk.diagnosticContext().getAttribute("attributeName"));
assertEquals(MapValue.singleton("attributeName", "attributeValue"), bosk.diagnosticContext().getAttributes());
diagnosticsVerified.release();
});
}
bosk.driver().flush();
assertTrue(diagnosticsVerified.tryAcquire(5, SECONDS));
}

}
49 changes: 49 additions & 0 deletions bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/Formatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.mongodb.client.model.changestream.UpdateDescription;
import io.vena.bosk.Bosk;
import io.vena.bosk.Listing;
import io.vena.bosk.MapValue;
import io.vena.bosk.Reference;
import io.vena.bosk.SerializationPlugin;
import io.vena.bosk.SideTable;
Expand All @@ -17,6 +18,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;
Expand All @@ -28,6 +30,7 @@
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.codecs.Codec;
import org.bson.codecs.DecoderContext;
Expand Down Expand Up @@ -94,6 +97,7 @@ enum DocumentFields {
path,
state,
revision,
diagnostics,
}

private final BsonInt32 SUPPORTED_MANIFEST_VERSION = new BsonInt32(1);
Expand Down Expand Up @@ -170,6 +174,22 @@ Manifest decodeManifest(BsonDocument manifestDoc) throws UnrecognizedFormatExcep
DecoderContext.builder().build());
}

BsonDocument encodeDiagnostics(MapValue<String> attributes) {
BsonDocument result = new BsonDocument();
attributes.forEach((name, value) -> result.put(name, new BsonString(value)));
return new BsonDocument("attributes", result);
}

MapValue<String> decodeDiagnosticAttributes(BsonDocument diagnostics) {
MapValue<String> result = MapValue.empty();
for (Map.Entry<String, BsonValue> foo: diagnostics.getDocument("attributes").entrySet()) {
String name = foo.getKey();
String value = foo.getValue().asString().getValue();
result = result.with(name, value);
}
return result;
}

/**
* @see #bsonValue2object(BsonValue, Reference)
*/
Expand Down Expand Up @@ -213,6 +233,17 @@ BsonInt64 getRevisionFromFullDocument(BsonDocument fullDocument) {
return fullDocument.getInt64(DocumentFields.revision.name(), null);
}

MapValue<String> getDiagnosticAttributesFromFullDocument(BsonDocument fullDocument) {
if (fullDocument == null) {
return null;
}
BsonDocument diagnostics = fullDocument.getDocument(DocumentFields.diagnostics.name(), null);
if (diagnostics == null) {
return null;
}
return decodeDiagnosticAttributes(diagnostics);
}

BsonInt64 getRevisionFromUpdateEvent(ChangeStreamDocument<BsonDocument> event) {
if (event == null) {
return null;
Expand All @@ -228,6 +259,24 @@ BsonInt64 getRevisionFromUpdateEvent(ChangeStreamDocument<BsonDocument> event) {
return updatedFields.getInt64(DocumentFields.revision.name(), null);
}

MapValue<String> getDiagnosticAttributesFromUpdateEvent(ChangeStreamDocument<BsonDocument> event) {
if (event == null) {
return null;
}
UpdateDescription updateDescription = event.getUpdateDescription();
if (updateDescription == null) {
return null;
}
BsonDocument updatedFields = updateDescription.getUpdatedFields();
if (updatedFields == null) {
return null;
}
BsonDocument diagnostics = updatedFields.getDocument(DocumentFields.diagnostics.name(), null);
if (diagnostics == null) {
return null;
}
return decodeDiagnosticAttributes(diagnostics);
}

/**
* @return MongoDB field name corresponding to the given Reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private R doInitialRoot(Type rootType) {
root = callDownstreamInitialRoot(rootType);
try (var session = collection.newSession()) {
FormatDriver<R> preferredDriver = newPreferredFormatDriver();
preferredDriver.initializeCollection(new StateAndMetadata<>(root, REVISION_ZERO));
preferredDriver.initializeCollection(new StateAndMetadata<>(root, REVISION_ZERO, bosk.diagnosticContext().getAttributes()));
preferredDriver.onRevisionToSkip(REVISION_ONE); // initialRoot handles REVISION_ONE; downstream only needs to know about changes after that
session.commitTransactionIfAny();
// We can now publish the driver knowing that the transaction, if there is one, has committed
Expand Down Expand Up @@ -333,7 +333,13 @@ public void onConnectionSucceeded() throws
LOGGER.debug("Loading database state to submit to downstream driver");
FormatDriver<R> newDriver = detectFormat();
StateAndMetadata<R> loadedState = newDriver.loadAllState();
downstream.submitReplacement(bosk.rootReference(), loadedState.state);
// TODO: It's not clear we actually want loadedState.diagnosticAttributes here.
// This causes downstream.submitReplacement to be associated with the last update to the state,
// which is of dubious relevance. We might just want to use the context from the current thread,
// which is probably empty because this runs on the ChangeReceiver thread.
try (var ___ = bosk.rootReference().diagnosticContext().withOnly(loadedState.diagnosticAttributes)) {
downstream.submitReplacement(bosk.rootReference(), loadedState.state);
}
newDriver.onRevisionToSkip(loadedState.revision);
publishFormatDriver(newDriver);
}
Expand Down
Loading

0 comments on commit 016897c

Please sign in to comment.