Rework context propagation to redisson async callback (#5748)

* Rework context propagation to redisson async callback

* add comments
This commit is contained in:
Lauri Tulmin 2022-04-05 19:59:22 +03:00 committed by GitHub
parent 4815f1e7d1
commit 27e8201618
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 175 additions and 18 deletions

View File

@ -13,29 +13,58 @@ public final class CompletableFutureWrapper<T> extends CompletableFuture<T>
implements PromiseWrapper<T> {
private volatile EndOperationListener<T> endOperationListener;
private CompletableFutureWrapper(CompletableFuture<T> delegate, Context context) {
private CompletableFutureWrapper(CompletableFuture<T> delegate) {
this.whenComplete(
(result, error) -> {
EndOperationListener<T> endOperationListener = this.endOperationListener;
if (endOperationListener != null) {
endOperationListener.accept(result, error);
}
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
delegate.completeExceptionally(error);
} else {
delegate.complete(result);
}
}
});
}
/**
* Wrap {@link CompletableFuture} so that {@link EndOperationListener}, that is used to end the
* span, could be attached to it.
*/
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> delegate) {
if (delegate instanceof CompletableFutureWrapper) {
return delegate;
}
return new CompletableFutureWrapper<>(delegate, Context.current());
return new CompletableFutureWrapper<>(delegate);
}
/**
* Wrap {@link CompletableFuture} to run callbacks with the context that was current at the time
* this method was called.
*
* <p>This method should be called on, or as close as possible to, the {@link CompletableFuture}
* that is returned to the user to ensure that the callbacks added by user are run in appropriate
* context.
*/
public static <T> CompletableFuture<T> wrapContext(CompletableFuture<T> future) {
Context context = Context.current();
// when input future is completed, complete result future with context that was current
// at the time when the future was wrapped
CompletableFuture<T> result = new CompletableFuture<>();
future.whenComplete(
(T value, Throwable throwable) -> {
try (Scope ignored = context.makeCurrent()) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});
return result;
}
@Override

View File

@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.redisson;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.redisson.misc.RPromise;
public class RedisCommandAsyncServiceInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.redisson.command.CommandAsyncService");
}
@Override
public void transform(TypeTransformer transformer) {
// used before 3.16.8
transformer.applyAdviceToMethod(
named("async").and(takesArgument(5, named("org.redisson.misc.RPromise"))),
this.getClass().getName() + "$WrapPromiseAdvice");
}
@SuppressWarnings("unused")
public static class WrapPromiseAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 5, readOnly = false) RPromise<?> promise) {
promise = RedissonPromiseWrapper.wrapContext(promise);
}
}
}

View File

@ -34,7 +34,7 @@ public class RedisCommandDataInstrumentation implements TypeInstrumentation {
this.getClass().getName() + "$WrapPromiseAdvice");
// since 3.16.8
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, named("java.util.concurrent.CompletableFuture"))),
isConstructor().and(takesArgument(0, CompletableFuture.class)),
this.getClass().getName() + "$WrapCompletableFutureAdvice");
}

View File

@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.redisson;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class RedissonCompletableFutureWrapperInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.redisson.misc.CompletableFutureWrapper");
}
@Override
public void transform(TypeTransformer transformer) {
// used since 3.16.8
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, CompletionStage.class)),
this.getClass().getName() + "$WrapCompletionStageAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, CompletableFuture.class)),
this.getClass().getName() + "$WrapCompletableFutureAdvice");
}
@SuppressWarnings("unused")
public static class WrapCompletableFutureAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) CompletableFuture<?> completableFuture) {
completableFuture = CompletableFutureWrapper.wrapContext(completableFuture);
}
}
@SuppressWarnings("unused")
public static class WrapCompletionStageAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) CompletionStage<?> completionStage) {
completionStage = CompletableFutureWrapper.wrapContext(completionStage.toCompletableFuture());
}
}
}

View File

@ -21,6 +21,10 @@ public class RedissonInstrumentationModule extends InstrumentationModule {
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new RedisConnectionInstrumentation(), new RedisCommandDataInstrumentation());
return asList(
new RedisConnectionInstrumentation(),
new RedisCommandDataInstrumentation(),
new RedisCommandAsyncServiceInstrumentation(),
new RedissonCompletableFutureWrapperInstrumentation());
}
}

View File

@ -13,29 +13,57 @@ import org.redisson.misc.RedissonPromise;
public class RedissonPromiseWrapper<T> extends RedissonPromise<T> implements PromiseWrapper<T> {
private volatile EndOperationListener<T> endOperationListener;
private RedissonPromiseWrapper(RPromise<T> delegate, Context context) {
private RedissonPromiseWrapper(RPromise<T> delegate) {
this.whenComplete(
(result, error) -> {
EndOperationListener<T> endOperationListener = this.endOperationListener;
if (endOperationListener != null) {
endOperationListener.accept(result, error);
}
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
delegate.tryFailure(error);
} else {
delegate.trySuccess(result);
}
}
});
}
/**
* Wrap {@link RPromise} so that {@link EndOperationListener}, that is used to end the span, could
* be attached to it.
*/
public static <T> RPromise<T> wrap(RPromise<T> delegate) {
if (delegate instanceof RedissonPromiseWrapper) {
return delegate;
}
return new RedissonPromiseWrapper<>(delegate, Context.current());
return new RedissonPromiseWrapper<>(delegate);
}
/**
* Wrap {@link RPromise} to run callbacks with the context that was current at the time this
* method was called.
*
* <p>This method should be called on, or as close as possible to, the {@link RPromise} that is
* returned to the user to ensure that the callbacks added by user are run in appropriate context.
*/
public static <T> RPromise<T> wrapContext(RPromise<T> promise) {
Context context = Context.current();
// when returned promise is completed, complete input promise with context that was current
// at the time when the promise was wrapped
RPromise<T> result = new RedissonPromise<T>();
result.whenComplete(
(value, error) -> {
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
promise.tryFailure(error);
} else {
promise.trySuccess(value);
}
}
});
return result;
}
@Override