From 32e28c1b1e4f3eda1590954029f74772eaefb579 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Sat, 29 May 2021 01:01:31 +0300 Subject: [PATCH] Propagate context to armeria callbacks (#3108) --- ...eamMessageSubscriptionInstrumentation.java | 42 ++++++++++++++ .../v1_3/ArmeriaInstrumentationModule.java | 4 +- .../armeria/v1_3/SubscriberWrapper.java | 57 +++++++++++++++++++ .../armeria/v1_3/ArmeriaHttpClientTest.groovy | 5 ++ .../v1_3/AbstractArmeriaHttpClientTest.groovy | 6 -- 5 files changed, 107 insertions(+), 7 deletions(-) create mode 100644 instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java create mode 100644 instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java new file mode 100644 index 0000000000..e36e6fca0f --- /dev/null +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/AbstractStreamMessageSubscriptionInstrumentation.java @@ -0,0 +1,42 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.armeria.v1_3; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.reactivestreams.Subscriber; + +public class AbstractStreamMessageSubscriptionInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("com.linecorp.armeria.common.stream.AbstractStreamMessage$SubscriptionImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and( + takesArgument(0, named("com.linecorp.armeria.common.stream.AbstractStreamMessage"))) + .and(takesArgument(1, named("org.reactivestreams.Subscriber"))), + AbstractStreamMessageSubscriptionInstrumentation.class.getName() + "$WrapSubscriberAdvice"); + } + + public static class WrapSubscriberAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void attachContext( + @Advice.Argument(value = 1, readOnly = false) Subscriber subscriber) { + subscriber = SubscriberWrapper.wrap(subscriber); + } + } +} diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/ArmeriaInstrumentationModule.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/ArmeriaInstrumentationModule.java index b7ae8a9bb8..b85398965d 100644 --- a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/ArmeriaInstrumentationModule.java +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/ArmeriaInstrumentationModule.java @@ -29,6 +29,8 @@ public class ArmeriaInstrumentationModule extends InstrumentationModule { @Override public List typeInstrumentations() { return asList( - new ArmeriaWebClientBuilderInstrumentation(), new ArmeriaServerBuilderInstrumentation()); + new ArmeriaWebClientBuilderInstrumentation(), + new ArmeriaServerBuilderInstrumentation(), + new AbstractStreamMessageSubscriptionInstrumentation()); } } diff --git a/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java new file mode 100644 index 0000000000..a28d33fa7f --- /dev/null +++ b/instrumentation/armeria-1.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/armeria/v1_3/SubscriberWrapper.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.armeria.v1_3; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class SubscriberWrapper implements Subscriber { + private final Subscriber delegate; + private final Context context; + + private SubscriberWrapper(Subscriber delegate, Context context) { + this.delegate = delegate; + this.context = context; + } + + public static Subscriber wrap(Subscriber delegate) { + Context context = Context.current(); + if (context != Context.root()) { + return new SubscriberWrapper(delegate, context); + } + return delegate; + } + + @Override + public void onSubscribe(Subscription subscription) { + try (Scope ignore = context.makeCurrent()) { + delegate.onSubscribe(subscription); + } + } + + @Override + public void onNext(Object o) { + try (Scope ignore = context.makeCurrent()) { + delegate.onNext(o); + } + } + + @Override + public void onError(Throwable throwable) { + try (Scope ignore = context.makeCurrent()) { + delegate.onError(throwable); + } + } + + @Override + public void onComplete() { + try (Scope ignore = context.makeCurrent()) { + delegate.onComplete(); + } + } +} diff --git a/instrumentation/armeria-1.3/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_3/ArmeriaHttpClientTest.groovy b/instrumentation/armeria-1.3/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_3/ArmeriaHttpClientTest.groovy index a0c743d9df..6bee41bddd 100644 --- a/instrumentation/armeria-1.3/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_3/ArmeriaHttpClientTest.groovy +++ b/instrumentation/armeria-1.3/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_3/ArmeriaHttpClientTest.groovy @@ -26,4 +26,9 @@ class ArmeriaHttpClientTest extends AbstractArmeriaHttpClientTest implements Lib boolean testCallbackWithParent() { false } + + @Override + boolean testErrorWithCallback() { + return false + } } diff --git a/instrumentation/armeria-1.3/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_3/AbstractArmeriaHttpClientTest.groovy b/instrumentation/armeria-1.3/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_3/AbstractArmeriaHttpClientTest.groovy index 5c7c0f8a67..b6b2e7d006 100644 --- a/instrumentation/armeria-1.3/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_3/AbstractArmeriaHttpClientTest.groovy +++ b/instrumentation/armeria-1.3/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_3/AbstractArmeriaHttpClientTest.groovy @@ -63,12 +63,6 @@ abstract class AbstractArmeriaHttpClientTest extends HttpClientTest false } - // TODO: context not propagated to callback - @Override - boolean testErrorWithCallback() { - return false - } - @Override List> extraAttributes() { [