Reworking context propagation for Netty 4.0 (#2323)

* Reworking context propagation for Netty 4.0

* Spotless
This commit is contained in:
Nikita Salnikov-Tarnovski 2021-02-22 22:10:01 +02:00 committed by GitHub
parent 2f3ffe89f2
commit 573e512eda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 303 additions and 66 deletions

View File

@ -28,11 +28,7 @@ muzzle {
dependencies { dependencies {
library group: 'io.netty', name: 'netty-codec-http', version: '4.0.0.Final' library group: 'io.netty', name: 'netty-codec-http', version: '4.0.0.Final'
testLibrary group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.9'
latestDepTestLibrary group: 'io.netty', name: 'netty-codec-http', version: '4.0.56.Final' latestDepTestLibrary group: 'io.netty', name: 'netty-codec-http', version: '4.0.56.Final'
latestDepTestLibrary group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.+'
} }
// We need to force the dependency to the earliest supported version because other libraries declare newer versions. // We need to force the dependency to the earliest supported version because other libraries declare newer versions.

View File

@ -24,7 +24,9 @@ public class AbstractChannelHandlerContextInstrumentation implements TypeInstrum
@Override @Override
public ElementMatcher<TypeDescription> typeMatcher() { public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.netty.channel.AbstractChannelHandlerContext"); // Different classes depending on Netty version
return named("io.netty.channel.AbstractChannelHandlerContext")
.or(named("io.netty.channel.DefaultChannelHandlerContext"));
} }
@Override @Override

View File

@ -15,16 +15,12 @@ public class AttributeKeys {
private static final WeakMap<ClassLoader, ConcurrentMap<String, AttributeKey<?>>> map = private static final WeakMap<ClassLoader, ConcurrentMap<String, AttributeKey<?>>> map =
WeakMap.Implementation.DEFAULT.get(); WeakMap.Implementation.DEFAULT.get();
private static final WeakMap.ValueSupplier<ClassLoader, ConcurrentMap<String, AttributeKey<?>>> private static final WeakMap.ValueSupplier<ClassLoader, ConcurrentMap<String, AttributeKey<?>>>
mapSupplier = mapSupplier = ignore -> new ConcurrentHashMap<>();
new WeakMap.ValueSupplier<ClassLoader, ConcurrentMap<String, AttributeKey<?>>>() {
@Override
public ConcurrentMap<String, AttributeKey<?>> get(ClassLoader ignore) {
return new ConcurrentHashMap<>();
}
};
public static final AttributeKey<Context> CONNECT_CONTEXT = public static final AttributeKey<Context> CONNECT_CONTEXT =
attributeKey(AttributeKeys.class.getName() + ".connect-context"); attributeKey(AttributeKeys.class.getName() + ".connect-context");
public static final AttributeKey<Context> WRITE_CONTEXT =
attributeKey(AttributeKeys.class.getName() + ".write-context");
// this is the context that has the server span // this is the context that has the server span
public static final AttributeKey<Context> SERVER_SPAN = public static final AttributeKey<Context> SERVER_SPAN =

View File

@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v4_0;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import io.netty.channel.Channel;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
/**
* This instrumentation preserves the context that was active during call to any "write" operation
* on Netty Channel in that channel's attribute. This context is later used by our various tracing
* handlers to scope the work.
*/
public class ChannelInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("io.netty.channel.Channel");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("io.netty.channel.Channel"));
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(nameStartsWith("write")),
ChannelInstrumentation.class.getName() + "$AttachContextAdvice");
}
public static class AttachContextAdvice {
@Advice.OnMethodEnter
public static void attachContext(@Advice.This Channel channel) {
channel
.attr(AttributeKeys.WRITE_CONTEXT)
.compareAndSet(null, Java8BytecodeBridge.currentContext());
}
}
}

View File

@ -33,6 +33,7 @@ public class NettyInstrumentationModule extends InstrumentationModule {
return asList( return asList(
new ChannelFutureListenerInstrumentation(), new ChannelFutureListenerInstrumentation(),
new NettyChannelPipelineInstrumentation(), new NettyChannelPipelineInstrumentation(),
new AbstractChannelHandlerContextInstrumentation()); new AbstractChannelHandlerContextInstrumentation(),
new ChannelInstrumentation());
} }
} }

View File

@ -24,7 +24,7 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
return; return;
} }
Context parentContext = ctx.channel().attr(AttributeKeys.CONNECT_CONTEXT).getAndRemove(); Context parentContext = ctx.channel().attr(AttributeKeys.WRITE_CONTEXT).getAndRemove();
if (parentContext == null) { if (parentContext == null) {
parentContext = Context.current(); parentContext = Context.current();
} }

View File

@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import groovy.lang.Closure;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
/*
Bridges from async Netty world to the sync world of our http client tests.
When request initiated by a test gets a response, calls a given callback and completes given
future with response's status code.
*/
public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private final Closure<Void> callback;
private final CompletableFuture<Integer> responseCode;
public ClientHandler(Closure<Void> callback, CompletableFuture<Integer> responseCode) {
this.callback = callback;
this.responseCode = responseCode;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpResponse) {
ctx.pipeline().remove(this);
if (callback != null) {
callback.call();
}
HttpResponse response = (HttpResponse) msg;
responseCode.complete(response.getStatus().code());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -6,47 +6,65 @@
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import static org.asynchttpclient.Dsl.asyncHttpClient
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.DefaultFullHttpRequest
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpVersion
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import org.asynchttpclient.AsyncCompletionHandler
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.asynchttpclient.Response
import spock.lang.AutoCleanup
import spock.lang.Shared import spock.lang.Shared
import spock.lang.Timeout
@Timeout(5)
class Netty40ClientTest extends HttpClientTest implements AgentTestTrait { class Netty40ClientTest extends HttpClientTest implements AgentTestTrait {
@Shared @Shared
def clientConfig = DefaultAsyncHttpClientConfig.Builder.newInstance().setRequestTimeout(TimeUnit.SECONDS.toMillis(10).toInteger()) private Bootstrap bootstrap
@Shared
@AutoCleanup def setupSpec() {
AsyncHttpClient asyncHttpClient = asyncHttpClient(clientConfig) EventLoopGroup group = new NioEventLoopGroup()
bootstrap = new Bootstrap()
bootstrap.group(group)
.channel(NioSocketChannel)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline()
pipeline.addLast(new HttpClientCodec())
}
})
}
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def methodName = "prepare" + method.toLowerCase().capitalize() Channel ch = bootstrap.connect(uri.host, uri.port).sync().channel()
def requestBuilder = asyncHttpClient."$methodName"(uri.toString()) def result = new CompletableFuture<Integer>()
headers.each { requestBuilder.setHeader(it.key, it.value) } ch.pipeline().addLast(new ClientHandler(callback, result))
def response = requestBuilder.execute(new AsyncCompletionHandler() {
@Override def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER)
Object onCompleted(Response response) throws Exception { HttpHeaders.setHost(request, uri.host)
callback?.call() request.headers().set("user-agent", userAgent())
return response headers.each { k, v -> request.headers().set(k, v) }
}
}).get() ch.writeAndFlush(request).get()
return response.statusCode return result.get(20, TimeUnit.SECONDS)
} }
@Override @Override
String userAgent() { String userAgent() {
return "AHC" return "Netty"
} }
@Override @Override
@ -54,17 +72,20 @@ class Netty40ClientTest extends HttpClientTest implements AgentTestTrait {
false false
} }
@Override
boolean testConnectionFailure() {
false
}
@Override @Override
boolean testRemoteConnection() { boolean testRemoteConnection() {
return false return false
} }
def "connection error (unopened port)"() { @Override
boolean testConnectionFailure() {
false
}
//This is almost identical to "connection error (unopened port)" test from superclass.
//But it uses somewhat different span name for the client span.
//For now creating a separate test for this, hoping to remove this duplication in the future.
def "netty connection error (unopened port)"() {
given: given:
def uri = new URI("http://127.0.0.1:$UNUSABLE_PORT/") // Use numeric address to avoid ipv4/ipv6 confusion def uri = new URI("http://127.0.0.1:$UNUSABLE_PORT/") // Use numeric address to avoid ipv4/ipv6 confusion
@ -79,26 +100,21 @@ class Netty40ClientTest extends HttpClientTest implements AgentTestTrait {
and: and:
assertTraces(1) { assertTraces(1) {
def size = traces[0].size() trace(0, 2) {
trace(0, size) {
basicSpan(it, 0, "parent", null, thrownException) basicSpan(it, 0, "parent", null, thrownException)
span(1) {
// AsyncHttpClient retries across multiple resolved IP addresses (e.g. 127.0.0.1 and 0:0:0:0:0:0:0:1) name "CONNECT"
// for up to a total of 10 seconds (default connection time limit) childOf span(0)
for (def i = 1; i < size; i++) { errored true
span(i) { Class errorClass = ConnectException
name "CONNECT" try {
childOf span(0) errorClass = Class.forName('io.netty.channel.AbstractChannel$AnnotatedConnectException')
errored true } catch (ClassNotFoundException e) {
Class errorClass = ConnectException // Older versions use 'java.net.ConnectException' and do not have 'io.netty.channel.AbstractChannel$AnnotatedConnectException'
try {
errorClass = Class.forName('io.netty.channel.AbstractChannel$AnnotatedConnectException')
} catch (ClassNotFoundException e) {
// Older versions use 'java.net.ConnectException' and do not have 'io.netty.channel.AbstractChannel$AnnotatedConnectException'
}
errorEvent(errorClass, ~/Connection refused:( no further information:)? \/127.0.0.1:$UNUSABLE_PORT/)
} }
errorEvent(errorClass, "Connection refused: /127.0.0.1:$UNUSABLE_PORT")
} }
} }
} }

View File

@ -57,4 +57,9 @@ class PlayWsClientTest extends HttpClientTest implements AgentTestTrait {
boolean testRemoteConnection() { boolean testRemoteConnection() {
return false return false
} }
@Override
boolean testCausality() {
return false
}
} }

View File

@ -0,0 +1,90 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.vertx;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import io.vertx.core.http.HttpClientRequest;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
/**
* This hooks into two points in Vertx HttpClientRequest lifecycle.
*
* <p>First, when request is finished by the client, meaning that it is ready to be sent out, then
* {@link AttachContextAdvice} attaches current context to that request.
*
* <p>Second, when HttpClientRequest calls any method that actually performs write on the underlying
* Netty channel {@link MountContextAdvice} scopes that method call into the context captured on the
* first step.
*
* <p>This ensures proper context transfer between the client who actually initiated the http call
* and the Netty Channel that will perform that operation.
*/
public class HttpRequestInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("io.vertx.core.http.HttpClientRequest");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("io.vertx.core.http.HttpClientRequest"));
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(
isMethod().and(nameStartsWith("end")),
HttpRequestInstrumentation.class.getName() + "$AttachContextAdvice");
transformers.put(
isMethod().and(nameStartsWith("write").and(isPrivate())),
HttpRequestInstrumentation.class.getName() + "$MountContextAdvice");
return transformers;
}
public static class AttachContextAdvice {
@Advice.OnMethodEnter
public static void attachContext(@Advice.This HttpClientRequest request) {
InstrumentationContext.get(HttpClientRequest.class, Context.class)
.put(request, Java8BytecodeBridge.currentContext());
}
}
public static class MountContextAdvice {
@Advice.OnMethodEnter
public static Scope mountContext(@Advice.This HttpClientRequest request) {
Context context =
InstrumentationContext.get(HttpClientRequest.class, Context.class).get(request);
return context == null ? null : context.makeCurrent();
}
@Advice.OnMethodExit
public static void unmountContext(@Advice.Enter Scope scope) {
if (scope != null) {
scope.close();
}
}
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.vertx;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import com.google.auto.service.AutoService;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;
import java.util.Map;
@AutoService(InstrumentationModule.class)
public class VertxClientInstrumentationModule extends InstrumentationModule {
public VertxClientInstrumentationModule() {
super("vertx-client", "vertx");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new HttpRequestInstrumentation());
}
@Override
public Map<String, String> contextStore() {
return singletonMap("io.vertx.core.http.HttpClientRequest", Context.class.getName());
}
}

View File

@ -54,9 +54,4 @@ class VertxHttpClientTest extends HttpClientTest implements AgentTestTrait {
// FIXME: figure out how to configure timeouts. // FIXME: figure out how to configure timeouts.
false false
} }
@Override
boolean testCausality() {
false
}
} }