Add Cassandra Driver 4 instrumentation (#334)

This commit is contained in:
Sergei Malafeev 2020-04-21 09:23:09 +08:00 committed by GitHub
parent 3561ccdfa2
commit 51cb8c6007
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 663 additions and 5 deletions

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.cassandra;
package io.opentelemetry.auto.instrumentation.cassandra.v3_0;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ResultSet;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.cassandra;
package io.opentelemetry.auto.instrumentation.cassandra.v3_0;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

View File

@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.cassandra;
package io.opentelemetry.auto.instrumentation.cassandra.v3_0;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.opentelemetry.auto.instrumentation.cassandra.CassandraClientDecorator.DECORATE;
import static io.opentelemetry.auto.instrumentation.cassandra.v3_0.CassandraClientDecorator.DECORATE;
import static io.opentelemetry.trace.Span.Kind.CLIENT;
import static io.opentelemetry.trace.TracingContextUtils.currentContextWith;

View File

@ -0,0 +1,33 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
maxJavaVersionForTests = JavaVersion.VERSION_1_8
cassandraDriverTestVersions = "[4.0,)"
}
apply from: "${rootDir}/gradle/instrumentation.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
muzzle {
pass {
group = "com.datastax.oss"
module = "java-driver-core"
versions = cassandraDriverTestVersions
assertInverse = true
}
}
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
main_java8CompileOnly group: 'com.datastax.oss', name: 'java-driver-core', version: '4.0.0'
testCompile group: 'com.datastax.oss', name: 'java-driver-core', version: '4.0.0'
testCompile group: 'org.cassandraunit', name: 'cassandra-unit', version: '4.3.1.0'
latestDepTestCompile group: 'com.datastax.oss', name: 'java-driver-core', version: '4.+'
}

View File

@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.cassandra.v4_0;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.auto.tooling.Instrumenter;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class CassandraClientInstrumentation extends Instrumenter.Default {
public CassandraClientInstrumentation() {
super("cassandra");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// Note: Cassandra has a large driver and we instrument single class in it.
// The rest is ignored in AdditionalLibraryIgnoresMatcher
return named("com.datastax.oss.driver.api.core.session.SessionBuilder");
}
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".CassandraClientDecorator",
packageName + ".TracingCqlSession",
packageName + ".CompletionStageFunction"
};
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(isPublic()).and(named("buildAsync")).and(takesArguments(0)),
// Cannot reference class directly here because it would lead to class load failure on Java7
packageName + ".CassandraClientAdvice");
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.cassandra.v4_0;
import java.util.concurrent.CompletionStage;
import net.bytebuddy.asm.Advice;
public class CassandraClientAdvice {
/**
* Strategy: each time we build a connection to a Cassandra cluster, the
* com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync() method is called. The
* opentracing contribution is a simple wrapper, so we just have to wrap the new session.
*
* @param stage The fresh CompletionStage to patch. This stage produces session which is
* replaced with new session
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void injectTracingSession(
@Advice.Return(readOnly = false) CompletionStage<?> stage) {
stage = stage.thenApply(new CompletionStageFunction());
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.cassandra.v4_0;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.Node;
import io.opentelemetry.auto.bootstrap.instrumentation.decorator.DatabaseClientDecorator;
import io.opentelemetry.trace.Span;
import java.net.InetSocketAddress;
import java.util.Optional;
public class CassandraClientDecorator extends DatabaseClientDecorator<CqlSession> {
public static final CassandraClientDecorator DECORATE = new CassandraClientDecorator();
@Override
protected String dbType() {
return "cassandra";
}
@Override
protected String dbUser(final CqlSession session) {
return null;
}
@Override
protected String dbInstance(final CqlSession session) {
return session.getKeyspace().map(CqlIdentifier::toString).orElse(null);
}
public void onResponse(final Span span, final ExecutionInfo executionInfo) {
if (executionInfo != null) {
final Node coordinator = executionInfo.getCoordinator();
if (coordinator != null) {
final Optional<InetSocketAddress> address = coordinator.getBroadcastRpcAddress();
address.ifPresent(inetSocketAddress -> onPeerConnection(span, inetSocketAddress));
}
}
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.cassandra.v4_0;
import com.datastax.oss.driver.api.core.CqlSession;
import java.util.function.Function;
public class CompletionStageFunction implements Function<Object, Object> {
@Override
public Object apply(Object session) {
if (session == null) {
return null;
}
// This should cover ours and OT's TracingCqlSession
if (session.getClass().getName().endsWith("cassandra4.TracingCqlSession")) {
return session;
}
return new TracingCqlSession((CqlSession) session);
};
}

View File

@ -0,0 +1,306 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.cassandra.v4_0;
import static io.opentelemetry.auto.instrumentation.cassandra.v4_0.CassandraClientDecorator.DECORATE;
import static io.opentelemetry.trace.Span.Kind.CLIENT;
import static io.opentelemetry.trace.TracingContextUtils.currentContextWith;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metrics.Metrics;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TracingCqlSession implements CqlSession {
private static final Tracer TRACER =
OpenTelemetry.getTracerProvider().get("io.opentelemetry.auto.cassandra-4.0");
private final ExecutorService executorService = Executors.newCachedThreadPool();
private final CqlSession session;
public TracingCqlSession(final CqlSession session) {
this.session = session;
}
@Override
@NonNull
public PreparedStatement prepare(@NonNull SimpleStatement statement) {
return session.prepare(statement);
}
@Override
@NonNull
public PreparedStatement prepare(@NonNull String query) {
return session.prepare(query);
}
@Override
@NonNull
public PreparedStatement prepare(@NonNull PrepareRequest request) {
return session.prepare(request);
}
@Override
@NonNull
public CompletionStage<PreparedStatement> prepareAsync(@NonNull SimpleStatement statement) {
return session.prepareAsync(statement);
}
@Override
@NonNull
public CompletionStage<PreparedStatement> prepareAsync(@NonNull String query) {
return session.prepareAsync(query);
}
@Override
@NonNull
public CompletionStage<PreparedStatement> prepareAsync(PrepareRequest request) {
return session.prepareAsync(request);
}
@Override
@NonNull
public String getName() {
return session.getName();
}
@Override
@NonNull
public Metadata getMetadata() {
return session.getMetadata();
}
@Override
public boolean isSchemaMetadataEnabled() {
return session.isSchemaMetadataEnabled();
}
@Override
@NonNull
public CompletionStage<Metadata> setSchemaMetadataEnabled(@Nullable Boolean newValue) {
return session.setSchemaMetadataEnabled(newValue);
}
@Override
@NonNull
public CompletionStage<Metadata> refreshSchemaAsync() {
return session.refreshSchemaAsync();
}
@Override
@NonNull
public Metadata refreshSchema() {
return session.refreshSchema();
}
@Override
@NonNull
public CompletionStage<Boolean> checkSchemaAgreementAsync() {
return session.checkSchemaAgreementAsync();
}
@Override
public boolean checkSchemaAgreement() {
return session.checkSchemaAgreement();
}
@Override
@NonNull
public DriverContext getContext() {
return session.getContext();
}
@Override
@NonNull
public Optional<CqlIdentifier> getKeyspace() {
return session.getKeyspace();
}
@Override
@NonNull
public Optional<Metrics> getMetrics() {
return session.getMetrics();
}
@Override
@Nullable
public <RequestT extends Request, ResultT> ResultT execute(
@NonNull RequestT request, @NonNull GenericType<ResultT> resultType) {
return session.execute(request, resultType);
}
@Override
@NonNull
public CompletionStage<Void> closeFuture() {
return session.closeFuture();
}
@Override
public boolean isClosed() {
return session.isClosed();
}
@Override
@NonNull
public CompletionStage<Void> closeAsync() {
return session.closeAsync();
}
@Override
@NonNull
public CompletionStage<Void> forceCloseAsync() {
return session.forceCloseAsync();
}
@Override
public void close() {
session.close();
}
@Override
@NonNull
public ResultSet execute(@NonNull Statement<?> statement) {
final String query = getQuery(statement);
final Span span = startSpan(query);
try (final Scope scope = currentContextWith(span)) {
try {
final ResultSet resultSet = session.execute(statement);
beforeSpanFinish(span, resultSet);
return resultSet;
} catch (final RuntimeException e) {
beforeSpanFinish(span, e);
throw e;
} finally {
span.end();
}
}
}
@Override
@NonNull
public ResultSet execute(@NonNull String query) {
final Span span = startSpan(query);
try (final Scope scope = currentContextWith(span)) {
try {
final ResultSet resultSet = session.execute(query);
beforeSpanFinish(span, resultSet);
return resultSet;
} catch (final RuntimeException e) {
beforeSpanFinish(span, e);
throw e;
} finally {
span.end();
}
}
}
@Override
@NonNull
public CompletionStage<AsyncResultSet> executeAsync(@NonNull Statement<?> statement) {
final String query = getQuery(statement);
final Span span = startSpan(query);
try (final Scope scope = currentContextWith(span)) {
final CompletionStage<AsyncResultSet> stage = session.executeAsync(statement);
return stage.whenComplete(
(asyncResultSet, throwable) -> {
if (throwable != null) {
beforeSpanFinish(span, throwable);
} else {
beforeSpanFinish(span, asyncResultSet);
}
span.end();
});
}
}
@Override
@NonNull
public CompletionStage<AsyncResultSet> executeAsync(@NonNull String query) {
final Span span = startSpan(query);
try (final Scope scope = currentContextWith(span)) {
final CompletionStage<AsyncResultSet> stage = session.executeAsync(query);
return stage.whenComplete(
(asyncResultSet, throwable) -> {
if (throwable != null) {
beforeSpanFinish(span, throwable);
} else {
beforeSpanFinish(span, asyncResultSet);
}
span.end();
});
}
}
private static String getQuery(final Statement<?> statement) {
String query = null;
if (statement instanceof SimpleStatement) {
query = ((SimpleStatement) statement).getQuery();
} else if (statement instanceof BoundStatement) {
query = ((BoundStatement) statement).getPreparedStatement().getQuery();
}
return query == null ? "" : query;
}
private Span startSpan(final String query) {
final Span span = TRACER.spanBuilder(query).setSpanKind(CLIENT).startSpan();
CassandraClientDecorator.DECORATE.afterStart(span);
CassandraClientDecorator.DECORATE.onConnection(span, session);
CassandraClientDecorator.DECORATE.onStatement(span, query);
return span;
}
private static void beforeSpanFinish(final Span span, final ResultSet resultSet) {
if (resultSet != null) {
CassandraClientDecorator.DECORATE.onResponse(span, resultSet.getExecutionInfo());
}
CassandraClientDecorator.DECORATE.beforeFinish(span);
}
private static void beforeSpanFinish(final Span span, final Throwable e) {
CassandraClientDecorator.DECORATE.onError(span, e);
CassandraClientDecorator.DECORATE.beforeFinish(span);
}
private void beforeSpanFinish(Span span, AsyncResultSet asyncResultSet) {
if (asyncResultSet != null) {
CassandraClientDecorator.DECORATE.onResponse(span, asyncResultSet.getExecutionInfo());
}
CassandraClientDecorator.DECORATE.beforeFinish(span);
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.config.DefaultDriverOption
import com.datastax.oss.driver.api.core.config.DriverConfigLoader
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader
import io.opentelemetry.auto.instrumentation.api.MoreTags
import io.opentelemetry.auto.instrumentation.api.Tags
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import java.time.Duration
import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import static io.opentelemetry.trace.Span.Kind.CLIENT
class CassandraClientTest extends AgentTestRunner {
def setupSpec() {
/*
This timeout seems excessive but we've seen tests fail with timeout of 40s.
TODO: if we continue to see failures we may want to consider using 'real' Cassandra
started in container like we do for memcached. Note: this will complicate things because
tests would have to assume they run under shared Cassandra and act accordingly.
*/
EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE, 120000L)
}
def cleanupSpec() {
EmbeddedCassandraServerHelper.cleanEmbeddedCassandra()
}
def "test sync"() {
setup:
CqlSession session = getSession(keyspace)
session.execute(statement)
expect:
assertTraces(1) {
trace(0, 1) {
cassandraSpan(it, 0, statement, keyspace)
}
}
cleanup:
session.close()
where:
statement | keyspace
"DROP KEYSPACE IF EXISTS sync_test" | null
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test"
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test"
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test"
}
def "test async"() {
setup:
CqlSession session = getSession(keyspace)
runUnderTrace("parent") {
session.executeAsync(statement).toCompletableFuture().get()
}
expect:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
cassandraSpan(it, 1, statement, keyspace, span(0))
}
}
cleanup:
session.close()
where:
statement | keyspace
"DROP KEYSPACE IF EXISTS async_test" | null
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test"
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test"
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test"
}
def cassandraSpan(TraceAssert trace, int index, String statement, String keyspace, Object parentSpan = null, Throwable exception = null) {
trace.span(index) {
operationName statement
spanKind CLIENT
if (parentSpan == null) {
parent()
} else {
childOf((SpanData) parentSpan)
}
tags {
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_IP" "127.0.0.1"
"$MoreTags.NET_PEER_PORT" EmbeddedCassandraServerHelper.getNativeTransportPort()
"$Tags.DB_TYPE" "cassandra"
"$Tags.DB_INSTANCE" keyspace
"$Tags.DB_STATEMENT" statement
}
}
}
def getSession(String keyspace) {
DriverConfigLoader configLoader = DefaultDriverConfigLoader.builder()
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(0))
.build()
return CqlSession.builder()
.addContactPoint(new InetSocketAddress(EmbeddedCassandraServerHelper.getHost(), EmbeddedCassandraServerHelper.getNativeTransportPort()))
.withConfigLoader(configLoader)
.withLocalDatacenter("datacenter1")
.withKeyspace((String) keyspace)
.build()
}
}

View File

@ -47,7 +47,8 @@ include ':instrumentation:apache-httpclient:apache-httpclient-2.0'
include ':instrumentation:apache-httpclient:apache-httpclient-4.0'
include ':instrumentation:aws-sdk:aws-sdk-1.11'
include ':instrumentation:aws-sdk:aws-sdk-2.2'
include ':instrumentation:cassandra-3.0'
include ':instrumentation:cassandra:cassandra-3.0'
include ':instrumentation:cassandra:cassandra-4.0'
include ':instrumentation:cdi-testing'
include ':instrumentation:couchbase:couchbase-2.0'
include ':instrumentation:couchbase:couchbase-2.6'