Refactor Reactor-Netty 1.0 tests to Java (#6497)

This commit is contained in:
Mateusz Rzeszutek 2022-08-26 10:37:33 +02:00 committed by GitHub
parent 33d2e40a9e
commit 892fb8a38e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 887 additions and 824 deletions

View File

@ -15,9 +15,10 @@ import reactor.util.context.ContextView;
public final class DecoratorFunctions {
// ignore our own callbacks - or already decorated functions
// ignore already decorated functions
public static boolean shouldDecorate(Class<?> callbackClass) {
return !callbackClass.getName().startsWith("io.opentelemetry.javaagent");
return callbackClass != OnMessageDecorator.class
&& callbackClass != OnMessageErrorDecorator.class;
}
public static final class OnMessageDecorator<M extends HttpClientInfos>

View File

@ -1,81 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
import io.opentelemetry.test.reactor.netty.TracedWithSpan
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.test.StepVerifier
import spock.lang.Shared
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.SERVER
class ReactorNettyWithSpanTest extends InstrumentationSpecification implements AgentTestTrait {
@Shared
private HttpClientTestServer server
def setupSpec() {
server = new HttpClientTestServer(openTelemetry)
server.start()
}
def cleanupSpec() {
server.stop()
}
def "test successful nested under WithSpan"() {
when:
def httpClient = HttpClient.create()
def httpRequest = Mono.defer({ ->
httpClient.get().uri("http://localhost:${server.httpPort()}/success")
.responseSingle({ resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
})
.map({ r -> r.status().code() })
})
def getResponse = new TracedWithSpan().mono(
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4348
// our HTTP server is synchronous, i.e. it returns Mono.just with response
// which is not supported by TracingSubscriber - it does not instrument scalar calls
// so we delay here to fake async http request and let Reactor context instrumentation work
Mono.delay(Duration.ofMillis(1)).then(httpRequest))
then:
StepVerifier.create(getResponse)
.expectNext(200)
.expectComplete()
.verify()
assertTraces(1) {
trace(0, 3) {
span(0) {
name "TracedWithSpan.mono"
kind INTERNAL
hasNoParent()
}
span(1) {
name "HTTP GET"
kind CLIENT
childOf(span(0))
}
span(2) {
name "test-http-server"
kind SERVER
childOf(span(1))
}
}
}
}
}

View File

@ -1,278 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
import io.netty.resolver.AddressResolver
import io.netty.resolver.AddressResolverGroup
import io.netty.resolver.InetNameResolver
import io.netty.util.concurrent.EventExecutor
import io.netty.util.concurrent.Promise
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.time.Duration
import reactor.netty.http.client.HttpClient
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpClient.ResponseReceiver> implements AgentTestTrait {
@Override
boolean testRedirects() {
false
}
@Override
boolean testReadTimeout() {
true
}
@Override
String userAgent() {
return "ReactorNetty"
}
@Override
HttpClient.ResponseReceiver buildRequest(String method, URI uri, Map<String, String> headers) {
def client = createHttpClient()
.followRedirect(true)
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
.baseUrl(resolveAddress("").toString())
if (uri.toString().contains("/read-timeout")) {
client = client.responseTimeout(Duration.ofMillis(READ_TIMEOUT_MS))
}
return client."${method.toLowerCase()}"()
.uri(uri.toString())
}
@Override
int sendRequest(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers) {
return request.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map {
resp
}
}.block().status().code()
}
@Override
void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers, AbstractHttpClientTest.RequestResult requestResult) {
request.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}.subscribe({
requestResult.complete(it.status().code())
}, { throwable ->
requestResult.complete(throwable)
})
}
@Override
Throwable clientSpanError(URI uri, Throwable exception) {
if (exception.class.getName().endsWith("ReactiveException")) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
exception = exception.getCause()
}
}
return exception
}
@Override
Set<AttributeKey<?>> httpAttributes(URI uri) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
return []
}
def attributes = super.httpAttributes(uri)
if (uri.toString().contains("/read-timeout")) {
attributes.remove(SemanticAttributes.NET_PEER_NAME)
attributes.remove(SemanticAttributes.NET_PEER_PORT)
attributes.remove(SemanticAttributes.HTTP_FLAVOR)
}
attributes
}
abstract HttpClient createHttpClient()
AddressResolverGroup getAddressResolverGroup() {
return CustomNameResolverGroup.INSTANCE
}
def "should expose context to http client callbacks"() {
given:
def onRequestSpan = new AtomicReference<Span>()
def afterRequestSpan = new AtomicReference<Span>()
def onResponseSpan = new AtomicReference<Span>()
def afterResponseSpan = new AtomicReference<Span>()
def latch = new CountDownLatch(1)
def httpClient = createHttpClient()
.doOnRequest({ rq, con -> onRequestSpan.set(Span.current()) })
.doAfterRequest({ rq, con -> afterRequestSpan.set(Span.current()) })
.doOnResponse({ rs, con -> onResponseSpan.set(Span.current()) })
.doAfterResponseSuccess({ rs, con ->
afterResponseSpan.set(Span.current())
latch.countDown()
})
when:
runWithSpan("parent") {
httpClient.baseUrl(resolveAddress("").toString())
.get()
.uri("/success")
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
.block()
}
latch.await()
then:
assertTraces(1) {
trace(0, 3) {
def parentSpan = span(0)
def nettyClientSpan = span(1)
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
clientSpan(it, 1, parentSpan, "GET", resolveAddress("/success"))
serverSpan(it, 2, nettyClientSpan)
assertSameSpan(parentSpan, onRequestSpan)
assertSameSpan(nettyClientSpan, afterRequestSpan)
assertSameSpan(nettyClientSpan, onResponseSpan)
assertSameSpan(parentSpan, afterResponseSpan)
}
}
}
def "should expose context to http request error callback"() {
given:
def onRequestErrorSpan = new AtomicReference<Span>()
def httpClient = createHttpClient()
.doOnRequestError({ rq, err -> onRequestErrorSpan.set(Span.current()) })
when:
runWithSpan("parent") {
httpClient.get()
.uri("http://localhost:$UNUSABLE_PORT/")
.response()
.block()
}
then:
def ex = thrown(Exception)
assertTraces(1) {
trace(0, 2) {
def parentSpan = span(0)
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
status StatusCode.ERROR
errorEvent(ex.class, ex.message)
}
span(1) {
def actualException = ex.cause
kind SpanKind.CLIENT
childOf parentSpan
status StatusCode.ERROR
errorEvent(actualException.class, actualException.message)
}
assertSameSpan(parentSpan, onRequestErrorSpan)
}
}
}
def "should not leak connections"() {
given:
def uniqueChannelHashes = new HashSet<>()
def httpClient = createHttpClient()
.doOnConnect({ uniqueChannelHashes.add(it.channelHash()) })
def uri = "http://localhost:${server.httpPort()}/success"
def count = 100
when:
(1..count).forEach({
runWithSpan("parent") {
def status = httpClient.get().uri(uri)
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp.status().code() }
}.block()
assert status == 200
}
})
then:
traces.size() == count
uniqueChannelHashes.size() == 1
}
static void assertSameSpan(SpanData expected, AtomicReference<Span> actual) {
def expectedSpanContext = expected.spanContext
def actualSpanContext = actual.get().spanContext
assert expectedSpanContext.traceId == actualSpanContext.traceId
assert expectedSpanContext.spanId == actualSpanContext.spanId
}
// custom address resolver that returns at most one address for each host
// adapted from io.netty.resolver.DefaultAddressResolverGroup
static class CustomNameResolverGroup extends AddressResolverGroup<InetSocketAddress> {
public static final CustomNameResolverGroup INSTANCE = new CustomNameResolverGroup()
private CustomNameResolverGroup() {
}
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) throws Exception {
return (new CustomNameResolver(executor)).asAddressResolver()
}
}
static class CustomNameResolver extends InetNameResolver {
CustomNameResolver(EventExecutor executor) {
super(executor)
}
protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception {
try {
promise.setSuccess(InetAddress.getByName(inetHost))
} catch (UnknownHostException exception) {
promise.setFailure(exception)
}
}
protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise) throws Exception {
try {
// default implementation calls InetAddress.getAllByName
promise.setSuccess(Collections.singletonList(InetAddress.getByName(inetHost)))
} catch (UnknownHostException exception) {
promise.setFailure(exception)
}
}
}
}

View File

@ -1,208 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
import io.netty.handler.ssl.SslContextBuilder
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import reactor.netty.http.client.HttpClient
import reactor.netty.tcp.SslProvider
import spock.lang.Shared
import javax.net.ssl.SSLHandshakeException
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.SERVER
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
class ReactorNettyClientSslTest extends AgentInstrumentationSpecification {
@Shared
private HttpClientTestServer server
def setupSpec() {
server = new HttpClientTestServer(openTelemetry)
server.start()
}
def cleanupSpec() {
server.stop()
}
def "should fail SSL handshake"() {
given:
def httpClient = createHttpClient(["SSLv3"])
def uri = "https://localhost:${server.httpsPort()}/success"
when:
def responseMono = httpClient.get().uri(uri)
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
runWithSpan("parent") {
responseMono.block()
}
then:
Throwable thrownException = thrown()
assertTraces(1) {
trace(0, 5) {
def list = Arrays.asList("RESOLVE", "CONNECT", "SSL handshake")
spans.subList(2, 5).sort(Comparator.comparing { item -> list.indexOf(item.name) })
span(0) {
name "parent"
status ERROR
errorEvent(thrownException.class, thrownException.message)
}
span(1) {
name "HTTP GET"
kind CLIENT
childOf span(0)
status ERROR
// netty swallows the exception, it doesn't make any sense to hard-code the message
errorEventWithAnyMessage(SSLHandshakeException)
attributes {
"$SemanticAttributes.HTTP_METHOD" "GET"
"$SemanticAttributes.HTTP_URL" uri
}
}
span(2) {
name "RESOLVE"
kind INTERNAL
childOf span(1)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpsPort()
}
}
span(3) {
name "CONNECT"
kind INTERNAL
childOf span(1)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpsPort()
"net.sock.peer.addr" "127.0.0.1"
}
}
span(4) {
name "SSL handshake"
kind INTERNAL
childOf span(1)
status ERROR
// netty swallows the exception, it doesn't make any sense to hard-code the message
errorEventWithAnyMessage(SSLHandshakeException)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpsPort()
"net.sock.peer.addr" "127.0.0.1"
}
}
}
}
}
def "should successfully establish SSL handshake"() {
given:
def httpClient = createHttpClient()
def uri = "https://localhost:${server.httpsPort()}/success"
when:
def responseMono = httpClient.get().uri(uri)
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
runWithSpan("parent") {
responseMono.block()
}
then:
assertTraces(1) {
trace(0, 6) {
def list = Arrays.asList("RESOLVE", "CONNECT", "SSL handshake")
spans.subList(2, 5).sort(Comparator.comparing { item -> list.indexOf(item.name) })
span(0) {
name "parent"
}
span(1) {
name "HTTP GET"
kind CLIENT
childOf span(0)
attributes {
"$SemanticAttributes.HTTP_METHOD" "GET"
"$SemanticAttributes.HTTP_URL" uri
"$SemanticAttributes.HTTP_FLAVOR" HTTP_1_1
"$SemanticAttributes.HTTP_STATUS_CODE" 200
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpsPort()
"net.sock.peer.addr" "127.0.0.1"
}
}
span(2) {
name "RESOLVE"
kind INTERNAL
childOf span(1)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpsPort()
}
}
span(3) {
name "CONNECT"
kind INTERNAL
childOf span(1)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpsPort()
"net.sock.peer.addr" "127.0.0.1"
}
}
span(4) {
name "SSL handshake"
kind INTERNAL
childOf span(1)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpsPort()
"net.sock.peer.addr" "127.0.0.1"
}
}
span(5) {
name "test-http-server"
kind SERVER
childOf span(1)
}
}
}
}
private static HttpClient createHttpClient(List<String> enabledProtocols = null) {
def sslContext = SslContextBuilder.forClient()
if (enabledProtocols != null) {
sslContext = sslContext.protocols(enabledProtocols)
}
def sslProvider = SslProvider.builder()
.sslContext(sslContext.build())
.build()
HttpClient.create().secure(sslProvider)
}
}

View File

@ -1,179 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import reactor.netty.http.client.HttpClient
import spock.lang.Shared
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.SERVER
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait {
@Shared
private HttpClientTestServer server
def setupSpec() {
server = new HttpClientTestServer(openTelemetry)
server.start()
}
def cleanupSpec() {
server.stop()
}
def "test successful request"() {
given:
def httpClient = HttpClient.create()
def uri = "http://localhost:${server.httpPort()}/success"
when:
def responseCode =
runWithSpan("parent") {
httpClient.get().uri(uri)
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
.block()
.status().code()
}
then:
responseCode == 200
assertTraces(1) {
trace(0, 5) {
def list = Arrays.asList("RESOLVE", "CONNECT")
spans.subList(2, 4).sort(Comparator.comparing { item -> list.indexOf(item.name) })
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "HTTP GET"
kind CLIENT
childOf span(0)
attributes {
"$SemanticAttributes.HTTP_METHOD" "GET"
"$SemanticAttributes.HTTP_URL" uri
"$SemanticAttributes.HTTP_FLAVOR" HTTP_1_1
"$SemanticAttributes.HTTP_STATUS_CODE" 200
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpPort()
"net.sock.peer.addr" "127.0.0.1"
}
}
span(2) {
name "RESOLVE"
kind INTERNAL
childOf span(1)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpPort()
}
}
span(3) {
name "CONNECT"
kind INTERNAL
childOf span(1)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpPort()
"net.sock.peer.addr" "127.0.0.1"
}
}
span(4) {
name "test-http-server"
kind SERVER
childOf span(1)
}
}
}
}
def "test failing request"() {
given:
def httpClient = HttpClient.create()
def uri = "http://localhost:${PortUtils.UNUSABLE_PORT}"
when:
runWithSpan("parent") {
httpClient.get().uri(uri)
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
.block()
.status().code()
}
then:
def thrownException = thrown(Exception)
def connectException = thrownException.getCause()
and:
assertTraces(1) {
trace(0, 4) {
def list = Arrays.asList("RESOLVE", "CONNECT")
spans.subList(2, 4).sort(Comparator.comparing { item -> list.indexOf(item.name) })
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
status ERROR
errorEvent(thrownException.class, thrownException.message)
}
span(1) {
name "HTTP GET"
kind CLIENT
childOf span(0)
status ERROR
errorEvent(connectException.class, connectException.message)
attributes {
"$SemanticAttributes.HTTP_METHOD" "GET"
"$SemanticAttributes.HTTP_URL" uri
}
}
span(2) {
name "RESOLVE"
kind INTERNAL
childOf span(1)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" PortUtils.UNUSABLE_PORT
}
}
span(3) {
name "CONNECT"
kind INTERNAL
childOf span(1)
status ERROR
errorEvent(connectException.class, connectException.message)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" PortUtils.UNUSABLE_PORT
"net.sock.peer.addr" "127.0.0.1"
}
}
}
}
}
}

View File

@ -1,52 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
import io.netty.channel.ChannelOption
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames
import reactor.netty.http.client.HttpClient
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException
class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest {
HttpClient createHttpClient() {
return HttpClient.create()
.tcpConfiguration({ tcpClient ->
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
})
.resolver(getAddressResolverGroup())
.headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) })
}
@Override
SingleConnection createSingleConnection(String host, int port) {
def httpClient = HttpClient
.newConnection()
.host(host)
.port(port)
.headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) })
return new SingleConnection() {
@Override
int doRequest(String path, Map<String, String> headers) throws ExecutionException, InterruptedException, TimeoutException {
return httpClient
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
.get()
.uri(path)
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
.block()
.status().code()
}
}
}
}

View File

@ -1,23 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
import io.netty.channel.ChannelOption
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames
import reactor.netty.http.client.HttpClient
import reactor.netty.tcp.TcpClient
class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest {
HttpClient createHttpClient() {
return HttpClient.from(TcpClient.create())
.tcpConfiguration({ tcpClient ->
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
})
.resolver(getAddressResolverGroup())
.headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) })
}
}

View File

@ -0,0 +1,268 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static java.util.Collections.emptySet;
import static org.assertj.core.api.Assertions.catchThrowable;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.resolver.AddressResolverGroup;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.netty.http.client.HttpClient;
abstract class AbstractReactorNettyHttpClientTest
extends AbstractHttpClientTest<HttpClient.ResponseReceiver<?>> {
static final String USER_AGENT = "ReactorNetty";
@RegisterExtension
static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();
protected abstract HttpClient createHttpClient();
protected AddressResolverGroup<InetSocketAddress> getAddressResolverGroup() {
return CustomNameResolverGroup.INSTANCE;
}
@Override
public HttpClient.ResponseReceiver<?> buildRequest(
String method, URI uri, Map<String, String> headers) {
HttpClient client =
createHttpClient()
.followRedirect(true)
.headers(h -> headers.forEach(h::add))
.baseUrl(resolveAddress("").toString());
if (uri.toString().contains("/read-timeout")) {
client = client.responseTimeout(READ_TIMEOUT);
}
return client.request(HttpMethod.valueOf(method)).uri(uri.toString());
}
@Override
public int sendRequest(
HttpClient.ResponseReceiver<?> request, String method, URI uri, Map<String, String> headers) {
return request
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.block()
.status()
.code();
}
@Override
public void sendRequestWithCallback(
HttpClient.ResponseReceiver<?> request,
String method,
URI uri,
Map<String, String> headers,
AbstractHttpClientTest.RequestResult requestResult) {
request
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.subscribe(
response -> requestResult.complete(response.status().code()), requestResult::complete);
}
@Override
protected void configure(HttpClientTestOptions options) {
options.disableTestRedirects();
options.enableTestReadTimeout();
options.setUserAgent(USER_AGENT);
options.setClientSpanErrorMapper(
(uri, exception) -> {
if (exception.getClass().getName().endsWith("ReactiveException")) {
// unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString())
|| "https://192.0.2.1/".equals(uri.toString())) {
exception = exception.getCause();
}
}
return exception;
});
options.setHttpAttributes(
uri -> {
// unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString())
|| "https://192.0.2.1/".equals(uri.toString())) {
return emptySet();
}
Set<AttributeKey<?>> attributes =
new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES);
if (uri.toString().contains("/read-timeout")) {
attributes.remove(SemanticAttributes.NET_PEER_NAME);
attributes.remove(SemanticAttributes.NET_PEER_PORT);
attributes.remove(SemanticAttributes.HTTP_FLAVOR);
}
return attributes;
});
}
@Test
void shouldExposeContextToHttpClientCallbacks() throws InterruptedException {
AtomicReference<Span> onRequestSpan = new AtomicReference<>();
AtomicReference<Span> afterRequestSpan = new AtomicReference<>();
AtomicReference<Span> onResponseSpan = new AtomicReference<>();
AtomicReference<Span> afterResponseSpan = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
HttpClient httpClient =
createHttpClient()
.doOnRequest((rq, con) -> onRequestSpan.set(Span.current()))
.doAfterRequest((rq, con) -> afterRequestSpan.set(Span.current()))
.doOnResponse((rs, con) -> onResponseSpan.set(Span.current()))
.doAfterResponseSuccess(
(rs, con) -> {
afterResponseSpan.set(Span.current());
latch.countDown();
});
testing.runWithSpan(
"parent",
() ->
httpClient
.baseUrl(resolveAddress("").toString())
.get()
.uri("/success")
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.block());
latch.await(10, TimeUnit.SECONDS);
testing.waitAndAssertTraces(
trace -> {
SpanData parentSpan = trace.getSpan(0);
SpanData nettyClientSpan = trace.getSpan(1);
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),
span -> span.hasName("HTTP GET").hasKind(CLIENT).hasParent(parentSpan),
span -> span.hasName("test-http-server").hasKind(SERVER).hasParent(nettyClientSpan));
assertSameSpan(parentSpan, onRequestSpan);
assertSameSpan(nettyClientSpan, afterRequestSpan);
assertSameSpan(nettyClientSpan, onResponseSpan);
assertSameSpan(parentSpan, afterResponseSpan);
});
}
@Test
void shouldExposeContextToHttpRequestErrorCallback() {
AtomicReference<Span> onRequestErrorSpan = new AtomicReference<>();
HttpClient httpClient =
createHttpClient().doOnRequestError((rq, err) -> onRequestErrorSpan.set(Span.current()));
Throwable thrown =
catchThrowable(
() ->
testing.runWithSpan(
"parent",
() ->
httpClient
.get()
.uri("http://localhost:" + PortUtils.UNUSABLE_PORT + "/")
.response()
.block()));
testing.waitAndAssertTraces(
trace -> {
SpanData parentSpan = trace.getSpan(0);
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("parent")
.hasKind(INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(thrown),
span ->
span.hasKind(CLIENT)
.hasParent(parentSpan)
.hasStatus(StatusData.error())
.hasException(thrown.getCause()));
assertSameSpan(parentSpan, onRequestErrorSpan);
});
}
@Test
void shouldNotLeakConnections() {
HashSet<Integer> uniqueChannelHashes = new HashSet<>();
HttpClient httpClient =
createHttpClient().doOnConnect(config -> uniqueChannelHashes.add(config.channelHash()));
int count = 100;
IntStream.range(0, count)
.forEach(
i ->
testing.runWithSpan(
"parent",
() -> {
int status =
httpClient
.get()
.uri(resolveAddress("/success"))
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the
// span.
return content.map(unused -> resp);
})
.block()
.status()
.code();
assertThat(status).isEqualTo(200);
}));
testing.waitForTraces(count);
assertThat(uniqueChannelHashes).hasSize(1);
}
private static void assertSameSpan(SpanData expected, AtomicReference<Span> actual) {
SpanContext expectedSpanContext = expected.getSpanContext();
SpanContext actualSpanContext = actual.get().getSpanContext();
assertThat(actualSpanContext.getTraceId()).isEqualTo(expectedSpanContext.getTraceId());
assertThat(actualSpanContext.getSpanId()).isEqualTo(expectedSpanContext.getSpanId());
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static java.util.Collections.singletonList;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.InetNameResolver;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
public class CustomNameResolverGroup extends AddressResolverGroup<InetSocketAddress> {
public static final CustomNameResolverGroup INSTANCE = new CustomNameResolverGroup();
private CustomNameResolverGroup() {}
@Override
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
return new CustomNameResolver(executor).asAddressResolver();
}
private static class CustomNameResolver extends InetNameResolver {
private CustomNameResolver(EventExecutor executor) {
super(executor);
}
@Override
protected void doResolve(String inetHost, Promise<InetAddress> promise) {
try {
promise.setSuccess(InetAddress.getByName(inetHost));
} catch (UnknownHostException exception) {
promise.setFailure(exception);
}
}
@Override
protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise) {
try {
// default implementation calls InetAddress.getAllByName
promise.setSuccess(singletonList(InetAddress.getByName(inetHost)));
} catch (UnknownHostException exception) {
promise.setFailure(exception);
}
}
}
}

View File

@ -0,0 +1,228 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
import static org.assertj.core.api.Assertions.catchThrowable;
import io.netty.handler.ssl.SslContextBuilder;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer;
import io.opentelemetry.sdk.trace.data.EventData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.tcp.SslProvider;
class ReactorNettyClientSslTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
static HttpClientTestServer server;
@BeforeAll
static void setUp() {
server = new HttpClientTestServer(testing.getOpenTelemetry());
server.start();
}
@AfterAll
static void tearDown() {
server.stop();
}
@Test
void shouldFailSslHandshake() throws SSLException {
HttpClient httpClient = createHttpClient("SSLv3");
String uri = "https://localhost:" + server.httpsPort() + "/success";
Mono<HttpClientResponse> responseMono =
httpClient
.get()
.uri(uri)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
});
Throwable thrown =
catchThrowable(() -> testing.runWithSpan("parent", () -> responseMono.block()));
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
span ->
span.hasName("parent")
.hasKind(INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(thrown),
span ->
span.hasName("HTTP GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasStatus(StatusData.error())
// netty swallows the exception, it doesn't make any sense to hard-code the
// message
.hasEventsSatisfying(ReactorNettyClientSslTest::isSslHandshakeException)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri)),
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort())),
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()),
equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")),
span ->
span.hasName("SSL handshake")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasStatus(StatusData.error())
// netty swallows the exception, it doesn't make any sense to hard-code the
// message
.hasEventsSatisfying(ReactorNettyClientSslTest::isSslHandshakeException)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()),
equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1"))));
}
@Test
void shouldSuccessfullyEstablishSslHandshake() throws SSLException {
HttpClient httpClient = createHttpClient();
String uri = "https://localhost:" + server.httpsPort() + "/success";
Mono<HttpClientResponse> responseMono =
httpClient
.get()
.uri(uri)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
});
testing.runWithSpan("parent", () -> responseMono.block());
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),
span ->
span.hasName("HTTP GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.HTTP_FLAVOR, HTTP_1_1),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isNotNegative),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()),
equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")),
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort())),
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()),
equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")),
span ->
span.hasName("SSL handshake")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()),
equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")),
span ->
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(1))));
}
private static HttpClient createHttpClient() throws SSLException {
return ReactorNettyClientSslTest.createHttpClient(null);
}
private static HttpClient createHttpClient(@Nullable String enabledProtocol) throws SSLException {
SslContextBuilder sslContext = SslContextBuilder.forClient();
if (enabledProtocol != null) {
sslContext = sslContext.protocols(enabledProtocol);
}
SslProvider sslProvider = SslProvider.builder().sslContext(sslContext.build()).build();
return HttpClient.create().secure(sslProvider);
}
private static void isSslHandshakeException(List<? extends EventData> events) {
assertThat(events)
.filteredOn(event -> event.getName().equals(SemanticAttributes.EXCEPTION_EVENT_NAME))
.satisfiesExactly(
event ->
assertThat(event)
.hasAttributesSatisfying(
attributes ->
assertThat(attributes)
.hasSize(3)
.containsEntry(
SemanticAttributes.EXCEPTION_TYPE,
SSLHandshakeException.class.getCanonicalName())
.hasEntrySatisfying(
SemanticAttributes.EXCEPTION_MESSAGE,
s -> assertThat(s).isNotEmpty())
.hasEntrySatisfying(
SemanticAttributes.EXCEPTION_STACKTRACE,
s -> assertThat(s).isNotEmpty())));
}
}

View File

@ -0,0 +1,177 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.netty.http.client.HttpClient;
class ReactorNettyConnectionSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
static HttpClientTestServer server;
@BeforeAll
static void setUp() {
server = new HttpClientTestServer(testing.getOpenTelemetry());
server.start();
}
@AfterAll
static void tearDown() {
server.stop();
}
@Test
void testSuccessfulRequest() {
HttpClient httpClient = HttpClient.create();
String uri = "http://localhost:" + server.httpPort() + "/success";
int responseCode =
testing.runWithSpan(
"parent",
() ->
httpClient
.get()
.uri(uri)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.block()
.status()
.code());
assertThat(responseCode).isEqualTo(200);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),
span ->
span.hasName("HTTP GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.HTTP_FLAVOR, HTTP_1_1),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isNotNegative),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort()),
equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")),
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort())),
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort()),
equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1")),
span ->
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(1))));
}
@Test
void testFailingRequest() {
HttpClient httpClient = HttpClient.create();
String uri = "http://localhost:" + PortUtils.UNUSABLE_PORT;
Throwable thrown =
catchThrowable(
() ->
testing.runWithSpan(
"parent",
() ->
httpClient
.get()
.uri(uri)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the
// span.
return content.map(unused -> resp);
})
.block()
.status()
.code()));
Throwable connectException = thrown.getCause();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
span ->
span.hasName("parent")
.hasKind(INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(thrown),
span ->
span.hasName("HTTP GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasStatus(StatusData.error())
.hasException(connectException)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri)),
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT)),
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasStatus(StatusData.error())
.hasException(connectException)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT),
equalTo(stringKey("net.sock.peer.addr"), "127.0.0.1"))));
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import io.netty.channel.ChannelOption;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames;
import reactor.netty.http.client.HttpClient;
class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest {
@Override
protected HttpClient createHttpClient() {
int connectionTimeoutMillis = (int) CONNECTION_TIMEOUT.toMillis();
return HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMillis)
.resolver(getAddressResolverGroup())
.headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT));
}
@Override
protected void configure(HttpClientTestOptions options) {
super.configure(options);
options.setSingleConnectionFactory(
(host, port) -> {
HttpClient httpClient =
HttpClient.newConnection()
.host(host)
.port(port)
.headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT));
return (path, headers) ->
httpClient
.headers(h -> headers.forEach(h::add))
.get()
.uri(path)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.block()
.status()
.code();
});
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import io.netty.channel.ChannelOption;
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest {
@SuppressWarnings("deprecation") // from(TcpClient) is deprecated, but we want to test it anyway
@Override
protected HttpClient createHttpClient() {
int connectionTimeoutMillis = (int) CONNECTION_TIMEOUT.toMillis();
return HttpClient.from(TcpClient.create())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMillis)
.resolver(getAddressResolverGroup())
.headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT));
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.test.StepVerifier;
class ReactorNettyWithSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
static HttpClientTestServer server;
@BeforeAll
static void setUp() {
server = new HttpClientTestServer(testing.getOpenTelemetry());
server.start();
}
@AfterAll
static void tearDown() {
server.stop();
}
@Test
public void testSuccessfulNestedUnderWithSpan() {
HttpClient httpClient = HttpClient.create();
Mono<Integer> httpRequest =
Mono.defer(
() ->
httpClient
.get()
.uri("http://localhost:" + server.httpPort() + "/success")
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.map(r -> r.status().code()));
Mono<Integer> getResponse =
new TracedWithSpan()
.mono(
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4348
// our HTTP server is synchronous, i.e. it returns Mono.just with response
// which is not supported by TracingSubscriber - it does not instrument scalar calls
// so we delay here to fake async http request and let Reactor context
// instrumentation work
Mono.delay(Duration.ofMillis(1)).then(httpRequest));
StepVerifier.create(getResponse).expectNext(200).expectComplete().verify();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("TracedWithSpan.mono").hasKind(INTERNAL).hasNoParent(),
span -> span.hasName("HTTP GET").hasKind(CLIENT).hasParent(trace.getSpan(0)),
span ->
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(1))));
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.test.reactor.netty;
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import reactor.core.publisher.Mono;