From 892fb8a38e4e3e8c7817c5836fe3902e9188fd3d Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Fri, 26 Aug 2022 10:37:33 +0200 Subject: [PATCH] Refactor Reactor-Netty 1.0 tests to Java (#6497) --- .../reactornetty/v1_0/DecoratorFunctions.java | 5 +- .../groovy/ReactorNettyWithSpanTest.groovy | 81 ----- .../AbstractReactorNettyHttpClientTest.groovy | 278 ------------------ .../v1_0/ReactorNettyClientSslTest.groovy | 208 ------------- .../ReactorNettyConnectionSpanTest.groovy | 179 ----------- .../v1_0/ReactorNettyHttpClientTest.groovy | 52 ---- ...ReactorNettyHttpClientUsingFromTest.groovy | 23 -- .../AbstractReactorNettyHttpClientTest.java | 268 +++++++++++++++++ .../v1_0/CustomNameResolverGroup.java | 56 ++++ .../v1_0/ReactorNettyClientSslTest.java | 228 ++++++++++++++ .../v1_0/ReactorNettyConnectionSpanTest.java | 177 +++++++++++ .../v1_0/ReactorNettyHttpClientTest.java | 51 ++++ .../ReactorNettyHttpClientUsingFromTest.java | 24 ++ .../v1_0/ReactorNettyWithSpanTest.java | 79 +++++ .../reactornetty/v1_0}/TracedWithSpan.java | 2 +- 15 files changed, 887 insertions(+), 824 deletions(-) delete mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/ReactorNettyWithSpanTest.groovy delete mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy delete mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.groovy delete mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy delete mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.groovy delete mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy create mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.java create mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/CustomNameResolverGroup.java create mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.java create mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.java create mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.java create mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.java create mode 100644 instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyWithSpanTest.java rename instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/{test/reactor/netty => javaagent/instrumentation/reactornetty/v1_0}/TracedWithSpan.java (80%) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java index ab6fbf554a..17f3ec75e4 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java @@ -15,9 +15,10 @@ import reactor.util.context.ContextView; public final class DecoratorFunctions { - // ignore our own callbacks - or already decorated functions + // ignore already decorated functions public static boolean shouldDecorate(Class callbackClass) { - return !callbackClass.getName().startsWith("io.opentelemetry.javaagent"); + return callbackClass != OnMessageDecorator.class + && callbackClass != OnMessageErrorDecorator.class; } public static final class OnMessageDecorator diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/ReactorNettyWithSpanTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/ReactorNettyWithSpanTest.groovy deleted file mode 100644 index d9ebc777cd..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/ReactorNettyWithSpanTest.groovy +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer -import io.opentelemetry.test.reactor.netty.TracedWithSpan -import reactor.core.publisher.Mono -import reactor.netty.http.client.HttpClient -import reactor.test.StepVerifier -import spock.lang.Shared - -import java.time.Duration - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.SERVER - -class ReactorNettyWithSpanTest extends InstrumentationSpecification implements AgentTestTrait { - - @Shared - private HttpClientTestServer server - - def setupSpec() { - server = new HttpClientTestServer(openTelemetry) - server.start() - } - - def cleanupSpec() { - server.stop() - } - - def "test successful nested under WithSpan"() { - when: - def httpClient = HttpClient.create() - - def httpRequest = Mono.defer({ -> - httpClient.get().uri("http://localhost:${server.httpPort()}/success") - .responseSingle({ resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - }) - .map({ r -> r.status().code() }) - }) - - def getResponse = new TracedWithSpan().mono( - // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4348 - // our HTTP server is synchronous, i.e. it returns Mono.just with response - // which is not supported by TracingSubscriber - it does not instrument scalar calls - // so we delay here to fake async http request and let Reactor context instrumentation work - Mono.delay(Duration.ofMillis(1)).then(httpRequest)) - - then: - StepVerifier.create(getResponse) - .expectNext(200) - .expectComplete() - .verify() - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "TracedWithSpan.mono" - kind INTERNAL - hasNoParent() - } - span(1) { - name "HTTP GET" - kind CLIENT - childOf(span(0)) - } - span(2) { - name "test-http-server" - kind SERVER - childOf(span(1)) - } - } - } - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy deleted file mode 100644 index 93e4d8f9f9..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0 - -import io.netty.resolver.AddressResolver -import io.netty.resolver.AddressResolverGroup -import io.netty.resolver.InetNameResolver -import io.netty.util.concurrent.EventExecutor -import io.netty.util.concurrent.Promise -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.SpanKind -import io.opentelemetry.api.trace.StatusCode -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpClientTest -import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest -import io.opentelemetry.sdk.trace.data.SpanData -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import java.time.Duration -import reactor.netty.http.client.HttpClient - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicReference - -import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT - -abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest implements AgentTestTrait { - - @Override - boolean testRedirects() { - false - } - - @Override - boolean testReadTimeout() { - true - } - - @Override - String userAgent() { - return "ReactorNetty" - } - - @Override - HttpClient.ResponseReceiver buildRequest(String method, URI uri, Map headers) { - def client = createHttpClient() - .followRedirect(true) - .headers({ h -> headers.each { k, v -> h.add(k, v) } }) - .baseUrl(resolveAddress("").toString()) - if (uri.toString().contains("/read-timeout")) { - client = client.responseTimeout(Duration.ofMillis(READ_TIMEOUT_MS)) - } - return client."${method.toLowerCase()}"() - .uri(uri.toString()) - } - - @Override - int sendRequest(HttpClient.ResponseReceiver request, String method, URI uri, Map headers) { - return request.responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { - resp - } - }.block().status().code() - } - - @Override - void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map headers, AbstractHttpClientTest.RequestResult requestResult) { - request.responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - }.subscribe({ - requestResult.complete(it.status().code()) - }, { throwable -> - requestResult.complete(throwable) - }) - } - - @Override - Throwable clientSpanError(URI uri, Throwable exception) { - if (exception.class.getName().endsWith("ReactiveException")) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "https://192.0.2.1/": // non routable address - exception = exception.getCause() - } - } - return exception - } - - @Override - Set> httpAttributes(URI uri) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "https://192.0.2.1/": // non routable address - return [] - } - def attributes = super.httpAttributes(uri) - if (uri.toString().contains("/read-timeout")) { - attributes.remove(SemanticAttributes.NET_PEER_NAME) - attributes.remove(SemanticAttributes.NET_PEER_PORT) - attributes.remove(SemanticAttributes.HTTP_FLAVOR) - } - attributes - } - - abstract HttpClient createHttpClient() - - AddressResolverGroup getAddressResolverGroup() { - return CustomNameResolverGroup.INSTANCE - } - - def "should expose context to http client callbacks"() { - given: - def onRequestSpan = new AtomicReference() - def afterRequestSpan = new AtomicReference() - def onResponseSpan = new AtomicReference() - def afterResponseSpan = new AtomicReference() - def latch = new CountDownLatch(1) - - def httpClient = createHttpClient() - .doOnRequest({ rq, con -> onRequestSpan.set(Span.current()) }) - .doAfterRequest({ rq, con -> afterRequestSpan.set(Span.current()) }) - .doOnResponse({ rs, con -> onResponseSpan.set(Span.current()) }) - .doAfterResponseSuccess({ rs, con -> - afterResponseSpan.set(Span.current()) - latch.countDown() - }) - - when: - runWithSpan("parent") { - httpClient.baseUrl(resolveAddress("").toString()) - .get() - .uri("/success") - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - .block() - } - latch.await() - - then: - assertTraces(1) { - trace(0, 3) { - def parentSpan = span(0) - def nettyClientSpan = span(1) - - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - } - clientSpan(it, 1, parentSpan, "GET", resolveAddress("/success")) - serverSpan(it, 2, nettyClientSpan) - - assertSameSpan(parentSpan, onRequestSpan) - assertSameSpan(nettyClientSpan, afterRequestSpan) - assertSameSpan(nettyClientSpan, onResponseSpan) - assertSameSpan(parentSpan, afterResponseSpan) - } - } - } - - def "should expose context to http request error callback"() { - given: - def onRequestErrorSpan = new AtomicReference() - - def httpClient = createHttpClient() - .doOnRequestError({ rq, err -> onRequestErrorSpan.set(Span.current()) }) - - when: - runWithSpan("parent") { - httpClient.get() - .uri("http://localhost:$UNUSABLE_PORT/") - .response() - .block() - } - - then: - def ex = thrown(Exception) - - assertTraces(1) { - trace(0, 2) { - def parentSpan = span(0) - - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - status StatusCode.ERROR - errorEvent(ex.class, ex.message) - } - span(1) { - def actualException = ex.cause - kind SpanKind.CLIENT - childOf parentSpan - status StatusCode.ERROR - errorEvent(actualException.class, actualException.message) - } - - assertSameSpan(parentSpan, onRequestErrorSpan) - } - } - } - - def "should not leak connections"() { - given: - def uniqueChannelHashes = new HashSet<>() - def httpClient = createHttpClient() - .doOnConnect({ uniqueChannelHashes.add(it.channelHash()) }) - def uri = "http://localhost:${server.httpPort()}/success" - - def count = 100 - - when: - (1..count).forEach({ - runWithSpan("parent") { - def status = httpClient.get().uri(uri) - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp.status().code() } - }.block() - assert status == 200 - } - }) - - then: - traces.size() == count - uniqueChannelHashes.size() == 1 - } - - static void assertSameSpan(SpanData expected, AtomicReference actual) { - def expectedSpanContext = expected.spanContext - def actualSpanContext = actual.get().spanContext - assert expectedSpanContext.traceId == actualSpanContext.traceId - assert expectedSpanContext.spanId == actualSpanContext.spanId - } - - // custom address resolver that returns at most one address for each host - // adapted from io.netty.resolver.DefaultAddressResolverGroup - static class CustomNameResolverGroup extends AddressResolverGroup { - public static final CustomNameResolverGroup INSTANCE = new CustomNameResolverGroup() - - private CustomNameResolverGroup() { - } - - protected AddressResolver newResolver(EventExecutor executor) throws Exception { - return (new CustomNameResolver(executor)).asAddressResolver() - } - } - - static class CustomNameResolver extends InetNameResolver { - CustomNameResolver(EventExecutor executor) { - super(executor) - } - - protected void doResolve(String inetHost, Promise promise) throws Exception { - try { - promise.setSuccess(InetAddress.getByName(inetHost)) - } catch (UnknownHostException exception) { - promise.setFailure(exception) - } - } - - protected void doResolveAll(String inetHost, Promise> promise) throws Exception { - try { - // default implementation calls InetAddress.getAllByName - promise.setSuccess(Collections.singletonList(InetAddress.getByName(inetHost))) - } catch (UnknownHostException exception) { - promise.setFailure(exception) - } - } - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.groovy deleted file mode 100644 index 33457e99c6..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.groovy +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0 - -import io.netty.handler.ssl.SslContextBuilder -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import reactor.netty.http.client.HttpClient -import reactor.netty.tcp.SslProvider -import spock.lang.Shared - -import javax.net.ssl.SSLHandshakeException - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.SERVER -import static io.opentelemetry.api.trace.StatusCode.ERROR -import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1 -import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP - -class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { - - @Shared - private HttpClientTestServer server - - def setupSpec() { - server = new HttpClientTestServer(openTelemetry) - server.start() - } - - def cleanupSpec() { - server.stop() - } - - def "should fail SSL handshake"() { - given: - def httpClient = createHttpClient(["SSLv3"]) - def uri = "https://localhost:${server.httpsPort()}/success" - - when: - def responseMono = httpClient.get().uri(uri) - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - - runWithSpan("parent") { - responseMono.block() - } - - then: - Throwable thrownException = thrown() - - assertTraces(1) { - trace(0, 5) { - def list = Arrays.asList("RESOLVE", "CONNECT", "SSL handshake") - spans.subList(2, 5).sort(Comparator.comparing { item -> list.indexOf(item.name) }) - span(0) { - name "parent" - status ERROR - errorEvent(thrownException.class, thrownException.message) - } - span(1) { - name "HTTP GET" - kind CLIENT - childOf span(0) - status ERROR - // netty swallows the exception, it doesn't make any sense to hard-code the message - errorEventWithAnyMessage(SSLHandshakeException) - attributes { - "$SemanticAttributes.HTTP_METHOD" "GET" - "$SemanticAttributes.HTTP_URL" uri - } - } - span(2) { - name "RESOLVE" - kind INTERNAL - childOf span(1) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpsPort() - } - } - span(3) { - name "CONNECT" - kind INTERNAL - childOf span(1) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpsPort() - "net.sock.peer.addr" "127.0.0.1" - } - } - span(4) { - name "SSL handshake" - kind INTERNAL - childOf span(1) - status ERROR - // netty swallows the exception, it doesn't make any sense to hard-code the message - errorEventWithAnyMessage(SSLHandshakeException) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpsPort() - "net.sock.peer.addr" "127.0.0.1" - } - } - } - } - } - - def "should successfully establish SSL handshake"() { - given: - def httpClient = createHttpClient() - def uri = "https://localhost:${server.httpsPort()}/success" - - when: - def responseMono = httpClient.get().uri(uri) - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - - runWithSpan("parent") { - responseMono.block() - } - - then: - assertTraces(1) { - trace(0, 6) { - def list = Arrays.asList("RESOLVE", "CONNECT", "SSL handshake") - spans.subList(2, 5).sort(Comparator.comparing { item -> list.indexOf(item.name) }) - span(0) { - name "parent" - } - span(1) { - name "HTTP GET" - kind CLIENT - childOf span(0) - attributes { - "$SemanticAttributes.HTTP_METHOD" "GET" - "$SemanticAttributes.HTTP_URL" uri - "$SemanticAttributes.HTTP_FLAVOR" HTTP_1_1 - "$SemanticAttributes.HTTP_STATUS_CODE" 200 - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpsPort() - "net.sock.peer.addr" "127.0.0.1" - } - } - span(2) { - name "RESOLVE" - kind INTERNAL - childOf span(1) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpsPort() - } - } - span(3) { - name "CONNECT" - kind INTERNAL - childOf span(1) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpsPort() - "net.sock.peer.addr" "127.0.0.1" - } - } - span(4) { - name "SSL handshake" - kind INTERNAL - childOf span(1) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpsPort() - "net.sock.peer.addr" "127.0.0.1" - } - } - span(5) { - name "test-http-server" - kind SERVER - childOf span(1) - } - } - } - } - - private static HttpClient createHttpClient(List enabledProtocols = null) { - def sslContext = SslContextBuilder.forClient() - if (enabledProtocols != null) { - sslContext = sslContext.protocols(enabledProtocols) - } - def sslProvider = SslProvider.builder() - .sslContext(sslContext.build()) - .build() - HttpClient.create().secure(sslProvider) - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy deleted file mode 100644 index fbe6f45674..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0 - -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import reactor.netty.http.client.HttpClient -import spock.lang.Shared - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.SERVER -import static io.opentelemetry.api.trace.StatusCode.ERROR -import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1 -import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP - -class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait { - - @Shared - private HttpClientTestServer server - - def setupSpec() { - server = new HttpClientTestServer(openTelemetry) - server.start() - } - - def cleanupSpec() { - server.stop() - } - - def "test successful request"() { - given: - def httpClient = HttpClient.create() - def uri = "http://localhost:${server.httpPort()}/success" - - when: - def responseCode = - runWithSpan("parent") { - httpClient.get().uri(uri) - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - .block() - .status().code() - } - - then: - responseCode == 200 - assertTraces(1) { - trace(0, 5) { - def list = Arrays.asList("RESOLVE", "CONNECT") - spans.subList(2, 4).sort(Comparator.comparing { item -> list.indexOf(item.name) }) - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "HTTP GET" - kind CLIENT - childOf span(0) - attributes { - "$SemanticAttributes.HTTP_METHOD" "GET" - "$SemanticAttributes.HTTP_URL" uri - "$SemanticAttributes.HTTP_FLAVOR" HTTP_1_1 - "$SemanticAttributes.HTTP_STATUS_CODE" 200 - "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpPort() - "net.sock.peer.addr" "127.0.0.1" - } - } - span(2) { - name "RESOLVE" - kind INTERNAL - childOf span(1) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpPort() - } - } - span(3) { - name "CONNECT" - kind INTERNAL - childOf span(1) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpPort() - "net.sock.peer.addr" "127.0.0.1" - } - } - span(4) { - name "test-http-server" - kind SERVER - childOf span(1) - } - } - } - } - - def "test failing request"() { - given: - def httpClient = HttpClient.create() - def uri = "http://localhost:${PortUtils.UNUSABLE_PORT}" - - when: - runWithSpan("parent") { - httpClient.get().uri(uri) - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - .block() - .status().code() - } - - then: - def thrownException = thrown(Exception) - def connectException = thrownException.getCause() - - and: - assertTraces(1) { - trace(0, 4) { - def list = Arrays.asList("RESOLVE", "CONNECT") - spans.subList(2, 4).sort(Comparator.comparing { item -> list.indexOf(item.name) }) - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(thrownException.class, thrownException.message) - } - span(1) { - name "HTTP GET" - kind CLIENT - childOf span(0) - status ERROR - errorEvent(connectException.class, connectException.message) - attributes { - "$SemanticAttributes.HTTP_METHOD" "GET" - "$SemanticAttributes.HTTP_URL" uri - } - } - span(2) { - name "RESOLVE" - kind INTERNAL - childOf span(1) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" PortUtils.UNUSABLE_PORT - } - } - span(3) { - name "CONNECT" - kind INTERNAL - childOf span(1) - status ERROR - errorEvent(connectException.class, connectException.message) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" PortUtils.UNUSABLE_PORT - "net.sock.peer.addr" "127.0.0.1" - } - } - } - } - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.groovy deleted file mode 100644 index c844c28783..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.groovy +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0 - -import io.netty.channel.ChannelOption -import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection -import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames -import reactor.netty.http.client.HttpClient - -import java.util.concurrent.ExecutionException -import java.util.concurrent.TimeoutException - -class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest { - - HttpClient createHttpClient() { - return HttpClient.create() - .tcpConfiguration({ tcpClient -> - tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) - }) - .resolver(getAddressResolverGroup()) - .headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) }) - } - - @Override - SingleConnection createSingleConnection(String host, int port) { - def httpClient = HttpClient - .newConnection() - .host(host) - .port(port) - .headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) }) - - return new SingleConnection() { - - @Override - int doRequest(String path, Map headers) throws ExecutionException, InterruptedException, TimeoutException { - return httpClient - .headers({ h -> headers.each { k, v -> h.add(k, v) } }) - .get() - .uri(path) - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - .block() - .status().code() - } - } - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy deleted file mode 100644 index db1a9879be..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0 - -import io.netty.channel.ChannelOption -import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames -import reactor.netty.http.client.HttpClient -import reactor.netty.tcp.TcpClient - -class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest { - - HttpClient createHttpClient() { - return HttpClient.from(TcpClient.create()) - .tcpConfiguration({ tcpClient -> - tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) - }) - .resolver(getAddressResolverGroup()) - .headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) }) - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.java new file mode 100644 index 0000000000..d305ce2e5c --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.java @@ -0,0 +1,268 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static java.util.Collections.emptySet; +import static org.assertj.core.api.Assertions.catchThrowable; + +import io.netty.handler.codec.http.HttpMethod; +import io.netty.resolver.AddressResolverGroup; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.netty.http.client.HttpClient; + +abstract class AbstractReactorNettyHttpClientTest + extends AbstractHttpClientTest> { + + static final String USER_AGENT = "ReactorNetty"; + + @RegisterExtension + static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); + + protected abstract HttpClient createHttpClient(); + + protected AddressResolverGroup getAddressResolverGroup() { + return CustomNameResolverGroup.INSTANCE; + } + + @Override + public HttpClient.ResponseReceiver buildRequest( + String method, URI uri, Map headers) { + HttpClient client = + createHttpClient() + .followRedirect(true) + .headers(h -> headers.forEach(h::add)) + .baseUrl(resolveAddress("").toString()); + if (uri.toString().contains("/read-timeout")) { + client = client.responseTimeout(READ_TIMEOUT); + } + return client.request(HttpMethod.valueOf(method)).uri(uri.toString()); + } + + @Override + public int sendRequest( + HttpClient.ResponseReceiver request, String method, URI uri, Map headers) { + return request + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .block() + .status() + .code(); + } + + @Override + public void sendRequestWithCallback( + HttpClient.ResponseReceiver request, + String method, + URI uri, + Map headers, + AbstractHttpClientTest.RequestResult requestResult) { + request + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .subscribe( + response -> requestResult.complete(response.status().code()), requestResult::complete); + } + + @Override + protected void configure(HttpClientTestOptions options) { + options.disableTestRedirects(); + options.enableTestReadTimeout(); + options.setUserAgent(USER_AGENT); + + options.setClientSpanErrorMapper( + (uri, exception) -> { + if (exception.getClass().getName().endsWith("ReactiveException")) { + // unopened port or non routable address + if ("http://localhost:61/".equals(uri.toString()) + || "https://192.0.2.1/".equals(uri.toString())) { + exception = exception.getCause(); + } + } + return exception; + }); + + options.setHttpAttributes( + uri -> { + // unopened port or non routable address + if ("http://localhost:61/".equals(uri.toString()) + || "https://192.0.2.1/".equals(uri.toString())) { + return emptySet(); + } + + Set> attributes = + new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES); + if (uri.toString().contains("/read-timeout")) { + attributes.remove(SemanticAttributes.NET_PEER_NAME); + attributes.remove(SemanticAttributes.NET_PEER_PORT); + attributes.remove(SemanticAttributes.HTTP_FLAVOR); + } + return attributes; + }); + } + + @Test + void shouldExposeContextToHttpClientCallbacks() throws InterruptedException { + AtomicReference onRequestSpan = new AtomicReference<>(); + AtomicReference afterRequestSpan = new AtomicReference<>(); + AtomicReference onResponseSpan = new AtomicReference<>(); + AtomicReference afterResponseSpan = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + + HttpClient httpClient = + createHttpClient() + .doOnRequest((rq, con) -> onRequestSpan.set(Span.current())) + .doAfterRequest((rq, con) -> afterRequestSpan.set(Span.current())) + .doOnResponse((rs, con) -> onResponseSpan.set(Span.current())) + .doAfterResponseSuccess( + (rs, con) -> { + afterResponseSpan.set(Span.current()); + latch.countDown(); + }); + + testing.runWithSpan( + "parent", + () -> + httpClient + .baseUrl(resolveAddress("").toString()) + .get() + .uri("/success") + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .block()); + + latch.await(10, TimeUnit.SECONDS); + + testing.waitAndAssertTraces( + trace -> { + SpanData parentSpan = trace.getSpan(0); + SpanData nettyClientSpan = trace.getSpan(1); + + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(), + span -> span.hasName("HTTP GET").hasKind(CLIENT).hasParent(parentSpan), + span -> span.hasName("test-http-server").hasKind(SERVER).hasParent(nettyClientSpan)); + + assertSameSpan(parentSpan, onRequestSpan); + assertSameSpan(nettyClientSpan, afterRequestSpan); + assertSameSpan(nettyClientSpan, onResponseSpan); + assertSameSpan(parentSpan, afterResponseSpan); + }); + } + + @Test + void shouldExposeContextToHttpRequestErrorCallback() { + AtomicReference onRequestErrorSpan = new AtomicReference<>(); + + HttpClient httpClient = + createHttpClient().doOnRequestError((rq, err) -> onRequestErrorSpan.set(Span.current())); + + Throwable thrown = + catchThrowable( + () -> + testing.runWithSpan( + "parent", + () -> + httpClient + .get() + .uri("http://localhost:" + PortUtils.UNUSABLE_PORT + "/") + .response() + .block())); + + testing.waitAndAssertTraces( + trace -> { + SpanData parentSpan = trace.getSpan(0); + + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasKind(INTERNAL) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(thrown), + span -> + span.hasKind(CLIENT) + .hasParent(parentSpan) + .hasStatus(StatusData.error()) + .hasException(thrown.getCause())); + + assertSameSpan(parentSpan, onRequestErrorSpan); + }); + } + + @Test + void shouldNotLeakConnections() { + HashSet uniqueChannelHashes = new HashSet<>(); + HttpClient httpClient = + createHttpClient().doOnConnect(config -> uniqueChannelHashes.add(config.channelHash())); + + int count = 100; + IntStream.range(0, count) + .forEach( + i -> + testing.runWithSpan( + "parent", + () -> { + int status = + httpClient + .get() + .uri(resolveAddress("/success")) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the + // span. + return content.map(unused -> resp); + }) + .block() + .status() + .code(); + assertThat(status).isEqualTo(200); + })); + + testing.waitForTraces(count); + assertThat(uniqueChannelHashes).hasSize(1); + } + + private static void assertSameSpan(SpanData expected, AtomicReference actual) { + SpanContext expectedSpanContext = expected.getSpanContext(); + SpanContext actualSpanContext = actual.get().getSpanContext(); + assertThat(actualSpanContext.getTraceId()).isEqualTo(expectedSpanContext.getTraceId()); + assertThat(actualSpanContext.getSpanId()).isEqualTo(expectedSpanContext.getSpanId()); + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/CustomNameResolverGroup.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/CustomNameResolverGroup.java new file mode 100644 index 0000000000..de5ddee31f --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/CustomNameResolverGroup.java @@ -0,0 +1,56 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static java.util.Collections.singletonList; + +import io.netty.resolver.AddressResolver; +import io.netty.resolver.AddressResolverGroup; +import io.netty.resolver.InetNameResolver; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Promise; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.List; + +public class CustomNameResolverGroup extends AddressResolverGroup { + + public static final CustomNameResolverGroup INSTANCE = new CustomNameResolverGroup(); + + private CustomNameResolverGroup() {} + + @Override + protected AddressResolver newResolver(EventExecutor executor) { + return new CustomNameResolver(executor).asAddressResolver(); + } + + private static class CustomNameResolver extends InetNameResolver { + + private CustomNameResolver(EventExecutor executor) { + super(executor); + } + + @Override + protected void doResolve(String inetHost, Promise promise) { + try { + promise.setSuccess(InetAddress.getByName(inetHost)); + } catch (UnknownHostException exception) { + promise.setFailure(exception); + } + } + + @Override + protected void doResolveAll(String inetHost, Promise> promise) { + try { + // default implementation calls InetAddress.getAllByName + promise.setSuccess(singletonList(InetAddress.getByName(inetHost))); + } catch (UnknownHostException exception) { + promise.setFailure(exception); + } + } + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.java new file mode 100644 index 0000000000..962793afc7 --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.java @@ -0,0 +1,228 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP; +import static org.assertj.core.api.Assertions.catchThrowable; + +import io.netty.handler.ssl.SslContextBuilder; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer; +import io.opentelemetry.sdk.trace.data.EventData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.List; +import javax.annotation.Nullable; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLHandshakeException; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientResponse; +import reactor.netty.tcp.SslProvider; + +class ReactorNettyClientSslTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + static HttpClientTestServer server; + + @BeforeAll + static void setUp() { + server = new HttpClientTestServer(testing.getOpenTelemetry()); + server.start(); + } + + @AfterAll + static void tearDown() { + server.stop(); + } + + @Test + void shouldFailSslHandshake() throws SSLException { + HttpClient httpClient = createHttpClient("SSLv3"); + String uri = "https://localhost:" + server.httpsPort() + "/success"; + + Mono responseMono = + httpClient + .get() + .uri(uri) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }); + + Throwable thrown = + catchThrowable(() -> testing.runWithSpan("parent", () -> responseMono.block())); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> + span.hasName("parent") + .hasKind(INTERNAL) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(thrown), + span -> + span.hasName("HTTP GET") + .hasKind(CLIENT) + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()) + // netty swallows the exception, it doesn't make any sense to hard-code the + // message + .hasEventsSatisfying(ReactorNettyClientSslTest::isSslHandshakeException) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.HTTP_METHOD, "GET"), + equalTo(SemanticAttributes.HTTP_URL, uri)), + span -> + span.hasName("RESOLVE") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort())), + span -> + span.hasName("CONNECT") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()), + equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")), + span -> + span.hasName("SSL handshake") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.error()) + // netty swallows the exception, it doesn't make any sense to hard-code the + // message + .hasEventsSatisfying(ReactorNettyClientSslTest::isSslHandshakeException) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()), + equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")))); + } + + @Test + void shouldSuccessfullyEstablishSslHandshake() throws SSLException { + HttpClient httpClient = createHttpClient(); + String uri = "https://localhost:" + server.httpsPort() + "/success"; + + Mono responseMono = + httpClient + .get() + .uri(uri) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }); + + testing.runWithSpan("parent", () -> responseMono.block()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(), + span -> + span.hasName("HTTP GET") + .hasKind(CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.HTTP_METHOD, "GET"), + equalTo(SemanticAttributes.HTTP_URL, uri), + equalTo(SemanticAttributes.HTTP_FLAVOR, HTTP_1_1), + equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200), + satisfies( + SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, + AbstractLongAssert::isNotNegative), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()), + equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")), + span -> + span.hasName("RESOLVE") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort())), + span -> + span.hasName("CONNECT") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()), + equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")), + span -> + span.hasName("SSL handshake") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()), + equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")), + span -> + span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(1)))); + } + + private static HttpClient createHttpClient() throws SSLException { + return ReactorNettyClientSslTest.createHttpClient(null); + } + + private static HttpClient createHttpClient(@Nullable String enabledProtocol) throws SSLException { + SslContextBuilder sslContext = SslContextBuilder.forClient(); + if (enabledProtocol != null) { + sslContext = sslContext.protocols(enabledProtocol); + } + + SslProvider sslProvider = SslProvider.builder().sslContext(sslContext.build()).build(); + return HttpClient.create().secure(sslProvider); + } + + private static void isSslHandshakeException(List events) { + assertThat(events) + .filteredOn(event -> event.getName().equals(SemanticAttributes.EXCEPTION_EVENT_NAME)) + .satisfiesExactly( + event -> + assertThat(event) + .hasAttributesSatisfying( + attributes -> + assertThat(attributes) + .hasSize(3) + .containsEntry( + SemanticAttributes.EXCEPTION_TYPE, + SSLHandshakeException.class.getCanonicalName()) + .hasEntrySatisfying( + SemanticAttributes.EXCEPTION_MESSAGE, + s -> assertThat(s).isNotEmpty()) + .hasEntrySatisfying( + SemanticAttributes.EXCEPTION_STACKTRACE, + s -> assertThat(s).isNotEmpty()))); + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.java new file mode 100644 index 0000000000..8da1c1cd61 --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.java @@ -0,0 +1,177 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.netty.http.client.HttpClient; + +class ReactorNettyConnectionSpanTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + static HttpClientTestServer server; + + @BeforeAll + static void setUp() { + server = new HttpClientTestServer(testing.getOpenTelemetry()); + server.start(); + } + + @AfterAll + static void tearDown() { + server.stop(); + } + + @Test + void testSuccessfulRequest() { + HttpClient httpClient = HttpClient.create(); + String uri = "http://localhost:" + server.httpPort() + "/success"; + + int responseCode = + testing.runWithSpan( + "parent", + () -> + httpClient + .get() + .uri(uri) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .block() + .status() + .code()); + + assertThat(responseCode).isEqualTo(200); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(), + span -> + span.hasName("HTTP GET") + .hasKind(CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.HTTP_METHOD, "GET"), + equalTo(SemanticAttributes.HTTP_URL, uri), + equalTo(SemanticAttributes.HTTP_FLAVOR, HTTP_1_1), + equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200), + satisfies( + SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, + AbstractLongAssert::isNotNegative), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort()), + equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")), + span -> + span.hasName("RESOLVE") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort())), + span -> + span.hasName("CONNECT") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort()), + equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")), + span -> + span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(1)))); + } + + @Test + void testFailingRequest() { + HttpClient httpClient = HttpClient.create(); + String uri = "http://localhost:" + PortUtils.UNUSABLE_PORT; + + Throwable thrown = + catchThrowable( + () -> + testing.runWithSpan( + "parent", + () -> + httpClient + .get() + .uri(uri) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the + // span. + return content.map(unused -> resp); + }) + .block() + .status() + .code())); + + Throwable connectException = thrown.getCause(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> + span.hasName("parent") + .hasKind(INTERNAL) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(thrown), + span -> + span.hasName("HTTP GET") + .hasKind(CLIENT) + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()) + .hasException(connectException) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.HTTP_METHOD, "GET"), + equalTo(SemanticAttributes.HTTP_URL, uri)), + span -> + span.hasName("RESOLVE") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT)), + span -> + span.hasName("CONNECT") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.error()) + .hasException(connectException) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT), + equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")))); + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.java new file mode 100644 index 0000000000..9fe4971bc0 --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.java @@ -0,0 +1,51 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import io.netty.channel.ChannelOption; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames; +import reactor.netty.http.client.HttpClient; + +class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest { + + @Override + protected HttpClient createHttpClient() { + int connectionTimeoutMillis = (int) CONNECTION_TIMEOUT.toMillis(); + return HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMillis) + .resolver(getAddressResolverGroup()) + .headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT)); + } + + @Override + protected void configure(HttpClientTestOptions options) { + super.configure(options); + + options.setSingleConnectionFactory( + (host, port) -> { + HttpClient httpClient = + HttpClient.newConnection() + .host(host) + .port(port) + .headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT)); + + return (path, headers) -> + httpClient + .headers(h -> headers.forEach(h::add)) + .get() + .uri(path) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .block() + .status() + .code(); + }); + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.java new file mode 100644 index 0000000000..446b658ad1 --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import io.netty.channel.ChannelOption; +import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames; +import reactor.netty.http.client.HttpClient; +import reactor.netty.tcp.TcpClient; + +class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest { + + @SuppressWarnings("deprecation") // from(TcpClient) is deprecated, but we want to test it anyway + @Override + protected HttpClient createHttpClient() { + int connectionTimeoutMillis = (int) CONNECTION_TIMEOUT.toMillis(); + return HttpClient.from(TcpClient.create()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMillis) + .resolver(getAddressResolverGroup()) + .headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT)); + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyWithSpanTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyWithSpanTest.java new file mode 100644 index 0000000000..3f1d605701 --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyWithSpanTest.java @@ -0,0 +1,79 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer; +import java.time.Duration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.test.StepVerifier; + +class ReactorNettyWithSpanTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + static HttpClientTestServer server; + + @BeforeAll + static void setUp() { + server = new HttpClientTestServer(testing.getOpenTelemetry()); + server.start(); + } + + @AfterAll + static void tearDown() { + server.stop(); + } + + @Test + public void testSuccessfulNestedUnderWithSpan() { + HttpClient httpClient = HttpClient.create(); + + Mono httpRequest = + Mono.defer( + () -> + httpClient + .get() + .uri("http://localhost:" + server.httpPort() + "/success") + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .map(r -> r.status().code())); + + Mono getResponse = + new TracedWithSpan() + .mono( + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4348 + // our HTTP server is synchronous, i.e. it returns Mono.just with response + // which is not supported by TracingSubscriber - it does not instrument scalar calls + // so we delay here to fake async http request and let Reactor context + // instrumentation work + Mono.delay(Duration.ofMillis(1)).then(httpRequest)); + + StepVerifier.create(getResponse).expectNext(200).expectComplete().verify(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("TracedWithSpan.mono").hasKind(INTERNAL).hasNoParent(), + span -> span.hasName("HTTP GET").hasKind(CLIENT).hasParent(trace.getSpan(0)), + span -> + span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(1)))); + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/test/reactor/netty/TracedWithSpan.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TracedWithSpan.java similarity index 80% rename from instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/test/reactor/netty/TracedWithSpan.java rename to instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TracedWithSpan.java index eb5ba7a78c..2bd4c9607e 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/test/reactor/netty/TracedWithSpan.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TracedWithSpan.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.test.reactor.netty; +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; import io.opentelemetry.instrumentation.annotations.WithSpan; import reactor.core.publisher.Mono;