diff --git a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4/common/AbstractNettyChannelPipelineInstrumentation.java b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4/common/AbstractNettyChannelPipelineInstrumentation.java index 6dc633f6e8..b53d111424 100644 --- a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4/common/AbstractNettyChannelPipelineInstrumentation.java +++ b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4/common/AbstractNettyChannelPipelineInstrumentation.java @@ -167,16 +167,13 @@ public abstract class AbstractNettyChannelPipelineInstrumentation implements Typ pipeline.remove(ourHandler); } virtualField.set(handler, null); - } else if (handler - .getClass() - .getName() - .startsWith("io.opentelemetry.javaagent.instrumentation.netty.")) { - handler = pipeline.removeLast(); - } else if (handler - .getClass() - .getName() - .startsWith("io.opentelemetry.instrumentation.netty.")) { - handler = pipeline.removeLast(); + } else { + String handlerClassName = handler.getClass().getName(); + if (handlerClassName.endsWith("TracingHandler") + && (handlerClassName.startsWith("io.opentelemetry.javaagent.instrumentation.netty.") + || handlerClassName.startsWith("io.opentelemetry.instrumentation.netty."))) { + handler = pipeline.removeLast(); + } } } } @@ -210,8 +207,9 @@ public abstract class AbstractNettyChannelPipelineInstrumentation implements Typ for (Iterator iterator = map.values().iterator(); iterator.hasNext(); ) { ChannelHandler handler = iterator.next(); String handlerClassName = handler.getClass().getName(); - if (handlerClassName.startsWith("io.opentelemetry.javaagent.instrumentation.netty.") - || handlerClassName.startsWith("io.opentelemetry.instrumentation.netty.")) { + if (handlerClassName.endsWith("TracingHandler") + && (handlerClassName.startsWith("io.opentelemetry.javaagent.instrumentation.netty.") + || handlerClassName.startsWith("io.opentelemetry.instrumentation.netty."))) { iterator.remove(); } } diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/ChannelFutureTest.groovy b/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/ChannelFutureTest.groovy deleted file mode 100644 index 8bc8346670..0000000000 --- a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/ChannelFutureTest.groovy +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.netty.channel.ChannelHandler -import io.netty.channel.ChannelHandlerContext -import io.netty.channel.embedded.EmbeddedChannel -import io.netty.util.concurrent.Future -import io.netty.util.concurrent.GenericFutureListener -import io.netty.util.concurrent.GenericProgressiveFutureListener -import io.netty.util.concurrent.ProgressiveFuture -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification - -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger - -class ChannelFutureTest extends AgentInstrumentationSpecification { - // regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2705 - def "should clean up wrapped listeners"() { - given: - def channel = new EmbeddedChannel(new EmptyChannelHandler()) - def counter = new AtomicInteger() - - def listener1 = newListener(counter) - channel.closeFuture().addListener(listener1) - channel.closeFuture().removeListener(listener1) - - def listener2 = newListener(counter) - def listener3 = newProgressiveListener(counter) - channel.closeFuture().addListeners(listener2, listener3) - channel.closeFuture().removeListeners(listener2, listener3) - - when: - channel.close().await(5, TimeUnit.SECONDS) - - then: - counter.get() == 0 - } - - private static GenericFutureListener newListener(AtomicInteger counter) { - new GenericFutureListener() { - void operationComplete(Future future) throws Exception { - counter.incrementAndGet() - } - } - } - - private static GenericFutureListener newProgressiveListener(AtomicInteger counter) { - new GenericProgressiveFutureListener() { - void operationProgressed(ProgressiveFuture future, long progress, long total) throws Exception { - counter.incrementAndGet() - } - - void operationComplete(Future future) throws Exception { - counter.incrementAndGet() - } - } - } - - private static class EmptyChannelHandler implements ChannelHandler { - @Override - void handlerAdded(ChannelHandlerContext ctx) throws Exception { - } - - @Override - void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - } - - @Override - void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - } - } -} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/ChannelPipelineTest.groovy b/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/ChannelPipelineTest.groovy deleted file mode 100644 index cbe819dc5b..0000000000 --- a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/ChannelPipelineTest.groovy +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.netty.channel.ChannelHandlerAdapter -import io.netty.channel.DefaultChannelPipeline -import io.netty.channel.embedded.EmbeddedChannel -import io.netty.handler.codec.http.HttpClientCodec -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientTracingHandler -import spock.lang.Unroll - -@Unroll -class ChannelPipelineTest extends AgentInstrumentationSpecification { - - // regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1373 - // and https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4040 - def "test remove our handler #testName"() { - setup: - def channel = new EmbeddedChannel(new NoopChannelHandler()) - def channelPipeline = new DefaultChannelPipeline(channel) - def handler = new HttpClientCodec() - - when: - // no handlers - channelPipeline.first() == null - channelPipeline.last() == null - - then: - // add handler - channelPipeline.addLast("http", handler) - channelPipeline.first() == handler - // our handler was also added - channelPipeline.last().getClass().getSimpleName() == "HttpClientTracingHandler" - - and: - removeMethod.call(channelPipeline, handler) - // removing handler also removes our handler - channelPipeline.first() == null || "io.netty.channel.DefaultChannelPipeline\$TailHandler" == channelPipeline.first().getClass().getName() - channelPipeline.last() == null - - where: - testName | removeMethod - "by instance" | { pipeline, h -> pipeline.remove(h) } - "by class" | { pipeline, h -> pipeline.remove(h.getClass()) } - "by name" | { pipeline, h -> pipeline.remove("http") } - "first" | { pipeline, h -> pipeline.removeFirst() } - } - - // regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4040 - def "should replace handler #desc"() { - setup: - def channel = new EmbeddedChannel(new NoopChannelHandler()) - def channelPipeline = new DefaultChannelPipeline(channel) - def httpHandler = new HttpClientCodec() - - expect: "no handlers initially" - channelPipeline.size() == 0 - - when: - def noopHandler = new NoopChannelHandler() - channelPipeline.addFirst("test", noopHandler) - - then: "only the noop handler" - channelPipeline.size() == 1 - channelPipeline.first() == noopHandler - - when: - replaceMethod(channelPipeline, "test", noopHandler, "http", httpHandler) - - then: "noop handler was removed; http and instrumentation handlers were added" - channelPipeline.size() == 1 - channelPipeline.first() == httpHandler - channelPipeline.last().getClass().getSimpleName() == "HttpClientTracingHandler" - - when: - def anotherNoopHandler = new NoopChannelHandler() - replaceMethod(channelPipeline, "http", httpHandler, "test", anotherNoopHandler) - - then: "http and instrumentation handlers were removed; noop handler was added" - channelPipeline.size() == 1 - channelPipeline.first() == anotherNoopHandler - - where: - desc | replaceMethod - "by instance" | { pipeline, oldName, oldHandler, newName, newHandler -> pipeline.replace(oldHandler, newName, newHandler) } - "by class" | { pipeline, oldName, oldHandler, newName, newHandler -> pipeline.replace(oldHandler.getClass(), newName, newHandler) } - "by name" | { pipeline, oldName, oldHandler, newName, newHandler -> pipeline.replace(oldName, newName, newHandler) } - } - - // regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4056 - def "should addAfter and removeLast handler #desc"() { - setup: - def channel = new EmbeddedChannel(new NoopChannelHandler()) - def channelPipeline = new DefaultChannelPipeline(channel) - def httpHandler = new HttpClientCodec() - - expect: "no handlers initially" - channelPipeline.size() == 0 - - when: - channelPipeline.addLast("http", httpHandler) - - then: "add http and instrumentation handlers" - channelPipeline.size() == 1 - channelPipeline.first() == httpHandler - channelPipeline.last().getClass().getSimpleName() == "HttpClientTracingHandler" - - when: - def noopHandler = new NoopChannelHandler() - channelPipeline.addAfter("http", "noop", noopHandler) - - then: "instrumentation handler is between with http and noop" - channelPipeline.size() == 2 - channelPipeline.first() == httpHandler - channelPipeline.last() == noopHandler - - when: - channelPipeline.removeLast() - - then: "http and instrumentation handlers will be remained" - channelPipeline.size() == 1 - channelPipeline.first() == httpHandler - channelPipeline.last().getClass().getSimpleName() == "HttpClientTracingHandler" - - when: - def removed = channelPipeline.removeLast() - - then: "there is no handler in pipeline" - channelPipeline.size() == 0 - // removing tracing handler also removes the http handler and returns it - removed == httpHandler - } - - // regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/10377 - def "our handler not in handlers map"() { - setup: - def channel = new EmbeddedChannel(new NoopChannelHandler()) - def channelPipeline = new DefaultChannelPipeline(channel) - def handler = new HttpClientCodec() - - when: - // no handlers - channelPipeline.first() == null - - then: - // add handler - channelPipeline.addLast("http", handler) - channelPipeline.first() == handler - // our handler was also added - channelPipeline.last().getClass().simpleName == "HttpClientTracingHandler" - // our handler not counted - channelPipeline.size() == 1 - // our handler is not in handlers map - channelPipeline.toMap().size() == 1 - // our handler is not in handlers iterator - def list = [] - channelPipeline.iterator().forEachRemaining {list.add(it) } - list.size() == 1 - } - - private static class NoopChannelHandler extends ChannelHandlerAdapter { - } -} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ClientSslTest.groovy b/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ClientSslTest.groovy deleted file mode 100644 index 27ea9e069a..0000000000 --- a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ClientSslTest.groovy +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.netty.bootstrap.Bootstrap -import io.netty.buffer.Unpooled -import io.netty.channel.Channel -import io.netty.channel.ChannelInitializer -import io.netty.channel.ChannelPipeline -import io.netty.channel.EventLoopGroup -import io.netty.channel.nio.NioEventLoopGroup -import io.netty.channel.socket.SocketChannel -import io.netty.channel.socket.nio.NioSocketChannel -import io.netty.handler.codec.http.DefaultFullHttpRequest -import io.netty.handler.codec.http.HttpClientCodec -import io.netty.handler.codec.http.HttpHeaders -import io.netty.handler.codec.http.HttpMethod -import io.netty.handler.codec.http.HttpVersion -import io.netty.handler.ssl.SslHandler -import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer -import io.opentelemetry.semconv.SemanticAttributes -import spock.lang.Shared - -import javax.net.ssl.SSLContext -import javax.net.ssl.SSLHandshakeException -import java.util.concurrent.CompletableFuture -import java.util.concurrent.ExecutionException -import java.util.concurrent.TimeUnit - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.SERVER -import static io.opentelemetry.api.trace.StatusCode.ERROR - -class Netty40ClientSslTest extends AgentInstrumentationSpecification { - - @Shared - HttpClientTestServer server - @Shared - EventLoopGroup eventLoopGroup - - def setupSpec() { - server = new HttpClientTestServer(openTelemetry) - server.start() - eventLoopGroup = new NioEventLoopGroup() - } - - def cleanupSpec() { - server.stop().get(10, TimeUnit.SECONDS) - eventLoopGroup.shutdownGracefully() - } - - def "should fail SSL handshake"() { - given: - def bootstrap = createBootstrap(eventLoopGroup, ["SSLv3"]) - - def uri = server.resolveHttpsAddress("/success") - def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.path, Unpooled.EMPTY_BUFFER) - HttpHeaders.setHost(request, uri.host + ":" + uri.port) - - when: - Channel channel = null - runWithSpan("parent") { - channel = bootstrap.connect(uri.host, uri.port).sync().channel() - def result = new CompletableFuture() - channel.pipeline().addLast(new ClientHandler(result)) - channel.writeAndFlush(request).get(10, TimeUnit.SECONDS) - result.get(10, TimeUnit.SECONDS) - } - - then: - Throwable thrownException = thrown() - if (thrownException instanceof ExecutionException) { - thrownException = thrownException.cause - } - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - status ERROR - errorEvent(thrownException.class, thrownException.message) - } - span(1) { - name "CONNECT" - kind INTERNAL - childOf span(0) - attributes { - "$SemanticAttributes.NETWORK_TRANSPORT" "tcp" - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$SemanticAttributes.SERVER_ADDRESS" uri.host - "$SemanticAttributes.SERVER_PORT" uri.port - "$NetworkAttributes.NETWORK_PEER_PORT" uri.port - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - } - } - span(2) { - name "SSL handshake" - kind INTERNAL - childOf span(0) - status ERROR - // netty swallows the exception, it doesn't make any sense to hard-code the message - errorEventWithAnyMessage(SSLHandshakeException) - attributes { - "$SemanticAttributes.NETWORK_TRANSPORT" "tcp" - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$NetworkAttributes.NETWORK_PEER_PORT" uri.port - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - } - } - } - } - - cleanup: - channel?.close()?.sync() - } - - def "should successfully establish SSL handshake"() { - given: - def bootstrap = createBootstrap(eventLoopGroup, ["TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3"]) - - def uri = server.resolveHttpsAddress("/success") - def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.path, Unpooled.EMPTY_BUFFER) - HttpHeaders.setHost(request, uri.host + ":" + uri.port) - - when: - Channel channel = null - runWithSpan("parent") { - channel = bootstrap.connect(uri.host, uri.port).sync().channel() - def result = new CompletableFuture() - channel.pipeline().addLast(new ClientHandler(result)) - channel.writeAndFlush(request).get(10, TimeUnit.SECONDS) - result.get(10, TimeUnit.SECONDS) - } - - then: - assertTraces(1) { - trace(0, 5) { - span(0) { - name "parent" - } - span(1) { - name "CONNECT" - kind INTERNAL - childOf span(0) - attributes { - "$SemanticAttributes.NETWORK_TRANSPORT" "tcp" - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$SemanticAttributes.SERVER_ADDRESS" uri.host - "$SemanticAttributes.SERVER_PORT" uri.port - "$NetworkAttributes.NETWORK_PEER_PORT" uri.port - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - } - } - span(2) { - name "SSL handshake" - kind INTERNAL - childOf span(0) - attributes { - "$SemanticAttributes.NETWORK_TRANSPORT" "tcp" - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$NetworkAttributes.NETWORK_PEER_PORT" uri.port - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - } - } - span(3) { - name "GET" - kind CLIENT - childOf(span(0)) - } - span(4) { - name "test-http-server" - kind SERVER - childOf(span(3)) - } - } - } - - cleanup: - channel?.close()?.sync() - } - - // list of default ciphers copied from netty's JdkSslContext - private static final String[] SUPPORTED_CIPHERS = [ - "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", - "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", - "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA", - "TLS_RSA_WITH_AES_128_GCM_SHA256", - "TLS_RSA_WITH_AES_128_CBC_SHA", - "TLS_RSA_WITH_AES_256_CBC_SHA", - "SSL_RSA_WITH_3DES_EDE_CBC_SHA" - ] - - private static Bootstrap createBootstrap(EventLoopGroup eventLoopGroup, List enabledProtocols) { - def bootstrap = new Bootstrap() - bootstrap.group(eventLoopGroup) - .channel(NioSocketChannel) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - ChannelPipeline pipeline = socketChannel.pipeline() - - def sslContext = SSLContext.getInstance("TLS") - sslContext.init(null, null, null) - def sslEngine = sslContext.createSSLEngine() - sslEngine.setUseClientMode(true) - sslEngine.setEnabledProtocols(enabledProtocols as String[]) - sslEngine.setEnabledCipherSuites(SUPPORTED_CIPHERS) - pipeline.addLast(new SslHandler(sslEngine)) - - pipeline.addLast(new HttpClientCodec()) - } - }) - bootstrap - } -} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ClientTest.groovy b/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ClientTest.groovy deleted file mode 100644 index 96a3694179..0000000000 --- a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ClientTest.groovy +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.netty.bootstrap.Bootstrap -import io.netty.buffer.Unpooled -import io.netty.channel.Channel -import io.netty.channel.ChannelInitializer -import io.netty.channel.ChannelOption -import io.netty.channel.ChannelPipeline -import io.netty.channel.EventLoopGroup -import io.netty.channel.nio.NioEventLoopGroup -import io.netty.channel.socket.SocketChannel -import io.netty.channel.socket.nio.NioSocketChannel -import io.netty.handler.codec.http.DefaultFullHttpRequest -import io.netty.handler.codec.http.HttpClientCodec -import io.netty.handler.codec.http.HttpHeaders -import io.netty.handler.codec.http.HttpMethod -import io.netty.handler.codec.http.HttpVersion -import io.netty.handler.timeout.ReadTimeoutHandler -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpClientTest -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult -import io.opentelemetry.semconv.SemanticAttributes -import spock.lang.Shared - -import java.util.concurrent.CompletableFuture -import java.util.concurrent.TimeUnit - -class Netty40ClientTest extends HttpClientTest implements AgentTestTrait { - - @Shared - private EventLoopGroup eventLoopGroup = new NioEventLoopGroup() - - @Shared - private Bootstrap bootstrap = buildBootstrap() - - @Shared - private Bootstrap readTimeoutBootstrap = buildBootstrap(true) - - def cleanupSpec() { - eventLoopGroup?.shutdownGracefully() - } - - Bootstrap buildBootstrap(boolean readTimeout = false) { - Bootstrap bootstrap = new Bootstrap() - bootstrap.group(eventLoopGroup) - .channel(NioSocketChannel) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - ChannelPipeline pipeline = socketChannel.pipeline() - if (readTimeout) { - pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS)) - } - pipeline.addLast(new HttpClientCodec()) - } - }) - - return bootstrap - } - - Bootstrap getBootstrap(URI uri) { - if (uri.getPath() == "/read-timeout") { - return readTimeoutBootstrap - } - return bootstrap - } - - @Override - DefaultFullHttpRequest buildRequest(String method, URI uri, Map headers) { - def target = uri.path - if (uri.query != null) { - target += "?" + uri.query - } - def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), target, Unpooled.EMPTY_BUFFER) - HttpHeaders.setHost(request, uri.host + ":" + uri.port) - request.headers().set("user-agent", "Netty") - headers.each { k, v -> request.headers().set(k, v) } - return request - } - - @Override - int sendRequest(DefaultFullHttpRequest request, String method, URI uri, Map headers) { - def channel = getBootstrap(uri).connect(uri.host, getPort(uri)).sync().channel() - def result = new CompletableFuture() - channel.pipeline().addLast(new ClientHandler(result)) - channel.writeAndFlush(request).get() - return result.get(20, TimeUnit.SECONDS) - } - - @Override - void sendRequestWithCallback(DefaultFullHttpRequest request, String method, URI uri, Map headers, HttpClientResult requestResult) { - Channel ch - try { - ch = getBootstrap(uri).connect(uri.host, getPort(uri)).sync().channel() - } catch (Exception exception) { - requestResult.complete(exception) - return - } - def result = new CompletableFuture() - result.whenComplete { status, throwable -> - requestResult.complete({ status }, throwable) - } - ch.pipeline().addLast(new ClientHandler(result)) - ch.writeAndFlush(request) - } - - @Override - String expectedClientSpanName(URI uri, String method) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "http://192.0.2.1/": // non routable address - return "CONNECT" - default: - return super.expectedClientSpanName(uri, method) - } - } - - @Override - Set> httpAttributes(URI uri) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "http://192.0.2.1/": // non routable address - return [] - } - def attributes = super.httpAttributes(uri) - attributes.remove(SemanticAttributes.SERVER_ADDRESS) - attributes.remove(SemanticAttributes.SERVER_PORT) - return attributes - } - - @Override - boolean testRedirects() { - false - } - - @Override - boolean testHttps() { - false - } -} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ConnectionSpanTest.groovy b/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ConnectionSpanTest.groovy deleted file mode 100644 index a070209880..0000000000 --- a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ConnectionSpanTest.groovy +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.netty.bootstrap.Bootstrap -import io.netty.buffer.Unpooled -import io.netty.channel.ChannelInitializer -import io.netty.channel.ChannelPipeline -import io.netty.channel.EventLoopGroup -import io.netty.channel.nio.NioEventLoopGroup -import io.netty.channel.socket.SocketChannel -import io.netty.channel.socket.nio.NioSocketChannel -import io.netty.handler.codec.http.DefaultFullHttpRequest -import io.netty.handler.codec.http.HttpClientCodec -import io.netty.handler.codec.http.HttpHeaders -import io.netty.handler.codec.http.HttpMethod -import io.netty.handler.codec.http.HttpVersion -import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer -import io.opentelemetry.semconv.SemanticAttributes -import spock.lang.Shared - -import java.util.concurrent.CompletableFuture -import java.util.concurrent.TimeUnit - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.SERVER -import static io.opentelemetry.api.trace.StatusCode.ERROR - -class Netty40ConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait { - - @Shared - private HttpClientTestServer server - - @Shared - private EventLoopGroup eventLoopGroup = new NioEventLoopGroup() - - @Shared - private Bootstrap bootstrap = buildBootstrap() - - def setupSpec() { - server = new HttpClientTestServer(openTelemetry) - server.start() - } - - def cleanupSpec() { - eventLoopGroup.shutdownGracefully() - server.stop() - } - - Bootstrap buildBootstrap() { - Bootstrap bootstrap = new Bootstrap() - bootstrap.group(eventLoopGroup) - .channel(NioSocketChannel) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - ChannelPipeline pipeline = socketChannel.pipeline() - pipeline.addLast(new HttpClientCodec()) - } - }) - - return bootstrap - } - - DefaultFullHttpRequest buildRequest(String method, URI uri, Map headers) { - def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.path, Unpooled.EMPTY_BUFFER) - HttpHeaders.setHost(request, uri.host + ":" + uri.port) - headers.each { k, v -> request.headers().set(k, v) } - return request - } - - int sendRequest(DefaultFullHttpRequest request, URI uri) { - def channel = bootstrap.connect(uri.host, uri.port).sync().channel() - def result = new CompletableFuture() - channel.pipeline().addLast(new ClientHandler(result)) - channel.writeAndFlush(request).get() - return result.get(20, TimeUnit.SECONDS) - } - - def "test successful request"() { - when: - def uri = URI.create("http://localhost:${server.httpPort()}/success") - def request = buildRequest("GET", uri, [:]) - def responseCode = runWithSpan("parent") { - sendRequest(request, uri) - } - - then: - responseCode == 200 - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "CONNECT" - kind INTERNAL - childOf(span(0)) - attributes { - "$SemanticAttributes.NETWORK_TRANSPORT" "tcp" - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$SemanticAttributes.SERVER_ADDRESS" uri.host - "$SemanticAttributes.SERVER_PORT" uri.port - "$NetworkAttributes.NETWORK_PEER_PORT" uri.port - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - } - } - span(2) { - name "GET" - kind CLIENT - childOf(span(0)) - } - span(3) { - name "test-http-server" - kind SERVER - childOf(span(2)) - } - } - } - } - - def "test failing request"() { - when: - URI uri = URI.create("http://localhost:${PortUtils.UNUSABLE_PORT}") - def request = buildRequest("GET", uri, [:]) - runWithSpan("parent") { - sendRequest(request, uri) - } - - then: - def thrownException = thrown(Exception) - - and: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(thrownException.class, thrownException.message) - } - span(1) { - name "CONNECT" - kind INTERNAL - childOf(span(0)) - status ERROR - errorEvent(thrownException.class, thrownException.message) - attributes { - "$SemanticAttributes.NETWORK_TRANSPORT" "tcp" - "$SemanticAttributes.NETWORK_TYPE" { it == "ipv4" || it == null } - "$SemanticAttributes.SERVER_ADDRESS" uri.host - "$SemanticAttributes.SERVER_PORT" uri.port - "$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == null } - "$NetworkAttributes.NETWORK_PEER_PORT" { it == uri.port || it == null } - } - } - } - } - } -} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ServerTest.groovy b/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ServerTest.groovy deleted file mode 100644 index b5a2094c24..0000000000 --- a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ServerTest.groovy +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.netty.bootstrap.ServerBootstrap -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled -import io.netty.channel.ChannelHandlerContext -import io.netty.channel.ChannelInitializer -import io.netty.channel.ChannelPipeline -import io.netty.channel.EventLoopGroup -import io.netty.channel.SimpleChannelInboundHandler -import io.netty.channel.nio.NioEventLoopGroup -import io.netty.channel.socket.nio.NioServerSocketChannel -import io.netty.handler.codec.http.DefaultFullHttpResponse -import io.netty.handler.codec.http.FullHttpResponse -import io.netty.handler.codec.http.HttpHeaders -import io.netty.handler.codec.http.HttpRequest -import io.netty.handler.codec.http.HttpRequestDecoder -import io.netty.handler.codec.http.HttpResponseEncoder -import io.netty.handler.codec.http.HttpResponseStatus -import io.netty.handler.codec.http.QueryStringDecoder -import io.netty.handler.logging.LogLevel -import io.netty.handler.logging.LoggingHandler -import io.netty.util.CharsetUtil -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpServerTest -import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint -import io.opentelemetry.semconv.SemanticAttributes - -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE -import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1 -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.NOT_FOUND -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT -import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS - -class Netty40ServerTest extends HttpServerTest implements AgentTestTrait { - - static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(SERVER_LOGGER.name, LogLevel.DEBUG) - - @Override - boolean hasResponseCustomizer(ServerEndpoint endpoint) { - true - } - - @Override - EventLoopGroup startServer(int port) { - def eventLoopGroup = new NioEventLoopGroup() - - ServerBootstrap bootstrap = new ServerBootstrap() - .group(eventLoopGroup) - .handler(LOGGING_HANDLER) - .childHandler([ - initChannel: { ch -> - ChannelPipeline pipeline = ch.pipeline() - pipeline.addFirst("logger", LOGGING_HANDLER) - - def handlers = [new HttpRequestDecoder(), new HttpResponseEncoder()] - handlers.each { pipeline.addLast(it) } - pipeline.addLast(new SimpleChannelInboundHandler() { - - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof HttpRequest) { - def request = msg as HttpRequest - def uri = URI.create(request.uri) - ServerEndpoint endpoint = ServerEndpoint.forPath(uri.path) - ctx.write controller(endpoint) { - ByteBuf content = null - FullHttpResponse response - switch (endpoint) { - case SUCCESS: - case ERROR: - content = Unpooled.copiedBuffer(endpoint.body, CharsetUtil.UTF_8) - response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content) - break - case INDEXED_CHILD: - content = Unpooled.EMPTY_BUFFER - endpoint.collectSpanAttributes { new QueryStringDecoder(uri).parameters().get(it).find() } - response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content) - break - case QUERY_PARAM: - content = Unpooled.copiedBuffer(uri.query, CharsetUtil.UTF_8) - response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content) - break - case REDIRECT: - content = Unpooled.EMPTY_BUFFER - response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content) - response.headers().set(HttpHeaders.Names.LOCATION, endpoint.body) - break - case CAPTURE_HEADERS: - content = Unpooled.copiedBuffer(endpoint.body, CharsetUtil.UTF_8) - response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content) - response.headers().set("X-Test-Response", request.headers().get("X-Test-Request")) - break - case EXCEPTION: - throw new Exception(endpoint.body) - default: - content = Unpooled.copiedBuffer(NOT_FOUND.body, CharsetUtil.UTF_8) - response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(NOT_FOUND.status), content) - break - } - response.headers().set(CONTENT_TYPE, "text/plain") - if (content) { - response.headers().set(CONTENT_LENGTH, content.readableBytes()) - } - return response - } - } - } - - @Override - void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ByteBuf content = Unpooled.copiedBuffer(cause.message, CharsetUtil.UTF_8) - FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR, content) - response.headers().set(CONTENT_TYPE, "text/plain") - response.headers().set(CONTENT_LENGTH, content.readableBytes()) - ctx.write(response) - } - - @Override - void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - ctx.flush() - } - }) - } - ] as ChannelInitializer).channel(NioServerSocketChannel) - bootstrap.bind(port).sync() - - return eventLoopGroup - } - - @Override - void stopServer(EventLoopGroup server) { - server?.shutdownGracefully() - } - - @Override - Set> httpAttributes(ServerEndpoint endpoint) { - def attributes = super.httpAttributes(endpoint) - attributes.remove(SemanticAttributes.HTTP_ROUTE) - attributes - } -} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/ChannelFutureTest.java b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/ChannelFutureTest.java new file mode 100644 index 0000000000..efbc4a828b --- /dev/null +++ b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/ChannelFutureTest.java @@ -0,0 +1,82 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.netty.v4_0; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.GenericProgressiveFutureListener; +import io.netty.util.concurrent.ProgressiveFuture; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class ChannelFutureTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @SuppressWarnings("unchecked") + @Test + void shouldCleanUpWrappedListeners() throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new EmptyChannelHandler()); + AtomicInteger counter = new AtomicInteger(); + + GenericFutureListener> listener1 = newListener(counter); + channel.closeFuture().addListener(listener1); + channel.closeFuture().removeListener(listener1); + + GenericFutureListener> listener2 = newListener(counter); + GenericProgressiveFutureListener> listener3 = + newProgressiveListener(counter); + channel.closeFuture().addListener(listener2); + channel.closeFuture().addListener(listener3); + channel.closeFuture().removeListeners(listener2, listener3); + + channel.close().await(5, TimeUnit.SECONDS); + + assertEquals(0, counter.get()); + } + + private static GenericFutureListener> newListener(AtomicInteger counter) { + return future -> counter.incrementAndGet(); + } + + private static GenericProgressiveFutureListener> newProgressiveListener( + AtomicInteger counter) { + return new GenericProgressiveFutureListener>() { + @Override + public void operationComplete(@NotNull ProgressiveFuture future) throws Exception { + counter.incrementAndGet(); + } + + @Override + public void operationProgressed(ProgressiveFuture future, long progress, long total) + throws Exception { + counter.incrementAndGet(); + } + }; + } + + private static class EmptyChannelHandler implements ChannelHandler { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception {} + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {} + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {} + } +} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/ChannelPipelineTest.java b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/ChannelPipelineTest.java new file mode 100644 index 0000000000..ecc5486c7b --- /dev/null +++ b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/ChannelPipelineTest.java @@ -0,0 +1,228 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.netty.v4_0; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class ChannelPipelineTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static final Class defaultChannelPipelineClass = getDefaultChannelPipelineClass(); + + @Nullable + private static Class getDefaultChannelPipelineClass() { + try { + return Class.forName("io.netty.channel.DefaultChannelPipeline"); + } catch (Exception e) { + return null; + } + } + + @NotNull + private static Constructor getConstructor() throws NoSuchMethodException { + assertNotNull(defaultChannelPipelineClass); + Constructor constructor = defaultChannelPipelineClass.getDeclaredConstructor(Channel.class); + constructor.setAccessible(true); + return constructor; + } + + // regression test for + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1373 + // and https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4040 + @ParameterizedTest + @CsvSource({"by instance", "by class", "by name", "first"}) + void testRemoveOurHandler(String testName) throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new NoopChannelHandler()); + ChannelPipeline channelPipeline = (ChannelPipeline) getConstructor().newInstance(channel); + HttpClientCodec handler = new HttpClientCodec(); + + // no handlers initially except the default one + assertTrue( + channelPipeline.first() == null + || "io.netty.channel.DefaultChannelPipeline$TailHandler" + .equals(channelPipeline.first().getClass().getName())); + assertNull(channelPipeline.last()); + assertEquals(0, channelPipeline.toMap().size()); + + // add handler + channelPipeline.addLast("http", handler); + assertEquals(handler, channelPipeline.first()); + // our handler was also added + assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName()); + assertEquals(1, channelPipeline.toMap().size()); + + if ("by instance".equals(testName)) { + channelPipeline.remove(handler); + } else if ("by class".equals(testName)) { + channelPipeline.remove(handler.getClass()); + } else if ("by name".equals(testName)) { + channelPipeline.remove("http"); + } else if ("first".equals(testName)) { + channelPipeline.removeFirst(); + } + + // removing handler also removes our handler + assertTrue( + channelPipeline.first() == null + || "io.netty.channel.DefaultChannelPipeline$TailHandler" + .equals(channelPipeline.first().getClass().getName())); + assertNull(channelPipeline.last()); + assertEquals(0, channelPipeline.toMap().size()); + } + + // regression test for + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4040 + @ParameterizedTest + @CsvSource({"by instance", "by class", "by name"}) + void shouldReplaceHandler(String desc) throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new NoopChannelHandler()); + ChannelPipeline channelPipeline = (ChannelPipeline) getConstructor().newInstance(channel); + HttpClientCodec httpHandler = new HttpClientCodec(); + + // no handlers initially except the default one + assertTrue( + channelPipeline.first() == null + || "io.netty.channel.DefaultChannelPipeline$TailHandler" + .equals(channelPipeline.first().getClass().getName())); + assertNull(channelPipeline.last()); + assertEquals(0, channelPipeline.toMap().size()); + + NoopChannelHandler noopHandler = new NoopChannelHandler(); + channelPipeline.addFirst("test", noopHandler); + + // only the noop handler + assertEquals(noopHandler, channelPipeline.first()); + assertEquals(1, channelPipeline.toMap().size()); + + if ("by instance".equals(desc)) { + channelPipeline.replace(noopHandler, "http", httpHandler); + } else if ("by class".equals(desc)) { + channelPipeline.replace(noopHandler.getClass(), "http", httpHandler); + } else if ("by name".equals(desc)) { + channelPipeline.replace("test", "http", httpHandler); + } + + // noop handler was removed; http and instrumentation handlers were added + assertEquals(httpHandler, channelPipeline.first()); + assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName()); + assertEquals(1, channelPipeline.toMap().size()); + + NoopChannelHandler anotherNoopHandler = new NoopChannelHandler(); + channelPipeline.replace("http", "test", anotherNoopHandler); + + // http and instrumentation handlers were removed; noop handler was added + assertEquals(anotherNoopHandler, channelPipeline.first()); + } + + // regression test for + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4056 + @Test + void shouldAddAfterAndRemoveLastHandler() throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new NoopChannelHandler()); + ChannelPipeline channelPipeline = (ChannelPipeline) getConstructor().newInstance(channel); + HttpClientCodec httpHandler = new HttpClientCodec(); + + // no handlers initially + assertTrue( + channelPipeline.first() == null + || "io.netty.channel.DefaultChannelPipeline$TailHandler" + .equals(channelPipeline.first().getClass().getName())); + assertNull(channelPipeline.last()); + assertEquals(0, channelPipeline.toMap().size()); + + // Add http and instrumentation handlers + channelPipeline.addLast("http", httpHandler); + assertEquals(channelPipeline.first(), httpHandler); + assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName()); + assertEquals(1, channelPipeline.toMap().size()); + + NoopChannelHandler noopHandler = new NoopChannelHandler(); + channelPipeline.addAfter("http", "noop", noopHandler); + + // instrumentation handler is between http and noop handlers + assertEquals(channelPipeline.first(), httpHandler); + assertEquals(channelPipeline.last(), noopHandler); + assertEquals(2, channelPipeline.toMap().size()); + + // http and instrumentation handlers will remain when last handler is removed + { + ChannelHandler removed = channelPipeline.removeLast(); + assertEquals(noopHandler, removed); + assertEquals(channelPipeline.first(), httpHandler); + assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName()); + assertEquals(1, channelPipeline.toMap().size()); + } + + // there is no handler in pipeline when last handler is removed + { + ChannelHandler removed = channelPipeline.removeLast(); + assertEquals(httpHandler, removed); + assertEquals(0, channelPipeline.toMap().size()); + } + } + + // regression test for + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/10377 + @Test + void ourHandlerNotInHandlerMap() throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new NoopChannelHandler()); + ChannelPipeline channelPipeline = (ChannelPipeline) getConstructor().newInstance(channel); + HttpClientCodec httpHandler = new HttpClientCodec(); + + // no handlers initially + assertTrue( + channelPipeline.first() == null + || "io.netty.channel.DefaultChannelPipeline$TailHandler" + .equals(channelPipeline.first().getClass().getName())); + assertNull(channelPipeline.last()); + assertEquals(0, channelPipeline.toMap().size()); + + // add handler + channelPipeline.addLast("http", httpHandler); + assertEquals(httpHandler, channelPipeline.first()); + + // our handler was also added + assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName()); + + // our handler is not in handlers map + assertEquals(1, channelPipeline.toMap().size()); + + // our handler is not in handlers iterator + List list = new ArrayList<>(); + channelPipeline + .iterator() + .forEachRemaining( + entry -> { + list.add(entry.getValue()); + }); + assertEquals(1, list.size()); + } + + private static class NoopChannelHandler extends ChannelHandlerAdapter {} +} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/ClientHandler.java b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/ClientHandler.java similarity index 54% rename from instrumentation/netty/netty-4.0/javaagent/src/test/groovy/ClientHandler.java rename to instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/ClientHandler.java index c96c9697dc..b2fc03b9be 100644 --- a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/ClientHandler.java +++ b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/ClientHandler.java @@ -3,9 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ +package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client; + import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.HttpObject; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.HttpResponse; import java.util.concurrent.CompletableFuture; @@ -14,26 +15,25 @@ Bridges from async Netty world to the sync world of our http client tests. When request initiated by a test gets a response, calls a given callback and completes given future with response's status code. */ -public class ClientHandler extends SimpleChannelInboundHandler { - private final CompletableFuture responseCode; +class ClientHandler extends ChannelInboundHandlerAdapter { + private final CompletableFuture result; - public ClientHandler(CompletableFuture responseCode) { - this.responseCode = responseCode; + public ClientHandler(CompletableFuture result) { + this.result = result; } @Override - public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { + public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof HttpResponse) { - ctx.pipeline().remove(this); - HttpResponse response = (HttpResponse) msg; - responseCode.complete(response.getStatus().code()); + result.complete(response.getStatus().code()); } + ctx.fireChannelRead(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - responseCode.completeExceptionally(cause); + result.completeExceptionally(cause); ctx.close(); } } diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/Netty40ClientSslTest.java b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/Netty40ClientSslTest.java new file mode 100644 index 0000000000..d083b89b91 --- /dev/null +++ b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/Netty40ClientSslTest.java @@ -0,0 +1,249 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client; + +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.assertj.core.api.Assertions.catchThrowable; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.ssl.SslHandler; +import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.SemanticAttributes; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import javax.net.ssl.SSLContext; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class Netty40ClientSslTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static HttpClientTestServer server; + private static EventLoopGroup eventLoopGroup; + + @BeforeAll + static void setup() { + server = new HttpClientTestServer(testing.getOpenTelemetry()); + server.start(); + eventLoopGroup = new NioEventLoopGroup(); + } + + @AfterAll + static void cleanup() throws InterruptedException, ExecutionException, TimeoutException { + eventLoopGroup.shutdownGracefully(); + server.stop().get(10, TimeUnit.SECONDS); + } + + @Test + public void shouldFailSslHandshake() { + Bootstrap bootstrap = createBootstrap(eventLoopGroup, Collections.singletonList("SSLv3")); + + URI uri = server.resolveHttpsAddress("/success"); + DefaultFullHttpRequest request = + new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getPath(), Unpooled.EMPTY_BUFFER); + HttpHeaders.setHost(request, uri.getHost() + ":" + uri.getPort()); + + Throwable thrownException = getThrowable(bootstrap, uri, request); + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasStatus(StatusData.error()) + .hasException(thrownException), + span -> { + span.hasName("CONNECT").hasKind(INTERNAL).hasParent(trace.getSpan(0)); + span.hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"), + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(SemanticAttributes.SERVER_ADDRESS, uri.getHost()), + equalTo(SemanticAttributes.SERVER_PORT, uri.getPort()), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1")); + }, + span -> { + span.hasName("SSL handshake") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()); + span.hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"), + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1")); + })); + } + + private static Throwable getThrowable( + Bootstrap bootstrap, URI uri, DefaultFullHttpRequest request) { + AtomicReference channel = new AtomicReference<>(); + Throwable thrown = + catchThrowable( + () -> + testing.runWithSpan( + "parent", + () -> { + try { + channel.set( + bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel()); + CompletableFuture result = new CompletableFuture<>(); + channel.get().pipeline().addLast(new ClientHandler(result)); + channel.get().writeAndFlush(request).get(10, TimeUnit.SECONDS); + result.get(10, TimeUnit.SECONDS); + } finally { + if (channel.get() != null) { + channel.get().close(); + } + } + })); + + // Then + Throwable thrownException; + if (thrown instanceof ExecutionException) { + thrownException = thrown.getCause(); + } else { + thrownException = thrown; + } + return thrownException; + } + + @SuppressWarnings("InterruptedExceptionSwallowed") + @Test + public void shouldSuccessfullyEstablishSslHandshake() throws Exception { + // given + Bootstrap bootstrap = + createBootstrap(eventLoopGroup, Arrays.asList("TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3")); + + URI uri = server.resolveHttpsAddress("/success"); + DefaultFullHttpRequest request = + new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getPath(), Unpooled.EMPTY_BUFFER); + HttpHeaders.setHost(request, uri.getHost() + ":" + uri.getPort()); + + AtomicReference channel = new AtomicReference<>(); + // when + testing.runWithSpan( + "parent", + () -> { + try { + channel.set(bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel()); + CompletableFuture result = new CompletableFuture<>(); + channel.get().pipeline().addLast(new ClientHandler(result)); + channel.get().writeAndFlush(request).get(10, TimeUnit.SECONDS); + result.get(10, TimeUnit.SECONDS); + } finally { + if (channel.get() != null) { + channel.get().close(); + } + } + }); + + // then + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent"), + span -> { + span.hasName("CONNECT").hasKind(INTERNAL).hasParent(trace.getSpan(0)); + span.hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"), + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(SemanticAttributes.SERVER_ADDRESS, uri.getHost()), + equalTo(SemanticAttributes.SERVER_PORT, uri.getPort()), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1")); + }, + span -> { + span.hasName("SSL handshake").hasKind(INTERNAL).hasParent(trace.getSpan(0)); + span.hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"), + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1")); + }, + span -> { + span.hasName("GET").hasKind(CLIENT).hasParent(trace.getSpan(0)); + }, + span -> { + span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(3)); + })); + + if (channel.get() != null) { + channel.get().close().sync(); + } + } + + // list of default ciphers copied from netty's JdkSslContext + private static final String[] SUPPORTED_CIPHERS = + new String[] { + "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", + "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA", + "TLS_RSA_WITH_AES_128_GCM_SHA256", + "TLS_RSA_WITH_AES_128_CBC_SHA", + "TLS_RSA_WITH_AES_256_CBC_SHA", + "SSL_RSA_WITH_3DES_EDE_CBC_SHA" + }; + + private static Bootstrap createBootstrap( + EventLoopGroup eventLoopGroup, List enabledProtocols) { + Bootstrap bootstrap = new Bootstrap(); + bootstrap + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(@NotNull SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, null, null); + javax.net.ssl.SSLEngine sslEngine = sslContext.createSSLEngine(); + sslEngine.setUseClientMode(true); + sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0])); + sslEngine.setEnabledCipherSuites(SUPPORTED_CIPHERS); + pipeline.addLast(new SslHandler(sslEngine)); + + pipeline.addLast(new HttpClientCodec()); + } + }); + return bootstrap; + } +} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/Netty40ClientTest.java b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/Netty40ClientTest.java new file mode 100644 index 0000000000..3daca927a9 --- /dev/null +++ b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/Netty40ClientTest.java @@ -0,0 +1,183 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.opentelemetry.semconv.SemanticAttributes; +import java.net.URI; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.extension.RegisterExtension; + +class Netty40ClientTest extends AbstractHttpClientTest { + + @RegisterExtension + static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); + + private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + + private final Bootstrap bootstrap = buildBootstrap(false); + + private final Bootstrap readTimeoutBootstrap = buildBootstrap(true); + + void cleanupSpec() { + if (eventLoopGroup != null) { + eventLoopGroup.shutdownGracefully(); + } + } + + Bootstrap buildBootstrap(boolean readTimeout) { + Bootstrap bootstrap = new Bootstrap(); + bootstrap + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(@NotNull SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + if (readTimeout) { + pipeline.addLast(new ReadTimeoutHandler(2000, TimeUnit.MILLISECONDS)); + } + pipeline.addLast(new HttpClientCodec()); + } + }); + + return bootstrap; + } + + private Bootstrap getBootstrap(URI uri) { + if ("/read-timeout".equals(uri.getPath())) { + return readTimeoutBootstrap; + } + return bootstrap; + } + + @Override + public DefaultFullHttpRequest buildRequest(String method, URI uri, Map headers) { + String target = uri.getPath(); + if (uri.getQuery() != null) { + target += "?" + uri.getQuery(); + } + DefaultFullHttpRequest request = + new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), target, Unpooled.EMPTY_BUFFER); + HttpHeaders.setHost(request, uri.getHost() + ":" + uri.getPort()); + request.headers().set("user-agent", "Netty"); + headers.forEach((k, v) -> request.headers().set(k, v)); + return request; + } + + @Override + public int sendRequest( + DefaultFullHttpRequest request, String method, URI uri, Map headers) + throws Exception { + Channel channel = getBootstrap(uri).connect(uri.getHost(), getPort(uri)).sync().channel(); + CompletableFuture result = new CompletableFuture<>(); + channel.pipeline().addLast(new ClientHandler(result)); + channel.writeAndFlush(request).get(); + return result.get(20, TimeUnit.SECONDS); + } + + @Override + public void sendRequestWithCallback( + DefaultFullHttpRequest request, + String method, + URI uri, + Map headers, + HttpClientResult httpClientResult) + throws Exception { + Channel ch; + try { + ch = getBootstrap(uri).connect(uri.getHost(), getPort(uri)).sync().channel(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (Throwable th) { + httpClientResult.complete(th); + return; + } + CompletableFuture result = new CompletableFuture<>(); + result.whenComplete((status, throwable) -> httpClientResult.complete(() -> status, throwable)); + ch.pipeline().addLast(new ClientHandler(result)); + ch.writeAndFlush(request); + } + + @Override + protected void configure(HttpClientTestOptions.Builder optionsBuilder) { + optionsBuilder.disableTestRedirects(); + optionsBuilder.disableTestHttps(); + optionsBuilder.disableTestReadTimeout(); + + optionsBuilder.setExpectedClientSpanNameMapper(Netty40ClientTest::expectedClientSpanName); + optionsBuilder.setHttpAttributes(Netty40ClientTest::httpAttributes); + } + + private static int getPort(URI uri) { + int port = uri.getPort(); + if (port == -1) { + switch (uri.getScheme()) { + case "http": + return 80; + case "https": + return 443; + default: + throw new IllegalArgumentException("Unknown scheme: " + uri.getScheme()); + } + } + return port; + } + + private static String expectedClientSpanName(URI uri, String method) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return "CONNECT"; + default: + return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method); + } + } + + @SuppressWarnings("MissingDefault") + private static Set> httpAttributes(URI uri) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "http://192.0.2.1/": // non routable address + return Collections.emptySet(); + } + Set> attributes = new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES); + attributes.remove(SemanticAttributes.SERVER_ADDRESS); + attributes.remove(SemanticAttributes.SERVER_PORT); + return attributes; + } +} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/Netty40ConnectionSpanTest.java b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/Netty40ConnectionSpanTest.java new file mode 100644 index 0000000000..5e470d01e8 --- /dev/null +++ b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/Netty40ConnectionSpanTest.java @@ -0,0 +1,176 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client; + +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer; +import io.opentelemetry.semconv.SemanticAttributes; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class Netty40ConnectionSpanTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static HttpClientTestServer server; + private static final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + private static final Bootstrap bootstrap = buildBootstrap(); + + @BeforeAll + static void setupSpec() { + server = new HttpClientTestServer(testing.getOpenTelemetry()); + server.start(); + } + + @AfterAll + static void cleanupSpec() throws InterruptedException, ExecutionException, TimeoutException { + eventLoopGroup.shutdownGracefully(); + server.stop().get(10, TimeUnit.SECONDS); + } + + static Bootstrap buildBootstrap() { + Bootstrap bootstrap = new Bootstrap(); + bootstrap + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(@NotNull SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + pipeline.addLast(new HttpClientCodec()); + } + }); + + return bootstrap; + } + + @Test + void successfulRequest() throws Exception { + // when + URI uri = URI.create("http://localhost:" + server.httpPort() + "/success"); + + DefaultFullHttpRequest request = buildRequest("GET", uri, new HashMap<>()); + int responseCode = testing.runWithSpan("parent", () -> sendRequest(request, uri)); + + // then + assertThat(responseCode).isEqualTo(200); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(), + span -> { + span.hasName("CONNECT").hasKind(INTERNAL).hasParent(trace.getSpan(0)); + span.hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"), + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(SemanticAttributes.SERVER_ADDRESS, uri.getHost()), + equalTo(SemanticAttributes.SERVER_PORT, uri.getPort()), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1")); + }, + span -> span.hasName("GET").hasKind(CLIENT).hasParent(trace.getSpan(0)), + span -> + span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(2)))); + } + + @Test + void failedRequest() throws Exception { + // when + URI uri = URI.create("http://localhost:" + PortUtils.UNUSABLE_PORT); + + DefaultFullHttpRequest request = buildRequest("GET", uri, new HashMap<>()); + Throwable thrown = + catchThrowable(() -> testing.runWithSpan("parent", () -> sendRequest(request, uri))); + + // then + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent().hasException(thrown), + span -> { + span.hasName("CONNECT").hasKind(INTERNAL).hasParent(trace.getSpan(0)); + span.hasAttributesSatisfying( + equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp")); + satisfies( + SemanticAttributes.NETWORK_TYPE, + val -> + val.satisfiesAnyOf( + v -> assertThat(val).isNull(), v -> assertThat(v).isEqualTo("ipv4"))); + span.hasAttributesSatisfying( + equalTo(SemanticAttributes.SERVER_ADDRESS, uri.getHost()), + equalTo(SemanticAttributes.SERVER_PORT, uri.getPort())); + satisfies( + NetworkAttributes.NETWORK_PEER_PORT, + val -> + val.satisfiesAnyOf( + v -> assertThat(val).isNull(), + v -> assertThat(v).isEqualTo(uri.getPort()))); + satisfies( + NetworkAttributes.NETWORK_PEER_ADDRESS, + val -> + val.satisfiesAnyOf( + v -> assertThat(val).isNull(), + v -> assertThat(v).isEqualTo("127.0.0.1"))); + })); + } + + private static DefaultFullHttpRequest buildRequest( + String method, URI uri, Map headers) { + DefaultFullHttpRequest request = + new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.getPath(), Unpooled.EMPTY_BUFFER); + HttpHeaders.setHost(request, uri.getHost() + ":" + uri.getPort()); + headers.forEach((k, v) -> request.headers().set(k, v)); + return request; + } + + private static int sendRequest(DefaultFullHttpRequest request, URI uri) + throws InterruptedException, ExecutionException, TimeoutException { + Channel channel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel(); + CompletableFuture result = new CompletableFuture(); + channel.pipeline().addLast(new ClientHandler(result)); + channel.writeAndFlush(request).get(); + return result.get(20, TimeUnit.SECONDS); + } +} diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/server/Netty40ServerTest.java b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/server/Netty40ServerTest.java new file mode 100644 index 0000000000..dedebca21f --- /dev/null +++ b/instrumentation/netty/netty-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/server/Netty40ServerTest.java @@ -0,0 +1,222 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.netty.v4_0.server; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.NOT_FOUND; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT; +import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.QueryStringDecoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.CharsetUtil; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions; +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; +import io.opentelemetry.semconv.SemanticAttributes; +import java.net.URI; +import java.util.HashSet; +import java.util.Set; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.extension.RegisterExtension; + +class Netty40ServerTest extends AbstractHttpServerTest { + + @RegisterExtension + static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forAgent(); + + private static final LoggingHandler LOGGING_HANDLER = + new LoggingHandler(Netty40ServerTest.class, LogLevel.DEBUG); + + @Override + protected EventLoopGroup setupServer() throws InterruptedException { + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + + ServerBootstrap serverBootstrap = + new ServerBootstrap() + .group(eventLoopGroup) + .handler(LOGGING_HANDLER) + .childHandler( + new ChannelInitializer() { + @Override + protected void initChannel(@NotNull SocketChannel socketChannel) + throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline(); + pipeline.addFirst("logger", LOGGING_HANDLER); + pipeline.addLast(new HttpRequestDecoder()); + pipeline.addLast(new HttpResponseEncoder()); + pipeline.addLast( + new SimpleChannelInboundHandler() { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (!(msg instanceof HttpRequest)) { + return; + } + HttpRequest request = (HttpRequest) msg; + URI uri = URI.create(request.getUri()); + ServerEndpoint endpoint = ServerEndpoint.forPath(uri.getPath()); + ctx.write( + controller( + endpoint, + () -> { + ByteBuf content; + FullHttpResponse response; + if (endpoint.equals(SUCCESS) || endpoint.equals(ERROR)) { + content = + Unpooled.copiedBuffer( + endpoint.getBody(), CharsetUtil.UTF_8); + response = + new DefaultFullHttpResponse( + HTTP_1_1, + HttpResponseStatus.valueOf(endpoint.getStatus()), + content); + } else if (endpoint.equals(INDEXED_CHILD)) { + content = Unpooled.EMPTY_BUFFER; + endpoint.collectSpanAttributes( + it -> + new QueryStringDecoder(uri) + .parameters() + .get(it) + .get(0)); + response = + new DefaultFullHttpResponse( + HTTP_1_1, + HttpResponseStatus.valueOf(endpoint.getStatus()), + content); + } else if (endpoint.equals(QUERY_PARAM)) { + content = + Unpooled.copiedBuffer( + uri.getQuery(), CharsetUtil.UTF_8); + response = + new DefaultFullHttpResponse( + HTTP_1_1, + HttpResponseStatus.valueOf(endpoint.getStatus()), + content); + } else if (endpoint.equals(REDIRECT)) { + content = Unpooled.EMPTY_BUFFER; + response = + new DefaultFullHttpResponse( + HTTP_1_1, + HttpResponseStatus.valueOf(endpoint.getStatus()), + content); + response + .headers() + .set(HttpHeaders.Names.LOCATION, endpoint.getBody()); + } else if (endpoint.equals(CAPTURE_HEADERS)) { + content = + Unpooled.copiedBuffer( + endpoint.getBody(), CharsetUtil.UTF_8); + response = + new DefaultFullHttpResponse( + HTTP_1_1, + HttpResponseStatus.valueOf(endpoint.getStatus()), + content); + response + .headers() + .set( + "X-Test-Response", + request.headers().get("X-Test-Request")); + } else if (endpoint.equals(EXCEPTION)) { + throw new IllegalArgumentException(endpoint.getBody()); + } else { + content = + Unpooled.copiedBuffer( + NOT_FOUND.getBody(), CharsetUtil.UTF_8); + response = + new DefaultFullHttpResponse( + HTTP_1_1, + HttpResponseStatus.valueOf(NOT_FOUND.getStatus()), + content); + } + + response.headers().set(CONTENT_TYPE, "text/plain"); + response + .headers() + .set(CONTENT_LENGTH, content.readableBytes()); + return response; + })); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable ex) + throws Exception { + ByteBuf content = + Unpooled.copiedBuffer(ex.getMessage(), CharsetUtil.UTF_8); + FullHttpResponse response = + new DefaultFullHttpResponse( + HTTP_1_1, INTERNAL_SERVER_ERROR, content); + response.headers().set(CONTENT_TYPE, "text/plain"); + response.headers().set(CONTENT_LENGTH, content.readableBytes()); + ctx.write(response); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) + throws Exception { + ctx.flush(); + } + }); + } + }) + .channel(NioServerSocketChannel.class); + serverBootstrap.bind(port).sync(); + + return eventLoopGroup; + } + + @Override + protected void stopServer(EventLoopGroup server) { + if (server != null) { + server.shutdownGracefully(); + } + } + + @Override + protected void configure(HttpServerTestOptions options) { + options.setHttpAttributes( + serverEndpoint -> { + Set> attributes = + new HashSet<>(HttpServerTestOptions.DEFAULT_HTTP_ATTRIBUTES); + attributes.remove(SemanticAttributes.HTTP_ROUTE); + return attributes; + }); + + options.setExpectedException(new IllegalArgumentException(EXCEPTION.getBody())); + options.setHasResponseCustomizer(serverEndpoint -> true); + } +}