Update cassandra-3.0 to Instrumenter API (#3066)

This commit is contained in:
Trask Stalnaker 2021-05-25 21:45:03 -07:00 committed by GitHub
parent fd132d463d
commit a6c3d80b50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 243 additions and 148 deletions

View File

@ -43,7 +43,8 @@ public final class DbSpanNameExtractor<REQUEST> implements SpanNameExtractor<REQ
if (dbName != null || table != null) { if (dbName != null || table != null) {
name.append(' '); name.append(' ');
} }
if (dbName != null) { // skip db name if table already has a db name prefixed to it
if (dbName != null && (table == null || table.indexOf('.') == -1)) {
name.append(dbName); name.append(dbName);
if (table != null) { if (table != null) {
name.append('.'); name.append('.');

View File

@ -37,6 +37,24 @@ class DbSpanNameExtractorTest {
assertEquals("SELECT database.table", spanName); assertEquals("SELECT database.table", spanName);
} }
@Test
void shouldSkipDbNameIfTableAlreadyHasDbNamePrefix() {
// given
DbRequest dbRequest = new DbRequest();
// cannot stub dbOperation() and dbTable() because they're final
given(sqlAttributesExtractor.rawStatement(dbRequest)).willReturn("SELECT * FROM another.table");
given(sqlAttributesExtractor.name(dbRequest)).willReturn("database");
SpanNameExtractor<DbRequest> underTest = DbSpanNameExtractor.create(sqlAttributesExtractor);
// when
String spanName = underTest.extract(dbRequest);
// then
assertEquals("SELECT another.table", spanName);
}
@Test @Test
void shouldExtractOperationAndTable() { void shouldExtractOperationAndTable() {
// given // given

View File

@ -1,114 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Session;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.db.SqlStatementInfo;
import io.opentelemetry.instrumentation.api.db.SqlStatementSanitizer;
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DbSystemValues;
import java.net.InetSocketAddress;
import org.checkerframework.checker.nullness.qual.Nullable;
public class CassandraDatabaseClientTracer
extends DatabaseClientTracer<Session, String, SqlStatementInfo> {
private static final CassandraDatabaseClientTracer TRACER = new CassandraDatabaseClientTracer();
private CassandraDatabaseClientTracer() {
super(NetPeerAttributes.INSTANCE);
}
public static CassandraDatabaseClientTracer tracer() {
return TRACER;
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.cassandra-3.0";
}
@Override
protected SqlStatementInfo sanitizeStatement(String statement) {
return SqlStatementSanitizer.sanitize(statement).mapTable(this::stripKeyspace);
}
// account for splitting out the keyspace, <keyspace>.<table>
@Nullable
private String stripKeyspace(String table) {
int i;
if (table == null || (i = table.indexOf('.')) == -1) {
return table;
}
return table.substring(i + 1);
}
@Override
protected String spanName(
Session connection, String statement, SqlStatementInfo sanitizedStatement) {
return conventionSpanName(
dbName(connection),
sanitizedStatement.getOperation(),
sanitizedStatement.getTable(),
sanitizedStatement.getFullStatement());
}
@Override
protected String dbSystem(Session session) {
return DbSystemValues.CASSANDRA;
}
@Override
protected void onConnection(SpanBuilder span, Session session) {
span.setAttribute(SemanticAttributes.DB_CASSANDRA_KEYSPACE, session.getLoggedKeyspace());
super.onConnection(span, session);
}
@Override
protected String dbName(Session session) {
return session.getLoggedKeyspace();
}
@Override
protected InetSocketAddress peerAddress(Session session) {
return null;
}
@Override
protected void onStatement(
SpanBuilder span, Session connection, String statement, SqlStatementInfo sanitizedStatement) {
super.onStatement(span, connection, statement, sanitizedStatement);
String table = sanitizedStatement.getTable();
if (table != null) {
span.setAttribute(SemanticAttributes.DB_CASSANDRA_TABLE, table);
}
}
@Override
protected String dbStatement(
Session connection, String statement, SqlStatementInfo sanitizedStatement) {
return sanitizedStatement.getFullStatement();
}
@Override
protected String dbOperation(
Session connection, String statement, SqlStatementInfo sanitizedStatement) {
return sanitizedStatement.getOperation();
}
public void end(Context context, ExecutionInfo executionInfo) {
Span span = Span.fromContext(context);
Host host = executionInfo.getQueriedHost();
NetPeerAttributes.INSTANCE.setNetPeer(span, host.getSocketAddress());
end(context);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.DbAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.DbSpanNameExtractor;
public final class CassandraInstrumenters {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.javaagent.cassandra-3.0";
private static final Instrumenter<CassandraRequest, Void> INSTRUMENTER;
static {
DbAttributesExtractor<CassandraRequest> attributesExtractor =
new CassandraSqlAttributesExtractor();
SpanNameExtractor<CassandraRequest> spanName = DbSpanNameExtractor.create(attributesExtractor);
INSTRUMENTER =
Instrumenter.<CassandraRequest, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanName)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(new CassandraNetAttributesExtractor())
.addAttributesExtractor(new CassandraKeyspaceExtractor())
.newInstrumenter(SpanKindExtractor.alwaysClient());
}
public static Instrumenter<CassandraRequest, Void> instrumenter() {
return INSTRUMENTER;
}
private CassandraInstrumenters() {}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
final class CassandraKeyspaceExtractor extends AttributesExtractor<CassandraRequest, Void> {
@Override
protected void onStart(AttributesBuilder attributes, CassandraRequest request) {
attributes.put(
SemanticAttributes.DB_CASSANDRA_KEYSPACE, request.getSession().getLoggedKeyspace());
}
@Override
protected void onEnd(AttributesBuilder attributes, CassandraRequest request, Void unused) {}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
import com.datastax.driver.core.ExecutionInfo;
import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetAttributesExtractor;
import java.net.InetSocketAddress;
import org.checkerframework.checker.nullness.qual.Nullable;
final class CassandraNetAttributesExtractor
extends InetSocketAddressNetAttributesExtractor<CassandraRequest, Void> {
@Override
@Nullable
protected String transport(CassandraRequest request) {
return null;
}
@Override
protected @Nullable InetSocketAddress getAddress(
CassandraRequest request, @Nullable Void response) {
ExecutionInfo executionInfo = request.getExecutionInfo();
return executionInfo == null ? null : executionInfo.getQueriedHost().getSocketAddress();
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.Session;
public final class CassandraRequest {
private final Session session;
private final String statement;
// volatile is not needed here as this field is set and get from the same thread
private ExecutionInfo executionInfo;
public CassandraRequest(Session session, String statement) {
this.session = session;
this.statement = statement;
}
public Session getSession() {
return session;
}
public String getStatement() {
return statement;
}
public void setExecutionInfo(ExecutionInfo executionInfo) {
this.executionInfo = executionInfo;
}
public ExecutionInfo getExecutionInfo() {
return executionInfo;
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.checkerframework.checker.nullness.qual.Nullable;
final class CassandraSqlAttributesExtractor extends SqlAttributesExtractor<CassandraRequest> {
@Override
protected String system(CassandraRequest request) {
return SemanticAttributes.DbSystemValues.CASSANDRA;
}
@Override
@Nullable
protected String user(CassandraRequest request) {
return null;
}
@Override
@Nullable
protected String name(CassandraRequest request) {
return request.getSession().getLoggedKeyspace();
}
@Override
@Nullable
protected String connectionString(CassandraRequest request) {
return null;
}
@Override
protected AttributeKey<String> dbTableAttribute() {
return SemanticAttributes.DB_CASSANDRA_TABLE;
}
@Override
@Nullable
protected String rawStatement(CassandraRequest request) {
return request.getStatement();
}
}

View File

@ -5,7 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0; package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
import static io.opentelemetry.javaagent.instrumentation.cassandra.v3_0.CassandraDatabaseClientTracer.tracer; import static io.opentelemetry.javaagent.instrumentation.cassandra.v3_0.CassandraInstrumenters.instrumenter;
import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CloseFuture; import com.datastax.driver.core.CloseFuture;
@ -48,86 +48,98 @@ public class TracingSession implements Session {
@Override @Override
public ResultSet execute(String query) { public ResultSet execute(String query) {
Context context = tracer().startSpan(Context.current(), session, query); CassandraRequest request = new CassandraRequest(session, query);
Context context = instrumenter().start(Context.current(), request);
ResultSet resultSet; ResultSet resultSet;
try (Scope ignored = context.makeCurrent()) { try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query); resultSet = session.execute(query);
} catch (Throwable t) { } catch (Throwable t) {
tracer().endExceptionally(context, t); instrumenter().end(context, request, null, t);
throw t; throw t;
} }
tracer().end(context, resultSet.getExecutionInfo()); request.setExecutionInfo(resultSet.getExecutionInfo());
instrumenter().end(context, request, null, null);
return resultSet; return resultSet;
} }
@Override @Override
public ResultSet execute(String query, Object... values) { public ResultSet execute(String query, Object... values) {
Context context = tracer().startSpan(Context.current(), session, query); CassandraRequest request = new CassandraRequest(session, query);
Context context = instrumenter().start(Context.current(), request);
ResultSet resultSet; ResultSet resultSet;
try (Scope ignored = context.makeCurrent()) { try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query, values); resultSet = session.execute(query, values);
} catch (Throwable t) { } catch (Throwable t) {
tracer().endExceptionally(context, t); instrumenter().end(context, request, null, t);
throw t; throw t;
} }
tracer().end(context, resultSet.getExecutionInfo()); request.setExecutionInfo(resultSet.getExecutionInfo());
instrumenter().end(context, request, null, null);
return resultSet; return resultSet;
} }
@Override @Override
public ResultSet execute(String query, Map<String, Object> values) { public ResultSet execute(String query, Map<String, Object> values) {
Context context = tracer().startSpan(Context.current(), session, query); CassandraRequest request = new CassandraRequest(session, query);
Context context = instrumenter().start(Context.current(), request);
ResultSet resultSet; ResultSet resultSet;
try (Scope ignored = context.makeCurrent()) { try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query, values); resultSet = session.execute(query, values);
} catch (Throwable t) { } catch (Throwable t) {
tracer().endExceptionally(context, t); instrumenter().end(context, request, null, t);
throw t; throw t;
} }
tracer().end(context, resultSet.getExecutionInfo()); request.setExecutionInfo(resultSet.getExecutionInfo());
instrumenter().end(context, request, null, null);
return resultSet; return resultSet;
} }
@Override @Override
public ResultSet execute(Statement statement) { public ResultSet execute(Statement statement) {
Context context = tracer().startSpan(Context.current(), session, getQuery(statement)); String query = getQuery(statement);
CassandraRequest request = new CassandraRequest(session, query);
Context context = instrumenter().start(Context.current(), request);
ResultSet resultSet; ResultSet resultSet;
try (Scope ignored = context.makeCurrent()) { try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(statement); resultSet = session.execute(statement);
} catch (Throwable t) { } catch (Throwable t) {
tracer().endExceptionally(context, t); instrumenter().end(context, request, null, t);
throw t; throw t;
} }
tracer().end(context, resultSet.getExecutionInfo()); request.setExecutionInfo(resultSet.getExecutionInfo());
instrumenter().end(context, request, null, null);
return resultSet; return resultSet;
} }
@Override @Override
public ResultSetFuture executeAsync(String query) { public ResultSetFuture executeAsync(String query) {
Context context = tracer().startSpan(Context.current(), session, query); CassandraRequest request = new CassandraRequest(session, query);
Context context = instrumenter().start(Context.current(), request);
try (Scope ignored = context.makeCurrent()) { try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(query); ResultSetFuture future = session.executeAsync(query);
addCallbackToEndSpan(future, context); addCallbackToEndSpan(future, context, request);
return future; return future;
} }
} }
@Override @Override
public ResultSetFuture executeAsync(String query, Object... values) { public ResultSetFuture executeAsync(String query, Object... values) {
Context context = tracer().startSpan(Context.current(), session, query); CassandraRequest request = new CassandraRequest(session, query);
Context context = instrumenter().start(Context.current(), request);
try (Scope ignored = context.makeCurrent()) { try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(query, values); ResultSetFuture future = session.executeAsync(query, values);
addCallbackToEndSpan(future, context); addCallbackToEndSpan(future, context, request);
return future; return future;
} }
} }
@Override @Override
public ResultSetFuture executeAsync(String query, Map<String, Object> values) { public ResultSetFuture executeAsync(String query, Map<String, Object> values) {
Context context = tracer().startSpan(Context.current(), session, query); CassandraRequest request = new CassandraRequest(session, query);
Context context = instrumenter().start(Context.current(), request);
try (Scope ignored = context.makeCurrent()) { try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(query, values); ResultSetFuture future = session.executeAsync(query, values);
addCallbackToEndSpan(future, context); addCallbackToEndSpan(future, context, request);
return future; return future;
} }
} }
@ -135,10 +147,11 @@ public class TracingSession implements Session {
@Override @Override
public ResultSetFuture executeAsync(Statement statement) { public ResultSetFuture executeAsync(Statement statement) {
String query = getQuery(statement); String query = getQuery(statement);
Context context = tracer().startSpan(Context.current(), session, query); CassandraRequest request = new CassandraRequest(session, query);
Context context = instrumenter().start(Context.current(), request);
try (Scope ignored = context.makeCurrent()) { try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(statement); ResultSetFuture future = session.executeAsync(statement);
addCallbackToEndSpan(future, context); addCallbackToEndSpan(future, context, request);
return future; return future;
} }
} }
@ -199,18 +212,20 @@ public class TracingSession implements Session {
return query == null ? "" : query; return query == null ? "" : query;
} }
private void addCallbackToEndSpan(ResultSetFuture future, Context context) { private void addCallbackToEndSpan(
ResultSetFuture future, Context context, CassandraRequest request) {
Futures.addCallback( Futures.addCallback(
future, future,
new FutureCallback<ResultSet>() { new FutureCallback<ResultSet>() {
@Override @Override
public void onSuccess(ResultSet result) { public void onSuccess(ResultSet resultSet) {
tracer().end(context, result.getExecutionInfo()); request.setExecutionInfo(resultSet.getExecutionInfo());
instrumenter().end(context, request, null, null);
} }
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
tracer().endExceptionally(context, t); instrumenter().end(context, request, null, t);
} }
}, },
Runnable::run); Runnable::run);

View File

@ -64,7 +64,7 @@ class CassandraClientTest extends AgentInstrumentationSpecification {
assertTraces(keyspace ? 2 : 1) { assertTraces(keyspace ? 2 : 1) {
if (keyspace) { if (keyspace) {
trace(0, 1) { trace(0, 1) {
cassandraSpan(it, 0, "USE $keyspace", "USE $keyspace") cassandraSpan(it, 0, "DB Query", "USE $keyspace")
} }
} }
trace(keyspace ? 1 : 0, 1) { trace(keyspace ? 1 : 0, 1) {
@ -77,10 +77,10 @@ class CassandraClientTest extends AgentInstrumentationSpecification {
where: where:
keyspace | statement | expectedStatement | spanName | operation | table keyspace | statement | expectedStatement | spanName | operation | table
null | "DROP KEYSPACE IF EXISTS sync_test" | "DROP KEYSPACE IF EXISTS sync_test" | expectedStatement | null | null null | "DROP KEYSPACE IF EXISTS sync_test" | "DROP KEYSPACE IF EXISTS sync_test" | "DB Query" | null | null
null | "CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE sync_test WITH REPLICATION = {?:?, ?:?}" | expectedStatement | null | null null | "CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE sync_test WITH REPLICATION = {?:?, ?:?}" | "DB Query" | null | null
"sync_test" | "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | null | null "sync_test" | "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | null | null
"sync_test" | "INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO sync_test.users (id, name) values (uuid(), ?)" | "INSERT sync_test.users" | "INSERT" | "users" "sync_test" | "INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO sync_test.users (id, name) values (uuid(), ?)" | "INSERT sync_test.users" | "INSERT" | "sync_test.users"
"sync_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT sync_test.users" | "SELECT" | "users" "sync_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT sync_test.users" | "SELECT" | "users"
} }
@ -101,7 +101,7 @@ class CassandraClientTest extends AgentInstrumentationSpecification {
assertTraces(keyspace ? 2 : 1) { assertTraces(keyspace ? 2 : 1) {
if (keyspace) { if (keyspace) {
trace(0, 1) { trace(0, 1) {
cassandraSpan(it, 0, "USE $keyspace", "USE $keyspace") cassandraSpan(it, 0, "DB Query", "USE $keyspace")
} }
} }
trace(keyspace ? 1 : 0, 3) { trace(keyspace ? 1 : 0, 3) {
@ -116,10 +116,10 @@ class CassandraClientTest extends AgentInstrumentationSpecification {
where: where:
keyspace | statement | expectedStatement | spanName | operation | table keyspace | statement | expectedStatement | spanName | operation | table
null | "DROP KEYSPACE IF EXISTS async_test" | "DROP KEYSPACE IF EXISTS async_test" | expectedStatement | null | null null | "DROP KEYSPACE IF EXISTS async_test" | "DROP KEYSPACE IF EXISTS async_test" | "DB Query" | null | null
null | "CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE async_test WITH REPLICATION = {?:?, ?:?}" | expectedStatement | null | null null | "CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE async_test WITH REPLICATION = {?:?, ?:?}" | "DB Query" | null | null
"async_test" | "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | null | null "async_test" | "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | null | null
"async_test" | "INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO async_test.users (id, name) values (uuid(), ?)" | "INSERT async_test.users" | "INSERT" | "users" "async_test" | "INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO async_test.users (id, name) values (uuid(), ?)" | "INSERT async_test.users" | "INSERT" | "async_test.users"
"async_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT async_test.users" | "SELECT" | "users" "async_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT async_test.users" | "SELECT" | "users"
} }