Improve vertx-sql client context propagation (#9640)

This commit is contained in:
Lauri Tulmin 2023-10-10 16:37:21 +03:00 committed by GitHub
parent 13585c6d95
commit 6038a872c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 183 additions and 32 deletions

View File

@ -97,6 +97,7 @@ public class PoolInstrumentation implements TypeInstrumentation {
SqlConnectOptions sqlConnectOptions = virtualField.get(pool);
future = VertxSqlClientSingletons.attachConnectOptions(future, sqlConnectOptions);
future = VertxSqlClientSingletons.wrapContext(future);
}
}
}

View File

@ -6,9 +6,6 @@
package io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql;
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.OTEL_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.OTEL_PARENT_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.OTEL_REQUEST_KEY;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.getSqlConnectOptions;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
@ -98,9 +95,7 @@ public class QueryExecutorInstrumentation implements TypeInstrumentation {
context = instrumenter().start(parentContext, otelRequest);
scope = context.makeCurrent();
promiseInternal.context().localContextData().put(OTEL_REQUEST_KEY, otelRequest);
promiseInternal.context().localContextData().put(OTEL_CONTEXT_KEY, context);
promiseInternal.context().localContextData().put(OTEL_PARENT_CONTEXT_KEY, parentContext);
VertxSqlClientSingletons.attachRequest(promiseInternal, otelRequest, context, parentContext);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)

View File

@ -13,8 +13,6 @@ import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.vertx.core.Promise;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@ -40,12 +38,7 @@ public class QueryResultBuilderInstrumentation implements TypeInstrumentation {
public static class CompleteAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onEnter(@Advice.FieldValue("handler") Promise<?> promise) {
if (!(promise instanceof PromiseInternal)) {
return null;
}
PromiseInternal<?> promiseInternal = (PromiseInternal<?>) promise;
ContextInternal contextInternal = promiseInternal.context();
return endQuerySpan(contextInternal.localContextData(), null);
return endQuerySpan(promise, null);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
@ -61,12 +54,7 @@ public class QueryResultBuilderInstrumentation implements TypeInstrumentation {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onEnter(
@Advice.Argument(0) Throwable throwable, @Advice.FieldValue("handler") Promise<?> promise) {
if (!(promise instanceof PromiseInternal)) {
return null;
}
PromiseInternal<?> promiseInternal = (PromiseInternal<?>) promise;
ContextInternal contextInternal = promiseInternal.context();
return endQuerySpan(contextInternal.localContextData(), throwable);
return endQuerySpan(promise, throwable);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)

View File

@ -19,15 +19,13 @@ import io.opentelemetry.instrumentation.api.instrumenter.network.ServerAttribute
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.SqlClientBase;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public final class VertxSqlClientSingletons {
public static final String OTEL_REQUEST_KEY = "otel.request";
public static final String OTEL_CONTEXT_KEY = "otel.context";
public static final String OTEL_PARENT_CONTEXT_KEY = "otel.parent-context";
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.vertx-sql-client-4.0";
private static final Instrumenter<VertxSqlClientRequest, Void> INSTRUMENTER;
private static final ThreadLocal<SqlConnectOptions> connectOptions = new ThreadLocal<>();
@ -66,16 +64,33 @@ public final class VertxSqlClientSingletons {
return connectOptions.get();
}
public static Scope endQuerySpan(Map<Object, Object> contextData, Throwable throwable) {
VertxSqlClientRequest otelRequest =
(VertxSqlClientRequest) contextData.remove(OTEL_REQUEST_KEY);
Context otelContext = (Context) contextData.remove(OTEL_CONTEXT_KEY);
Context otelParentContext = (Context) contextData.remove(OTEL_PARENT_CONTEXT_KEY);
if (otelRequest == null || otelContext == null || otelParentContext == null) {
private static final VirtualField<Promise<?>, RequestData> requestDataField =
VirtualField.find(Promise.class, RequestData.class);
public static void attachRequest(
Promise<?> promise, VertxSqlClientRequest request, Context context, Context parentContext) {
requestDataField.set(promise, new RequestData(request, context, parentContext));
}
public static Scope endQuerySpan(Promise<?> promise, Throwable throwable) {
RequestData requestData = requestDataField.get(promise);
if (requestData == null) {
return null;
}
instrumenter().end(otelContext, otelRequest, null, throwable);
return otelParentContext.makeCurrent();
instrumenter().end(requestData.context, requestData.request, null, throwable);
return requestData.parentContext.makeCurrent();
}
static class RequestData {
final VertxSqlClientRequest request;
final Context context;
final Context parentContext;
RequestData(VertxSqlClientRequest request, Context context, Context parentContext) {
this.request = request;
this.context = context;
this.parentContext = parentContext;
}
}
// this virtual field is also used in SqlClientBase instrumentation
@ -93,5 +108,23 @@ public final class VertxSqlClientSingletons {
});
}
public static <T> Future<T> wrapContext(Future<T> future) {
Context context = Context.current();
CompletableFuture<T> result = new CompletableFuture<>();
future
.toCompletionStage()
.whenComplete(
(value, throwable) -> {
try (Scope ignore = context.makeCurrent()) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});
return Future.fromCompletionStage(result);
}
private VertxSqlClientSingletons() {}
}

View File

@ -20,8 +20,10 @@ import static io.opentelemetry.semconv.SemanticAttributes.NET_PEER_NAME;
import static io.opentelemetry.semconv.SemanticAttributes.NET_PEER_PORT;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.TraceAssert;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
@ -30,10 +32,16 @@ import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Tuple;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@ -54,6 +62,9 @@ class VertxSqlClientTest {
@RegisterExtension
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@RegisterExtension
private static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
private static GenericContainer<?> container;
private static Vertx vertx;
private static Pool pool;
@ -288,4 +299,127 @@ class VertxSqlClientTest {
assertPreparedSelect();
}
@Test
void testManyQueries() throws Exception {
int count = 50;
CountDownLatch latch = new CountDownLatch(count);
List<CompletableFuture<Object>> futureList = new ArrayList<>();
List<CompletableFuture<Object>> resultList = new ArrayList<>();
for (int i = 0; i < count; i++) {
CompletableFuture<Object> future = new CompletableFuture<>();
futureList.add(future);
resultList.add(
future.whenComplete((rows, throwable) -> testing.runWithSpan("callback", () -> {})));
}
for (CompletableFuture<Object> future : futureList) {
testing.runWithSpan(
"parent",
() ->
pool.query("select * from test")
.execute(
rowSetAsyncResult -> {
if (rowSetAsyncResult.succeeded()) {
future.complete(rowSetAsyncResult.result());
} else {
future.completeExceptionally(rowSetAsyncResult.cause());
}
latch.countDown();
}));
}
latch.await(30, TimeUnit.SECONDS);
for (CompletableFuture<Object> result : resultList) {
result.get(10, TimeUnit.SECONDS);
}
List<Consumer<TraceAssert>> assertions =
Collections.nCopies(
count,
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName("SELECT tempdb.test")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(DB_NAME, DB),
equalTo(DB_USER, USER_DB),
equalTo(DB_STATEMENT, "select * from test"),
equalTo(DB_OPERATION, "SELECT"),
equalTo(DB_SQL_TABLE, "test"),
equalTo(NET_PEER_NAME, "localhost"),
equalTo(NET_PEER_PORT, port)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
testing.waitAndAssertTraces(assertions);
}
@Test
void testConcurrency() throws Exception {
int count = 50;
CountDownLatch latch = new CountDownLatch(count);
List<CompletableFuture<Object>> futureList = new ArrayList<>();
List<CompletableFuture<Object>> resultList = new ArrayList<>();
for (int i = 0; i < count; i++) {
CompletableFuture<Object> future = new CompletableFuture<>();
futureList.add(future);
resultList.add(
future.whenComplete((rows, throwable) -> testing.runWithSpan("callback", () -> {})));
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
cleanup.deferCleanup(() -> executorService.shutdown());
for (CompletableFuture<Object> future : futureList) {
executorService.submit(
() -> {
testing
.runWithSpan(
"parent",
() ->
pool.withConnection(
conn ->
conn.preparedQuery("select * from test where id = $1")
.execute(Tuple.of(1))))
.onComplete(
rowSetAsyncResult -> {
if (rowSetAsyncResult.succeeded()) {
future.complete(rowSetAsyncResult.result());
} else {
future.completeExceptionally(rowSetAsyncResult.cause());
}
latch.countDown();
});
});
}
latch.await(30, TimeUnit.SECONDS);
for (CompletableFuture<Object> result : resultList) {
result.get(10, TimeUnit.SECONDS);
}
List<Consumer<TraceAssert>> assertions =
Collections.nCopies(
count,
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName("SELECT tempdb.test")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(DB_NAME, DB),
equalTo(DB_USER, USER_DB),
equalTo(DB_STATEMENT, "select * from test where id = $?"),
equalTo(DB_OPERATION, "SELECT"),
equalTo(DB_SQL_TABLE, "test"),
equalTo(NET_PEER_NAME, "localhost"),
equalTo(NET_PEER_PORT, port)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
testing.waitAndAssertTraces(assertions);
}
}