Improve hibernate reactive instrumentation (#9486)
This commit is contained in:
parent
331aa04e35
commit
403e133446
|
@ -52,6 +52,7 @@ testing {
|
|||
implementation("org.hibernate.reactive:hibernate-reactive-core:2.0.0.Final")
|
||||
implementation("io.vertx:vertx-pg-client:4.4.2")
|
||||
}
|
||||
compileOnly("io.vertx:vertx-codegen:4.4.2")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,28 +123,156 @@ class HibernateReactiveTest {
|
|||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
Vertx.vertx()
|
||||
.getOrCreateContext()
|
||||
.runOnContext(
|
||||
event ->
|
||||
stageSessionFactory
|
||||
.withSession(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
runWithVertx(
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.withSession(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.find(Value.class, 1L)
|
||||
.thenAccept(
|
||||
value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.thenAccept(unused -> latch.countDown())));
|
||||
return session
|
||||
.find(Value.class, 1L)
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.thenAccept(unused -> latch.countDown())));
|
||||
latch.await(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageWithStatelessSession() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
runWithVertx(
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.withStatelessSession(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.get(Value.class, 1L)
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.thenAccept(unused -> latch.countDown())));
|
||||
latch.await(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageSessionWithTransaction() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
runWithVertx(
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.withSession(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.withTransaction(transaction -> session.find(Value.class, 1L))
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.thenAccept(unused -> latch.countDown())));
|
||||
latch.await(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageStatelessSessionWithTransaction() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
runWithVertx(
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.withStatelessSession(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.withTransaction(transaction -> session.get(Value.class, 1L))
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.thenAccept(unused -> latch.countDown())));
|
||||
latch.await(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageOpenSession() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
runWithVertx(
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.openSession()
|
||||
.thenApply(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.find(Value.class, 1L)
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.thenAccept(unused -> latch.countDown())));
|
||||
latch.await(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageOpenStatelessSession() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
runWithVertx(
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.openStatelessSession()
|
||||
.thenApply(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.get(Value.class, 1L)
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.thenAccept(unused -> latch.countDown())));
|
||||
latch.await(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
private static void runWithVertx(Runnable runnable) {
|
||||
Vertx.vertx().getOrCreateContext().runOnContext(event -> runnable.run());
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0
|
||||
private static void assertTrace() {
|
||||
testing.waitAndAssertTraces(
|
||||
|
|
|
@ -18,9 +18,11 @@ import io.opentelemetry.api.trace.Span;
|
|||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.vertx.core.Vertx;
|
||||
import jakarta.persistence.EntityManagerFactory;
|
||||
import jakarta.persistence.Persistence;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.hibernate.reactive.mutiny.Mutiny;
|
||||
import org.hibernate.reactive.stage.Stage;
|
||||
|
@ -138,6 +140,131 @@ class HibernateReactiveTest {
|
|||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageWithStatelessSession() throws Exception {
|
||||
testing
|
||||
.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.withStatelessSession(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.get(Value.class, 1L)
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.toCompletableFuture())
|
||||
.get(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageSessionWithTransaction() throws Exception {
|
||||
testing
|
||||
.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.withSession(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.withTransaction(transaction -> session.find(Value.class, 1L))
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.toCompletableFuture())
|
||||
.get(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageStatelessSessionWithTransaction() throws Exception {
|
||||
testing
|
||||
.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.withStatelessSession(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.withTransaction(transaction -> session.get(Value.class, 1L))
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.toCompletableFuture())
|
||||
.get(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageOpenSession() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
runWithVertx(
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.openSession()
|
||||
.thenApply(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.find(Value.class, 1L)
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.thenAccept(unused -> latch.countDown())));
|
||||
latch.await(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStageOpenStatelessSession() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
runWithVertx(
|
||||
() ->
|
||||
stageSessionFactory
|
||||
.openStatelessSession()
|
||||
.thenApply(
|
||||
session -> {
|
||||
if (!Span.current().getSpanContext().isValid()) {
|
||||
throw new IllegalStateException("missing parent span");
|
||||
}
|
||||
|
||||
return session
|
||||
.get(Value.class, 1L)
|
||||
.thenAccept(value -> testing.runWithSpan("callback", () -> {}));
|
||||
})
|
||||
.thenAccept(unused -> latch.countDown())));
|
||||
latch.await(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrace();
|
||||
}
|
||||
|
||||
private static void runWithVertx(Runnable runnable) {
|
||||
Vertx.vertx().getOrCreateContext().runOnContext(event -> runnable.run());
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0
|
||||
private static void assertTrace() {
|
||||
testing.waitAndAssertTraces(
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.hibernate.reactive.v1_0;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
|
||||
public final class CompletionStageWrapper {
|
||||
|
||||
private CompletionStageWrapper() {}
|
||||
|
||||
public static <T> CompletionStage<T> wrap(CompletionStage<T> future) {
|
||||
Context context = Context.current();
|
||||
if (context != Context.root()) {
|
||||
return wrap(future, context);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
private static <T> CompletionStage<T> wrap(CompletionStage<T> completionStage, Context context) {
|
||||
CompletableFuture<T> result = new CompletableFuture<>();
|
||||
completionStage.whenComplete(
|
||||
(T value, Throwable throwable) -> {
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
if (throwable != null) {
|
||||
result.completeExceptionally(throwable);
|
||||
} else {
|
||||
result.complete(value);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -22,6 +22,8 @@ public class HibernateReactiveInstrumentationModule extends InstrumentationModul
|
|||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return asList(
|
||||
new StageSessionFactoryInstrumentation(), new MutinySessionFactoryInstrumentation());
|
||||
new StageSessionFactoryInstrumentation(),
|
||||
new StageSessionImplInstrumentation(),
|
||||
new MutinySessionFactoryInstrumentation());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,10 +7,12 @@ package io.opentelemetry.javaagent.instrumentation.hibernate.reactive.v1_0;
|
|||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.returns;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.function.Function;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
|
@ -30,6 +32,9 @@ public class StageSessionFactoryInstrumentation implements TypeInstrumentation {
|
|||
transformer.applyAdviceToMethod(
|
||||
namedOneOf("withSession", "withStatelessSession").and(takesArgument(1, Function.class)),
|
||||
this.getClass().getName() + "$Function1Advice");
|
||||
transformer.applyAdviceToMethod(
|
||||
namedOneOf("openSession", "openStatelessSession").and(returns(CompletionStage.class)),
|
||||
this.getClass().getName() + "$OpenSessionAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
|
@ -49,4 +54,12 @@ public class StageSessionFactoryInstrumentation implements TypeInstrumentation {
|
|||
function = FunctionWrapper.wrap(function);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class OpenSessionAdvice {
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(@Advice.Return(readOnly = false) CompletionStage<?> completionStage) {
|
||||
completionStage = CompletionStageWrapper.wrap(completionStage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.hibernate.reactive.v1_0;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.returns;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.function.Function;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
public class StageSessionImplInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return namedOneOf(
|
||||
"org.hibernate.reactive.stage.impl.StageSessionImpl",
|
||||
"org.hibernate.reactive.stage.impl.StageStatelessSessionImpl");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("withTransaction")
|
||||
.and(takesArgument(0, Function.class).and(returns(CompletionStage.class))),
|
||||
this.getClass().getName() + "$WithTransactionAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class WithTransactionAdvice {
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void onEnter(
|
||||
@Advice.Argument(value = 0, readOnly = false) Function<?, ?> function) {
|
||||
function = FunctionWrapper.wrap(function);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(@Advice.Return(readOnly = false) CompletionStage<?> completionStage) {
|
||||
completionStage = CompletionStageWrapper.wrap(completionStage);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue