From 26689b2c4050e00e600544031ec1907b035b42cf Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 8 Feb 2023 05:25:20 +0200 Subject: [PATCH] Fix ClassCastException with redisson batch atomically (#7743) Resolves https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7702 --- .../redisson/CompletableFutureWrapper.java | 17 +++- .../redisson-common/testing/build.gradle.kts | 2 +- .../AbstractRedissonAsyncClientTest.groovy | 84 +++++++++++++++++++ .../groovy/AbstractRedissonClientTest.groovy | 71 ++++++++++++++++ 4 files changed, 170 insertions(+), 4 deletions(-) diff --git a/instrumentation/redisson/redisson-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/CompletableFutureWrapper.java b/instrumentation/redisson/redisson-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/CompletableFutureWrapper.java index 8020e25536..681c8adc9d 100644 --- a/instrumentation/redisson/redisson-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/CompletableFutureWrapper.java +++ b/instrumentation/redisson/redisson-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/CompletableFutureWrapper.java @@ -9,8 +9,8 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import java.util.concurrent.CompletableFuture; -public final class CompletableFutureWrapper extends CompletableFuture - implements PromiseWrapper { +public class CompletableFutureWrapper extends CompletableFuture implements PromiseWrapper { + private static final Class batchPromiseClass = getBatchPromiseClass(); private volatile EndOperationListener endOperationListener; private CompletableFutureWrapper(CompletableFuture delegate) { @@ -36,7 +36,8 @@ public final class CompletableFutureWrapper extends CompletableFuture * span, could be attached to it. */ public static CompletableFuture wrap(CompletableFuture delegate) { - if (delegate instanceof CompletableFutureWrapper) { + if (delegate instanceof CompletableFutureWrapper + || (batchPromiseClass != null && batchPromiseClass.isInstance(delegate))) { return delegate; } @@ -47,4 +48,14 @@ public final class CompletableFutureWrapper extends CompletableFuture public void setEndOperationListener(EndOperationListener endOperationListener) { this.endOperationListener = endOperationListener; } + + private static Class getBatchPromiseClass() { + try { + // using Class.forName because this class is not available in the redisson version we compile + // against + return Class.forName("org.redisson.command.BatchPromise"); + } catch (ClassNotFoundException exception) { + return null; + } + } } diff --git a/instrumentation/redisson/redisson-common/testing/build.gradle.kts b/instrumentation/redisson/redisson-common/testing/build.gradle.kts index e730646c79..4eba7a46cb 100644 --- a/instrumentation/redisson/redisson-common/testing/build.gradle.kts +++ b/instrumentation/redisson/redisson-common/testing/build.gradle.kts @@ -10,5 +10,5 @@ dependencies { implementation("org.spockframework:spock-core") implementation("org.testcontainers:testcontainers") - compileOnly("org.redisson:redisson:3.0.0") + compileOnly("org.redisson:redisson:3.7.2") } diff --git a/instrumentation/redisson/redisson-common/testing/src/main/groovy/AbstractRedissonAsyncClientTest.groovy b/instrumentation/redisson/redisson-common/testing/src/main/groovy/AbstractRedissonAsyncClientTest.groovy index e4899ca231..4be1622468 100644 --- a/instrumentation/redisson/redisson-common/testing/src/main/groovy/AbstractRedissonAsyncClientTest.groovy +++ b/instrumentation/redisson/redisson-common/testing/src/main/groovy/AbstractRedissonAsyncClientTest.groovy @@ -6,7 +6,10 @@ import io.opentelemetry.api.trace.Span import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import org.junit.Assume import org.redisson.Redisson +import org.redisson.api.BatchOptions +import org.redisson.api.RBatch import org.redisson.api.RBucket import org.redisson.api.RFuture import org.redisson.api.RScheduledExecutorService @@ -151,4 +154,85 @@ abstract class AbstractRedissonAsyncClientTest extends AgentInstrumentationSpeci return null } } + + def "test atomic batch command"() { + try { + // available since 3.7.2 + Class.forName('org.redisson.api.BatchOptions$ExecutionMode') + } catch (ClassNotFoundException exception) { + Assume.assumeNoException(exception) + } + + when: + CompletionStage result = runWithSpan("parent") { + def batchOptions = BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.REDIS_WRITE_ATOMIC) + RBatch batch = redisson.createBatch(batchOptions) + batch.getBucket("batch1").setAsync("v1") + batch.getBucket("batch2").setAsync("v2") + RFuture future = batch.executeAsync() + return future.whenComplete({ res, throwable -> + if (!Span.current().getSpanContext().isValid()) { + new Exception("Callback should have a parent span.").printStackTrace() + } + runWithSpan("callback") { + } + }) + } + + then: + result.toCompletableFuture().get(30, TimeUnit.SECONDS) + assertTraces(1) { + trace(0, 5) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { + name "DB Query" + kind CLIENT + childOf(span(0)) + attributes { + "$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1" + "$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost" + "$SemanticAttributes.NET_SOCK_PEER_PORT" port + "$SemanticAttributes.DB_SYSTEM" "redis" + "$SemanticAttributes.DB_STATEMENT" "MULTI;SET batch1 ?" + "$SemanticAttributes.DB_OPERATION" null + } + } + span(2) { + name "SET" + kind CLIENT + childOf(span(0)) + attributes { + "$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1" + "$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost" + "$SemanticAttributes.NET_SOCK_PEER_PORT" port + "$SemanticAttributes.DB_SYSTEM" "redis" + "$SemanticAttributes.DB_STATEMENT" "SET batch2 ?" + "$SemanticAttributes.DB_OPERATION" "SET" + } + } + span(3) { + name "EXEC" + kind CLIENT + childOf(span(0)) + attributes { + "$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1" + "$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost" + "$SemanticAttributes.NET_SOCK_PEER_PORT" port + "$SemanticAttributes.DB_SYSTEM" "redis" + "$SemanticAttributes.DB_STATEMENT" "EXEC" + "$SemanticAttributes.DB_OPERATION" "EXEC" + } + } + span(4) { + name "callback" + kind INTERNAL + childOf(span(0)) + } + } + } + } } diff --git a/instrumentation/redisson/redisson-common/testing/src/main/groovy/AbstractRedissonClientTest.groovy b/instrumentation/redisson/redisson-common/testing/src/main/groovy/AbstractRedissonClientTest.groovy index c275aa81a0..bed716658c 100644 --- a/instrumentation/redisson/redisson-common/testing/src/main/groovy/AbstractRedissonClientTest.groovy +++ b/instrumentation/redisson/redisson-common/testing/src/main/groovy/AbstractRedissonClientTest.groovy @@ -5,7 +5,9 @@ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import org.junit.Assume import org.redisson.Redisson +import org.redisson.api.BatchOptions import org.redisson.api.RAtomicLong import org.redisson.api.RBatch import org.redisson.api.RBucket @@ -21,6 +23,7 @@ import org.testcontainers.containers.GenericContainer import spock.lang.Shared import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static java.util.regex.Pattern.compile import static java.util.regex.Pattern.quote @@ -129,6 +132,74 @@ abstract class AbstractRedissonClientTest extends AgentInstrumentationSpecificat } } + def "test atomic batch command"() { + try { + // available since 3.7.2 + Class.forName('org.redisson.api.BatchOptions$ExecutionMode') + } catch (ClassNotFoundException exception) { + Assume.assumeNoException(exception) + } + + when: + runWithSpan("parent") { + def batchOptions = BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.REDIS_WRITE_ATOMIC) + RBatch batch = redisson.createBatch(batchOptions) + batch.getBucket("batch1").setAsync("v1") + batch.getBucket("batch2").setAsync("v2") + batch.execute() + } + + then: + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { + name "DB Query" + kind CLIENT + childOf(span(0)) + attributes { + "$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1" + "$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost" + "$SemanticAttributes.NET_SOCK_PEER_PORT" port + "$SemanticAttributes.DB_SYSTEM" "redis" + "$SemanticAttributes.DB_STATEMENT" "MULTI;SET batch1 ?" + "$SemanticAttributes.DB_OPERATION" null + } + } + span(2) { + name "SET" + kind CLIENT + childOf(span(0)) + attributes { + "$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1" + "$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost" + "$SemanticAttributes.NET_SOCK_PEER_PORT" port + "$SemanticAttributes.DB_SYSTEM" "redis" + "$SemanticAttributes.DB_STATEMENT" "SET batch2 ?" + "$SemanticAttributes.DB_OPERATION" "SET" + } + } + span(3) { + name "EXEC" + kind CLIENT + childOf(span(0)) + attributes { + "$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1" + "$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost" + "$SemanticAttributes.NET_SOCK_PEER_PORT" port + "$SemanticAttributes.DB_SYSTEM" "redis" + "$SemanticAttributes.DB_STATEMENT" "EXEC" + "$SemanticAttributes.DB_OPERATION" "EXEC" + } + } + } + } + } + def "test list command"() { when: RList strings = redisson.getList("list1")