Add helper class to capture context using ScheduledExecutorService (#6712)

Signed-off-by: Adriano Machado <60320+ammachado@users.noreply.github.com>
This commit is contained in:
Adriano Machado 2024-10-07 11:08:21 -04:00 committed by GitHub
parent 0f859b4385
commit eb53fe3a61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 383 additions and 49 deletions

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
# Gradle # Gradle
build build
.gradle .gradle
.kotlin
local.properties local.properties
out/ out/

View File

@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -135,9 +136,37 @@ public interface Context {
* @since 1.1.0 * @since 1.1.0
*/ */
static ExecutorService taskWrapping(ExecutorService executorService) { static ExecutorService taskWrapping(ExecutorService executorService) {
if (executorService instanceof CurrentContextExecutorService) {
return executorService;
}
return new CurrentContextExecutorService(executorService); return new CurrentContextExecutorService(executorService);
} }
/**
* Returns an {@link ScheduledExecutorService} which delegates to the provided {@code
* executorService}, wrapping all invocations of {@link ExecutorService} methods such as {@link
* ExecutorService#execute(Runnable)} or {@link ExecutorService#submit(Runnable)} with the
* {@linkplain Context#current() current context} at the time of invocation.
*
* <p>This is generally used to create an {@link ScheduledExecutorService} which will forward the
* {@link Context} during an invocation to another thread. For example, you may use something like
* {@code ScheduledExecutorService dbExecutor = Context.wrapTasks(threadPool)} to ensure calls
* like {@code dbExecutor.execute(() -> database.query())} have {@link Context} available on the
* thread executing database queries.
*
* <p>Note: The context will not be propagated for {@link
* ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} and {@link
* ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} calls.
*
* @since 1.43.0
*/
static ScheduledExecutorService taskWrapping(ScheduledExecutorService executorService) {
if (executorService instanceof CurrentContextScheduledExecutorService) {
return executorService;
}
return new CurrentContextScheduledExecutorService(executorService);
}
/** /**
* Returns the value stored in this {@link Context} for the given {@link ContextKey}, or {@code * Returns the value stored in this {@link Context} for the given {@link ContextKey}, or {@code
* null} if there is no value for the key in this context. * null} if there is no value for the key in this context.

View File

@ -14,7 +14,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
final class CurrentContextExecutorService extends ForwardingExecutorService { class CurrentContextExecutorService extends ForwardingExecutorService {
CurrentContextExecutorService(ExecutorService delegate) { CurrentContextExecutorService(ExecutorService delegate) {
super(delegate); super(delegate);

View File

@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.context;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
final class CurrentContextScheduledExecutorService extends CurrentContextExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService delegate;
CurrentContextScheduledExecutorService(ScheduledExecutorService delegate) {
super(delegate);
this.delegate = delegate;
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate.schedule(Context.current().wrap(command), delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate.schedule(Context.current().wrap(callable), delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}

View File

@ -9,6 +9,9 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -28,6 +31,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -118,7 +122,7 @@ class ContextTest {
} }
@Test @Test
public void closingScopeWhenNotActiveIsNoopAndLogged() { void closingScopeWhenNotActiveIsNoopAndLogged() {
Context initial = Context.current(); Context initial = Context.current();
Context context = initial.with(ANIMAL, "cat"); Context context = initial.with(ANIMAL, "cat");
try (Scope scope = context.makeCurrent()) { try (Scope scope = context.makeCurrent()) {
@ -137,7 +141,7 @@ class ContextTest {
@SuppressWarnings("MustBeClosedChecker") @SuppressWarnings("MustBeClosedChecker")
@Test @Test
public void closeScopeIsIdempotent() { void closeScopeIsIdempotent() {
Context initial = Context.current(); Context initial = Context.current();
Context context1 = Context.root().with(ANIMAL, "cat"); Context context1 = Context.root().with(ANIMAL, "cat");
Scope scope1 = context1.makeCurrent(); Scope scope1 = context1.makeCurrent();
@ -188,11 +192,10 @@ class ContextTest {
assertThat(context5).isSameAs(context4); assertThat(context5).isSameAs(context4);
String dog = new String("dog"); String dog = new String("dog");
assertThat(dog).isEqualTo("dog"); assertThat(dog).isEqualTo("dog").isNotSameAs("dog");
assertThat(dog).isNotSameAs("dog");
Context context6 = context5.with(ANIMAL, dog); Context context6 = context5.with(ANIMAL, dog);
assertThat(context6.get(ANIMAL)).isEqualTo("dog"); assertThat(context6.get(ANIMAL)).isEqualTo("dog");
// We reuse context object when values match by reference, not value. // We reuse the context object when values match by reference, not value.
assertThat(context6).isNotSameAs(context5); assertThat(context6).isNotSameAs(context5);
} }
@ -234,7 +237,7 @@ class ContextTest {
void wrapFunction() { void wrapFunction() {
AtomicReference<String> value = new AtomicReference<>(); AtomicReference<String> value = new AtomicReference<>();
Function<String, String> callback = Function<String, String> callback =
(a) -> { a -> {
value.set(Context.current().get(ANIMAL)); value.set(Context.current().get(ANIMAL));
return "foo"; return "foo";
}; };
@ -273,7 +276,7 @@ class ContextTest {
AtomicReference<String> value = new AtomicReference<>(); AtomicReference<String> value = new AtomicReference<>();
AtomicBoolean consumed = new AtomicBoolean(); AtomicBoolean consumed = new AtomicBoolean();
Consumer<String> callback = Consumer<String> callback =
(a) -> { a -> {
value.set(Context.current().get(ANIMAL)); value.set(Context.current().get(ANIMAL));
consumed.set(true); consumed.set(true);
}; };
@ -362,7 +365,7 @@ class ContextTest {
@TestInstance(Lifecycle.PER_CLASS) @TestInstance(Lifecycle.PER_CLASS)
class WrapExecutorService { class WrapExecutorService {
protected ScheduledExecutorService executor; protected ExecutorService executor;
protected ExecutorService wrapped; protected ExecutorService wrapped;
protected AtomicReference<String> value; protected AtomicReference<String> value;
@ -501,6 +504,204 @@ class ContextTest {
} }
} }
@Nested
@TestInstance(Lifecycle.PER_CLASS)
class WrapScheduledExecutorService {
protected ScheduledExecutorService executor;
protected ScheduledExecutorService wrapped;
protected AtomicReference<String> value;
protected ScheduledExecutorService wrap(ScheduledExecutorService executorService) {
return CAT.wrap(executorService);
}
@BeforeAll
void initExecutor() {
executor = Executors.newSingleThreadScheduledExecutor();
wrapped = wrap(executor);
}
@AfterAll
void stopExecutor() {
executor.shutdown();
}
@BeforeEach
void setUp() {
value = new AtomicReference<>();
}
@Test
void execute() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
wrapped.execute(runnable);
await().untilAsserted(() -> assertThat(value).hasValue("cat"));
}
@Test
void submitRunnable() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
Futures.getUnchecked(wrapped.submit(runnable));
assertThat(value).hasValue("cat");
}
@Test
void submitRunnableResult() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
assertThat(Futures.getUnchecked(wrapped.submit(runnable, "foo"))).isEqualTo("foo");
assertThat(value).hasValue("cat");
}
@Test
void submitCallable() {
Callable<String> callable =
() -> {
value.set(Context.current().get(ANIMAL));
return "foo";
};
assertThat(Futures.getUnchecked(wrapped.submit(callable))).isEqualTo("foo");
assertThat(value).hasValue("cat");
}
@Test
void invokeAll() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().get(ANIMAL));
return "foo";
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().get(ANIMAL));
return "bar";
};
List<Future<String>> futures = wrapped.invokeAll(Arrays.asList(callable1, callable2));
assertThat(futures.get(0).get()).isEqualTo("foo");
assertThat(futures.get(1).get()).isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}
@Test
void invokeAllTimeout() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().get(ANIMAL));
return "foo";
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().get(ANIMAL));
return "bar";
};
List<Future<String>> futures =
wrapped.invokeAll(Arrays.asList(callable1, callable2), 10, TimeUnit.SECONDS);
assertThat(futures.get(0).get()).isEqualTo("foo");
assertThat(futures.get(1).get()).isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}
@Test
void invokeAny() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().get(ANIMAL));
throw new IllegalStateException("callable2 wins");
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().get(ANIMAL));
return "bar";
};
assertThat(wrapped.invokeAny(Arrays.asList(callable1, callable2))).isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}
@Test
void invokeAnyTimeout() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().get(ANIMAL));
throw new IllegalStateException("callable2 wins");
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().get(ANIMAL));
return "bar";
};
assertThat(wrapped.invokeAny(Arrays.asList(callable1, callable2), 10, TimeUnit.SECONDS))
.isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}
@Test
void scheduleRunnable() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
assertThat(Futures.getUnchecked(wrapped.schedule(runnable, 1L, TimeUnit.MILLISECONDS)))
.isNull();
assertThat(value).hasValue("cat");
}
@Test
void scheduleCallable() {
Callable<String> callable =
() -> {
value.set(Context.current().get(ANIMAL));
return "foo";
};
assertThat(Futures.getUnchecked(wrapped.schedule(callable, 1L, TimeUnit.MILLISECONDS)))
.isEqualTo("foo");
assertThat(value).hasValue("cat");
}
@Test
void scheduleAtFixedRate() {
LongAdder longAdder = new LongAdder();
Runnable runnable = longAdder::increment;
Future<?> future = wrapped.scheduleAtFixedRate(runnable, 1L, 2L, TimeUnit.NANOSECONDS);
assertThat(future).isNotNull();
await()
.await()
.untilAsserted(
() -> {
if (!future.isCancelled()) {
future.cancel(true);
}
assertThat(longAdder.intValue()).isGreaterThan(1);
});
assertThat(longAdder.intValue()).isGreaterThan(1);
}
@Test
void scheduleWithFixedDelay() {
LongAdder longAdder = new LongAdder();
Runnable runnable = longAdder::increment;
Future<?> future = wrapped.scheduleWithFixedDelay(runnable, 1L, 2L, TimeUnit.NANOSECONDS);
assertThat(future).isNotNull();
await()
.await()
.untilAsserted(
() -> {
if (!future.isCancelled()) {
future.cancel(true);
}
assertThat(longAdder.intValue()).isGreaterThan(1);
});
}
}
@Nested @Nested
@TestInstance(Lifecycle.PER_CLASS) @TestInstance(Lifecycle.PER_CLASS)
class CurrentContextWrappingExecutorService extends WrapExecutorService { class CurrentContextWrappingExecutorService extends WrapExecutorService {
@ -525,9 +726,34 @@ class ContextTest {
} }
} }
@Nested
@TestInstance(Lifecycle.PER_CLASS)
class CurrentContextWrappingScheduledExecutorService extends WrapScheduledExecutorService {
@Override
protected ScheduledExecutorService wrap(ScheduledExecutorService executorService) {
return Context.taskWrapping(executorService);
}
private Scope scope;
@BeforeEach
// Closed in AfterEach
@SuppressWarnings("MustBeClosedChecker")
void makeCurrent() {
scope = CAT.makeCurrent();
}
@AfterEach
void close() {
scope.close();
scope = null;
}
}
@Test @Test
void keyToString() { void keyToString() {
assertThat(ANIMAL.toString()).isEqualTo("animal"); assertThat(ANIMAL).hasToString("animal");
} }
@Test @Test
@ -552,6 +778,7 @@ class ContextTest {
@Test @Test
void delegatesCleanupMethods() throws Exception { void delegatesCleanupMethods() throws Exception {
ExecutorService wrapped = CAT.wrap(executor); ExecutorService wrapped = CAT.wrap(executor);
doNothing().when(executor).shutdown();
wrapped.shutdown(); wrapped.shutdown();
verify(executor).shutdown(); verify(executor).shutdown();
verifyNoMoreInteractions(executor); verifyNoMoreInteractions(executor);
@ -573,57 +800,74 @@ class ContextTest {
} }
} }
// We test real context-related above but should test cleanup gets delegated, which is best with
// a mock.
@Nested @Nested
@TestInstance(Lifecycle.PER_CLASS) @TestInstance(Lifecycle.PER_CLASS)
class WrapScheduledExecutorService extends WrapExecutorService { @SuppressWarnings("MockitoDoSetup")
class DelegatesToScheduledExecutorService {
private ScheduledExecutorService wrapScheduled; @Mock private ScheduledExecutorService executor;
@Mock private ScheduledFuture<?> scheduledFuture;
@BeforeEach
void wrapScheduled() {
wrapScheduled = CAT.wrap(executor);
}
@Test @Test
void scheduleRunnable() throws Exception { void delegatesCleanupMethods() throws Exception {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL)); ScheduledExecutorService wrapped = CAT.wrap(executor);
wrapScheduled.schedule(runnable, 0, TimeUnit.SECONDS).get();
assertThat(value).hasValue("cat");
}
@Test wrapped.shutdown();
void scheduleCallable() throws Exception { verify(executor).shutdown();
Callable<String> callable = verifyNoMoreInteractions(executor);
() -> {
value.set(Context.current().get(ANIMAL));
return "foo";
};
assertThat(wrapScheduled.schedule(callable, 0, TimeUnit.SECONDS).get()).isEqualTo("foo");
assertThat(value).hasValue("cat");
}
@Test wrapped.shutdownNow();
void scheduleAtFixedRate() { verify(executor).shutdownNow();
Runnable runnable = () -> value.set(Context.current().get(ANIMAL)); verifyNoMoreInteractions(executor);
ScheduledFuture<?> future =
wrapScheduled.scheduleAtFixedRate(runnable, 0, 10, TimeUnit.SECONDS);
await().untilAsserted(() -> assertThat(value).hasValue("cat"));
future.cancel(true);
}
@Test when(executor.isShutdown()).thenReturn(true);
void scheduleWithFixedDelay() { assertThat(wrapped.isShutdown()).isTrue();
Runnable runnable = () -> value.set(Context.current().get(ANIMAL)); verify(executor).isShutdown();
ScheduledFuture<?> future = verifyNoMoreInteractions(executor);
wrapScheduled.scheduleWithFixedDelay(runnable, 0, 10, TimeUnit.SECONDS);
await().untilAsserted(() -> assertThat(value).hasValue("cat")); when(wrapped.isTerminated()).thenReturn(true);
future.cancel(true); assertThat(wrapped.isTerminated()).isTrue();
verify(executor).isTerminated();
verifyNoMoreInteractions(executor);
when(executor.awaitTermination(anyLong(), any())).thenReturn(true);
assertThat(wrapped.awaitTermination(1L, TimeUnit.SECONDS)).isTrue();
verify(executor).awaitTermination(1L, TimeUnit.SECONDS);
verifyNoMoreInteractions(executor);
doReturn(scheduledFuture)
.when(executor)
.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
assertThat((Future<?>) wrapped.schedule(() -> {}, 1L, TimeUnit.SECONDS))
.isSameAs(scheduledFuture);
verify(executor).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
verifyNoMoreInteractions(executor);
doReturn(scheduledFuture)
.when(executor)
.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
assertThat((Future<?>) wrapped.scheduleAtFixedRate(() -> {}, 1L, 1L, TimeUnit.SECONDS))
.isSameAs(scheduledFuture);
verify(executor)
.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
verifyNoMoreInteractions(executor);
doReturn(scheduledFuture)
.when(executor)
.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
assertThat((Future<?>) wrapped.scheduleWithFixedDelay(() -> {}, 1L, 1L, TimeUnit.SECONDS))
.isSameAs(scheduledFuture);
verify(executor)
.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
verifyNoMoreInteractions(executor);
} }
} }
@Test @Test
void emptyContext() { void emptyContext() {
assertThat(Context.root().get(new HashCollidingKey())).isEqualTo(null); assertThat(Context.root().get(new HashCollidingKey())).isNull();
} }
@Test @Test
@ -646,6 +890,20 @@ class ContextTest {
assertThat(twoKeys.get(cheese)).isEqualTo("whiz"); assertThat(twoKeys.get(cheese)).isEqualTo("whiz");
} }
@Test
void doNotWrapExecutorService() {
ExecutorService executor = mock(CurrentContextExecutorService.class);
ExecutorService wrapped = Context.taskWrapping(executor);
assertThat(wrapped).isSameAs(executor);
}
@Test
void doNotWrapScheduledExecutorService() {
ScheduledExecutorService executor = mock(CurrentContextScheduledExecutorService.class);
ScheduledExecutorService wrapped = Context.taskWrapping(executor);
assertThat(wrapped).isSameAs(executor);
}
@SuppressWarnings("HashCodeToString") @SuppressWarnings("HashCodeToString")
private static class HashCollidingKey implements ContextKey<String> { private static class HashCollidingKey implements ContextKey<String> {
@Override @Override

View File

@ -1,2 +1,4 @@
Comparing source compatibility of opentelemetry-context-1.43.0-SNAPSHOT.jar against opentelemetry-context-1.42.1.jar Comparing source compatibility of opentelemetry-context-1.43.0-SNAPSHOT.jar against opentelemetry-context-1.42.1.jar
No changes. *** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.context.Context (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) STATIC(+) java.util.concurrent.ScheduledExecutorService taskWrapping(java.util.concurrent.ScheduledExecutorService)