Vert.x Reactive HTTP server and client concurrency tests (#3061)

* Vert.x Reactive HTTP server and client concurrency tests

* Concurrency test for VertxReactivePropagationTest
This commit is contained in:
Ago Allikmaa 2021-05-24 22:01:05 +03:00 committed by GitHub
parent 2943acc39a
commit c28af1f50e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 270 additions and 5 deletions

View File

@ -3,16 +3,26 @@
* SPDX-License-Identifier: Apache-2.0
*/
import static VertxReactiveWebServer.TEST_REQUEST_ID_ATTRIBUTE
import static VertxReactiveWebServer.TEST_REQUEST_ID_PARAMETER
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.SERVER
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicClientSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicServerSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.OkHttpUtils
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import io.vertx.reactivex.core.Vertx
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import okhttp3.OkHttpClient
import okhttp3.Request
import spock.lang.Shared
@ -85,5 +95,76 @@ class VertxReactivePropagationTest extends AgentInstrumentationSpecification {
}
}
def "should propagate context correctly over vert.x rx-java framework with high concurrency"() {
setup:
int count = 100
def baseUrl = "http://localhost:$port/listProducts"
def latch = new CountDownLatch(1)
def pool = Executors.newFixedThreadPool(8)
def propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
def setter = { Request.Builder carrier, String name, String value ->
carrier.header(name, value)
}
when:
count.times { index ->
def job = {
latch.await()
Request.Builder builder = new Request.Builder().url("$baseUrl?$TEST_REQUEST_ID_PARAMETER=$index").get()
runUnderTrace("client " + index) {
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index)
propagator.inject(Context.current(), builder, setter)
client.newCall(builder.build()).execute()
}
}
pool.submit(job)
}
latch.countDown()
then:
assertTraces(count) {
(0..count - 1).each {
trace(it, 5) {
def rootSpan = it.span(0)
def requestId = Long.valueOf(rootSpan.name.substring("client ".length()))
basicSpan(it, 0, "client $requestId", null, null) {
"${TEST_REQUEST_ID_ATTRIBUTE}" requestId
}
basicServerSpan(it, 1, "/listProducts", span(0), null) {
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.HTTP_URL.key}" "$baseUrl?$TEST_REQUEST_ID_PARAMETER=$requestId"
"${SemanticAttributes.HTTP_METHOD.key}" "GET"
"${SemanticAttributes.HTTP_STATUS_CODE.key}" 200
"${SemanticAttributes.HTTP_FLAVOR.key}" "1.1"
"${SemanticAttributes.HTTP_USER_AGENT.key}" String
"${SemanticAttributes.HTTP_CLIENT_IP.key}" "127.0.0.1"
"${TEST_REQUEST_ID_ATTRIBUTE}" requestId
}
basicSpan(it, 2, "handleListProducts", span(1), null) {
"${TEST_REQUEST_ID_ATTRIBUTE}" requestId
}
basicSpan(it, 3, "listProducts", span(2), null) {
"${TEST_REQUEST_ID_ATTRIBUTE}" requestId
}
basicClientSpan(it, 4, "SELECT test.products", span(3), null) {
"${SemanticAttributes.DB_SYSTEM.key}" "hsqldb"
"${SemanticAttributes.DB_NAME.key}" "test"
"${SemanticAttributes.DB_USER.key}" "SA"
"${SemanticAttributes.DB_CONNECTION_STRING.key}" "hsqldb:mem:"
"${SemanticAttributes.DB_STATEMENT.key}" "SELECT id AS request$requestId, name, price, weight FROM products"
"${SemanticAttributes.DB_OPERATION.key}" "SELECT"
"${SemanticAttributes.DB_SQL_TABLE.key}" "products"
}
}
}
}
cleanup:
pool.shutdownNow()
}
}

View File

@ -7,6 +7,7 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import io.vertx.circuitbreaker.CircuitBreakerOptions
import io.vertx.core.AsyncResult
import io.vertx.core.VertxOptions
@ -96,7 +97,12 @@ class VertxRxCircuitBreakerWebClientTest extends HttpClientTest<HttpRequest<?>>
@Override
boolean testCausality() {
false
true
}
@Override
SingleConnection createSingleConnection(String host, int port) {
return new VertxRxCircuitBreakerSingleConnection(host, port, breaker)
}
@Override

View File

@ -8,6 +8,7 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.asserts.SpanAssert
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import io.vertx.core.VertxOptions
import io.vertx.core.http.HttpMethod
import io.vertx.ext.web.client.WebClientOptions
@ -85,7 +86,12 @@ class VertxRxWebClientTest extends HttpClientTest<HttpRequest<Buffer>> implement
@Override
boolean testCausality() {
false
true
}
@Override
SingleConnection createSingleConnection(String host, int port) {
return new VertxRxSingleConnection(host, port)
}
@Override

View File

@ -7,11 +7,13 @@ package server
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
import io.opentelemetry.api.trace.Span
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.vertx.circuitbreaker.CircuitBreakerOptions
import io.vertx.core.Future
@ -53,6 +55,20 @@ class VertxRxCircuitBreakerHttpServerTest extends VertxRxHttpServerTest {
}
})
}
router.route(INDEXED_CHILD.path).handler { ctx ->
breaker.executeCommand({ future ->
future.complete(INDEXED_CHILD)
}, { it ->
if (it.failed()) {
throw it.cause()
}
HttpServerTest.ServerEndpoint endpoint = it.result()
controller(endpoint) {
Span.current().setAttribute("test.request.id", ctx.request().params().get("id") as long)
ctx.response().setStatusCode(endpoint.status).end()
}
})
}
router.route(QUERY_PARAM.path).handler { ctx ->
breaker.executeCommand({ future ->
future.complete(QUERY_PARAM)

View File

@ -7,12 +7,14 @@ package server
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.NOT_FOUND
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
import io.opentelemetry.api.trace.Span
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.vertx.core.DeploymentOptions
@ -71,6 +73,11 @@ class VertxRxHttpServerTest extends HttpServerTest<Vertx> implements AgentTestTr
}
}
@Override
boolean testConcurrency() {
return true
}
protected Class<AbstractVerticle> verticle() {
return VertxReactiveWebServer
}
@ -87,6 +94,12 @@ class VertxRxHttpServerTest extends HttpServerTest<Vertx> implements AgentTestTr
ctx.response().setStatusCode(SUCCESS.status).end(SUCCESS.body)
}
}
router.route(INDEXED_CHILD.path).handler { ctx ->
controller(QUERY_PARAM) {
Span.current().setAttribute("test.request.id", ctx.request().params().get("id") as long)
ctx.response().setStatusCode(INDEXED_CHILD.status).end()
}
}
router.route(QUERY_PARAM.path).handler { ctx ->
controller(QUERY_PARAM) {
ctx.response().setStatusCode(QUERY_PARAM.status).end(ctx.request().query())

View File

@ -36,6 +36,9 @@ public class VertxReactiveWebServer extends AbstractVerticle {
private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test");
public static final String TEST_REQUEST_ID_PARAMETER = "test-request-id";
public static final String TEST_REQUEST_ID_ATTRIBUTE = "test.request.id";
private static final String CONFIG_HTTP_SERVER_PORT = "http.server.port";
private static JDBCClient client;
@ -94,10 +97,15 @@ public class VertxReactiveWebServer extends AbstractVerticle {
}
private void handleListProducts(RoutingContext routingContext) {
Long requestId = extractRequestId(routingContext);
attachRequestIdToCurrentSpan(requestId);
Span span = tracer.spanBuilder("handleListProducts").startSpan();
try (Scope ignored = Context.current().with(span).makeCurrent()) {
attachRequestIdToCurrentSpan(requestId);
HttpServerResponse response = routingContext.response();
Single<JsonArray> jsonArraySingle = listProducts();
Single<JsonArray> jsonArraySingle = listProducts(requestId);
jsonArraySingle.subscribe(
arr -> response.putHeader("content-type", "application/json").end(arr.encode()));
@ -106,11 +114,14 @@ public class VertxReactiveWebServer extends AbstractVerticle {
}
}
private Single<JsonArray> listProducts() {
private Single<JsonArray> listProducts(Long requestId) {
Span span = tracer.spanBuilder("listProducts").startSpan();
try (Scope ignored = Context.current().with(span).makeCurrent()) {
attachRequestIdToCurrentSpan(requestId);
String queryInfix = requestId != null ? " AS request" + requestId : "";
return client
.rxQuery("SELECT id, name, price, weight FROM products")
.rxQuery("SELECT id" + queryInfix + ", name, price, weight FROM products")
.flatMap(
result -> {
Thread.dumpStack();
@ -123,6 +134,17 @@ public class VertxReactiveWebServer extends AbstractVerticle {
}
}
private Long extractRequestId(RoutingContext routingContext) {
String requestIdString = routingContext.request().params().get(TEST_REQUEST_ID_PARAMETER);
return requestIdString != null ? Long.valueOf(requestIdString) : null;
}
private void attachRequestIdToCurrentSpan(Long requestId) {
if (requestId != null) {
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, requestId);
}
}
private void setUpInitialData(Handler<Void> done) {
client.getConnection(
res -> {

View File

@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package client;
import io.vertx.core.AsyncResult;
import io.vertx.reactivex.circuitbreaker.CircuitBreaker;
import io.vertx.reactivex.ext.web.client.HttpRequest;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
public class VertxRxCircuitBreakerSingleConnection extends VertxRxSingleConnection {
private final CircuitBreaker breaker;
public VertxRxCircuitBreakerSingleConnection(String host, int port, CircuitBreaker breaker) {
super(host, port);
this.breaker = breaker;
}
@Override
protected HttpResponse<?> fetchResponse(HttpRequest<?> request) {
CompletableFuture<Object> future = new CompletableFuture<>();
sendRequestWithCallback(
request,
it -> {
if (it.succeeded()) {
future.complete(it.result());
} else {
future.completeExceptionally(it.cause());
}
});
try {
return (HttpResponse<?>) future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void sendRequestWithCallback(HttpRequest<?> request, Consumer<AsyncResult<?>> consumer) {
breaker.executeCommand(
command ->
request.rxSend().doOnSuccess(command::complete).doOnError(command::fail).subscribe(),
consumer::accept);
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package client;
import io.opentelemetry.instrumentation.test.base.SingleConnection;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.ext.web.client.HttpRequest;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.client.WebClient;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
public class VertxRxSingleConnection implements SingleConnection {
private final WebClient webClient;
private final String host;
private final int port;
public VertxRxSingleConnection(String host, int port) {
this.host = host;
this.port = port;
WebClientOptions clientOptions =
new WebClientOptions()
.setConnectTimeout(5000)
.setMaxPoolSize(1)
.setKeepAlive(true)
.setPipelining(true);
Vertx vertx = Vertx.vertx(new VertxOptions());
this.webClient = WebClient.create(vertx, clientOptions);
}
@Override
public int doRequest(String path, Map<String, String> headers) throws ExecutionException {
String requestId = Objects.requireNonNull(headers.get(REQUEST_ID_HEADER));
String url;
try {
url = new URL("http", host, port, path).toString();
} catch (MalformedURLException e) {
throw new ExecutionException(e);
}
HttpRequest<Buffer> request = webClient.request(HttpMethod.GET, port, host, url);
headers.forEach(request::putHeader);
HttpResponse<?> response = fetchResponse(request);
String responseId = response.getHeader(REQUEST_ID_HEADER);
if (!requestId.equals(responseId)) {
throw new IllegalStateException(
String.format("Received response with id %s, expected %s", responseId, requestId));
}
return response.statusCode();
}
protected HttpResponse<?> fetchResponse(HttpRequest<?> request) {
return request.rxSend().blockingGet();
}
}