feat(cassandra4): more attributes (#1314)

* feat(cassandra4): more attributes

fixes #1298

Co-authored-by: Anuraag Agrawal <anuraaga@gmail.com>

* Update instrumentation/cassandra/cassandra-4.0/javaagent/src/test/groovy/CassandraClientTest.groovy

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>

* add error handling

* default idempotence

* embed session into onResponse to prevent need to overwrite default tags

* change javadoc according to spotless

* use shared table name extractor

* fix tests to account for SqlStatementSanitizer

Co-authored-by: Anuraag Agrawal <anuraaga@gmail.com>
Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
Frank Spitulski 2021-02-02 01:12:55 -08:00 committed by GitHub
parent 7a3f345c18
commit 9ded7188b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 10 deletions

View File

@ -7,18 +7,24 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Node;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import io.opentelemetry.javaagent.instrumentation.api.db.SqlStatementSanitizer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DbSystemValues;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class CassandraDatabaseClientTracer extends DatabaseClientTracer<CqlSession, String> {
private static final CassandraDatabaseClientTracer TRACER = new CassandraDatabaseClientTracer();
public static CassandraDatabaseClientTracer tracer() {
@ -50,14 +56,82 @@ public class CassandraDatabaseClientTracer extends DatabaseClientTracer<CqlSessi
return null;
}
public void onResponse(Context context, ExecutionInfo executionInfo) {
public void onResponse(Context context, CqlSession cqlSession, ExecutionInfo executionInfo) {
Span span = Span.fromContext(context);
Node coordinator = executionInfo.getCoordinator();
if (coordinator != null) {
SocketAddress socketAddress = coordinator.getEndPoint().resolve();
if (socketAddress instanceof InetSocketAddress) {
Span span = Span.fromContext(context);
NetPeerUtils.INSTANCE.setNetPeer(span, ((InetSocketAddress) socketAddress));
}
if (coordinator.getDatacenter() != null) {
span.setAttribute(
SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter());
}
if (coordinator.getHostId() != null) {
span.setAttribute(
SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID, coordinator.getHostId().toString());
}
}
span.setAttribute(
SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT,
executionInfo.getSpeculativeExecutionCount());
Statement<?> statement = executionInfo.getStatement();
DriverExecutionProfile config = cqlSession.getContext().getConfig().getDefaultProfile();
if (statement.getConsistencyLevel() != null) {
span.setAttribute(
SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL,
statement.getConsistencyLevel().name());
} else {
span.setAttribute(
SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL,
config.getString(DefaultDriverOption.REQUEST_CONSISTENCY));
}
if (statement.getPageSize() > 0) {
span.setAttribute(SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, statement.getPageSize());
} else {
int pageSize = config.getInt(DefaultDriverOption.REQUEST_PAGE_SIZE);
if (pageSize > 0) {
span.setAttribute(SemanticAttributes.DB_CASSANDRA_PAGE_SIZE, pageSize);
}
}
if (statement.isIdempotent() != null) {
span.setAttribute(SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE, statement.isIdempotent());
} else {
span.setAttribute(
SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE,
config.getBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE));
}
}
/** Use this method instead of {@link #endExceptionally(Context, Throwable)}. */
public void endExceptionally(Context context, final Throwable throwable, CqlSession cqlSession) {
DriverException e = null;
if (throwable instanceof DriverException) {
e = (DriverException) throwable;
} else if (throwable.getCause() instanceof DriverException) {
e = (DriverException) throwable.getCause();
}
if (e != null && e.getExecutionInfo() != null) {
onResponse(context, cqlSession, e.getExecutionInfo());
}
super.endExceptionally(context, throwable);
}
/** Use {@link #endExceptionally(Context, Throwable, CqlSession)}. */
@Override
public void endExceptionally(Context context, final Throwable throwable) {
throw new IllegalStateException(
"use the endExceptionally method with a CqlSession in CassandraDatabaseClientTracer");
}
@Override
protected void onStatement(Span span, String statement) {
super.onStatement(span, statement);
String table = SqlStatementSanitizer.sanitize(statement).getTable();
if (table != null) {
span.setAttribute(SemanticAttributes.DB_CASSANDRA_TABLE, table);
}
}
}

View File

@ -178,10 +178,10 @@ public class TracingCqlSession implements CqlSession {
try (Scope ignored = context.makeCurrent()) {
try {
ResultSet resultSet = session.execute(query);
tracer().onResponse(context, resultSet.getExecutionInfo());
tracer().onResponse(context, session, resultSet.getExecutionInfo());
return resultSet;
} catch (RuntimeException e) {
tracer().endExceptionally(context, e);
tracer().endExceptionally(context, e, session);
throw e;
} finally {
tracer().end(context);
@ -198,10 +198,10 @@ public class TracingCqlSession implements CqlSession {
try (Scope ignored = context.makeCurrent()) {
try {
ResultSet resultSet = session.execute(statement);
tracer().onResponse(context, resultSet.getExecutionInfo());
tracer().onResponse(context, session, resultSet.getExecutionInfo());
return resultSet;
} catch (RuntimeException e) {
tracer().endExceptionally(context, e);
tracer().endExceptionally(context, e, session);
throw e;
} finally {
tracer().end(context);
@ -220,9 +220,9 @@ public class TracingCqlSession implements CqlSession {
return stage.whenComplete(
(asyncResultSet, throwable) -> {
if (throwable != null) {
tracer().endExceptionally(context, throwable);
tracer().endExceptionally(context, throwable, session);
} else {
tracer().onResponse(context, asyncResultSet.getExecutionInfo());
tracer().onResponse(context, session, asyncResultSet.getExecutionInfo());
tracer().end(context);
}
});
@ -238,9 +238,9 @@ public class TracingCqlSession implements CqlSession {
return stage.whenComplete(
(asyncResultSet, throwable) -> {
if (throwable != null) {
tracer().endExceptionally(context, throwable);
tracer().endExceptionally(context, throwable, session);
} else {
tracer().onResponse(context, asyncResultSet.getExecutionInfo());
tracer().onResponse(context, session, asyncResultSet.getExecutionInfo());
tracer().end(context);
}
});

View File

@ -113,6 +113,14 @@ class CassandraClientTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.DB_SYSTEM.key" "cassandra"
"$SemanticAttributes.DB_NAME.key" keyspace
"$SemanticAttributes.DB_STATEMENT.key" statement
"$SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL.key" "LOCAL_ONE"
"$SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC.key" "datacenter1"
"$SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID.key" String
"$SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE.key" Boolean
"$SemanticAttributes.DB_CASSANDRA_PAGE_SIZE.key" 5000
"$SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT.key" 0
// the SqlStatementSanitizer can't handle CREATE statements or splitting out the keyspace
"$SemanticAttributes.DB_CASSANDRA_TABLE.key" { table -> (statement.contains("CREATE") || keyspace == null ? true : table.endsWith("users")) }
}
}
}