Add an OpenTelemetry context mechanism. (#1658)

* Add an OpenTelemetry context mechanism.

* Moar

* Extracted interfaces

* More context

* Cleanup / tests

* Move and cleanups

* Brackets

* Add example for brave context interop

* Brave in OTel

* Spotless

* Missing folder

* Another

* Spotless
This commit is contained in:
Anuraag Agrawal 2020-10-05 17:03:11 +09:00 committed by GitHub
parent ed169645f7
commit 21fbb36d81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 2574 additions and 13 deletions

View File

@ -354,9 +354,19 @@ configure(opentelemetryProjects) {
}
}
test {
tasks.withType(Test) {
systemProperties project.properties.subMap(["enable.docker.tests"])
useJUnitPlatform()
// At a test failure, log the stack trace to the console so that we don't
// have to open the HTML in a browser.
testLogging {
exceptionFormat = 'full'
showExceptions = true
showCauses = true
showStackTraces = true
}
maxHeapSize = '1500m'
}
javadoc.options {
@ -381,18 +391,6 @@ configure(opentelemetryProjects) {
sign configurations.archives
}
// At a test failure, log the stack trace to the console so that we don't
// have to open the HTML in a browser.
test {
testLogging {
exceptionFormat = 'full'
showExceptions = true
showCauses = true
showStackTraces = true
}
maxHeapSize = '1500m'
}
plugins.withId("ru.vyarus.animalsniffer") {
animalsnifferTest {
enabled = false

32
context/build.gradle Normal file
View File

@ -0,0 +1,32 @@
plugins {
id "java"
id "org.unbroken-dome.test-sets"
id "ru.vyarus.animalsniffer"
}
description = 'OpenTelemetry Context (Incubator)'
ext.moduleName = "io.opentelemetry.context"
testSets {
grpcInOtelTest
otelInGrpcTest
otelAsGrpcTest
braveInOtelTest
otelAsBraveTest
}
dependencies {
grpcInOtelTestImplementation libraries.grpc_context
otelAsGrpcTestImplementation libraries.grpc_context
otelInGrpcTestImplementation libraries.grpc_context
braveInOtelTestImplementation "io.zipkin.brave:brave:5.12.6"
otelAsBraveTestImplementation "io.zipkin.brave:brave:5.12.6"
testImplementation libraries.awaitility
signature "org.codehaus.mojo.signature:java18:1.0@signature"
signature "net.sf.androidscents.signature:android-api-level-24:7.0_r2@signature"
}

View File

@ -0,0 +1,125 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import static org.assertj.core.api.Assertions.assertThat;
import brave.Tracing;
import brave.propagation.CurrentTraceContext;
import brave.propagation.TraceContext;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class BraveInOtelTest {
private static final ContextKey<String> ANIMAL = ContextKey.named("animal");
private static final Tracing TRACING =
Tracing.newBuilder().currentTraceContext(new OpenTelemetryCurrentTraceContext()).build();
private static final TraceContext TRACE_CONTEXT =
TraceContext.newBuilder().traceId(1).spanId(1).addExtra("japan").build();
private static ExecutorService otherThread;
@BeforeAll
static void setUp() {
otherThread = Executors.newSingleThreadExecutor();
}
@AfterAll
static void tearDown() {
otherThread.shutdown();
}
@Test
void braveOtelMix() {
try (CurrentTraceContext.Scope ignored =
TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
TraceContext context2 =
Tracing.current().currentTraceContext().get().toBuilder().addExtra("cheese").build();
try (CurrentTraceContext.Scope ignored3 =
TRACING.currentTraceContext().newScope(context2)) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("cheese");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
}
}
}
}
@Test
void braveWrap() throws Exception {
try (CurrentTraceContext.Scope ignored =
TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) {
try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<Boolean> braveContainsJapan = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
TraceContext traceContext = Tracing.current().currentTraceContext().get();
if (traceContext != null && traceContext.extra().contains("japan")) {
braveContainsJapan.set(true);
} else {
braveContainsJapan.set(false);
}
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(braveContainsJapan).hasValue(false);
assertThat(otelValue).hasValue(null);
otherThread.submit(TRACING.currentTraceContext().wrap(runnable)).get();
assertThat(braveContainsJapan).hasValue(true);
// Since Brave context is inside the OTel context, propagating the Brave context does not
// propagate the OTel context.
assertThat(otelValue).hasValue(null);
}
}
}
@Test
void otelWrap() throws Exception {
try (CurrentTraceContext.Scope ignored =
TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) {
try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<Boolean> braveContainsJapan = new AtomicReference<>(false);
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
TraceContext traceContext = Tracing.current().currentTraceContext().get();
if (traceContext != null && traceContext.extra().contains("japan")) {
braveContainsJapan.set(true);
} else {
braveContainsJapan.set(false);
}
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(braveContainsJapan).hasValue(false);
assertThat(otelValue).hasValue(null);
otherThread.submit(Context.current().wrap(runnable)).get();
assertThat(braveContainsJapan).hasValue(true);
assertThat(otelValue).hasValue("cat");
}
}
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import brave.propagation.CurrentTraceContext;
import brave.propagation.TraceContext;
public class OpenTelemetryCurrentTraceContext extends CurrentTraceContext {
private static final ContextKey<TraceContext> TRACE_CONTEXT_KEY =
ContextKey.named("brave-tracecontext");
@Override
public TraceContext get() {
return Context.current().getValue(TRACE_CONTEXT_KEY);
}
@SuppressWarnings("ReferenceEquality")
@Override
public Scope newScope(TraceContext context) {
Context currentOtel = Context.current();
TraceContext currentBrave = currentOtel.getValue(TRACE_CONTEXT_KEY);
if (currentBrave == context) {
return Scope.NOOP;
}
Context newOtel = currentOtel.withValues(TRACE_CONTEXT_KEY, context);
io.opentelemetry.context.Scope otelScope = newOtel.makeCurrent();
return otelScope::close;
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.grpc.override;
import io.grpc.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;
import java.util.logging.Level;
import java.util.logging.Logger;
// This exact package / class name indicates to gRPC to use this override.
public class ContextStorageOverride extends Context.Storage {
private static final Logger log = Logger.getLogger(ContextStorageOverride.class.getName());
private static final ContextKey<Context> GRPC_CONTEXT = ContextKey.named("grpc-context");
private static final Context.Key<Scope> OTEL_SCOPE = Context.key("otel-scope");
@Override
public Context doAttach(Context toAttach) {
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();
Context current = otelContext.getValue(GRPC_CONTEXT);
if (current == toAttach) {
return toAttach;
}
if (current == null) {
current = Context.ROOT;
}
io.opentelemetry.context.Context newOtelContext =
otelContext.withValues(GRPC_CONTEXT, toAttach);
Scope scope = newOtelContext.makeCurrent();
return current.withValue(OTEL_SCOPE, scope);
}
@Override
public void detach(Context toDetach, Context toRestore) {
if (current() != toDetach) {
// Log a severe message instead of throwing an exception as the context to attach is assumed
// to be the correct one and the unbalanced state represents a coding mistake in a lower
// layer in the stack that cannot be recovered from here.
log.log(
Level.SEVERE,
"Context was not attached when detaching",
new Throwable().fillInStackTrace());
}
Scope otelScope = OTEL_SCOPE.get(toRestore);
otelScope.close();
}
@Override
public Context current() {
return io.opentelemetry.context.Context.current().getValue(GRPC_CONTEXT);
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class GrpcInOtelTest {
private static final ContextKey<String> ANIMAL = ContextKey.named("animal");
private static final io.grpc.Context.Key<String> FOOD = io.grpc.Context.key("food");
private static final io.grpc.Context.Key<String> COUNTRY = io.grpc.Context.key("country");
private static ExecutorService otherThread;
@BeforeAll
static void setUp() {
otherThread = Executors.newSingleThreadExecutor();
}
@AfterAll
static void tearDown() {
otherThread.shutdown();
}
@Test
void grpcOtelMix() {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
assertThat(COUNTRY.get()).isNull();
io.grpc.Context root = grpcContext.attach();
try {
assertThat(COUNTRY.get()).isEqualTo("japan");
try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
assertThat(COUNTRY.get()).isEqualTo("japan");
io.grpc.Context context2 = io.grpc.Context.current().withValue(FOOD, "cheese");
assertThat(FOOD.get()).isNull();
io.grpc.Context toRestore = context2.attach();
try {
assertThat(FOOD.get()).isEqualTo("cheese");
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
} finally {
context2.detach(toRestore);
}
}
} finally {
grpcContext.detach(root);
}
}
@Test
void grpcWrap() throws Exception {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
io.grpc.Context root = grpcContext.attach();
try {
try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<String> grpcValue = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
grpcValue.set(COUNTRY.get());
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(grpcValue).hasValue(null);
assertThat(otelValue).hasValue(null);
otherThread.submit(io.grpc.Context.current().wrap(runnable)).get();
assertThat(grpcValue).hasValue("japan");
// Since gRPC context is inside the OTel context, propagating gRPC context does not
// propagate the OTel context.
assertThat(otelValue).hasValue(null);
}
} finally {
grpcContext.detach(root);
}
}
@Test
void otelWrap() throws Exception {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
io.grpc.Context root = grpcContext.attach();
try {
try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<String> grpcValue = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
grpcValue.set(COUNTRY.get());
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(grpcValue).hasValue(null);
assertThat(otelValue).hasValue(null);
otherThread.submit(Context.current().wrap(runnable)).get();
assertThat(grpcValue).hasValue("japan");
assertThat(otelValue).hasValue("cat");
}
} finally {
grpcContext.detach(root);
}
}
}

View File

@ -0,0 +1,209 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
/**
* A context propagation mechanism which can carry scoped-values across API boundaries and between
* threads.
*
* <p>A Context object can be {@linkplain #makeCurrent set} to the {@link ContextStorage}, which
* effectively forms a <b>scope</b> for the context. The scope is bound to the current thread.
* Within a scope, its Context is accessible even across API boundaries, through {@link #current}.
* The scope is later exited by {@link Scope#close()} closing} the scope.
*
* <p>Context objects are immutable and inherit state from their parent. To add or overwrite the
* current state a new context object must be created and then attached, replacing the previously
* bound context. For example:
*
* <pre>{@code
* Context withCredential = Context.current().withValues(CRED_KEY, cred);
* withCredential.wrap(new Runnable() {
* public void run() {
* readUserRecords(userId, CRED_KEY.get());
* }
* }).run();
* }</pre>
*
* <p>Notes and cautions on use:
*
* <ul>
* <li>Every {@link #makeCurrent()} must be followed by a {@link Scope#close()}. Breaking these
* rules may lead to memory leaks and incorrect scoping.
* <li>While Context objects are immutable they do not place such a restriction on the state they
* store.
* <li>Context is not intended for passing optional parameters to an API and developers should
* take care to avoid excessive dependence on context when designing an API.
* <li>Attaching Context from a different ancestor will cause information in the current Context
* to be lost. This should generally be avoided.
* </ul>
*/
public interface Context {
/** Return the context associated with the current {@link Scope}. */
static Context current() {
Context current = ContextStorage.get().current();
return current != null ? current : root();
}
/**
* Returns the root {@link Context} which all other {@link Context} are derived from.
*
* <p>It should generally not be required to use the root {@link Context} directly - instead, use
* {@link Context#current()} to operate on the current {@link Context}. Only use this method if
* you are absolutely sure you need to disregard the current {@link Context} - this almost always
* is only a workaround hiding an underlying context propagation issue.
*/
static Context root() {
return DefaultContext.ROOT;
}
/**
* Returns the value stored in this {@link Context} for the given {@link ContextKey}, or {@code
* null} if there is no value for the key in this context.
*/
@Nullable
<V> V getValue(ContextKey<V> key);
/**
* Returns a new context with the given key value set.
*
* <pre>{@code
* Context withCredential = Context.current().withValues(CRED_KEY, cred);
* withCredential.wrap(new Runnable() {
* public void run() {
* readUserRecords(userId, CRED_KEY.get());
* }
* }).run();
* }</pre>
*
* <p>Note that multiple calls to {@code withValue} can be chained together. That is,
*
* <pre>{@code
* context.withValues(K1, V1, K2, V2);
* // is the same as
* context.withValue(K1, V1).withValue(K2, V2);
* }</pre>
*
* <p>Nonetheless, {@link Context} should not be treated like a general purpose map with a large
* number of keys and values combine multiple related items together into a single key instead
* of separating them. But if the items are unrelated, have separate keys for them.
*/
<V> Context withValues(ContextKey<V> k1, V v1);
/** Returns a new context with the given key value set. */
default <V1, V2> Context withValues(ContextKey<V1> k1, V1 v1, ContextKey<V2> k2, V2 v2) {
return withValues(k1, v1).withValues(k2, v2);
}
/** Returns a new context with the given key value set. */
default <V1, V2, V3> Context withValues(
ContextKey<V1> k1, V1 v1, ContextKey<V2> k2, V2 v2, ContextKey<V3> k3, V3 v3) {
return withValues(k1, v1, k2, v2).withValues(k3, v3);
}
/**
* Create a new context with the given key value set.
*
* <p>For more than 4 key-value pairs, note that multiple calls to {@link #withValues} can be
* chained together. That is,
*
* <pre>
* context.withValues(K1, V1, K2, V2);
* // is the same as
* context.withValue(K1, V1).withValue(K2, V2);
* </pre>
*
* <p>Nonetheless, {@link Context} should not be treated like a general purpose map with a large
* number of keys and values combine multiple related items together into a single key instead
* of separating them. But if the items are unrelated, have separate keys for them.
*/
default <V1, V2, V3, V4> Context withValues(
ContextKey<V1> k1,
V1 v1,
ContextKey<V2> k2,
V2 v2,
ContextKey<V3> k3,
V3 v3,
ContextKey<V4> k4,
V4 v4) {
return withValues(k1, v1, k2, v2, k3, v3).withValues(k4, v4);
}
/**
* Makes this the {@linkplain Context#current() current context} and returns a {@link Scope} which
* corresponds to the scope of execution this context is current for. {@link Context#current()}
* will return this {@link Context} until {@link Scope#close()} is called. {@link Scope#close()}
* must be called to properly restore the previous context from before this scope of execution or
* context will not work correctly. It is recommended to use try-with-resources to call {@link
* Scope#close()} automatically.
*
* <pre>{@code
* Context prevCtx = Context.current();
* try (Scope ignored = ctx.attach()) {
* assert Context.current() == ctx;
* ...
* }
* assert Context.current() == prevCtx;
* }</pre>
*/
default Scope makeCurrent() {
return ContextStorage.get().attach(this);
}
/**
* Returns a {@link Runnable} that makes this the {@linkplain Context#current current context} and
* then invokes the input {@link Runnable}.
*/
default Runnable wrap(Runnable runnable) {
return () -> {
try (Scope ignored = makeCurrent()) {
runnable.run();
}
};
}
/**
* Returns a {@link Runnable} that makes this the {@linkplain Context#current current context} and
* then invokes the input {@link Runnable}.
*/
default <T> Callable<T> wrap(Callable<T> callable) {
return () -> {
try (Scope ignored = makeCurrent()) {
return callable.call();
}
};
}
/**
* Returns an {@link Executor} that will execute callbacks in the given {@code executor}, making
* this the {@linkplain Context#current current context} before each execution.
*/
default Executor wrap(Executor executor) {
return command -> executor.execute(wrap(command));
}
/**
* Returns an {@link ExecutorService} that will execute callbacks in the given {@code executor},
* making this the {@linkplain Context#current current context} before each execution.
*/
default ExecutorService wrap(ExecutorService executor) {
return new ContextExecutorService(this, executor);
}
/**
* Returns an {@link ScheduledExecutorService} that will execute callbacks in the given {@code
* executor}, making this the {@linkplain Context#current current context} before each execution.
*/
default ScheduledExecutorService wrap(ScheduledExecutorService executor) {
return new ContextScheduledExecutorService(this, executor);
}
}

View File

@ -0,0 +1,113 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
class ContextExecutorService implements ExecutorService {
private final Context context;
private final ExecutorService delegate;
ContextExecutorService(Context context, ExecutorService delegate) {
this.context = context;
this.delegate = delegate;
}
final Context context() {
return context;
}
ExecutorService delegate() {
return delegate;
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}
@Override
public boolean isShutdown() {
return delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return delegate.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(context.wrap(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(context.wrap(task), result);
}
@Override
public Future<?> submit(Runnable task) {
return delegate.submit(context.wrap(task));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return delegate.invokeAll(wrap(tasks));
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(wrap(tasks), timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return delegate.invokeAny(wrap(tasks));
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(wrap(tasks), timeout, unit);
}
@Override
public void execute(Runnable command) {
delegate.execute(context.wrap(command));
}
private <T> Collection<? extends Callable<T>> wrap(Collection<? extends Callable<T>> tasks) {
List<Callable<T>> wrapped = new ArrayList<>();
for (Callable<T> task : tasks) {
wrapped.add(context.wrap(task));
}
return wrapped;
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
/**
* Key for indexing values of type {@link T} stored in a {@link Context}. {@link ContextKey} are
* compared by reference, so it is expected that only one {@link ContextKey} is created for a
* particular type of context value.
*
* <pre>{@code
* public class ContextUser {
*
* private static final ContextKey<MyState> KEY = ContextKey.named("MyState");
*
* public Context startWork() {
* return Context.withValues(KEY, new MyState());
* }
*
* public void continueWork(Context context) {
* MyState state = context.getValue(KEY);
* // Keys are compared by reference only.
* assert state != Context.current().getValue(ContextKey.named("MyState"));
* ...
* }
* }
*
* }</pre>
*/
// ErrorProne false positive, this is used for its type constraint, not only as a bag of statics.
@SuppressWarnings("InterfaceWithOnlyStatics")
public interface ContextKey<T> {
/**
* Returns a new {@link ContextKey} with the given debug name. The name does not impact behavior
* and is only for debugging purposes. Multiple different keys with the same name will be separate
* keys.
*/
static <T> ContextKey<T> named(String name) {
return ContextStorage.get().contextKey(name);
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
class ContextScheduledExecutorService extends ContextExecutorService
implements ScheduledExecutorService {
ContextScheduledExecutorService(Context context, ScheduledExecutorService delegate) {
super(context, delegate);
}
@Override
ScheduledExecutorService delegate() {
return (ScheduledExecutorService) super.delegate();
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate().schedule(context().wrap(command), delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate().schedule(context().wrap(callable), delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate().scheduleAtFixedRate(context().wrap(command), initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate().scheduleWithFixedDelay(context().wrap(command), initialDelay, delay, unit);
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
/**
* The storage for storing and retrieving the current {@link Context}.
*
* <p>If you want to implement your own storage or add some hooks when a {@link Context} is attached
* and restored, you should use {@link ContextStorageProvider}. Here's an example that sets MDC
* before {@link Context} is attached:
*
* <pre>{@code
* > public class MyStorage implements ContextStorageProvider {
* >
* > @Override
* > public ContextStorage get() {
* > ContextStorage threadLocalStorage = Context.threadLocalStorage();
* > return new RequestContextStorage() {
* > @Override
* > public Scope T attach(Context toAttach) {
* > Context current = current();
* > setMdc(toAttach);
* > Scope scope = threadLocalStorage.attach(toAttach);
* > return () -> {
* > clearMdc();
* > setMdc(current);
* > scope.close();
* > }
* > }
* >
* > @Override
* > public Context current() {
* > return threadLocalStorage.current();
* > }
* > }
* > }
* > }
* }</pre>
*/
public interface ContextStorage {
/**
* Returns the {@link ContextStorage} being used by this application. This is only for use when
* integrating with other context propagation mechanisms and not meant for direct use. To attach
* or detach a {@link Context} in an application, use {@link Context#makeCurrent()} and {@link
* Scope#close()}.
*/
static ContextStorage get() {
return LazyStorage.storage;
}
/**
* Sets the specified {@link Context} as the current {@link Context} and returns a {@link Scope}
* representing the scope of execution. {@link Scope#close()} must be called when the current
* {@link Context} should be restored to what it was before attaching {@code toAttach}.
*/
Scope attach(Context toAttach);
/**
* Returns the current {@link DefaultContext}. If no {@link DefaultContext} has been attached yet,
* this will be the {@linkplain Context#root()} root context}.
*/
Context current();
/**
* Returns a {@link ContextKey} for the given name. This is only useful when integrating with a
* separate context propagation mechanism, where
*/
default <T> ContextKey<T> contextKey(String name) {
return new DefaultContextKey<>(name);
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import java.util.concurrent.Executor;
/**
* A Java SPI (Service Provider Interface) to allow replacing the default {@link ContextStorage}.
* This can be useful if, for example, you want to store OpenTelemetry {@link Context} in another
* context propagation system. For example, the returned {@link ContextStorage} could delegate to
* methods in
*
* <p><a
* href="https://javadoc.io/doc/com.linecorp.armeria/armeria-javadoc/latest/com/linecorp/armeria/common/RequestContext.html">{@code
* com.linecorp.armeria.common.RequestContext}</a>, <a
* href="https://grpc.github.io/grpc-java/javadoc/io/grpc/Context.html">{@code
* io.grpc.context.Context}</a>, or <a
* href="https://download.eclipse.org/microprofile/microprofile-context-propagation-1.0.2/apidocs/org/eclipse/microprofile/context/ThreadContext.html">{@code
* org.eclipse.microprofile.context.ThreadContext}</a>
*
* <p>if you are already using one of those systems in your application. Then you would not have to
* use methods like {@link Context#wrap(Executor)} and can use your current system instead.
*/
public interface ContextStorageProvider {
/** Returns the {@link ContextStorage} to use to store {@link DefaultContext}. */
ContextStorage get();
}

View File

@ -0,0 +1,84 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import javax.annotation.Nullable;
final class DefaultContext implements Context {
static final Context ROOT = new DefaultContext();
/**
* Returns the default {@link ContextStorage} used to attach {@link Context}s to scopes of
* execution. Should only be used when defining your own {@link ContextStorage} in case you want
* to delegate functionality to the default implementation.
*/
static ContextStorage threadLocalStorage() {
return ThreadLocalContextStorage.INSTANCE;
}
@Nullable private final PersistentHashArrayMappedTrie.Node<ContextKey<?>, Object> entries;
private DefaultContext(PersistentHashArrayMappedTrie.Node<ContextKey<?>, Object> entries) {
this.entries = entries;
}
DefaultContext() {
entries = null;
}
@Override
@Nullable
public <V> V getValue(ContextKey<V> key) {
// Because withValue enforces the value for a key is its type, this is always safe.
@SuppressWarnings("unchecked")
V value = (V) PersistentHashArrayMappedTrie.get(entries, key);
return value;
}
@Override
public <V> Context withValues(ContextKey<V> k1, V v1) {
PersistentHashArrayMappedTrie.Node<ContextKey<?>, Object> newEntries =
PersistentHashArrayMappedTrie.put(entries, k1, v1);
return new DefaultContext(newEntries);
}
@Override
public <V1, V2> Context withValues(ContextKey<V1> k1, V1 v1, ContextKey<V2> k2, V2 v2) {
PersistentHashArrayMappedTrie.Node<ContextKey<?>, Object> newEntries =
PersistentHashArrayMappedTrie.put(entries, k1, v1);
newEntries = PersistentHashArrayMappedTrie.put(newEntries, k2, v2);
return new DefaultContext(newEntries);
}
@Override
public <V1, V2, V3> Context withValues(
ContextKey<V1> k1, V1 v1, ContextKey<V2> k2, V2 v2, ContextKey<V3> k3, V3 v3) {
PersistentHashArrayMappedTrie.Node<ContextKey<?>, Object> newEntries =
PersistentHashArrayMappedTrie.put(entries, k1, v1);
newEntries = PersistentHashArrayMappedTrie.put(newEntries, k2, v2);
newEntries = PersistentHashArrayMappedTrie.put(newEntries, k3, v3);
return new DefaultContext(newEntries);
}
@Override
public <V1, V2, V3, V4> Context withValues(
ContextKey<V1> k1,
V1 v1,
ContextKey<V2> k2,
V2 v2,
ContextKey<V3> k3,
V3 v3,
ContextKey<V4> k4,
V4 v4) {
PersistentHashArrayMappedTrie.Node<ContextKey<?>, Object> newEntries =
PersistentHashArrayMappedTrie.put(entries, k1, v1);
newEntries = PersistentHashArrayMappedTrie.put(newEntries, k2, v2);
newEntries = PersistentHashArrayMappedTrie.put(newEntries, k3, v3);
newEntries = PersistentHashArrayMappedTrie.put(newEntries, k4, v4);
return new DefaultContext(newEntries);
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
final class DefaultContextKey<T> implements ContextKey<T> {
private final String name;
DefaultContextKey(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
// Lazy-loaded storage. Delaying storage initialization until after class initialization makes it
// much easier to avoid circular loading since there can still be references to Context as long as
// they don't depend on storage, like key() and currentContextExecutor(). It also makes it easier
// to handle exceptions.
final class LazyStorage {
private static final String CONTEXT_STORAGE_PROVIDER_PROPERTY =
"io.opentelemetry.context.contextStorageProvider";
private static final Logger logger = Logger.getLogger(ThreadLocalContextStorage.class.getName());
static final ContextStorage storage;
static {
AtomicReference<Throwable> deferredStorageFailure = new AtomicReference<>();
storage = createStorage(deferredStorageFailure);
Throwable failure = deferredStorageFailure.get();
// Logging must happen after storage has been set, as loggers may use Context.
if (failure != null) {
logger.log(
Level.WARNING, "ContextStorageProvider initialized failed. Using default", failure);
}
}
private static ContextStorage createStorage(AtomicReference<Throwable> deferredStorageFailure) {
String providerClassName = System.getProperty(CONTEXT_STORAGE_PROVIDER_PROPERTY, "");
List<ContextStorageProvider> providers = new ArrayList<>();
for (ContextStorageProvider provider : ServiceLoader.load(ContextStorageProvider.class)) {
providers.add(provider);
}
if (providers.isEmpty()) {
return DefaultContext.threadLocalStorage();
}
if (providers.size() == 1) {
ContextStorageProvider provider = providers.get(0);
try {
return provider.get();
} catch (Throwable t) {
deferredStorageFailure.set(t);
return DefaultContext.threadLocalStorage();
}
}
if (providerClassName.isEmpty()) {
deferredStorageFailure.set(
new IllegalStateException(
"Found multiple ContextStorageProvider. Set the "
+ "io.opentelemetry.context.ContextStorageProvider property to the fully "
+ "qualified class name of the provider to use. Falling back to default "
+ "ContextStorage. Found providers: "
+ providers));
return DefaultContext.threadLocalStorage();
}
for (ContextStorageProvider provider : providers) {
if (provider.getClass().getName().equals(providerClassName)) {
return provider.get();
}
}
deferredStorageFailure.set(
new IllegalStateException(
"io.opentelemetry.context.ContextStorageProvider property set but no matching class "
+ "could be found, requested: "
+ providerClassName
+ " but found providers: "
+ providers));
return DefaultContext.threadLocalStorage();
}
private LazyStorage() {}
}

View File

@ -0,0 +1,283 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import java.util.Arrays;
/**
* A persistent (copy-on-write) hash tree/trie. Collisions are handled linearly. Delete is not
* supported, but replacement is. The implementation favors simplicity and low memory allocation
* during insertion. Although the asymptotics are good, it is optimized for small sizes like less
* than 20; "unbelievably large" would be 100.
*
* <p>Inspired by popcnt-based compression seen in Ideal Hash Trees, Phil Bagwell (2000). The rest
* of the implementation is ignorant of/ignores the paper.
*/
final class PersistentHashArrayMappedTrie {
private PersistentHashArrayMappedTrie() {}
/** Returns the value with the specified key, or {@code null} if it does not exist. */
static <K, V> V get(Node<K, V> root, K key) {
if (root == null) {
return null;
}
return root.get(key, System.identityHashCode(key), 0);
}
/** Returns a new trie where the key is set to the specified value. */
static <K, V> Node<K, V> put(Node<K, V> root, K key, V value) {
if (root == null) {
return new Leaf<>(key, value);
}
return root.put(key, value, System.identityHashCode(key), 0);
}
// Not actually annotated to avoid depending on guava
// @VisibleForTesting
static final class Leaf<K, V> implements Node<K, V> {
private final K key;
private final V value;
public Leaf(K key, V value) {
this.key = key;
this.value = value;
}
@Override
public int size() {
return 1;
}
@Override
public V get(K key, int hash, int bitsConsumed) {
if (this.key == key) {
return value;
} else {
return null;
}
}
@Override
public Node<K, V> put(K key, V value, int hash, int bitsConsumed) {
int thisHash = System.identityHashCode(this.key);
if (thisHash != hash) {
// Insert
return CompressedIndex.combine(new Leaf<>(key, value), hash, this, thisHash, bitsConsumed);
} else if (this.key == key) {
// Replace
return new Leaf<>(key, value);
} else {
// Hash collision
return new CollisionLeaf<>(this.key, this.value, key, value);
}
}
@Override
public String toString() {
return String.format("Leaf(key=%s value=%s)", key, value);
}
}
// Not actually annotated to avoid depending on guava
// @VisibleForTesting
static final class CollisionLeaf<K, V> implements Node<K, V> {
// All keys must have same hash, but not have the same reference
private final K[] keys;
private final V[] values;
// Not actually annotated to avoid depending on guava
// @VisibleForTesting
@SuppressWarnings("unchecked")
CollisionLeaf(K key1, V value1, K key2, V value2) {
this((K[]) new Object[] {key1, key2}, (V[]) new Object[] {value1, value2});
assert key1 != key2;
assert System.identityHashCode(key1) == System.identityHashCode(key2);
}
private CollisionLeaf(K[] keys, V[] values) {
this.keys = keys;
this.values = values;
}
@Override
public int size() {
return values.length;
}
@Override
public V get(K key, int hash, int bitsConsumed) {
for (int i = 0; i < keys.length; i++) {
if (keys[i] == key) {
return values[i];
}
}
return null;
}
@Override
public Node<K, V> put(K key, V value, int hash, int bitsConsumed) {
int thisHash = System.identityHashCode(keys[0]);
int keyIndex;
if (thisHash != hash) {
// Insert
return CompressedIndex.combine(new Leaf<>(key, value), hash, this, thisHash, bitsConsumed);
} else if ((keyIndex = indexOfKey(key)) != -1) {
// Replace
K[] newKeys = Arrays.copyOf(keys, keys.length);
V[] newValues = Arrays.copyOf(values, keys.length);
newKeys[keyIndex] = key;
newValues[keyIndex] = value;
return new CollisionLeaf<>(newKeys, newValues);
} else {
// Yet another hash collision
K[] newKeys = Arrays.copyOf(keys, keys.length + 1);
V[] newValues = Arrays.copyOf(values, keys.length + 1);
newKeys[keys.length] = key;
newValues[keys.length] = value;
return new CollisionLeaf<>(newKeys, newValues);
}
}
// -1 if not found
private int indexOfKey(K key) {
for (int i = 0; i < keys.length; i++) {
if (keys[i] == key) {
return i;
}
}
return -1;
}
@Override
public String toString() {
StringBuilder valuesSb = new StringBuilder();
valuesSb.append("CollisionLeaf(");
for (int i = 0; i < values.length; i++) {
valuesSb.append("(key=").append(keys[i]).append(" value=").append(values[i]).append(") ");
}
return valuesSb.append(")").toString();
}
}
// Not actually annotated to avoid depending on guava
// @VisibleForTesting
static final class CompressedIndex<K, V> implements Node<K, V> {
private static final int BITS = 5;
private static final int BITS_MASK = 0x1F;
final int bitmap;
final Node<K, V>[] values;
private final int size;
private CompressedIndex(int bitmap, Node<K, V>[] values, int size) {
this.bitmap = bitmap;
this.values = values;
this.size = size;
}
@Override
public int size() {
return size;
}
@Override
public V get(K key, int hash, int bitsConsumed) {
int indexBit = indexBit(hash, bitsConsumed);
if ((bitmap & indexBit) == 0) {
return null;
}
int compressedIndex = compressedIndex(indexBit);
return values[compressedIndex].get(key, hash, bitsConsumed + BITS);
}
@Override
public Node<K, V> put(K key, V value, int hash, int bitsConsumed) {
int indexBit = indexBit(hash, bitsConsumed);
int compressedIndex = compressedIndex(indexBit);
if ((bitmap & indexBit) == 0) {
// Insert
int newBitmap = bitmap | indexBit;
@SuppressWarnings("unchecked")
Node<K, V>[] newValues = (Node<K, V>[]) new Node<?, ?>[values.length + 1];
System.arraycopy(values, 0, newValues, 0, compressedIndex);
newValues[compressedIndex] = new Leaf<>(key, value);
System.arraycopy(
values,
compressedIndex,
newValues,
compressedIndex + 1,
values.length - compressedIndex);
return new CompressedIndex<>(newBitmap, newValues, size() + 1);
} else {
// Replace
Node<K, V>[] newValues = Arrays.copyOf(values, values.length);
newValues[compressedIndex] =
values[compressedIndex].put(key, value, hash, bitsConsumed + BITS);
int newSize = size();
newSize += newValues[compressedIndex].size();
newSize -= values[compressedIndex].size();
return new CompressedIndex<>(bitmap, newValues, newSize);
}
}
static <K, V> Node<K, V> combine(
Node<K, V> node1, int hash1, Node<K, V> node2, int hash2, int bitsConsumed) {
assert hash1 != hash2;
int indexBit1 = indexBit(hash1, bitsConsumed);
int indexBit2 = indexBit(hash2, bitsConsumed);
if (indexBit1 == indexBit2) {
Node<K, V> node = combine(node1, hash1, node2, hash2, bitsConsumed + BITS);
@SuppressWarnings("unchecked")
Node<K, V>[] values = (Node<K, V>[]) new Node<?, ?>[] {node};
return new CompressedIndex<>(indexBit1, values, node.size());
} else {
// Make node1 the smallest
if (uncompressedIndex(hash1, bitsConsumed) > uncompressedIndex(hash2, bitsConsumed)) {
Node<K, V> nodeCopy = node1;
node1 = node2;
node2 = nodeCopy;
}
@SuppressWarnings("unchecked")
Node<K, V>[] values = (Node<K, V>[]) new Node<?, ?>[] {node1, node2};
return new CompressedIndex<>(indexBit1 | indexBit2, values, node1.size() + node2.size());
}
}
@Override
public String toString() {
StringBuilder valuesSb = new StringBuilder();
valuesSb
.append("CompressedIndex(")
.append(String.format("bitmap=%s ", Integer.toBinaryString(bitmap)));
for (Node<K, V> value : values) {
valuesSb.append(value).append(" ");
}
return valuesSb.append(")").toString();
}
private int compressedIndex(int indexBit) {
return Integer.bitCount(bitmap & (indexBit - 1));
}
private static int uncompressedIndex(int hash, int bitsConsumed) {
return (hash >>> bitsConsumed) & BITS_MASK;
}
private static int indexBit(int hash, int bitsConsumed) {
int uncompressedIndex = uncompressedIndex(hash, bitsConsumed);
return 1 << uncompressedIndex;
}
}
interface Node<K, V> {
V get(K key, int hash, int bitsConsumed);
Node<K, V> put(K key, V value, int hash, int bitsConsumed);
int size();
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import io.opentelemetry.context.ThreadLocalContextStorage.NoopScope;
/**
* An {@link AutoCloseable} that represents a mounted context for a block of code. A failure to call
* {@link Scope#close()} will generally break tracing or cause memory leaks. It is recommended that
* you use this class with a {@code try-with-resources} block:
*
* <pre>{@code
* try (Scope ignored = tracer.withSpan(span)) {
* ...
* }
* }</pre>
*/
public interface Scope extends AutoCloseable {
/**
* Returns a {@link Scope} that does nothing. Represents attaching a {@link Context} when it is
* already attached.
*/
static Scope noop() {
return NoopScope.INSTANCE;
}
@Override
void close();
}

View File

@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import java.util.logging.Level;
import java.util.logging.Logger;
enum ThreadLocalContextStorage implements ContextStorage {
INSTANCE;
private static final Logger logger = Logger.getLogger(ThreadLocalContextStorage.class.getName());
private static final ThreadLocal<Context> THREAD_LOCAL_STORAGE = new ThreadLocal<>();
static {
THREAD_LOCAL_STORAGE.set(Context.root());
}
@Override
public Scope attach(Context toAttach) {
if (toAttach == null) {
// Null context not allowed so ignore it.
return NoopScope.INSTANCE;
}
Context beforeAttach = current();
if (toAttach == beforeAttach) {
return NoopScope.INSTANCE;
}
THREAD_LOCAL_STORAGE.set(toAttach);
return () -> {
if (current() != toAttach) {
logger.log(
Level.FINE,
"Context in storage not the expected context, Scope.close was not called correctly");
}
THREAD_LOCAL_STORAGE.set(beforeAttach);
};
}
@Override
public Context current() {
return THREAD_LOCAL_STORAGE.get();
}
enum NoopScope implements Scope {
INSTANCE;
@Override
public void close() {}
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import brave.Tracing;
import brave.propagation.CurrentTraceContext;
import brave.propagation.TraceContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class BraveContextStorageProvider implements ContextStorageProvider {
@Override
public ContextStorage get() {
return BraveContextStorage.INSTANCE;
}
@SuppressWarnings("ReferenceEquality")
private enum BraveContextStorage implements ContextStorage {
INSTANCE;
@Override
public Scope attach(Context toAttach) {
TraceContext braveContextToAttach = ((BraveContextWrapper) toAttach).braveContext;
CurrentTraceContext currentTraceContext = Tracing.current().currentTraceContext();
TraceContext currentBraveContext = currentTraceContext.get();
if (currentBraveContext == braveContextToAttach) {
return Scope.noop();
}
CurrentTraceContext.Scope braveScope = currentTraceContext.newScope(braveContextToAttach);
return braveScope::close;
}
@Override
public Context current() {
TraceContext current = Tracing.current().currentTraceContext().get();
if (current != null) {
return new BraveContextWrapper(current);
}
return new BraveContextWrapper(TraceContext.newBuilder().traceId(1).spanId(1).build());
}
}
private static class BraveContextValues {
private final Object[] values;
BraveContextValues(Object key, Object value) {
this.values = new Object[] {key, value};
}
BraveContextValues(Object[] values) {
this.values = values;
}
Object getValue(Object key) {
for (int i = 0; i < values.length; i += 2) {
if (values[i] == key) {
return values[i + 1];
}
}
return null;
}
BraveContextValues with(Object key, Object value) {
final Object[] copy;
for (int i = 0; i < values.length; i += 2) {
if (values[i] == key) {
copy = values.clone();
copy[i + 1] = value;
return new BraveContextValues(copy);
}
}
copy = Arrays.copyOf(values, values.length + 2);
copy[values.length - 2] = key;
copy[values.length - 1] = value;
return new BraveContextValues(copy);
}
}
private static class BraveContextWrapper implements Context {
private final TraceContext braveContext;
private BraveContextWrapper(TraceContext braveContext) {
this.braveContext = braveContext;
}
@Override
public <V> V getValue(ContextKey<V> key) {
BraveContextValues values = braveContext.findExtra(BraveContextValues.class);
if (values == null) {
return null;
}
@SuppressWarnings("unchecked")
V value = (V) values.getValue(key);
return value;
}
@Override
public <V> Context withValues(ContextKey<V> k1, V v1) {
List<Object> extras = braveContext.extra();
BraveContextValues values = null;
int existingValuesIndex = -1;
for (int i = 0; i < extras.size(); i++) {
Object extra = extras.get(i);
if (extra instanceof BraveContextValues) {
values = (BraveContextValues) extra;
existingValuesIndex = i;
break;
}
}
final List<Object> newExtras;
if (values == null) {
values = new BraveContextValues(k1, v1);
newExtras = new ArrayList<>(extras.size() + 1);
newExtras.addAll(extras);
newExtras.add(values);
} else {
newExtras = new ArrayList<>(extras);
newExtras.set(existingValuesIndex, values.with(k1, v1));
}
TraceContext.Builder builder = braveContext.toBuilder();
builder.clearExtra();
newExtras.forEach(builder::addExtra);
return new BraveContextWrapper(builder.build());
}
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import static org.assertj.core.api.Assertions.assertThat;
import brave.Tracing;
import brave.propagation.CurrentTraceContext;
import brave.propagation.TraceContext;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class OtelAsBraveTest {
private static final ContextKey<String> ANIMAL = ContextKey.named("animal");
private static final Tracing TRACING =
Tracing.newBuilder().currentTraceContext(CurrentTraceContext.Default.create()).build();
private static final TraceContext TRACE_CONTEXT =
TraceContext.newBuilder().traceId(1).spanId(1).addExtra("japan").build();
private static ExecutorService otherThread;
@BeforeAll
static void setUp() {
otherThread = Executors.newSingleThreadExecutor();
}
@AfterAll
static void tearDown() {
otherThread.shutdown();
}
@Test
void braveOtelMix() {
try (CurrentTraceContext.Scope ignored =
TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
TraceContext context2 =
Tracing.current().currentTraceContext().get().toBuilder().addExtra("cheese").build();
try (CurrentTraceContext.Scope ignored3 =
TRACING.currentTraceContext().newScope(context2)) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("cheese");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
}
}
}
}
@Test
void braveWrap() throws Exception {
try (CurrentTraceContext.Scope ignored =
TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) {
try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<Boolean> braveContainsJapan = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
TraceContext traceContext = Tracing.current().currentTraceContext().get();
if (traceContext != null && traceContext.extra().contains("japan")) {
braveContainsJapan.set(true);
} else {
braveContainsJapan.set(false);
}
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(braveContainsJapan).hasValue(false);
assertThat(otelValue).hasValue(null);
otherThread.submit(TRACING.currentTraceContext().wrap(runnable)).get();
assertThat(braveContainsJapan).hasValue(true);
assertThat(otelValue).hasValue("cat");
}
}
}
@Test
void otelWrap() throws Exception {
try (CurrentTraceContext.Scope ignored =
TRACING.currentTraceContext().newScope(TRACE_CONTEXT)) {
try (Scope ignored2 = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(Tracing.current().currentTraceContext().get().extra()).contains("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<Boolean> braveContainsJapan = new AtomicReference<>(false);
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
TraceContext traceContext = Tracing.current().currentTraceContext().get();
if (traceContext != null && traceContext.extra().contains("japan")) {
braveContainsJapan.set(true);
} else {
braveContainsJapan.set(false);
}
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(braveContainsJapan).hasValue(false);
assertThat(otelValue).hasValue(null);
otherThread.submit(Context.current().wrap(runnable)).get();
assertThat(braveContainsJapan).hasValue(true);
assertThat(otelValue).hasValue("cat");
}
}
}
}

View File

@ -0,0 +1 @@
io.opentelemetry.context.BraveContextStorageProvider

View File

@ -0,0 +1,123 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import java.util.logging.Level;
import java.util.logging.Logger;
public class GrpcContextStorageProvider implements ContextStorageProvider {
private static final Logger log = Logger.getLogger(GrpcContextStorageProvider.class.getName());
@Override
public ContextStorage get() {
return GrpcContextStorage.INSTANCE;
}
private enum GrpcContextStorage implements ContextStorage {
INSTANCE;
@Override
public Scope attach(Context toAttach) {
if (!(toAttach instanceof GrpcContextWrapper)) {
log.log(
Level.SEVERE,
"Context not created by GrpcContextStorageProvider. This is not "
+ "allowed when using GrpcContextStorageProvider. Did you create this context "
+ "using Context.current()?");
return Scope.noop();
}
io.grpc.Context grpcContextToAttach = ((GrpcContextWrapper) toAttach).grpcContext;
io.grpc.Context currentGrpcContext = io.grpc.Context.current();
if (grpcContextToAttach == currentGrpcContext) {
return Scope.noop();
}
io.grpc.Context toRestore = grpcContextToAttach.attach();
return () -> grpcContextToAttach.detach(toRestore);
}
@Override
public Context current() {
return new GrpcContextWrapper(io.grpc.Context.current());
}
@Override
public <T> ContextKey<T> contextKey(String name) {
return new GrpcContextKeyWrapper<>(io.grpc.Context.key(name));
}
}
private static class GrpcContextWrapper implements Context {
private final io.grpc.Context grpcContext;
private GrpcContextWrapper(io.grpc.Context grpcContext) {
this.grpcContext = grpcContext;
}
@Override
public <V> V getValue(ContextKey<V> key) {
return grpcKey(key).get(grpcContext);
}
@Override
public <V> Context withValues(ContextKey<V> k1, V v1) {
return new GrpcContextWrapper(grpcContext.withValue(grpcKey(k1), v1));
}
@Override
public <V1, V2> Context withValues(ContextKey<V1> k1, V1 v1, ContextKey<V2> k2, V2 v2) {
return new GrpcContextWrapper(grpcContext.withValues(grpcKey(k1), v1, grpcKey(k2), v2));
}
@Override
public <V1, V2, V3> Context withValues(
ContextKey<V1> k1, V1 v1, ContextKey<V2> k2, V2 v2, ContextKey<V3> k3, V3 v3) {
return new GrpcContextWrapper(
grpcContext.withValues(grpcKey(k1), v1, grpcKey(k2), v2, grpcKey(k3), v3));
}
@Override
public <V1, V2, V3, V4> Context withValues(
ContextKey<V1> k1,
V1 v1,
ContextKey<V2> k2,
V2 v2,
ContextKey<V3> k3,
V3 v3,
ContextKey<V4> k4,
V4 v4) {
return new GrpcContextWrapper(
grpcContext.withValues(
grpcKey(k1), v1, grpcKey(k2), v2, grpcKey(k3), v3, grpcKey(k4), v4));
}
}
static class GrpcContextKeyWrapper<T> implements ContextKey<T> {
private final io.grpc.Context.Key<T> grpcContextKey;
private GrpcContextKeyWrapper(io.grpc.Context.Key<T> grpcContextKey) {
this.grpcContextKey = grpcContextKey;
}
}
static <T> io.grpc.Context.Key<T> grpcKey(ContextKey<T> key) {
if (key instanceof GrpcContextKeyWrapper) {
return ((GrpcContextKeyWrapper<T>) key).grpcContextKey;
}
log.log(
Level.SEVERE,
"ContextKey not created by GrpcContextStorageProvider, "
+ "this is not allowed when using GrpcContextStorageProvider. Did you create this "
+ "key using ContextKey.named()?");
// This ephemereal key is invalid but the best we can fallback to.
return io.grpc.Context.key("invalid-context-key-" + key);
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class OtelAsGrpcTest {
private static final ContextKey<String> ANIMAL = ContextKey.named("animal");
private static final io.grpc.Context.Key<String> FOOD = io.grpc.Context.key("food");
private static final io.grpc.Context.Key<String> COUNTRY = io.grpc.Context.key("country");
private static ExecutorService otherThread;
@BeforeAll
static void setUp() {
otherThread = Executors.newSingleThreadExecutor();
}
@AfterAll
static void tearDown() {
otherThread.shutdown();
}
@Test
void grpcOtelMix() {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
assertThat(COUNTRY.get()).isNull();
io.grpc.Context root = grpcContext.attach();
try {
assertThat(COUNTRY.get()).isEqualTo("japan");
try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
assertThat(COUNTRY.get()).isEqualTo("japan");
io.grpc.Context context2 = io.grpc.Context.current().withValue(FOOD, "cheese");
assertThat(FOOD.get()).isNull();
io.grpc.Context toRestore = context2.attach();
try {
assertThat(FOOD.get()).isEqualTo("cheese");
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
} finally {
context2.detach(toRestore);
}
}
} finally {
grpcContext.detach(root);
}
}
@Test
void grpcWrap() throws Exception {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
io.grpc.Context root = grpcContext.attach();
try {
try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<String> grpcValue = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
grpcValue.set(COUNTRY.get());
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(grpcValue).hasValue(null);
assertThat(otelValue).hasValue(null);
otherThread.submit(io.grpc.Context.current().wrap(runnable)).get();
assertThat(grpcValue).hasValue("japan");
assertThat(otelValue).hasValue("cat");
}
} finally {
grpcContext.detach(root);
}
}
@Test
void otelWrap() throws Exception {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
io.grpc.Context root = grpcContext.attach();
try {
try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<String> grpcValue = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
grpcValue.set(COUNTRY.get());
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(grpcValue).hasValue(null);
assertThat(otelValue).hasValue(null);
otherThread.submit(Context.current().wrap(runnable)).get();
assertThat(grpcValue).hasValue("japan");
assertThat(otelValue).hasValue("cat");
}
} finally {
grpcContext.detach(root);
}
}
}

View File

@ -0,0 +1 @@
io.opentelemetry.context.GrpcContextStorageProvider

View File

@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
public class GrpcContextStorageProvider implements ContextStorageProvider {
@Override
public ContextStorage get() {
return GrpcContextStorage.INSTANCE;
}
private enum GrpcContextStorage implements ContextStorage {
INSTANCE;
private static final io.grpc.Context.Key<Context> OTEL_CONTEXT =
io.grpc.Context.key("otel-context");
@Override
public Scope attach(Context toAttach) {
io.grpc.Context grpcContext = io.grpc.Context.current();
Context current = OTEL_CONTEXT.get(grpcContext);
if (current == toAttach) {
return Scope.noop();
}
io.grpc.Context newGrpcContext = grpcContext.withValue(OTEL_CONTEXT, toAttach);
io.grpc.Context toRestore = newGrpcContext.attach();
return () -> newGrpcContext.detach(toRestore);
}
@Override
public Context current() {
return OTEL_CONTEXT.get();
}
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class OtelInGrpcTest {
private static final ContextKey<String> ANIMAL = ContextKey.named("animal");
private static final io.grpc.Context.Key<String> FOOD = io.grpc.Context.key("food");
private static final io.grpc.Context.Key<String> COUNTRY = io.grpc.Context.key("country");
private static ExecutorService otherThread;
@BeforeAll
static void setUp() {
otherThread = Executors.newSingleThreadExecutor();
}
@AfterAll
static void tearDown() {
otherThread.shutdown();
}
@Test
void grpcOtelMix() {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
assertThat(COUNTRY.get()).isNull();
io.grpc.Context root = grpcContext.attach();
try {
assertThat(COUNTRY.get()).isEqualTo("japan");
try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
assertThat(COUNTRY.get()).isEqualTo("japan");
io.grpc.Context context2 = io.grpc.Context.current().withValue(FOOD, "cheese");
assertThat(FOOD.get()).isNull();
io.grpc.Context toRestore = context2.attach();
try {
assertThat(FOOD.get()).isEqualTo("cheese");
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
} finally {
context2.detach(toRestore);
}
}
} finally {
grpcContext.detach(root);
}
}
@Test
void grpcWrap() throws Exception {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
io.grpc.Context root = grpcContext.attach();
try {
try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<String> grpcValue = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
grpcValue.set(COUNTRY.get());
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(grpcValue).hasValue(null);
assertThat(otelValue).hasValue(null);
otherThread.submit(io.grpc.Context.current().wrap(runnable)).get();
assertThat(grpcValue).hasValue("japan");
assertThat(otelValue).hasValue("cat");
}
} finally {
grpcContext.detach(root);
}
}
@Test
void otelWrap() throws Exception {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
io.grpc.Context root = grpcContext.attach();
try {
try (Scope ignored = Context.current().withValues(ANIMAL, "cat").makeCurrent()) {
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<String> grpcValue = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
grpcValue.set(COUNTRY.get());
otelValue.set(Context.current().getValue(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(grpcValue).hasValue(null);
assertThat(otelValue).hasValue(null);
otherThread.submit(Context.current().wrap(runnable)).get();
// Since OTel context is inside the gRPC context, propagating OTel context does not
// propagate the gRPC context.
assertThat(grpcValue).hasValue(null);
assertThat(otelValue).hasValue("cat");
}
} finally {
grpcContext.detach(root);
}
}
}

View File

@ -0,0 +1 @@
io.opentelemetry.context.GrpcContextStorageProvider

View File

@ -0,0 +1,433 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
@SuppressWarnings("ClassCanBeStatic")
class ContextTest {
private static final ContextKey<String> ANIMAL = ContextKey.named("animal");
private static final ContextKey<Object> BAG = ContextKey.named("bag");
private static final ContextKey<String> FOOD = ContextKey.named("food");
private static final ContextKey<Integer> COOKIES = ContextKey.named("cookies");
private static final Context CAT = Context.current().withValues(ANIMAL, "cat");
// Make sure all tests clean up
@AfterEach
void tearDown() {
assertThat(Context.current()).isEqualTo(Context.root());
}
@Test
void startsWithRoot() {
assertThat(Context.current()).isEqualTo(Context.root());
}
@Test
void canBeAttached() {
Context context = Context.current().withValues(ANIMAL, "cat");
assertThat(Context.current().getValue(ANIMAL)).isNull();
try (Scope ignored = context.makeCurrent()) {
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
try (Scope ignored2 = Context.root().makeCurrent()) {
assertThat(Context.current().getValue(ANIMAL)).isNull();
}
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
}
assertThat(Context.current().getValue(ANIMAL)).isNull();
}
@Test
void attachSameTwice() {
Context context = Context.current().withValues(ANIMAL, "cat");
assertThat(Context.current().getValue(ANIMAL)).isNull();
try (Scope ignored = context.makeCurrent()) {
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
try (Scope ignored2 = context.makeCurrent()) {
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
}
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
}
assertThat(Context.current().getValue(ANIMAL)).isNull();
}
@Test
void newThreadStartsWithRoot() throws Exception {
Context context = Context.current().withValues(ANIMAL, "cat");
try (Scope ignored = context.makeCurrent()) {
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("cat");
AtomicReference<Context> current = new AtomicReference<>();
Thread thread = new Thread(() -> current.set(Context.current()));
thread.start();
thread.join();
assertThat(current.get()).isEqualTo(Context.root());
}
}
@Test
public void closingScopeWhenNotActiveIsLogged() {
final AtomicReference<LogRecord> logRef = new AtomicReference<>();
Handler handler =
new Handler() {
@Override
public void publish(LogRecord record) {
logRef.set(record);
}
@Override
public void flush() {}
@Override
public void close() {}
};
Logger logger = Logger.getLogger(ThreadLocalContextStorage.class.getName());
Level level = logger.getLevel();
logger.setLevel(Level.ALL);
try {
logger.addHandler(handler);
Context initial = Context.current();
Context context = initial.withValues(ANIMAL, "cat");
try (Scope scope = context.makeCurrent()) {
Context context2 = context.withValues(ANIMAL, "dog");
try (Scope ignored = context2.makeCurrent()) {
assertThat(Context.current().getValue(ANIMAL)).isEqualTo("dog");
scope.close();
}
}
assertThat(Context.current()).isEqualTo(initial);
assertThat(logRef.get()).isNotNull();
assertThat(logRef.get().getMessage()).contains("Context in storage not the expected context");
} finally {
logger.removeHandler(handler);
logger.setLevel(level);
}
}
@Test
void withValues() {
Context context1 = Context.current().withValues(ANIMAL, "cat");
assertThat(context1.getValue(ANIMAL)).isEqualTo("cat");
Context context2 = context1.withValues(BAG, 100);
// Old unaffected
assertThat(context1.getValue(ANIMAL)).isEqualTo("cat");
assertThat(context1.getValue(BAG)).isNull();
assertThat(context2.getValue(ANIMAL)).isEqualTo("cat");
assertThat(context2.getValue(BAG)).isEqualTo(100);
Context context3 = context2.withValues(ANIMAL, "dog");
// Old unaffected
assertThat(context2.getValue(ANIMAL)).isEqualTo("cat");
assertThat(context2.getValue(BAG)).isEqualTo(100);
assertThat(context3.getValue(ANIMAL)).isEqualTo("dog");
assertThat(context3.getValue(BAG)).isEqualTo(100);
Context context4 = context3.withValues(BAG, null);
// Old unaffected
assertThat(context3.getValue(ANIMAL)).isEqualTo("dog");
assertThat(context3.getValue(BAG)).isEqualTo(100);
assertThat(context4.getValue(ANIMAL)).isEqualTo("dog");
assertThat(context4.getValue(BAG)).isNull();
}
@Test
void withTwoValues() {
Context context = Context.current().withValues(ANIMAL, "cat", FOOD, "hot dog");
assertThat(context.getValue(ANIMAL)).isEqualTo("cat");
assertThat(context.getValue(FOOD)).isEqualTo("hot dog");
}
@Test
void withThreeValues() {
Context context = Context.current().withValues(ANIMAL, "cat", FOOD, "hot dog", COOKIES, 100);
assertThat(context.getValue(ANIMAL)).isEqualTo("cat");
assertThat(context.getValue(FOOD)).isEqualTo("hot dog");
assertThat(context.getValue(COOKIES)).isEqualTo(100);
}
@Test
void withFourValues() {
Context context =
Context.current().withValues(ANIMAL, "cat", FOOD, "hot dog", COOKIES, 100, BAG, "prada");
assertThat(context.getValue(ANIMAL)).isEqualTo("cat");
assertThat(context.getValue(FOOD)).isEqualTo("hot dog");
assertThat(context.getValue(COOKIES)).isEqualTo(100);
assertThat(context.getValue(BAG)).isEqualTo("prada");
}
@Test
void wrapRunnable() {
AtomicReference<String> value = new AtomicReference<>();
Runnable callback = () -> value.set(Context.current().getValue(ANIMAL));
callback.run();
assertThat(value).hasValue(null);
CAT.wrap(callback).run();
assertThat(value).hasValue("cat");
callback.run();
assertThat(value).hasValue(null);
}
@Test
void wrapCallable() throws Exception {
AtomicReference<String> value = new AtomicReference<>();
Callable<String> callback =
() -> {
value.set(Context.current().getValue(ANIMAL));
return "foo";
};
assertThat(callback.call()).isEqualTo("foo");
assertThat(value).hasValue(null);
assertThat(CAT.wrap(callback).call()).isEqualTo("foo");
assertThat(value).hasValue("cat");
assertThat(callback.call()).isEqualTo("foo");
assertThat(value).hasValue(null);
}
@Test
void wrapExecutor() {
AtomicReference<String> value = new AtomicReference<>();
Executor executor = MoreExecutors.directExecutor();
Runnable callback = () -> value.set(Context.current().getValue(ANIMAL));
executor.execute(callback);
assertThat(value).hasValue(null);
CAT.wrap(executor).execute(callback);
assertThat(value).hasValue("cat");
executor.execute(callback);
assertThat(value).hasValue(null);
}
@Nested
@TestInstance(Lifecycle.PER_CLASS)
class WrapExecutorService {
protected ScheduledExecutorService executor;
protected ExecutorService wrapped;
protected AtomicReference<String> value;
@BeforeAll
void initExecutor() {
executor = Executors.newSingleThreadScheduledExecutor();
wrapped = CAT.wrap((ExecutorService) executor);
}
@AfterAll
void stopExecutor() {
executor.shutdown();
}
@BeforeEach
void setUp() {
value = new AtomicReference<>();
}
@Test
void execute() {
Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL));
wrapped.execute(runnable);
await().untilAsserted(() -> assertThat(value).hasValue("cat"));
}
@Test
void submitRunnable() {
Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL));
Futures.getUnchecked(wrapped.submit(runnable));
assertThat(value).hasValue("cat");
}
@Test
void submitRunnableResult() {
Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL));
assertThat(Futures.getUnchecked(wrapped.submit(runnable, "foo"))).isEqualTo("foo");
assertThat(value).hasValue("cat");
}
@Test
void submitCallable() {
Callable<String> callable =
() -> {
value.set(Context.current().getValue(ANIMAL));
return "foo";
};
assertThat(Futures.getUnchecked(wrapped.submit(callable))).isEqualTo("foo");
assertThat(value).hasValue("cat");
}
@Test
void invokeAll() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().getValue(ANIMAL));
return "foo";
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().getValue(ANIMAL));
return "bar";
};
List<Future<String>> futures = wrapped.invokeAll(Arrays.asList(callable1, callable2));
assertThat(futures.get(0).get()).isEqualTo("foo");
assertThat(futures.get(1).get()).isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}
@Test
void invokeAllTimeout() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().getValue(ANIMAL));
return "foo";
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().getValue(ANIMAL));
return "bar";
};
List<Future<String>> futures =
wrapped.invokeAll(Arrays.asList(callable1, callable2), 1, TimeUnit.SECONDS);
assertThat(futures.get(0).get()).isEqualTo("foo");
assertThat(futures.get(1).get()).isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}
@Test
void invokeAny() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().getValue(ANIMAL));
throw new IllegalStateException("callable2 wins");
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().getValue(ANIMAL));
return "bar";
};
assertThat(wrapped.invokeAny(Arrays.asList(callable1, callable2))).isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}
@Test
void invokeAnyTimeout() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().getValue(ANIMAL));
throw new IllegalStateException("callable2 wins");
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().getValue(ANIMAL));
return "bar";
};
assertThat(wrapped.invokeAny(Arrays.asList(callable1, callable2), 1, TimeUnit.SECONDS))
.isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}
}
@Nested
@TestInstance(Lifecycle.PER_CLASS)
class WrapScheduledExecutorService extends WrapExecutorService {
private ScheduledExecutorService wrapScheduled;
@BeforeEach
void wrapScheduled() {
wrapScheduled = CAT.wrap(executor);
}
@Test
void scheduleRunnable() throws Exception {
Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL));
wrapScheduled.schedule(runnable, 0, TimeUnit.SECONDS).get();
assertThat(value).hasValue("cat");
}
@Test
void scheduleCallable() throws Exception {
Callable<String> callable =
() -> {
value.set(Context.current().getValue(ANIMAL));
return "foo";
};
assertThat(wrapScheduled.schedule(callable, 0, TimeUnit.SECONDS).get()).isEqualTo("foo");
assertThat(value).hasValue("cat");
}
@Test
void scheduleAtFixedRate() {
Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL));
ScheduledFuture<?> future =
wrapScheduled.scheduleAtFixedRate(runnable, 0, 10, TimeUnit.SECONDS);
await().untilAsserted(() -> assertThat(value).hasValue("cat"));
future.cancel(true);
}
@Test
void scheduleWithFixedDelay() {
Runnable runnable = () -> value.set(Context.current().getValue(ANIMAL));
ScheduledFuture<?> future =
wrapScheduled.scheduleWithFixedDelay(runnable, 0, 10, TimeUnit.SECONDS);
await().untilAsserted(() -> assertThat(value).hasValue("cat"));
future.cancel(true);
}
}
}

View File

@ -8,6 +8,7 @@ pluginManagement {
id "io.morethan.jmhreport" version "0.9.0"
id "me.champeau.gradle.jmh" version "0.5.0"
id "net.ltgt.errorprone" version "1.2.0"
id "org.unbroken-dome.test-sets" version "3.0.1"
id "ru.vyarus.animalsniffer" version "1.5.1"
}
@ -20,6 +21,7 @@ pluginManagement {
rootProject.name = "opentelemetry-java"
include ":opentelemetry-all",
":opentelemetry-api",
":opentelemetry-context",
":opentelemetry-context-prop",
":opentelemetry-extension-auto-annotations",
":opentelemetry-extension-runtime-metrics",