diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/DecoratorFunctions.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/DecoratorFunctions.java index 25f340ea6c..53e8eb5405 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/DecoratorFunctions.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/DecoratorFunctions.java @@ -17,9 +17,12 @@ import reactor.netty.http.client.HttpClientResponse; public final class DecoratorFunctions { - // ignore our own callbacks - or already decorated functions + // ignore already decorated functions public static boolean shouldDecorate(Class callbackClass) { - return !callbackClass.getName().startsWith("io.opentelemetry.javaagent"); + return callbackClass != OnRequestDecorator.class + && callbackClass != OnResponseDecorator.class + && callbackClass != OnRequestErrorDecorator.class + && callbackClass != OnResponseErrorDecorator.class; } private abstract static class OnMessageDecorator implements BiConsumer { diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/AbstractReactorNettyHttpClientTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/AbstractReactorNettyHttpClientTest.groovy deleted file mode 100644 index 54de8b9466..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/AbstractReactorNettyHttpClientTest.groovy +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9 - -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.SpanKind -import io.opentelemetry.api.trace.StatusCode -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.base.HttpClientTest -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult -import io.opentelemetry.sdk.trace.data.SpanData -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import reactor.netty.http.client.HttpClient - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicReference - -import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT - -abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest implements AgentTestTrait { - - @Override - boolean testRedirects() { - false - } - - @Override - boolean testReadTimeout() { - true - } - - @Override - String userAgent() { - return "ReactorNetty" - } - - @Override - HttpClient.ResponseReceiver buildRequest(String method, URI uri, Map headers) { - def readTimeout = uri.toString().contains("/read-timeout") - return createHttpClient(readTimeout) - .followRedirect(true) - .headers({ h -> headers.each { k, v -> h.add(k, v) } }) - .baseUrl(resolveAddress("").toString()) - ."${method.toLowerCase()}"() - .uri(uri.toString()) - } - - @Override - int sendRequest(HttpClient.ResponseReceiver request, String method, URI uri, Map headers) { - return request.responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { - resp - } - }.block().status().code() - } - - @Override - void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map headers, HttpClientResult requestResult) { - request.responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - }.subscribe({ - requestResult.complete(it.status().code()) - }, { throwable -> - requestResult.complete(throwable) - }) - } - - @Override - String expectedClientSpanName(URI uri, String method) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "https://192.0.2.1/": // non routable address - return "CONNECT" - default: - return super.expectedClientSpanName(uri, method) - } - } - - @Override - Throwable clientSpanError(URI uri, Throwable exception) { - if (exception.class.getName().endsWith("ReactiveException")) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "https://192.0.2.1/": // non routable address - exception = exception.getCause() - } - } - return exception - } - - @Override - Set> httpAttributes(URI uri) { - switch (uri.toString()) { - case "http://localhost:61/": // unopened port - case "https://192.0.2.1/": // non routable address - return [] - } - def attributes = super.httpAttributes(uri) - attributes.remove(SemanticAttributes.NET_PEER_NAME) - attributes.remove(SemanticAttributes.NET_PEER_PORT) - return attributes - } - - abstract HttpClient createHttpClient(boolean readTimeout) - - def "should expose context to http client callbacks"() { - given: - def onRequestSpan = new AtomicReference() - def afterRequestSpan = new AtomicReference() - def onResponseSpan = new AtomicReference() - def afterResponseSpan = new AtomicReference() - def latch = new CountDownLatch(1) - - def httpClient = createHttpClient(false) - .doOnRequest({ rq, con -> onRequestSpan.set(Span.current()) }) - .doAfterRequest({ rq, con -> afterRequestSpan.set(Span.current()) }) - .doOnResponse({ rs, con -> onResponseSpan.set(Span.current()) }) - .doAfterResponse({ rs, con -> - afterResponseSpan.set(Span.current()) - latch.countDown() - }) - - when: - runWithSpan("parent") { - httpClient.baseUrl(resolveAddress("").toString()) - .get() - .uri("/success") - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - .block() - } - latch.await() - - then: - assertTraces(1) { - trace(0, 3) { - def parentSpan = span(0) - def nettyClientSpan = span(1) - - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - } - clientSpan(it, 1, parentSpan, "GET", resolveAddress("/success")) - serverSpan(it, 2, nettyClientSpan) - - assertSameSpan(parentSpan, onRequestSpan) - assertSameSpan(nettyClientSpan, afterRequestSpan) - assertSameSpan(nettyClientSpan, onResponseSpan) - assertSameSpan(parentSpan, afterResponseSpan) - } - } - } - - def "should expose context to http request error callback"() { - given: - def onRequestErrorSpan = new AtomicReference() - - def httpClient = createHttpClient(false) - .doOnRequestError({ rq, err -> onRequestErrorSpan.set(Span.current()) }) - - when: - runWithSpan("parent") { - httpClient.get() - .uri("http://localhost:$UNUSABLE_PORT/") - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - .block() - } - - then: - def ex = thrown(Exception) - - assertTraces(1) { - trace(0, 2) { - def parentSpan = span(0) - - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - status StatusCode.ERROR - errorEvent(ex.class, ex.message) - } - span(1) { - def actualException = ex.cause - kind SpanKind.CLIENT - childOf parentSpan - status StatusCode.ERROR - errorEvent(actualException.class, actualException.message) - } - - assertSameSpan(parentSpan, onRequestErrorSpan) - } - } - } - - static void assertSameSpan(SpanData expected, AtomicReference actual) { - def expectedSpanContext = expected.spanContext - def actualSpanContext = actual.get().spanContext - assert expectedSpanContext.traceId == actualSpanContext.traceId - assert expectedSpanContext.spanId == actualSpanContext.spanId - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyConnectionSpanTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyConnectionSpanTest.groovy deleted file mode 100644 index fce6795fbd..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyConnectionSpanTest.groovy +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9 - -import io.opentelemetry.instrumentation.test.AgentTestTrait -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import reactor.netty.http.client.HttpClient -import spock.lang.Shared - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.SERVER -import static io.opentelemetry.api.trace.StatusCode.ERROR -import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP - -class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait { - - @Shared - private HttpClientTestServer server - - def setupSpec() { - server = new HttpClientTestServer(openTelemetry) - server.start() - } - - def cleanupSpec() { - server.stop() - } - - def "test successful request"() { - when: - def httpClient = HttpClient.create() - def responseCode = - runWithSpan("parent") { - httpClient.get().uri("http://localhost:${server.httpPort()}/success") - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - .block() - .status().code() - } - - then: - responseCode == 200 - assertTraces(1) { - trace(0, 5) { - def list = Arrays.asList("RESOLVE", "CONNECT") - spans.subList(1, 3).sort(Comparator.comparing { item -> list.indexOf(item.name) }) - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "RESOLVE" - kind INTERNAL - childOf span(0) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpPort() - } - } - span(2) { - name "CONNECT" - kind INTERNAL - childOf(span(0)) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" server.httpPort() - "$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1" - } - } - span(3) { - name "HTTP GET" - kind CLIENT - childOf(span(0)) - } - span(4) { - name "test-http-server" - kind SERVER - childOf(span(3)) - } - } - } - } - - def "test failing request"() { - when: - def httpClient = HttpClient.create() - runWithSpan("parent") { - httpClient.get().uri("http://localhost:${PortUtils.UNUSABLE_PORT}") - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - .block() - .status().code() - } - - then: - def thrownException = thrown(Exception) - def connectException = thrownException.getCause() - - and: - assertTraces(1) { - trace(0, 3) { - def list = Arrays.asList("RESOLVE", "CONNECT") - spans.subList(1, 3).sort(Comparator.comparing { item -> list.indexOf(item.name) }) - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(thrownException.class, thrownException.message) - } - span(1) { - name "RESOLVE" - kind INTERNAL - childOf span(0) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" PortUtils.UNUSABLE_PORT - } - } - span(2) { - name "CONNECT" - kind INTERNAL - childOf(span(0)) - status ERROR - errorEvent(connectException.class, connectException.message) - attributes { - "$SemanticAttributes.NET_TRANSPORT" IP_TCP - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" PortUtils.UNUSABLE_PORT - "$SemanticAttributes.NET_SOCK_PEER_ADDR" { it == "127.0.0.1" || it == null } - } - } - } - } - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientTest.groovy deleted file mode 100644 index 559fb49825..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientTest.groovy +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9 - -import io.netty.channel.ChannelOption -import io.netty.handler.timeout.ReadTimeoutHandler -import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection -import java.util.concurrent.TimeUnit -import reactor.netty.http.client.HttpClient - -import java.util.concurrent.ExecutionException -import java.util.concurrent.TimeoutException - -class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest { - - HttpClient createHttpClient(boolean readTimeout) { - return HttpClient.create().tcpConfiguration({ tcpClient -> - if (readTimeout) { - tcpClient = tcpClient.doOnConnected({ connection -> - connection.addHandlerLast(new ReadTimeoutHandler(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS)) - }) - } - tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) - }) - } - - @Override - SingleConnection createSingleConnection(String host, int port) { - String url - try { - url = new URL("http", host, port, "").toString() - } catch (MalformedURLException e) { - throw new ExecutionException(e) - } - - def httpClient = HttpClient - .newConnection() - .baseUrl(url) - - return new SingleConnection() { - - @Override - int doRequest(String path, Map headers) throws ExecutionException, InterruptedException, TimeoutException { - return httpClient - .headers({ h -> headers.each { k, v -> h.add(k, v) } }) - .get() - .uri(path) - .responseSingle { resp, content -> - // Make sure to consume content since that's when we close the span. - content.map { resp } - } - .block().status().code() - } - } - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientUsingFromTest.groovy b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientUsingFromTest.groovy deleted file mode 100644 index 499d5a9d80..0000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientUsingFromTest.groovy +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9 - -import io.netty.channel.ChannelOption -import io.netty.handler.timeout.ReadTimeoutHandler -import java.util.concurrent.TimeUnit -import reactor.netty.http.client.HttpClient -import reactor.netty.tcp.TcpClient - -class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest { - - HttpClient createHttpClient(boolean readTimeout) { - return HttpClient.from(TcpClient.create()).tcpConfiguration({ tcpClient -> - if (readTimeout) { - tcpClient = tcpClient.doOnConnected({ connection -> - connection.addHandlerLast(new ReadTimeoutHandler(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS)) - }) - } - tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) - }) - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/AbstractReactorNettyHttpClientTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/AbstractReactorNettyHttpClientTest.java new file mode 100644 index 0000000000..41fded243f --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/AbstractReactorNettyHttpClientTest.java @@ -0,0 +1,242 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9; + +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static java.util.Collections.emptySet; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +import io.netty.handler.codec.http.HttpMethod; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.net.URI; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.netty.http.client.HttpClient; + +abstract class AbstractReactorNettyHttpClientTest + extends AbstractHttpClientTest> { + + @RegisterExtension + static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); + + static final String USER_AGENT = "ReactorNetty"; + + abstract HttpClient createHttpClient(boolean readTimeout); + + @Override + public HttpClient.ResponseReceiver buildRequest( + String method, URI uri, Map headers) { + boolean readTimeout = uri.toString().contains("/read-timeout"); + return createHttpClient(readTimeout) + .followRedirect(true) + .headers(h -> headers.forEach(h::add)) + .baseUrl(resolveAddress("").toString()) + .request(HttpMethod.valueOf(method)) + .uri(uri.toString()); + } + + @Override + public int sendRequest( + HttpClient.ResponseReceiver request, String method, URI uri, Map headers) { + return request + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .block() + .status() + .code(); + } + + @Override + public void sendRequestWithCallback( + HttpClient.ResponseReceiver request, + String method, + URI uri, + Map headers, + HttpClientResult httpClientResult) { + request + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .subscribe( + response -> httpClientResult.complete(response.status().code()), + httpClientResult::complete); + } + + @Override + protected void configure(HttpClientTestOptions options) { + options.disableTestRedirects(); + options.enableTestReadTimeout(); + options.setUserAgent(USER_AGENT); + + options.setExpectedClientSpanNameMapper( + (uri, method) -> { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "https://192.0.2.1/": // non routable address + return "CONNECT"; + default: + return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply( + uri, method); + } + }); + + options.setClientSpanErrorMapper( + (uri, exception) -> { + if (exception.getClass().getName().endsWith("ReactiveException")) { + // unopened port or non routable address + if ("http://localhost:61/".equals(uri.toString()) + || "https://192.0.2.1/".equals(uri.toString())) { + exception = exception.getCause(); + } + } + return exception; + }); + + options.setHttpAttributes( + uri -> { + // unopened port or non routable address + if ("http://localhost:61/".equals(uri.toString()) + || "https://192.0.2.1/".equals(uri.toString())) { + return emptySet(); + } + + Set> attributes = + new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES); + attributes.remove(SemanticAttributes.NET_PEER_NAME); + attributes.remove(SemanticAttributes.NET_PEER_PORT); + return attributes; + }); + } + + @Test + void shouldExposeContextToHttpClientCallbacks() throws InterruptedException { + AtomicReference onRequestSpan = new AtomicReference<>(); + AtomicReference afterRequestSpan = new AtomicReference<>(); + AtomicReference onResponseSpan = new AtomicReference<>(); + AtomicReference afterResponseSpan = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + + HttpClient httpClient = + createHttpClient(false) + .doOnRequest((rq, con) -> onRequestSpan.set(Span.current())) + .doAfterRequest((rq, con) -> afterRequestSpan.set(Span.current())) + .doOnResponse((rs, con) -> onResponseSpan.set(Span.current())) + .doAfterResponse( + (rs, con) -> { + afterResponseSpan.set(Span.current()); + latch.countDown(); + }); + + testing.runWithSpan( + "parent", + () -> + httpClient + .baseUrl(resolveAddress("").toString()) + .get() + .uri("/success") + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .block()); + + latch.await(10, TimeUnit.SECONDS); + + testing.waitAndAssertTraces( + trace -> { + SpanData parentSpan = trace.getSpan(0); + SpanData nettyClientSpan = trace.getSpan(1); + + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(), + span -> span.hasName("HTTP GET").hasKind(CLIENT).hasParent(parentSpan), + span -> span.hasName("test-http-server").hasKind(SERVER).hasParent(nettyClientSpan)); + + assertSameSpan(parentSpan, onRequestSpan); + assertSameSpan(nettyClientSpan, afterRequestSpan); + assertSameSpan(nettyClientSpan, onResponseSpan); + assertSameSpan(parentSpan, afterResponseSpan); + }); + } + + @Test + void shouldExposeContextToHttpRequestErrorCallback() { + AtomicReference onRequestErrorSpan = new AtomicReference<>(); + + HttpClient httpClient = + createHttpClient(false) + .doOnRequestError((rq, err) -> onRequestErrorSpan.set(Span.current())); + + Throwable thrown = + catchThrowable( + () -> + testing.runWithSpan( + "parent", + () -> + httpClient + .get() + .uri("http://localhost:$UNUSABLE_PORT/") + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the + // span. + return content.map(unused -> resp); + }) + .block())); + + testing.waitAndAssertTraces( + trace -> { + SpanData parentSpan = trace.getSpan(0); + + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasKind(INTERNAL) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(thrown), + span -> + span.hasKind(CLIENT) + .hasParent(parentSpan) + .hasStatus(StatusData.error()) + .hasException(thrown.getCause())); + + assertSameSpan(parentSpan, onRequestErrorSpan); + }); + } + + private static void assertSameSpan(SpanData expected, AtomicReference actual) { + SpanContext expectedSpanContext = expected.getSpanContext(); + SpanContext actualSpanContext = actual.get().getSpanContext(); + assertThat(actualSpanContext.getTraceId()).isEqualTo(expectedSpanContext.getTraceId()); + assertThat(actualSpanContext.getSpanId()).isEqualTo(expectedSpanContext.getSpanId()); + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyConnectionSpanTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyConnectionSpanTest.java new file mode 100644 index 0000000000..8149cbf10e --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyConnectionSpanTest.java @@ -0,0 +1,153 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9; + +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.netty.http.client.HttpClient; + +class ReactorNettyConnectionSpanTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + static HttpClientTestServer server; + + @BeforeAll + static void setUp() { + server = new HttpClientTestServer(testing.getOpenTelemetry()); + server.start(); + } + + @AfterAll + static void tearDown() { + server.stop(); + } + + @Test + void testSuccessfulRequest() { + HttpClient httpClient = HttpClient.create(); + String uri = "http://localhost:" + server.httpPort() + "/success"; + + int responseCode = + testing.runWithSpan( + "parent", + () -> + httpClient + .get() + .uri(uri) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .block() + .status() + .code()); + + assertThat(responseCode).isEqualTo(200); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(), + span -> + span.hasName("RESOLVE") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort())), + span -> + span.hasName("CONNECT") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort()), + equalTo(SemanticAttributes.NET_SOCK_PEER_ADDR, "127.0.0.1")), + span -> span.hasName("HTTP GET").hasKind(CLIENT).hasParent(trace.getSpan(0)), + span -> + span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(3)))); + } + + @Test + void testFailingRequest() { + HttpClient httpClient = HttpClient.create(); + String uri = "http://localhost:" + PortUtils.UNUSABLE_PORT; + + Throwable thrown = + catchThrowable( + () -> + testing.runWithSpan( + "parent", + () -> + httpClient + .get() + .uri(uri) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the + // span. + return content.map(unused -> resp); + }) + .block() + .status() + .code())); + + Throwable connectException = thrown.getCause(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> + span.hasName("parent") + .hasKind(INTERNAL) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(thrown), + span -> + span.hasName("RESOLVE") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT)), + span -> + span.hasName("CONNECT") + .hasKind(INTERNAL) + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()) + .hasException(connectException) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT), + satisfies( + SemanticAttributes.NET_SOCK_PEER_ADDR, + val -> val.isIn(null, "127.0.0.1"))))); + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientTest.java new file mode 100644 index 0000000000..195777b9dd --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientTest.java @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9; + +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.concurrent.TimeUnit; +import reactor.netty.http.client.HttpClient; + +class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest { + + @Override + HttpClient createHttpClient(boolean readTimeout) { + return HttpClient.create() + .tcpConfiguration( + tcpClient -> { + if (readTimeout) { + tcpClient = + tcpClient.doOnConnected( + connection -> + connection.addHandlerLast( + new ReadTimeoutHandler( + READ_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS))); + } + return tcpClient.option( + ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) CONNECTION_TIMEOUT.toMillis()); + }); + } + + @Override + public SingleConnection createSingleConnection(String host, int port) { + String url; + try { + url = new URL("http", host, port, "").toString(); + } catch (MalformedURLException e) { + throw new AssertionError("Could not construct URL", e); + } + + HttpClient httpClient = HttpClient.newConnection().baseUrl(url); + + return (path, headers) -> + httpClient + .headers(h -> headers.forEach(h::add)) + .get() + .uri(path) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the span. + return content.map(unused -> resp); + }) + .block() + .status() + .code(); + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientUsingFromTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientUsingFromTest.java new file mode 100644 index 0000000000..e9634ba5f2 --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-0.9/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyHttpClientUsingFromTest.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9; + +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import java.util.concurrent.TimeUnit; +import reactor.netty.http.client.HttpClient; +import reactor.netty.tcp.TcpClient; + +class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest { + + @Override + HttpClient createHttpClient(boolean readTimeout) { + return HttpClient.from(TcpClient.create()) + .tcpConfiguration( + tcpClient -> { + if (readTimeout) { + tcpClient = + tcpClient.doOnConnected( + connection -> + connection.addHandlerLast( + new ReadTimeoutHandler( + READ_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS))); + } + return tcpClient.option( + ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) CONNECTION_TIMEOUT.toMillis()); + }); + } +}