Instrumenting cassandra executeReactive method (#6441)
It follows the
[issue](https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/6395#issue-1323561263)
I opened some days ago.
The `executeReactive` method use the same processor used by
`executeAsync` (see
[here](65d2c19c40/core/src/main/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessor.java (L53)
))
and wrap the callback in the `DefaultReactiveResultSet` publisher.
Here I'm simply overriding the `executeReactive` method doing the same
thing: call the already instrumented `executeAsync` method and wrapping
the callback using the `DefaultReactiveResultSet` publisher.
~~I did an upgrade of the `java-driver-core` library to have
`TracingCqlSession.java` extending the `ReactiveSession`. I have to
probably rename the `cassandra-4.0` module in `cassandra-4.14` but I'll
let you confirm this.~~ -> Cassandra-4.4 is enough.
---------
Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
parent
583e2a76f8
commit
1a7e0f3235
|
@ -42,7 +42,7 @@ These are the supported libraries and frameworks:
|
|||
| [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/java-handler.html) | 1.0+ | [opentelemetry-aws-lambda-core-1.0](../instrumentation/aws-lambda/aws-lambda-core-1.0/library),<br>[opentelemetry-aws-lambda-events-2.2](../instrumentation/aws-lambda/aws-lambda-events-2.2/library) | [FaaS Server Spans] |
|
||||
| [AWS SDK](https://aws.amazon.com/sdk-for-java/) | 1.11.x and 2.2.0+ | [opentelemetry-aws-sdk-1.11](../instrumentation/aws-sdk/aws-sdk-1.11/library),<br>[opentelemetry-aws-sdk-1.11-autoconfigure](../instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure),<br>[opentelemetry-aws-sdk-2.2](../instrumentation/aws-sdk/aws-sdk-2.2/library),<br>[opentelemetry-aws-sdk-2.2-autoconfigure](../instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure) | [Messaging Spans], [Database Client Spans], [HTTP Client Spans] |
|
||||
| [Azure Core](https://docs.microsoft.com/en-us/java/api/overview/azure/core-readme) | 1.14+ | N/A | Context propagation |
|
||||
| [Cassandra Driver](https://github.com/datastax/java-driver) | 3.0+ | N/A | [Database Client Spans] |
|
||||
| [Cassandra Driver](https://github.com/datastax/java-driver) | 3.0+ | [opentelemetry-cassandra-4.4](../instrumentation/cassandra/cassandra-4.4/library) | [Database Client Spans] |
|
||||
| [Couchbase Client](https://github.com/couchbase/couchbase-java-client) | 2.0+ and 3.1+ | N/A | [Database Client Spans] |
|
||||
| [c3p0](https://github.com/swaldman/c3p0) | 0.9.2+ | [opentelemetry-c3p0-0.9](../instrumentation/c3p0-0.9/library) | [Database Pool Metrics] |
|
||||
| [Dropwizard Metrics](https://metrics.dropwizard.io/) | 4.0+ (disabled by default) | N/A | none |
|
||||
|
|
|
@ -35,6 +35,9 @@ dependencies {
|
|||
testInstrumentation(project(":instrumentation:guava-10.0:javaagent"))
|
||||
|
||||
latestDepTestLibrary("com.datastax.cassandra:cassandra-driver-core:3.+") // see cassandra-4.0 module
|
||||
|
||||
testInstrumentation(project(":instrumentation:cassandra:cassandra-4.0:javaagent"))
|
||||
testInstrumentation(project(":instrumentation:cassandra:cassandra-4.4:javaagent"))
|
||||
}
|
||||
|
||||
// Requires old Guava. Can't use enforcedPlatform since predates BOM
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
plugins {
|
||||
id("otel.java-conventions")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
api(project(":testing-common"))
|
||||
|
||||
implementation("org.testcontainers:testcontainers:1.17.5")
|
||||
implementation("com.datastax.oss:java-driver-core:4.0.0")
|
||||
}
|
|
@ -3,6 +3,8 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.cassandra.v4.common;
|
||||
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL;
|
||||
|
@ -26,14 +28,12 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
|
|||
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
|
||||
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.Duration;
|
||||
import java.util.stream.Stream;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
@ -42,17 +42,20 @@ import org.slf4j.LoggerFactory;
|
|||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
|
||||
public class CassandraClientTest {
|
||||
public abstract class AbstractCassandraTest {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(CassandraClientTest.class);
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||
private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraTest.class);
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private static GenericContainer cassandra;
|
||||
|
||||
private static int cassandraPort;
|
||||
protected static int cassandraPort;
|
||||
|
||||
protected abstract InstrumentationExtension testing();
|
||||
|
||||
protected CqlSession wrap(CqlSession session) {
|
||||
return session;
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
static void beforeAll() {
|
||||
|
@ -79,30 +82,33 @@ public class CassandraClientTest {
|
|||
|
||||
session.execute(parameter.statement);
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName(parameter.spanName)
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasNoParent()
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
|
||||
equalTo(NET_SOCK_PEER_NAME, "localhost"),
|
||||
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
|
||||
equalTo(DB_SYSTEM, "cassandra"),
|
||||
equalTo(DB_NAME, parameter.keyspace),
|
||||
equalTo(DB_STATEMENT, parameter.expectedStatement),
|
||||
equalTo(DB_OPERATION, parameter.operation),
|
||||
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
|
||||
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
|
||||
satisfies(
|
||||
DB_CASSANDRA_COORDINATOR_ID, val -> val.isInstanceOf(String.class)),
|
||||
satisfies(
|
||||
DB_CASSANDRA_IDEMPOTENCE, val -> val.isInstanceOf(Boolean.class)),
|
||||
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
|
||||
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
|
||||
equalTo(DB_CASSANDRA_TABLE, parameter.table))));
|
||||
testing()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName(parameter.spanName)
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasNoParent()
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
|
||||
equalTo(NET_SOCK_PEER_NAME, "localhost"),
|
||||
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
|
||||
equalTo(DB_SYSTEM, "cassandra"),
|
||||
equalTo(DB_NAME, parameter.keyspace),
|
||||
equalTo(DB_STATEMENT, parameter.expectedStatement),
|
||||
equalTo(DB_OPERATION, parameter.operation),
|
||||
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
|
||||
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
|
||||
satisfies(
|
||||
DB_CASSANDRA_COORDINATOR_ID,
|
||||
val -> val.isInstanceOf(String.class)),
|
||||
satisfies(
|
||||
DB_CASSANDRA_IDEMPOTENCE,
|
||||
val -> val.isInstanceOf(Boolean.class)),
|
||||
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
|
||||
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
|
||||
equalTo(DB_CASSANDRA_TABLE, parameter.table))));
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
@ -112,42 +118,48 @@ public class CassandraClientTest {
|
|||
void asyncTest(Parameter parameter) throws Exception {
|
||||
CqlSession session = getSession(parameter.keyspace);
|
||||
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
session
|
||||
.executeAsync(parameter.statement)
|
||||
.toCompletableFuture()
|
||||
.whenComplete((result, throwable) -> testing.runWithSpan("child", () -> {}))
|
||||
.get());
|
||||
testing()
|
||||
.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
session
|
||||
.executeAsync(parameter.statement)
|
||||
.toCompletableFuture()
|
||||
.whenComplete((result, throwable) -> testing().runWithSpan("child", () -> {}))
|
||||
.get());
|
||||
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||
span ->
|
||||
span.hasName(parameter.spanName)
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
|
||||
equalTo(NET_SOCK_PEER_NAME, "localhost"),
|
||||
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
|
||||
equalTo(DB_SYSTEM, "cassandra"),
|
||||
equalTo(DB_NAME, parameter.keyspace),
|
||||
equalTo(DB_STATEMENT, parameter.expectedStatement),
|
||||
equalTo(DB_OPERATION, parameter.operation),
|
||||
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
|
||||
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
|
||||
satisfies(
|
||||
DB_CASSANDRA_COORDINATOR_ID, val -> val.isInstanceOf(String.class)),
|
||||
satisfies(
|
||||
DB_CASSANDRA_IDEMPOTENCE, val -> val.isInstanceOf(Boolean.class)),
|
||||
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
|
||||
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
|
||||
equalTo(DB_CASSANDRA_TABLE, parameter.table)),
|
||||
span ->
|
||||
span.hasName("child").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0))));
|
||||
testing()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||
span ->
|
||||
span.hasName(parameter.spanName)
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
|
||||
equalTo(NET_SOCK_PEER_NAME, "localhost"),
|
||||
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
|
||||
equalTo(DB_SYSTEM, "cassandra"),
|
||||
equalTo(DB_NAME, parameter.keyspace),
|
||||
equalTo(DB_STATEMENT, parameter.expectedStatement),
|
||||
equalTo(DB_OPERATION, parameter.operation),
|
||||
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
|
||||
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
|
||||
satisfies(
|
||||
DB_CASSANDRA_COORDINATOR_ID,
|
||||
val -> val.isInstanceOf(String.class)),
|
||||
satisfies(
|
||||
DB_CASSANDRA_IDEMPOTENCE,
|
||||
val -> val.isInstanceOf(Boolean.class)),
|
||||
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
|
||||
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
|
||||
equalTo(DB_CASSANDRA_TABLE, parameter.table)),
|
||||
span ->
|
||||
span.hasName("child")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))));
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
@ -260,7 +272,7 @@ public class CassandraClientTest {
|
|||
"users"))));
|
||||
}
|
||||
|
||||
private static class Parameter {
|
||||
protected static class Parameter {
|
||||
public final String keyspace;
|
||||
public final String statement;
|
||||
public final String expectedStatement;
|
||||
|
@ -284,16 +296,17 @@ public class CassandraClientTest {
|
|||
}
|
||||
}
|
||||
|
||||
CqlSession getSession(String keyspace) {
|
||||
protected CqlSession getSession(String keyspace) {
|
||||
DriverConfigLoader configLoader =
|
||||
DefaultDriverConfigLoader.builder()
|
||||
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(0))
|
||||
.build();
|
||||
return CqlSession.builder()
|
||||
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
|
||||
.withConfigLoader(configLoader)
|
||||
.withLocalDatacenter("datacenter1")
|
||||
.withKeyspace(keyspace)
|
||||
.build();
|
||||
return wrap(
|
||||
CqlSession.builder()
|
||||
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
|
||||
.withConfigLoader(configLoader)
|
||||
.withLocalDatacenter("datacenter1")
|
||||
.withKeyspace(keyspace)
|
||||
.build());
|
||||
}
|
||||
}
|
|
@ -6,7 +6,7 @@ muzzle {
|
|||
pass {
|
||||
group.set("com.datastax.oss")
|
||||
module.set("java-driver-core")
|
||||
versions.set("[4.0,)")
|
||||
versions.set("[4.0,4.4)")
|
||||
assertInverse.set(true)
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,11 @@ dependencies {
|
|||
|
||||
compileOnly("com.google.auto.value:auto-value-annotations")
|
||||
annotationProcessor("com.google.auto.value:auto-value")
|
||||
|
||||
testImplementation(project(":instrumentation:cassandra:cassandra-4-common:testing"))
|
||||
|
||||
testInstrumentation(project(":instrumentation:cassandra:cassandra-3.0:javaagent"))
|
||||
testInstrumentation(project(":instrumentation:cassandra:cassandra-4.4:javaagent"))
|
||||
}
|
||||
|
||||
tasks {
|
||||
|
|
|
@ -5,15 +5,19 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class CassandraClientInstrumentationModule extends InstrumentationModule {
|
||||
|
||||
public CassandraClientInstrumentationModule() {
|
||||
super("cassandra", "cassandra-4.0");
|
||||
}
|
||||
|
@ -22,4 +26,10 @@ public class CassandraClientInstrumentationModule extends InstrumentationModule
|
|||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return singletonList(new SessionBuilderInstrumentation());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
|
||||
// new public interface introduced in version 4.4
|
||||
return not(hasClassesNamed("com.datastax.dse.driver.api.core.cql.reactive.ReactiveSession"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.cassandra.v4.common.AbstractCassandraTest;
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
public class CassandraTest extends AbstractCassandraTest {
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||
|
||||
@Override
|
||||
protected InstrumentationExtension testing() {
|
||||
return testing;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
plugins {
|
||||
id("otel.javaagent-instrumentation")
|
||||
}
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group.set("com.datastax.oss")
|
||||
module.set("java-driver-core")
|
||||
versions.set("[4.4,]")
|
||||
assertInverse.set(true)
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation(project(":instrumentation:cassandra:cassandra-4.4:library"))
|
||||
|
||||
library("com.datastax.oss:java-driver-core:4.4.0")
|
||||
|
||||
compileOnly("com.google.auto.value:auto-value-annotations")
|
||||
annotationProcessor("com.google.auto.value:auto-value")
|
||||
|
||||
testImplementation("io.projectreactor:reactor-core:3.4.21")
|
||||
testImplementation(project(":instrumentation:cassandra:cassandra-4.4:testing"))
|
||||
|
||||
testInstrumentation(project(":instrumentation:cassandra:cassandra-3.0:javaagent"))
|
||||
testInstrumentation(project(":instrumentation:cassandra:cassandra-4.0:javaagent"))
|
||||
}
|
||||
|
||||
tasks {
|
||||
test {
|
||||
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class CassandraClientInstrumentationModule extends InstrumentationModule {
|
||||
|
||||
public CassandraClientInstrumentationModule() {
|
||||
super("cassandra", "cassandra-4.4");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return singletonList(new SessionBuilderInstrumentation());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
|
||||
// new public interface introduced in version 4.4
|
||||
return hasClassesNamed("com.datastax.dse.driver.api.core.cql.reactive.ReactiveSession");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.cassandra.v4_4.CassandraTelemetry;
|
||||
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
|
||||
|
||||
final class CassandraSingletons {
|
||||
|
||||
static final CassandraTelemetry telemetry =
|
||||
CassandraTelemetry.builder(GlobalOpenTelemetry.get())
|
||||
.setStatementSanitizationEnabled(CommonConfig.get().isStatementSanitizationEnabled())
|
||||
.build();
|
||||
|
||||
private CassandraSingletons() {}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class CompletionStageFunction implements Function<Object, Object> {
|
||||
|
||||
@Override
|
||||
public Object apply(Object session) {
|
||||
if (session == null) {
|
||||
return null;
|
||||
}
|
||||
// This should cover ours and OT's TracingCqlSession
|
||||
if (session.getClass().getName().endsWith("cassandra4.TracingCqlSession")) {
|
||||
return session;
|
||||
}
|
||||
return CassandraSingletons.telemetry.wrap((CqlSession) session);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
public class SessionBuilderInstrumentation implements TypeInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
// Note: Cassandra has a large driver and we instrument single class in it.
|
||||
// The rest is ignored in AdditionalLibraryIgnoresMatcher
|
||||
return named("com.datastax.oss.driver.api.core.session.SessionBuilder");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
isMethod().and(isPublic()).and(named("buildAsync")).and(takesArguments(0)),
|
||||
SessionBuilderInstrumentation.class.getName() + "$BuildAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class BuildAdvice {
|
||||
|
||||
/**
|
||||
* Strategy: each time we build a connection to a Cassandra cluster, the
|
||||
* com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync() method is called. The
|
||||
* opentracing contribution is a simple wrapper, so we just have to wrap the new session.
|
||||
*
|
||||
* @param stage The fresh CompletionStage to patch. This stage produces session which is
|
||||
* replaced with new session
|
||||
*/
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void injectTracingSession(
|
||||
@Advice.Return(readOnly = false) CompletionStage<?> stage) {
|
||||
stage = stage.thenApply(new CompletionStageFunction());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_4;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.testing.cassandra.v4_4.AbstractCassandra44Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
public class CassandraTest extends AbstractCassandra44Test {
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||
|
||||
@Override
|
||||
protected InstrumentationExtension testing() {
|
||||
return testing;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
plugins {
|
||||
id("otel.library-instrumentation")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
library("com.datastax.oss:java-driver-core:4.4.0")
|
||||
|
||||
compileOnly("com.google.auto.value:auto-value-annotations")
|
||||
annotationProcessor("com.google.auto.value:auto-value")
|
||||
|
||||
testImplementation(project(":instrumentation:cassandra:cassandra-4.4:testing"))
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.cassandra.v4_4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
|
||||
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import io.opentelemetry.api.common.AttributesBuilder;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
final class CassandraAttributesExtractor
|
||||
implements AttributesExtractor<CassandraRequest, ExecutionInfo> {
|
||||
|
||||
@Override
|
||||
public void onStart(
|
||||
AttributesBuilder attributes, Context parentContext, CassandraRequest request) {}
|
||||
|
||||
@Override
|
||||
public void onEnd(
|
||||
AttributesBuilder attributes,
|
||||
Context context,
|
||||
CassandraRequest request,
|
||||
@Nullable ExecutionInfo executionInfo,
|
||||
@Nullable Throwable error) {
|
||||
if (executionInfo == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Node coordinator = executionInfo.getCoordinator();
|
||||
if (coordinator != null) {
|
||||
if (coordinator.getDatacenter() != null) {
|
||||
attributes.put(SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter());
|
||||
}
|
||||
if (coordinator.getHostId() != null) {
|
||||
attributes.put(
|
||||
SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID, coordinator.getHostId().toString());
|
||||
}
|
||||
}
|
||||
attributes.put(
|
||||
SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT,
|
||||
executionInfo.getSpeculativeExecutionCount());
|
||||
|
||||
Statement<?> statement = (Statement<?>) executionInfo.getRequest();
|
||||
String consistencyLevel;
|
||||
DriverExecutionProfile config =
|
||||
request.getSession().getContext().getConfig().getDefaultProfile();
|
||||
if (statement.getConsistencyLevel() != null) {
|
||||
consistencyLevel = statement.getConsistencyLevel().name();
|
||||
} else {
|
||||
consistencyLevel = config.getString(DefaultDriverOption.REQUEST_CONSISTENCY);
|
||||
}
|
||||
attributes.put(SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL, consistencyLevel);
|
||||
|
||||
if (statement.getPageSize() > 0) {
|
||||
attributes.put(SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, statement.getPageSize());
|
||||
} else {
|
||||
int pageSize = config.getInt(DefaultDriverOption.REQUEST_PAGE_SIZE);
|
||||
if (pageSize > 0) {
|
||||
attributes.put(SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, pageSize);
|
||||
}
|
||||
}
|
||||
|
||||
Boolean idempotent = statement.isIdempotent();
|
||||
if (idempotent == null) {
|
||||
idempotent = config.getBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE);
|
||||
}
|
||||
attributes.put(SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE, idempotent);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.cassandra.v4_4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetClientAttributesGetter;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
final class CassandraNetAttributesGetter
|
||||
extends InetSocketAddressNetClientAttributesGetter<CassandraRequest, ExecutionInfo> {
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String getTransport(CassandraRequest request, @Nullable ExecutionInfo executionInfo) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getPeerName(CassandraRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Integer getPeerPort(CassandraRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
protected InetSocketAddress getPeerSocketAddress(
|
||||
CassandraRequest request, @Nullable ExecutionInfo executionInfo) {
|
||||
if (executionInfo == null) {
|
||||
return null;
|
||||
}
|
||||
Node coordinator = executionInfo.getCoordinator();
|
||||
if (coordinator == null) {
|
||||
return null;
|
||||
}
|
||||
// resolve() returns an existing InetSocketAddress, it does not do a dns resolve,
|
||||
// at least in the only current EndPoint implementation (DefaultEndPoint)
|
||||
SocketAddress address = coordinator.getEndPoint().resolve();
|
||||
return address instanceof InetSocketAddress ? (InetSocketAddress) address : null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.cassandra.v4_4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.session.Session;
|
||||
import com.google.auto.value.AutoValue;
|
||||
|
||||
@AutoValue
|
||||
public abstract class CassandraRequest {
|
||||
|
||||
public static CassandraRequest create(Session session, String statement) {
|
||||
return new AutoValue_CassandraRequest(session, statement);
|
||||
}
|
||||
|
||||
public abstract Session getSession();
|
||||
|
||||
public abstract String getStatement();
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.cassandra.v4_4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlIdentifier;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesGetter;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
final class CassandraSqlAttributesGetter implements SqlClientAttributesGetter<CassandraRequest> {
|
||||
|
||||
@Override
|
||||
public String getSystem(CassandraRequest request) {
|
||||
return SemanticAttributes.DbSystemValues.CASSANDRA;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String getUser(CassandraRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String getName(CassandraRequest request) {
|
||||
return request.getSession().getKeyspace().map(CqlIdentifier::toString).orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String getConnectionString(CassandraRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String getRawStatement(CassandraRequest request) {
|
||||
return request.getStatement();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.cassandra.v4_4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
|
||||
/** Entrypoint for instrumenting cassandra sessions. */
|
||||
public class CassandraTelemetry {
|
||||
|
||||
/** Returns a new {@link CassandraTelemetry} configured with the given {@link OpenTelemetry}. */
|
||||
public static CassandraTelemetry create(OpenTelemetry openTelemetry) {
|
||||
return builder(openTelemetry).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link CassandraTelemetryBuilder} configured with the given {@link
|
||||
* OpenTelemetry}.
|
||||
*/
|
||||
public static CassandraTelemetryBuilder builder(OpenTelemetry openTelemetry) {
|
||||
return new CassandraTelemetryBuilder(openTelemetry);
|
||||
}
|
||||
|
||||
private final Instrumenter<CassandraRequest, ExecutionInfo> instrumenter;
|
||||
|
||||
protected CassandraTelemetry(Instrumenter<CassandraRequest, ExecutionInfo> instrumenter) {
|
||||
this.instrumenter = instrumenter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a new tracing-enable CqlSession using the provided {@link CqlSession} instance.
|
||||
*
|
||||
* @param session An instance of CqlSession configured as desired.
|
||||
* @return a {@link TracingCqlSession}.
|
||||
*/
|
||||
public CqlSession wrap(CqlSession session) {
|
||||
return new TracingCqlSession(session, instrumenter);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.cassandra.v4_4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import com.google.errorprone.annotations.CanIgnoreReturnValue;
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientSpanNameExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
|
||||
/** A builder of {@link CassandraTelemetry}. */
|
||||
public class CassandraTelemetryBuilder {
|
||||
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.cassandra-4.4";
|
||||
|
||||
private final OpenTelemetry openTelemetry;
|
||||
|
||||
private boolean statementSanitizationEnabled = true;
|
||||
|
||||
protected CassandraTelemetryBuilder(OpenTelemetry openTelemetry) {
|
||||
this.openTelemetry = openTelemetry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets whether the {@code db.statement} attribute on the spans emitted by the constructed {@link
|
||||
* CassandraTelemetry} should be sanitized. If set to {@code true}, all parameters that can
|
||||
* potentially contain sensitive information will be masked. Enabled by default.
|
||||
*/
|
||||
@CanIgnoreReturnValue
|
||||
public CassandraTelemetryBuilder setStatementSanitizationEnabled(boolean enabled) {
|
||||
this.statementSanitizationEnabled = enabled;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link CassandraTelemetry} with the settings of this {@link
|
||||
* CassandraTelemetryBuilder}.
|
||||
*/
|
||||
public CassandraTelemetry build() {
|
||||
return new CassandraTelemetry(createInstrumenter(openTelemetry, statementSanitizationEnabled));
|
||||
}
|
||||
|
||||
protected Instrumenter<CassandraRequest, ExecutionInfo> createInstrumenter(
|
||||
OpenTelemetry openTelemetry, boolean statementSanitizationEnabled) {
|
||||
CassandraSqlAttributesGetter attributesGetter = new CassandraSqlAttributesGetter();
|
||||
|
||||
return Instrumenter.<CassandraRequest, ExecutionInfo>builder(
|
||||
openTelemetry, INSTRUMENTATION_NAME, DbClientSpanNameExtractor.create(attributesGetter))
|
||||
.addAttributesExtractor(
|
||||
SqlClientAttributesExtractor.builder(attributesGetter)
|
||||
.setTableAttribute(SemanticAttributes.DB_CASSANDRA_TABLE)
|
||||
.setStatementSanitizationEnabled(statementSanitizationEnabled)
|
||||
.build())
|
||||
.addAttributesExtractor(
|
||||
NetClientAttributesExtractor.create(new CassandraNetAttributesGetter()))
|
||||
.addAttributesExtractor(new CassandraAttributesExtractor())
|
||||
.buildInstrumenter(SpanKindExtractor.alwaysClient());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,273 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.cassandra.v4_4;
|
||||
|
||||
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
|
||||
import com.datastax.dse.driver.internal.core.cql.reactive.DefaultReactiveResultSet;
|
||||
import com.datastax.oss.driver.api.core.CqlIdentifier;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.DriverException;
|
||||
import com.datastax.oss.driver.api.core.context.DriverContext;
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
|
||||
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import com.datastax.oss.driver.api.core.metadata.Metadata;
|
||||
import com.datastax.oss.driver.api.core.metrics.Metrics;
|
||||
import com.datastax.oss.driver.api.core.session.Request;
|
||||
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.function.Supplier;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class TracingCqlSession implements CqlSession {
|
||||
private final CqlSession session;
|
||||
|
||||
private final Instrumenter<CassandraRequest, ExecutionInfo> instrumenter;
|
||||
|
||||
public TracingCqlSession(
|
||||
CqlSession session, Instrumenter<CassandraRequest, ExecutionInfo> instrumenter) {
|
||||
this.session = session;
|
||||
this.instrumenter = instrumenter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PreparedStatement prepare(SimpleStatement statement) {
|
||||
return session.prepare(statement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PreparedStatement prepare(String query) {
|
||||
return session.prepare(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PreparedStatement prepare(PrepareRequest request) {
|
||||
return session.prepare(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<PreparedStatement> prepareAsync(SimpleStatement statement) {
|
||||
return session.prepareAsync(statement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<PreparedStatement> prepareAsync(String query) {
|
||||
return session.prepareAsync(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<PreparedStatement> prepareAsync(PrepareRequest request) {
|
||||
return session.prepareAsync(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return session.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Metadata getMetadata() {
|
||||
return session.getMetadata();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSchemaMetadataEnabled() {
|
||||
return session.isSchemaMetadataEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Metadata> setSchemaMetadataEnabled(@Nullable Boolean newValue) {
|
||||
return session.setSchemaMetadataEnabled(newValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Metadata> refreshSchemaAsync() {
|
||||
return session.refreshSchemaAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Metadata refreshSchema() {
|
||||
return session.refreshSchema();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Boolean> checkSchemaAgreementAsync() {
|
||||
return session.checkSchemaAgreementAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkSchemaAgreement() {
|
||||
return session.checkSchemaAgreement();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DriverContext getContext() {
|
||||
return session.getContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<CqlIdentifier> getKeyspace() {
|
||||
return session.getKeyspace();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Metrics> getMetrics() {
|
||||
return session.getMetrics();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> closeFuture() {
|
||||
return session.closeFuture();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return session.isClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> closeAsync() {
|
||||
return session.closeAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> forceCloseAsync() {
|
||||
return session.forceCloseAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
session.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public <REQUEST extends Request, RESULT> RESULT execute(
|
||||
REQUEST request, GenericType<RESULT> resultType) {
|
||||
return session.execute(request, resultType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultSet execute(String query) {
|
||||
CassandraRequest request = CassandraRequest.create(session, query);
|
||||
Context context = instrumenter.start(Context.current(), request);
|
||||
ResultSet resultSet;
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
resultSet = session.execute(query);
|
||||
} catch (RuntimeException e) {
|
||||
instrumenter.end(context, request, getExecutionInfo(e), e);
|
||||
throw e;
|
||||
}
|
||||
instrumenter.end(context, request, resultSet.getExecutionInfo(), null);
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultSet execute(Statement<?> statement) {
|
||||
String query = getQuery(statement);
|
||||
CassandraRequest request = CassandraRequest.create(session, query);
|
||||
Context context = instrumenter.start(Context.current(), request);
|
||||
ResultSet resultSet;
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
resultSet = session.execute(statement);
|
||||
} catch (RuntimeException e) {
|
||||
instrumenter.end(context, request, getExecutionInfo(e), e);
|
||||
throw e;
|
||||
}
|
||||
instrumenter.end(context, request, resultSet.getExecutionInfo(), null);
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<AsyncResultSet> executeAsync(Statement<?> statement) {
|
||||
String query = getQuery(statement);
|
||||
CassandraRequest request = CassandraRequest.create(session, query);
|
||||
return executeAsync(request, () -> session.executeAsync(statement));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<AsyncResultSet> executeAsync(String query) {
|
||||
CassandraRequest request = CassandraRequest.create(session, query);
|
||||
return executeAsync(request, () -> session.executeAsync(query));
|
||||
}
|
||||
|
||||
private CompletionStage<AsyncResultSet> executeAsync(
|
||||
CassandraRequest request, Supplier<CompletionStage<AsyncResultSet>> query) {
|
||||
Context parentContext = Context.current();
|
||||
Context context = instrumenter.start(parentContext, request);
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
CompletionStage<AsyncResultSet> stage = query.get();
|
||||
return wrap(
|
||||
stage.whenComplete(
|
||||
(asyncResultSet, throwable) ->
|
||||
instrumenter.end(
|
||||
context, request, getExecutionInfo(asyncResultSet, throwable), throwable)),
|
||||
parentContext);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReactiveResultSet executeReactive(Statement<?> statement) {
|
||||
return new DefaultReactiveResultSet(() -> executeAsync(statement));
|
||||
}
|
||||
|
||||
static <T> CompletableFuture<T> wrap(CompletionStage<T> future, Context context) {
|
||||
CompletableFuture<T> result = new CompletableFuture<>();
|
||||
future.whenComplete(
|
||||
(T value, Throwable throwable) -> {
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
if (throwable != null) {
|
||||
result.completeExceptionally(throwable);
|
||||
} else {
|
||||
result.complete(value);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private static String getQuery(Statement<?> statement) {
|
||||
String query = null;
|
||||
if (statement instanceof SimpleStatement) {
|
||||
query = ((SimpleStatement) statement).getQuery();
|
||||
} else if (statement instanceof BoundStatement) {
|
||||
query = ((BoundStatement) statement).getPreparedStatement().getQuery();
|
||||
}
|
||||
|
||||
return query == null ? "" : query;
|
||||
}
|
||||
|
||||
private static ExecutionInfo getExecutionInfo(
|
||||
@Nullable AsyncResultSet asyncResultSet, @Nullable Throwable throwable) {
|
||||
if (asyncResultSet != null) {
|
||||
return asyncResultSet.getExecutionInfo();
|
||||
} else {
|
||||
return getExecutionInfo(throwable);
|
||||
}
|
||||
}
|
||||
|
||||
private static ExecutionInfo getExecutionInfo(@Nullable Throwable throwable) {
|
||||
if (throwable instanceof DriverException) {
|
||||
return ((DriverException) throwable).getExecutionInfo();
|
||||
} else if (throwable != null && throwable.getCause() instanceof DriverException) {
|
||||
// TODO (trask) find out if this is needed and if so add comment explaining
|
||||
return ((DriverException) throwable.getCause()).getExecutionInfo();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.cassandra.v4_4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
|
||||
import io.opentelemetry.testing.cassandra.v4_4.AbstractCassandra44Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
public class CassandraTest extends AbstractCassandra44Test {
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
|
||||
|
||||
@Override
|
||||
protected InstrumentationExtension testing() {
|
||||
return testing;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CqlSession wrap(CqlSession session) {
|
||||
return CassandraTelemetry.create(testing.getOpenTelemetry()).wrap(session);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
plugins {
|
||||
id("otel.java-conventions")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
api(project(":testing-common"))
|
||||
|
||||
implementation("com.datastax.oss:java-driver-core:4.4.0")
|
||||
implementation("io.projectreactor:reactor-core:3.5.3")
|
||||
|
||||
api(project(":instrumentation:cassandra:cassandra-4-common:testing"))
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.testing.cassandra.v4_4;
|
||||
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_PAGE_SIZE;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CASSANDRA_TABLE;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_NAME;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_OPERATION;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_STATEMENT;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_SYSTEM;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_SOCK_PEER_ADDR;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_SOCK_PEER_NAME;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_SOCK_PEER_PORT;
|
||||
import static org.junit.jupiter.api.Named.named;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.cassandra.v4.common.AbstractCassandraTest;
|
||||
import java.util.stream.Stream;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public abstract class AbstractCassandra44Test extends AbstractCassandraTest {
|
||||
|
||||
@ParameterizedTest(name = "{index}: {0}")
|
||||
@MethodSource("provideReactiveParameters")
|
||||
void reactiveTest(Parameter parameter) {
|
||||
CqlSession session = getSession(parameter.keyspace);
|
||||
|
||||
testing()
|
||||
.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
Flux.from(session.executeReactive(parameter.statement))
|
||||
.doOnComplete(() -> testing().runWithSpan("child", () -> {}))
|
||||
.blockLast());
|
||||
|
||||
testing()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||
span ->
|
||||
span.hasName(parameter.spanName)
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
|
||||
equalTo(NET_SOCK_PEER_NAME, "localhost"),
|
||||
equalTo(NET_SOCK_PEER_PORT, cassandraPort),
|
||||
equalTo(DB_SYSTEM, "cassandra"),
|
||||
equalTo(DB_NAME, parameter.keyspace),
|
||||
equalTo(DB_STATEMENT, parameter.expectedStatement),
|
||||
equalTo(DB_OPERATION, parameter.operation),
|
||||
equalTo(DB_CASSANDRA_CONSISTENCY_LEVEL, "LOCAL_ONE"),
|
||||
equalTo(DB_CASSANDRA_COORDINATOR_DC, "datacenter1"),
|
||||
satisfies(
|
||||
DB_CASSANDRA_COORDINATOR_ID,
|
||||
val -> val.isInstanceOf(String.class)),
|
||||
satisfies(
|
||||
DB_CASSANDRA_IDEMPOTENCE,
|
||||
val -> val.isInstanceOf(Boolean.class)),
|
||||
equalTo(DB_CASSANDRA_PAGE_SIZE, 5000),
|
||||
equalTo(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT, 0),
|
||||
equalTo(DB_CASSANDRA_TABLE, parameter.table)),
|
||||
span ->
|
||||
span.hasName("child")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))));
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
||||
private static Stream<Arguments> provideReactiveParameters() {
|
||||
return Stream.of(
|
||||
Arguments.of(
|
||||
named(
|
||||
"Drop keyspace if exists",
|
||||
new Parameter(
|
||||
null,
|
||||
"DROP KEYSPACE IF EXISTS reactive_test",
|
||||
"DROP KEYSPACE IF EXISTS reactive_test",
|
||||
"DB Query",
|
||||
null,
|
||||
null))),
|
||||
Arguments.of(
|
||||
named(
|
||||
"Create keyspace with replication",
|
||||
new Parameter(
|
||||
null,
|
||||
"CREATE KEYSPACE reactive_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}",
|
||||
"CREATE KEYSPACE reactive_test WITH REPLICATION = {?:?, ?:?}",
|
||||
"DB Query",
|
||||
null,
|
||||
null))),
|
||||
Arguments.of(
|
||||
named(
|
||||
"Create table",
|
||||
new Parameter(
|
||||
"reactive_test",
|
||||
"CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )",
|
||||
"CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )",
|
||||
"reactive_test",
|
||||
null,
|
||||
null))),
|
||||
Arguments.of(
|
||||
named(
|
||||
"Insert data",
|
||||
new Parameter(
|
||||
"reactive_test",
|
||||
"INSERT INTO reactive_test.users (id, name) values (uuid(), 'alice')",
|
||||
"INSERT INTO reactive_test.users (id, name) values (uuid(), ?)",
|
||||
"INSERT reactive_test.users",
|
||||
"INSERT",
|
||||
"reactive_test.users"))),
|
||||
Arguments.of(
|
||||
named(
|
||||
"Select data",
|
||||
new Parameter(
|
||||
"reactive_test",
|
||||
"SELECT * FROM users where name = 'alice' ALLOW FILTERING",
|
||||
"SELECT * FROM users where name = ? ALLOW FILTERING",
|
||||
"SELECT reactive_test.users",
|
||||
"SELECT",
|
||||
"users"))));
|
||||
}
|
||||
}
|
|
@ -174,6 +174,10 @@ hideFromDependabot(":instrumentation:azure-core:azure-core-1.36:javaagent")
|
|||
hideFromDependabot(":instrumentation:azure-core:azure-core-1.36:library-instrumentation-shaded")
|
||||
hideFromDependabot(":instrumentation:cassandra:cassandra-3.0:javaagent")
|
||||
hideFromDependabot(":instrumentation:cassandra:cassandra-4.0:javaagent")
|
||||
hideFromDependabot(":instrumentation:cassandra:cassandra-4.4:javaagent")
|
||||
hideFromDependabot(":instrumentation:cassandra:cassandra-4.4:library")
|
||||
hideFromDependabot(":instrumentation:cassandra:cassandra-4.4:testing")
|
||||
hideFromDependabot(":instrumentation:cassandra:cassandra-4-common:testing")
|
||||
hideFromDependabot(":instrumentation:cdi-testing")
|
||||
hideFromDependabot(":instrumentation:graphql-java-12.0:javaagent")
|
||||
hideFromDependabot(":instrumentation:graphql-java-12.0:library")
|
||||
|
|
Loading…
Reference in New Issue