Merge pull request #900 from DataDog/labbati/reactor-08
Calculate depth per-handler class in netty channel pipeline instrumentation
This commit is contained in:
commit
d5227cde8e
|
@ -97,8 +97,17 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
|
||||||
*/
|
*/
|
||||||
public static class ChannelPipelineAddAdvice {
|
public static class ChannelPipelineAddAdvice {
|
||||||
@Advice.OnMethodEnter
|
@Advice.OnMethodEnter
|
||||||
public static int checkDepth() {
|
public static int checkDepth(@Advice.Argument(2) final ChannelHandler handler) {
|
||||||
return CallDepthThreadLocalMap.incrementCallDepth(ChannelPipeline.class);
|
// Previously we used one unique call depth tracker for all handlers, using
|
||||||
|
// ChannelPipeline.class as a key.
|
||||||
|
// The problem with this approach is that it does not work with netty's
|
||||||
|
// io.netty.channel.ChannelInitializer which provides an `initChannel` that can be used to
|
||||||
|
// `addLast` other handlers. In that case the depth would exceed 0 and handlers added from
|
||||||
|
// initializers would not be considered.
|
||||||
|
// Using the specific handler key instead of the generic ChannelPipeline.class will help us
|
||||||
|
// both to handle such cases and avoid adding our additional handlers in case of internal
|
||||||
|
// calls of `addLast` to other method overloads with a compatible signature.
|
||||||
|
return CallDepthThreadLocalMap.incrementCallDepth(handler.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||||
|
@ -140,7 +149,7 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
|
||||||
} catch (final IllegalArgumentException e) {
|
} catch (final IllegalArgumentException e) {
|
||||||
// Prevented adding duplicate handlers.
|
// Prevented adding duplicate handlers.
|
||||||
} finally {
|
} finally {
|
||||||
CallDepthThreadLocalMap.reset(ChannelPipeline.class);
|
CallDepthThreadLocalMap.reset(handler.getClass());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,13 @@
|
||||||
import datadog.trace.agent.test.base.HttpClientTest
|
import datadog.trace.agent.test.base.HttpClientTest
|
||||||
|
import datadog.trace.instrumentation.netty41.client.HttpClientTracingHandler
|
||||||
import datadog.trace.instrumentation.netty41.client.NettyHttpClientDecorator
|
import datadog.trace.instrumentation.netty41.client.NettyHttpClientDecorator
|
||||||
import io.netty.channel.AbstractChannel
|
import io.netty.channel.AbstractChannel
|
||||||
|
import io.netty.channel.Channel
|
||||||
|
import io.netty.channel.ChannelHandler
|
||||||
|
import io.netty.channel.ChannelHandlerContext
|
||||||
|
import io.netty.channel.ChannelInitializer
|
||||||
|
import io.netty.channel.embedded.EmbeddedChannel
|
||||||
|
import io.netty.handler.codec.http.HttpClientCodec
|
||||||
import io.opentracing.tag.Tags
|
import io.opentracing.tag.Tags
|
||||||
import org.asynchttpclient.AsyncHttpClient
|
import org.asynchttpclient.AsyncHttpClient
|
||||||
import org.asynchttpclient.DefaultAsyncHttpClientConfig
|
import org.asynchttpclient.DefaultAsyncHttpClientConfig
|
||||||
|
@ -87,4 +94,102 @@ class Netty41ClientTest extends HttpClientTest<NettyHttpClientDecorator> {
|
||||||
where:
|
where:
|
||||||
method = "GET"
|
method = "GET"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def "when a handler is added to the netty pipeline we add our tracing handler"() {
|
||||||
|
setup:
|
||||||
|
def channel = new EmbeddedChannel()
|
||||||
|
def pipeline = channel.pipeline()
|
||||||
|
|
||||||
|
when:
|
||||||
|
pipeline.addLast("name", new HttpClientCodec())
|
||||||
|
|
||||||
|
then:
|
||||||
|
// The first one returns the removed tracing handler
|
||||||
|
pipeline.remove(HttpClientTracingHandler.getName()) != null
|
||||||
|
}
|
||||||
|
|
||||||
|
def "when a handler is added to the netty pipeline we add ONLY ONE tracing handler"() {
|
||||||
|
setup:
|
||||||
|
def channel = new EmbeddedChannel()
|
||||||
|
def pipeline = channel.pipeline()
|
||||||
|
|
||||||
|
when:
|
||||||
|
pipeline.addLast("name", new HttpClientCodec())
|
||||||
|
// The first one returns the removed tracing handler
|
||||||
|
pipeline.remove(HttpClientTracingHandler.getName())
|
||||||
|
// There is only one
|
||||||
|
pipeline.remove(HttpClientTracingHandler.getName()) == null
|
||||||
|
|
||||||
|
then:
|
||||||
|
thrown NoSuchElementException
|
||||||
|
}
|
||||||
|
|
||||||
|
def "handlers of different types can be added"() {
|
||||||
|
setup:
|
||||||
|
def channel = new EmbeddedChannel()
|
||||||
|
def pipeline = channel.pipeline()
|
||||||
|
|
||||||
|
when:
|
||||||
|
pipeline.addLast("some_handler", new SimpleHandler())
|
||||||
|
pipeline.addLast("a_traced_handler", new HttpClientCodec())
|
||||||
|
|
||||||
|
then:
|
||||||
|
// The first one returns the removed tracing handler
|
||||||
|
null != pipeline.remove(HttpClientTracingHandler.getName())
|
||||||
|
null != pipeline.remove("some_handler")
|
||||||
|
null != pipeline.remove("a_traced_handler")
|
||||||
|
}
|
||||||
|
|
||||||
|
def "calling pipeline.addLast methods that use overloaded methods does not cause infinite loop"() {
|
||||||
|
setup:
|
||||||
|
def channel = new EmbeddedChannel()
|
||||||
|
|
||||||
|
when:
|
||||||
|
channel.pipeline().addLast(new SimpleHandler(), new OtherSimpleHandler())
|
||||||
|
|
||||||
|
then:
|
||||||
|
null != channel.pipeline().remove('Netty41ClientTest$SimpleHandler#0')
|
||||||
|
null != channel.pipeline().remove('Netty41ClientTest$OtherSimpleHandler#0')
|
||||||
|
}
|
||||||
|
|
||||||
|
def "when a traced handler is added from an initializer we still detect it and add our channel handlers"() {
|
||||||
|
// This test method replicates a scenario similar to how reactor 0.8.x register the `HttpClientCodec` handler
|
||||||
|
// into the pipeline.
|
||||||
|
|
||||||
|
setup:
|
||||||
|
def channel = new EmbeddedChannel()
|
||||||
|
|
||||||
|
when:
|
||||||
|
channel.pipeline().addLast(new TracedHandlerFromInitializerHandler())
|
||||||
|
|
||||||
|
then:
|
||||||
|
null != channel.pipeline().remove("added_in_initializer")
|
||||||
|
null != channel.pipeline().remove(HttpClientTracingHandler.getName())
|
||||||
|
}
|
||||||
|
|
||||||
|
class SimpleHandler implements ChannelHandler {
|
||||||
|
@Override
|
||||||
|
void handlerAdded(ChannelHandlerContext ctx) throws Exception {}
|
||||||
|
@Override
|
||||||
|
void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}
|
||||||
|
@Override
|
||||||
|
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}
|
||||||
|
}
|
||||||
|
|
||||||
|
class OtherSimpleHandler implements ChannelHandler {
|
||||||
|
@Override
|
||||||
|
void handlerAdded(ChannelHandlerContext ctx) throws Exception {}
|
||||||
|
@Override
|
||||||
|
void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}
|
||||||
|
@Override
|
||||||
|
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}
|
||||||
|
}
|
||||||
|
|
||||||
|
class TracedHandlerFromInitializerHandler extends ChannelInitializer<Channel> implements ChannelHandler {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
|
// This replicates how reactor 0.8.x add the HttpClientCodec
|
||||||
|
ch.pipeline().addLast("added_in_initializer", new HttpClientCodec())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue