From 6f3dc308ca9393c4c4ddf224c278ce0a271fe08b Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Thu, 20 Jun 2024 20:27:18 -0400 Subject: [PATCH] POC OTel context propagation working --- .../io/vena/bosk/BoskDiagnosticContext.java | 55 +++++++++++++ .../bosk/drivers/OtelSpanContextDriver.java | 79 +++++++++++++++++++ .../drivers/mongo/MongoDriverHanoiTest.java | 21 +++-- .../java/io/vena/bosk/drivers/HanoiTest.java | 2 + 4 files changed, 149 insertions(+), 8 deletions(-) create mode 100644 bosk-core/src/main/java/io/vena/bosk/drivers/OtelSpanContextDriver.java diff --git a/bosk-core/src/main/java/io/vena/bosk/BoskDiagnosticContext.java b/bosk-core/src/main/java/io/vena/bosk/BoskDiagnosticContext.java index 6358bf38..edfcc636 100644 --- a/bosk-core/src/main/java/io/vena/bosk/BoskDiagnosticContext.java +++ b/bosk-core/src/main/java/io/vena/bosk/BoskDiagnosticContext.java @@ -1,8 +1,19 @@ package io.vena.bosk; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static java.util.stream.Collectors.toSet; + /** * A thread-local set of name-value pairs that propagate all the way from * submission of a driver update, through all the driver layers, @@ -13,13 +24,20 @@ public final class BoskDiagnosticContext { public final class DiagnosticScope implements AutoCloseable { final MapValue oldAttributes = currentAttributes.get(); + final Scope otelScope; DiagnosticScope(MapValue attributes) { currentAttributes.set(attributes); + otelScope = PROPAGATORS.getTextMapPropagator().extract( + Context.current(), + attributes, + DIAGNOSTIC_ATTRIBUTE_GETTER + ).makeCurrent(); } @Override public void close() { + otelScope.close(); currentAttributes.set(oldAttributes); } } @@ -68,4 +86,41 @@ public DiagnosticScope withOnly(@Nullable MapValue attributes) { return new DiagnosticScope(attributes); } } + + public DiagnosticScope withCurrentOtelContext() { + Map carrier = new LinkedHashMap<>(); + TextMapPropagator textMapPropagator = PROPAGATORS + .getTextMapPropagator(); + textMapPropagator + .inject( + Context.current(), + carrier, + (map, key, value) -> { + if (map != null) { + map.put(OTEL_PREFIX + key, value); + } + }); + + return withAttributes(MapValue.fromOrderedMap(carrier)); + } + + private static final ContextPropagators PROPAGATORS = GlobalOpenTelemetry.getPropagators(); + + private static final TextMapGetter> DIAGNOSTIC_ATTRIBUTE_GETTER = + new TextMapGetter<>() { + @Override + public Set keys(MapValue carrier) { + return carrier.keySet().stream() + .filter(k -> k.startsWith(OTEL_PREFIX)) + .map(k -> k.substring(OTEL_PREFIX.length())) + .collect(toSet()); + } + + @Override + public String get(MapValue carrier, String key) { + return carrier == null ? null : carrier.get(OTEL_PREFIX + key); + } + }; + + public static final String OTEL_PREFIX = "otel."; } diff --git a/bosk-core/src/main/java/io/vena/bosk/drivers/OtelSpanContextDriver.java b/bosk-core/src/main/java/io/vena/bosk/drivers/OtelSpanContextDriver.java new file mode 100644 index 00000000..f3f6c03f --- /dev/null +++ b/bosk-core/src/main/java/io/vena/bosk/drivers/OtelSpanContextDriver.java @@ -0,0 +1,79 @@ +package io.vena.bosk.drivers; + +import io.opentelemetry.api.trace.SpanContext; +import io.vena.bosk.BoskDiagnosticContext; +import io.vena.bosk.BoskDriver; +import io.vena.bosk.DriverFactory; +import io.vena.bosk.Identifier; +import io.vena.bosk.Reference; +import io.vena.bosk.StateTreeNode; +import io.vena.bosk.exceptions.InvalidTypeException; +import java.io.IOException; +import java.lang.reflect.Type; + +/** + * Adds OpenTelemetry {@link SpanContext} info to the {@link BoskDiagnosticContext} + * automatically on each method call. + */ +public class OtelSpanContextDriver implements BoskDriver { + private final BoskDiagnosticContext context; + private final BoskDriver downstream; + + OtelSpanContextDriver(BoskDiagnosticContext context, BoskDriver downstream) { + this.context = context; + this.downstream = downstream; + } + + public static DriverFactory factory() { + return (b,d) -> new OtelSpanContextDriver<>(b.rootReference().diagnosticContext(), d); + } + + @Override + public R initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException { + try (var __ = context.withCurrentOtelContext()) { + return downstream.initialRoot(rootType); + } + } + + @Override + public void submitReplacement(Reference target, T newValue) { + try (var __ = context.withCurrentOtelContext()) { + downstream.submitReplacement(target, newValue); + } + } + + @Override + public void submitConditionalReplacement(Reference target, T newValue, Reference precondition, Identifier requiredValue) { + try (var __ = context.withCurrentOtelContext()) { + downstream.submitConditionalReplacement(target, newValue, precondition, requiredValue); + } + } + + @Override + public void submitInitialization(Reference target, T newValue) { + try (var __ = context.withCurrentOtelContext()) { + downstream.submitInitialization(target, newValue); + } + } + + @Override + public void submitDeletion(Reference target) { + try (var __ = context.withCurrentOtelContext()) { + downstream.submitDeletion(target); + } + } + + @Override + public void submitConditionalDeletion(Reference target, Reference precondition, Identifier requiredValue) { + try (var __ = context.withCurrentOtelContext()) { + downstream.submitConditionalDeletion(target, precondition, requiredValue); + } + } + + @Override + public void flush() throws IOException, InterruptedException { + try (var __ = context.withCurrentOtelContext()) { + downstream.flush(); + } + } +} diff --git a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverHanoiTest.java b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverHanoiTest.java index 8157d597..9736fd56 100644 --- a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverHanoiTest.java +++ b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverHanoiTest.java @@ -1,6 +1,8 @@ package io.vena.bosk.drivers.mongo; +import io.vena.bosk.DriverStack; import io.vena.bosk.drivers.HanoiTest; +import io.vena.bosk.drivers.OtelSpanContextDriver; import io.vena.bosk.drivers.mongo.TestParameters.ParameterSet; import io.vena.bosk.junit.ParametersByName; import java.util.stream.Stream; @@ -19,10 +21,13 @@ public class MongoDriverHanoiTest extends HanoiTest { @ParametersByName public MongoDriverHanoiTest(ParameterSet parameters) { MongoDriverSettings settings = parameters.driverSettingsBuilder().build(); - this.driverFactory = MongoDriver.factory( - mongoService.clientSettings(), - settings, - new BsonPlugin() + this.driverFactory = DriverStack.of( + OtelSpanContextDriver.factory(), + MongoDriver.factory( + mongoService.clientSettings(), + settings, + new BsonPlugin() + ) ); mongoService.client() .getDatabase(settings.database()) @@ -39,10 +44,10 @@ static void setupMongoConnection() { static Stream parameters() { return TestParameters.driverSettings( Stream.of( - PandoFormat.oneBigDocument(), - PandoFormat.withGraftPoints("/puzzles"), - PandoFormat.withGraftPoints("/puzzles/-puzzle-/towers"), - PandoFormat.withGraftPoints("/puzzles", "/puzzles/-puzzle-/towers/-tower-/discs"), +// PandoFormat.oneBigDocument(), +// PandoFormat.withGraftPoints("/puzzles"), +// PandoFormat.withGraftPoints("/puzzles/-puzzle-/towers"), +// PandoFormat.withGraftPoints("/puzzles", "/puzzles/-puzzle-/towers/-tower-/discs"), SEQUOIA ), Stream.of(NORMAL) diff --git a/bosk-testing/src/main/java/io/vena/bosk/drivers/HanoiTest.java b/bosk-testing/src/main/java/io/vena/bosk/drivers/HanoiTest.java index 594a3e5b..7047d49b 100644 --- a/bosk-testing/src/main/java/io/vena/bosk/drivers/HanoiTest.java +++ b/bosk-testing/src/main/java/io/vena/bosk/drivers/HanoiTest.java @@ -1,5 +1,6 @@ package io.vena.bosk.drivers; +import io.opentelemetry.instrumentation.annotations.WithSpan; import io.vena.bosk.BindingEnvironment; import io.vena.bosk.Bosk; import io.vena.bosk.Catalog; @@ -73,6 +74,7 @@ void setup() throws InvalidTypeException { } @ParametersByName + @WithSpan void onePuzzle() throws InterruptedException { int numDiscs = 6; bosk.driver().submitReplacement(refs.puzzle(PUZZLE_1),