Migrate reactor-netty-0.9 tests to java (#7635)

Part of #7195
This commit is contained in:
Mateusz Rzeszutek 2023-01-23 18:34:11 +01:00 committed by GitHub
parent e1895e548c
commit 7657b75ddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 494 additions and 453 deletions

View File

@ -17,9 +17,12 @@ import reactor.netty.http.client.HttpClientResponse;
public final class DecoratorFunctions { public final class DecoratorFunctions {
// ignore our own callbacks - or already decorated functions // ignore already decorated functions
public static boolean shouldDecorate(Class<?> callbackClass) { public static boolean shouldDecorate(Class<?> callbackClass) {
return !callbackClass.getName().startsWith("io.opentelemetry.javaagent"); return callbackClass != OnRequestDecorator.class
&& callbackClass != OnResponseDecorator.class
&& callbackClass != OnRequestErrorDecorator.class
&& callbackClass != OnResponseErrorDecorator.class;
} }
private abstract static class OnMessageDecorator<M> implements BiConsumer<M, Connection> { private abstract static class OnMessageDecorator<M> implements BiConsumer<M, Connection> {

View File

@ -1,215 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import reactor.netty.http.client.HttpClient
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpClient.ResponseReceiver> implements AgentTestTrait {
@Override
boolean testRedirects() {
false
}
@Override
boolean testReadTimeout() {
true
}
@Override
String userAgent() {
return "ReactorNetty"
}
@Override
HttpClient.ResponseReceiver buildRequest(String method, URI uri, Map<String, String> headers) {
def readTimeout = uri.toString().contains("/read-timeout")
return createHttpClient(readTimeout)
.followRedirect(true)
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
.baseUrl(resolveAddress("").toString())
."${method.toLowerCase()}"()
.uri(uri.toString())
}
@Override
int sendRequest(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers) {
return request.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map {
resp
}
}.block().status().code()
}
@Override
void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers, HttpClientResult requestResult) {
request.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}.subscribe({
requestResult.complete(it.status().code())
}, { throwable ->
requestResult.complete(throwable)
})
}
@Override
String expectedClientSpanName(URI uri, String method) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
return "CONNECT"
default:
return super.expectedClientSpanName(uri, method)
}
}
@Override
Throwable clientSpanError(URI uri, Throwable exception) {
if (exception.class.getName().endsWith("ReactiveException")) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
exception = exception.getCause()
}
}
return exception
}
@Override
Set<AttributeKey<?>> httpAttributes(URI uri) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
return []
}
def attributes = super.httpAttributes(uri)
attributes.remove(SemanticAttributes.NET_PEER_NAME)
attributes.remove(SemanticAttributes.NET_PEER_PORT)
return attributes
}
abstract HttpClient createHttpClient(boolean readTimeout)
def "should expose context to http client callbacks"() {
given:
def onRequestSpan = new AtomicReference<Span>()
def afterRequestSpan = new AtomicReference<Span>()
def onResponseSpan = new AtomicReference<Span>()
def afterResponseSpan = new AtomicReference<Span>()
def latch = new CountDownLatch(1)
def httpClient = createHttpClient(false)
.doOnRequest({ rq, con -> onRequestSpan.set(Span.current()) })
.doAfterRequest({ rq, con -> afterRequestSpan.set(Span.current()) })
.doOnResponse({ rs, con -> onResponseSpan.set(Span.current()) })
.doAfterResponse({ rs, con ->
afterResponseSpan.set(Span.current())
latch.countDown()
})
when:
runWithSpan("parent") {
httpClient.baseUrl(resolveAddress("").toString())
.get()
.uri("/success")
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
.block()
}
latch.await()
then:
assertTraces(1) {
trace(0, 3) {
def parentSpan = span(0)
def nettyClientSpan = span(1)
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
clientSpan(it, 1, parentSpan, "GET", resolveAddress("/success"))
serverSpan(it, 2, nettyClientSpan)
assertSameSpan(parentSpan, onRequestSpan)
assertSameSpan(nettyClientSpan, afterRequestSpan)
assertSameSpan(nettyClientSpan, onResponseSpan)
assertSameSpan(parentSpan, afterResponseSpan)
}
}
}
def "should expose context to http request error callback"() {
given:
def onRequestErrorSpan = new AtomicReference<Span>()
def httpClient = createHttpClient(false)
.doOnRequestError({ rq, err -> onRequestErrorSpan.set(Span.current()) })
when:
runWithSpan("parent") {
httpClient.get()
.uri("http://localhost:$UNUSABLE_PORT/")
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
.block()
}
then:
def ex = thrown(Exception)
assertTraces(1) {
trace(0, 2) {
def parentSpan = span(0)
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
status StatusCode.ERROR
errorEvent(ex.class, ex.message)
}
span(1) {
def actualException = ex.cause
kind SpanKind.CLIENT
childOf parentSpan
status StatusCode.ERROR
errorEvent(actualException.class, actualException.message)
}
assertSameSpan(parentSpan, onRequestErrorSpan)
}
}
}
static void assertSameSpan(SpanData expected, AtomicReference<Span> actual) {
def expectedSpanContext = expected.spanContext
def actualSpanContext = actual.get().spanContext
assert expectedSpanContext.traceId == actualSpanContext.traceId
assert expectedSpanContext.spanId == actualSpanContext.spanId
}
}

View File

@ -1,151 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import reactor.netty.http.client.HttpClient
import spock.lang.Shared
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.SERVER
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait {
@Shared
private HttpClientTestServer server
def setupSpec() {
server = new HttpClientTestServer(openTelemetry)
server.start()
}
def cleanupSpec() {
server.stop()
}
def "test successful request"() {
when:
def httpClient = HttpClient.create()
def responseCode =
runWithSpan("parent") {
httpClient.get().uri("http://localhost:${server.httpPort()}/success")
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
.block()
.status().code()
}
then:
responseCode == 200
assertTraces(1) {
trace(0, 5) {
def list = Arrays.asList("RESOLVE", "CONNECT")
spans.subList(1, 3).sort(Comparator.comparing { item -> list.indexOf(item.name) })
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "RESOLVE"
kind INTERNAL
childOf span(0)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpPort()
}
}
span(2) {
name "CONNECT"
kind INTERNAL
childOf(span(0))
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" server.httpPort()
"$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1"
}
}
span(3) {
name "HTTP GET"
kind CLIENT
childOf(span(0))
}
span(4) {
name "test-http-server"
kind SERVER
childOf(span(3))
}
}
}
}
def "test failing request"() {
when:
def httpClient = HttpClient.create()
runWithSpan("parent") {
httpClient.get().uri("http://localhost:${PortUtils.UNUSABLE_PORT}")
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
.block()
.status().code()
}
then:
def thrownException = thrown(Exception)
def connectException = thrownException.getCause()
and:
assertTraces(1) {
trace(0, 3) {
def list = Arrays.asList("RESOLVE", "CONNECT")
spans.subList(1, 3).sort(Comparator.comparing { item -> list.indexOf(item.name) })
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
status ERROR
errorEvent(thrownException.class, thrownException.message)
}
span(1) {
name "RESOLVE"
kind INTERNAL
childOf span(0)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" PortUtils.UNUSABLE_PORT
}
}
span(2) {
name "CONNECT"
kind INTERNAL
childOf(span(0))
status ERROR
errorEvent(connectException.class, connectException.message)
attributes {
"$SemanticAttributes.NET_TRANSPORT" IP_TCP
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_PORT" PortUtils.UNUSABLE_PORT
"$SemanticAttributes.NET_SOCK_PEER_ADDR" { it == "127.0.0.1" || it == null }
}
}
}
}
}
}

View File

@ -1,59 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9
import io.netty.channel.ChannelOption
import io.netty.handler.timeout.ReadTimeoutHandler
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
import java.util.concurrent.TimeUnit
import reactor.netty.http.client.HttpClient
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException
class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest {
HttpClient createHttpClient(boolean readTimeout) {
return HttpClient.create().tcpConfiguration({ tcpClient ->
if (readTimeout) {
tcpClient = tcpClient.doOnConnected({ connection ->
connection.addHandlerLast(new ReadTimeoutHandler(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS))
})
}
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
})
}
@Override
SingleConnection createSingleConnection(String host, int port) {
String url
try {
url = new URL("http", host, port, "").toString()
} catch (MalformedURLException e) {
throw new ExecutionException(e)
}
def httpClient = HttpClient
.newConnection()
.baseUrl(url)
return new SingleConnection() {
@Override
int doRequest(String path, Map<String, String> headers) throws ExecutionException, InterruptedException, TimeoutException {
return httpClient
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
.get()
.uri(path)
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp }
}
.block().status().code()
}
}
}
}

View File

@ -1,26 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9
import io.netty.channel.ChannelOption
import io.netty.handler.timeout.ReadTimeoutHandler
import java.util.concurrent.TimeUnit
import reactor.netty.http.client.HttpClient
import reactor.netty.tcp.TcpClient
class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest {
HttpClient createHttpClient(boolean readTimeout) {
return HttpClient.from(TcpClient.create()).tcpConfiguration({ tcpClient ->
if (readTimeout) {
tcpClient = tcpClient.doOnConnected({ connection ->
connection.addHandlerLast(new ReadTimeoutHandler(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS))
})
}
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
})
}
}

View File

@ -0,0 +1,242 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static java.util.Collections.emptySet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import io.netty.handler.codec.http.HttpMethod;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.netty.http.client.HttpClient;
abstract class AbstractReactorNettyHttpClientTest
extends AbstractHttpClientTest<HttpClient.ResponseReceiver<?>> {
@RegisterExtension
static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();
static final String USER_AGENT = "ReactorNetty";
abstract HttpClient createHttpClient(boolean readTimeout);
@Override
public HttpClient.ResponseReceiver<?> buildRequest(
String method, URI uri, Map<String, String> headers) {
boolean readTimeout = uri.toString().contains("/read-timeout");
return createHttpClient(readTimeout)
.followRedirect(true)
.headers(h -> headers.forEach(h::add))
.baseUrl(resolveAddress("").toString())
.request(HttpMethod.valueOf(method))
.uri(uri.toString());
}
@Override
public int sendRequest(
HttpClient.ResponseReceiver<?> request, String method, URI uri, Map<String, String> headers) {
return request
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.block()
.status()
.code();
}
@Override
public void sendRequestWithCallback(
HttpClient.ResponseReceiver<?> request,
String method,
URI uri,
Map<String, String> headers,
HttpClientResult httpClientResult) {
request
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.subscribe(
response -> httpClientResult.complete(response.status().code()),
httpClientResult::complete);
}
@Override
protected void configure(HttpClientTestOptions options) {
options.disableTestRedirects();
options.enableTestReadTimeout();
options.setUserAgent(USER_AGENT);
options.setExpectedClientSpanNameMapper(
(uri, method) -> {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
return "CONNECT";
default:
return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(
uri, method);
}
});
options.setClientSpanErrorMapper(
(uri, exception) -> {
if (exception.getClass().getName().endsWith("ReactiveException")) {
// unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString())
|| "https://192.0.2.1/".equals(uri.toString())) {
exception = exception.getCause();
}
}
return exception;
});
options.setHttpAttributes(
uri -> {
// unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString())
|| "https://192.0.2.1/".equals(uri.toString())) {
return emptySet();
}
Set<AttributeKey<?>> attributes =
new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES);
attributes.remove(SemanticAttributes.NET_PEER_NAME);
attributes.remove(SemanticAttributes.NET_PEER_PORT);
return attributes;
});
}
@Test
void shouldExposeContextToHttpClientCallbacks() throws InterruptedException {
AtomicReference<Span> onRequestSpan = new AtomicReference<>();
AtomicReference<Span> afterRequestSpan = new AtomicReference<>();
AtomicReference<Span> onResponseSpan = new AtomicReference<>();
AtomicReference<Span> afterResponseSpan = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
HttpClient httpClient =
createHttpClient(false)
.doOnRequest((rq, con) -> onRequestSpan.set(Span.current()))
.doAfterRequest((rq, con) -> afterRequestSpan.set(Span.current()))
.doOnResponse((rs, con) -> onResponseSpan.set(Span.current()))
.doAfterResponse(
(rs, con) -> {
afterResponseSpan.set(Span.current());
latch.countDown();
});
testing.runWithSpan(
"parent",
() ->
httpClient
.baseUrl(resolveAddress("").toString())
.get()
.uri("/success")
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.block());
latch.await(10, TimeUnit.SECONDS);
testing.waitAndAssertTraces(
trace -> {
SpanData parentSpan = trace.getSpan(0);
SpanData nettyClientSpan = trace.getSpan(1);
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),
span -> span.hasName("HTTP GET").hasKind(CLIENT).hasParent(parentSpan),
span -> span.hasName("test-http-server").hasKind(SERVER).hasParent(nettyClientSpan));
assertSameSpan(parentSpan, onRequestSpan);
assertSameSpan(nettyClientSpan, afterRequestSpan);
assertSameSpan(nettyClientSpan, onResponseSpan);
assertSameSpan(parentSpan, afterResponseSpan);
});
}
@Test
void shouldExposeContextToHttpRequestErrorCallback() {
AtomicReference<Span> onRequestErrorSpan = new AtomicReference<>();
HttpClient httpClient =
createHttpClient(false)
.doOnRequestError((rq, err) -> onRequestErrorSpan.set(Span.current()));
Throwable thrown =
catchThrowable(
() ->
testing.runWithSpan(
"parent",
() ->
httpClient
.get()
.uri("http://localhost:$UNUSABLE_PORT/")
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the
// span.
return content.map(unused -> resp);
})
.block()));
testing.waitAndAssertTraces(
trace -> {
SpanData parentSpan = trace.getSpan(0);
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("parent")
.hasKind(INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(thrown),
span ->
span.hasKind(CLIENT)
.hasParent(parentSpan)
.hasStatus(StatusData.error())
.hasException(thrown.getCause()));
assertSameSpan(parentSpan, onRequestErrorSpan);
});
}
private static void assertSameSpan(SpanData expected, AtomicReference<Span> actual) {
SpanContext expectedSpanContext = expected.getSpanContext();
SpanContext actualSpanContext = actual.get().getSpanContext();
assertThat(actualSpanContext.getTraceId()).isEqualTo(expectedSpanContext.getTraceId());
assertThat(actualSpanContext.getSpanId()).isEqualTo(expectedSpanContext.getSpanId());
}
}

View File

@ -0,0 +1,153 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.netty.http.client.HttpClient;
class ReactorNettyConnectionSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
static HttpClientTestServer server;
@BeforeAll
static void setUp() {
server = new HttpClientTestServer(testing.getOpenTelemetry());
server.start();
}
@AfterAll
static void tearDown() {
server.stop();
}
@Test
void testSuccessfulRequest() {
HttpClient httpClient = HttpClient.create();
String uri = "http://localhost:" + server.httpPort() + "/success";
int responseCode =
testing.runWithSpan(
"parent",
() ->
httpClient
.get()
.uri(uri)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.block()
.status()
.code());
assertThat(responseCode).isEqualTo(200);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort())),
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort()),
equalTo(SemanticAttributes.NET_SOCK_PEER_ADDR, "127.0.0.1")),
span -> span.hasName("HTTP GET").hasKind(CLIENT).hasParent(trace.getSpan(0)),
span ->
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(3))));
}
@Test
void testFailingRequest() {
HttpClient httpClient = HttpClient.create();
String uri = "http://localhost:" + PortUtils.UNUSABLE_PORT;
Throwable thrown =
catchThrowable(
() ->
testing.runWithSpan(
"parent",
() ->
httpClient
.get()
.uri(uri)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the
// span.
return content.map(unused -> resp);
})
.block()
.status()
.code()));
Throwable connectException = thrown.getCause();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
span ->
span.hasName("parent")
.hasKind(INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(thrown),
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT)),
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(0))
.hasStatus(StatusData.error())
.hasException(connectException)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT),
satisfies(
SemanticAttributes.NET_SOCK_PEER_ADDR,
val -> val.isIn(null, "127.0.0.1")))));
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.TimeUnit;
import reactor.netty.http.client.HttpClient;
class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest {
@Override
HttpClient createHttpClient(boolean readTimeout) {
return HttpClient.create()
.tcpConfiguration(
tcpClient -> {
if (readTimeout) {
tcpClient =
tcpClient.doOnConnected(
connection ->
connection.addHandlerLast(
new ReadTimeoutHandler(
READ_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)));
}
return tcpClient.option(
ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) CONNECTION_TIMEOUT.toMillis());
});
}
@Override
public SingleConnection createSingleConnection(String host, int port) {
String url;
try {
url = new URL("http", host, port, "").toString();
} catch (MalformedURLException e) {
throw new AssertionError("Could not construct URL", e);
}
HttpClient httpClient = HttpClient.newConnection().baseUrl(url);
return (path, headers) ->
httpClient
.headers(h -> headers.forEach(h::add))
.get()
.uri(path)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the span.
return content.map(unused -> resp);
})
.block()
.status()
.code();
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.TimeUnit;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest {
@Override
HttpClient createHttpClient(boolean readTimeout) {
return HttpClient.from(TcpClient.create())
.tcpConfiguration(
tcpClient -> {
if (readTimeout) {
tcpClient =
tcpClient.doOnConnected(
connection ->
connection.addHandlerLast(
new ReadTimeoutHandler(
READ_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)));
}
return tcpClient.option(
ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) CONNECTION_TIMEOUT.toMillis());
});
}
}