Update cassandra-4.0 to Instrumenter API (#3126)
* Update cassandra-4.0 to Instrumenter API * Update cassandra-3.0 instrumentation to be consistent
This commit is contained in:
parent
7d46c336b0
commit
d986981d70
|
@ -5,7 +5,7 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.ExecutionInfo;
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
|
||||
|
@ -16,15 +16,17 @@ import io.opentelemetry.instrumentation.api.instrumenter.db.DbSpanNameExtractor;
|
|||
public final class CassandraInstrumenters {
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.javaagent.cassandra-3.0";
|
||||
|
||||
private static final Instrumenter<CassandraRequest, ResultSet> INSTRUMENTER;
|
||||
// could use RESPONSE "ResultSet" here, but using RESPONSE "ExecutionInfo" in cassandra-4.0
|
||||
// instrumentation (see comment over there for why), so also using here for consistency
|
||||
private static final Instrumenter<CassandraRequest, ExecutionInfo> INSTRUMENTER;
|
||||
|
||||
static {
|
||||
DbAttributesExtractor<CassandraRequest, ResultSet> attributesExtractor =
|
||||
DbAttributesExtractor<CassandraRequest, ExecutionInfo> attributesExtractor =
|
||||
new CassandraSqlAttributesExtractor();
|
||||
SpanNameExtractor<CassandraRequest> spanName = DbSpanNameExtractor.create(attributesExtractor);
|
||||
|
||||
INSTRUMENTER =
|
||||
Instrumenter.<CassandraRequest, ResultSet>newBuilder(
|
||||
Instrumenter.<CassandraRequest, ExecutionInfo>newBuilder(
|
||||
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanName)
|
||||
.addAttributesExtractor(attributesExtractor)
|
||||
.addAttributesExtractor(new CassandraNetAttributesExtractor())
|
||||
|
@ -32,7 +34,7 @@ public final class CassandraInstrumenters {
|
|||
.newInstrumenter(SpanKindExtractor.alwaysClient());
|
||||
}
|
||||
|
||||
public static Instrumenter<CassandraRequest, ResultSet> instrumenter() {
|
||||
public static Instrumenter<CassandraRequest, ExecutionInfo> instrumenter() {
|
||||
return INSTRUMENTER;
|
||||
}
|
||||
|
||||
|
|
|
@ -5,12 +5,13 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.ExecutionInfo;
|
||||
import io.opentelemetry.api.common.AttributesBuilder;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
|
||||
final class CassandraKeyspaceExtractor extends AttributesExtractor<CassandraRequest, ResultSet> {
|
||||
final class CassandraKeyspaceExtractor
|
||||
extends AttributesExtractor<CassandraRequest, ExecutionInfo> {
|
||||
|
||||
@Override
|
||||
protected void onStart(AttributesBuilder attributes, CassandraRequest request) {
|
||||
|
@ -20,5 +21,5 @@ final class CassandraKeyspaceExtractor extends AttributesExtractor<CassandraRequ
|
|||
|
||||
@Override
|
||||
protected void onEnd(
|
||||
AttributesBuilder attributes, CassandraRequest request, ResultSet response) {}
|
||||
AttributesBuilder attributes, CassandraRequest request, ExecutionInfo response) {}
|
||||
}
|
||||
|
|
|
@ -5,13 +5,13 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.ExecutionInfo;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetAttributesExtractor;
|
||||
import java.net.InetSocketAddress;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
final class CassandraNetAttributesExtractor
|
||||
extends InetSocketAddressNetAttributesExtractor<CassandraRequest, ResultSet> {
|
||||
extends InetSocketAddressNetAttributesExtractor<CassandraRequest, ExecutionInfo> {
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
|
@ -21,9 +21,7 @@ final class CassandraNetAttributesExtractor
|
|||
|
||||
@Override
|
||||
public @Nullable InetSocketAddress getAddress(
|
||||
CassandraRequest request, @Nullable ResultSet response) {
|
||||
return response == null
|
||||
? null
|
||||
: response.getExecutionInfo().getQueriedHost().getSocketAddress();
|
||||
CassandraRequest request, @Nullable ExecutionInfo response) {
|
||||
return response == null ? null : response.getQueriedHost().getSocketAddress();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,14 +5,14 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.ExecutionInfo;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlAttributesExtractor;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
final class CassandraSqlAttributesExtractor
|
||||
extends SqlAttributesExtractor<CassandraRequest, ResultSet> {
|
||||
extends SqlAttributesExtractor<CassandraRequest, ExecutionInfo> {
|
||||
|
||||
@Override
|
||||
protected String system(CassandraRequest request) {
|
||||
|
|
|
@ -57,7 +57,7 @@ public class TracingSession implements Session {
|
|||
instrumenter().end(context, request, null, t);
|
||||
throw t;
|
||||
}
|
||||
instrumenter().end(context, request, resultSet, null);
|
||||
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
|
@ -72,7 +72,7 @@ public class TracingSession implements Session {
|
|||
instrumenter().end(context, request, null, t);
|
||||
throw t;
|
||||
}
|
||||
instrumenter().end(context, request, resultSet, null);
|
||||
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
|
@ -87,7 +87,7 @@ public class TracingSession implements Session {
|
|||
instrumenter().end(context, request, null, t);
|
||||
throw t;
|
||||
}
|
||||
instrumenter().end(context, request, resultSet, null);
|
||||
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
|
@ -103,7 +103,7 @@ public class TracingSession implements Session {
|
|||
instrumenter().end(context, request, null, t);
|
||||
throw t;
|
||||
}
|
||||
instrumenter().end(context, request, resultSet, null);
|
||||
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
|
@ -215,7 +215,7 @@ public class TracingSession implements Session {
|
|||
new FutureCallback<ResultSet>() {
|
||||
@Override
|
||||
public void onSuccess(ResultSet resultSet) {
|
||||
instrumenter().end(context, request, resultSet, null);
|
||||
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -12,5 +12,8 @@ muzzle {
|
|||
dependencies {
|
||||
library "com.datastax.oss:java-driver-core:4.0.0"
|
||||
|
||||
compileOnly "com.google.auto.value:auto-value-annotations"
|
||||
annotationProcessor "com.google.auto.value:auto-value"
|
||||
|
||||
latestDepTestLibrary "com.datastax.oss:java-driver-core:4.+"
|
||||
}
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlIdentifier;
|
||||
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
|
||||
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import io.opentelemetry.api.common.AttributesBuilder;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
final class CassandraAttributesExtractor
|
||||
extends AttributesExtractor<CassandraRequest, ExecutionInfo> {
|
||||
|
||||
@Override
|
||||
protected void onStart(AttributesBuilder attributes, CassandraRequest request) {
|
||||
set(
|
||||
attributes,
|
||||
SemanticAttributes.DB_CASSANDRA_KEYSPACE,
|
||||
request.getSession().getKeyspace().map(CqlIdentifier::toString).orElse(null));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onEnd(
|
||||
AttributesBuilder attributes, CassandraRequest request, @Nullable ExecutionInfo response) {
|
||||
if (response == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Node coordinator = response.getCoordinator();
|
||||
if (coordinator != null) {
|
||||
if (coordinator.getDatacenter() != null) {
|
||||
set(
|
||||
attributes,
|
||||
SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC,
|
||||
coordinator.getDatacenter());
|
||||
}
|
||||
if (coordinator.getHostId() != null) {
|
||||
set(
|
||||
attributes,
|
||||
SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID,
|
||||
coordinator.getHostId().toString());
|
||||
}
|
||||
}
|
||||
set(
|
||||
attributes,
|
||||
SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT,
|
||||
(long) response.getSpeculativeExecutionCount());
|
||||
|
||||
Statement<?> statement = response.getStatement();
|
||||
DriverExecutionProfile config =
|
||||
request.getSession().getContext().getConfig().getDefaultProfile();
|
||||
if (statement.getConsistencyLevel() != null) {
|
||||
set(
|
||||
attributes,
|
||||
SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL,
|
||||
statement.getConsistencyLevel().name());
|
||||
} else {
|
||||
set(
|
||||
attributes,
|
||||
SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL,
|
||||
config.getString(DefaultDriverOption.REQUEST_CONSISTENCY));
|
||||
}
|
||||
if (statement.getPageSize() > 0) {
|
||||
set(attributes, SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, (long) statement.getPageSize());
|
||||
} else {
|
||||
int pageSize = config.getInt(DefaultDriverOption.REQUEST_PAGE_SIZE);
|
||||
if (pageSize > 0) {
|
||||
set(attributes, SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, (long) pageSize);
|
||||
}
|
||||
}
|
||||
if (statement.isIdempotent() != null) {
|
||||
set(attributes, SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE, statement.isIdempotent());
|
||||
} else {
|
||||
set(
|
||||
attributes,
|
||||
SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE,
|
||||
config.getBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,181 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlIdentifier;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.DriverException;
|
||||
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
|
||||
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.db.SqlStatementInfo;
|
||||
import io.opentelemetry.instrumentation.api.db.SqlStatementSanitizer;
|
||||
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
|
||||
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DbSystemValues;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
public class CassandraDatabaseClientTracer
|
||||
extends DatabaseClientTracer<CqlSession, String, SqlStatementInfo> {
|
||||
|
||||
private static final CassandraDatabaseClientTracer TRACER = new CassandraDatabaseClientTracer();
|
||||
|
||||
private CassandraDatabaseClientTracer() {
|
||||
super(NetPeerAttributes.INSTANCE);
|
||||
}
|
||||
|
||||
public static CassandraDatabaseClientTracer tracer() {
|
||||
return TRACER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getInstrumentationName() {
|
||||
return "io.opentelemetry.javaagent.cassandra-4.0";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SqlStatementInfo sanitizeStatement(String statement) {
|
||||
return SqlStatementSanitizer.sanitize(statement).mapTable(this::stripKeyspace);
|
||||
}
|
||||
|
||||
// account for splitting out the keyspace, <keyspace>.<table>
|
||||
@Nullable
|
||||
private String stripKeyspace(String table) {
|
||||
int i;
|
||||
if (table == null || (i = table.indexOf('.')) == -1) {
|
||||
return table;
|
||||
}
|
||||
return table.substring(i + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String spanName(
|
||||
CqlSession connection, String statement, SqlStatementInfo sanitizedStatement) {
|
||||
return conventionSpanName(
|
||||
dbName(connection),
|
||||
sanitizedStatement.getOperation(),
|
||||
sanitizedStatement.getTable(),
|
||||
sanitizedStatement.getFullStatement());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String dbSystem(CqlSession session) {
|
||||
return DbSystemValues.CASSANDRA;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String dbName(CqlSession session) {
|
||||
return session.getKeyspace().map(CqlIdentifier::toString).orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InetSocketAddress peerAddress(CqlSession cqlSession) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onStatement(
|
||||
SpanBuilder span,
|
||||
CqlSession connection,
|
||||
String statement,
|
||||
SqlStatementInfo sanitizedStatement) {
|
||||
super.onStatement(span, connection, statement, sanitizedStatement);
|
||||
String table = sanitizedStatement.getTable();
|
||||
if (table != null) {
|
||||
span.setAttribute(SemanticAttributes.DB_CASSANDRA_TABLE, table);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String dbStatement(
|
||||
CqlSession connection, String statement, SqlStatementInfo sanitizedStatement) {
|
||||
return sanitizedStatement.getFullStatement();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String dbOperation(
|
||||
CqlSession connection, String statement, SqlStatementInfo sanitizedStatement) {
|
||||
return sanitizedStatement.getOperation();
|
||||
}
|
||||
|
||||
public void onResponse(Context context, CqlSession cqlSession, ExecutionInfo executionInfo) {
|
||||
Span span = Span.fromContext(context);
|
||||
Node coordinator = executionInfo.getCoordinator();
|
||||
if (coordinator != null) {
|
||||
SocketAddress socketAddress = coordinator.getEndPoint().resolve();
|
||||
if (socketAddress instanceof InetSocketAddress) {
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(span, ((InetSocketAddress) socketAddress));
|
||||
}
|
||||
if (coordinator.getDatacenter() != null) {
|
||||
span.setAttribute(
|
||||
SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter());
|
||||
}
|
||||
if (coordinator.getHostId() != null) {
|
||||
span.setAttribute(
|
||||
SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID, coordinator.getHostId().toString());
|
||||
}
|
||||
}
|
||||
span.setAttribute(
|
||||
SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT,
|
||||
executionInfo.getSpeculativeExecutionCount());
|
||||
|
||||
Statement<?> statement = executionInfo.getStatement();
|
||||
DriverExecutionProfile config = cqlSession.getContext().getConfig().getDefaultProfile();
|
||||
if (statement.getConsistencyLevel() != null) {
|
||||
span.setAttribute(
|
||||
SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL,
|
||||
statement.getConsistencyLevel().name());
|
||||
} else {
|
||||
span.setAttribute(
|
||||
SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL,
|
||||
config.getString(DefaultDriverOption.REQUEST_CONSISTENCY));
|
||||
}
|
||||
if (statement.getPageSize() > 0) {
|
||||
span.setAttribute(SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, statement.getPageSize());
|
||||
} else {
|
||||
int pageSize = config.getInt(DefaultDriverOption.REQUEST_PAGE_SIZE);
|
||||
if (pageSize > 0) {
|
||||
span.setAttribute(SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, pageSize);
|
||||
}
|
||||
}
|
||||
if (statement.isIdempotent() != null) {
|
||||
span.setAttribute(SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE, statement.isIdempotent());
|
||||
} else {
|
||||
span.setAttribute(
|
||||
SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE,
|
||||
config.getBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE));
|
||||
}
|
||||
}
|
||||
|
||||
/** Use this method instead of {@link #endExceptionally(Context, Throwable)}. */
|
||||
public void endExceptionally(Context context, final Throwable throwable, CqlSession cqlSession) {
|
||||
DriverException e = null;
|
||||
if (throwable instanceof DriverException) {
|
||||
e = (DriverException) throwable;
|
||||
} else if (throwable.getCause() instanceof DriverException) {
|
||||
e = (DriverException) throwable.getCause();
|
||||
}
|
||||
if (e != null && e.getExecutionInfo() != null) {
|
||||
onResponse(context, cqlSession, e.getExecutionInfo());
|
||||
}
|
||||
super.endExceptionally(context, throwable);
|
||||
}
|
||||
|
||||
/** Use {@link #endExceptionally(Context, Throwable, CqlSession)}. */
|
||||
@Override
|
||||
public void endExceptionally(Context context, final Throwable throwable) {
|
||||
throw new IllegalStateException(
|
||||
"use the endExceptionally method with a CqlSession in CassandraDatabaseClientTracer");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.DbAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.DbSpanNameExtractor;
|
||||
|
||||
public final class CassandraInstrumenters {
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.javaagent.cassandra-4.0";
|
||||
|
||||
// using ExecutionInfo because we can get that from ResultSet, AsyncResultSet and DriverException
|
||||
private static final Instrumenter<CassandraRequest, ExecutionInfo> INSTRUMENTER;
|
||||
|
||||
static {
|
||||
DbAttributesExtractor<CassandraRequest, ExecutionInfo> attributesExtractor =
|
||||
new CassandraSqlAttributesExtractor();
|
||||
SpanNameExtractor<CassandraRequest> spanName = DbSpanNameExtractor.create(attributesExtractor);
|
||||
|
||||
INSTRUMENTER =
|
||||
Instrumenter.<CassandraRequest, ExecutionInfo>newBuilder(
|
||||
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanName)
|
||||
.addAttributesExtractor(attributesExtractor)
|
||||
.addAttributesExtractor(new CassandraNetAttributesExtractor())
|
||||
.addAttributesExtractor(new CassandraAttributesExtractor())
|
||||
.newInstrumenter(SpanKindExtractor.alwaysClient());
|
||||
}
|
||||
|
||||
public static Instrumenter<CassandraRequest, ExecutionInfo> instrumenter() {
|
||||
return INSTRUMENTER;
|
||||
}
|
||||
|
||||
private CassandraInstrumenters() {}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetAttributesExtractor;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
final class CassandraNetAttributesExtractor
|
||||
extends InetSocketAddressNetAttributesExtractor<CassandraRequest, ExecutionInfo> {
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String transport(CassandraRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable InetSocketAddress getAddress(
|
||||
CassandraRequest request, @Nullable ExecutionInfo response) {
|
||||
if (response == null) {
|
||||
return null;
|
||||
}
|
||||
Node coordinator = response.getCoordinator();
|
||||
if (coordinator == null) {
|
||||
return null;
|
||||
}
|
||||
// resolve() returns an existing InetSocketAddress, it does not do a dns resolve,
|
||||
// at least in the only current EndPoint implementation (DefaultEndPoint)
|
||||
SocketAddress address = coordinator.getEndPoint().resolve();
|
||||
return address instanceof InetSocketAddress ? (InetSocketAddress) address : null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
|
||||
|
||||
import com.datastax.oss.driver.api.core.session.Session;
|
||||
import com.google.auto.value.AutoValue;
|
||||
|
||||
@AutoValue
|
||||
public abstract class CassandraRequest {
|
||||
|
||||
public abstract Session getSession();
|
||||
|
||||
public abstract String getStatement();
|
||||
|
||||
public static CassandraRequest create(Session session, String statement) {
|
||||
return new AutoValue_CassandraRequest(session, statement);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlIdentifier;
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlAttributesExtractor;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
final class CassandraSqlAttributesExtractor
|
||||
extends SqlAttributesExtractor<CassandraRequest, ExecutionInfo> {
|
||||
|
||||
@Override
|
||||
protected String system(CassandraRequest request) {
|
||||
return SemanticAttributes.DbSystemValues.CASSANDRA;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
protected String user(CassandraRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
protected String name(CassandraRequest request) {
|
||||
return request.getSession().getKeyspace().map(CqlIdentifier::toString).orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
protected String connectionString(CassandraRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AttributeKey<String> dbTableAttribute() {
|
||||
return SemanticAttributes.DB_CASSANDRA_TABLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
protected String rawStatement(CassandraRequest request) {
|
||||
return request.getStatement();
|
||||
}
|
||||
}
|
|
@ -5,13 +5,15 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraDatabaseClientTracer.tracer;
|
||||
import static io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraInstrumenters.instrumenter;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlIdentifier;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.DriverException;
|
||||
import com.datastax.oss.driver.api.core.context.DriverContext;
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
|
||||
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
|
||||
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
|
@ -21,12 +23,11 @@ import com.datastax.oss.driver.api.core.metadata.Metadata;
|
|||
import com.datastax.oss.driver.api.core.metrics.Metrics;
|
||||
import com.datastax.oss.driver.api.core.session.Request;
|
||||
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
|
||||
import edu.umd.cs.findbugs.annotations.NonNull;
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
public class TracingCqlSession implements CqlSession {
|
||||
private final CqlSession session;
|
||||
|
@ -36,49 +37,41 @@ public class TracingCqlSession implements CqlSession {
|
|||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public PreparedStatement prepare(@NonNull SimpleStatement statement) {
|
||||
public PreparedStatement prepare(SimpleStatement statement) {
|
||||
return session.prepare(statement);
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public PreparedStatement prepare(@NonNull String query) {
|
||||
public PreparedStatement prepare(String query) {
|
||||
return session.prepare(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public PreparedStatement prepare(@NonNull PrepareRequest request) {
|
||||
public PreparedStatement prepare(PrepareRequest request) {
|
||||
return session.prepare(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<PreparedStatement> prepareAsync(@NonNull SimpleStatement statement) {
|
||||
public CompletionStage<PreparedStatement> prepareAsync(SimpleStatement statement) {
|
||||
return session.prepareAsync(statement);
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<PreparedStatement> prepareAsync(@NonNull String query) {
|
||||
public CompletionStage<PreparedStatement> prepareAsync(String query) {
|
||||
return session.prepareAsync(query);
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<PreparedStatement> prepareAsync(PrepareRequest request) {
|
||||
return session.prepareAsync(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public String getName() {
|
||||
return session.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public Metadata getMetadata() {
|
||||
return session.getMetadata();
|
||||
}
|
||||
|
@ -89,25 +82,21 @@ public class TracingCqlSession implements CqlSession {
|
|||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<Metadata> setSchemaMetadataEnabled(@Nullable Boolean newValue) {
|
||||
return session.setSchemaMetadataEnabled(newValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<Metadata> refreshSchemaAsync() {
|
||||
return session.refreshSchemaAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public Metadata refreshSchema() {
|
||||
return session.refreshSchema();
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<Boolean> checkSchemaAgreementAsync() {
|
||||
return session.checkSchemaAgreementAsync();
|
||||
}
|
||||
|
@ -118,25 +107,21 @@ public class TracingCqlSession implements CqlSession {
|
|||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public DriverContext getContext() {
|
||||
return session.getContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public Optional<CqlIdentifier> getKeyspace() {
|
||||
return session.getKeyspace();
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public Optional<Metrics> getMetrics() {
|
||||
return session.getMetrics();
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<Void> closeFuture() {
|
||||
return session.closeFuture();
|
||||
}
|
||||
|
@ -147,13 +132,11 @@ public class TracingCqlSession implements CqlSession {
|
|||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<Void> closeAsync() {
|
||||
return session.closeAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<Void> forceCloseAsync() {
|
||||
return session.forceCloseAsync();
|
||||
}
|
||||
|
@ -166,83 +149,66 @@ public class TracingCqlSession implements CqlSession {
|
|||
@Override
|
||||
@Nullable
|
||||
public <REQUEST extends Request, RESULT> RESULT execute(
|
||||
@NonNull REQUEST request, @NonNull GenericType<RESULT> resultType) {
|
||||
REQUEST request, GenericType<RESULT> resultType) {
|
||||
return session.execute(request, resultType);
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public ResultSet execute(@NonNull String query) {
|
||||
|
||||
Context context = tracer().startSpan(Context.current(), session, query);
|
||||
public ResultSet execute(String query) {
|
||||
CassandraRequest request = CassandraRequest.create(session, query);
|
||||
Context context = instrumenter().start(Context.current(), request);
|
||||
ResultSet resultSet;
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
try {
|
||||
ResultSet resultSet = session.execute(query);
|
||||
tracer().onResponse(context, session, resultSet.getExecutionInfo());
|
||||
return resultSet;
|
||||
} catch (RuntimeException e) {
|
||||
tracer().endExceptionally(context, e, session);
|
||||
throw e;
|
||||
} finally {
|
||||
tracer().end(context);
|
||||
}
|
||||
resultSet = session.execute(query);
|
||||
} catch (RuntimeException e) {
|
||||
instrumenter().end(context, request, getExecutionInfo(e), e);
|
||||
throw e;
|
||||
}
|
||||
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public ResultSet execute(@NonNull Statement<?> statement) {
|
||||
public ResultSet execute(Statement<?> statement) {
|
||||
String query = getQuery(statement);
|
||||
|
||||
Context context = tracer().startSpan(Context.current(), session, query);
|
||||
CassandraRequest request = CassandraRequest.create(session, query);
|
||||
Context context = instrumenter().start(Context.current(), request);
|
||||
ResultSet resultSet;
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
try {
|
||||
ResultSet resultSet = session.execute(statement);
|
||||
tracer().onResponse(context, session, resultSet.getExecutionInfo());
|
||||
return resultSet;
|
||||
} catch (RuntimeException e) {
|
||||
tracer().endExceptionally(context, e, session);
|
||||
throw e;
|
||||
} finally {
|
||||
tracer().end(context);
|
||||
}
|
||||
resultSet = session.execute(statement);
|
||||
} catch (RuntimeException e) {
|
||||
instrumenter().end(context, request, getExecutionInfo(e), e);
|
||||
throw e;
|
||||
}
|
||||
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<AsyncResultSet> executeAsync(@NonNull Statement<?> statement) {
|
||||
public CompletionStage<AsyncResultSet> executeAsync(Statement<?> statement) {
|
||||
String query = getQuery(statement);
|
||||
|
||||
Context context = tracer().startSpan(Context.current(), session, query);
|
||||
CassandraRequest request = CassandraRequest.create(session, query);
|
||||
Context context = instrumenter().start(Context.current(), request);
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
CompletionStage<AsyncResultSet> stage = session.executeAsync(statement);
|
||||
return stage.whenComplete(
|
||||
(asyncResultSet, throwable) -> {
|
||||
if (throwable != null) {
|
||||
tracer().endExceptionally(context, throwable, session);
|
||||
} else {
|
||||
tracer().onResponse(context, session, asyncResultSet.getExecutionInfo());
|
||||
tracer().end(context);
|
||||
}
|
||||
instrumenter()
|
||||
.end(context, request, getExecutionInfo(asyncResultSet, throwable), throwable);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@NonNull
|
||||
public CompletionStage<AsyncResultSet> executeAsync(@NonNull String query) {
|
||||
Context context = tracer().startSpan(Context.current(), session, query);
|
||||
public CompletionStage<AsyncResultSet> executeAsync(String query) {
|
||||
CassandraRequest request = CassandraRequest.create(session, query);
|
||||
Context context = instrumenter().start(Context.current(), request);
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
CompletionStage<AsyncResultSet> stage = session.executeAsync(query);
|
||||
return stage.whenComplete(
|
||||
(asyncResultSet, throwable) -> {
|
||||
if (throwable != null) {
|
||||
tracer().endExceptionally(context, throwable, session);
|
||||
} else {
|
||||
tracer().onResponse(context, session, asyncResultSet.getExecutionInfo());
|
||||
tracer().end(context);
|
||||
}
|
||||
instrumenter()
|
||||
.end(context, request, getExecutionInfo(asyncResultSet, throwable), throwable);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -257,4 +223,24 @@ public class TracingCqlSession implements CqlSession {
|
|||
|
||||
return query == null ? "" : query;
|
||||
}
|
||||
|
||||
private static ExecutionInfo getExecutionInfo(
|
||||
@Nullable AsyncResultSet asyncResultSet, @Nullable Throwable throwable) {
|
||||
if (asyncResultSet != null) {
|
||||
return asyncResultSet.getExecutionInfo();
|
||||
} else {
|
||||
return getExecutionInfo(throwable);
|
||||
}
|
||||
}
|
||||
|
||||
private static ExecutionInfo getExecutionInfo(@Nullable Throwable throwable) {
|
||||
if (throwable instanceof DriverException) {
|
||||
return ((DriverException) throwable).getExecutionInfo();
|
||||
} else if (throwable != null && throwable.getCause() instanceof DriverException) {
|
||||
// TODO (trask) find out if this is needed and if so add comment explaining
|
||||
return ((DriverException) throwable.getCause()).getExecutionInfo();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,10 +62,10 @@ class CassandraClientTest extends AgentInstrumentationSpecification {
|
|||
|
||||
where:
|
||||
keyspace | statement | expectedStatement | spanName | operation | table
|
||||
null | "DROP KEYSPACE IF EXISTS sync_test" | "DROP KEYSPACE IF EXISTS sync_test" | expectedStatement | null | null
|
||||
null | "CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE sync_test WITH REPLICATION = {?:?, ?:?}" | expectedStatement | null | null
|
||||
null | "DROP KEYSPACE IF EXISTS sync_test" | "DROP KEYSPACE IF EXISTS sync_test" | "DB Query" | null | null
|
||||
null | "CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE sync_test WITH REPLICATION = {?:?, ?:?}" | "DB Query" | null | null
|
||||
"sync_test" | "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | null | null
|
||||
"sync_test" | "INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO sync_test.users (id, name) values (uuid(), ?)" | "INSERT sync_test.users" | "INSERT" | "users"
|
||||
"sync_test" | "INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO sync_test.users (id, name) values (uuid(), ?)" | "INSERT sync_test.users" | "INSERT" | "sync_test.users"
|
||||
"sync_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT sync_test.users" | "SELECT" | "users"
|
||||
}
|
||||
|
||||
|
@ -90,10 +90,10 @@ class CassandraClientTest extends AgentInstrumentationSpecification {
|
|||
|
||||
where:
|
||||
keyspace | statement | expectedStatement | spanName | operation | table
|
||||
null | "DROP KEYSPACE IF EXISTS async_test" | "DROP KEYSPACE IF EXISTS async_test" | expectedStatement | null | null
|
||||
null | "CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE async_test WITH REPLICATION = {?:?, ?:?}" | expectedStatement | null | null
|
||||
null | "DROP KEYSPACE IF EXISTS async_test" | "DROP KEYSPACE IF EXISTS async_test" | "DB Query" | null | null
|
||||
null | "CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE async_test WITH REPLICATION = {?:?, ?:?}" | "DB Query" | null | null
|
||||
"async_test" | "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | null | null
|
||||
"async_test" | "INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO async_test.users (id, name) values (uuid(), ?)" | "INSERT async_test.users" | "INSERT" | "users"
|
||||
"async_test" | "INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO async_test.users (id, name) values (uuid(), ?)" | "INSERT async_test.users" | "INSERT" | "async_test.users"
|
||||
"async_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT async_test.users" | "SELECT" | "users"
|
||||
}
|
||||
|
||||
|
@ -120,6 +120,7 @@ class CassandraClientTest extends AgentInstrumentationSpecification {
|
|||
"$SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE.key" Boolean
|
||||
"$SemanticAttributes.DB_CASSANDRA_PAGE_SIZE.key" 5000
|
||||
"$SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT.key" 0
|
||||
"$SemanticAttributes.DB_CASSANDRA_KEYSPACE.key" keyspace
|
||||
// the SqlStatementSanitizer can't handle CREATE statements yet
|
||||
"$SemanticAttributes.DB_CASSANDRA_TABLE.key" table
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue