From be645f08ab804a35ddcb685459f656c44910a4ab Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 30 Jul 2021 21:42:07 +0300 Subject: [PATCH] Test latest version of vert.x reactive (#3715) * Test latest version of vert.x reactive * review comment --- .../vertx/v4_0/client/VertxClientTracer.java | 7 +- .../javaagent/build.gradle.kts | 42 ++-- .../VertxReactivePropagationTest.groovy | 0 .../VertxRxCircuitBreakerWebClientTest.groovy | 125 +++++++++++ .../groovy/client/VertxRxWebClientTest.groovy | 112 ++++++++++ ...VertxRxCircuitBreakerHttpServerTest.groovy | 148 +++++++++++++ .../server/VertxRxHttpServerTest.groovy | 133 ++++++++++++ .../java/VertxReactiveWebServer.java | 177 ++++++++++++++++ ...VertxRxCircuitBreakerSingleConnection.java | 46 ++++ .../java/client/VertxRxSingleConnection.java | 0 .../handler/codec/haproxy/HAProxyMessage.java | 10 + .../codec/haproxy/HAProxyProxiedProtocol.java | 10 + .../VertxReactivePropagationTest.groovy | 199 ++++++++++++++++++ .../VertxRxCircuitBreakerWebClientTest.groovy | 0 .../groovy/client/VertxRxWebClientTest.groovy | 0 ...VertxRxCircuitBreakerHttpServerTest.groovy | 0 .../server/VertxRxHttpServerTest.groovy | 0 .../java/VertxReactiveWebServer.java | 0 ...VertxRxCircuitBreakerSingleConnection.java | 0 .../java/client/VertxRxSingleConnection.java | 71 +++++++ 20 files changed, 1059 insertions(+), 21 deletions(-) rename instrumentation/vertx-reactive-3.5/javaagent/src/{test => latestDepTest}/groovy/VertxReactivePropagationTest.groovy (100%) create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxWebClientTest.groovy create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxHttpServerTest.groovy create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/VertxReactiveWebServer.java create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/client/VertxRxCircuitBreakerSingleConnection.java rename instrumentation/vertx-reactive-3.5/javaagent/src/{test => latestDepTest}/java/client/VertxRxSingleConnection.java (100%) create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyMessage.java create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/VertxReactivePropagationTest.groovy rename instrumentation/vertx-reactive-3.5/javaagent/src/{test => version35Test}/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy (100%) rename instrumentation/vertx-reactive-3.5/javaagent/src/{test => version35Test}/groovy/client/VertxRxWebClientTest.groovy (100%) rename instrumentation/vertx-reactive-3.5/javaagent/src/{test => version35Test}/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy (100%) rename instrumentation/vertx-reactive-3.5/javaagent/src/{test => version35Test}/groovy/server/VertxRxHttpServerTest.groovy (100%) rename instrumentation/vertx-reactive-3.5/javaagent/src/{test => version35Test}/java/VertxReactiveWebServer.java (100%) rename instrumentation/vertx-reactive-3.5/javaagent/src/{test => version35Test}/java/client/VertxRxCircuitBreakerSingleConnection.java (100%) create mode 100644 instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/java/client/VertxRxSingleConnection.java diff --git a/instrumentation/vertx-http-client/vertx-http-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/client/VertxClientTracer.java b/instrumentation/vertx-http-client/vertx-http-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/client/VertxClientTracer.java index aa9085e6e8..319bcd41a2 100644 --- a/instrumentation/vertx-http-client/vertx-http-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/client/VertxClientTracer.java +++ b/instrumentation/vertx-http-client/vertx-http-client-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/v4_0/client/VertxClientTracer.java @@ -31,9 +31,10 @@ public class VertxClientTracer extends AbstractVertxClientTracer { @Override @Nullable protected URI url(HttpClientRequest request) throws URISyntaxException { - if (request.absoluteURI().startsWith(request.getURI())) { - return new URI(request.getURI()); + URI uri = new URI(request.getURI()); + if (!uri.isAbsolute()) { + uri = new URI(request.absoluteURI()); } - return new URI(request.absoluteURI()); + return uri; } } diff --git a/instrumentation/vertx-reactive-3.5/javaagent/build.gradle.kts b/instrumentation/vertx-reactive-3.5/javaagent/build.gradle.kts index bd33710be4..3ea9f99dfc 100644 --- a/instrumentation/vertx-reactive-3.5/javaagent/build.gradle.kts +++ b/instrumentation/vertx-reactive-3.5/javaagent/build.gradle.kts @@ -1,5 +1,6 @@ plugins { id("otel.javaagent-instrumentation") + id("org.unbroken-dome.test-sets") } muzzle { @@ -10,12 +11,23 @@ muzzle { } } +testSets { + create("version35Test") + create("latestDepTest") +} + +tasks { + named("test") { + dependsOn("version35Test") + } +} + //The first Vert.x version that uses rx-java 2 val vertxVersion = "3.5.0" dependencies { - library("io.vertx:vertx-web:${vertxVersion}") - library("io.vertx:vertx-rx-java2:${vertxVersion}") + compileOnly("io.vertx:vertx-web:${vertxVersion}") + compileOnly("io.vertx:vertx-rx-java2:${vertxVersion}") testInstrumentation(project(":instrumentation:jdbc:javaagent")) testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent")) @@ -24,23 +36,17 @@ dependencies { testInstrumentation(project(":instrumentation:vertx-http-client:vertx-http-client-4.0:javaagent")) testInstrumentation(project(":instrumentation:vertx-web-3.0:javaagent")) - testLibrary("io.vertx:vertx-web-client:${vertxVersion}") - testLibrary("io.vertx:vertx-jdbc-client:${vertxVersion}") - testLibrary("io.vertx:vertx-circuit-breaker:${vertxVersion}") testImplementation("org.hsqldb:hsqldb:2.3.4") - // Vert.x 4.0 is incompatible with our tests. - // 3.9.7 Requires Netty 4.1.60, no other version works with it. - latestDepTestLibrary(enforcedPlatform("io.netty:netty-bom:4.1.60.Final")) - latestDepTestLibrary("io.vertx:vertx-web:3.+") - latestDepTestLibrary("io.vertx:vertx-web-client:3.+") - latestDepTestLibrary("io.vertx:vertx-jdbc-client:3.+") - latestDepTestLibrary("io.vertx:vertx-circuit-breaker:3.+") - latestDepTestLibrary("io.vertx:vertx-rx-java2:3.+") -} + add("version35TestImplementation", "io.vertx:vertx-web:${vertxVersion}") + add("version35TestImplementation", "io.vertx:vertx-rx-java2:${vertxVersion}") + add("version35TestImplementation", "io.vertx:vertx-web-client:${vertxVersion}") + add("version35TestImplementation", "io.vertx:vertx-jdbc-client:${vertxVersion}") + add("version35TestImplementation", "io.vertx:vertx-circuit-breaker:${vertxVersion}") -tasks { - named("test") { - systemProperty("testLatestDeps", findProperty("testLatestDeps")) - } + add("latestDepTestImplementation", "io.vertx:vertx-web:4.+") + add("latestDepTestImplementation", "io.vertx:vertx-rx-java2:4.+") + add("latestDepTestImplementation", "io.vertx:vertx-web-client:4.+") + add("latestDepTestImplementation", "io.vertx:vertx-jdbc-client:4.+") + add("latestDepTestImplementation", "io.vertx:vertx-circuit-breaker:4.+") } diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/VertxReactivePropagationTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/VertxReactivePropagationTest.groovy similarity index 100% rename from instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/VertxReactivePropagationTest.groovy rename to instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/VertxReactivePropagationTest.groovy diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy new file mode 100644 index 0000000000..72d28c6ef6 --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy @@ -0,0 +1,125 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package client + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.instrumentation.test.AgentTestTrait +import io.opentelemetry.instrumentation.test.base.HttpClientTest +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest +import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection +import io.vertx.circuitbreaker.CircuitBreakerOptions +import io.vertx.core.AsyncResult +import io.vertx.core.VertxOptions +import io.vertx.core.http.HttpMethod +import io.vertx.ext.web.client.WebClientOptions +import io.vertx.reactivex.circuitbreaker.CircuitBreaker +import io.vertx.reactivex.core.Vertx +import io.vertx.reactivex.ext.web.client.HttpRequest +import io.vertx.reactivex.ext.web.client.WebClient +import java.util.concurrent.CompletableFuture +import java.util.function.Consumer +import spock.lang.Shared + +class VertxRxCircuitBreakerWebClientTest extends HttpClientTest> implements AgentTestTrait { + + @Shared + Vertx vertx = Vertx.vertx(new VertxOptions()) + @Shared + def clientOptions = new WebClientOptions().setConnectTimeout(CONNECT_TIMEOUT_MS) + @Shared + WebClient client = WebClient.create(vertx, clientOptions) + @Shared + CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, + new CircuitBreakerOptions() + .setTimeout(-1) // Disable the timeout otherwise it makes each test take this long. + ) + + @Override + HttpRequest buildRequest(String method, URI uri, Map headers) { + def request = client.request(HttpMethod.valueOf(method), getPort(uri), uri.host, "$uri") + headers.each { request.putHeader(it.key, it.value) } + return request + } + + @Override + int sendRequest(HttpRequest request, String method, URI uri, Map headers) { + // VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through + // a callback. + CompletableFuture future = new CompletableFuture<>() + sendRequestWithCallback(request) { + if (it.succeeded()) { + future.complete(it.result().statusCode()) + } else { + future.completeExceptionally(it.cause()) + } + } + return future.get() + } + + void sendRequestWithCallback(HttpRequest request, Consumer consumer) { + breaker.execute({ command -> + request.rxSend().doOnSuccess { + command.complete(it) + }.doOnError { + command.fail(it) + }.subscribe() + }, { + consumer.accept(it) + }) + } + + @Override + void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map headers, AbstractHttpClientTest.RequestResult requestResult) { + sendRequestWithCallback(request) { + if (it.succeeded()) { + requestResult.complete(it.result().statusCode()) + } else { + requestResult.complete(it.cause()) + } + } + } + + @Override + String expectedClientSpanName(URI uri, String method) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "https://192.0.2.1/": // non routable address + return "CONNECT" + default: + return super.expectedClientSpanName(uri, method) + } + } + + @Override + Set> httpAttributes(URI uri) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "https://192.0.2.1/": // non routable address + return [] + } + return super.httpAttributes(uri) + } + + @Override + String userAgent() { + return "Vert.x-WebClient" + } + + @Override + boolean testRedirects() { + false + } + + @Override + boolean testHttps() { + false + } + + @Override + SingleConnection createSingleConnection(String host, int port) { + return new VertxRxCircuitBreakerSingleConnection(host, port, breaker) + } +} diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxWebClientTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxWebClientTest.groovy new file mode 100644 index 0000000000..bd7292a285 --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/client/VertxRxWebClientTest.groovy @@ -0,0 +1,112 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package client + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.instrumentation.test.AgentTestTrait +import io.opentelemetry.instrumentation.test.base.HttpClientTest +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest +import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection +import io.vertx.core.VertxOptions +import io.vertx.core.http.HttpMethod +import io.vertx.ext.web.client.WebClientOptions +import io.vertx.reactivex.core.Vertx +import io.vertx.reactivex.core.buffer.Buffer +import io.vertx.reactivex.ext.web.client.HttpRequest +import io.vertx.reactivex.ext.web.client.HttpResponse +import io.vertx.reactivex.ext.web.client.WebClient +import spock.lang.Shared + +class VertxRxWebClientTest extends HttpClientTest> implements AgentTestTrait { + + @Shared + Vertx vertx = Vertx.vertx(new VertxOptions()) + @Shared + def clientOptions = new WebClientOptions().setConnectTimeout(CONNECT_TIMEOUT_MS) + @Shared + WebClient client = WebClient.create(vertx, clientOptions) + + @Override + HttpRequest buildRequest(String method, URI uri, Map headers) { + def request = client.request(HttpMethod.valueOf(method), getPort(uri), uri.host, "$uri") + headers.each { request.putHeader(it.key, it.value) } + return request + } + + @Override + int sendRequest(HttpRequest request, String method, URI uri, Map headers) { + return request.rxSend().blockingGet().statusCode() + } + + @Override + void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map headers, AbstractHttpClientTest.RequestResult requestResult) { + request.rxSend() + .subscribe(new io.reactivex.functions.Consumer>() { + @Override + void accept(HttpResponse httpResponse) throws Exception { + requestResult.complete(httpResponse.statusCode()) + } + }, new io.reactivex.functions.Consumer() { + @Override + void accept(Throwable throwable) throws Exception { + requestResult.complete(throwable) + } + }) + } + + @Override + String expectedClientSpanName(URI uri, String method) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "https://192.0.2.1/": // non routable address + return "CONNECT" + default: + return super.expectedClientSpanName(uri, method) + } + } + + @Override + Throwable clientSpanError(URI uri, Throwable exception) { + if (exception.class == RuntimeException) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "https://192.0.2.1/": // non routable address + exception = exception.getCause() + } + } + return exception + } + + @Override + Set> httpAttributes(URI uri) { + switch (uri.toString()) { + case "http://localhost:61/": // unopened port + case "https://192.0.2.1/": // non routable address + return [] + } + return super.httpAttributes(uri) + } + + @Override + String userAgent() { + return "Vert.x-WebClient" + } + + @Override + boolean testRedirects() { + false + } + + @Override + boolean testHttps() { + false + } + + @Override + SingleConnection createSingleConnection(String host, int port) { + return new VertxRxSingleConnection(host, port) + } +} diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy new file mode 100644 index 0000000000..fa4fe7ef8f --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy @@ -0,0 +1,148 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package server + +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS + +import io.opentelemetry.instrumentation.test.base.HttpServerTest +import io.vertx.circuitbreaker.CircuitBreakerOptions +import io.vertx.core.Promise +import io.vertx.reactivex.circuitbreaker.CircuitBreaker +import io.vertx.reactivex.core.AbstractVerticle +import io.vertx.reactivex.ext.web.Router + +class VertxRxCircuitBreakerHttpServerTest extends VertxRxHttpServerTest { + + @Override + protected Class verticle() { + return VertxRxCircuitBreakerWebTestServer + } + + static class VertxRxCircuitBreakerWebTestServer extends AbstractVerticle { + + @Override + void start(Promise startPromise) { + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT) + Router router = Router.router(super.@vertx) + CircuitBreaker breaker = + CircuitBreaker.create( + "my-circuit-breaker", + super.@vertx, + new CircuitBreakerOptions() + .setTimeout(-1) // Disable the timeout otherwise it makes each test take this long. + ) + + router.route(SUCCESS.path).handler { ctx -> + breaker.execute({ future -> + future.complete(SUCCESS) + }, { it -> + if (it.failed()) { + throw it.cause() + } + HttpServerTest.ServerEndpoint endpoint = it.result() + controller(endpoint) { + ctx.response().setStatusCode(endpoint.status).end(endpoint.body) + } + }) + } + router.route(INDEXED_CHILD.path).handler { ctx -> + breaker.execute({ future -> + future.complete(INDEXED_CHILD) + }, { it -> + if (it.failed()) { + throw it.cause() + } + HttpServerTest.ServerEndpoint endpoint = it.result() + controller(endpoint) { + endpoint.collectSpanAttributes { ctx.request().params().get(it) } + ctx.response().setStatusCode(endpoint.status).end() + } + }) + } + router.route(QUERY_PARAM.path).handler { ctx -> + breaker.execute({ future -> + future.complete(QUERY_PARAM) + }, { it -> + if (it.failed()) { + throw it.cause() + } + HttpServerTest.ServerEndpoint endpoint = it.result() + controller(endpoint) { + ctx.response().setStatusCode(endpoint.status).end(ctx.request().query()) + } + }) + } + router.route(REDIRECT.path).handler { ctx -> + breaker.execute({ future -> + future.complete(REDIRECT) + }, { + if (it.failed()) { + throw it.cause() + } + HttpServerTest.ServerEndpoint endpoint = it.result() + controller(endpoint) { + ctx.response().setStatusCode(endpoint.status).putHeader("location", endpoint.body).end() + } + }) + } + router.route(ERROR.path).handler { ctx -> + breaker.execute({ future -> + future.complete(ERROR) + }, { + if (it.failed()) { + throw it.cause() + } + HttpServerTest.ServerEndpoint endpoint = it.result() + controller(endpoint) { + ctx.response().setStatusCode(endpoint.status).end(endpoint.body) + } + }) + } + router.route(EXCEPTION.path).handler { ctx -> + breaker.execute({ future -> + future.fail(new Exception(EXCEPTION.body)) + }, { + try { + def cause = it.cause() + controller(EXCEPTION) { + throw cause + } + } catch (Exception ex) { + ctx.response().setStatusCode(EXCEPTION.status).end(ex.message) + } + }) + } + router.route("/path/:id/param").handler { ctx -> + breaker.execute({ future -> + future.complete(PATH_PARAM) + }, { + if (it.failed()) { + throw it.cause() + } + HttpServerTest.ServerEndpoint endpoint = it.result() + controller(endpoint) { + ctx.response().setStatusCode(endpoint.status).end(ctx.request().getParam("id")) + } + }) + } + + super.@vertx.createHttpServer() + .requestHandler(router) + .listen(port) { startPromise.complete() } + } + } + + @Override + boolean hasExceptionOnServerSpan(HttpServerTest.ServerEndpoint endpoint) { + return endpoint != EXCEPTION && super.hasExceptionOnServerSpan(endpoint) + } +} diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxHttpServerTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxHttpServerTest.groovy new file mode 100644 index 0000000000..bb769b382b --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/groovy/server/VertxRxHttpServerTest.groovy @@ -0,0 +1,133 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package server + +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.NOT_FOUND +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS + +import io.opentelemetry.instrumentation.test.AgentTestTrait +import io.opentelemetry.instrumentation.test.base.HttpServerTest +import io.vertx.core.DeploymentOptions +import io.vertx.core.Promise +import io.vertx.core.Vertx +import io.vertx.core.VertxOptions +import io.vertx.core.json.JsonObject +import io.vertx.reactivex.core.AbstractVerticle +import io.vertx.reactivex.ext.web.Router +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit + +class VertxRxHttpServerTest extends HttpServerTest implements AgentTestTrait { + public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port" + + @Override + Vertx startServer(int port) { + Vertx server = Vertx.vertx(new VertxOptions() + // Useful for debugging: + // .setBlockedThreadCheckInterval(Integer.MAX_VALUE) + ) + CompletableFuture future = new CompletableFuture<>() + server.deployVerticle(verticle().getName(), + new DeploymentOptions() + .setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port)) + .setInstances(3)) { res -> + if (!res.succeeded()) { + throw new IllegalStateException("Cannot deploy server Verticle", res.cause()) + } + future.complete(null) + } + + future.get(30, TimeUnit.SECONDS) + return server + } + + @Override + void stopServer(Vertx server) { + server.close() + } + + @Override + boolean testPathParam() { + return true + } + + @Override + String expectedServerSpanName(ServerEndpoint endpoint) { + switch (endpoint) { + case PATH_PARAM: + return "/path/:id/param" + case NOT_FOUND: + return "HTTP GET" + default: + return endpoint.getPath() + } + } + + @Override + boolean testConcurrency() { + return true + } + + protected Class verticle() { + return VertxReactiveWebServer + } + + static class VertxReactiveWebServer extends AbstractVerticle { + + @Override + void start(Promise startPromise) { + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT) + Router router = Router.router(super.@vertx) + + router.route(SUCCESS.path).handler { ctx -> + controller(SUCCESS) { + ctx.response().setStatusCode(SUCCESS.status).end(SUCCESS.body) + } + } + router.route(INDEXED_CHILD.path).handler { ctx -> + controller(INDEXED_CHILD) { + INDEXED_CHILD.collectSpanAttributes { ctx.request().params().get(it) } + ctx.response().setStatusCode(INDEXED_CHILD.status).end() + } + } + router.route(QUERY_PARAM.path).handler { ctx -> + controller(QUERY_PARAM) { + ctx.response().setStatusCode(QUERY_PARAM.status).end(ctx.request().query()) + } + } + router.route(REDIRECT.path).handler { ctx -> + controller(REDIRECT) { + ctx.response().setStatusCode(REDIRECT.status).putHeader("location", REDIRECT.body).end() + } + } + router.route(ERROR.path).handler { ctx -> + controller(ERROR) { + ctx.response().setStatusCode(ERROR.status).end(ERROR.body) + } + } + router.route(EXCEPTION.path).handler { ctx -> + controller(EXCEPTION) { + throw new Exception(EXCEPTION.body) + } + } + router.route("/path/:id/param").handler { ctx -> + controller(PATH_PARAM) { + ctx.response().setStatusCode(PATH_PARAM.status).end(ctx.request().getParam("id")) + } + } + + super.@vertx.createHttpServer() + .requestHandler(router) + .listen(port) { startPromise.complete() } + } + } +} diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/VertxReactiveWebServer.java b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/VertxReactiveWebServer.java new file mode 100644 index 0000000000..667dc33cd4 --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/VertxReactiveWebServer.java @@ -0,0 +1,177 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.Single; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.reactivex.core.AbstractVerticle; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.http.HttpServerResponse; +import io.vertx.reactivex.ext.jdbc.JDBCClient; +import io.vertx.reactivex.ext.sql.SQLConnection; +import io.vertx.reactivex.ext.web.Router; +import io.vertx.reactivex.ext.web.RoutingContext; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VertxReactiveWebServer extends AbstractVerticle { + + private static final Logger logger = LoggerFactory.getLogger(VertxReactiveWebServer.class); + + private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test"); + + public static final String TEST_REQUEST_ID_PARAMETER = "test-request-id"; + public static final String TEST_REQUEST_ID_ATTRIBUTE = "test.request.id"; + + private static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"; + private static JDBCClient client; + + public static Vertx start(int port) + throws ExecutionException, InterruptedException, TimeoutException { + /* This is highly against Vertx ideas, but our tests are synchronous + so we have to make sure server is up and running */ + CompletableFuture future = new CompletableFuture<>(); + + Vertx server = Vertx.vertx(new VertxOptions()); + + client = + JDBCClient.createShared( + server, + new JsonObject() + .put("url", "jdbc:hsqldb:mem:test?shutdown=true") + .put("driver_class", "org.hsqldb.jdbcDriver")); + + logger.info("Starting on port {}", port); + server.deployVerticle( + VertxReactiveWebServer.class.getName(), + new DeploymentOptions().setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port)), + res -> { + if (!res.succeeded()) { + RuntimeException exception = + new RuntimeException("Cannot deploy server Verticle", res.cause()); + future.completeExceptionally(exception); + } + future.complete(null); + }); + // block until vertx server is up + future.get(30, TimeUnit.SECONDS); + + return server; + } + + @Override + public void start(Promise startPromise) { + setUpInitialData( + ready -> { + Router router = Router.router(vertx); + int port = config().getInteger(CONFIG_HTTP_SERVER_PORT); + logger.info("Listening on port {}", port); + router + .route(SUCCESS.getPath()) + .handler( + ctx -> ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody())); + + router.route("/listProducts").handler(VertxReactiveWebServer::handleListProducts); + + vertx + .createHttpServer() + .requestHandler(router) + .listen(port, h -> startPromise.complete()); + }); + } + + @SuppressWarnings("CheckReturnValue") + private static void handleListProducts(RoutingContext routingContext) { + Long requestId = extractRequestId(routingContext); + attachRequestIdToCurrentSpan(requestId); + + Span span = tracer.spanBuilder("handleListProducts").startSpan(); + try (Scope ignored = Context.current().with(span).makeCurrent()) { + attachRequestIdToCurrentSpan(requestId); + + HttpServerResponse response = routingContext.response(); + Single jsonArraySingle = listProducts(requestId); + + jsonArraySingle.subscribe( + arr -> response.putHeader("content-type", "application/json").end(arr.encode())); + } finally { + span.end(); + } + } + + private static Single listProducts(Long requestId) { + Span span = tracer.spanBuilder("listProducts").startSpan(); + try (Scope ignored = Context.current().with(span).makeCurrent()) { + attachRequestIdToCurrentSpan(requestId); + String queryInfix = requestId != null ? " AS request" + requestId : ""; + + return client + .rxQuery("SELECT id" + queryInfix + ", name, price, weight FROM products") + .flatMap( + result -> { + JsonArray arr = new JsonArray(); + result.getRows().forEach(arr::add); + return Single.just(arr); + }); + } finally { + span.end(); + } + } + + private static Long extractRequestId(RoutingContext routingContext) { + String requestIdString = routingContext.request().params().get(TEST_REQUEST_ID_PARAMETER); + return requestIdString != null ? Long.valueOf(requestIdString) : null; + } + + private static void attachRequestIdToCurrentSpan(Long requestId) { + if (requestId != null) { + Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, requestId); + } + } + + private static void setUpInitialData(Handler done) { + client.getConnection( + res -> { + if (res.failed()) { + throw new IllegalStateException(res.cause()); + } + + SQLConnection conn = res.result(); + + conn.execute( + "CREATE TABLE IF NOT EXISTS products(id INT IDENTITY, name VARCHAR(255), price FLOAT, weight INT)", + ddl -> { + if (ddl.failed()) { + throw new IllegalStateException(ddl.cause()); + } + + conn.execute( + "INSERT INTO products (name, price, weight) VALUES ('Egg Whisk', 3.99, 150), ('Tea Cosy', 5.99, 100), ('Spatula', 1.00, 80)", + fixtures -> { + if (fixtures.failed()) { + throw new IllegalStateException(fixtures.cause()); + } + + done.handle(null); + }); + }); + }); + } +} diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/client/VertxRxCircuitBreakerSingleConnection.java b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/client/VertxRxCircuitBreakerSingleConnection.java new file mode 100644 index 0000000000..fc1020e2eb --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/client/VertxRxCircuitBreakerSingleConnection.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package client; + +import io.vertx.core.AsyncResult; +import io.vertx.reactivex.circuitbreaker.CircuitBreaker; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +public class VertxRxCircuitBreakerSingleConnection extends VertxRxSingleConnection { + private final CircuitBreaker breaker; + + public VertxRxCircuitBreakerSingleConnection(String host, int port, CircuitBreaker breaker) { + super(host, port); + this.breaker = breaker; + } + + @Override + protected HttpResponse fetchResponse(HttpRequest request) { + CompletableFuture future = new CompletableFuture<>(); + + sendRequestWithCallback( + request, + it -> { + if (it.succeeded()) { + future.complete(it.result()); + } else { + future.completeExceptionally(it.cause()); + } + }); + + return (HttpResponse) future.join(); + } + + private void sendRequestWithCallback(HttpRequest request, Consumer> consumer) { + breaker.execute( + command -> + request.rxSend().doOnSuccess(command::complete).doOnError(command::fail).subscribe(), + consumer::accept); + } +} diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/test/java/client/VertxRxSingleConnection.java b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/client/VertxRxSingleConnection.java similarity index 100% rename from instrumentation/vertx-reactive-3.5/javaagent/src/test/java/client/VertxRxSingleConnection.java rename to instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/client/VertxRxSingleConnection.java diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyMessage.java b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyMessage.java new file mode 100644 index 0000000000..3dcf2cafed --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyMessage.java @@ -0,0 +1,10 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.netty.handler.codec.haproxy; + +// instrumentation fails without this class +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") +public class HAProxyMessage {} diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java new file mode 100644 index 0000000000..5659024f96 --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/latestDepTest/java/io/netty/handler/codec/haproxy/HAProxyProxiedProtocol.java @@ -0,0 +1,10 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.netty.handler.codec.haproxy; + +// instrumentation fails without this class +@SuppressWarnings("checkstyle:AbbreviationAsWordInName") +public class HAProxyProxiedProtocol {} diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/VertxReactivePropagationTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/VertxReactivePropagationTest.groovy new file mode 100644 index 0000000000..4de2b2ce90 --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/VertxReactivePropagationTest.groovy @@ -0,0 +1,199 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import static VertxReactiveWebServer.TEST_REQUEST_ID_ATTRIBUTE +import static VertxReactiveWebServer.TEST_REQUEST_ID_PARAMETER +import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.SERVER +import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS + +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.context.Context +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import io.opentelemetry.testing.internal.armeria.client.WebClient +import io.opentelemetry.testing.internal.armeria.common.HttpRequest +import io.opentelemetry.testing.internal.armeria.common.HttpRequestBuilder +import io.vertx.reactivex.core.Vertx +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import spock.lang.Shared + +class VertxReactivePropagationTest extends AgentInstrumentationSpecification { + @Shared + WebClient client + + @Shared + int port + + @Shared + Vertx server + + def setupSpec() { + port = PortUtils.findOpenPort() + server = VertxReactiveWebServer.start(port) + client = WebClient.of("h1c://localhost:${port}") + } + + def cleanupSpec() { + server.close() + } + + //Verifies that context is correctly propagated and sql query span has correct parent. + //Tests io.opentelemetry.javaagent.instrumentation.vertx.reactive.VertxRxInstrumentation + def "should propagate context over vert.x rx-java framework"() { + setup: + def response = client.get("/listProducts").aggregate().join() + + expect: + response.status().code() == SUCCESS.status + + and: + assertTraces(1) { + trace(0, 4) { + span(0) { + name "/listProducts" + kind SERVER + hasNoParent() + attributes { + "${SemanticAttributes.NET_PEER_PORT.key}" Long + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + "${SemanticAttributes.HTTP_URL.key}" "http://localhost:${port}/listProducts" + "${SemanticAttributes.HTTP_METHOD.key}" "GET" + "${SemanticAttributes.HTTP_STATUS_CODE.key}" 200 + "${SemanticAttributes.HTTP_FLAVOR.key}" "1.1" + "${SemanticAttributes.HTTP_USER_AGENT.key}" String + "${SemanticAttributes.HTTP_CLIENT_IP.key}" "127.0.0.1" + } + } + span(1) { + name "handleListProducts" + kind SpanKind.INTERNAL + childOf span(0) + } + span(2) { + name "listProducts" + kind SpanKind.INTERNAL + childOf span(1) + } + span(3) { + name "SELECT test.products" + kind CLIENT + childOf span(2) + attributes { + "${SemanticAttributes.DB_SYSTEM.key}" "hsqldb" + "${SemanticAttributes.DB_NAME.key}" "test" + "${SemanticAttributes.DB_USER.key}" "SA" + "${SemanticAttributes.DB_CONNECTION_STRING.key}" "hsqldb:mem:" + "${SemanticAttributes.DB_STATEMENT.key}" "SELECT id, name, price, weight FROM products" + "${SemanticAttributes.DB_OPERATION.key}" "SELECT" + "${SemanticAttributes.DB_SQL_TABLE.key}" "products" + } + } + } + } + } + + def "should propagate context correctly over vert.x rx-java framework with high concurrency"() { + setup: + int count = 100 + def baseUrl = "/listProducts" + def latch = new CountDownLatch(1) + + def pool = Executors.newFixedThreadPool(8) + def propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() + def setter = { HttpRequestBuilder carrier, String name, String value -> + carrier.header(name, value) + } + + when: + count.times { index -> + def job = { + latch.await() + runWithSpan("client " + index) { + HttpRequestBuilder builder = HttpRequest.builder() + .get("${baseUrl}?${TEST_REQUEST_ID_PARAMETER}=${index}") + Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index) + propagator.inject(Context.current(), builder, setter) + client.execute(builder.build()).aggregate().join() + } + } + pool.submit(job) + } + + latch.countDown() + + then: + assertTraces(count) { + (0..count - 1).each { + trace(it, 5) { + def rootSpan = it.span(0) + def requestId = Long.valueOf(rootSpan.name.substring("client ".length())) + + span(0) { + name "client $requestId" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + "${TEST_REQUEST_ID_ATTRIBUTE}" requestId + } + } + span(1) { + name "/listProducts" + kind SERVER + childOf(span(0)) + attributes { + "${SemanticAttributes.NET_PEER_PORT.key}" Long + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + "${SemanticAttributes.HTTP_URL.key}" "http://localhost:$port$baseUrl?$TEST_REQUEST_ID_PARAMETER=$requestId" + "${SemanticAttributes.HTTP_METHOD.key}" "GET" + "${SemanticAttributes.HTTP_STATUS_CODE.key}" 200 + "${SemanticAttributes.HTTP_FLAVOR.key}" "1.1" + "${SemanticAttributes.HTTP_USER_AGENT.key}" String + "${SemanticAttributes.HTTP_CLIENT_IP.key}" "127.0.0.1" + "${TEST_REQUEST_ID_ATTRIBUTE}" requestId + } + } + span(2) { + name "handleListProducts" + kind SpanKind.INTERNAL + childOf(span(1)) + attributes { + "${TEST_REQUEST_ID_ATTRIBUTE}" requestId + } + } + span(3) { + name "listProducts" + kind SpanKind.INTERNAL + childOf(span(2)) + attributes { + "${TEST_REQUEST_ID_ATTRIBUTE}" requestId + } + } + span(4) { + name "SELECT test.products" + kind CLIENT + childOf(span(3)) + attributes { + "${SemanticAttributes.DB_SYSTEM.key}" "hsqldb" + "${SemanticAttributes.DB_NAME.key}" "test" + "${SemanticAttributes.DB_USER.key}" "SA" + "${SemanticAttributes.DB_CONNECTION_STRING.key}" "hsqldb:mem:" + "${SemanticAttributes.DB_STATEMENT.key}" "SELECT id AS request$requestId, name, price, weight FROM products" + "${SemanticAttributes.DB_OPERATION.key}" "SELECT" + "${SemanticAttributes.DB_SQL_TABLE.key}" "products" + } + } + } + } + } + + cleanup: + pool.shutdownNow() + } +} diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy similarity index 100% rename from instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy rename to instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/client/VertxRxCircuitBreakerWebClientTest.groovy diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/client/VertxRxWebClientTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/client/VertxRxWebClientTest.groovy similarity index 100% rename from instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/client/VertxRxWebClientTest.groovy rename to instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/client/VertxRxWebClientTest.groovy diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy similarity index 100% rename from instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy rename to instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/server/VertxRxCircuitBreakerHttpServerTest.groovy diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/server/VertxRxHttpServerTest.groovy b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/server/VertxRxHttpServerTest.groovy similarity index 100% rename from instrumentation/vertx-reactive-3.5/javaagent/src/test/groovy/server/VertxRxHttpServerTest.groovy rename to instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/groovy/server/VertxRxHttpServerTest.groovy diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/test/java/VertxReactiveWebServer.java b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/java/VertxReactiveWebServer.java similarity index 100% rename from instrumentation/vertx-reactive-3.5/javaagent/src/test/java/VertxReactiveWebServer.java rename to instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/java/VertxReactiveWebServer.java diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/test/java/client/VertxRxCircuitBreakerSingleConnection.java b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/java/client/VertxRxCircuitBreakerSingleConnection.java similarity index 100% rename from instrumentation/vertx-reactive-3.5/javaagent/src/test/java/client/VertxRxCircuitBreakerSingleConnection.java rename to instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/java/client/VertxRxCircuitBreakerSingleConnection.java diff --git a/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/java/client/VertxRxSingleConnection.java b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/java/client/VertxRxSingleConnection.java new file mode 100644 index 0000000000..96ee230533 --- /dev/null +++ b/instrumentation/vertx-reactive-3.5/javaagent/src/version35Test/java/client/VertxRxSingleConnection.java @@ -0,0 +1,71 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package client; + +import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection; +import io.vertx.core.VertxOptions; +import io.vertx.core.http.HttpMethod; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.reactivex.core.Vertx; +import io.vertx.reactivex.core.buffer.Buffer; +import io.vertx.reactivex.ext.web.client.HttpRequest; +import io.vertx.reactivex.ext.web.client.HttpResponse; +import io.vertx.reactivex.ext.web.client.WebClient; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; + +public class VertxRxSingleConnection implements SingleConnection { + private final WebClient webClient; + private final String host; + private final int port; + + public VertxRxSingleConnection(String host, int port) { + this.host = host; + this.port = port; + + WebClientOptions clientOptions = + new WebClientOptions() + .setConnectTimeout(5000) + .setMaxPoolSize(1) + .setKeepAlive(true) + .setPipelining(true); + + Vertx vertx = Vertx.vertx(new VertxOptions()); + this.webClient = WebClient.create(vertx, clientOptions); + } + + @Override + public int doRequest(String path, Map headers) throws ExecutionException { + String requestId = Objects.requireNonNull(headers.get(REQUEST_ID_HEADER)); + + String url; + try { + url = new URL("http", host, port, path).toString(); + } catch (MalformedURLException e) { + throw new ExecutionException(e); + } + + HttpRequest request = webClient.request(HttpMethod.GET, port, host, url); + headers.forEach(request::putHeader); + + HttpResponse response = fetchResponse(request); + + String responseId = response.getHeader(REQUEST_ID_HEADER); + if (!requestId.equals(responseId)) { + throw new IllegalStateException( + String.format("Received response with id %s, expected %s", responseId, requestId)); + } + + return response.statusCode(); + } + + protected HttpResponse fetchResponse(HttpRequest request) { + return request.rxSend().blockingGet(); + } +}