Migrate Memcache instrumentation to Decorator

This commit is contained in:
Tyler Benson 2019-03-12 11:18:45 -07:00
parent d31965ff5a
commit bca5614508
9 changed files with 130 additions and 105 deletions

View File

@ -1,10 +1,8 @@
// building and testing against 2.12 because setListenerExecutorService exists to facilitate easier
// testing. Instrumentation should work since 2.10
muzzle { muzzle {
pass { pass {
group = "net.spy" group = "net.spy"
module = 'spymemcached' module = 'spymemcached'
versions = "[2.10.0,)" versions = "[2.12.0,)"
assertInverse = true assertInverse = true
} }
} }

View File

@ -1,14 +1,14 @@
package datadog.trace.instrumentation.spymemcached; package datadog.trace.instrumentation.spymemcached;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.Tracer;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.internal.BulkGetFuture; import net.spy.memcached.internal.BulkGetFuture;
public class BulkGetCompletionListener extends CompletionListener<BulkGetFuture<?>> public class BulkGetCompletionListener extends CompletionListener<BulkGetFuture<?>>
implements net.spy.memcached.internal.BulkGetCompletionListener { implements net.spy.memcached.internal.BulkGetCompletionListener {
public BulkGetCompletionListener(final Tracer tracer, final String methodName) { public BulkGetCompletionListener(final MemcachedConnection connection, final String methodName) {
super(tracer, methodName, true); super(connection, methodName);
} }
@Override @Override
@ -21,7 +21,7 @@ public class BulkGetCompletionListener extends CompletionListener<BulkGetFuture<
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
/* /*
Note: for now we do not have an affective way of representing results of bulk operations, Note: for now we do not have an affective way of representing results of bulk operations,
i.e. we cannot day that we got 4 hits out of 10. So we will just ignore results for now. i.e. we cannot say that we got 4 hits out of 10. So we will just ignore results for now.
*/ */
future.get(); future.get();
} }

View File

@ -1,17 +1,13 @@
package datadog.trace.instrumentation.spymemcached; package datadog.trace.instrumentation.spymemcached;
import static io.opentracing.log.Fields.ERROR_OBJECT; import static datadog.trace.instrumentation.spymemcached.MemcacheClientDecorator.DECORATE;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.Tracer; import io.opentracing.util.GlobalTracer;
import io.opentracing.tag.Tags;
import java.util.Collections;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.spy.memcached.MemcachedConnection;
@Slf4j @Slf4j
public abstract class CompletionListener<T> { public abstract class CompletionListener<T> {
@ -27,34 +23,18 @@ public abstract class CompletionListener<T> {
static final String HIT = "hit"; static final String HIT = "hit";
static final String MISS = "miss"; static final String MISS = "miss";
private final Tracer tracer; private final MemcachedConnection connection;
private final Scope scope; private final Span span;
public CompletionListener(final Tracer tracer, final String methodName, final boolean async) { public CompletionListener(final MemcachedConnection connection, final String methodName) {
this.tracer = tracer; this.connection = connection;
scope = buildSpan(getOperationName(methodName), async); span = GlobalTracer.get().buildSpan(OPERATION_NAME).start();
} DECORATE.afterStart(span);
DECORATE.onConnection(span, connection);
private Scope buildSpan(final String operation, final boolean async) { DECORATE.onOperation(span, methodName);
final Tracer.SpanBuilder spanBuilder =
tracer
.buildSpan(OPERATION_NAME)
.withTag(DDTags.SERVICE_NAME, SERVICE_NAME)
.withTag(DDTags.RESOURCE_NAME, operation)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MEMCACHED)
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(Tags.DB_TYPE.getKey(), DB_TYPE);
final Scope scope = spanBuilder.startActive(false);
if (async) {
scope.close();
}
return scope;
} }
protected void closeAsyncSpan(final T future) { protected void closeAsyncSpan(final T future) {
final Span span = scope.span();
try { try {
processResult(span, future); processResult(span, future);
} catch (final CancellationException e) { } catch (final CancellationException e) {
@ -64,33 +44,25 @@ public abstract class CompletionListener<T> {
// Looks like underlying OperationFuture wraps CancellationException into ExecutionException // Looks like underlying OperationFuture wraps CancellationException into ExecutionException
span.setTag(DB_COMMAND_CANCELLED, true); span.setTag(DB_COMMAND_CANCELLED, true);
} else { } else {
Tags.ERROR.set(span, Boolean.TRUE); DECORATE.onError(span, e.getCause());
span.log(Collections.singletonMap(ERROR_OBJECT, e.getCause()));
} }
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
// Avoid swallowing InterruptedException // Avoid swallowing InterruptedException
Tags.ERROR.set(span, Boolean.TRUE); DECORATE.onError(span, e);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} catch (final Exception e) { } catch (final Exception e) {
// This should never happen, just in case to make sure we cover all unexpected exceptions // This should never happen, just in case to make sure we cover all unexpected exceptions
Tags.ERROR.set(span, Boolean.TRUE); DECORATE.onError(span, e);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
} finally { } finally {
DECORATE.beforeFinish(span);
span.finish(); span.finish();
} }
} }
protected void closeSyncSpan(final Throwable thrown) { protected void closeSyncSpan(final Throwable thrown) {
final Span span = scope.span(); DECORATE.onError(span, thrown);
DECORATE.beforeFinish(span);
if (thrown != null) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, thrown));
}
span.finish(); span.finish();
scope.close();
} }
protected abstract void processResult(Span span, T future) protected abstract void processResult(Span span, T future)
@ -99,17 +71,4 @@ public abstract class CompletionListener<T> {
protected void setResultTag(final Span span, final boolean hit) { protected void setResultTag(final Span span, final boolean hit) {
span.setTag(MEMCACHED_RESULT, hit ? HIT : MISS); span.setTag(MEMCACHED_RESULT, hit ? HIT : MISS);
} }
private static String getOperationName(final String methodName) {
final char[] chars =
methodName
.replaceFirst("^async", "")
// 'CAS' name is special, we have to lowercase whole name
.replaceFirst("^CAS", "cas")
.toCharArray();
// Lowercase first letter
chars[0] = Character.toLowerCase(chars[0]);
return new String(chars);
}
} }

View File

@ -1,25 +1,25 @@
package datadog.trace.instrumentation.spymemcached; package datadog.trace.instrumentation.spymemcached;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.Tracer;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.internal.GetFuture; import net.spy.memcached.internal.GetFuture;
public class GetCompletionListener extends CompletionListener<GetFuture<?>> public class GetCompletionListener extends CompletionListener<GetFuture<?>>
implements net.spy.memcached.internal.GetCompletionListener { implements net.spy.memcached.internal.GetCompletionListener {
public GetCompletionListener(Tracer tracer, String methodName) { public GetCompletionListener(final MemcachedConnection connection, final String methodName) {
super(tracer, methodName, true); super(connection, methodName);
} }
@Override @Override
public void onComplete(GetFuture<?> future) { public void onComplete(final GetFuture<?> future) {
closeAsyncSpan(future); closeAsyncSpan(future);
} }
@Override @Override
protected void processResult(Span span, GetFuture<?> future) protected void processResult(final Span span, final GetFuture<?> future)
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
Object result = future.get(); final Object result = future.get();
setResultTag(span, result != null); setResultTag(span, result != null);
} }
} }

View File

@ -0,0 +1,62 @@
package datadog.trace.instrumentation.spymemcached;
import datadog.trace.agent.decorator.DatabaseClientDecorator;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Span;
import net.spy.memcached.MemcachedConnection;
public class MemcacheClientDecorator extends DatabaseClientDecorator<MemcachedConnection> {
public static final MemcacheClientDecorator DECORATE = new MemcacheClientDecorator();
@Override
protected String[] instrumentationNames() {
return new String[] {"spymemcached"};
}
@Override
protected String service() {
return "memcached";
}
@Override
protected String component() {
return "java-spymemcached";
}
@Override
protected String spanType() {
return DDSpanTypes.MEMCACHED;
}
@Override
protected String dbType() {
return "memcached";
}
@Override
protected String dbUser(final MemcachedConnection session) {
return null;
}
@Override
protected String dbInstance(final MemcachedConnection connection) {
return connection.connectionsStatus();
}
public Span onOperation(final Span span, final String methodName) {
final char[] chars =
methodName
.replaceFirst("^async", "")
// 'CAS' name is special, we have to lowercase whole name
.replaceFirst("^CAS", "cas")
.toCharArray();
// Lowercase first letter
chars[0] = Character.toLowerCase(chars[0]);
span.setTag(DDTags.RESOURCE_NAME, new String(chars));
return span;
}
}

View File

@ -9,8 +9,6 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap; import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
@ -26,8 +24,6 @@ import net.spy.memcached.internal.OperationFuture;
public final class MemcachedClientInstrumentation extends Instrumenter.Default { public final class MemcachedClientInstrumentation extends Instrumenter.Default {
private static final String MEMCACHED_PACKAGE = "net.spy.memcached"; private static final String MEMCACHED_PACKAGE = "net.spy.memcached";
private static final String HELPERS_PACKAGE =
MemcachedClientInstrumentation.class.getPackage().getName();
public MemcachedClientInstrumentation() { public MemcachedClientInstrumentation() {
super("spymemcached"); super("spymemcached");
@ -41,11 +37,15 @@ public final class MemcachedClientInstrumentation extends Instrumenter.Default {
@Override @Override
public String[] helperClassNames() { public String[] helperClassNames() {
return new String[] { return new String[] {
HELPERS_PACKAGE + ".CompletionListener", "datadog.trace.agent.decorator.BaseDecorator",
HELPERS_PACKAGE + ".SyncCompletionListener", "datadog.trace.agent.decorator.ClientDecorator",
HELPERS_PACKAGE + ".GetCompletionListener", "datadog.trace.agent.decorator.DatabaseClientDecorator",
HELPERS_PACKAGE + ".OperationCompletionListener", packageName + ".MemcacheClientDecorator",
HELPERS_PACKAGE + ".BulkGetCompletionListener" packageName + ".CompletionListener",
packageName + ".SyncCompletionListener",
packageName + ".GetCompletionListener",
packageName + ".OperationCompletionListener",
packageName + ".BulkGetCompletionListener"
}; };
} }
@ -84,13 +84,14 @@ public final class MemcachedClientInstrumentation extends Instrumenter.Default {
@Advice.OnMethodExit(suppress = Throwable.class) @Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit( public static void methodExit(
@Advice.Enter final boolean shouldInjectListener, @Advice.Enter final boolean shouldInjectListener,
@Advice.Origin final Method method, @Advice.This final MemcachedClient client,
@Advice.Origin("#m") final String methodName,
@Advice.Return final OperationFuture future) { @Advice.Return final OperationFuture future) {
if (shouldInjectListener && future != null) { if (shouldInjectListener && future != null) {
final OperationCompletionListener listener =
new OperationCompletionListener(GlobalTracer.get(), method.getName());
future.addListener(listener);
CallDepthThreadLocalMap.reset(MemcachedClient.class); CallDepthThreadLocalMap.reset(MemcachedClient.class);
final OperationCompletionListener listener =
new OperationCompletionListener(client.getConnection(), methodName);
future.addListener(listener);
} }
} }
} }
@ -105,13 +106,14 @@ public final class MemcachedClientInstrumentation extends Instrumenter.Default {
@Advice.OnMethodExit(suppress = Throwable.class) @Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit( public static void methodExit(
@Advice.Enter final boolean shouldInjectListener, @Advice.Enter final boolean shouldInjectListener,
@Advice.Origin final Method method, @Advice.This final MemcachedClient client,
@Advice.Origin("#m") final String methodName,
@Advice.Return final GetFuture future) { @Advice.Return final GetFuture future) {
if (shouldInjectListener && future != null) { if (shouldInjectListener && future != null) {
final GetCompletionListener listener =
new GetCompletionListener(GlobalTracer.get(), method.getName());
future.addListener(listener);
CallDepthThreadLocalMap.reset(MemcachedClient.class); CallDepthThreadLocalMap.reset(MemcachedClient.class);
final GetCompletionListener listener =
new GetCompletionListener(client.getConnection(), methodName);
future.addListener(listener);
} }
} }
} }
@ -126,13 +128,14 @@ public final class MemcachedClientInstrumentation extends Instrumenter.Default {
@Advice.OnMethodExit(suppress = Throwable.class) @Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit( public static void methodExit(
@Advice.Enter final boolean shouldInjectListener, @Advice.Enter final boolean shouldInjectListener,
@Advice.Origin final Method method, @Advice.This final MemcachedClient client,
@Advice.Origin("#m") final String methodName,
@Advice.Return final BulkFuture future) { @Advice.Return final BulkFuture future) {
if (shouldInjectListener && future != null) { if (shouldInjectListener && future != null) {
final BulkGetCompletionListener listener =
new BulkGetCompletionListener(GlobalTracer.get(), method.getName());
future.addListener(listener);
CallDepthThreadLocalMap.reset(MemcachedClient.class); CallDepthThreadLocalMap.reset(MemcachedClient.class);
final BulkGetCompletionListener listener =
new BulkGetCompletionListener(client.getConnection(), methodName);
future.addListener(listener);
} }
} }
} }
@ -140,9 +143,10 @@ public final class MemcachedClientInstrumentation extends Instrumenter.Default {
public static class SyncOperationAdvice { public static class SyncOperationAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static SyncCompletionListener methodEnter(@Advice.Origin final Method method) { public static SyncCompletionListener methodEnter(
@Advice.This final MemcachedClient client, @Advice.Origin("#m") final String methodName) {
if (CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0) { if (CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0) {
return new SyncCompletionListener(GlobalTracer.get(), method.getName()); return new SyncCompletionListener(client.getConnection(), methodName);
} else { } else {
return null; return null;
} }
@ -153,8 +157,8 @@ public final class MemcachedClientInstrumentation extends Instrumenter.Default {
@Advice.Enter final SyncCompletionListener listener, @Advice.Enter final SyncCompletionListener listener,
@Advice.Thrown final Throwable thrown) { @Advice.Thrown final Throwable thrown) {
if (listener != null) { if (listener != null) {
listener.done(thrown);
CallDepthThreadLocalMap.reset(MemcachedClient.class); CallDepthThreadLocalMap.reset(MemcachedClient.class);
listener.done(thrown);
} }
} }
} }

View File

@ -1,24 +1,25 @@
package datadog.trace.instrumentation.spymemcached; package datadog.trace.instrumentation.spymemcached;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.Tracer;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.internal.OperationFuture;
public class OperationCompletionListener public class OperationCompletionListener
extends CompletionListener<OperationFuture<? extends Object>> extends CompletionListener<OperationFuture<? extends Object>>
implements net.spy.memcached.internal.OperationCompletionListener { implements net.spy.memcached.internal.OperationCompletionListener {
public OperationCompletionListener(Tracer tracer, String methodName) { public OperationCompletionListener(
super(tracer, methodName, true); final MemcachedConnection connection, final String methodName) {
super(connection, methodName);
} }
@Override @Override
public void onComplete(OperationFuture<? extends Object> future) { public void onComplete(final OperationFuture<? extends Object> future) {
closeAsyncSpan(future); closeAsyncSpan(future);
} }
@Override @Override
protected void processResult(Span span, OperationFuture<? extends Object> future) protected void processResult(final Span span, final OperationFuture<? extends Object> future)
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
future.get(); future.get();
} }

View File

@ -1,23 +1,23 @@
package datadog.trace.instrumentation.spymemcached; package datadog.trace.instrumentation.spymemcached;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.Tracer;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.spy.memcached.MemcachedConnection;
@Slf4j @Slf4j
public class SyncCompletionListener extends CompletionListener<Void> { public class SyncCompletionListener extends CompletionListener<Void> {
public SyncCompletionListener(Tracer tracer, String methodName) { public SyncCompletionListener(final MemcachedConnection connection, final String methodName) {
super(tracer, methodName, false); super(connection, methodName);
} }
@Override @Override
protected void processResult(Span span, Void future) protected void processResult(final Span span, final Void future)
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
log.error("processResult was called on SyncCompletionListener. This should never happen. "); log.error("processResult was called on SyncCompletionListener. This should never happen. ");
} }
public void done(Throwable thrown) { public void done(final Throwable thrown) {
closeSyncSpan(thrown); closeSyncSpan(thrown);
} }
} }

View File

@ -635,6 +635,7 @@ class SpymemcachedTest extends AgentTestRunner {
"${Tags.COMPONENT.key}" COMPONENT_NAME "${Tags.COMPONENT.key}" COMPONENT_NAME
"${Tags.SPAN_KIND.key}" Tags.SPAN_KIND_CLIENT "${Tags.SPAN_KIND.key}" Tags.SPAN_KIND_CLIENT
"${Tags.DB_TYPE.key}" CompletionListener.DB_TYPE "${Tags.DB_TYPE.key}" CompletionListener.DB_TYPE
"$Tags.DB_INSTANCE.key" ~/Connection Status \{ \w*\/127.0.0.1:\d+ active: true, authed: true, last read: \d+ ms ago }/
if (error == "canceled") { if (error == "canceled") {
"${CompletionListener.DB_COMMAND_CANCELLED}" true "${CompletionListener.DB_COMMAND_CANCELLED}" true