Add Cassandra decorator
This commit is contained in:
parent
9c8e689b6d
commit
dd35372a72
|
@ -0,0 +1,68 @@
|
|||
package datadog.trace.instrumentation.datastax.cassandra;
|
||||
|
||||
import com.datastax.driver.core.Host;
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.Session;
|
||||
import datadog.trace.agent.decorator.DatabaseClientDecorator;
|
||||
import datadog.trace.api.DDSpanTypes;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.tag.Tags;
|
||||
import java.net.Inet4Address;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
class CassandraClientDecorator extends DatabaseClientDecorator<Session> {
|
||||
public static final CassandraClientDecorator INSTANCE = new CassandraClientDecorator();
|
||||
|
||||
@Override
|
||||
protected String[] instrumentationNames() {
|
||||
return new String[] {"cassandra"};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String service() {
|
||||
return "cassandra";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String component() {
|
||||
return "java-cassandra";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String spanType() {
|
||||
return DDSpanTypes.CASSANDRA;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String dbType() {
|
||||
return "cassandra";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String dbUser(final Session session) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String dbInstance(final Session session) {
|
||||
return session.getLoggedKeyspace();
|
||||
}
|
||||
|
||||
public Span onResponse(final Span span, final ResultSet result) {
|
||||
if (result != null) {
|
||||
final Host host = result.getExecutionInfo().getQueriedHost();
|
||||
Tags.PEER_PORT.set(span, host.getSocketAddress().getPort());
|
||||
Tags.PEER_HOSTNAME.set(span, host.getAddress().getHostName());
|
||||
|
||||
final InetAddress inetAddress = host.getSocketAddress().getAddress();
|
||||
if (inetAddress instanceof Inet4Address) {
|
||||
final byte[] address = inetAddress.getAddress();
|
||||
Tags.PEER_HOST_IPV4.set(span, ByteBuffer.wrap(address).getInt());
|
||||
} else {
|
||||
Tags.PEER_HOST_IPV6.set(span, inetAddress.getHostAddress());
|
||||
}
|
||||
}
|
||||
return span;
|
||||
}
|
||||
}
|
|
@ -9,9 +9,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
|||
import com.datastax.driver.core.Session;
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import io.opentracing.Tracer;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.Map;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
|
@ -33,9 +31,13 @@ public class CassandraClientInstrumentation extends Instrumenter.Default {
|
|||
@Override
|
||||
public String[] helperClassNames() {
|
||||
return new String[] {
|
||||
"datadog.trace.instrumentation.datastax.cassandra.TracingSession",
|
||||
"datadog.trace.instrumentation.datastax.cassandra.TracingSession$1",
|
||||
"datadog.trace.instrumentation.datastax.cassandra.TracingSession$2"
|
||||
"datadog.trace.agent.decorator.BaseDecorator",
|
||||
"datadog.trace.agent.decorator.ClientDecorator",
|
||||
"datadog.trace.agent.decorator.DatabaseClientDecorator",
|
||||
packageName + ".CassandraClientDecorator",
|
||||
packageName + ".TracingSession",
|
||||
packageName + ".TracingSession$1",
|
||||
packageName + ".TracingSession$2",
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -62,12 +64,7 @@ public class CassandraClientInstrumentation extends Instrumenter.Default {
|
|||
if (session.getClass().getName().endsWith("cassandra.TracingSession")) {
|
||||
return;
|
||||
}
|
||||
|
||||
final Class<?> clazz =
|
||||
Class.forName("datadog.trace.instrumentation.datastax.cassandra.TracingSession");
|
||||
final Constructor<?> constructor = clazz.getDeclaredConstructor(Session.class, Tracer.class);
|
||||
constructor.setAccessible(true);
|
||||
session = (Session) constructor.newInstance(session, GlobalTracer.get());
|
||||
session = new TracingSession(session, GlobalTracer.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,28 +1,8 @@
|
|||
/*
|
||||
* Copyright 2017-2018 The OpenTracing Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package datadog.trace.instrumentation.datastax.cassandra;
|
||||
|
||||
import static io.opentracing.log.Fields.ERROR_KIND;
|
||||
import static io.opentracing.log.Fields.ERROR_OBJECT;
|
||||
import static io.opentracing.log.Fields.EVENT;
|
||||
import static io.opentracing.log.Fields.MESSAGE;
|
||||
import static io.opentracing.log.Fields.STACK;
|
||||
|
||||
import com.datastax.driver.core.BoundStatement;
|
||||
import com.datastax.driver.core.CloseFuture;
|
||||
import com.datastax.driver.core.Cluster;
|
||||
import com.datastax.driver.core.Host;
|
||||
import com.datastax.driver.core.PreparedStatement;
|
||||
import com.datastax.driver.core.RegularStatement;
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
|
@ -32,48 +12,34 @@ import com.datastax.driver.core.Statement;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import datadog.trace.api.DDSpanTypes;
|
||||
import datadog.trace.api.DDTags;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.Tracer;
|
||||
import io.opentracing.tag.Tags;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.net.Inet4Address;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/** Package private decorator for {@link Session} Instantiated only by TracingCluster */
|
||||
class TracingSession implements Session {
|
||||
public class TracingSession implements Session {
|
||||
|
||||
static final String COMPONENT_NAME = "java-cassandra";
|
||||
private final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
private final Session session;
|
||||
private final Tracer tracer;
|
||||
|
||||
TracingSession(final Session session, final Tracer tracer) {
|
||||
public TracingSession(final Session session, final Tracer tracer) {
|
||||
this.session = session;
|
||||
this.tracer = tracer;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public String getLoggedKeyspace() {
|
||||
return session.getLoggedKeyspace();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public Session init() {
|
||||
return new TracingSession(session.init(), tracer);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ListenableFuture<Session> initAsync() {
|
||||
return Futures.transform(
|
||||
|
@ -86,7 +52,6 @@ class TracingSession implements Session {
|
|||
});
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ResultSet execute(final String query) {
|
||||
final Span span = buildSpan(query);
|
||||
|
@ -99,7 +64,6 @@ class TracingSession implements Session {
|
|||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ResultSet execute(final String query, final Object... values) {
|
||||
final Span span = buildSpan(query);
|
||||
|
@ -112,7 +76,6 @@ class TracingSession implements Session {
|
|||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ResultSet execute(final String query, final Map<String, Object> values) {
|
||||
final Span span = buildSpan(query);
|
||||
|
@ -125,7 +88,6 @@ class TracingSession implements Session {
|
|||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ResultSet execute(final Statement statement) {
|
||||
final String query = getQuery(statement);
|
||||
|
@ -139,7 +101,6 @@ class TracingSession implements Session {
|
|||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ResultSetFuture executeAsync(final String query) {
|
||||
final Span span = buildSpan(query);
|
||||
|
@ -149,7 +110,6 @@ class TracingSession implements Session {
|
|||
return future;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ResultSetFuture executeAsync(final String query, final Object... values) {
|
||||
final Span span = buildSpan(query);
|
||||
|
@ -159,7 +119,6 @@ class TracingSession implements Session {
|
|||
return future;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ResultSetFuture executeAsync(final String query, final Map<String, Object> values) {
|
||||
final Span span = buildSpan(query);
|
||||
|
@ -169,7 +128,6 @@ class TracingSession implements Session {
|
|||
return future;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ResultSetFuture executeAsync(final Statement statement) {
|
||||
final String query = getQuery(statement);
|
||||
|
@ -180,37 +138,31 @@ class TracingSession implements Session {
|
|||
return future;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public PreparedStatement prepare(final String query) {
|
||||
return session.prepare(query);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public PreparedStatement prepare(final RegularStatement statement) {
|
||||
return session.prepare(statement);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ListenableFuture<PreparedStatement> prepareAsync(final String query) {
|
||||
return session.prepareAsync(query);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public ListenableFuture<PreparedStatement> prepareAsync(final RegularStatement statement) {
|
||||
return session.prepareAsync(statement);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public CloseFuture closeAsync() {
|
||||
return session.closeAsync();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public void close() {
|
||||
session.close();
|
||||
|
@ -221,13 +173,11 @@ class TracingSession implements Session {
|
|||
return session.isClosed();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public Cluster getCluster() {
|
||||
return session.getCluster();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public State getState() {
|
||||
return session.getState();
|
||||
|
@ -258,63 +208,22 @@ class TracingSession implements Session {
|
|||
}
|
||||
|
||||
private Span buildSpan(final String query) {
|
||||
final Tracer.SpanBuilder spanBuilder =
|
||||
tracer
|
||||
.buildSpan("cassandra.execute")
|
||||
.withTag(Tags.COMPONENT.getKey(), "datastax-cassandra")
|
||||
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
|
||||
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.CASSANDRA);
|
||||
|
||||
final Span span = spanBuilder.start();
|
||||
|
||||
Tags.COMPONENT.set(span, COMPONENT_NAME);
|
||||
Tags.DB_STATEMENT.set(span, query);
|
||||
Tags.DB_TYPE.set(span, "cassandra");
|
||||
|
||||
final String keyspace = getLoggedKeyspace();
|
||||
if (keyspace != null) {
|
||||
Tags.DB_INSTANCE.set(span, keyspace);
|
||||
}
|
||||
|
||||
final Span span = tracer.buildSpan("cassandra.execute").start();
|
||||
CassandraClientDecorator.INSTANCE.afterStart(span);
|
||||
CassandraClientDecorator.INSTANCE.onSession(span, session);
|
||||
CassandraClientDecorator.INSTANCE.onStatement(span, query);
|
||||
return span;
|
||||
}
|
||||
|
||||
private static void finishSpan(final Span span, final ResultSet resultSet) {
|
||||
if (resultSet != null) {
|
||||
final Host host = resultSet.getExecutionInfo().getQueriedHost();
|
||||
Tags.PEER_PORT.set(span, host.getSocketAddress().getPort());
|
||||
|
||||
Tags.PEER_HOSTNAME.set(span, host.getAddress().getHostName());
|
||||
final InetAddress inetAddress = host.getSocketAddress().getAddress();
|
||||
|
||||
if (inetAddress instanceof Inet4Address) {
|
||||
final byte[] address = inetAddress.getAddress();
|
||||
Tags.PEER_HOST_IPV4.set(span, ByteBuffer.wrap(address).getInt());
|
||||
} else {
|
||||
Tags.PEER_HOST_IPV6.set(span, inetAddress.getHostAddress());
|
||||
}
|
||||
}
|
||||
CassandraClientDecorator.INSTANCE.onResponse(span, resultSet);
|
||||
CassandraClientDecorator.INSTANCE.beforeFinish(span);
|
||||
span.finish();
|
||||
}
|
||||
|
||||
private static void finishSpan(final Span span, final Exception e) {
|
||||
Tags.ERROR.set(span, Boolean.TRUE);
|
||||
span.log(errorLogs(e));
|
||||
CassandraClientDecorator.INSTANCE.onError(span, e);
|
||||
CassandraClientDecorator.INSTANCE.beforeFinish(span);
|
||||
span.finish();
|
||||
}
|
||||
|
||||
private static Map<String, Object> errorLogs(final Throwable throwable) {
|
||||
final Map<String, Object> errorLogs = new HashMap<>(4);
|
||||
errorLogs.put(EVENT, Tags.ERROR.getKey());
|
||||
errorLogs.put(ERROR_KIND, throwable.getClass().getName());
|
||||
errorLogs.put(ERROR_OBJECT, throwable);
|
||||
|
||||
errorLogs.put(MESSAGE, throwable.getMessage());
|
||||
|
||||
final StringWriter sw = new StringWriter();
|
||||
throwable.printStackTrace(new PrintWriter(sw));
|
||||
errorLogs.put(STACK, sw.toString());
|
||||
|
||||
return errorLogs;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue