Convert netty 4.0 test to java (#9976)

Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
Helen 2024-02-26 11:01:35 -08:00 committed by GitHub
parent f61308c87a
commit 9a2bad67f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1161 additions and 948 deletions

View File

@ -167,16 +167,13 @@ public abstract class AbstractNettyChannelPipelineInstrumentation implements Typ
pipeline.remove(ourHandler);
}
virtualField.set(handler, null);
} else if (handler
.getClass()
.getName()
.startsWith("io.opentelemetry.javaagent.instrumentation.netty.")) {
handler = pipeline.removeLast();
} else if (handler
.getClass()
.getName()
.startsWith("io.opentelemetry.instrumentation.netty.")) {
handler = pipeline.removeLast();
} else {
String handlerClassName = handler.getClass().getName();
if (handlerClassName.endsWith("TracingHandler")
&& (handlerClassName.startsWith("io.opentelemetry.javaagent.instrumentation.netty.")
|| handlerClassName.startsWith("io.opentelemetry.instrumentation.netty."))) {
handler = pipeline.removeLast();
}
}
}
}
@ -210,8 +207,9 @@ public abstract class AbstractNettyChannelPipelineInstrumentation implements Typ
for (Iterator<ChannelHandler> iterator = map.values().iterator(); iterator.hasNext(); ) {
ChannelHandler handler = iterator.next();
String handlerClassName = handler.getClass().getName();
if (handlerClassName.startsWith("io.opentelemetry.javaagent.instrumentation.netty.")
|| handlerClassName.startsWith("io.opentelemetry.instrumentation.netty.")) {
if (handlerClassName.endsWith("TracingHandler")
&& (handlerClassName.startsWith("io.opentelemetry.javaagent.instrumentation.netty.")
|| handlerClassName.startsWith("io.opentelemetry.instrumentation.netty."))) {
iterator.remove();
}
}

View File

@ -1,74 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.util.concurrent.Future
import io.netty.util.concurrent.GenericFutureListener
import io.netty.util.concurrent.GenericProgressiveFutureListener
import io.netty.util.concurrent.ProgressiveFuture
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
class ChannelFutureTest extends AgentInstrumentationSpecification {
// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2705
def "should clean up wrapped listeners"() {
given:
def channel = new EmbeddedChannel(new EmptyChannelHandler())
def counter = new AtomicInteger()
def listener1 = newListener(counter)
channel.closeFuture().addListener(listener1)
channel.closeFuture().removeListener(listener1)
def listener2 = newListener(counter)
def listener3 = newProgressiveListener(counter)
channel.closeFuture().addListeners(listener2, listener3)
channel.closeFuture().removeListeners(listener2, listener3)
when:
channel.close().await(5, TimeUnit.SECONDS)
then:
counter.get() == 0
}
private static GenericFutureListener newListener(AtomicInteger counter) {
new GenericFutureListener() {
void operationComplete(Future future) throws Exception {
counter.incrementAndGet()
}
}
}
private static GenericFutureListener newProgressiveListener(AtomicInteger counter) {
new GenericProgressiveFutureListener() {
void operationProgressed(ProgressiveFuture future, long progress, long total) throws Exception {
counter.incrementAndGet()
}
void operationComplete(Future future) throws Exception {
counter.incrementAndGet()
}
}
}
private static class EmptyChannelHandler implements ChannelHandler {
@Override
void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
@Override
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}
}

View File

@ -1,165 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.netty.channel.ChannelHandlerAdapter
import io.netty.channel.DefaultChannelPipeline
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.handler.codec.http.HttpClientCodec
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientTracingHandler
import spock.lang.Unroll
@Unroll
class ChannelPipelineTest extends AgentInstrumentationSpecification {
// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1373
// and https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4040
def "test remove our handler #testName"() {
setup:
def channel = new EmbeddedChannel(new NoopChannelHandler())
def channelPipeline = new DefaultChannelPipeline(channel)
def handler = new HttpClientCodec()
when:
// no handlers
channelPipeline.first() == null
channelPipeline.last() == null
then:
// add handler
channelPipeline.addLast("http", handler)
channelPipeline.first() == handler
// our handler was also added
channelPipeline.last().getClass().getSimpleName() == "HttpClientTracingHandler"
and:
removeMethod.call(channelPipeline, handler)
// removing handler also removes our handler
channelPipeline.first() == null || "io.netty.channel.DefaultChannelPipeline\$TailHandler" == channelPipeline.first().getClass().getName()
channelPipeline.last() == null
where:
testName | removeMethod
"by instance" | { pipeline, h -> pipeline.remove(h) }
"by class" | { pipeline, h -> pipeline.remove(h.getClass()) }
"by name" | { pipeline, h -> pipeline.remove("http") }
"first" | { pipeline, h -> pipeline.removeFirst() }
}
// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4040
def "should replace handler #desc"() {
setup:
def channel = new EmbeddedChannel(new NoopChannelHandler())
def channelPipeline = new DefaultChannelPipeline(channel)
def httpHandler = new HttpClientCodec()
expect: "no handlers initially"
channelPipeline.size() == 0
when:
def noopHandler = new NoopChannelHandler()
channelPipeline.addFirst("test", noopHandler)
then: "only the noop handler"
channelPipeline.size() == 1
channelPipeline.first() == noopHandler
when:
replaceMethod(channelPipeline, "test", noopHandler, "http", httpHandler)
then: "noop handler was removed; http and instrumentation handlers were added"
channelPipeline.size() == 1
channelPipeline.first() == httpHandler
channelPipeline.last().getClass().getSimpleName() == "HttpClientTracingHandler"
when:
def anotherNoopHandler = new NoopChannelHandler()
replaceMethod(channelPipeline, "http", httpHandler, "test", anotherNoopHandler)
then: "http and instrumentation handlers were removed; noop handler was added"
channelPipeline.size() == 1
channelPipeline.first() == anotherNoopHandler
where:
desc | replaceMethod
"by instance" | { pipeline, oldName, oldHandler, newName, newHandler -> pipeline.replace(oldHandler, newName, newHandler) }
"by class" | { pipeline, oldName, oldHandler, newName, newHandler -> pipeline.replace(oldHandler.getClass(), newName, newHandler) }
"by name" | { pipeline, oldName, oldHandler, newName, newHandler -> pipeline.replace(oldName, newName, newHandler) }
}
// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4056
def "should addAfter and removeLast handler #desc"() {
setup:
def channel = new EmbeddedChannel(new NoopChannelHandler())
def channelPipeline = new DefaultChannelPipeline(channel)
def httpHandler = new HttpClientCodec()
expect: "no handlers initially"
channelPipeline.size() == 0
when:
channelPipeline.addLast("http", httpHandler)
then: "add http and instrumentation handlers"
channelPipeline.size() == 1
channelPipeline.first() == httpHandler
channelPipeline.last().getClass().getSimpleName() == "HttpClientTracingHandler"
when:
def noopHandler = new NoopChannelHandler()
channelPipeline.addAfter("http", "noop", noopHandler)
then: "instrumentation handler is between with http and noop"
channelPipeline.size() == 2
channelPipeline.first() == httpHandler
channelPipeline.last() == noopHandler
when:
channelPipeline.removeLast()
then: "http and instrumentation handlers will be remained"
channelPipeline.size() == 1
channelPipeline.first() == httpHandler
channelPipeline.last().getClass().getSimpleName() == "HttpClientTracingHandler"
when:
def removed = channelPipeline.removeLast()
then: "there is no handler in pipeline"
channelPipeline.size() == 0
// removing tracing handler also removes the http handler and returns it
removed == httpHandler
}
// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/10377
def "our handler not in handlers map"() {
setup:
def channel = new EmbeddedChannel(new NoopChannelHandler())
def channelPipeline = new DefaultChannelPipeline(channel)
def handler = new HttpClientCodec()
when:
// no handlers
channelPipeline.first() == null
then:
// add handler
channelPipeline.addLast("http", handler)
channelPipeline.first() == handler
// our handler was also added
channelPipeline.last().getClass().simpleName == "HttpClientTracingHandler"
// our handler not counted
channelPipeline.size() == 1
// our handler is not in handlers map
channelPipeline.toMap().size() == 1
// our handler is not in handlers iterator
def list = []
channelPipeline.iterator().forEachRemaining {list.add(it) }
list.size() == 1
}
private static class NoopChannelHandler extends ChannelHandlerAdapter {
}
}

View File

@ -1,219 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.DefaultFullHttpRequest
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.ssl.SslHandler
import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
import io.opentelemetry.semconv.SemanticAttributes
import spock.lang.Shared
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLHandshakeException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.SERVER
import static io.opentelemetry.api.trace.StatusCode.ERROR
class Netty40ClientSslTest extends AgentInstrumentationSpecification {
@Shared
HttpClientTestServer server
@Shared
EventLoopGroup eventLoopGroup
def setupSpec() {
server = new HttpClientTestServer(openTelemetry)
server.start()
eventLoopGroup = new NioEventLoopGroup()
}
def cleanupSpec() {
server.stop().get(10, TimeUnit.SECONDS)
eventLoopGroup.shutdownGracefully()
}
def "should fail SSL handshake"() {
given:
def bootstrap = createBootstrap(eventLoopGroup, ["SSLv3"])
def uri = server.resolveHttpsAddress("/success")
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.path, Unpooled.EMPTY_BUFFER)
HttpHeaders.setHost(request, uri.host + ":" + uri.port)
when:
Channel channel = null
runWithSpan("parent") {
channel = bootstrap.connect(uri.host, uri.port).sync().channel()
def result = new CompletableFuture<Integer>()
channel.pipeline().addLast(new ClientHandler(result))
channel.writeAndFlush(request).get(10, TimeUnit.SECONDS)
result.get(10, TimeUnit.SECONDS)
}
then:
Throwable thrownException = thrown()
if (thrownException instanceof ExecutionException) {
thrownException = thrownException.cause
}
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
status ERROR
errorEvent(thrownException.class, thrownException.message)
}
span(1) {
name "CONNECT"
kind INTERNAL
childOf span(0)
attributes {
"$SemanticAttributes.NETWORK_TRANSPORT" "tcp"
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
"$SemanticAttributes.SERVER_ADDRESS" uri.host
"$SemanticAttributes.SERVER_PORT" uri.port
"$NetworkAttributes.NETWORK_PEER_PORT" uri.port
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
}
}
span(2) {
name "SSL handshake"
kind INTERNAL
childOf span(0)
status ERROR
// netty swallows the exception, it doesn't make any sense to hard-code the message
errorEventWithAnyMessage(SSLHandshakeException)
attributes {
"$SemanticAttributes.NETWORK_TRANSPORT" "tcp"
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
"$NetworkAttributes.NETWORK_PEER_PORT" uri.port
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
}
}
}
}
cleanup:
channel?.close()?.sync()
}
def "should successfully establish SSL handshake"() {
given:
def bootstrap = createBootstrap(eventLoopGroup, ["TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3"])
def uri = server.resolveHttpsAddress("/success")
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.path, Unpooled.EMPTY_BUFFER)
HttpHeaders.setHost(request, uri.host + ":" + uri.port)
when:
Channel channel = null
runWithSpan("parent") {
channel = bootstrap.connect(uri.host, uri.port).sync().channel()
def result = new CompletableFuture<Integer>()
channel.pipeline().addLast(new ClientHandler(result))
channel.writeAndFlush(request).get(10, TimeUnit.SECONDS)
result.get(10, TimeUnit.SECONDS)
}
then:
assertTraces(1) {
trace(0, 5) {
span(0) {
name "parent"
}
span(1) {
name "CONNECT"
kind INTERNAL
childOf span(0)
attributes {
"$SemanticAttributes.NETWORK_TRANSPORT" "tcp"
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
"$SemanticAttributes.SERVER_ADDRESS" uri.host
"$SemanticAttributes.SERVER_PORT" uri.port
"$NetworkAttributes.NETWORK_PEER_PORT" uri.port
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
}
}
span(2) {
name "SSL handshake"
kind INTERNAL
childOf span(0)
attributes {
"$SemanticAttributes.NETWORK_TRANSPORT" "tcp"
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
"$NetworkAttributes.NETWORK_PEER_PORT" uri.port
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
}
}
span(3) {
name "GET"
kind CLIENT
childOf(span(0))
}
span(4) {
name "test-http-server"
kind SERVER
childOf(span(3))
}
}
}
cleanup:
channel?.close()?.sync()
}
// list of default ciphers copied from netty's JdkSslContext
private static final String[] SUPPORTED_CIPHERS = [
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
"TLS_RSA_WITH_AES_128_GCM_SHA256",
"TLS_RSA_WITH_AES_128_CBC_SHA",
"TLS_RSA_WITH_AES_256_CBC_SHA",
"SSL_RSA_WITH_3DES_EDE_CBC_SHA"
]
private static Bootstrap createBootstrap(EventLoopGroup eventLoopGroup, List<String> enabledProtocols) {
def bootstrap = new Bootstrap()
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline()
def sslContext = SSLContext.getInstance("TLS")
sslContext.init(null, null, null)
def sslEngine = sslContext.createSSLEngine()
sslEngine.setUseClientMode(true)
sslEngine.setEnabledProtocols(enabledProtocols as String[])
sslEngine.setEnabledCipherSuites(SUPPORTED_CIPHERS)
pipeline.addLast(new SslHandler(sslEngine))
pipeline.addLast(new HttpClientCodec())
}
})
bootstrap
}
}

View File

@ -1,145 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.DefaultFullHttpRequest
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.timeout.ReadTimeoutHandler
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult
import io.opentelemetry.semconv.SemanticAttributes
import spock.lang.Shared
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
class Netty40ClientTest extends HttpClientTest<DefaultFullHttpRequest> implements AgentTestTrait {
@Shared
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup()
@Shared
private Bootstrap bootstrap = buildBootstrap()
@Shared
private Bootstrap readTimeoutBootstrap = buildBootstrap(true)
def cleanupSpec() {
eventLoopGroup?.shutdownGracefully()
}
Bootstrap buildBootstrap(boolean readTimeout = false) {
Bootstrap bootstrap = new Bootstrap()
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline()
if (readTimeout) {
pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS))
}
pipeline.addLast(new HttpClientCodec())
}
})
return bootstrap
}
Bootstrap getBootstrap(URI uri) {
if (uri.getPath() == "/read-timeout") {
return readTimeoutBootstrap
}
return bootstrap
}
@Override
DefaultFullHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
def target = uri.path
if (uri.query != null) {
target += "?" + uri.query
}
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), target, Unpooled.EMPTY_BUFFER)
HttpHeaders.setHost(request, uri.host + ":" + uri.port)
request.headers().set("user-agent", "Netty")
headers.each { k, v -> request.headers().set(k, v) }
return request
}
@Override
int sendRequest(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers) {
def channel = getBootstrap(uri).connect(uri.host, getPort(uri)).sync().channel()
def result = new CompletableFuture<Integer>()
channel.pipeline().addLast(new ClientHandler(result))
channel.writeAndFlush(request).get()
return result.get(20, TimeUnit.SECONDS)
}
@Override
void sendRequestWithCallback(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers, HttpClientResult requestResult) {
Channel ch
try {
ch = getBootstrap(uri).connect(uri.host, getPort(uri)).sync().channel()
} catch (Exception exception) {
requestResult.complete(exception)
return
}
def result = new CompletableFuture<Integer>()
result.whenComplete { status, throwable ->
requestResult.complete({ status }, throwable)
}
ch.pipeline().addLast(new ClientHandler(result))
ch.writeAndFlush(request)
}
@Override
String expectedClientSpanName(URI uri, String method) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "http://192.0.2.1/": // non routable address
return "CONNECT"
default:
return super.expectedClientSpanName(uri, method)
}
}
@Override
Set<AttributeKey<?>> httpAttributes(URI uri) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "http://192.0.2.1/": // non routable address
return []
}
def attributes = super.httpAttributes(uri)
attributes.remove(SemanticAttributes.SERVER_ADDRESS)
attributes.remove(SemanticAttributes.SERVER_PORT)
return attributes
}
@Override
boolean testRedirects() {
false
}
@Override
boolean testHttps() {
false
}
}

View File

@ -1,169 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.DefaultFullHttpRequest
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpVersion
import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
import io.opentelemetry.semconv.SemanticAttributes
import spock.lang.Shared
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.SERVER
import static io.opentelemetry.api.trace.StatusCode.ERROR
class Netty40ConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait {
@Shared
private HttpClientTestServer server
@Shared
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup()
@Shared
private Bootstrap bootstrap = buildBootstrap()
def setupSpec() {
server = new HttpClientTestServer(openTelemetry)
server.start()
}
def cleanupSpec() {
eventLoopGroup.shutdownGracefully()
server.stop()
}
Bootstrap buildBootstrap() {
Bootstrap bootstrap = new Bootstrap()
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline()
pipeline.addLast(new HttpClientCodec())
}
})
return bootstrap
}
DefaultFullHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.path, Unpooled.EMPTY_BUFFER)
HttpHeaders.setHost(request, uri.host + ":" + uri.port)
headers.each { k, v -> request.headers().set(k, v) }
return request
}
int sendRequest(DefaultFullHttpRequest request, URI uri) {
def channel = bootstrap.connect(uri.host, uri.port).sync().channel()
def result = new CompletableFuture<Integer>()
channel.pipeline().addLast(new ClientHandler(result))
channel.writeAndFlush(request).get()
return result.get(20, TimeUnit.SECONDS)
}
def "test successful request"() {
when:
def uri = URI.create("http://localhost:${server.httpPort()}/success")
def request = buildRequest("GET", uri, [:])
def responseCode = runWithSpan("parent") {
sendRequest(request, uri)
}
then:
responseCode == 200
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "CONNECT"
kind INTERNAL
childOf(span(0))
attributes {
"$SemanticAttributes.NETWORK_TRANSPORT" "tcp"
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
"$SemanticAttributes.SERVER_ADDRESS" uri.host
"$SemanticAttributes.SERVER_PORT" uri.port
"$NetworkAttributes.NETWORK_PEER_PORT" uri.port
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
}
}
span(2) {
name "GET"
kind CLIENT
childOf(span(0))
}
span(3) {
name "test-http-server"
kind SERVER
childOf(span(2))
}
}
}
}
def "test failing request"() {
when:
URI uri = URI.create("http://localhost:${PortUtils.UNUSABLE_PORT}")
def request = buildRequest("GET", uri, [:])
runWithSpan("parent") {
sendRequest(request, uri)
}
then:
def thrownException = thrown(Exception)
and:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
status ERROR
errorEvent(thrownException.class, thrownException.message)
}
span(1) {
name "CONNECT"
kind INTERNAL
childOf(span(0))
status ERROR
errorEvent(thrownException.class, thrownException.message)
attributes {
"$SemanticAttributes.NETWORK_TRANSPORT" "tcp"
"$SemanticAttributes.NETWORK_TYPE" { it == "ipv4" || it == null }
"$SemanticAttributes.SERVER_ADDRESS" uri.host
"$SemanticAttributes.SERVER_PORT" uri.port
"$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == null }
"$NetworkAttributes.NETWORK_PEER_PORT" { it == uri.port || it == null }
}
}
}
}
}
}

View File

@ -1,153 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpRequestDecoder
import io.netty.handler.codec.http.HttpResponseEncoder
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.QueryStringDecoder
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.util.CharsetUtil
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint
import io.opentelemetry.semconv.SemanticAttributes
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.NOT_FOUND
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS
class Netty40ServerTest extends HttpServerTest<EventLoopGroup> implements AgentTestTrait {
static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(SERVER_LOGGER.name, LogLevel.DEBUG)
@Override
boolean hasResponseCustomizer(ServerEndpoint endpoint) {
true
}
@Override
EventLoopGroup startServer(int port) {
def eventLoopGroup = new NioEventLoopGroup()
ServerBootstrap bootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.handler(LOGGING_HANDLER)
.childHandler([
initChannel: { ch ->
ChannelPipeline pipeline = ch.pipeline()
pipeline.addFirst("logger", LOGGING_HANDLER)
def handlers = [new HttpRequestDecoder(), new HttpResponseEncoder()]
handlers.each { pipeline.addLast(it) }
pipeline.addLast(new SimpleChannelInboundHandler() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
def request = msg as HttpRequest
def uri = URI.create(request.uri)
ServerEndpoint endpoint = ServerEndpoint.forPath(uri.path)
ctx.write controller(endpoint) {
ByteBuf content = null
FullHttpResponse response
switch (endpoint) {
case SUCCESS:
case ERROR:
content = Unpooled.copiedBuffer(endpoint.body, CharsetUtil.UTF_8)
response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
break
case INDEXED_CHILD:
content = Unpooled.EMPTY_BUFFER
endpoint.collectSpanAttributes { new QueryStringDecoder(uri).parameters().get(it).find() }
response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
break
case QUERY_PARAM:
content = Unpooled.copiedBuffer(uri.query, CharsetUtil.UTF_8)
response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
break
case REDIRECT:
content = Unpooled.EMPTY_BUFFER
response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
response.headers().set(HttpHeaders.Names.LOCATION, endpoint.body)
break
case CAPTURE_HEADERS:
content = Unpooled.copiedBuffer(endpoint.body, CharsetUtil.UTF_8)
response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status), content)
response.headers().set("X-Test-Response", request.headers().get("X-Test-Request"))
break
case EXCEPTION:
throw new Exception(endpoint.body)
default:
content = Unpooled.copiedBuffer(NOT_FOUND.body, CharsetUtil.UTF_8)
response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(NOT_FOUND.status), content)
break
}
response.headers().set(CONTENT_TYPE, "text/plain")
if (content) {
response.headers().set(CONTENT_LENGTH, content.readableBytes())
}
return response
}
}
}
@Override
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ByteBuf content = Unpooled.copiedBuffer(cause.message, CharsetUtil.UTF_8)
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR, content)
response.headers().set(CONTENT_TYPE, "text/plain")
response.headers().set(CONTENT_LENGTH, content.readableBytes())
ctx.write(response)
}
@Override
void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush()
}
})
}
] as ChannelInitializer).channel(NioServerSocketChannel)
bootstrap.bind(port).sync()
return eventLoopGroup
}
@Override
void stopServer(EventLoopGroup server) {
server?.shutdownGracefully()
}
@Override
Set<AttributeKey<?>> httpAttributes(ServerEndpoint endpoint) {
def attributes = super.httpAttributes(endpoint)
attributes.remove(SemanticAttributes.HTTP_ROUTE)
attributes
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v4_0;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GenericProgressiveFutureListener;
import io.netty.util.concurrent.ProgressiveFuture;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class ChannelFutureTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@SuppressWarnings("unchecked")
@Test
void shouldCleanUpWrappedListeners() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(new EmptyChannelHandler());
AtomicInteger counter = new AtomicInteger();
GenericFutureListener<Future<Void>> listener1 = newListener(counter);
channel.closeFuture().addListener(listener1);
channel.closeFuture().removeListener(listener1);
GenericFutureListener<Future<Void>> listener2 = newListener(counter);
GenericProgressiveFutureListener<ProgressiveFuture<Void>> listener3 =
newProgressiveListener(counter);
channel.closeFuture().addListener(listener2);
channel.closeFuture().addListener(listener3);
channel.closeFuture().removeListeners(listener2, listener3);
channel.close().await(5, TimeUnit.SECONDS);
assertEquals(0, counter.get());
}
private static GenericFutureListener<Future<Void>> newListener(AtomicInteger counter) {
return future -> counter.incrementAndGet();
}
private static GenericProgressiveFutureListener<ProgressiveFuture<Void>> newProgressiveListener(
AtomicInteger counter) {
return new GenericProgressiveFutureListener<ProgressiveFuture<Void>>() {
@Override
public void operationComplete(@NotNull ProgressiveFuture<Void> future) throws Exception {
counter.incrementAndGet();
}
@Override
public void operationProgressed(ProgressiveFuture<Void> future, long progress, long total)
throws Exception {
counter.incrementAndGet();
}
};
}
private static class EmptyChannelHandler implements ChannelHandler {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}
}
}

View File

@ -0,0 +1,228 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v4_0;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
class ChannelPipelineTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
private static final Class<?> defaultChannelPipelineClass = getDefaultChannelPipelineClass();
@Nullable
private static Class<?> getDefaultChannelPipelineClass() {
try {
return Class.forName("io.netty.channel.DefaultChannelPipeline");
} catch (Exception e) {
return null;
}
}
@NotNull
private static Constructor<?> getConstructor() throws NoSuchMethodException {
assertNotNull(defaultChannelPipelineClass);
Constructor<?> constructor = defaultChannelPipelineClass.getDeclaredConstructor(Channel.class);
constructor.setAccessible(true);
return constructor;
}
// regression test for
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1373
// and https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4040
@ParameterizedTest
@CsvSource({"by instance", "by class", "by name", "first"})
void testRemoveOurHandler(String testName) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(new NoopChannelHandler());
ChannelPipeline channelPipeline = (ChannelPipeline) getConstructor().newInstance(channel);
HttpClientCodec handler = new HttpClientCodec();
// no handlers initially except the default one
assertTrue(
channelPipeline.first() == null
|| "io.netty.channel.DefaultChannelPipeline$TailHandler"
.equals(channelPipeline.first().getClass().getName()));
assertNull(channelPipeline.last());
assertEquals(0, channelPipeline.toMap().size());
// add handler
channelPipeline.addLast("http", handler);
assertEquals(handler, channelPipeline.first());
// our handler was also added
assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName());
assertEquals(1, channelPipeline.toMap().size());
if ("by instance".equals(testName)) {
channelPipeline.remove(handler);
} else if ("by class".equals(testName)) {
channelPipeline.remove(handler.getClass());
} else if ("by name".equals(testName)) {
channelPipeline.remove("http");
} else if ("first".equals(testName)) {
channelPipeline.removeFirst();
}
// removing handler also removes our handler
assertTrue(
channelPipeline.first() == null
|| "io.netty.channel.DefaultChannelPipeline$TailHandler"
.equals(channelPipeline.first().getClass().getName()));
assertNull(channelPipeline.last());
assertEquals(0, channelPipeline.toMap().size());
}
// regression test for
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4040
@ParameterizedTest
@CsvSource({"by instance", "by class", "by name"})
void shouldReplaceHandler(String desc) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(new NoopChannelHandler());
ChannelPipeline channelPipeline = (ChannelPipeline) getConstructor().newInstance(channel);
HttpClientCodec httpHandler = new HttpClientCodec();
// no handlers initially except the default one
assertTrue(
channelPipeline.first() == null
|| "io.netty.channel.DefaultChannelPipeline$TailHandler"
.equals(channelPipeline.first().getClass().getName()));
assertNull(channelPipeline.last());
assertEquals(0, channelPipeline.toMap().size());
NoopChannelHandler noopHandler = new NoopChannelHandler();
channelPipeline.addFirst("test", noopHandler);
// only the noop handler
assertEquals(noopHandler, channelPipeline.first());
assertEquals(1, channelPipeline.toMap().size());
if ("by instance".equals(desc)) {
channelPipeline.replace(noopHandler, "http", httpHandler);
} else if ("by class".equals(desc)) {
channelPipeline.replace(noopHandler.getClass(), "http", httpHandler);
} else if ("by name".equals(desc)) {
channelPipeline.replace("test", "http", httpHandler);
}
// noop handler was removed; http and instrumentation handlers were added
assertEquals(httpHandler, channelPipeline.first());
assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName());
assertEquals(1, channelPipeline.toMap().size());
NoopChannelHandler anotherNoopHandler = new NoopChannelHandler();
channelPipeline.replace("http", "test", anotherNoopHandler);
// http and instrumentation handlers were removed; noop handler was added
assertEquals(anotherNoopHandler, channelPipeline.first());
}
// regression test for
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4056
@Test
void shouldAddAfterAndRemoveLastHandler() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(new NoopChannelHandler());
ChannelPipeline channelPipeline = (ChannelPipeline) getConstructor().newInstance(channel);
HttpClientCodec httpHandler = new HttpClientCodec();
// no handlers initially
assertTrue(
channelPipeline.first() == null
|| "io.netty.channel.DefaultChannelPipeline$TailHandler"
.equals(channelPipeline.first().getClass().getName()));
assertNull(channelPipeline.last());
assertEquals(0, channelPipeline.toMap().size());
// Add http and instrumentation handlers
channelPipeline.addLast("http", httpHandler);
assertEquals(channelPipeline.first(), httpHandler);
assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName());
assertEquals(1, channelPipeline.toMap().size());
NoopChannelHandler noopHandler = new NoopChannelHandler();
channelPipeline.addAfter("http", "noop", noopHandler);
// instrumentation handler is between http and noop handlers
assertEquals(channelPipeline.first(), httpHandler);
assertEquals(channelPipeline.last(), noopHandler);
assertEquals(2, channelPipeline.toMap().size());
// http and instrumentation handlers will remain when last handler is removed
{
ChannelHandler removed = channelPipeline.removeLast();
assertEquals(noopHandler, removed);
assertEquals(channelPipeline.first(), httpHandler);
assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName());
assertEquals(1, channelPipeline.toMap().size());
}
// there is no handler in pipeline when last handler is removed
{
ChannelHandler removed = channelPipeline.removeLast();
assertEquals(httpHandler, removed);
assertEquals(0, channelPipeline.toMap().size());
}
}
// regression test for
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/10377
@Test
void ourHandlerNotInHandlerMap() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(new NoopChannelHandler());
ChannelPipeline channelPipeline = (ChannelPipeline) getConstructor().newInstance(channel);
HttpClientCodec httpHandler = new HttpClientCodec();
// no handlers initially
assertTrue(
channelPipeline.first() == null
|| "io.netty.channel.DefaultChannelPipeline$TailHandler"
.equals(channelPipeline.first().getClass().getName()));
assertNull(channelPipeline.last());
assertEquals(0, channelPipeline.toMap().size());
// add handler
channelPipeline.addLast("http", httpHandler);
assertEquals(httpHandler, channelPipeline.first());
// our handler was also added
assertEquals("HttpClientTracingHandler", channelPipeline.last().getClass().getSimpleName());
// our handler is not in handlers map
assertEquals(1, channelPipeline.toMap().size());
// our handler is not in handlers iterator
List<ChannelHandler> list = new ArrayList<>();
channelPipeline
.iterator()
.forEachRemaining(
entry -> {
list.add(entry.getValue());
});
assertEquals(1, list.size());
}
private static class NoopChannelHandler extends ChannelHandlerAdapter {}
}

View File

@ -3,9 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpObject;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
@ -14,26 +15,25 @@ Bridges from async Netty world to the sync world of our http client tests.
When request initiated by a test gets a response, calls a given callback and completes given
future with response's status code.
*/
public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private final CompletableFuture<Integer> responseCode;
class ClientHandler extends ChannelInboundHandlerAdapter {
private final CompletableFuture<Integer> result;
public ClientHandler(CompletableFuture<Integer> responseCode) {
this.responseCode = responseCode;
public ClientHandler(CompletableFuture<Integer> result) {
this.result = result;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpResponse) {
ctx.pipeline().remove(this);
HttpResponse response = (HttpResponse) msg;
responseCode.complete(response.getStatus().code());
result.complete(response.getStatus().code());
}
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
responseCode.completeExceptionally(cause);
result.completeExceptionally(cause);
ctx.close();
}
}

View File

@ -0,0 +1,249 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static org.assertj.core.api.Assertions.catchThrowable;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.ssl.SslHandler;
import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.SemanticAttributes;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class Netty40ClientSslTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
private static HttpClientTestServer server;
private static EventLoopGroup eventLoopGroup;
@BeforeAll
static void setup() {
server = new HttpClientTestServer(testing.getOpenTelemetry());
server.start();
eventLoopGroup = new NioEventLoopGroup();
}
@AfterAll
static void cleanup() throws InterruptedException, ExecutionException, TimeoutException {
eventLoopGroup.shutdownGracefully();
server.stop().get(10, TimeUnit.SECONDS);
}
@Test
public void shouldFailSslHandshake() {
Bootstrap bootstrap = createBootstrap(eventLoopGroup, Collections.singletonList("SSLv3"));
URI uri = server.resolveHttpsAddress("/success");
DefaultFullHttpRequest request =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getPath(), Unpooled.EMPTY_BUFFER);
HttpHeaders.setHost(request, uri.getHost() + ":" + uri.getPort());
Throwable thrownException = getThrowable(bootstrap, uri, request);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("parent")
.hasStatus(StatusData.error())
.hasException(thrownException),
span -> {
span.hasName("CONNECT").hasKind(INTERNAL).hasParent(trace.getSpan(0));
span.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"),
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, uri.getHost()),
equalTo(SemanticAttributes.SERVER_PORT, uri.getPort()),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"));
},
span -> {
span.hasName("SSL handshake")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(0))
.hasStatus(StatusData.error());
span.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"),
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"));
}));
}
private static Throwable getThrowable(
Bootstrap bootstrap, URI uri, DefaultFullHttpRequest request) {
AtomicReference<Channel> channel = new AtomicReference<>();
Throwable thrown =
catchThrowable(
() ->
testing.runWithSpan(
"parent",
() -> {
try {
channel.set(
bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel());
CompletableFuture<Integer> result = new CompletableFuture<>();
channel.get().pipeline().addLast(new ClientHandler(result));
channel.get().writeAndFlush(request).get(10, TimeUnit.SECONDS);
result.get(10, TimeUnit.SECONDS);
} finally {
if (channel.get() != null) {
channel.get().close();
}
}
}));
// Then
Throwable thrownException;
if (thrown instanceof ExecutionException) {
thrownException = thrown.getCause();
} else {
thrownException = thrown;
}
return thrownException;
}
@SuppressWarnings("InterruptedExceptionSwallowed")
@Test
public void shouldSuccessfullyEstablishSslHandshake() throws Exception {
// given
Bootstrap bootstrap =
createBootstrap(eventLoopGroup, Arrays.asList("TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3"));
URI uri = server.resolveHttpsAddress("/success");
DefaultFullHttpRequest request =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getPath(), Unpooled.EMPTY_BUFFER);
HttpHeaders.setHost(request, uri.getHost() + ":" + uri.getPort());
AtomicReference<Channel> channel = new AtomicReference<>();
// when
testing.runWithSpan(
"parent",
() -> {
try {
channel.set(bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel());
CompletableFuture<Integer> result = new CompletableFuture<>();
channel.get().pipeline().addLast(new ClientHandler(result));
channel.get().writeAndFlush(request).get(10, TimeUnit.SECONDS);
result.get(10, TimeUnit.SECONDS);
} finally {
if (channel.get() != null) {
channel.get().close();
}
}
});
// then
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent"),
span -> {
span.hasName("CONNECT").hasKind(INTERNAL).hasParent(trace.getSpan(0));
span.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"),
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, uri.getHost()),
equalTo(SemanticAttributes.SERVER_PORT, uri.getPort()),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"));
},
span -> {
span.hasName("SSL handshake").hasKind(INTERNAL).hasParent(trace.getSpan(0));
span.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"),
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"));
},
span -> {
span.hasName("GET").hasKind(CLIENT).hasParent(trace.getSpan(0));
},
span -> {
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(3));
}));
if (channel.get() != null) {
channel.get().close().sync();
}
}
// list of default ciphers copied from netty's JdkSslContext
private static final String[] SUPPORTED_CIPHERS =
new String[] {
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
"TLS_RSA_WITH_AES_128_GCM_SHA256",
"TLS_RSA_WITH_AES_128_CBC_SHA",
"TLS_RSA_WITH_AES_256_CBC_SHA",
"SSL_RSA_WITH_3DES_EDE_CBC_SHA"
};
private static Bootstrap createBootstrap(
EventLoopGroup eventLoopGroup, List<String> enabledProtocols) {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(@NotNull SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, null, null);
javax.net.ssl.SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(true);
sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
sslEngine.setEnabledCipherSuites(SUPPORTED_CIPHERS);
pipeline.addLast(new SslHandler(sslEngine));
pipeline.addLast(new HttpClientCodec());
}
});
return bootstrap;
}
}

View File

@ -0,0 +1,183 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import io.opentelemetry.semconv.SemanticAttributes;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.extension.RegisterExtension;
class Netty40ClientTest extends AbstractHttpClientTest<DefaultFullHttpRequest> {
@RegisterExtension
static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();
private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
private final Bootstrap bootstrap = buildBootstrap(false);
private final Bootstrap readTimeoutBootstrap = buildBootstrap(true);
void cleanupSpec() {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
}
Bootstrap buildBootstrap(boolean readTimeout) {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(@NotNull SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
if (readTimeout) {
pipeline.addLast(new ReadTimeoutHandler(2000, TimeUnit.MILLISECONDS));
}
pipeline.addLast(new HttpClientCodec());
}
});
return bootstrap;
}
private Bootstrap getBootstrap(URI uri) {
if ("/read-timeout".equals(uri.getPath())) {
return readTimeoutBootstrap;
}
return bootstrap;
}
@Override
public DefaultFullHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
String target = uri.getPath();
if (uri.getQuery() != null) {
target += "?" + uri.getQuery();
}
DefaultFullHttpRequest request =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), target, Unpooled.EMPTY_BUFFER);
HttpHeaders.setHost(request, uri.getHost() + ":" + uri.getPort());
request.headers().set("user-agent", "Netty");
headers.forEach((k, v) -> request.headers().set(k, v));
return request;
}
@Override
public int sendRequest(
DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers)
throws Exception {
Channel channel = getBootstrap(uri).connect(uri.getHost(), getPort(uri)).sync().channel();
CompletableFuture<Integer> result = new CompletableFuture<>();
channel.pipeline().addLast(new ClientHandler(result));
channel.writeAndFlush(request).get();
return result.get(20, TimeUnit.SECONDS);
}
@Override
public void sendRequestWithCallback(
DefaultFullHttpRequest request,
String method,
URI uri,
Map<String, String> headers,
HttpClientResult httpClientResult)
throws Exception {
Channel ch;
try {
ch = getBootstrap(uri).connect(uri.getHost(), getPort(uri)).sync().channel();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Throwable th) {
httpClientResult.complete(th);
return;
}
CompletableFuture<Integer> result = new CompletableFuture<>();
result.whenComplete((status, throwable) -> httpClientResult.complete(() -> status, throwable));
ch.pipeline().addLast(new ClientHandler(result));
ch.writeAndFlush(request);
}
@Override
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
optionsBuilder.disableTestRedirects();
optionsBuilder.disableTestHttps();
optionsBuilder.disableTestReadTimeout();
optionsBuilder.setExpectedClientSpanNameMapper(Netty40ClientTest::expectedClientSpanName);
optionsBuilder.setHttpAttributes(Netty40ClientTest::httpAttributes);
}
private static int getPort(URI uri) {
int port = uri.getPort();
if (port == -1) {
switch (uri.getScheme()) {
case "http":
return 80;
case "https":
return 443;
default:
throw new IllegalArgumentException("Unknown scheme: " + uri.getScheme());
}
}
return port;
}
private static String expectedClientSpanName(URI uri, String method) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "http://192.0.2.1/": // non routable address
return "CONNECT";
default:
return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method);
}
}
@SuppressWarnings("MissingDefault")
private static Set<AttributeKey<?>> httpAttributes(URI uri) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "http://192.0.2.1/": // non routable address
return Collections.emptySet();
}
Set<AttributeKey<?>> attributes = new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES);
attributes.remove(SemanticAttributes.SERVER_ADDRESS);
attributes.remove(SemanticAttributes.SERVER_PORT);
return attributes;
}
}

View File

@ -0,0 +1,176 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer;
import io.opentelemetry.semconv.SemanticAttributes;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class Netty40ConnectionSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
private static HttpClientTestServer server;
private static final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
private static final Bootstrap bootstrap = buildBootstrap();
@BeforeAll
static void setupSpec() {
server = new HttpClientTestServer(testing.getOpenTelemetry());
server.start();
}
@AfterAll
static void cleanupSpec() throws InterruptedException, ExecutionException, TimeoutException {
eventLoopGroup.shutdownGracefully();
server.stop().get(10, TimeUnit.SECONDS);
}
static Bootstrap buildBootstrap() {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(@NotNull SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new HttpClientCodec());
}
});
return bootstrap;
}
@Test
void successfulRequest() throws Exception {
// when
URI uri = URI.create("http://localhost:" + server.httpPort() + "/success");
DefaultFullHttpRequest request = buildRequest("GET", uri, new HashMap<>());
int responseCode = testing.runWithSpan("parent", () -> sendRequest(request, uri));
// then
assertThat(responseCode).isEqualTo(200);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),
span -> {
span.hasName("CONNECT").hasKind(INTERNAL).hasParent(trace.getSpan(0));
span.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"),
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, uri.getHost()),
equalTo(SemanticAttributes.SERVER_PORT, uri.getPort()),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, uri.getPort()),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"));
},
span -> span.hasName("GET").hasKind(CLIENT).hasParent(trace.getSpan(0)),
span ->
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(2))));
}
@Test
void failedRequest() throws Exception {
// when
URI uri = URI.create("http://localhost:" + PortUtils.UNUSABLE_PORT);
DefaultFullHttpRequest request = buildRequest("GET", uri, new HashMap<>());
Throwable thrown =
catchThrowable(() -> testing.runWithSpan("parent", () -> sendRequest(request, uri)));
// then
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent().hasException(thrown),
span -> {
span.hasName("CONNECT").hasKind(INTERNAL).hasParent(trace.getSpan(0));
span.hasAttributesSatisfying(
equalTo(SemanticAttributes.NETWORK_TRANSPORT, "tcp"));
satisfies(
SemanticAttributes.NETWORK_TYPE,
val ->
val.satisfiesAnyOf(
v -> assertThat(val).isNull(), v -> assertThat(v).isEqualTo("ipv4")));
span.hasAttributesSatisfying(
equalTo(SemanticAttributes.SERVER_ADDRESS, uri.getHost()),
equalTo(SemanticAttributes.SERVER_PORT, uri.getPort()));
satisfies(
NetworkAttributes.NETWORK_PEER_PORT,
val ->
val.satisfiesAnyOf(
v -> assertThat(val).isNull(),
v -> assertThat(v).isEqualTo(uri.getPort())));
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
val ->
val.satisfiesAnyOf(
v -> assertThat(val).isNull(),
v -> assertThat(v).isEqualTo("127.0.0.1")));
}));
}
private static DefaultFullHttpRequest buildRequest(
String method, URI uri, Map<String, String> headers) {
DefaultFullHttpRequest request =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.getPath(), Unpooled.EMPTY_BUFFER);
HttpHeaders.setHost(request, uri.getHost() + ":" + uri.getPort());
headers.forEach((k, v) -> request.headers().set(k, v));
return request;
}
private static int sendRequest(DefaultFullHttpRequest request, URI uri)
throws InterruptedException, ExecutionException, TimeoutException {
Channel channel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
CompletableFuture<Integer> result = new CompletableFuture<Integer>();
channel.pipeline().addLast(new ClientHandler(result));
channel.writeAndFlush(request).get();
return result.get(20, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,222 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v4_0.server;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.NOT_FOUND;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions;
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint;
import io.opentelemetry.semconv.SemanticAttributes;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.extension.RegisterExtension;
class Netty40ServerTest extends AbstractHttpServerTest<EventLoopGroup> {
@RegisterExtension
static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forAgent();
private static final LoggingHandler LOGGING_HANDLER =
new LoggingHandler(Netty40ServerTest.class, LogLevel.DEBUG);
@Override
protected EventLoopGroup setupServer() throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap =
new ServerBootstrap()
.group(eventLoopGroup)
.handler(LOGGING_HANDLER)
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(@NotNull SocketChannel socketChannel)
throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addFirst("logger", LOGGING_HANDLER);
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpResponseEncoder());
pipeline.addLast(
new SimpleChannelInboundHandler<Object>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg)
throws Exception {
if (!(msg instanceof HttpRequest)) {
return;
}
HttpRequest request = (HttpRequest) msg;
URI uri = URI.create(request.getUri());
ServerEndpoint endpoint = ServerEndpoint.forPath(uri.getPath());
ctx.write(
controller(
endpoint,
() -> {
ByteBuf content;
FullHttpResponse response;
if (endpoint.equals(SUCCESS) || endpoint.equals(ERROR)) {
content =
Unpooled.copiedBuffer(
endpoint.getBody(), CharsetUtil.UTF_8);
response =
new DefaultFullHttpResponse(
HTTP_1_1,
HttpResponseStatus.valueOf(endpoint.getStatus()),
content);
} else if (endpoint.equals(INDEXED_CHILD)) {
content = Unpooled.EMPTY_BUFFER;
endpoint.collectSpanAttributes(
it ->
new QueryStringDecoder(uri)
.parameters()
.get(it)
.get(0));
response =
new DefaultFullHttpResponse(
HTTP_1_1,
HttpResponseStatus.valueOf(endpoint.getStatus()),
content);
} else if (endpoint.equals(QUERY_PARAM)) {
content =
Unpooled.copiedBuffer(
uri.getQuery(), CharsetUtil.UTF_8);
response =
new DefaultFullHttpResponse(
HTTP_1_1,
HttpResponseStatus.valueOf(endpoint.getStatus()),
content);
} else if (endpoint.equals(REDIRECT)) {
content = Unpooled.EMPTY_BUFFER;
response =
new DefaultFullHttpResponse(
HTTP_1_1,
HttpResponseStatus.valueOf(endpoint.getStatus()),
content);
response
.headers()
.set(HttpHeaders.Names.LOCATION, endpoint.getBody());
} else if (endpoint.equals(CAPTURE_HEADERS)) {
content =
Unpooled.copiedBuffer(
endpoint.getBody(), CharsetUtil.UTF_8);
response =
new DefaultFullHttpResponse(
HTTP_1_1,
HttpResponseStatus.valueOf(endpoint.getStatus()),
content);
response
.headers()
.set(
"X-Test-Response",
request.headers().get("X-Test-Request"));
} else if (endpoint.equals(EXCEPTION)) {
throw new IllegalArgumentException(endpoint.getBody());
} else {
content =
Unpooled.copiedBuffer(
NOT_FOUND.getBody(), CharsetUtil.UTF_8);
response =
new DefaultFullHttpResponse(
HTTP_1_1,
HttpResponseStatus.valueOf(NOT_FOUND.getStatus()),
content);
}
response.headers().set(CONTENT_TYPE, "text/plain");
response
.headers()
.set(CONTENT_LENGTH, content.readableBytes());
return response;
}));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable ex)
throws Exception {
ByteBuf content =
Unpooled.copiedBuffer(ex.getMessage(), CharsetUtil.UTF_8);
FullHttpResponse response =
new DefaultFullHttpResponse(
HTTP_1_1, INTERNAL_SERVER_ERROR, content);
response.headers().set(CONTENT_TYPE, "text/plain");
response.headers().set(CONTENT_LENGTH, content.readableBytes());
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.flush();
}
});
}
})
.channel(NioServerSocketChannel.class);
serverBootstrap.bind(port).sync();
return eventLoopGroup;
}
@Override
protected void stopServer(EventLoopGroup server) {
if (server != null) {
server.shutdownGracefully();
}
}
@Override
protected void configure(HttpServerTestOptions options) {
options.setHttpAttributes(
serverEndpoint -> {
Set<AttributeKey<?>> attributes =
new HashSet<>(HttpServerTestOptions.DEFAULT_HTTP_ATTRIBUTES);
attributes.remove(SemanticAttributes.HTTP_ROUTE);
return attributes;
});
options.setExpectedException(new IllegalArgumentException(EXCEPTION.getBody()));
options.setHasResponseCustomizer(serverEndpoint -> true);
}
}