Fix ClassCastException with redisson batch atomically (#7743)
Resolves https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7702
This commit is contained in:
parent
2ec97a601c
commit
26689b2c40
|
@ -9,8 +9,8 @@ import io.opentelemetry.context.Context;
|
||||||
import io.opentelemetry.context.Scope;
|
import io.opentelemetry.context.Scope;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
public final class CompletableFutureWrapper<T> extends CompletableFuture<T>
|
public class CompletableFutureWrapper<T> extends CompletableFuture<T> implements PromiseWrapper<T> {
|
||||||
implements PromiseWrapper<T> {
|
private static final Class<?> batchPromiseClass = getBatchPromiseClass();
|
||||||
private volatile EndOperationListener<T> endOperationListener;
|
private volatile EndOperationListener<T> endOperationListener;
|
||||||
|
|
||||||
private CompletableFutureWrapper(CompletableFuture<T> delegate) {
|
private CompletableFutureWrapper(CompletableFuture<T> delegate) {
|
||||||
|
@ -36,7 +36,8 @@ public final class CompletableFutureWrapper<T> extends CompletableFuture<T>
|
||||||
* span, could be attached to it.
|
* span, could be attached to it.
|
||||||
*/
|
*/
|
||||||
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> delegate) {
|
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> delegate) {
|
||||||
if (delegate instanceof CompletableFutureWrapper) {
|
if (delegate instanceof CompletableFutureWrapper
|
||||||
|
|| (batchPromiseClass != null && batchPromiseClass.isInstance(delegate))) {
|
||||||
return delegate;
|
return delegate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,4 +48,14 @@ public final class CompletableFutureWrapper<T> extends CompletableFuture<T>
|
||||||
public void setEndOperationListener(EndOperationListener<T> endOperationListener) {
|
public void setEndOperationListener(EndOperationListener<T> endOperationListener) {
|
||||||
this.endOperationListener = endOperationListener;
|
this.endOperationListener = endOperationListener;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Class<?> getBatchPromiseClass() {
|
||||||
|
try {
|
||||||
|
// using Class.forName because this class is not available in the redisson version we compile
|
||||||
|
// against
|
||||||
|
return Class.forName("org.redisson.command.BatchPromise");
|
||||||
|
} catch (ClassNotFoundException exception) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,5 +10,5 @@ dependencies {
|
||||||
implementation("org.spockframework:spock-core")
|
implementation("org.spockframework:spock-core")
|
||||||
implementation("org.testcontainers:testcontainers")
|
implementation("org.testcontainers:testcontainers")
|
||||||
|
|
||||||
compileOnly("org.redisson:redisson:3.0.0")
|
compileOnly("org.redisson:redisson:3.7.2")
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,10 @@
|
||||||
import io.opentelemetry.api.trace.Span
|
import io.opentelemetry.api.trace.Span
|
||||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||||
|
import org.junit.Assume
|
||||||
import org.redisson.Redisson
|
import org.redisson.Redisson
|
||||||
|
import org.redisson.api.BatchOptions
|
||||||
|
import org.redisson.api.RBatch
|
||||||
import org.redisson.api.RBucket
|
import org.redisson.api.RBucket
|
||||||
import org.redisson.api.RFuture
|
import org.redisson.api.RFuture
|
||||||
import org.redisson.api.RScheduledExecutorService
|
import org.redisson.api.RScheduledExecutorService
|
||||||
|
@ -151,4 +154,85 @@ abstract class AbstractRedissonAsyncClientTest extends AgentInstrumentationSpeci
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def "test atomic batch command"() {
|
||||||
|
try {
|
||||||
|
// available since 3.7.2
|
||||||
|
Class.forName('org.redisson.api.BatchOptions$ExecutionMode')
|
||||||
|
} catch (ClassNotFoundException exception) {
|
||||||
|
Assume.assumeNoException(exception)
|
||||||
|
}
|
||||||
|
|
||||||
|
when:
|
||||||
|
CompletionStage<Boolean> result = runWithSpan("parent") {
|
||||||
|
def batchOptions = BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.REDIS_WRITE_ATOMIC)
|
||||||
|
RBatch batch = redisson.createBatch(batchOptions)
|
||||||
|
batch.getBucket("batch1").setAsync("v1")
|
||||||
|
batch.getBucket("batch2").setAsync("v2")
|
||||||
|
RFuture<Boolean> future = batch.executeAsync()
|
||||||
|
return future.whenComplete({ res, throwable ->
|
||||||
|
if (!Span.current().getSpanContext().isValid()) {
|
||||||
|
new Exception("Callback should have a parent span.").printStackTrace()
|
||||||
|
}
|
||||||
|
runWithSpan("callback") {
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
then:
|
||||||
|
result.toCompletableFuture().get(30, TimeUnit.SECONDS)
|
||||||
|
assertTraces(1) {
|
||||||
|
trace(0, 5) {
|
||||||
|
span(0) {
|
||||||
|
name "parent"
|
||||||
|
kind INTERNAL
|
||||||
|
hasNoParent()
|
||||||
|
}
|
||||||
|
span(1) {
|
||||||
|
name "DB Query"
|
||||||
|
kind CLIENT
|
||||||
|
childOf(span(0))
|
||||||
|
attributes {
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_PORT" port
|
||||||
|
"$SemanticAttributes.DB_SYSTEM" "redis"
|
||||||
|
"$SemanticAttributes.DB_STATEMENT" "MULTI;SET batch1 ?"
|
||||||
|
"$SemanticAttributes.DB_OPERATION" null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
span(2) {
|
||||||
|
name "SET"
|
||||||
|
kind CLIENT
|
||||||
|
childOf(span(0))
|
||||||
|
attributes {
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_PORT" port
|
||||||
|
"$SemanticAttributes.DB_SYSTEM" "redis"
|
||||||
|
"$SemanticAttributes.DB_STATEMENT" "SET batch2 ?"
|
||||||
|
"$SemanticAttributes.DB_OPERATION" "SET"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
span(3) {
|
||||||
|
name "EXEC"
|
||||||
|
kind CLIENT
|
||||||
|
childOf(span(0))
|
||||||
|
attributes {
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_PORT" port
|
||||||
|
"$SemanticAttributes.DB_SYSTEM" "redis"
|
||||||
|
"$SemanticAttributes.DB_STATEMENT" "EXEC"
|
||||||
|
"$SemanticAttributes.DB_OPERATION" "EXEC"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
span(4) {
|
||||||
|
name "callback"
|
||||||
|
kind INTERNAL
|
||||||
|
childOf(span(0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,9 @@
|
||||||
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||||
|
import org.junit.Assume
|
||||||
import org.redisson.Redisson
|
import org.redisson.Redisson
|
||||||
|
import org.redisson.api.BatchOptions
|
||||||
import org.redisson.api.RAtomicLong
|
import org.redisson.api.RAtomicLong
|
||||||
import org.redisson.api.RBatch
|
import org.redisson.api.RBatch
|
||||||
import org.redisson.api.RBucket
|
import org.redisson.api.RBucket
|
||||||
|
@ -21,6 +23,7 @@ import org.testcontainers.containers.GenericContainer
|
||||||
import spock.lang.Shared
|
import spock.lang.Shared
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||||
|
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||||
import static java.util.regex.Pattern.compile
|
import static java.util.regex.Pattern.compile
|
||||||
import static java.util.regex.Pattern.quote
|
import static java.util.regex.Pattern.quote
|
||||||
|
|
||||||
|
@ -129,6 +132,74 @@ abstract class AbstractRedissonClientTest extends AgentInstrumentationSpecificat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def "test atomic batch command"() {
|
||||||
|
try {
|
||||||
|
// available since 3.7.2
|
||||||
|
Class.forName('org.redisson.api.BatchOptions$ExecutionMode')
|
||||||
|
} catch (ClassNotFoundException exception) {
|
||||||
|
Assume.assumeNoException(exception)
|
||||||
|
}
|
||||||
|
|
||||||
|
when:
|
||||||
|
runWithSpan("parent") {
|
||||||
|
def batchOptions = BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.REDIS_WRITE_ATOMIC)
|
||||||
|
RBatch batch = redisson.createBatch(batchOptions)
|
||||||
|
batch.getBucket("batch1").setAsync("v1")
|
||||||
|
batch.getBucket("batch2").setAsync("v2")
|
||||||
|
batch.execute()
|
||||||
|
}
|
||||||
|
|
||||||
|
then:
|
||||||
|
assertTraces(1) {
|
||||||
|
trace(0, 4) {
|
||||||
|
span(0) {
|
||||||
|
name "parent"
|
||||||
|
kind INTERNAL
|
||||||
|
hasNoParent()
|
||||||
|
}
|
||||||
|
span(1) {
|
||||||
|
name "DB Query"
|
||||||
|
kind CLIENT
|
||||||
|
childOf(span(0))
|
||||||
|
attributes {
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_PORT" port
|
||||||
|
"$SemanticAttributes.DB_SYSTEM" "redis"
|
||||||
|
"$SemanticAttributes.DB_STATEMENT" "MULTI;SET batch1 ?"
|
||||||
|
"$SemanticAttributes.DB_OPERATION" null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
span(2) {
|
||||||
|
name "SET"
|
||||||
|
kind CLIENT
|
||||||
|
childOf(span(0))
|
||||||
|
attributes {
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_PORT" port
|
||||||
|
"$SemanticAttributes.DB_SYSTEM" "redis"
|
||||||
|
"$SemanticAttributes.DB_STATEMENT" "SET batch2 ?"
|
||||||
|
"$SemanticAttributes.DB_OPERATION" "SET"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
span(3) {
|
||||||
|
name "EXEC"
|
||||||
|
kind CLIENT
|
||||||
|
childOf(span(0))
|
||||||
|
attributes {
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_ADDR" "127.0.0.1"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_NAME" "localhost"
|
||||||
|
"$SemanticAttributes.NET_SOCK_PEER_PORT" port
|
||||||
|
"$SemanticAttributes.DB_SYSTEM" "redis"
|
||||||
|
"$SemanticAttributes.DB_STATEMENT" "EXEC"
|
||||||
|
"$SemanticAttributes.DB_OPERATION" "EXEC"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def "test list command"() {
|
def "test list command"() {
|
||||||
when:
|
when:
|
||||||
RList<String> strings = redisson.getList("list1")
|
RList<String> strings = redisson.getList("list1")
|
||||||
|
|
Loading…
Reference in New Issue