Propagate context to armeria callbacks (#3108)

This commit is contained in:
Lauri Tulmin 2021-05-29 01:01:31 +03:00 committed by GitHub
parent e16cf3001f
commit 32e28c1b1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 107 additions and 7 deletions

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.armeria.v1_3;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.reactivestreams.Subscriber;
public class AbstractStreamMessageSubscriptionInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("com.linecorp.armeria.common.stream.AbstractStreamMessage$SubscriptionImpl");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(
takesArgument(0, named("com.linecorp.armeria.common.stream.AbstractStreamMessage")))
.and(takesArgument(1, named("org.reactivestreams.Subscriber"))),
AbstractStreamMessageSubscriptionInstrumentation.class.getName() + "$WrapSubscriberAdvice");
}
public static class WrapSubscriberAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void attachContext(
@Advice.Argument(value = 1, readOnly = false) Subscriber subscriber) {
subscriber = SubscriberWrapper.wrap(subscriber);
}
}
}

View File

@ -29,6 +29,8 @@ public class ArmeriaInstrumentationModule extends InstrumentationModule {
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new ArmeriaWebClientBuilderInstrumentation(), new ArmeriaServerBuilderInstrumentation());
new ArmeriaWebClientBuilderInstrumentation(),
new ArmeriaServerBuilderInstrumentation(),
new AbstractStreamMessageSubscriptionInstrumentation());
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.armeria.v1_3;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class SubscriberWrapper implements Subscriber<Object> {
private final Subscriber<Object> delegate;
private final Context context;
private SubscriberWrapper(Subscriber<Object> delegate, Context context) {
this.delegate = delegate;
this.context = context;
}
public static Subscriber<Object> wrap(Subscriber<Object> delegate) {
Context context = Context.current();
if (context != Context.root()) {
return new SubscriberWrapper(delegate, context);
}
return delegate;
}
@Override
public void onSubscribe(Subscription subscription) {
try (Scope ignore = context.makeCurrent()) {
delegate.onSubscribe(subscription);
}
}
@Override
public void onNext(Object o) {
try (Scope ignore = context.makeCurrent()) {
delegate.onNext(o);
}
}
@Override
public void onError(Throwable throwable) {
try (Scope ignore = context.makeCurrent()) {
delegate.onError(throwable);
}
}
@Override
public void onComplete() {
try (Scope ignore = context.makeCurrent()) {
delegate.onComplete();
}
}
}

View File

@ -26,4 +26,9 @@ class ArmeriaHttpClientTest extends AbstractArmeriaHttpClientTest implements Lib
boolean testCallbackWithParent() {
false
}
@Override
boolean testErrorWithCallback() {
return false
}
}

View File

@ -63,12 +63,6 @@ abstract class AbstractArmeriaHttpClientTest extends HttpClientTest<HttpRequest>
false
}
// TODO: context not propagated to callback
@Override
boolean testErrorWithCallback() {
return false
}
@Override
List<AttributeKey<?>> extraAttributes() {
[