Skip to content

Commit

Permalink
POC OTel context propagation working
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle committed Jun 21, 2024
1 parent eb76998 commit 6f3dc30
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 8 deletions.
55 changes: 55 additions & 0 deletions bosk-core/src/main/java/io/vena/bosk/BoskDiagnosticContext.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
package io.vena.bosk;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static java.util.stream.Collectors.toSet;

/**
* A thread-local set of name-value pairs that propagate all the way from
* submission of a driver update, through all the driver layers,
Expand All @@ -13,13 +24,20 @@ public final class BoskDiagnosticContext {

public final class DiagnosticScope implements AutoCloseable {
final MapValue<String> oldAttributes = currentAttributes.get();
final Scope otelScope;

DiagnosticScope(MapValue<String> attributes) {
currentAttributes.set(attributes);
otelScope = PROPAGATORS.getTextMapPropagator().extract(
Context.current(),
attributes,
DIAGNOSTIC_ATTRIBUTE_GETTER
).makeCurrent();
}

@Override
public void close() {
otelScope.close();
currentAttributes.set(oldAttributes);
}
}
Expand Down Expand Up @@ -68,4 +86,41 @@ public DiagnosticScope withOnly(@Nullable MapValue<String> attributes) {
return new DiagnosticScope(attributes);
}
}

public DiagnosticScope withCurrentOtelContext() {
Map<String, String> carrier = new LinkedHashMap<>();
TextMapPropagator textMapPropagator = PROPAGATORS
.getTextMapPropagator();
textMapPropagator
.inject(
Context.current(),
carrier,
(map, key, value) -> {
if (map != null) {
map.put(OTEL_PREFIX + key, value);
}
});

return withAttributes(MapValue.fromOrderedMap(carrier));
}

private static final ContextPropagators PROPAGATORS = GlobalOpenTelemetry.getPropagators();

private static final TextMapGetter<MapValue<String>> DIAGNOSTIC_ATTRIBUTE_GETTER =
new TextMapGetter<>() {
@Override
public Set<String> keys(MapValue<String> carrier) {
return carrier.keySet().stream()
.filter(k -> k.startsWith(OTEL_PREFIX))
.map(k -> k.substring(OTEL_PREFIX.length()))
.collect(toSet());
}

@Override
public String get(MapValue<String> carrier, String key) {
return carrier == null ? null : carrier.get(OTEL_PREFIX + key);
}
};

public static final String OTEL_PREFIX = "otel.";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.vena.bosk.drivers;

import io.opentelemetry.api.trace.SpanContext;
import io.vena.bosk.BoskDiagnosticContext;
import io.vena.bosk.BoskDriver;
import io.vena.bosk.DriverFactory;
import io.vena.bosk.Identifier;
import io.vena.bosk.Reference;
import io.vena.bosk.StateTreeNode;
import io.vena.bosk.exceptions.InvalidTypeException;
import java.io.IOException;
import java.lang.reflect.Type;

/**
* Adds OpenTelemetry {@link SpanContext} info to the {@link BoskDiagnosticContext}
* automatically on each method call.
*/
public class OtelSpanContextDriver<R extends StateTreeNode> implements BoskDriver<R> {
private final BoskDiagnosticContext context;
private final BoskDriver<R> downstream;

OtelSpanContextDriver(BoskDiagnosticContext context, BoskDriver<R> downstream) {
this.context = context;
this.downstream = downstream;
}

public static <RR extends StateTreeNode> DriverFactory<RR> factory() {
return (b,d) -> new OtelSpanContextDriver<>(b.rootReference().diagnosticContext(), d);
}

@Override
public R initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException {
try (var __ = context.withCurrentOtelContext()) {
return downstream.initialRoot(rootType);
}
}

@Override
public <T> void submitReplacement(Reference<T> target, T newValue) {
try (var __ = context.withCurrentOtelContext()) {
downstream.submitReplacement(target, newValue);
}
}

@Override
public <T> void submitConditionalReplacement(Reference<T> target, T newValue, Reference<Identifier> precondition, Identifier requiredValue) {
try (var __ = context.withCurrentOtelContext()) {
downstream.submitConditionalReplacement(target, newValue, precondition, requiredValue);
}
}

@Override
public <T> void submitInitialization(Reference<T> target, T newValue) {
try (var __ = context.withCurrentOtelContext()) {
downstream.submitInitialization(target, newValue);
}
}

@Override
public <T> void submitDeletion(Reference<T> target) {
try (var __ = context.withCurrentOtelContext()) {
downstream.submitDeletion(target);
}
}

@Override
public <T> void submitConditionalDeletion(Reference<T> target, Reference<Identifier> precondition, Identifier requiredValue) {
try (var __ = context.withCurrentOtelContext()) {
downstream.submitConditionalDeletion(target, precondition, requiredValue);
}
}

@Override
public void flush() throws IOException, InterruptedException {
try (var __ = context.withCurrentOtelContext()) {
downstream.flush();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.vena.bosk.drivers.mongo;

import io.vena.bosk.DriverStack;
import io.vena.bosk.drivers.HanoiTest;
import io.vena.bosk.drivers.OtelSpanContextDriver;
import io.vena.bosk.drivers.mongo.TestParameters.ParameterSet;
import io.vena.bosk.junit.ParametersByName;
import java.util.stream.Stream;
Expand All @@ -19,10 +21,13 @@ public class MongoDriverHanoiTest extends HanoiTest {
@ParametersByName
public MongoDriverHanoiTest(ParameterSet parameters) {
MongoDriverSettings settings = parameters.driverSettingsBuilder().build();
this.driverFactory = MongoDriver.factory(
mongoService.clientSettings(),
settings,
new BsonPlugin()
this.driverFactory = DriverStack.of(
OtelSpanContextDriver.factory(),
MongoDriver.factory(
mongoService.clientSettings(),
settings,
new BsonPlugin()
)
);
mongoService.client()
.getDatabase(settings.database())
Expand All @@ -39,10 +44,10 @@ static void setupMongoConnection() {
static Stream<ParameterSet> parameters() {
return TestParameters.driverSettings(
Stream.of(
PandoFormat.oneBigDocument(),
PandoFormat.withGraftPoints("/puzzles"),
PandoFormat.withGraftPoints("/puzzles/-puzzle-/towers"),
PandoFormat.withGraftPoints("/puzzles", "/puzzles/-puzzle-/towers/-tower-/discs"),
// PandoFormat.oneBigDocument(),
// PandoFormat.withGraftPoints("/puzzles"),
// PandoFormat.withGraftPoints("/puzzles/-puzzle-/towers"),
// PandoFormat.withGraftPoints("/puzzles", "/puzzles/-puzzle-/towers/-tower-/discs"),
SEQUOIA
),
Stream.of(NORMAL)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vena.bosk.drivers;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.vena.bosk.BindingEnvironment;
import io.vena.bosk.Bosk;
import io.vena.bosk.Catalog;
Expand Down Expand Up @@ -73,6 +74,7 @@ void setup() throws InvalidTypeException {
}

@ParametersByName
@WithSpan
void onePuzzle() throws InterruptedException {
int numDiscs = 6;
bosk.driver().submitReplacement(refs.puzzle(PUZZLE_1),
Expand Down

0 comments on commit 6f3dc30

Please sign in to comment.