diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle b/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle index 7715271f78..d2e4677843 100644 --- a/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle +++ b/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle @@ -1,6 +1,25 @@ +apply plugin: 'version-scan' + +versionScan { + group = "net.spy" + module = 'spymemcached' + versions = "[1.12.0,)" + scanMethods = true + verifyPresent = [ + "net.spy.memcached.ConnectionFactoryBuilder": "setListenerExecutorService", + ] +} apply from: "${rootDir}/gradle/java.gradle" +apply plugin: 'org.unbroken-dome.test-sets' + +testSets { + latestDepTest { + dirName = 'test' + } +} + dependencies { compileOnly group: 'net.spy', name: 'spymemcached', version: '2.12.0' @@ -13,11 +32,14 @@ dependencies { testCompile project(':dd-java-agent:testing') - // Include servlet instrumentation for verifying the tomcat requests - testCompile project(':dd-java-agent:instrumentation:servlet-3') - - testCompile group: 'net.spy', name: 'spymemcached', version: '2.12.0' testCompile group: 'org.spockframework', name:'spock-core', version:'1.1-groovy-2.4' testCompile group: 'org.testcontainers', name:'testcontainers', version:'1.7.3' } + +configurations.latestDepTestCompile { + resolutionStrategy { + force group: 'net.spy', name: 'spymemcached', version: '+' + } +} + diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/BulkGetCompletionListener.java b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/BulkGetCompletionListener.java new file mode 100644 index 0000000000..34f89c2b34 --- /dev/null +++ b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/BulkGetCompletionListener.java @@ -0,0 +1,28 @@ +package datadog.trace.instrumentation.spymemcached; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import java.util.concurrent.ExecutionException; +import net.spy.memcached.internal.*; + +public class BulkGetCompletionListener extends CompletionListener> + implements net.spy.memcached.internal.BulkGetCompletionListener { + public BulkGetCompletionListener(Tracer tracer, String methodName) { + super(tracer, methodName, true); + } + + @Override + public void onComplete(BulkGetFuture future) { + closeAsyncSpan(future); + } + + @Override + protected void processResult(Span span, BulkGetFuture future) + throws ExecutionException, InterruptedException { + /* + Note: for now we do not have an affective way of representing results of bulk operations, + i.e. we cannot day that we got 4 hits out of 10. So we will just ignore results for now. + */ + future.get(); + } +} diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/CompletionListener.java b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/CompletionListener.java new file mode 100644 index 0000000000..f14ce9505f --- /dev/null +++ b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/CompletionListener.java @@ -0,0 +1,122 @@ +package datadog.trace.instrumentation.spymemcached; + +import static io.opentracing.log.Fields.ERROR_OBJECT; + +import datadog.trace.api.DDTags; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.tag.Tags; +import java.util.Collections; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class CompletionListener { + + // Note: it looks like this value is being ignored and DBTypeDecorator overwrites it. + static final String OPERATION_NAME = "memcached.query"; + + static final String SERVICE_NAME = "memcached"; + /* + Ideally this should be "spymemcached" or something along those lines. + Unfortunately nondeterministic interaction between SpanTypeDecorator and DBTypeDecorator + pretty much forces this to be "sql". + */ + static final String SPAN_TYPE = "sql"; + static final String COMPONENT_NAME = "java-spymemcached"; + static final String DB_TYPE = "memcached"; + static final String DB_COMMAND_CANCELLED = "db.command.cancelled"; + static final String MEMCACHED_RESULT = "memcaced.result"; + static final String HIT = "hit"; + static final String MISS = "miss"; + + private final Tracer tracer; + private final Scope scope; + + public CompletionListener(final Tracer tracer, String methodName, boolean async) { + this.tracer = tracer; + scope = buildSpan(getOperationName(methodName), async); + } + + private Scope buildSpan(String operation, boolean async) { + final Tracer.SpanBuilder spanBuilder = + tracer + .buildSpan(OPERATION_NAME) + .withTag(DDTags.SERVICE_NAME, SERVICE_NAME) + .withTag(DDTags.RESOURCE_NAME, operation) + .withTag(DDTags.SPAN_TYPE, SPAN_TYPE) + .withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) + .withTag(Tags.DB_TYPE.getKey(), DB_TYPE); + + Scope scope = spanBuilder.startActive(false); + if (async) { + scope.close(); + } + return scope; + } + + protected void closeAsyncSpan(T future) { + Span span = scope.span(); + try { + processResult(span, future); + } catch (CancellationException e) { + span.setTag(DB_COMMAND_CANCELLED, true); + } catch (ExecutionException e) { + if (e.getCause() instanceof CancellationException) { + // Looks like underlying OperationFuture wraps CancellationException into ExecutionException + span.setTag(DB_COMMAND_CANCELLED, true); + } else { + Tags.ERROR.set(span, Boolean.TRUE); + span.log(Collections.singletonMap(ERROR_OBJECT, e.getCause())); + } + } catch (InterruptedException e) { + // Avoid swallowing InterruptedException + Tags.ERROR.set(span, Boolean.TRUE); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + Thread.currentThread().interrupt(); + } catch (Exception e) { + // This should never happen, just in case to make sure we cover all unexpected exceptions + Tags.ERROR.set(span, Boolean.TRUE); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + } finally { + span.finish(); + } + } + + protected void closeSyncSpan(Throwable thrown) { + Span span = scope.span(); + + if (thrown != null) { + Tags.ERROR.set(span, Boolean.TRUE); + span.log(Collections.singletonMap(ERROR_OBJECT, thrown)); + } + + span.finish(); + scope.close(); + } + + protected void processResult(Span span, T future) + throws ExecutionException, InterruptedException { + throw new UnsupportedOperationException("processResult was not implemented"); + } + + protected void setResultTag(Span span, boolean hit) { + span.setTag(MEMCACHED_RESULT, hit ? HIT : MISS); + } + + private static String getOperationName(String methodName) { + char chars[] = + methodName + .replaceFirst("^async", "") + // 'CAS' name is special, we have to lowercase whole name + .replaceFirst("^CAS", "cas") + .toCharArray(); + + // Lowercase first letter + chars[0] = Character.toLowerCase(chars[0]); + return new String(chars); + } +} diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/GetCompletionListener.java b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/GetCompletionListener.java new file mode 100644 index 0000000000..3c96606bec --- /dev/null +++ b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/GetCompletionListener.java @@ -0,0 +1,25 @@ +package datadog.trace.instrumentation.spymemcached; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import java.util.concurrent.ExecutionException; +import net.spy.memcached.internal.GetFuture; + +public class GetCompletionListener extends CompletionListener> + implements net.spy.memcached.internal.GetCompletionListener { + public GetCompletionListener(Tracer tracer, String methodName) { + super(tracer, methodName, true); + } + + @Override + public void onComplete(GetFuture future) { + closeAsyncSpan(future); + } + + @Override + protected void processResult(Span span, GetFuture future) + throws ExecutionException, InterruptedException { + Object result = future.get(); + setResultTag(span, result != null); + } +} diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/MemcachedClientInstrumentation.java b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/MemcachedClientInstrumentation.java new file mode 100644 index 0000000000..2f8d531b8f --- /dev/null +++ b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/MemcachedClientInstrumentation.java @@ -0,0 +1,169 @@ +package datadog.trace.instrumentation.spymemcached; + +import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClassWithMethod; +import static net.bytebuddy.matcher.ElementMatchers.*; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.DDAdvice; +import datadog.trace.agent.tooling.DDTransformers; +import datadog.trace.agent.tooling.HelperInjector; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.CallDepthThreadLocalMap; +import io.opentracing.util.GlobalTracer; +import java.lang.reflect.Method; +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.asm.Advice; +import net.spy.memcached.MemcachedClient; +import net.spy.memcached.internal.BulkFuture; +import net.spy.memcached.internal.GetFuture; +import net.spy.memcached.internal.OperationFuture; + +@AutoService(Instrumenter.class) +public final class MemcachedClientInstrumentation extends Instrumenter.Configurable { + + private static final String MEMCACHED_PACKAGE = "net.spy.memcached"; + private static final String HELPERS_PACKAGE = + MemcachedClientInstrumentation.class.getPackage().getName(); + + public static final HelperInjector HELPER_INJECTOR = + new HelperInjector( + HELPERS_PACKAGE + ".CompletionListener", + HELPERS_PACKAGE + ".GetCompletionListener", + HELPERS_PACKAGE + ".OperationCompletionListener", + HELPERS_PACKAGE + ".BulkGetCompletionListener"); + + public MemcachedClientInstrumentation() { + super("spymemcached"); + } + + @Override + public AgentBuilder apply(final AgentBuilder agentBuilder) { + return agentBuilder + .type( + named(MEMCACHED_PACKAGE + ".MemcachedClient"), + // Target 2.12 that has this method + classLoaderHasClassWithMethod( + MEMCACHED_PACKAGE + ".ConnectionFactoryBuilder", + "setListenerExecutorService", + "java.util.concurrent.ExecutorService")) + .transform(HELPER_INJECTOR) + .transform(DDTransformers.defaultTransformers()) + .transform( + DDAdvice.create() + .advice( + isMethod() + .and(isPublic()) + .and(returns(named(MEMCACHED_PACKAGE + ".internal.OperationFuture"))) + /* + Flush seems to have a bug when listeners may not be always called. + Also tracing flush is probably of a very limited value. + */ + .and(not(named("flush"))), + AsyncOperationAdvice.class.getName())) + .transform( + DDAdvice.create() + .advice( + isMethod() + .and(isPublic()) + .and(returns(named(MEMCACHED_PACKAGE + ".internal.GetFuture"))), + AsyncGetAdvice.class.getName())) + .transform( + DDAdvice.create() + .advice( + isMethod() + .and(isPublic()) + .and(returns(named(MEMCACHED_PACKAGE + ".internal.BulkFuture"))), + AsyncBulkAdvice.class.getName())) + .transform( + DDAdvice.create() + .advice( + isMethod().and(isPublic()).and(named("incr").or(named("decr"))), + SyncOperationAdvice.class.getName())) + .asDecorator(); + } + + public static class AsyncOperationAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static Boolean methodEnter() { + return CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0; + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void methodExit( + @Advice.Enter final Boolean shouldInjectListener, + @Advice.Origin final Method method, + @Advice.Return final OperationFuture future) { + if (shouldInjectListener) { + OperationCompletionListener listener = + new OperationCompletionListener(GlobalTracer.get(), method.getName()); + future.addListener(listener); + CallDepthThreadLocalMap.reset(MemcachedClient.class); + } + } + } + + public static class AsyncGetAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static Boolean methodEnter() { + return CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0; + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void methodExit( + @Advice.Enter final Boolean shouldInjectListener, + @Advice.Origin final Method method, + @Advice.Return final GetFuture future) { + if (shouldInjectListener) { + GetCompletionListener listener = + new GetCompletionListener(GlobalTracer.get(), method.getName()); + future.addListener(listener); + CallDepthThreadLocalMap.reset(MemcachedClient.class); + } + } + } + + public static class AsyncBulkAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static Boolean methodEnter() { + return CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0; + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void methodExit( + @Advice.Enter final Boolean shouldInjectListener, + @Advice.Origin final Method method, + @Advice.Return final BulkFuture future) { + if (shouldInjectListener) { + BulkGetCompletionListener listener = + new BulkGetCompletionListener(GlobalTracer.get(), method.getName()); + future.addListener(listener); + CallDepthThreadLocalMap.reset(MemcachedClient.class); + } + } + } + + public static class SyncOperationAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static SyncCompletionListener methodEnter(@Advice.Origin final Method method) { + if (CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0) { + return new SyncCompletionListener(GlobalTracer.get(), method.getName()); + } else { + return null; + } + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void methodExit( + @Advice.Enter final SyncCompletionListener listener, + @Advice.Thrown final Throwable thrown) { + if (listener != null) { + listener.done(thrown); + CallDepthThreadLocalMap.reset(MemcachedClient.class); + } + } + } +} diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/OperationCompletionListener.java b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/OperationCompletionListener.java new file mode 100644 index 0000000000..7687c136c5 --- /dev/null +++ b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/OperationCompletionListener.java @@ -0,0 +1,25 @@ +package datadog.trace.instrumentation.spymemcached; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import java.util.concurrent.ExecutionException; +import net.spy.memcached.internal.OperationFuture; + +public class OperationCompletionListener + extends CompletionListener> + implements net.spy.memcached.internal.OperationCompletionListener { + public OperationCompletionListener(Tracer tracer, String methodName) { + super(tracer, methodName, true); + } + + @Override + public void onComplete(OperationFuture future) { + closeAsyncSpan(future); + } + + @Override + protected void processResult(Span span, OperationFuture future) + throws ExecutionException, InterruptedException { + future.get(); + } +} diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/SyncCompletionListener.java b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/SyncCompletionListener.java new file mode 100644 index 0000000000..a3fe5711c5 --- /dev/null +++ b/dd-java-agent/instrumentation/spymemcached-2.12/src/main/java/datadog/trace/instrumentation/spymemcached/SyncCompletionListener.java @@ -0,0 +1,13 @@ +package datadog.trace.instrumentation.spymemcached; + +import io.opentracing.Tracer; + +public class SyncCompletionListener extends CompletionListener { + public SyncCompletionListener(Tracer tracer, String methodName) { + super(tracer, methodName, false); + } + + public void done(Throwable thrown) { + closeSyncSpan(thrown); + } +} diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/src/test/groovy/datadog/trace/instrumentation/spymemcached/SpymemcachedTest.groovy b/dd-java-agent/instrumentation/spymemcached-2.12/src/test/groovy/datadog/trace/instrumentation/spymemcached/SpymemcachedTest.groovy index 8de399653a..0c742754ad 100644 --- a/dd-java-agent/instrumentation/spymemcached-2.12/src/test/groovy/datadog/trace/instrumentation/spymemcached/SpymemcachedTest.groovy +++ b/dd-java-agent/instrumentation/spymemcached-2.12/src/test/groovy/datadog/trace/instrumentation/spymemcached/SpymemcachedTest.groovy @@ -1,28 +1,58 @@ package datadog.trace.instrumentation.spymemcached +import com.google.common.util.concurrent.MoreExecutors import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.TraceAssert +import datadog.trace.api.DDTags +import io.opentracing.tag.Tags +import net.spy.memcached.CASResponse +import net.spy.memcached.ConnectionFactory +import net.spy.memcached.ConnectionFactoryBuilder +import net.spy.memcached.DefaultConnectionFactory import net.spy.memcached.MemcachedClient +import net.spy.memcached.internal.CheckedOperationTimeoutException +import net.spy.memcached.ops.Operation +import net.spy.memcached.ops.OperationQueueFactory import org.testcontainers.containers.GenericContainer + import spock.lang.Shared +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import java.util.concurrent.ExecutorService +import java.util.concurrent.locks.ReentrantLock + +import static datadog.trace.agent.test.ListWriterAssert.assertTraces +import static CompletionListener.COMPONENT_NAME +import static CompletionListener.OPERATION_NAME +import static CompletionListener.SERVICE_NAME +import static CompletionListener.SPAN_TYPE +import static datadog.trace.agent.test.TestUtils.runUnderTrace +import static net.spy.memcached.ConnectionFactoryBuilder.Protocol.BINARY + class SpymemcachedTest extends AgentTestRunner { + @Shared + def parentOperation = "parent-span" + @Shared + def expiration = 3600 + @Shared + def keyPrefix = "SpymemcachedTest-" + (Math.abs(new Random().nextInt())) + "-" @Shared def defaultMemcachedPort = 11211 @Shared + def timingOutMemcachedOpTimeout = 1000 + /* Note: type here has to stay undefined, otherwise tests will fail in CI in Java 7 because 'testcontainers' are built for Java 8 and Java 7 cannot load this class. */ - def memcachedContainer - @Shared - MemcachedClient memcached + def memcachedContainer + @Shared + InetSocketAddress memcachedAddress = new InetSocketAddress("127.0.0.1", defaultMemcachedPort) def setupSpec() { - // Setup default hostname and port - String ip = "127.0.0.1" - int port = defaultMemcachedPort /* CI will provide us with memcached container running along side our build. @@ -33,11 +63,11 @@ class SpymemcachedTest extends AgentTestRunner { memcachedContainer = new GenericContainer('memcached:latest') .withExposedPorts(defaultMemcachedPort) memcachedContainer.start() - ip = memcachedContainer.containerIpAddress - port = memcachedContainer.getMappedPort(defaultMemcachedPort) + memcachedAddress = new InetSocketAddress( + memcachedContainer.containerIpAddress, + memcachedContainer.getMappedPort(defaultMemcachedPort) + ) } - - memcached = new MemcachedClient(new InetSocketAddress(ip, port)) } def cleanupSpec() { @@ -46,11 +76,588 @@ class SpymemcachedTest extends AgentTestRunner { } } - def "command with no arguments"() { + ReentrantLock queueLock + MemcachedClient memcached + MemcachedClient locableMemcached + MemcachedClient timingoutMemcached + + def setup() { + queueLock = new ReentrantLock() + + // Use direct executor service so our listeners finish in deterministic order + ExecutorService listenerExecutorService = MoreExecutors.newDirectExecutorService() + + ConnectionFactory connectionFactory = (new ConnectionFactoryBuilder()) + .setListenerExecutorService(listenerExecutorService) + .setProtocol(BINARY) + .build() + memcached = new MemcachedClient(connectionFactory, Arrays.asList(memcachedAddress)) + + def lockableQueueFactory = new OperationQueueFactory() { + @Override + BlockingQueue create() { + return getLockableQueue(queueLock) + } + } + + ConnectionFactory lockableConnectionFactory = (new ConnectionFactoryBuilder()) + .setListenerExecutorService(listenerExecutorService) + .setProtocol(BINARY) + .setOpQueueFactory(lockableQueueFactory) + .build() + locableMemcached = new MemcachedClient(lockableConnectionFactory, Arrays.asList(memcachedAddress)) + + ConnectionFactory timingoutConnectionFactory = (new ConnectionFactoryBuilder()) + .setListenerExecutorService(listenerExecutorService) + .setProtocol(BINARY) + .setOpQueueFactory(lockableQueueFactory) + .setOpTimeout(timingOutMemcachedOpTimeout) + .build() + timingoutMemcached = new MemcachedClient(timingoutConnectionFactory, Arrays.asList(memcachedAddress)) + + // Add some keys to test on later: + def valuesToSet = [ + "test-get": "get test", + "test-get-2": "get test 2", + "test-append": "append test", + "test-prepend": "prepend test", + "test-delete": "delete test", + "test-replace": "replace test", + "test-touch": "touch test", + "test-cas": "cas test", + "test-decr": "200", + "test-incr": "100" + ] + runUnderTrace("setup") { + valuesToSet.each { k, v -> assert memcached.set(key(k), expiration, v).get() } + } + TEST_WRITER.waitForTraces(1) + TEST_WRITER.clear() + } + + def "test get hit"() { when: - memcached.set("foo", 3600, "bar").get() + runUnderTrace(parentOperation) { + assert "get test" == memcached.get(key("test-get")) + } then: - memcached.get("foo") == "bar" + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "get", null,"hit") + } + } + } + + def "test get miss"() { + when: + runUnderTrace(parentOperation) { + assert null == memcached.get(key("test-get-key-that-doesn't-exist")) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "get", null,"miss") + } + } + } + + def "test get cancel"() { + when: + runUnderTrace(parentOperation) { + queueLock.lock() + locableMemcached.asyncGet(key("test-get")).cancel(true) + queueLock.unlock() + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "get", "canceled") + } + } + } + + def "test get timeout"() { + when: + /* + Not using runUnderTrace since timeouts happen in separate thread + and direct executor doesn't help to make sure that parent span finishes last. + Instead run without parent span to have only 1 span to test with. + */ + try { + queueLock.lock() + timingoutMemcached.asyncGet(key("test-get")) + Thread.sleep(timingOutMemcachedOpTimeout + 1000) + } finally { + queueLock.unlock() + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + getSpan(it, 0, "get", "timeout") + } + } + } + + def "test bulk get"() { + when: + runUnderTrace(parentOperation) { + def expected = [(key("test-get")): "get test", (key("test-get-2")): "get test 2"] + assert expected == memcached.getBulk(key("test-get"), key("test-get-2")) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "getBulk", null,null) + } + } + } + + def "test set"() { + when: + runUnderTrace(parentOperation) { + assert memcached.set(key("test-set"), expiration, "bar").get() + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "set") + } + } + } + + def "test set cancel"() { + when: + runUnderTrace(parentOperation) { + queueLock.lock() + assert locableMemcached.set(key("test-set-cancel"), expiration, "bar").cancel() + queueLock.unlock() + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "set", "canceled") + } + } + } + + def "test add"() { + when: + runUnderTrace(parentOperation) { + assert memcached.add(key("test-add"), expiration, "add bar").get() + assert "add bar" == memcached.get(key("test-add")) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 3) { + getParentSpan(it, 0) + getSpan(it, 1, "get", null,"hit") + getSpan(it, 2, "add") + } + } + } + + def "test second add"() { + when: + runUnderTrace(parentOperation) { + assert memcached.add(key("test-add-2"), expiration, "add bar").get() + assert !memcached.add(key("test-add-2"), expiration, "add bar 123").get() + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 3) { + getParentSpan(it, 0) + getSpan(it, 1, "add") + getSpan(it, 2, "add") + } + } + } + + def "test delete"() { + when: + runUnderTrace(parentOperation) { + assert memcached.delete(key("test-delete")).get() + assert null == memcached.get(key("test-delete")) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 3) { + getParentSpan(it, 0) + getSpan(it, 1, "get", null,"miss") + getSpan(it, 2, "delete") + } + } + } + + def "test delete non existent"() { + when: + runUnderTrace(parentOperation) { + assert !memcached.delete(key("test-delete-non-existent")).get() + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "delete") + } + } + } + + def "test replace"() { + when: + runUnderTrace(parentOperation) { + assert memcached.replace(key("test-replace"), expiration, "new value").get() + assert "new value" == memcached.get(key("test-replace")) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 3) { + getParentSpan(it, 0) + getSpan(it, 1, "get", null,"hit") + getSpan(it, 2, "replace") + } + } + } + + def "test replace non existent"() { + when: + runUnderTrace(parentOperation) { + assert !memcached.replace(key("test-replace-non-existent"), expiration, "new value").get() + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "replace") + } + } + } + + def "test append"() { + when: + runUnderTrace(parentOperation) { + def cas = memcached.gets(key("test-append")) + assert memcached.append(cas.cas, key("test-append"), " appended").get() + assert "append test appended" == memcached.get(key("test-append")) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 4) { + getParentSpan(it, 0) + getSpan(it, 1, "get", null,"hit") + getSpan(it, 2, "append") + getSpan(it, 3, "gets") + } + } + } + + def "test prepend"() { + when: + runUnderTrace(parentOperation) { + def cas = memcached.gets(key("test-prepend")) + assert memcached.prepend(cas.cas, key("test-prepend"), "prepended ").get() + assert "prepended prepend test" == memcached.get(key("test-prepend")) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 4) { + getParentSpan(it, 0) + getSpan(it, 1, "get", null,"hit") + getSpan(it, 2, "prepend") + getSpan(it, 3, "gets") + } + } + } + + def "test cas"() { + when: + runUnderTrace(parentOperation) { + def cas = memcached.gets(key("test-cas")) + assert CASResponse.OK == memcached.cas(key("test-cas"), cas.cas, expiration, "cas bar") + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 3) { + getParentSpan(it, 0) + getSpan(it, 1, "cas") + getSpan(it, 2, "gets") + } + } + } + + def "test cas not found"() { + when: + runUnderTrace(parentOperation) { + assert CASResponse.NOT_FOUND == memcached.cas(key("test-cas-doesnt-exist"), 1234, expiration, "cas bar") + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "cas") + } + } + } + + def "test touch"() { + when: + runUnderTrace(parentOperation) { + assert memcached.touch(key("test-touch"), expiration).get() + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "touch") + } + } + } + + def "test touch non existent"() { + when: + runUnderTrace(parentOperation) { + assert !memcached.touch(key("test-touch-non-existent"), expiration).get() + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "touch") + } + } + } + + def "test get and touch"() { + when: + runUnderTrace(parentOperation) { + assert "touch test" == memcached.getAndTouch(key("test-touch"), expiration).value + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "getAndTouch") + } + } + } + + def "test get and touch non existent"() { + when: + runUnderTrace(parentOperation) { + assert null == memcached.getAndTouch(key("test-touch-non-existent"), expiration) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "getAndTouch") + } + } + } + + def "test decr"() { + when: + runUnderTrace(parentOperation) { + /* + Memcached is funny in the way it handles incr/decr operations: + it needs values to be strings (with digits in them) and it returns actual long from decr/incr + */ + assert 195 == memcached.decr(key("test-decr"), 5) + assert "195" == memcached.get(key("test-decr")) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 3) { + getParentSpan(it, 0) + getSpan(it, 1, "get", null,"hit") + getSpan(it, 2, "decr") + } + } + } + + def "test decr non existent"() { + when: + runUnderTrace(parentOperation) { + assert -1 == memcached.decr(key("test-decr-non-existent"), 5) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "decr") + } + } + } + + def "test decr exception"() { + when: + memcached.decr(key("long key: " + longString()), 5) + + then: + thrown IllegalArgumentException + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + getSpan(it, 0, "decr", "long key") + } + } + } + + def "test incr"() { + when: + runUnderTrace(parentOperation) { + /* + Memcached is funny in the way it handles incr/decr operations: + it needs values to be strings (with digits in them) and it returns actual long from decr/incr + */ + assert 105 == memcached.incr(key("test-incr"), 5) + assert "105" == memcached.get(key("test-incr")) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 3) { + getParentSpan(it, 0) + getSpan(it, 1, "get", null,"hit") + getSpan(it, 2, "incr") + } + } + } + + def "test incr non existent"() { + when: + runUnderTrace(parentOperation) { + assert -1 == memcached.incr(key("test-incr-non-existent"), 5) + } + + then: + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + getParentSpan(it, 0) + getSpan(it, 1, "incr") + } + } + } + + def "test incr exception"() { + when: + memcached.incr(key("long key: " + longString()), 5) + + then: + thrown IllegalArgumentException + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + getSpan(it, 0, "incr", "long key") + } + } + } + + def key(String k) { + keyPrefix + k + } + + def longString(char c='s' as char) { + char[] chars = new char[250] + Arrays.fill(chars, 's' as char) + return new String(chars) + } + + def getLockableQueue(ReentrantLock queueLock) { + return new ArrayBlockingQueue(DefaultConnectionFactory.DEFAULT_OP_QUEUE_LEN) { + + @Override + int drainTo(Collection c, int maxElements) { + try { + queueLock.lock() + return super.drainTo(c, maxElements) + } finally { + queueLock.unlock() + } + } + } + } + + def getParentSpan(TraceAssert trace, int index) { + return trace.span(index) { + operationName parentOperation + parent() + errored false + tags { + defaultTags() + } + } + } + + def getSpan(TraceAssert trace, int index, String operation, String error=null, String result=null) { + return trace.span(index) { + if (index > 0) { + childOf(trace.span(0)) + } + + serviceName SERVICE_NAME + operationName OPERATION_NAME + resourceName operation + spanType SPAN_TYPE + errored (error != null && error != "canceled") + + tags { + defaultTags() + "${DDTags.SPAN_TYPE}" SPAN_TYPE + "${Tags.COMPONENT.getKey()}" COMPONENT_NAME + "${Tags.SPAN_KIND.getKey()}" Tags.SPAN_KIND_CLIENT + "${Tags.DB_TYPE.getKey()}" CompletionListener.DB_TYPE + + if (error == "canceled") { + "${CompletionListener.DB_COMMAND_CANCELLED}" true + } + + if (error == "timeout") { + errorTags( + CheckedOperationTimeoutException, + "Operation timed out. - failing node: ${memcachedAddress.address}:${memcachedAddress.port}") + } + + if (error == "long key") { + errorTags( + IllegalArgumentException, + "Key is too long (maxlen = 250)") + } + + if (result == "hit") { + "${CompletionListener.MEMCACHED_RESULT}" CompletionListener.HIT + } + + if (result == "miss") { + "${CompletionListener.MEMCACHED_RESULT}" CompletionListener.MISS + } + } + } } }