Vert.x instrumentation improvements (#503)

* Support for Vert.x rx-java async tasks

* Use Vert.x route for server span name

* Move reactive Vert.x instrumentation into separate module

* Test fixes

* Format fixes

* Polish

* Fix license header

* Add Vert.x to README
This commit is contained in:
Nikita Salnikov-Tarnovski 2020-06-13 23:18:30 +03:00 committed by GitHub
parent 4a26bd3ce7
commit c11835963f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1016 additions and 108 deletions

View File

@ -135,6 +135,8 @@ provide the path to a JAR file including an SPI implementation using the system
| [Spring Webflux](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/reactive/package-summary.html) | 5.0+ |
| [Spymemcached](https://github.com/couchbase/spymemcached) | 2.12+ |
| [Twilio](https://github.com/twilio/twilio-java) | 6.6+ |
| [Vert.x](https://vertx.io) | 3.0+ |
| [Vert.x RxJava2](https://vertx.io/docs/vertx-rx/java2/) | 3.5+ |
### Disabled instrumentations

View File

@ -43,7 +43,7 @@ abstract class AkkaHttpServerInstrumentationTest extends HttpServerTest<Object>
void serverSpan(TraceAssert trace, int index, String traceID = null, String parentID = null, String method = "GET", ServerEndpoint endpoint = SUCCESS) {
trace.span(index) {
operationName expectedOperationName(method)
operationName expectedOperationName(method, endpoint)
spanKind SERVER
errored endpoint.errored
if (parentID != null) {

View File

@ -106,7 +106,7 @@ class GrizzlyTest extends HttpServerTest<HttpServer> {
}
@Override
String expectedOperationName(String method) {
String expectedOperationName(String method, ServerEndpoint serverEndpoint) {
return 'HttpHandler.doHandle'
}
}

View File

@ -111,7 +111,7 @@ class PlayServerTest extends HttpServerTest<Server> {
void serverSpan(TraceAssert trace, int index, String traceID = null, String parentID = null, String method = "GET", ServerEndpoint endpoint = SUCCESS) {
trace.span(index) {
operationName expectedOperationName(method)
operationName expectedOperationName(method, endpoint)
spanKind SERVER
errored endpoint.errored
if (parentID != null) {

View File

@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.vertx.reactive;
import static io.opentelemetry.auto.instrumentation.vertx.reactive.VertxDecorator.TRACER;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AsyncResultConsumerWrapper implements Consumer<Handler<AsyncResult<?>>> {
private final Consumer<Handler<AsyncResult<?>>> delegate;
private final Span parentSpan;
public AsyncResultConsumerWrapper(
final Consumer<Handler<AsyncResult<?>>> delegate, Span parentSpan) {
this.delegate = delegate;
this.parentSpan = parentSpan;
}
@Override
public void accept(final Handler<AsyncResult<?>> asyncResultHandler) {
if (parentSpan != null) {
try (final Scope scope = TRACER.withSpan(parentSpan)) {
delegate.accept(asyncResultHandler);
}
} else {
delegate.accept(asyncResultHandler);
}
}
public static Consumer<Handler<AsyncResult<?>>> wrapIfNeeded(
final Consumer<Handler<AsyncResult<?>>> delegate, final Span parentSpan) {
if (!(delegate instanceof AsyncResultConsumerWrapper)) {
log.debug("Wrapping consumer {}", delegate);
return new AsyncResultConsumerWrapper(delegate, parentSpan);
}
return delegate;
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.vertx.reactive;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AsyncResultHandlerWrapper implements Handler<Handler<AsyncResult<?>>> {
private final Handler<Handler<AsyncResult<?>>> delegate;
private final Span parentSpan;
public AsyncResultHandlerWrapper(
final Handler<Handler<AsyncResult<?>>> delegate, Span parentSpan) {
this.delegate = delegate;
this.parentSpan = parentSpan;
}
@Override
public void handle(final Handler<AsyncResult<?>> asyncResultHandler) {
if (parentSpan != null) {
try (final Scope scope = VertxDecorator.TRACER.withSpan(parentSpan)) {
delegate.handle(asyncResultHandler);
}
} else {
delegate.handle(asyncResultHandler);
}
}
public static Handler<Handler<AsyncResult<?>>> wrapIfNeeded(
final Handler<Handler<AsyncResult<?>>> delegate, final Span parentSpan) {
if (!(delegate instanceof AsyncResultHandlerWrapper)) {
log.debug("Wrapping handler {}", delegate);
return new AsyncResultHandlerWrapper(delegate, parentSpan);
}
return delegate;
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.vertx.reactive;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.auto.bootstrap.instrumentation.decorator.BaseDecorator;
import io.opentelemetry.trace.Tracer;
public class VertxDecorator extends BaseDecorator {
public static final Tracer TRACER =
OpenTelemetry.getTracerProvider().get("io.opentelemetry.auto.vertx");
}

View File

@ -0,0 +1,95 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.vertx.reactive;
import static io.opentelemetry.auto.instrumentation.vertx.reactive.VertxDecorator.TRACER;
import static io.opentelemetry.auto.tooling.ClassLoaderMatcher.hasClassesNamed;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import io.opentelemetry.auto.tooling.Instrumenter;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
/** This instrumentation allows span context propagation across Vert.x reactive executions. */
@AutoService(Instrumenter.class)
public class VertxRxInstrumentation extends Instrumenter.Default {
public VertxRxInstrumentation() {
super("vertx");
}
@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
// Different versions of Vert.x has this class in different packages
return hasClassesNamed("io.vertx.reactivex.core.impl.AsyncResultSingle")
.or(hasClassesNamed("io.vertx.reactivex.impl.AsyncResultSingle"));
}
@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("io.vertx.reactivex.core.impl.AsyncResultSingle")
.or(named("io.vertx.reactivex.impl.AsyncResultSingle"));
}
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AsyncResultConsumerWrapper",
packageName + ".AsyncResultHandlerWrapper",
packageName + ".VertxDecorator"
};
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<? super MethodDescription>, String> result = new HashMap<>();
result.put(
isConstructor().and(takesArgument(0, named("io.vertx.core.Handler"))),
this.getClass().getName() + "$AsyncResultSingleHandlerAdvice");
result.put(
isConstructor().and(takesArgument(0, named("java.util.function.Consumer"))),
this.getClass().getName() + "$AsyncResultSingleConsumerAdvice");
return result;
}
public static class AsyncResultSingleHandlerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false) Handler<Handler<AsyncResult<?>>> handler) {
handler = AsyncResultHandlerWrapper.wrapIfNeeded(handler, TRACER.getCurrentSpan());
}
}
public static class AsyncResultSingleConsumerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false) Consumer<Handler<AsyncResult<?>>> handler) {
handler = AsyncResultConsumerWrapper.wrapIfNeeded(handler, TRACER.getCurrentSpan());
}
}
}

View File

@ -0,0 +1,9 @@
/**
* The majority of monitoring needs of Vert.x application is covered by generic instrumentations.
* Such as those of netty or JDBC.
*
* <p>{@link io.opentelemetry.auto.instrumentation.vertx.reactive.VertxRxInstrumentation} wraps
* {code AsyncResultSingle} classes from Vert.x RxJava library to ensure proper span context
* propagation in reactive Vert.x applications.
*/
package io.opentelemetry.auto.instrumentation.vertx.reactive;

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import io.opentelemetry.auto.test.base.HttpServerTestAdvice;
import io.opentelemetry.auto.tooling.Instrumenter;
import net.bytebuddy.agent.builder.AgentBuilder;
@AutoService(Instrumenter.class)
public class NettyServerTestInstrumentation implements Instrumenter {
@Override
public AgentBuilder instrument(final AgentBuilder agentBuilder) {
return agentBuilder
.type(named("io.netty.handler.codec.ByteToMessageDecoder"))
.transform(
new AgentBuilder.Transformer.ForAdvice()
.advice(
named("channelRead"), HttpServerTestAdvice.ServerEntryAdvice.class.getName()));
}
}

View File

@ -0,0 +1,98 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.opentelemetry.auto.instrumentation.api.MoreTags
import io.opentelemetry.auto.instrumentation.api.Tags
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.OkHttpUtils
import io.opentelemetry.auto.test.utils.PortUtils
import io.vertx.reactivex.core.Vertx
import okhttp3.OkHttpClient
import okhttp3.Request
import spock.lang.Shared
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.SUCCESS
import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.trace.Span.Kind.CLIENT
import static io.opentelemetry.trace.Span.Kind.SERVER
class VertxReactivePropagationTest extends AgentTestRunner {
@Shared
OkHttpClient client = OkHttpUtils.client()
@Shared
int port
@Shared
Vertx server
def setupSpec() {
port = PortUtils.randomOpenPort()
server = VertxReactiveWebServer.start(port)
}
def cleanupSpec() {
server.close()
}
//Verifies that context is correctly propagated and sql query span has correct parent.
//Tests io.opentelemetry.auto.instrumentation.vertx.reactive.VertxRxInstrumentation
def "should propagate context over vert.x rx-java framework"() {
setup:
def url = "http://localhost:$port/listProducts"
def request = new Request.Builder().url(url).get().build()
def response = client.newCall(request).execute()
expect:
response.code() == SUCCESS.status
and:
assertTraces(1) {
trace(0, 4) {
span(0) {
operationName "/listProducts"
spanKind SERVER
errored false
parent()
tags {
"$MoreTags.NET_PEER_PORT" Long
"$MoreTags.NET_PEER_IP" { it == null || it == "127.0.0.1" } // Optional
"$Tags.HTTP_URL" url
"$Tags.HTTP_METHOD" "GET"
"$Tags.HTTP_STATUS" 200
}
}
basicSpan(it, 1, "VertxReactiveWebServer.handleListProducts", span(0))
basicSpan(it, 2, "VertxReactiveWebServer.listProducts", span(1))
span(3) {
operationName "SELECT id, name, price, weight FROM products"
spanKind CLIENT
childOf span(2)
errored false
tags {
"$Tags.DB_TYPE" "sql"
"$Tags.DB_INSTANCE" "test?shutdown=true"
"$Tags.DB_USER" "SA"
"$Tags.DB_STATEMENT" "SELECT id, name, price, weight FROM products"
"$Tags.DB_URL" "hsqldb:mem:"
"span.origin.type" String
}
}
}
}
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.SUCCESS;
import io.opentelemetry.contrib.auto.annotations.WithSpan;
import io.reactivex.Single;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.http.HttpServerResponse;
import io.vertx.reactivex.ext.jdbc.JDBCClient;
import io.vertx.reactivex.ext.sql.SQLConnection;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class VertxReactiveWebServer extends AbstractVerticle {
private static final String CONFIG_HTTP_SERVER_PORT = "http.server.port";
private static JDBCClient client;
public static Vertx start(final int port)
throws ExecutionException, InterruptedException, TimeoutException {
/* This is highly against Vertx ideas, but our tests are synchronous
so we have to make sure server is up and running */
final CompletableFuture<Void> future = new CompletableFuture<>();
final Vertx server = Vertx.vertx(new VertxOptions());
client =
JDBCClient.createShared(
server,
new JsonObject()
.put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver"));
log.info("Starting on port {}", port);
server.deployVerticle(
VertxReactiveWebServer.class.getName(),
new DeploymentOptions().setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port)),
res -> {
if (!res.succeeded()) {
final RuntimeException exception =
new RuntimeException("Cannot deploy server Verticle", res.cause());
future.completeExceptionally(exception);
}
future.complete(null);
});
// block until vertx server is up
future.get(30, TimeUnit.SECONDS);
return server;
}
@Override
public void start(final io.vertx.core.Future<Void> startFuture) {
setUpInitialData(
ready -> {
final Router router = Router.router(vertx);
final int port = config().getInteger(CONFIG_HTTP_SERVER_PORT);
log.info("Listening on port {}", port);
router
.route(SUCCESS.getPath())
.handler(
ctx -> ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody()));
router.route("/listProducts").handler(this::handleListProducts);
vertx
.createHttpServer()
.requestHandler(router::accept)
.listen(port, h -> startFuture.complete());
});
}
@WithSpan
private void handleListProducts(final RoutingContext routingContext) {
final HttpServerResponse response = routingContext.response();
final Single<JsonArray> jsonArraySingle = listProducts();
jsonArraySingle.subscribe(
arr -> response.putHeader("content-type", "application/json").end(arr.encode()));
}
@WithSpan
private Single<JsonArray> listProducts() {
return client
.rxQuery("SELECT id, name, price, weight FROM products")
.flatMap(
result -> {
Thread.dumpStack();
final JsonArray arr = new JsonArray();
result.getRows().forEach(arr::add);
return Single.just(arr);
});
}
private void setUpInitialData(final Handler<Void> done) {
client.getConnection(
res -> {
if (res.failed()) {
throw new RuntimeException(res.cause());
}
final SQLConnection conn = res.result();
conn.execute(
"CREATE TABLE IF NOT EXISTS products(id INT IDENTITY, name VARCHAR(255), price FLOAT, weight INT)",
ddl -> {
if (ddl.failed()) {
throw new RuntimeException(ddl.cause());
}
conn.execute(
"INSERT INTO products (name, price, weight) VALUES ('Egg Whisk', 3.99, 150), ('Tea Cosy', 5.99, 100), ('Spatula', 1.00, 80)",
fixtures -> {
if (fixtures.failed()) {
throw new RuntimeException(fixtures.cause());
}
done.handle(null);
});
});
});
}
}

View File

@ -17,17 +17,19 @@ package server
import io.opentelemetry.auto.test.base.HttpServerTest
import io.vertx.circuitbreaker.CircuitBreakerOptions
import io.vertx.core.Future
import io.vertx.reactivex.circuitbreaker.CircuitBreaker
import io.vertx.reactivex.core.AbstractVerticle
import io.vertx.reactivex.ext.web.Router
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.SUCCESS
class VertxRxCircuitBreakerHttpServerTest extends VertxHttpServerTest {
class VertxRxCircuitBreakerHttpServerTest extends VertxRxHttpServerTest {
@Override
protected Class<AbstractVerticle> verticle() {
@ -37,7 +39,7 @@ class VertxRxCircuitBreakerHttpServerTest extends VertxHttpServerTest {
static class VertxRxCircuitBreakerWebTestServer extends AbstractVerticle {
@Override
void start(final io.vertx.core.Future<Void> startFuture) {
void start(final Future<Void> startFuture) {
final int port = config().getInteger(CONFIG_HTTP_SERVER_PORT)
final Router router = Router.router(super.@vertx)
final CircuitBreaker breaker =
@ -114,6 +116,20 @@ class VertxRxCircuitBreakerHttpServerTest extends VertxHttpServerTest {
}
})
}
router.route("/path/:id/param").handler { ctx ->
breaker.executeCommand({ future ->
future.complete(PATH_PARAM)
}, {
if (it.failed()) {
throw it.cause()
}
HttpServerTest.ServerEndpoint endpoint = it.result()
controller(endpoint) {
ctx.response().setStatusCode(endpoint.status).end(ctx.request().getParam("id"))
}
})
}
super.@vertx.createHttpServer()
.requestHandler { router.accept(it) }

View File

@ -16,33 +16,34 @@
package server
import io.opentelemetry.auto.test.base.HttpServerTest
import io.vertx.core.AbstractVerticle
import io.vertx.core.DeploymentOptions
import io.vertx.core.Future
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.json.JsonObject
import io.vertx.ext.web.Router
import io.vertx.reactivex.core.AbstractVerticle
import io.vertx.reactivex.ext.web.Router
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.SUCCESS
class VertxHttpServerTest extends HttpServerTest<Vertx> {
class VertxRxHttpServerTest extends HttpServerTest<Vertx> {
public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"
@Override
Vertx startServer(int port) {
def server = Vertx.vertx(new VertxOptions()
Vertx server = Vertx.vertx(new VertxOptions()
// Useful for debugging:
// .setBlockedThreadCheckInterval(Integer.MAX_VALUE)
.setClusterPort(port))
final CompletableFuture<Void> future = new CompletableFuture<>()
server.deployVerticle(verticle().name,
server.deployVerticle(verticle().getName(),
new DeploymentOptions()
.setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port))
.setInstances(3)) { res ->
@ -52,14 +53,10 @@ class VertxHttpServerTest extends HttpServerTest<Vertx> {
future.complete(null)
}
future.get()
future.get(30, TimeUnit.SECONDS)
return server
}
protected Class<io.vertx.reactivex.core.AbstractVerticle> verticle() {
return VertxWebTestServer
}
@Override
void stopServer(Vertx server) {
server.close()
@ -67,15 +64,34 @@ class VertxHttpServerTest extends HttpServerTest<Vertx> {
@Override
boolean testExceptionBody() {
false
return false
}
static class VertxWebTestServer extends AbstractVerticle {
@Override
boolean testPathParam() {
return true
}
@Override
boolean testNotFound() {
return false
}
@Override
String expectedOperationName(String method, ServerEndpoint endpoint) {
return endpoint == PATH_PARAM ? "/path/:id/param" : endpoint.getPath()
}
protected Class<AbstractVerticle> verticle() {
return VertxReactiveWebServer
}
static class VertxReactiveWebServer extends AbstractVerticle {
@Override
void start(final Future<Void> startFuture) {
final int port = config().getInteger(CONFIG_HTTP_SERVER_PORT)
final Router router = Router.router(vertx)
final Router router = Router.router(super.@vertx)
router.route(SUCCESS.path).handler { ctx ->
controller(SUCCESS) {
@ -102,8 +118,14 @@ class VertxHttpServerTest extends HttpServerTest<Vertx> {
throw new Exception(EXCEPTION.body)
}
}
router.route("/path/:id/param").handler { ctx ->
controller(PATH_PARAM) {
ctx.response().setStatusCode(PATH_PARAM.status).end(ctx.request().getParam("id"))
}
}
vertx.createHttpServer()
super.@vertx.createHttpServer()
.requestHandler { router.accept(it) }
.listen(port) { startFuture.complete() }
}

View File

@ -1,4 +1,3 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}
@ -6,30 +5,44 @@ ext {
apply from: "$rootDir/gradle/instrumentation.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
muzzle {
pass {
group = 'io.vertx'
module = 'vertx-rx-java2'
versions = "[3.5.0,)"
}
}
testSets {
latestDepTest {
dirName = 'test'
}
}
sourceCompatibility = 1.8
targetCompatibility = 1.8
//The first Vert.x version that uses rx-java 2
ext.vertxVersion = '3.5.0'
dependencies {
// compileOnly group: 'io.vertx', name: 'vertx-web', version: '3.5.0'
compileOnly group: 'io.vertx', name: 'vertx-web', version: vertxVersion
compileOnly group: 'io.vertx', name: 'vertx-rx-java2', version: vertxVersion
testCompile project(':instrumentation:jdbc')
testCompile project(':instrumentation:netty:netty-4.1')
testCompile project(':instrumentation:trace-annotation')
testCompile project(':instrumentation:vertx')
testCompile group: 'io.vertx', name: 'vertx-web', version: vertxVersion
testCompile group: 'io.vertx', name: 'vertx-web-client', version: vertxVersion
testCompile group: 'io.vertx', name: 'vertx-jdbc-client', version: vertxVersion
testCompile group: 'io.vertx', name: 'vertx-circuit-breaker', version: vertxVersion
testCompile group: 'io.vertx', name: 'vertx-rx-java2', version: vertxVersion
testCompile 'org.hsqldb:hsqldb:2.3.4'
// Tests seem to fail before 3.5... maybe a problem with some of the tests?
testCompile group: 'io.vertx', name: 'vertx-web', version: '3.5.0'
testCompile group: 'io.vertx', name: 'vertx-web-client', version: '3.5.0'
testCompile group: 'io.vertx', name: 'vertx-circuit-breaker', version: '3.5.0'
testCompile group: 'io.vertx', name: 'vertx-rx-java2', version: '3.5.0'
// Vert.x 4.0 is incompatible with our tests.
latestDepTestCompile group: 'io.vertx', name: 'vertx-web', version: '3.+'
latestDepTestCompile group: 'io.vertx', name: 'vertx-web-client', version: '3.+'
latestDepTestCompile group: 'io.vertx', name: 'vertx-jdbc-client', version: '3.+'
latestDepTestCompile group: 'io.vertx', name: 'vertx-circuit-breaker', version: '3.+'
latestDepTestCompile group: 'io.vertx', name: 'vertx-rx-java2', version: '3.+'
}

View File

@ -1,74 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import io.vertx.core.Future
import io.vertx.reactivex.core.AbstractVerticle
import io.vertx.reactivex.ext.web.Router
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.SUCCESS
class VertxRxHttpServerTest extends VertxHttpServerTest {
@Override
protected Class<AbstractVerticle> verticle() {
return VertxRxWebTestServer
}
static class VertxRxWebTestServer extends AbstractVerticle {
@Override
void start(final Future<Void> startFuture) {
final int port = config().getInteger(CONFIG_HTTP_SERVER_PORT)
final Router router = Router.router(super.@vertx)
router.route(SUCCESS.path).handler { ctx ->
controller(SUCCESS) {
ctx.response().setStatusCode(SUCCESS.status).end(SUCCESS.body)
}
}
router.route(QUERY_PARAM.path).handler { ctx ->
controller(QUERY_PARAM) {
ctx.response().setStatusCode(QUERY_PARAM.status).end(ctx.request().query())
}
}
router.route(REDIRECT.path).handler { ctx ->
controller(REDIRECT) {
ctx.response().setStatusCode(REDIRECT.status).putHeader("location", REDIRECT.body).end()
}
}
router.route(ERROR.path).handler { ctx ->
controller(ERROR) {
ctx.response().setStatusCode(ERROR.status).end(ERROR.body)
}
}
router.route(EXCEPTION.path).handler { ctx ->
controller(EXCEPTION) {
throw new Exception(EXCEPTION.body)
}
}
super.@vertx.createHttpServer()
.requestHandler { router.accept(it) }
.listen(port) { startFuture.complete() }
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.vertx;
import static io.opentelemetry.auto.tooling.ClassLoaderMatcher.hasClassesNamed;
import static io.opentelemetry.auto.tooling.bytebuddy.matcher.AgentElementMatchers.safeHasSuperType;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import io.opentelemetry.auto.tooling.Instrumenter;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class RouteInstrumentation extends Instrumenter.Default {
public RouteInstrumentation() {
super("vertx");
}
@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
return hasClassesNamed("io.vertx.ext.web.Route");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface()).and(safeHasSuperType(named("io.vertx.ext.web.Route")));
}
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".RoutingContextHandlerWrapper", packageName + ".VertxDecorator",
};
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("handler")).and(takesArgument(0, named("io.vertx.core.Handler"))),
RouteInstrumentation.class.getName() + "$RouteAdvice");
}
public static class RouteAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false) Handler<RoutingContext> handler) {
handler = new RoutingContextHandlerWrapper(handler);
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.vertx;
import static io.opentelemetry.auto.instrumentation.vertx.VertxDecorator.TRACER;
import io.opentelemetry.trace.Span;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
/** This is used to wrap Vert.x Handlers to provide nice user-friendly SERVER span names */
@Slf4j
public final class RoutingContextHandlerWrapper implements Handler<RoutingContext> {
private final Handler<RoutingContext> handler;
public RoutingContextHandlerWrapper(final Handler<RoutingContext> handler) {
this.handler = handler;
}
@Override
public void handle(RoutingContext context) {
try {
Span currentSpan = TRACER.getCurrentSpan();
if (currentSpan.getContext().isValid()) {
// TODO should update only SERVER span using
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/465
currentSpan.updateName(context.currentRoute().getPath());
}
} catch (Exception ex) {
log.error("Failed to update server span name with vert.x route", ex);
}
handler.handle(context);
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.vertx;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.auto.bootstrap.instrumentation.decorator.BaseDecorator;
import io.opentelemetry.trace.Tracer;
public class VertxDecorator extends BaseDecorator {
public static final Tracer TRACER =
OpenTelemetry.getTracerProvider().get("io.opentelemetry.auto.vertx");
}

View File

@ -0,0 +1,14 @@
/**
* The majority of monitoring needs of Vert.x application is covered by generic instrumentations.
* Such as those of netty or JDBC.
*
* <p>{@link io.opentelemetry.auto.instrumentation.vertx.RouteInstrumentation} wraps all Vert.x
* route handlers in order to update the name of the currently active SERVER span with the name of
* route. This is, arguably, a much more user-friendly name that defaults provided by HTTP server
* instrumentations.
*
* <p>{@link io.opentelemetry.auto.instrumentation.vertx.reactive.VertxRxInstrumentation} wraps
* {code AsyncResultSingle} classes from Vert.x RxJava library to ensure proper span context
* propagation in reactive Vert.x applications.
*/
package io.opentelemetry.auto.instrumentation.vertx;

View File

@ -0,0 +1,82 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import io.opentelemetry.auto.test.base.HttpServerTest
import io.vertx.core.AbstractVerticle
import io.vertx.core.DeploymentOptions
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.json.JsonObject
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
class VertxHttpServerTest extends HttpServerTest<Vertx> {
public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"
@Override
Vertx startServer(int port) {
Vertx server = Vertx.vertx(new VertxOptions()
// Useful for debugging:
// .setBlockedThreadCheckInterval(Integer.MAX_VALUE)
.setClusterPort(port))
final CompletableFuture<Void> future = new CompletableFuture<>()
server.deployVerticle(verticle().getName(),
new DeploymentOptions()
.setConfig(new JsonObject().put(CONFIG_HTTP_SERVER_PORT, port))
.setInstances(3)) { res ->
if (!res.succeeded()) {
throw new RuntimeException("Cannot deploy server Verticle", res.cause())
}
future.complete(null)
}
future.get(30, TimeUnit.SECONDS)
return server
}
protected Class<? extends AbstractVerticle> verticle() {
return VertxWebServer
}
@Override
void stopServer(Vertx server) {
server.close()
}
@Override
boolean testExceptionBody() {
return false
}
@Override
boolean testPathParam() {
return true
}
@Override
boolean testNotFound() {
return false
}
@Override
String expectedOperationName(String method, ServerEndpoint endpoint) {
return endpoint == PATH_PARAM ? "/path/:id/param" : endpoint.getPath()
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server;
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.ERROR;
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.EXCEPTION;
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM;
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM;
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.REDIRECT;
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.SUCCESS;
import io.opentelemetry.auto.test.base.HttpServerTest;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
public class VertxWebServer extends AbstractVerticle {
@Override
public void start(final Future<Void> startFuture) {
final int port = config().getInteger(VertxHttpServerTest.CONFIG_HTTP_SERVER_PORT);
final Router router = Router.router(vertx);
//noinspection Convert2Lambda
router
.route(SUCCESS.getPath())
.handler(
// This is not a closure/lambda on purpose to verify how do we instrument actual Handler
// classes
new Handler<RoutingContext>() {
@Override
public void handle(RoutingContext ctx) {
HttpServerTest.controller(
SUCCESS,
() -> {
ctx.response().setStatusCode(SUCCESS.getStatus()).end(SUCCESS.getBody());
return null;
});
}
});
router
.route(QUERY_PARAM.getPath())
.handler(
ctx -> {
HttpServerTest.controller(
QUERY_PARAM,
() -> {
ctx.response()
.setStatusCode(QUERY_PARAM.getStatus())
.end(ctx.request().query());
return null;
});
});
router
.route(REDIRECT.getPath())
.handler(
ctx -> {
HttpServerTest.controller(
REDIRECT,
() -> {
ctx.response()
.setStatusCode(REDIRECT.getStatus())
.putHeader("location", REDIRECT.getBody())
.end();
return null;
});
});
router
.route(ERROR.getPath())
.handler(
ctx -> {
HttpServerTest.controller(
ERROR,
() -> {
ctx.response().setStatusCode(ERROR.getStatus()).end(ERROR.getBody());
return null;
});
});
router
.route(EXCEPTION.getPath())
.handler(
ctx -> {
HttpServerTest.controller(
EXCEPTION,
() -> {
throw new Exception(EXCEPTION.getBody());
});
});
router
.route("/path/:id/param")
.handler(
ctx -> {
HttpServerTest.controller(
PATH_PARAM,
() -> {
ctx.response()
.setStatusCode(PATH_PARAM.getStatus())
.end(ctx.request().getParam("id"));
return null;
});
});
vertx
.createHttpServer()
.requestHandler(router::accept)
.listen(port, it -> startFuture.complete());
}
}

View File

@ -0,0 +1,40 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}
apply from: "${rootDir}/gradle/instrumentation.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
muzzle {
pass {
group = 'io.vertx'
module = 'vertx-web'
versions = "[3.0.0,)"
}
}
testSets {
latestDepTest {
dirName = 'test'
}
}
ext.vertxVersion = '3.0.0'
dependencies {
compileOnly group: 'io.vertx', name: 'vertx-web', version: vertxVersion
//We need both version as different versions of Vert.x use different versions of Netty
testCompile project(':instrumentation:netty:netty-4.0')
testCompile project(':instrumentation:netty:netty-4.1')
testCompile project(':instrumentation:jdbc')
testCompile project(':instrumentation:trace-annotation')
testCompile group: 'io.vertx', name: 'vertx-web', version: vertxVersion
testCompile group: 'io.vertx', name: 'vertx-jdbc-client', version: vertxVersion
// Vert.x 4.0 is incompatible with our tests.
latestDepTestCompile group: 'io.vertx', name: 'vertx-web', version: '3.+'
latestDepTestCompile group: 'io.vertx', name: 'vertx-web-client', version: '3.+'
}

View File

@ -145,7 +145,8 @@ include ':instrumentation:spring-webflux-5.0'
include ':instrumentation:spymemcached-2.12'
include ':instrumentation:trace-annotation'
include ':instrumentation:twilio-6.6'
include ':instrumentation:vertx-testing'
include ':instrumentation:vertx'
include ':instrumentation:vertx-reactive'
include ':instrumentation-core:aws-sdk:aws-sdk-2.2-core'
include ':instrumentation-core:spring'

View File

@ -25,6 +25,7 @@ import io.opentelemetry.auto.test.utils.OkHttpUtils
import io.opentelemetry.auto.test.utils.PortUtils
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.trace.Span
import java.util.concurrent.Callable
import okhttp3.HttpUrl
import okhttp3.OkHttpClient
import okhttp3.Request
@ -96,7 +97,7 @@ abstract class HttpServerTest<SERVER> extends AgentTestRunner {
abstract void stopServer(SERVER server)
String expectedOperationName(String method) {
String expectedOperationName(String method, ServerEndpoint endpoint) {
return method != null ? "HTTP $method" : HttpServerDecorator.DEFAULT_SPAN_NAME
}
@ -207,10 +208,10 @@ abstract class HttpServerTest<SERVER> extends AgentTestRunner {
.method(method, body)
}
static <T> T controller(ServerEndpoint endpoint, Closure<T> closure) {
static <T> T controller(ServerEndpoint endpoint, Callable<T> closure) {
assert TEST_TRACER.getCurrentSpan().getContext().isValid(): "Controller should have a parent span."
if (endpoint == NOT_FOUND) {
return closure()
return closure.call()
}
return runUnderTrace("controller", closure)
}
@ -453,7 +454,7 @@ abstract class HttpServerTest<SERVER> extends AgentTestRunner {
// parent span must be cast otherwise it breaks debugging classloading (junit loads it early)
void serverSpan(TraceAssert trace, int index, String traceID = null, String parentID = null, String method = "GET", ServerEndpoint endpoint = SUCCESS) {
trace.span(index) {
operationName expectedOperationName(method)
operationName expectedOperationName(method, endpoint)
spanKind Span.Kind.SERVER // can't use static import because of SERVER type parameter
errored endpoint.errored
if (parentID != null) {