From 21fbb36d81c79d81d63abf0a7a64bc8c4124b941 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Mon, 5 Oct 2020 17:03:11 +0900 Subject: [PATCH] Add an OpenTelemetry context mechanism. (#1658) * Add an OpenTelemetry context mechanism. * Moar * Extracted interfaces * More context * Cleanup / tests * Move and cleanups * Brackets * Add example for brave context interop * Brave in OTel * Spotless * Missing folder * Another * Spotless --- build.gradle | 24 +- context/build.gradle | 32 ++ .../context/BraveInOtelTest.java | 125 +++++ .../OpenTelemetryCurrentTraceContext.java | 34 ++ .../grpc/override/ContextStorageOverride.java | 61 +++ .../opentelemetry/context/GrpcInOtelTest.java | 123 +++++ .../io/opentelemetry/context/Context.java | 209 +++++++++ .../context/ContextExecutorService.java | 113 +++++ .../io/opentelemetry/context/ContextKey.java | 44 ++ .../ContextScheduledExecutorService.java | 46 ++ .../opentelemetry/context/ContextStorage.java | 75 +++ .../context/ContextStorageProvider.java | 31 ++ .../opentelemetry/context/DefaultContext.java | 84 ++++ .../context/DefaultContextKey.java | 20 + .../io/opentelemetry/context/LazyStorage.java | 89 ++++ .../PersistentHashArrayMappedTrie.java | 283 ++++++++++++ .../java/io/opentelemetry/context/Scope.java | 33 ++ .../context/ThreadLocalContextStorage.java | 57 +++ .../context/BraveContextStorageProvider.java | 136 ++++++ .../context/OtelAsBraveTest.java | 122 +++++ ...entelemetry.context.ContextStorageProvider | 1 + .../context/GrpcContextStorageProvider.java | 123 +++++ .../opentelemetry/context/OtelAsGrpcTest.java | 121 +++++ ...entelemetry.context.ContextStorageProvider | 1 + .../context/GrpcContextStorageProvider.java | 41 ++ .../opentelemetry/context/OtelInGrpcTest.java | 123 +++++ ...entelemetry.context.ContextStorageProvider | 1 + .../io/opentelemetry/context/ContextTest.java | 433 ++++++++++++++++++ settings.gradle | 2 + 29 files changed, 2574 insertions(+), 13 deletions(-) create mode 100644 context/build.gradle create mode 100644 context/src/braveInOtelTest/java/io/opentelemetry/context/BraveInOtelTest.java create mode 100644 context/src/braveInOtelTest/java/io/opentelemetry/context/OpenTelemetryCurrentTraceContext.java create mode 100644 context/src/grpcInOtelTest/java/io/grpc/override/ContextStorageOverride.java create mode 100644 context/src/grpcInOtelTest/java/io/opentelemetry/context/GrpcInOtelTest.java create mode 100644 context/src/main/java/io/opentelemetry/context/Context.java create mode 100644 context/src/main/java/io/opentelemetry/context/ContextExecutorService.java create mode 100644 context/src/main/java/io/opentelemetry/context/ContextKey.java create mode 100644 context/src/main/java/io/opentelemetry/context/ContextScheduledExecutorService.java create mode 100644 context/src/main/java/io/opentelemetry/context/ContextStorage.java create mode 100644 context/src/main/java/io/opentelemetry/context/ContextStorageProvider.java create mode 100644 context/src/main/java/io/opentelemetry/context/DefaultContext.java create mode 100644 context/src/main/java/io/opentelemetry/context/DefaultContextKey.java create mode 100644 context/src/main/java/io/opentelemetry/context/LazyStorage.java create mode 100644 context/src/main/java/io/opentelemetry/context/PersistentHashArrayMappedTrie.java create mode 100644 context/src/main/java/io/opentelemetry/context/Scope.java create mode 100644 context/src/main/java/io/opentelemetry/context/ThreadLocalContextStorage.java create mode 100644 context/src/otelAsBraveTest/java/io/opentelemetry/context/BraveContextStorageProvider.java create mode 100644 context/src/otelAsBraveTest/java/io/opentelemetry/context/OtelAsBraveTest.java create mode 100644 context/src/otelAsBraveTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider create mode 100644 context/src/otelAsGrpcTest/java/io/opentelemetry/context/GrpcContextStorageProvider.java create mode 100644 context/src/otelAsGrpcTest/java/io/opentelemetry/context/OtelAsGrpcTest.java create mode 100644 context/src/otelAsGrpcTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider create mode 100644 context/src/otelInGrpcTest/java/io/opentelemetry/context/GrpcContextStorageProvider.java create mode 100644 context/src/otelInGrpcTest/java/io/opentelemetry/context/OtelInGrpcTest.java create mode 100644 context/src/otelInGrpcTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider create mode 100644 context/src/test/java/io/opentelemetry/context/ContextTest.java diff --git a/build.gradle b/build.gradle index 17cbcdf8f1..5136f591c4 100644 --- a/build.gradle +++ b/build.gradle @@ -354,9 +354,19 @@ configure(opentelemetryProjects) { } } - test { + tasks.withType(Test) { systemProperties project.properties.subMap(["enable.docker.tests"]) useJUnitPlatform() + + // At a test failure, log the stack trace to the console so that we don't + // have to open the HTML in a browser. + testLogging { + exceptionFormat = 'full' + showExceptions = true + showCauses = true + showStackTraces = true + } + maxHeapSize = '1500m' } javadoc.options { @@ -381,18 +391,6 @@ configure(opentelemetryProjects) { sign configurations.archives } - // At a test failure, log the stack trace to the console so that we don't - // have to open the HTML in a browser. - test { - testLogging { - exceptionFormat = 'full' - showExceptions = true - showCauses = true - showStackTraces = true - } - maxHeapSize = '1500m' - } - plugins.withId("ru.vyarus.animalsniffer") { animalsnifferTest { enabled = false diff --git a/context/build.gradle b/context/build.gradle new file mode 100644 index 0000000000..0886407244 --- /dev/null +++ b/context/build.gradle @@ -0,0 +1,32 @@ +plugins { + id "java" + + id "org.unbroken-dome.test-sets" + id "ru.vyarus.animalsniffer" +} + +description = 'OpenTelemetry Context (Incubator)' +ext.moduleName = "io.opentelemetry.context" + +testSets { + grpcInOtelTest + otelInGrpcTest + otelAsGrpcTest + + braveInOtelTest + otelAsBraveTest +} + +dependencies { + grpcInOtelTestImplementation libraries.grpc_context + otelAsGrpcTestImplementation libraries.grpc_context + otelInGrpcTestImplementation libraries.grpc_context + + braveInOtelTestImplementation "io.zipkin.brave:brave:5.12.6" + otelAsBraveTestImplementation "io.zipkin.brave:brave:5.12.6" + + testImplementation libraries.awaitility + + signature "org.codehaus.mojo.signature:java18:1.0@signature" + signature "net.sf.androidscents.signature:android-api-level-24:7.0_r2@signature" +} diff --git a/context/src/braveInOtelTest/java/io/opentelemetry/context/BraveInOtelTest.java b/context/src/braveInOtelTest/java/io/opentelemetry/context/BraveInOtelTest.java new file mode 100644 index 0000000000..8c799fd53d --- /dev/null +++ b/context/src/braveInOtelTest/java/io/opentelemetry/context/BraveInOtelTest.java @@ -0,0 +1,125 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import static org.assertj.core.api.Assertions.assertThat; + +import brave.Tracing; +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class BraveInOtelTest { + + private static final ContextKey ANIMAL = ContextKey.named("animal"); + + private static final Tracing TRACING = + Tracing.newBuilder().currentTraceContext(new OpenTelemetryCurrentTraceContext()).build(); + + private static final TraceContext TRACE_CONTEXT = + TraceContext.newBuilder().traceId(1).spanId(1).addExtra("japan").build(); + + private static ExecutorService otherThread; + + @BeforeAll + static void setUp() { + otherThread = Executors.newSingleThreadExecutor(); + } + + @AfterAll + static void tearDown() { + otherThread.shutdown(); + } + + @Test + void braveOtelMix() { + try (CurrentTraceContext.Scope ignored = + TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + TraceContext context2 = + Tracing.current().currentTraceContext().get().toBuilder().addExtra("cheese").build(); + try (CurrentTraceContext.Scope ignored3 = + TRACING.currentTraceContext().newScope(context2)) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("cheese"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + } + } + } + } + + @Test + void braveWrap() throws Exception { + try (CurrentTraceContext.Scope ignored = + TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) { + try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + AtomicReference braveContainsJapan = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + TraceContext traceContext = Tracing.current().currentTraceContext().get(); + if (traceContext != null && traceContext.extra().contains("japan")) { + braveContainsJapan.set(true); + } else { + braveContainsJapan.set(false); + } + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(braveContainsJapan).hasValue(false); + assertThat(otelValue).hasValue(null); + + otherThread.submit(TRACING.currentTraceContext().wrap(runnable)).get(); + assertThat(braveContainsJapan).hasValue(true); + + // Since Brave context is inside the OTel context, propagating the Brave context does not + // propagate the OTel context. + assertThat(otelValue).hasValue(null); + } + } + } + + @Test + void otelWrap() throws Exception { + try (CurrentTraceContext.Scope ignored = + TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) { + try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + AtomicReference braveContainsJapan = new AtomicReference<>(false); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + TraceContext traceContext = Tracing.current().currentTraceContext().get(); + if (traceContext != null && traceContext.extra().contains("japan")) { + braveContainsJapan.set(true); + } else { + braveContainsJapan.set(false); + } + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(braveContainsJapan).hasValue(false); + assertThat(otelValue).hasValue(null); + + otherThread.submit(Context.current().wrap(runnable)).get(); + assertThat(braveContainsJapan).hasValue(true); + assertThat(otelValue).hasValue("cat"); + } + } + } +} diff --git a/context/src/braveInOtelTest/java/io/opentelemetry/context/OpenTelemetryCurrentTraceContext.java b/context/src/braveInOtelTest/java/io/opentelemetry/context/OpenTelemetryCurrentTraceContext.java new file mode 100644 index 0000000000..22dcd28432 --- /dev/null +++ b/context/src/braveInOtelTest/java/io/opentelemetry/context/OpenTelemetryCurrentTraceContext.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; + +public class OpenTelemetryCurrentTraceContext extends CurrentTraceContext { + + private static final ContextKey TRACE_CONTEXT_KEY = + ContextKey.named("brave-tracecontext"); + + @Override + public TraceContext get() { + return Context.current().getValue(TRACE_CONTEXT_KEY); + } + + @SuppressWarnings("ReferenceEquality") + @Override + public Scope newScope(TraceContext context) { + Context currentOtel = Context.current(); + TraceContext currentBrave = currentOtel.getValue(TRACE_CONTEXT_KEY); + if (currentBrave == context) { + return Scope.NOOP; + } + + Context newOtel = currentOtel.withValues(TRACE_CONTEXT_KEY, context); + io.opentelemetry.context.Scope otelScope = newOtel.makeCurrent(); + return otelScope::close; + } +} diff --git a/context/src/grpcInOtelTest/java/io/grpc/override/ContextStorageOverride.java b/context/src/grpcInOtelTest/java/io/grpc/override/ContextStorageOverride.java new file mode 100644 index 0000000000..d3eee5539e --- /dev/null +++ b/context/src/grpcInOtelTest/java/io/grpc/override/ContextStorageOverride.java @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.grpc.override; + +import io.grpc.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.context.Scope; +import java.util.logging.Level; +import java.util.logging.Logger; + +// This exact package / class name indicates to gRPC to use this override. +public class ContextStorageOverride extends Context.Storage { + + private static final Logger log = Logger.getLogger(ContextStorageOverride.class.getName()); + + private static final ContextKey GRPC_CONTEXT = ContextKey.named("grpc-context"); + private static final Context.Key OTEL_SCOPE = Context.key("otel-scope"); + + @Override + public Context doAttach(Context toAttach) { + io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current(); + Context current = otelContext.getValue(GRPC_CONTEXT); + + if (current == toAttach) { + return toAttach; + } + + if (current == null) { + current = Context.ROOT; + } + + io.opentelemetry.context.Context newOtelContext = + otelContext.withValues(GRPC_CONTEXT, toAttach); + Scope scope = newOtelContext.makeCurrent(); + return current.withValue(OTEL_SCOPE, scope); + } + + @Override + public void detach(Context toDetach, Context toRestore) { + if (current() != toDetach) { + // Log a severe message instead of throwing an exception as the context to attach is assumed + // to be the correct one and the unbalanced state represents a coding mistake in a lower + // layer in the stack that cannot be recovered from here. + log.log( + Level.SEVERE, + "Context was not attached when detaching", + new Throwable().fillInStackTrace()); + } + + Scope otelScope = OTEL_SCOPE.get(toRestore); + otelScope.close(); + } + + @Override + public Context current() { + return io.opentelemetry.context.Context.current().getValue(GRPC_CONTEXT); + } +} diff --git a/context/src/grpcInOtelTest/java/io/opentelemetry/context/GrpcInOtelTest.java b/context/src/grpcInOtelTest/java/io/opentelemetry/context/GrpcInOtelTest.java new file mode 100644 index 0000000000..02ae4fee33 --- /dev/null +++ b/context/src/grpcInOtelTest/java/io/opentelemetry/context/GrpcInOtelTest.java @@ -0,0 +1,123 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class GrpcInOtelTest { + + private static final ContextKey ANIMAL = ContextKey.named("animal"); + + private static final io.grpc.Context.Key FOOD = io.grpc.Context.key("food"); + private static final io.grpc.Context.Key COUNTRY = io.grpc.Context.key("country"); + + private static ExecutorService otherThread; + + @BeforeAll + static void setUp() { + otherThread = Executors.newSingleThreadExecutor(); + } + + @AfterAll + static void tearDown() { + otherThread.shutdown(); + } + + @Test + void grpcOtelMix() { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + assertThat(COUNTRY.get()).isNull(); + io.grpc.Context root = grpcContext.attach(); + try { + assertThat(COUNTRY.get()).isEqualTo("japan"); + try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + assertThat(COUNTRY.get()).isEqualTo("japan"); + + io.grpc.Context context2 = io.grpc.Context.current().withValue(FOOD, "cheese"); + assertThat(FOOD.get()).isNull(); + io.grpc.Context toRestore = context2.attach(); + try { + assertThat(FOOD.get()).isEqualTo("cheese"); + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + } finally { + context2.detach(toRestore); + } + } + } finally { + grpcContext.detach(root); + } + } + + @Test + void grpcWrap() throws Exception { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + io.grpc.Context root = grpcContext.attach(); + try { + try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + AtomicReference grpcValue = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + grpcValue.set(COUNTRY.get()); + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(grpcValue).hasValue(null); + assertThat(otelValue).hasValue(null); + + otherThread.submit(io.grpc.Context.current().wrap(runnable)).get(); + assertThat(grpcValue).hasValue("japan"); + + // Since gRPC context is inside the OTel context, propagating gRPC context does not + // propagate the OTel context. + assertThat(otelValue).hasValue(null); + } + } finally { + grpcContext.detach(root); + } + } + + @Test + void otelWrap() throws Exception { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + io.grpc.Context root = grpcContext.attach(); + try { + try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + AtomicReference grpcValue = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + grpcValue.set(COUNTRY.get()); + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(grpcValue).hasValue(null); + assertThat(otelValue).hasValue(null); + + otherThread.submit(Context.current().wrap(runnable)).get(); + assertThat(grpcValue).hasValue("japan"); + assertThat(otelValue).hasValue("cat"); + } + } finally { + grpcContext.detach(root); + } + } +} diff --git a/context/src/main/java/io/opentelemetry/context/Context.java b/context/src/main/java/io/opentelemetry/context/Context.java new file mode 100644 index 0000000000..4fb92c9814 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/Context.java @@ -0,0 +1,209 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nullable; + +/** + * A context propagation mechanism which can carry scoped-values across API boundaries and between + * threads. + * + *

A Context object can be {@linkplain #makeCurrent set} to the {@link ContextStorage}, which + * effectively forms a scope for the context. The scope is bound to the current thread. + * Within a scope, its Context is accessible even across API boundaries, through {@link #current}. + * The scope is later exited by {@link Scope#close()} closing} the scope. + * + *

Context objects are immutable and inherit state from their parent. To add or overwrite the + * current state a new context object must be created and then attached, replacing the previously + * bound context. For example: + * + *

{@code
+ * Context withCredential = Context.current().withValues(CRED_KEY, cred);
+ * withCredential.wrap(new Runnable() {
+ *   public void run() {
+ *      readUserRecords(userId, CRED_KEY.get());
+ *   }
+ * }).run();
+ * }
+ * + *

Notes and cautions on use: + * + *

    + *
  • Every {@link #makeCurrent()} must be followed by a {@link Scope#close()}. Breaking these + * rules may lead to memory leaks and incorrect scoping. + *
  • While Context objects are immutable they do not place such a restriction on the state they + * store. + *
  • Context is not intended for passing optional parameters to an API and developers should + * take care to avoid excessive dependence on context when designing an API. + *
  • Attaching Context from a different ancestor will cause information in the current Context + * to be lost. This should generally be avoided. + *
+ */ +public interface Context { + + /** Return the context associated with the current {@link Scope}. */ + static Context current() { + Context current = ContextStorage.get().current(); + return current != null ? current : root(); + } + + /** + * Returns the root {@link Context} which all other {@link Context} are derived from. + * + *

It should generally not be required to use the root {@link Context} directly - instead, use + * {@link Context#current()} to operate on the current {@link Context}. Only use this method if + * you are absolutely sure you need to disregard the current {@link Context} - this almost always + * is only a workaround hiding an underlying context propagation issue. + */ + static Context root() { + return DefaultContext.ROOT; + } + + /** + * Returns the value stored in this {@link Context} for the given {@link ContextKey}, or {@code + * null} if there is no value for the key in this context. + */ + @Nullable + V getValue(ContextKey key); + + /** + * Returns a new context with the given key value set. + * + *

{@code
+   * Context withCredential = Context.current().withValues(CRED_KEY, cred);
+   * withCredential.wrap(new Runnable() {
+   *   public void run() {
+   *      readUserRecords(userId, CRED_KEY.get());
+   *   }
+   * }).run();
+   * }
+ * + *

Note that multiple calls to {@code withValue} can be chained together. That is, + * + *

{@code
+   * context.withValues(K1, V1, K2, V2);
+   * // is the same as
+   * context.withValue(K1, V1).withValue(K2, V2);
+   * }
+ * + *

Nonetheless, {@link Context} should not be treated like a general purpose map with a large + * number of keys and values — combine multiple related items together into a single key instead + * of separating them. But if the items are unrelated, have separate keys for them. + */ + Context withValues(ContextKey k1, V v1); + + /** Returns a new context with the given key value set. */ + default Context withValues(ContextKey k1, V1 v1, ContextKey k2, V2 v2) { + return withValues(k1, v1).withValues(k2, v2); + } + + /** Returns a new context with the given key value set. */ + default Context withValues( + ContextKey k1, V1 v1, ContextKey k2, V2 v2, ContextKey k3, V3 v3) { + return withValues(k1, v1, k2, v2).withValues(k3, v3); + } + + /** + * Create a new context with the given key value set. + * + *

For more than 4 key-value pairs, note that multiple calls to {@link #withValues} can be + * chained together. That is, + * + *

+   * context.withValues(K1, V1, K2, V2);
+   * // is the same as
+   * context.withValue(K1, V1).withValue(K2, V2);
+   * 
+ * + *

Nonetheless, {@link Context} should not be treated like a general purpose map with a large + * number of keys and values — combine multiple related items together into a single key instead + * of separating them. But if the items are unrelated, have separate keys for them. + */ + default Context withValues( + ContextKey k1, + V1 v1, + ContextKey k2, + V2 v2, + ContextKey k3, + V3 v3, + ContextKey k4, + V4 v4) { + return withValues(k1, v1, k2, v2, k3, v3).withValues(k4, v4); + } + + /** + * Makes this the {@linkplain Context#current() current context} and returns a {@link Scope} which + * corresponds to the scope of execution this context is current for. {@link Context#current()} + * will return this {@link Context} until {@link Scope#close()} is called. {@link Scope#close()} + * must be called to properly restore the previous context from before this scope of execution or + * context will not work correctly. It is recommended to use try-with-resources to call {@link + * Scope#close()} automatically. + * + *

{@code
+   * Context prevCtx = Context.current();
+   * try (Scope ignored = ctx.attach()) {
+   *   assert Context.current() == ctx;
+   *   ...
+   * }
+   * assert Context.current() == prevCtx;
+   * }
+ */ + default Scope makeCurrent() { + return ContextStorage.get().attach(this); + } + + /** + * Returns a {@link Runnable} that makes this the {@linkplain Context#current current context} and + * then invokes the input {@link Runnable}. + */ + default Runnable wrap(Runnable runnable) { + return () -> { + try (Scope ignored = makeCurrent()) { + runnable.run(); + } + }; + } + + /** + * Returns a {@link Runnable} that makes this the {@linkplain Context#current current context} and + * then invokes the input {@link Runnable}. + */ + default Callable wrap(Callable callable) { + return () -> { + try (Scope ignored = makeCurrent()) { + return callable.call(); + } + }; + } + + /** + * Returns an {@link Executor} that will execute callbacks in the given {@code executor}, making + * this the {@linkplain Context#current current context} before each execution. + */ + default Executor wrap(Executor executor) { + return command -> executor.execute(wrap(command)); + } + + /** + * Returns an {@link ExecutorService} that will execute callbacks in the given {@code executor}, + * making this the {@linkplain Context#current current context} before each execution. + */ + default ExecutorService wrap(ExecutorService executor) { + return new ContextExecutorService(this, executor); + } + + /** + * Returns an {@link ScheduledExecutorService} that will execute callbacks in the given {@code + * executor}, making this the {@linkplain Context#current current context} before each execution. + */ + default ScheduledExecutorService wrap(ScheduledExecutorService executor) { + return new ContextScheduledExecutorService(this, executor); + } +} diff --git a/context/src/main/java/io/opentelemetry/context/ContextExecutorService.java b/context/src/main/java/io/opentelemetry/context/ContextExecutorService.java new file mode 100644 index 0000000000..dd49e8c0c2 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/ContextExecutorService.java @@ -0,0 +1,113 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +class ContextExecutorService implements ExecutorService { + + private final Context context; + private final ExecutorService delegate; + + ContextExecutorService(Context context, ExecutorService delegate) { + this.context = context; + this.delegate = delegate; + } + + final Context context() { + return context; + } + + ExecutorService delegate() { + return delegate; + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return delegate.submit(context.wrap(task)); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(context.wrap(task), result); + } + + @Override + public Future submit(Runnable task) { + return delegate.submit(context.wrap(task)); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return delegate.invokeAll(wrap(tasks)); + } + + @Override + public List> invokeAll( + Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate.invokeAll(wrap(tasks), timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + return delegate.invokeAny(wrap(tasks)); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(wrap(tasks), timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate.execute(context.wrap(command)); + } + + private Collection> wrap(Collection> tasks) { + List> wrapped = new ArrayList<>(); + for (Callable task : tasks) { + wrapped.add(context.wrap(task)); + } + return wrapped; + } +} diff --git a/context/src/main/java/io/opentelemetry/context/ContextKey.java b/context/src/main/java/io/opentelemetry/context/ContextKey.java new file mode 100644 index 0000000000..8e7afd2193 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/ContextKey.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +/** + * Key for indexing values of type {@link T} stored in a {@link Context}. {@link ContextKey} are + * compared by reference, so it is expected that only one {@link ContextKey} is created for a + * particular type of context value. + * + *
{@code
+ * public class ContextUser {
+ *
+ *   private static final ContextKey KEY = ContextKey.named("MyState");
+ *
+ *   public Context startWork() {
+ *     return Context.withValues(KEY, new MyState());
+ *   }
+ *
+ *   public void continueWork(Context context) {
+ *     MyState state = context.getValue(KEY);
+ *     // Keys are compared by reference only.
+ *     assert state != Context.current().getValue(ContextKey.named("MyState"));
+ *     ...
+ *   }
+ * }
+ *
+ * }
+ */ +// ErrorProne false positive, this is used for its type constraint, not only as a bag of statics. +@SuppressWarnings("InterfaceWithOnlyStatics") +public interface ContextKey { + + /** + * Returns a new {@link ContextKey} with the given debug name. The name does not impact behavior + * and is only for debugging purposes. Multiple different keys with the same name will be separate + * keys. + */ + static ContextKey named(String name) { + return ContextStorage.get().contextKey(name); + } +} diff --git a/context/src/main/java/io/opentelemetry/context/ContextScheduledExecutorService.java b/context/src/main/java/io/opentelemetry/context/ContextScheduledExecutorService.java new file mode 100644 index 0000000000..a17ed89731 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/ContextScheduledExecutorService.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +class ContextScheduledExecutorService extends ContextExecutorService + implements ScheduledExecutorService { + + ContextScheduledExecutorService(Context context, ScheduledExecutorService delegate) { + super(context, delegate); + } + + @Override + ScheduledExecutorService delegate() { + return (ScheduledExecutorService) super.delegate(); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return delegate().schedule(context().wrap(command), delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return delegate().schedule(context().wrap(callable), delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate().scheduleAtFixedRate(context().wrap(command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate().scheduleWithFixedDelay(context().wrap(command), initialDelay, delay, unit); + } +} diff --git a/context/src/main/java/io/opentelemetry/context/ContextStorage.java b/context/src/main/java/io/opentelemetry/context/ContextStorage.java new file mode 100644 index 0000000000..c55c3ade8b --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/ContextStorage.java @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +/** + * The storage for storing and retrieving the current {@link Context}. + * + *

If you want to implement your own storage or add some hooks when a {@link Context} is attached + * and restored, you should use {@link ContextStorageProvider}. Here's an example that sets MDC + * before {@link Context} is attached: + * + *

{@code
+ * > public class MyStorage implements ContextStorageProvider {
+ * >
+ * >   @Override
+ * >   public ContextStorage get() {
+ * >     ContextStorage threadLocalStorage = Context.threadLocalStorage();
+ * >     return new RequestContextStorage() {
+ * >       @Override
+ * >       public Scope T attach(Context toAttach) {
+ * >         Context current = current();
+ * >         setMdc(toAttach);
+ * >         Scope scope = threadLocalStorage.attach(toAttach);
+ * >         return () -> {
+ * >           clearMdc();
+ * >           setMdc(current);
+ * >           scope.close();
+ * >         }
+ * >       }
+ * >
+ * >       @Override
+ * >       public Context current() {
+ * >         return threadLocalStorage.current();
+ * >       }
+ * >     }
+ * >   }
+ * > }
+ * }
+ */ +public interface ContextStorage { + + /** + * Returns the {@link ContextStorage} being used by this application. This is only for use when + * integrating with other context propagation mechanisms and not meant for direct use. To attach + * or detach a {@link Context} in an application, use {@link Context#makeCurrent()} and {@link + * Scope#close()}. + */ + static ContextStorage get() { + return LazyStorage.storage; + } + + /** + * Sets the specified {@link Context} as the current {@link Context} and returns a {@link Scope} + * representing the scope of execution. {@link Scope#close()} must be called when the current + * {@link Context} should be restored to what it was before attaching {@code toAttach}. + */ + Scope attach(Context toAttach); + + /** + * Returns the current {@link DefaultContext}. If no {@link DefaultContext} has been attached yet, + * this will be the {@linkplain Context#root()} root context}. + */ + Context current(); + + /** + * Returns a {@link ContextKey} for the given name. This is only useful when integrating with a + * separate context propagation mechanism, where + */ + default ContextKey contextKey(String name) { + return new DefaultContextKey<>(name); + } +} diff --git a/context/src/main/java/io/opentelemetry/context/ContextStorageProvider.java b/context/src/main/java/io/opentelemetry/context/ContextStorageProvider.java new file mode 100644 index 0000000000..6a5fa5e096 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/ContextStorageProvider.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import java.util.concurrent.Executor; + +/** + * A Java SPI (Service Provider Interface) to allow replacing the default {@link ContextStorage}. + * This can be useful if, for example, you want to store OpenTelemetry {@link Context} in another + * context propagation system. For example, the returned {@link ContextStorage} could delegate to + * methods in + * + *

{@code + * com.linecorp.armeria.common.RequestContext}, {@code + * io.grpc.context.Context}, or {@code + * org.eclipse.microprofile.context.ThreadContext} + * + *

if you are already using one of those systems in your application. Then you would not have to + * use methods like {@link Context#wrap(Executor)} and can use your current system instead. + */ +public interface ContextStorageProvider { + + /** Returns the {@link ContextStorage} to use to store {@link DefaultContext}. */ + ContextStorage get(); +} diff --git a/context/src/main/java/io/opentelemetry/context/DefaultContext.java b/context/src/main/java/io/opentelemetry/context/DefaultContext.java new file mode 100644 index 0000000000..bb0fd51df4 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/DefaultContext.java @@ -0,0 +1,84 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import javax.annotation.Nullable; + +final class DefaultContext implements Context { + + static final Context ROOT = new DefaultContext(); + + /** + * Returns the default {@link ContextStorage} used to attach {@link Context}s to scopes of + * execution. Should only be used when defining your own {@link ContextStorage} in case you want + * to delegate functionality to the default implementation. + */ + static ContextStorage threadLocalStorage() { + return ThreadLocalContextStorage.INSTANCE; + } + + @Nullable private final PersistentHashArrayMappedTrie.Node, Object> entries; + + private DefaultContext(PersistentHashArrayMappedTrie.Node, Object> entries) { + this.entries = entries; + } + + DefaultContext() { + entries = null; + } + + @Override + @Nullable + public V getValue(ContextKey key) { + // Because withValue enforces the value for a key is its type, this is always safe. + @SuppressWarnings("unchecked") + V value = (V) PersistentHashArrayMappedTrie.get(entries, key); + return value; + } + + @Override + public Context withValues(ContextKey k1, V v1) { + PersistentHashArrayMappedTrie.Node, Object> newEntries = + PersistentHashArrayMappedTrie.put(entries, k1, v1); + return new DefaultContext(newEntries); + } + + @Override + public Context withValues(ContextKey k1, V1 v1, ContextKey k2, V2 v2) { + PersistentHashArrayMappedTrie.Node, Object> newEntries = + PersistentHashArrayMappedTrie.put(entries, k1, v1); + newEntries = PersistentHashArrayMappedTrie.put(newEntries, k2, v2); + return new DefaultContext(newEntries); + } + + @Override + public Context withValues( + ContextKey k1, V1 v1, ContextKey k2, V2 v2, ContextKey k3, V3 v3) { + PersistentHashArrayMappedTrie.Node, Object> newEntries = + PersistentHashArrayMappedTrie.put(entries, k1, v1); + newEntries = PersistentHashArrayMappedTrie.put(newEntries, k2, v2); + newEntries = PersistentHashArrayMappedTrie.put(newEntries, k3, v3); + return new DefaultContext(newEntries); + } + + @Override + public Context withValues( + ContextKey k1, + V1 v1, + ContextKey k2, + V2 v2, + ContextKey k3, + V3 v3, + ContextKey k4, + V4 v4) { + PersistentHashArrayMappedTrie.Node, Object> newEntries = + PersistentHashArrayMappedTrie.put(entries, k1, v1); + newEntries = PersistentHashArrayMappedTrie.put(newEntries, k2, v2); + newEntries = PersistentHashArrayMappedTrie.put(newEntries, k3, v3); + newEntries = PersistentHashArrayMappedTrie.put(newEntries, k4, v4); + return new DefaultContext(newEntries); + } +} diff --git a/context/src/main/java/io/opentelemetry/context/DefaultContextKey.java b/context/src/main/java/io/opentelemetry/context/DefaultContextKey.java new file mode 100644 index 0000000000..d5e84cb566 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/DefaultContextKey.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +final class DefaultContextKey implements ContextKey { + + private final String name; + + DefaultContextKey(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } +} diff --git a/context/src/main/java/io/opentelemetry/context/LazyStorage.java b/context/src/main/java/io/opentelemetry/context/LazyStorage.java new file mode 100644 index 0000000000..0afb6852d6 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/LazyStorage.java @@ -0,0 +1,89 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; + +// Lazy-loaded storage. Delaying storage initialization until after class initialization makes it +// much easier to avoid circular loading since there can still be references to Context as long as +// they don't depend on storage, like key() and currentContextExecutor(). It also makes it easier +// to handle exceptions. +final class LazyStorage { + + private static final String CONTEXT_STORAGE_PROVIDER_PROPERTY = + "io.opentelemetry.context.contextStorageProvider"; + + private static final Logger logger = Logger.getLogger(ThreadLocalContextStorage.class.getName()); + + static final ContextStorage storage; + + static { + AtomicReference deferredStorageFailure = new AtomicReference<>(); + storage = createStorage(deferredStorageFailure); + Throwable failure = deferredStorageFailure.get(); + // Logging must happen after storage has been set, as loggers may use Context. + if (failure != null) { + logger.log( + Level.WARNING, "ContextStorageProvider initialized failed. Using default", failure); + } + } + + private static ContextStorage createStorage(AtomicReference deferredStorageFailure) { + String providerClassName = System.getProperty(CONTEXT_STORAGE_PROVIDER_PROPERTY, ""); + + List providers = new ArrayList<>(); + for (ContextStorageProvider provider : ServiceLoader.load(ContextStorageProvider.class)) { + providers.add(provider); + } + + if (providers.isEmpty()) { + return DefaultContext.threadLocalStorage(); + } + + if (providers.size() == 1) { + ContextStorageProvider provider = providers.get(0); + try { + return provider.get(); + } catch (Throwable t) { + deferredStorageFailure.set(t); + return DefaultContext.threadLocalStorage(); + } + } + + if (providerClassName.isEmpty()) { + deferredStorageFailure.set( + new IllegalStateException( + "Found multiple ContextStorageProvider. Set the " + + "io.opentelemetry.context.ContextStorageProvider property to the fully " + + "qualified class name of the provider to use. Falling back to default " + + "ContextStorage. Found providers: " + + providers)); + return DefaultContext.threadLocalStorage(); + } + + for (ContextStorageProvider provider : providers) { + if (provider.getClass().getName().equals(providerClassName)) { + return provider.get(); + } + } + + deferredStorageFailure.set( + new IllegalStateException( + "io.opentelemetry.context.ContextStorageProvider property set but no matching class " + + "could be found, requested: " + + providerClassName + + " but found providers: " + + providers)); + return DefaultContext.threadLocalStorage(); + } + + private LazyStorage() {} +} diff --git a/context/src/main/java/io/opentelemetry/context/PersistentHashArrayMappedTrie.java b/context/src/main/java/io/opentelemetry/context/PersistentHashArrayMappedTrie.java new file mode 100644 index 0000000000..790f1325b0 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/PersistentHashArrayMappedTrie.java @@ -0,0 +1,283 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import java.util.Arrays; + +/** + * A persistent (copy-on-write) hash tree/trie. Collisions are handled linearly. Delete is not + * supported, but replacement is. The implementation favors simplicity and low memory allocation + * during insertion. Although the asymptotics are good, it is optimized for small sizes like less + * than 20; "unbelievably large" would be 100. + * + *

Inspired by popcnt-based compression seen in Ideal Hash Trees, Phil Bagwell (2000). The rest + * of the implementation is ignorant of/ignores the paper. + */ +final class PersistentHashArrayMappedTrie { + + private PersistentHashArrayMappedTrie() {} + + /** Returns the value with the specified key, or {@code null} if it does not exist. */ + static V get(Node root, K key) { + if (root == null) { + return null; + } + return root.get(key, System.identityHashCode(key), 0); + } + + /** Returns a new trie where the key is set to the specified value. */ + static Node put(Node root, K key, V value) { + if (root == null) { + return new Leaf<>(key, value); + } + return root.put(key, value, System.identityHashCode(key), 0); + } + + // Not actually annotated to avoid depending on guava + // @VisibleForTesting + static final class Leaf implements Node { + private final K key; + private final V value; + + public Leaf(K key, V value) { + this.key = key; + this.value = value; + } + + @Override + public int size() { + return 1; + } + + @Override + public V get(K key, int hash, int bitsConsumed) { + if (this.key == key) { + return value; + } else { + return null; + } + } + + @Override + public Node put(K key, V value, int hash, int bitsConsumed) { + int thisHash = System.identityHashCode(this.key); + if (thisHash != hash) { + // Insert + return CompressedIndex.combine(new Leaf<>(key, value), hash, this, thisHash, bitsConsumed); + } else if (this.key == key) { + // Replace + return new Leaf<>(key, value); + } else { + // Hash collision + return new CollisionLeaf<>(this.key, this.value, key, value); + } + } + + @Override + public String toString() { + return String.format("Leaf(key=%s value=%s)", key, value); + } + } + + // Not actually annotated to avoid depending on guava + // @VisibleForTesting + static final class CollisionLeaf implements Node { + // All keys must have same hash, but not have the same reference + private final K[] keys; + private final V[] values; + + // Not actually annotated to avoid depending on guava + // @VisibleForTesting + @SuppressWarnings("unchecked") + CollisionLeaf(K key1, V value1, K key2, V value2) { + this((K[]) new Object[] {key1, key2}, (V[]) new Object[] {value1, value2}); + assert key1 != key2; + assert System.identityHashCode(key1) == System.identityHashCode(key2); + } + + private CollisionLeaf(K[] keys, V[] values) { + this.keys = keys; + this.values = values; + } + + @Override + public int size() { + return values.length; + } + + @Override + public V get(K key, int hash, int bitsConsumed) { + for (int i = 0; i < keys.length; i++) { + if (keys[i] == key) { + return values[i]; + } + } + return null; + } + + @Override + public Node put(K key, V value, int hash, int bitsConsumed) { + int thisHash = System.identityHashCode(keys[0]); + int keyIndex; + if (thisHash != hash) { + // Insert + return CompressedIndex.combine(new Leaf<>(key, value), hash, this, thisHash, bitsConsumed); + } else if ((keyIndex = indexOfKey(key)) != -1) { + // Replace + K[] newKeys = Arrays.copyOf(keys, keys.length); + V[] newValues = Arrays.copyOf(values, keys.length); + newKeys[keyIndex] = key; + newValues[keyIndex] = value; + return new CollisionLeaf<>(newKeys, newValues); + } else { + // Yet another hash collision + K[] newKeys = Arrays.copyOf(keys, keys.length + 1); + V[] newValues = Arrays.copyOf(values, keys.length + 1); + newKeys[keys.length] = key; + newValues[keys.length] = value; + return new CollisionLeaf<>(newKeys, newValues); + } + } + + // -1 if not found + private int indexOfKey(K key) { + for (int i = 0; i < keys.length; i++) { + if (keys[i] == key) { + return i; + } + } + return -1; + } + + @Override + public String toString() { + StringBuilder valuesSb = new StringBuilder(); + valuesSb.append("CollisionLeaf("); + for (int i = 0; i < values.length; i++) { + valuesSb.append("(key=").append(keys[i]).append(" value=").append(values[i]).append(") "); + } + return valuesSb.append(")").toString(); + } + } + + // Not actually annotated to avoid depending on guava + // @VisibleForTesting + static final class CompressedIndex implements Node { + private static final int BITS = 5; + private static final int BITS_MASK = 0x1F; + + final int bitmap; + final Node[] values; + private final int size; + + private CompressedIndex(int bitmap, Node[] values, int size) { + this.bitmap = bitmap; + this.values = values; + this.size = size; + } + + @Override + public int size() { + return size; + } + + @Override + public V get(K key, int hash, int bitsConsumed) { + int indexBit = indexBit(hash, bitsConsumed); + if ((bitmap & indexBit) == 0) { + return null; + } + int compressedIndex = compressedIndex(indexBit); + return values[compressedIndex].get(key, hash, bitsConsumed + BITS); + } + + @Override + public Node put(K key, V value, int hash, int bitsConsumed) { + int indexBit = indexBit(hash, bitsConsumed); + int compressedIndex = compressedIndex(indexBit); + if ((bitmap & indexBit) == 0) { + // Insert + int newBitmap = bitmap | indexBit; + @SuppressWarnings("unchecked") + Node[] newValues = (Node[]) new Node[values.length + 1]; + System.arraycopy(values, 0, newValues, 0, compressedIndex); + newValues[compressedIndex] = new Leaf<>(key, value); + System.arraycopy( + values, + compressedIndex, + newValues, + compressedIndex + 1, + values.length - compressedIndex); + return new CompressedIndex<>(newBitmap, newValues, size() + 1); + } else { + // Replace + Node[] newValues = Arrays.copyOf(values, values.length); + newValues[compressedIndex] = + values[compressedIndex].put(key, value, hash, bitsConsumed + BITS); + int newSize = size(); + newSize += newValues[compressedIndex].size(); + newSize -= values[compressedIndex].size(); + return new CompressedIndex<>(bitmap, newValues, newSize); + } + } + + static Node combine( + Node node1, int hash1, Node node2, int hash2, int bitsConsumed) { + assert hash1 != hash2; + int indexBit1 = indexBit(hash1, bitsConsumed); + int indexBit2 = indexBit(hash2, bitsConsumed); + if (indexBit1 == indexBit2) { + Node node = combine(node1, hash1, node2, hash2, bitsConsumed + BITS); + @SuppressWarnings("unchecked") + Node[] values = (Node[]) new Node[] {node}; + return new CompressedIndex<>(indexBit1, values, node.size()); + } else { + // Make node1 the smallest + if (uncompressedIndex(hash1, bitsConsumed) > uncompressedIndex(hash2, bitsConsumed)) { + Node nodeCopy = node1; + node1 = node2; + node2 = nodeCopy; + } + @SuppressWarnings("unchecked") + Node[] values = (Node[]) new Node[] {node1, node2}; + return new CompressedIndex<>(indexBit1 | indexBit2, values, node1.size() + node2.size()); + } + } + + @Override + public String toString() { + StringBuilder valuesSb = new StringBuilder(); + valuesSb + .append("CompressedIndex(") + .append(String.format("bitmap=%s ", Integer.toBinaryString(bitmap))); + for (Node value : values) { + valuesSb.append(value).append(" "); + } + return valuesSb.append(")").toString(); + } + + private int compressedIndex(int indexBit) { + return Integer.bitCount(bitmap & (indexBit - 1)); + } + + private static int uncompressedIndex(int hash, int bitsConsumed) { + return (hash >>> bitsConsumed) & BITS_MASK; + } + + private static int indexBit(int hash, int bitsConsumed) { + int uncompressedIndex = uncompressedIndex(hash, bitsConsumed); + return 1 << uncompressedIndex; + } + } + + interface Node { + V get(K key, int hash, int bitsConsumed); + + Node put(K key, V value, int hash, int bitsConsumed); + + int size(); + } +} diff --git a/context/src/main/java/io/opentelemetry/context/Scope.java b/context/src/main/java/io/opentelemetry/context/Scope.java new file mode 100644 index 0000000000..5c79f8494d --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/Scope.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import io.opentelemetry.context.ThreadLocalContextStorage.NoopScope; + +/** + * An {@link AutoCloseable} that represents a mounted context for a block of code. A failure to call + * {@link Scope#close()} will generally break tracing or cause memory leaks. It is recommended that + * you use this class with a {@code try-with-resources} block: + * + *

{@code
+ * try (Scope ignored = tracer.withSpan(span)) {
+ *   ...
+ * }
+ * }
+ */ +public interface Scope extends AutoCloseable { + + /** + * Returns a {@link Scope} that does nothing. Represents attaching a {@link Context} when it is + * already attached. + */ + static Scope noop() { + return NoopScope.INSTANCE; + } + + @Override + void close(); +} diff --git a/context/src/main/java/io/opentelemetry/context/ThreadLocalContextStorage.java b/context/src/main/java/io/opentelemetry/context/ThreadLocalContextStorage.java new file mode 100644 index 0000000000..0602d80a1e --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/ThreadLocalContextStorage.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import java.util.logging.Level; +import java.util.logging.Logger; + +enum ThreadLocalContextStorage implements ContextStorage { + INSTANCE; + + private static final Logger logger = Logger.getLogger(ThreadLocalContextStorage.class.getName()); + + private static final ThreadLocal THREAD_LOCAL_STORAGE = new ThreadLocal<>(); + + static { + THREAD_LOCAL_STORAGE.set(Context.root()); + } + + @Override + public Scope attach(Context toAttach) { + if (toAttach == null) { + // Null context not allowed so ignore it. + return NoopScope.INSTANCE; + } + + Context beforeAttach = current(); + if (toAttach == beforeAttach) { + return NoopScope.INSTANCE; + } + + THREAD_LOCAL_STORAGE.set(toAttach); + + return () -> { + if (current() != toAttach) { + logger.log( + Level.FINE, + "Context in storage not the expected context, Scope.close was not called correctly"); + } + THREAD_LOCAL_STORAGE.set(beforeAttach); + }; + } + + @Override + public Context current() { + return THREAD_LOCAL_STORAGE.get(); + } + + enum NoopScope implements Scope { + INSTANCE; + + @Override + public void close() {} + } +} diff --git a/context/src/otelAsBraveTest/java/io/opentelemetry/context/BraveContextStorageProvider.java b/context/src/otelAsBraveTest/java/io/opentelemetry/context/BraveContextStorageProvider.java new file mode 100644 index 0000000000..5d313603ac --- /dev/null +++ b/context/src/otelAsBraveTest/java/io/opentelemetry/context/BraveContextStorageProvider.java @@ -0,0 +1,136 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import brave.Tracing; +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class BraveContextStorageProvider implements ContextStorageProvider { + + @Override + public ContextStorage get() { + return BraveContextStorage.INSTANCE; + } + + @SuppressWarnings("ReferenceEquality") + private enum BraveContextStorage implements ContextStorage { + INSTANCE; + + @Override + public Scope attach(Context toAttach) { + TraceContext braveContextToAttach = ((BraveContextWrapper) toAttach).braveContext; + + CurrentTraceContext currentTraceContext = Tracing.current().currentTraceContext(); + TraceContext currentBraveContext = currentTraceContext.get(); + if (currentBraveContext == braveContextToAttach) { + return Scope.noop(); + } + + CurrentTraceContext.Scope braveScope = currentTraceContext.newScope(braveContextToAttach); + return braveScope::close; + } + + @Override + public Context current() { + TraceContext current = Tracing.current().currentTraceContext().get(); + if (current != null) { + return new BraveContextWrapper(current); + } + return new BraveContextWrapper(TraceContext.newBuilder().traceId(1).spanId(1).build()); + } + } + + private static class BraveContextValues { + private final Object[] values; + + BraveContextValues(Object key, Object value) { + this.values = new Object[] {key, value}; + } + + BraveContextValues(Object[] values) { + this.values = values; + } + + Object getValue(Object key) { + for (int i = 0; i < values.length; i += 2) { + if (values[i] == key) { + return values[i + 1]; + } + } + return null; + } + + BraveContextValues with(Object key, Object value) { + final Object[] copy; + for (int i = 0; i < values.length; i += 2) { + if (values[i] == key) { + copy = values.clone(); + copy[i + 1] = value; + return new BraveContextValues(copy); + } + } + + copy = Arrays.copyOf(values, values.length + 2); + copy[values.length - 2] = key; + copy[values.length - 1] = value; + return new BraveContextValues(copy); + } + } + + private static class BraveContextWrapper implements Context { + + private final TraceContext braveContext; + + private BraveContextWrapper(TraceContext braveContext) { + this.braveContext = braveContext; + } + + @Override + public V getValue(ContextKey key) { + BraveContextValues values = braveContext.findExtra(BraveContextValues.class); + if (values == null) { + return null; + } + @SuppressWarnings("unchecked") + V value = (V) values.getValue(key); + return value; + } + + @Override + public Context withValues(ContextKey k1, V v1) { + List extras = braveContext.extra(); + BraveContextValues values = null; + int existingValuesIndex = -1; + for (int i = 0; i < extras.size(); i++) { + Object extra = extras.get(i); + if (extra instanceof BraveContextValues) { + values = (BraveContextValues) extra; + existingValuesIndex = i; + break; + } + } + final List newExtras; + if (values == null) { + values = new BraveContextValues(k1, v1); + newExtras = new ArrayList<>(extras.size() + 1); + newExtras.addAll(extras); + newExtras.add(values); + } else { + newExtras = new ArrayList<>(extras); + newExtras.set(existingValuesIndex, values.with(k1, v1)); + } + + TraceContext.Builder builder = braveContext.toBuilder(); + builder.clearExtra(); + newExtras.forEach(builder::addExtra); + return new BraveContextWrapper(builder.build()); + } + } +} diff --git a/context/src/otelAsBraveTest/java/io/opentelemetry/context/OtelAsBraveTest.java b/context/src/otelAsBraveTest/java/io/opentelemetry/context/OtelAsBraveTest.java new file mode 100644 index 0000000000..e1159f0d19 --- /dev/null +++ b/context/src/otelAsBraveTest/java/io/opentelemetry/context/OtelAsBraveTest.java @@ -0,0 +1,122 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import static org.assertj.core.api.Assertions.assertThat; + +import brave.Tracing; +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class OtelAsBraveTest { + + private static final ContextKey ANIMAL = ContextKey.named("animal"); + + private static final Tracing TRACING = + Tracing.newBuilder().currentTraceContext(CurrentTraceContext.Default.create()).build(); + + private static final TraceContext TRACE_CONTEXT = + TraceContext.newBuilder().traceId(1).spanId(1).addExtra("japan").build(); + + private static ExecutorService otherThread; + + @BeforeAll + static void setUp() { + otherThread = Executors.newSingleThreadExecutor(); + } + + @AfterAll + static void tearDown() { + otherThread.shutdown(); + } + + @Test + void braveOtelMix() { + try (CurrentTraceContext.Scope ignored = + TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + TraceContext context2 = + Tracing.current().currentTraceContext().get().toBuilder().addExtra("cheese").build(); + try (CurrentTraceContext.Scope ignored3 = + TRACING.currentTraceContext().newScope(context2)) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("cheese"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + } + } + } + } + + @Test + void braveWrap() throws Exception { + try (CurrentTraceContext.Scope ignored = + TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) { + try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + AtomicReference braveContainsJapan = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + TraceContext traceContext = Tracing.current().currentTraceContext().get(); + if (traceContext != null && traceContext.extra().contains("japan")) { + braveContainsJapan.set(true); + } else { + braveContainsJapan.set(false); + } + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(braveContainsJapan).hasValue(false); + assertThat(otelValue).hasValue(null); + + otherThread.submit(TRACING.currentTraceContext().wrap(runnable)).get(); + assertThat(braveContainsJapan).hasValue(true); + assertThat(otelValue).hasValue("cat"); + } + } + } + + @Test + void otelWrap() throws Exception { + try (CurrentTraceContext.Scope ignored = + TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) { + try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + AtomicReference braveContainsJapan = new AtomicReference<>(false); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + TraceContext traceContext = Tracing.current().currentTraceContext().get(); + if (traceContext != null && traceContext.extra().contains("japan")) { + braveContainsJapan.set(true); + } else { + braveContainsJapan.set(false); + } + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(braveContainsJapan).hasValue(false); + assertThat(otelValue).hasValue(null); + + otherThread.submit(Context.current().wrap(runnable)).get(); + assertThat(braveContainsJapan).hasValue(true); + assertThat(otelValue).hasValue("cat"); + } + } + } +} diff --git a/context/src/otelAsBraveTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider b/context/src/otelAsBraveTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider new file mode 100644 index 0000000000..923d2cc2d4 --- /dev/null +++ b/context/src/otelAsBraveTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider @@ -0,0 +1 @@ +io.opentelemetry.context.BraveContextStorageProvider \ No newline at end of file diff --git a/context/src/otelAsGrpcTest/java/io/opentelemetry/context/GrpcContextStorageProvider.java b/context/src/otelAsGrpcTest/java/io/opentelemetry/context/GrpcContextStorageProvider.java new file mode 100644 index 0000000000..ef091bfbc1 --- /dev/null +++ b/context/src/otelAsGrpcTest/java/io/opentelemetry/context/GrpcContextStorageProvider.java @@ -0,0 +1,123 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class GrpcContextStorageProvider implements ContextStorageProvider { + + private static final Logger log = Logger.getLogger(GrpcContextStorageProvider.class.getName()); + + @Override + public ContextStorage get() { + return GrpcContextStorage.INSTANCE; + } + + private enum GrpcContextStorage implements ContextStorage { + INSTANCE; + + @Override + public Scope attach(Context toAttach) { + if (!(toAttach instanceof GrpcContextWrapper)) { + log.log( + Level.SEVERE, + "Context not created by GrpcContextStorageProvider. This is not " + + "allowed when using GrpcContextStorageProvider. Did you create this context " + + "using Context.current()?"); + return Scope.noop(); + } + + io.grpc.Context grpcContextToAttach = ((GrpcContextWrapper) toAttach).grpcContext; + + io.grpc.Context currentGrpcContext = io.grpc.Context.current(); + + if (grpcContextToAttach == currentGrpcContext) { + return Scope.noop(); + } + + io.grpc.Context toRestore = grpcContextToAttach.attach(); + return () -> grpcContextToAttach.detach(toRestore); + } + + @Override + public Context current() { + return new GrpcContextWrapper(io.grpc.Context.current()); + } + + @Override + public ContextKey contextKey(String name) { + return new GrpcContextKeyWrapper<>(io.grpc.Context.key(name)); + } + } + + private static class GrpcContextWrapper implements Context { + private final io.grpc.Context grpcContext; + + private GrpcContextWrapper(io.grpc.Context grpcContext) { + this.grpcContext = grpcContext; + } + + @Override + public V getValue(ContextKey key) { + return grpcKey(key).get(grpcContext); + } + + @Override + public Context withValues(ContextKey k1, V v1) { + return new GrpcContextWrapper(grpcContext.withValue(grpcKey(k1), v1)); + } + + @Override + public Context withValues(ContextKey k1, V1 v1, ContextKey k2, V2 v2) { + return new GrpcContextWrapper(grpcContext.withValues(grpcKey(k1), v1, grpcKey(k2), v2)); + } + + @Override + public Context withValues( + ContextKey k1, V1 v1, ContextKey k2, V2 v2, ContextKey k3, V3 v3) { + return new GrpcContextWrapper( + grpcContext.withValues(grpcKey(k1), v1, grpcKey(k2), v2, grpcKey(k3), v3)); + } + + @Override + public Context withValues( + ContextKey k1, + V1 v1, + ContextKey k2, + V2 v2, + ContextKey k3, + V3 v3, + ContextKey k4, + V4 v4) { + return new GrpcContextWrapper( + grpcContext.withValues( + grpcKey(k1), v1, grpcKey(k2), v2, grpcKey(k3), v3, grpcKey(k4), v4)); + } + } + + static class GrpcContextKeyWrapper implements ContextKey { + + private final io.grpc.Context.Key grpcContextKey; + + private GrpcContextKeyWrapper(io.grpc.Context.Key grpcContextKey) { + this.grpcContextKey = grpcContextKey; + } + } + + static io.grpc.Context.Key grpcKey(ContextKey key) { + if (key instanceof GrpcContextKeyWrapper) { + return ((GrpcContextKeyWrapper) key).grpcContextKey; + } + log.log( + Level.SEVERE, + "ContextKey not created by GrpcContextStorageProvider, " + + "this is not allowed when using GrpcContextStorageProvider. Did you create this " + + "key using ContextKey.named()?"); + // This ephemereal key is invalid but the best we can fallback to. + return io.grpc.Context.key("invalid-context-key-" + key); + } +} diff --git a/context/src/otelAsGrpcTest/java/io/opentelemetry/context/OtelAsGrpcTest.java b/context/src/otelAsGrpcTest/java/io/opentelemetry/context/OtelAsGrpcTest.java new file mode 100644 index 0000000000..2560cc2936 --- /dev/null +++ b/context/src/otelAsGrpcTest/java/io/opentelemetry/context/OtelAsGrpcTest.java @@ -0,0 +1,121 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class OtelAsGrpcTest { + + private static final ContextKey ANIMAL = ContextKey.named("animal"); + + private static final io.grpc.Context.Key FOOD = io.grpc.Context.key("food"); + private static final io.grpc.Context.Key COUNTRY = io.grpc.Context.key("country"); + + private static ExecutorService otherThread; + + @BeforeAll + static void setUp() { + otherThread = Executors.newSingleThreadExecutor(); + } + + @AfterAll + static void tearDown() { + otherThread.shutdown(); + } + + @Test + void grpcOtelMix() { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + assertThat(COUNTRY.get()).isNull(); + io.grpc.Context root = grpcContext.attach(); + try { + assertThat(COUNTRY.get()).isEqualTo("japan"); + try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + assertThat(COUNTRY.get()).isEqualTo("japan"); + + io.grpc.Context context2 = io.grpc.Context.current().withValue(FOOD, "cheese"); + assertThat(FOOD.get()).isNull(); + io.grpc.Context toRestore = context2.attach(); + try { + assertThat(FOOD.get()).isEqualTo("cheese"); + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + } finally { + context2.detach(toRestore); + } + } + } finally { + grpcContext.detach(root); + } + } + + @Test + void grpcWrap() throws Exception { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + io.grpc.Context root = grpcContext.attach(); + try { + try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + AtomicReference grpcValue = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + grpcValue.set(COUNTRY.get()); + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(grpcValue).hasValue(null); + assertThat(otelValue).hasValue(null); + + otherThread.submit(io.grpc.Context.current().wrap(runnable)).get(); + assertThat(grpcValue).hasValue("japan"); + assertThat(otelValue).hasValue("cat"); + } + } finally { + grpcContext.detach(root); + } + } + + @Test + void otelWrap() throws Exception { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + io.grpc.Context root = grpcContext.attach(); + try { + try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + AtomicReference grpcValue = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + grpcValue.set(COUNTRY.get()); + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(grpcValue).hasValue(null); + assertThat(otelValue).hasValue(null); + + otherThread.submit(Context.current().wrap(runnable)).get(); + + assertThat(grpcValue).hasValue("japan"); + assertThat(otelValue).hasValue("cat"); + } + } finally { + grpcContext.detach(root); + } + } +} diff --git a/context/src/otelAsGrpcTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider b/context/src/otelAsGrpcTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider new file mode 100644 index 0000000000..7e81d07a38 --- /dev/null +++ b/context/src/otelAsGrpcTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider @@ -0,0 +1 @@ +io.opentelemetry.context.GrpcContextStorageProvider \ No newline at end of file diff --git a/context/src/otelInGrpcTest/java/io/opentelemetry/context/GrpcContextStorageProvider.java b/context/src/otelInGrpcTest/java/io/opentelemetry/context/GrpcContextStorageProvider.java new file mode 100644 index 0000000000..d258432031 --- /dev/null +++ b/context/src/otelInGrpcTest/java/io/opentelemetry/context/GrpcContextStorageProvider.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +public class GrpcContextStorageProvider implements ContextStorageProvider { + + @Override + public ContextStorage get() { + return GrpcContextStorage.INSTANCE; + } + + private enum GrpcContextStorage implements ContextStorage { + INSTANCE; + + private static final io.grpc.Context.Key OTEL_CONTEXT = + io.grpc.Context.key("otel-context"); + + @Override + public Scope attach(Context toAttach) { + io.grpc.Context grpcContext = io.grpc.Context.current(); + Context current = OTEL_CONTEXT.get(grpcContext); + + if (current == toAttach) { + return Scope.noop(); + } + + io.grpc.Context newGrpcContext = grpcContext.withValue(OTEL_CONTEXT, toAttach); + io.grpc.Context toRestore = newGrpcContext.attach(); + + return () -> newGrpcContext.detach(toRestore); + } + + @Override + public Context current() { + return OTEL_CONTEXT.get(); + } + } +} diff --git a/context/src/otelInGrpcTest/java/io/opentelemetry/context/OtelInGrpcTest.java b/context/src/otelInGrpcTest/java/io/opentelemetry/context/OtelInGrpcTest.java new file mode 100644 index 0000000000..18773a9307 --- /dev/null +++ b/context/src/otelInGrpcTest/java/io/opentelemetry/context/OtelInGrpcTest.java @@ -0,0 +1,123 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class OtelInGrpcTest { + + private static final ContextKey ANIMAL = ContextKey.named("animal"); + + private static final io.grpc.Context.Key FOOD = io.grpc.Context.key("food"); + private static final io.grpc.Context.Key COUNTRY = io.grpc.Context.key("country"); + + private static ExecutorService otherThread; + + @BeforeAll + static void setUp() { + otherThread = Executors.newSingleThreadExecutor(); + } + + @AfterAll + static void tearDown() { + otherThread.shutdown(); + } + + @Test + void grpcOtelMix() { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + assertThat(COUNTRY.get()).isNull(); + io.grpc.Context root = grpcContext.attach(); + try { + assertThat(COUNTRY.get()).isEqualTo("japan"); + try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + assertThat(COUNTRY.get()).isEqualTo("japan"); + + io.grpc.Context context2 = io.grpc.Context.current().withValue(FOOD, "cheese"); + assertThat(FOOD.get()).isNull(); + io.grpc.Context toRestore = context2.attach(); + try { + assertThat(FOOD.get()).isEqualTo("cheese"); + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + } finally { + context2.detach(toRestore); + } + } + } finally { + grpcContext.detach(root); + } + } + + @Test + void grpcWrap() throws Exception { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + io.grpc.Context root = grpcContext.attach(); + try { + try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + AtomicReference grpcValue = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + grpcValue.set(COUNTRY.get()); + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(grpcValue).hasValue(null); + assertThat(otelValue).hasValue(null); + + otherThread.submit(io.grpc.Context.current().wrap(runnable)).get(); + assertThat(grpcValue).hasValue("japan"); + assertThat(otelValue).hasValue("cat"); + } + } finally { + grpcContext.detach(root); + } + } + + @Test + void otelWrap() throws Exception { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + io.grpc.Context root = grpcContext.attach(); + try { + try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) { + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + AtomicReference grpcValue = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + grpcValue.set(COUNTRY.get()); + otelValue.set(Context.current().getValue(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(grpcValue).hasValue(null); + assertThat(otelValue).hasValue(null); + + otherThread.submit(Context.current().wrap(runnable)).get(); + + // Since OTel context is inside the gRPC context, propagating OTel context does not + // propagate the gRPC context. + assertThat(grpcValue).hasValue(null); + assertThat(otelValue).hasValue("cat"); + } + } finally { + grpcContext.detach(root); + } + } +} diff --git a/context/src/otelInGrpcTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider b/context/src/otelInGrpcTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider new file mode 100644 index 0000000000..7e81d07a38 --- /dev/null +++ b/context/src/otelInGrpcTest/resources/META-INF/services/io.opentelemetry.context.ContextStorageProvider @@ -0,0 +1 @@ +io.opentelemetry.context.GrpcContextStorageProvider \ No newline at end of file diff --git a/context/src/test/java/io/opentelemetry/context/ContextTest.java b/context/src/test/java/io/opentelemetry/context/ContextTest.java new file mode 100644 index 0000000000..0ff0ed6502 --- /dev/null +++ b/context/src/test/java/io/opentelemetry/context/ContextTest.java @@ -0,0 +1,433 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.context; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; + +@SuppressWarnings("ClassCanBeStatic") +class ContextTest { + + private static final ContextKey ANIMAL = ContextKey.named("animal"); + private static final ContextKey BAG = ContextKey.named("bag"); + private static final ContextKey FOOD = ContextKey.named("food"); + private static final ContextKey COOKIES = ContextKey.named("cookies"); + + private static final Context CAT = Context.current().withValues(ANIMAL, "cat"); + + // Make sure all tests clean up + @AfterEach + void tearDown() { + assertThat(Context.current()).isEqualTo(Context.root()); + } + + @Test + void startsWithRoot() { + assertThat(Context.current()).isEqualTo(Context.root()); + } + + @Test + void canBeAttached() { + Context context = Context.current().withValues(ANIMAL, "cat"); + assertThat(Context.current().getValue(ANIMAL)).isNull(); + try (Scope ignored = context.makeCurrent()) { + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + try (Scope ignored2 = Context.root().makeCurrent()) { + assertThat(Context.current().getValue(ANIMAL)).isNull(); + } + + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + } + assertThat(Context.current().getValue(ANIMAL)).isNull(); + } + + @Test + void attachSameTwice() { + Context context = Context.current().withValues(ANIMAL, "cat"); + assertThat(Context.current().getValue(ANIMAL)).isNull(); + try (Scope ignored = context.makeCurrent()) { + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + + try (Scope ignored2 = context.makeCurrent()) { + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + } + + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + } + assertThat(Context.current().getValue(ANIMAL)).isNull(); + } + + @Test + void newThreadStartsWithRoot() throws Exception { + Context context = Context.current().withValues(ANIMAL, "cat"); + try (Scope ignored = context.makeCurrent()) { + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat"); + AtomicReference current = new AtomicReference<>(); + Thread thread = new Thread(() -> current.set(Context.current())); + thread.start(); + thread.join(); + assertThat(current.get()).isEqualTo(Context.root()); + } + } + + @Test + public void closingScopeWhenNotActiveIsLogged() { + final AtomicReference logRef = new AtomicReference<>(); + Handler handler = + new Handler() { + @Override + public void publish(LogRecord record) { + logRef.set(record); + } + + @Override + public void flush() {} + + @Override + public void close() {} + }; + Logger logger = Logger.getLogger(ThreadLocalContextStorage.class.getName()); + Level level = logger.getLevel(); + logger.setLevel(Level.ALL); + try { + logger.addHandler(handler); + Context initial = Context.current(); + Context context = initial.withValues(ANIMAL, "cat"); + try (Scope scope = context.makeCurrent()) { + Context context2 = context.withValues(ANIMAL, "dog"); + try (Scope ignored = context2.makeCurrent()) { + assertThat(Context.current().getValue(ANIMAL)).isEqualTo("dog"); + scope.close(); + } + } + assertThat(Context.current()).isEqualTo(initial); + assertThat(logRef.get()).isNotNull(); + assertThat(logRef.get().getMessage()).contains("Context in storage not the expected context"); + } finally { + logger.removeHandler(handler); + logger.setLevel(level); + } + } + + @Test + void withValues() { + Context context1 = Context.current().withValues(ANIMAL, "cat"); + assertThat(context1.getValue(ANIMAL)).isEqualTo("cat"); + + Context context2 = context1.withValues(BAG, 100); + // Old unaffected + assertThat(context1.getValue(ANIMAL)).isEqualTo("cat"); + assertThat(context1.getValue(BAG)).isNull(); + + assertThat(context2.getValue(ANIMAL)).isEqualTo("cat"); + assertThat(context2.getValue(BAG)).isEqualTo(100); + + Context context3 = context2.withValues(ANIMAL, "dog"); + // Old unaffected + assertThat(context2.getValue(ANIMAL)).isEqualTo("cat"); + assertThat(context2.getValue(BAG)).isEqualTo(100); + + assertThat(context3.getValue(ANIMAL)).isEqualTo("dog"); + assertThat(context3.getValue(BAG)).isEqualTo(100); + + Context context4 = context3.withValues(BAG, null); + // Old unaffected + assertThat(context3.getValue(ANIMAL)).isEqualTo("dog"); + assertThat(context3.getValue(BAG)).isEqualTo(100); + + assertThat(context4.getValue(ANIMAL)).isEqualTo("dog"); + assertThat(context4.getValue(BAG)).isNull(); + } + + @Test + void withTwoValues() { + Context context = Context.current().withValues(ANIMAL, "cat", FOOD, "hot dog"); + assertThat(context.getValue(ANIMAL)).isEqualTo("cat"); + assertThat(context.getValue(FOOD)).isEqualTo("hot dog"); + } + + @Test + void withThreeValues() { + Context context = Context.current().withValues(ANIMAL, "cat", FOOD, "hot dog", COOKIES, 100); + assertThat(context.getValue(ANIMAL)).isEqualTo("cat"); + assertThat(context.getValue(FOOD)).isEqualTo("hot dog"); + assertThat(context.getValue(COOKIES)).isEqualTo(100); + } + + @Test + void withFourValues() { + Context context = + Context.current().withValues(ANIMAL, "cat", FOOD, "hot dog", COOKIES, 100, BAG, "prada"); + assertThat(context.getValue(ANIMAL)).isEqualTo("cat"); + assertThat(context.getValue(FOOD)).isEqualTo("hot dog"); + assertThat(context.getValue(COOKIES)).isEqualTo(100); + assertThat(context.getValue(BAG)).isEqualTo("prada"); + } + + @Test + void wrapRunnable() { + AtomicReference value = new AtomicReference<>(); + Runnable callback = () -> value.set(Context.current().getValue(ANIMAL)); + + callback.run(); + assertThat(value).hasValue(null); + + CAT.wrap(callback).run(); + assertThat(value).hasValue("cat"); + + callback.run(); + assertThat(value).hasValue(null); + } + + @Test + void wrapCallable() throws Exception { + AtomicReference value = new AtomicReference<>(); + Callable callback = + () -> { + value.set(Context.current().getValue(ANIMAL)); + return "foo"; + }; + + assertThat(callback.call()).isEqualTo("foo"); + assertThat(value).hasValue(null); + + assertThat(CAT.wrap(callback).call()).isEqualTo("foo"); + assertThat(value).hasValue("cat"); + + assertThat(callback.call()).isEqualTo("foo"); + assertThat(value).hasValue(null); + } + + @Test + void wrapExecutor() { + AtomicReference value = new AtomicReference<>(); + Executor executor = MoreExecutors.directExecutor(); + Runnable callback = () -> value.set(Context.current().getValue(ANIMAL)); + + executor.execute(callback); + assertThat(value).hasValue(null); + + CAT.wrap(executor).execute(callback); + assertThat(value).hasValue("cat"); + + executor.execute(callback); + assertThat(value).hasValue(null); + } + + @Nested + @TestInstance(Lifecycle.PER_CLASS) + class WrapExecutorService { + + protected ScheduledExecutorService executor; + protected ExecutorService wrapped; + protected AtomicReference value; + + @BeforeAll + void initExecutor() { + executor = Executors.newSingleThreadScheduledExecutor(); + wrapped = CAT.wrap((ExecutorService) executor); + } + + @AfterAll + void stopExecutor() { + executor.shutdown(); + } + + @BeforeEach + void setUp() { + value = new AtomicReference<>(); + } + + @Test + void execute() { + Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL)); + wrapped.execute(runnable); + await().untilAsserted(() -> assertThat(value).hasValue("cat")); + } + + @Test + void submitRunnable() { + Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL)); + Futures.getUnchecked(wrapped.submit(runnable)); + assertThat(value).hasValue("cat"); + } + + @Test + void submitRunnableResult() { + Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL)); + assertThat(Futures.getUnchecked(wrapped.submit(runnable, "foo"))).isEqualTo("foo"); + assertThat(value).hasValue("cat"); + } + + @Test + void submitCallable() { + Callable callable = + () -> { + value.set(Context.current().getValue(ANIMAL)); + return "foo"; + }; + assertThat(Futures.getUnchecked(wrapped.submit(callable))).isEqualTo("foo"); + assertThat(value).hasValue("cat"); + } + + @Test + void invokeAll() throws Exception { + AtomicReference value1 = new AtomicReference<>(); + AtomicReference value2 = new AtomicReference<>(); + Callable callable1 = + () -> { + value1.set(Context.current().getValue(ANIMAL)); + return "foo"; + }; + Callable callable2 = + () -> { + value2.set(Context.current().getValue(ANIMAL)); + return "bar"; + }; + List> futures = wrapped.invokeAll(Arrays.asList(callable1, callable2)); + assertThat(futures.get(0).get()).isEqualTo("foo"); + assertThat(futures.get(1).get()).isEqualTo("bar"); + assertThat(value1).hasValue("cat"); + assertThat(value2).hasValue("cat"); + } + + @Test + void invokeAllTimeout() throws Exception { + AtomicReference value1 = new AtomicReference<>(); + AtomicReference value2 = new AtomicReference<>(); + Callable callable1 = + () -> { + value1.set(Context.current().getValue(ANIMAL)); + return "foo"; + }; + Callable callable2 = + () -> { + value2.set(Context.current().getValue(ANIMAL)); + return "bar"; + }; + List> futures = + wrapped.invokeAll(Arrays.asList(callable1, callable2), 1, TimeUnit.SECONDS); + assertThat(futures.get(0).get()).isEqualTo("foo"); + assertThat(futures.get(1).get()).isEqualTo("bar"); + assertThat(value1).hasValue("cat"); + assertThat(value2).hasValue("cat"); + } + + @Test + void invokeAny() throws Exception { + AtomicReference value1 = new AtomicReference<>(); + AtomicReference value2 = new AtomicReference<>(); + Callable callable1 = + () -> { + value1.set(Context.current().getValue(ANIMAL)); + throw new IllegalStateException("callable2 wins"); + }; + Callable callable2 = + () -> { + value2.set(Context.current().getValue(ANIMAL)); + return "bar"; + }; + assertThat(wrapped.invokeAny(Arrays.asList(callable1, callable2))).isEqualTo("bar"); + assertThat(value1).hasValue("cat"); + assertThat(value2).hasValue("cat"); + } + + @Test + void invokeAnyTimeout() throws Exception { + AtomicReference value1 = new AtomicReference<>(); + AtomicReference value2 = new AtomicReference<>(); + Callable callable1 = + () -> { + value1.set(Context.current().getValue(ANIMAL)); + throw new IllegalStateException("callable2 wins"); + }; + Callable callable2 = + () -> { + value2.set(Context.current().getValue(ANIMAL)); + return "bar"; + }; + assertThat(wrapped.invokeAny(Arrays.asList(callable1, callable2), 1, TimeUnit.SECONDS)) + .isEqualTo("bar"); + assertThat(value1).hasValue("cat"); + assertThat(value2).hasValue("cat"); + } + } + + @Nested + @TestInstance(Lifecycle.PER_CLASS) + class WrapScheduledExecutorService extends WrapExecutorService { + + private ScheduledExecutorService wrapScheduled; + + @BeforeEach + void wrapScheduled() { + wrapScheduled = CAT.wrap(executor); + } + + @Test + void scheduleRunnable() throws Exception { + Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL)); + wrapScheduled.schedule(runnable, 0, TimeUnit.SECONDS).get(); + assertThat(value).hasValue("cat"); + } + + @Test + void scheduleCallable() throws Exception { + Callable callable = + () -> { + value.set(Context.current().getValue(ANIMAL)); + return "foo"; + }; + assertThat(wrapScheduled.schedule(callable, 0, TimeUnit.SECONDS).get()).isEqualTo("foo"); + assertThat(value).hasValue("cat"); + } + + @Test + void scheduleAtFixedRate() { + Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL)); + ScheduledFuture future = + wrapScheduled.scheduleAtFixedRate(runnable, 0, 10, TimeUnit.SECONDS); + await().untilAsserted(() -> assertThat(value).hasValue("cat")); + future.cancel(true); + } + + @Test + void scheduleWithFixedDelay() { + Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL)); + ScheduledFuture future = + wrapScheduled.scheduleWithFixedDelay(runnable, 0, 10, TimeUnit.SECONDS); + await().untilAsserted(() -> assertThat(value).hasValue("cat")); + future.cancel(true); + } + } +} diff --git a/settings.gradle b/settings.gradle index b231e9732e..0ed8084d2d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,6 +8,7 @@ pluginManagement { id "io.morethan.jmhreport" version "0.9.0" id "me.champeau.gradle.jmh" version "0.5.0" id "net.ltgt.errorprone" version "1.2.0" + id "org.unbroken-dome.test-sets" version "3.0.1" id "ru.vyarus.animalsniffer" version "1.5.1" } @@ -20,6 +21,7 @@ pluginManagement { rootProject.name = "opentelemetry-java" include ":opentelemetry-all", ":opentelemetry-api", + ":opentelemetry-context", ":opentelemetry-context-prop", ":opentelemetry-extension-auto-annotations", ":opentelemetry-extension-runtime-metrics",