Test latest version of vert.x reactive (#3715)

* Test latest version of vert.x reactive

* review comment
This commit is contained in:
Lauri Tulmin 2021-07-30 21:42:07 +03:00 committed by GitHub
parent b52fd39d8d
commit be645f08ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1059 additions and 21 deletions

View File

@ -31,9 +31,10 @@ public class VertxClientTracer extends AbstractVertxClientTracer {
@Override
@Nullable
protected URI url(HttpClientRequest request) throws URISyntaxException {
if (request.absoluteURI().startsWith(request.getURI())) {
return new URI(request.getURI());
URI uri = new URI(request.getURI());
if (!uri.isAbsolute()) {
uri = new URI(request.absoluteURI());
}
return new URI(request.absoluteURI());
return uri;
}
}

View File

@ -1,5 +1,6 @@
plugins {
id("otel.javaagent-instrumentation")
id("org.unbroken-dome.test-sets")
}
muzzle {
@ -10,12 +11,23 @@ muzzle {
}
}
testSets {
create("version35Test")
create("latestDepTest")
}
tasks {
named<Test>("test") {
dependsOn("version35Test")
}
}
//The first Vert.x version that uses rx-java 2
val vertxVersion = "3.5.0"
dependencies {
library("io.vertx:vertx-web:${vertxVersion}")
library("io.vertx:vertx-rx-java2:${vertxVersion}")
compileOnly("io.vertx:vertx-web:${vertxVersion}")
compileOnly("io.vertx:vertx-rx-java2:${vertxVersion}")
testInstrumentation(project(":instrumentation:jdbc:javaagent"))
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
@ -24,23 +36,17 @@ dependencies {
testInstrumentation(project(":instrumentation:vertx-http-client:vertx-http-client-4.0:javaagent"))
testInstrumentation(project(":instrumentation:vertx-web-3.0:javaagent"))
testLibrary("io.vertx:vertx-web-client:${vertxVersion}")
testLibrary("io.vertx:vertx-jdbc-client:${vertxVersion}")
testLibrary("io.vertx:vertx-circuit-breaker:${vertxVersion}")
testImplementation("org.hsqldb:hsqldb:2.3.4")
// Vert.x 4.0 is incompatible with our tests.
// 3.9.7 Requires Netty 4.1.60, no other version works with it.
latestDepTestLibrary(enforcedPlatform("io.netty:netty-bom:4.1.60.Final"))
latestDepTestLibrary("io.vertx:vertx-web:3.+")
latestDepTestLibrary("io.vertx:vertx-web-client:3.+")
latestDepTestLibrary("io.vertx:vertx-jdbc-client:3.+")
latestDepTestLibrary("io.vertx:vertx-circuit-breaker:3.+")
latestDepTestLibrary("io.vertx:vertx-rx-java2:3.+")
}
add("version35TestImplementation", "io.vertx:vertx-web:${vertxVersion}")
add("version35TestImplementation", "io.vertx:vertx-rx-java2:${vertxVersion}")
add("version35TestImplementation", "io.vertx:vertx-web-client:${vertxVersion}")
add("version35TestImplementation", "io.vertx:vertx-jdbc-client:${vertxVersion}")
add("version35TestImplementation", "io.vertx:vertx-circuit-breaker:${vertxVersion}")
tasks {
named<Test>("test") {
systemProperty("testLatestDeps", findProperty("testLatestDeps"))
}
add("latestDepTestImplementation", "io.vertx:vertx-web:4.+")
add("latestDepTestImplementation", "io.vertx:vertx-rx-java2:4.+")
add("latestDepTestImplementation", "io.vertx:vertx-web-client:4.+")
add("latestDepTestImplementation", "io.vertx:vertx-jdbc-client:4.+")
add("latestDepTestImplementation", "io.vertx:vertx-circuit-breaker:4.+")
}

View File

@ -0,0 +1,125 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package client
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
import io.vertx.circuitbreaker.CircuitBreakerOptions
import io.vertx.core.AsyncResult
import io.vertx.core.VertxOptions
import io.vertx.core.http.HttpMethod
import io.vertx.ext.web.client.WebClientOptions
import io.vertx.reactivex.circuitbreaker.CircuitBreaker
import io.vertx.reactivex.core.Vertx
import io.vertx.reactivex.ext.web.client.HttpRequest
import io.vertx.reactivex.ext.web.client.WebClient
import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
import spock.lang.Shared
class VertxRxCircuitBreakerWebClientTest extends HttpClientTest<HttpRequest<?>> implements AgentTestTrait {
@Shared
Vertx vertx = Vertx.vertx(new VertxOptions())
@Shared
def clientOptions = new WebClientOptions().setConnectTimeout(CONNECT_TIMEOUT_MS)
@Shared
WebClient client = WebClient.create(vertx, clientOptions)
@Shared
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions()
.setTimeout(-1) // Disable the timeout otherwise it makes each test take this long.
)
@Override
HttpRequest<?> buildRequest(String method, URI uri, Map<String, String> headers) {
def request = client.request(HttpMethod.valueOf(method), getPort(uri), uri.host, "$uri")
headers.each { request.putHeader(it.key, it.value) }
return request
}
@Override
int sendRequest(HttpRequest<?> request, String method, URI uri, Map<String, String> headers) {
// VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through
// a callback.
CompletableFuture<Integer> future = new CompletableFuture<>()
sendRequestWithCallback(request) {
if (it.succeeded()) {
future.complete(it.result().statusCode())
} else {
future.completeExceptionally(it.cause())
}
}
return future.get()
}
void sendRequestWithCallback(HttpRequest<?> request, Consumer<AsyncResult> consumer) {
breaker.execute({ command ->
request.rxSend().doOnSuccess {
command.complete(it)
}.doOnError {
command.fail(it)
}.subscribe()
}, {
consumer.accept(it)
})
}
@Override
void sendRequestWithCallback(HttpRequest<?> request, String method, URI uri, Map<String, String> headers, AbstractHttpClientTest.RequestResult requestResult) {
sendRequestWithCallback(request) {
if (it.succeeded()) {
requestResult.complete(it.result().statusCode())
} else {
requestResult.complete(it.cause())
}
}
}
@Override
String expectedClientSpanName(URI uri, String method) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
return "CONNECT"
default:
return super.expectedClientSpanName(uri, method)
}
}
@Override
Set<AttributeKey<?>> httpAttributes(URI uri) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
return []
}
return super.httpAttributes(uri)
}
@Override
String userAgent() {
return "Vert.x-WebClient"
}
@Override
boolean testRedirects() {
false
}
@Override
boolean testHttps() {
false
}
@Override
SingleConnection createSingleConnection(String host, int port) {
return new VertxRxCircuitBreakerSingleConnection(host, port, breaker)
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package client
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
import io.vertx.core.VertxOptions
import io.vertx.core.http.HttpMethod
import io.vertx.ext.web.client.WebClientOptions
import io.vertx.reactivex.core.Vertx
import io.vertx.reactivex.core.buffer.Buffer
import io.vertx.reactivex.ext.web.client.HttpRequest
import io.vertx.reactivex.ext.web.client.HttpResponse
import io.vertx.reactivex.ext.web.client.WebClient
import spock.lang.Shared
class VertxRxWebClientTest extends HttpClientTest<HttpRequest<Buffer>> implements AgentTestTrait {
@Shared
Vertx vertx = Vertx.vertx(new VertxOptions())
@Shared
def clientOptions = new WebClientOptions().setConnectTimeout(CONNECT_TIMEOUT_MS)
@Shared
WebClient client = WebClient.create(vertx, clientOptions)
@Override
HttpRequest<Buffer> buildRequest(String method, URI uri, Map<String, String> headers) {
def request = client.request(HttpMethod.valueOf(method), getPort(uri), uri.host, "$uri")
headers.each { request.putHeader(it.key, it.value) }
return request
}
@Override
int sendRequest(HttpRequest<Buffer> request, String method, URI uri, Map<String, String> headers) {
return request.rxSend().blockingGet().statusCode()
}
@Override
void sendRequestWithCallback(HttpRequest<Buffer> request, String method, URI uri, Map<String, String> headers, AbstractHttpClientTest.RequestResult requestResult) {
request.rxSend()
.subscribe(new io.reactivex.functions.Consumer<HttpResponse<?>>() {
@Override
void accept(HttpResponse<?> httpResponse) throws Exception {
requestResult.complete(httpResponse.statusCode())
}
}, new io.reactivex.functions.Consumer<Throwable>() {
@Override
void accept(Throwable throwable) throws Exception {
requestResult.complete(throwable)
}
})
}
@Override
String expectedClientSpanName(URI uri, String method) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
return "CONNECT"
default:
return super.expectedClientSpanName(uri, method)
}
}
@Override
Throwable clientSpanError(URI uri, Throwable exception) {
if (exception.class == RuntimeException) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
exception = exception.getCause()
}
}
return exception
}
@Override
Set<AttributeKey<?>> httpAttributes(URI uri) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
return []
}
return super.httpAttributes(uri)
}
@Override
String userAgent() {
return "Vert.x-WebClient"
}
@Override
boolean testRedirects() {
false
}
@Override
boolean testHttps() {
false
}
@Override
SingleConnection createSingleConnection(String host, int port) {
return new VertxRxSingleConnection(host, port)
}
}

View File

@ -0,0 +1,148 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package server
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.vertx.circuitbreaker.CircuitBreakerOptions
import io.vertx.core.Promise
import io.vertx.reactivex.circuitbreaker.CircuitBreaker
import io.vertx.reactivex.core.AbstractVerticle
import io.vertx.reactivex.ext.web.Router
class VertxRxCircuitBreakerHttpServerTest extends VertxRxHttpServerTest {
@Override
protected Class<AbstractVerticle> verticle() {
return VertxRxCircuitBreakerWebTestServer
}
static class VertxRxCircuitBreakerWebTestServer extends AbstractVerticle {
@Override
void start(Promise<Void> startPromise) {
int port = config().getInteger(CONFIG_HTTP_SERVER_PORT)
Router router = Router.router(super.@vertx)
CircuitBreaker breaker =
CircuitBreaker.create(
"my-circuit-breaker",
super.@vertx,
new CircuitBreakerOptions()
.setTimeout(-1) // Disable the timeout otherwise it makes each test take this long.
)
router.route(SUCCESS.path).handler { ctx ->
breaker.execute({ future ->
future.complete(SUCCESS)
}, { it ->
if (it.failed()) {
throw it.cause()
}
HttpServerTest.ServerEndpoint endpoint = it.result()
controller(endpoint) {
ctx.response().setStatusCode(endpoint.status).end(endpoint.body)
}
})
}
router.route(INDEXED_CHILD.path).handler { ctx ->
breaker.execute({ future ->
future.complete(INDEXED_CHILD)
}, { it ->
if (it.failed()) {
throw it.cause()
}
HttpServerTest.ServerEndpoint endpoint = it.result()
controller(endpoint) {
endpoint.collectSpanAttributes { ctx.request().params().get(it) }
ctx.response().setStatusCode(endpoint.status).end()
}
})
}
router.route(QUERY_PARAM.path).handler { ctx ->
breaker.execute({ future ->
future.complete(QUERY_PARAM)
}, { it ->
if (it.failed()) {
throw it.cause()
}
HttpServerTest.ServerEndpoint endpoint = it.result()
controller(endpoint) {
ctx.response().setStatusCode(endpoint.status).end(ctx.request().query())
}
})
}
router.route(REDIRECT.path).handler { ctx ->
breaker.execute({ future ->
future.complete(REDIRECT)
}, {
if (it.failed()) {
throw it.cause()
}
HttpServerTest.ServerEndpoint endpoint = it.result()
controller(endpoint) {
ctx.response().setStatusCode(endpoint.status).putHeader("location", endpoint.body).end()
}
})
}
router.route(ERROR.path).handler { ctx ->
breaker.execute({ future ->
future.complete(ERROR)
}, {
if (it.failed()) {
throw it.cause()
}
HttpServerTest.ServerEndpoint endpoint = it.result()
controller(endpoint) {
ctx.response().setStatusCode(endpoint.status).end(endpoint.body)
}
})
}
router.route(EXCEPTION.path).handler { ctx ->
breaker.execute({ future ->
future.fail(new Exception(EXCEPTION.body))
}, {
try {
def cause = it.cause()
controller(EXCEPTION) {
throw cause
}
} catch (Exception ex) {
ctx.response().setStatusCode(EXCEPTION.status).end(ex.message)
}
})
}
router.route("/path/:id/param").handler { ctx ->
breaker.execute({ future ->
future.complete(PATH_PARAM)
}, {
if (it.failed()) {
throw it.cause()
}
HttpServerTest.ServerEndpoint endpoint = it.result()
controller(endpoint) {
ctx.response().setStatusCode(endpoint.status).end(ctx.request().getParam("id"))
}
})
}
super.@vertx.createHttpServer()
.requestHandler(router)
.listen(port) { startPromise.complete() }
}
}
@Override
boolean hasExceptionOnServerSpan(HttpServerTest.ServerEndpoint endpoint) {
return endpoint != EXCEPTION && super.hasExceptionOnServerSpan(endpoint)
}
}

View File

@ -0,0 +1,133 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package server
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.NOT_FOUND
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.vertx.core.DeploymentOptions
import io.vertx.core.Promise
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.json.JsonObject
import io.vertx.reactivex.core.AbstractVerticle
import io.vertx.reactivex.ext.web.Router
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
class VertxRxHttpServerTest extends HttpServerTest<Vertx> implements AgentTestTrait {
public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"
@Override
Vertx startServer(int port) {
Vertx server = Vertx.vertx(new VertxOptions()
// Useful for debugging:
// .setBlockedThreadCheckInterval(Integer.MAX_VALUE)
)
CompletableFuture<Void> future = new CompletableFuture<>()
server.deployVerticle(verticle().getName(),
new DeploymentOptions()
.setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port))
.setInstances(3)) { res ->
if (!res.succeeded()) {
throw new IllegalStateException("Cannot deploy server Verticle", res.cause())
}
future.complete(null)
}
future.get(30, TimeUnit.SECONDS)
return server
}
@Override
void stopServer(Vertx server) {
server.close()
}
@Override
boolean testPathParam() {
return true
}
@Override
String expectedServerSpanName(ServerEndpoint endpoint) {
switch (endpoint) {
case PATH_PARAM:
return "/path/:id/param"
case NOT_FOUND:
return "HTTP GET"
default:
return endpoint.getPath()
}
}
@Override
boolean testConcurrency() {
return true
}
protected Class<AbstractVerticle> verticle() {
return VertxReactiveWebServer
}
static class VertxReactiveWebServer extends AbstractVerticle {
@Override
void start(Promise<Void> startPromise) {
int port = config().getInteger(CONFIG_HTTP_SERVER_PORT)
Router router = Router.router(super.@vertx)
router.route(SUCCESS.path).handler { ctx ->
controller(SUCCESS) {
ctx.response().setStatusCode(SUCCESS.status).end(SUCCESS.body)
}
}
router.route(INDEXED_CHILD.path).handler { ctx ->
controller(INDEXED_CHILD) {
INDEXED_CHILD.collectSpanAttributes { ctx.request().params().get(it) }
ctx.response().setStatusCode(INDEXED_CHILD.status).end()
}
}
router.route(QUERY_PARAM.path).handler { ctx ->
controller(QUERY_PARAM) {
ctx.response().setStatusCode(QUERY_PARAM.status).end(ctx.request().query())
}
}
router.route(REDIRECT.path).handler { ctx ->
controller(REDIRECT) {
ctx.response().setStatusCode(REDIRECT.status).putHeader("location", REDIRECT.body).end()
}
}
router.route(ERROR.path).handler { ctx ->
controller(ERROR) {
ctx.response().setStatusCode(ERROR.status).end(ERROR.body)
}
}
router.route(EXCEPTION.path).handler { ctx ->
controller(EXCEPTION) {
throw new Exception(EXCEPTION.body)
}
}
router.route("/path/:id/param").handler { ctx ->
controller(PATH_PARAM) {
ctx.response().setStatusCode(PATH_PARAM.status).end(ctx.request().getParam("id"))
}
}
super.@vertx.createHttpServer()
.requestHandler(router)
.listen(port) { startPromise.complete() }
}
}
}

View File

@ -0,0 +1,177 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.Single;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.http.HttpServerResponse;
import io.vertx.reactivex.ext.jdbc.JDBCClient;
import io.vertx.reactivex.ext.sql.SQLConnection;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VertxReactiveWebServer extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(VertxReactiveWebServer.class);
private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test");
public static final String TEST_REQUEST_ID_PARAMETER = "test-request-id";
public static final String TEST_REQUEST_ID_ATTRIBUTE = "test.request.id";
private static final String CONFIG_HTTP_SERVER_PORT = "http.server.port";
private static JDBCClient client;
public static Vertx start(int port)
throws ExecutionException, InterruptedException, TimeoutException {
/* This is highly against Vertx ideas, but our tests are synchronous
so we have to make sure server is up and running */
CompletableFuture<Void> future = new CompletableFuture<>();
Vertx server = Vertx.vertx(new VertxOptions());
client =
JDBCClient.createShared(
server,
new JsonObject()
.put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver"));
logger.info("Starting on port {}", port);
server.deployVerticle(
VertxReactiveWebServer.class.getName(),
new DeploymentOptions().setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port)),
res -> {
if (!res.succeeded()) {
RuntimeException exception =
new RuntimeException("Cannot deploy server Verticle", res.cause());
future.completeExceptionally(exception);
}
future.complete(null);
});
// block until vertx server is up
future.get(30, TimeUnit.SECONDS);
return server;
}
@Override
public void start(Promise<Void> startPromise) {
setUpInitialData(
ready -> {
Router router = Router.router(vertx);
int port = config().getInteger(CONFIG_HTTP_SERVER_PORT);
logger.info("Listening on port {}", port);
router
.route(SUCCESS.getPath())
.handler(
ctx -> ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody()));
router.route("/listProducts").handler(VertxReactiveWebServer::handleListProducts);
vertx
.createHttpServer()
.requestHandler(router)
.listen(port, h -> startPromise.complete());
});
}
@SuppressWarnings("CheckReturnValue")
private static void handleListProducts(RoutingContext routingContext) {
Long requestId = extractRequestId(routingContext);
attachRequestIdToCurrentSpan(requestId);
Span span = tracer.spanBuilder("handleListProducts").startSpan();
try (Scope ignored = Context.current().with(span).makeCurrent()) {
attachRequestIdToCurrentSpan(requestId);
HttpServerResponse response = routingContext.response();
Single<JsonArray> jsonArraySingle = listProducts(requestId);
jsonArraySingle.subscribe(
arr -> response.putHeader("content-type", "application/json").end(arr.encode()));
} finally {
span.end();
}
}
private static Single<JsonArray> listProducts(Long requestId) {
Span span = tracer.spanBuilder("listProducts").startSpan();
try (Scope ignored = Context.current().with(span).makeCurrent()) {
attachRequestIdToCurrentSpan(requestId);
String queryInfix = requestId != null ? " AS request" + requestId : "";
return client
.rxQuery("SELECT id" + queryInfix + ", name, price, weight FROM products")
.flatMap(
result -> {
JsonArray arr = new JsonArray();
result.getRows().forEach(arr::add);
return Single.just(arr);
});
} finally {
span.end();
}
}
private static Long extractRequestId(RoutingContext routingContext) {
String requestIdString = routingContext.request().params().get(TEST_REQUEST_ID_PARAMETER);
return requestIdString != null ? Long.valueOf(requestIdString) : null;
}
private static void attachRequestIdToCurrentSpan(Long requestId) {
if (requestId != null) {
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, requestId);
}
}
private static void setUpInitialData(Handler<Void> done) {
client.getConnection(
res -> {
if (res.failed()) {
throw new IllegalStateException(res.cause());
}
SQLConnection conn = res.result();
conn.execute(
"CREATE TABLE IF NOT EXISTS products(id INT IDENTITY, name VARCHAR(255), price FLOAT, weight INT)",
ddl -> {
if (ddl.failed()) {
throw new IllegalStateException(ddl.cause());
}
conn.execute(
"INSERT INTO products (name, price, weight) VALUES ('Egg Whisk', 3.99, 150), ('Tea Cosy', 5.99, 100), ('Spatula', 1.00, 80)",
fixtures -> {
if (fixtures.failed()) {
throw new IllegalStateException(fixtures.cause());
}
done.handle(null);
});
});
});
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package client;
import io.vertx.core.AsyncResult;
import io.vertx.reactivex.circuitbreaker.CircuitBreaker;
import io.vertx.reactivex.ext.web.client.HttpRequest;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
public class VertxRxCircuitBreakerSingleConnection extends VertxRxSingleConnection {
private final CircuitBreaker breaker;
public VertxRxCircuitBreakerSingleConnection(String host, int port, CircuitBreaker breaker) {
super(host, port);
this.breaker = breaker;
}
@Override
protected HttpResponse<?> fetchResponse(HttpRequest<?> request) {
CompletableFuture<Object> future = new CompletableFuture<>();
sendRequestWithCallback(
request,
it -> {
if (it.succeeded()) {
future.complete(it.result());
} else {
future.completeExceptionally(it.cause());
}
});
return (HttpResponse<?>) future.join();
}
private void sendRequestWithCallback(HttpRequest<?> request, Consumer<AsyncResult<?>> consumer) {
breaker.execute(
command ->
request.rxSend().doOnSuccess(command::complete).doOnError(command::fail).subscribe(),
consumer::accept);
}
}

View File

@ -0,0 +1,10 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.netty.handler.codec.haproxy;
// instrumentation fails without this class
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class HAProxyMessage {}

View File

@ -0,0 +1,10 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.netty.handler.codec.haproxy;
// instrumentation fails without this class
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class HAProxyProxiedProtocol {}

View File

@ -0,0 +1,199 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static VertxReactiveWebServer.TEST_REQUEST_ID_ATTRIBUTE
import static VertxReactiveWebServer.TEST_REQUEST_ID_PARAMETER
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.SERVER
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import io.opentelemetry.testing.internal.armeria.client.WebClient
import io.opentelemetry.testing.internal.armeria.common.HttpRequest
import io.opentelemetry.testing.internal.armeria.common.HttpRequestBuilder
import io.vertx.reactivex.core.Vertx
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import spock.lang.Shared
class VertxReactivePropagationTest extends AgentInstrumentationSpecification {
@Shared
WebClient client
@Shared
int port
@Shared
Vertx server
def setupSpec() {
port = PortUtils.findOpenPort()
server = VertxReactiveWebServer.start(port)
client = WebClient.of("h1c://localhost:${port}")
}
def cleanupSpec() {
server.close()
}
//Verifies that context is correctly propagated and sql query span has correct parent.
//Tests io.opentelemetry.javaagent.instrumentation.vertx.reactive.VertxRxInstrumentation
def "should propagate context over vert.x rx-java framework"() {
setup:
def response = client.get("/listProducts").aggregate().join()
expect:
response.status().code() == SUCCESS.status
and:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "/listProducts"
kind SERVER
hasNoParent()
attributes {
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.HTTP_URL.key}" "http://localhost:${port}/listProducts"
"${SemanticAttributes.HTTP_METHOD.key}" "GET"
"${SemanticAttributes.HTTP_STATUS_CODE.key}" 200
"${SemanticAttributes.HTTP_FLAVOR.key}" "1.1"
"${SemanticAttributes.HTTP_USER_AGENT.key}" String
"${SemanticAttributes.HTTP_CLIENT_IP.key}" "127.0.0.1"
}
}
span(1) {
name "handleListProducts"
kind SpanKind.INTERNAL
childOf span(0)
}
span(2) {
name "listProducts"
kind SpanKind.INTERNAL
childOf span(1)
}
span(3) {
name "SELECT test.products"
kind CLIENT
childOf span(2)
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "hsqldb"
"${SemanticAttributes.DB_NAME.key}" "test"
"${SemanticAttributes.DB_USER.key}" "SA"
"${SemanticAttributes.DB_CONNECTION_STRING.key}" "hsqldb:mem:"
"${SemanticAttributes.DB_STATEMENT.key}" "SELECT id, name, price, weight FROM products"
"${SemanticAttributes.DB_OPERATION.key}" "SELECT"
"${SemanticAttributes.DB_SQL_TABLE.key}" "products"
}
}
}
}
}
def "should propagate context correctly over vert.x rx-java framework with high concurrency"() {
setup:
int count = 100
def baseUrl = "/listProducts"
def latch = new CountDownLatch(1)
def pool = Executors.newFixedThreadPool(8)
def propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
def setter = { HttpRequestBuilder carrier, String name, String value ->
carrier.header(name, value)
}
when:
count.times { index ->
def job = {
latch.await()
runWithSpan("client " + index) {
HttpRequestBuilder builder = HttpRequest.builder()
.get("${baseUrl}?${TEST_REQUEST_ID_PARAMETER}=${index}")
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index)
propagator.inject(Context.current(), builder, setter)
client.execute(builder.build()).aggregate().join()
}
}
pool.submit(job)
}
latch.countDown()
then:
assertTraces(count) {
(0..count - 1).each {
trace(it, 5) {
def rootSpan = it.span(0)
def requestId = Long.valueOf(rootSpan.name.substring("client ".length()))
span(0) {
name "client $requestId"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
"${TEST_REQUEST_ID_ATTRIBUTE}" requestId
}
}
span(1) {
name "/listProducts"
kind SERVER
childOf(span(0))
attributes {
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.HTTP_URL.key}" "http://localhost:$port$baseUrl?$TEST_REQUEST_ID_PARAMETER=$requestId"
"${SemanticAttributes.HTTP_METHOD.key}" "GET"
"${SemanticAttributes.HTTP_STATUS_CODE.key}" 200
"${SemanticAttributes.HTTP_FLAVOR.key}" "1.1"
"${SemanticAttributes.HTTP_USER_AGENT.key}" String
"${SemanticAttributes.HTTP_CLIENT_IP.key}" "127.0.0.1"
"${TEST_REQUEST_ID_ATTRIBUTE}" requestId
}
}
span(2) {
name "handleListProducts"
kind SpanKind.INTERNAL
childOf(span(1))
attributes {
"${TEST_REQUEST_ID_ATTRIBUTE}" requestId
}
}
span(3) {
name "listProducts"
kind SpanKind.INTERNAL
childOf(span(2))
attributes {
"${TEST_REQUEST_ID_ATTRIBUTE}" requestId
}
}
span(4) {
name "SELECT test.products"
kind CLIENT
childOf(span(3))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "hsqldb"
"${SemanticAttributes.DB_NAME.key}" "test"
"${SemanticAttributes.DB_USER.key}" "SA"
"${SemanticAttributes.DB_CONNECTION_STRING.key}" "hsqldb:mem:"
"${SemanticAttributes.DB_STATEMENT.key}" "SELECT id AS request$requestId, name, price, weight FROM products"
"${SemanticAttributes.DB_OPERATION.key}" "SELECT"
"${SemanticAttributes.DB_SQL_TABLE.key}" "products"
}
}
}
}
}
cleanup:
pool.shutdownNow()
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package client;
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.ext.web.client.HttpRequest;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.client.WebClient;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
public class VertxRxSingleConnection implements SingleConnection {
private final WebClient webClient;
private final String host;
private final int port;
public VertxRxSingleConnection(String host, int port) {
this.host = host;
this.port = port;
WebClientOptions clientOptions =
new WebClientOptions()
.setConnectTimeout(5000)
.setMaxPoolSize(1)
.setKeepAlive(true)
.setPipelining(true);
Vertx vertx = Vertx.vertx(new VertxOptions());
this.webClient = WebClient.create(vertx, clientOptions);
}
@Override
public int doRequest(String path, Map<String, String> headers) throws ExecutionException {
String requestId = Objects.requireNonNull(headers.get(REQUEST_ID_HEADER));
String url;
try {
url = new URL("http", host, port, path).toString();
} catch (MalformedURLException e) {
throw new ExecutionException(e);
}
HttpRequest<Buffer> request = webClient.request(HttpMethod.GET, port, host, url);
headers.forEach(request::putHeader);
HttpResponse<?> response = fetchResponse(request);
String responseId = response.getHeader(REQUEST_ID_HEADER);
if (!requestId.equals(responseId)) {
throw new IllegalStateException(
String.format("Received response with id %s, expected %s", responseId, requestId));
}
return response.statusCode();
}
protected HttpResponse<?> fetchResponse(HttpRequest<?> request) {
return request.rxSend().blockingGet();
}
}