Spymemcached: add instumentation
This commit is contained in:
parent
63c4cdff87
commit
dafd52cb7a
|
@ -1,6 +1,25 @@
|
|||
apply plugin: 'version-scan'
|
||||
|
||||
versionScan {
|
||||
group = "net.spy"
|
||||
module = 'spymemcached'
|
||||
versions = "[1.12.0,)"
|
||||
scanMethods = true
|
||||
verifyPresent = [
|
||||
"net.spy.memcached.ConnectionFactoryBuilder": "setListenerExecutorService",
|
||||
]
|
||||
}
|
||||
|
||||
apply from: "${rootDir}/gradle/java.gradle"
|
||||
|
||||
apply plugin: 'org.unbroken-dome.test-sets'
|
||||
|
||||
testSets {
|
||||
latestDepTest {
|
||||
dirName = 'test'
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compileOnly group: 'net.spy', name: 'spymemcached', version: '2.12.0'
|
||||
|
||||
|
@ -13,11 +32,14 @@ dependencies {
|
|||
|
||||
testCompile project(':dd-java-agent:testing')
|
||||
|
||||
// Include servlet instrumentation for verifying the tomcat requests
|
||||
testCompile project(':dd-java-agent:instrumentation:servlet-3')
|
||||
|
||||
|
||||
testCompile group: 'net.spy', name: 'spymemcached', version: '2.12.0'
|
||||
testCompile group: 'org.spockframework', name:'spock-core', version:'1.1-groovy-2.4'
|
||||
testCompile group: 'org.testcontainers', name:'testcontainers', version:'1.7.3'
|
||||
}
|
||||
|
||||
configurations.latestDepTestCompile {
|
||||
resolutionStrategy {
|
||||
force group: 'net.spy', name: 'spymemcached', version: '+'
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
package datadog.trace.instrumentation.spymemcached;
|
||||
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.Tracer;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import net.spy.memcached.internal.*;
|
||||
|
||||
public class BulkGetCompletionListener extends CompletionListener<BulkGetFuture<?>>
|
||||
implements net.spy.memcached.internal.BulkGetCompletionListener {
|
||||
public BulkGetCompletionListener(Tracer tracer, String methodName) {
|
||||
super(tracer, methodName, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(BulkGetFuture<?> future) {
|
||||
closeAsyncSpan(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processResult(Span span, BulkGetFuture<?> future)
|
||||
throws ExecutionException, InterruptedException {
|
||||
/*
|
||||
Note: for now we do not have an affective way of representing results of bulk operations,
|
||||
i.e. we cannot day that we got 4 hits out of 10. So we will just ignore results for now.
|
||||
*/
|
||||
future.get();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
package datadog.trace.instrumentation.spymemcached;
|
||||
|
||||
import static io.opentracing.log.Fields.ERROR_OBJECT;
|
||||
|
||||
import datadog.trace.api.DDTags;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.Tracer;
|
||||
import io.opentracing.tag.Tags;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public abstract class CompletionListener<T> {
|
||||
|
||||
// Note: it looks like this value is being ignored and DBTypeDecorator overwrites it.
|
||||
static final String OPERATION_NAME = "memcached.query";
|
||||
|
||||
static final String SERVICE_NAME = "memcached";
|
||||
/*
|
||||
Ideally this should be "spymemcached" or something along those lines.
|
||||
Unfortunately nondeterministic interaction between SpanTypeDecorator and DBTypeDecorator
|
||||
pretty much forces this to be "sql".
|
||||
*/
|
||||
static final String SPAN_TYPE = "sql";
|
||||
static final String COMPONENT_NAME = "java-spymemcached";
|
||||
static final String DB_TYPE = "memcached";
|
||||
static final String DB_COMMAND_CANCELLED = "db.command.cancelled";
|
||||
static final String MEMCACHED_RESULT = "memcaced.result";
|
||||
static final String HIT = "hit";
|
||||
static final String MISS = "miss";
|
||||
|
||||
private final Tracer tracer;
|
||||
private final Scope scope;
|
||||
|
||||
public CompletionListener(final Tracer tracer, String methodName, boolean async) {
|
||||
this.tracer = tracer;
|
||||
scope = buildSpan(getOperationName(methodName), async);
|
||||
}
|
||||
|
||||
private Scope buildSpan(String operation, boolean async) {
|
||||
final Tracer.SpanBuilder spanBuilder =
|
||||
tracer
|
||||
.buildSpan(OPERATION_NAME)
|
||||
.withTag(DDTags.SERVICE_NAME, SERVICE_NAME)
|
||||
.withTag(DDTags.RESOURCE_NAME, operation)
|
||||
.withTag(DDTags.SPAN_TYPE, SPAN_TYPE)
|
||||
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME)
|
||||
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
|
||||
.withTag(Tags.DB_TYPE.getKey(), DB_TYPE);
|
||||
|
||||
Scope scope = spanBuilder.startActive(false);
|
||||
if (async) {
|
||||
scope.close();
|
||||
}
|
||||
return scope;
|
||||
}
|
||||
|
||||
protected void closeAsyncSpan(T future) {
|
||||
Span span = scope.span();
|
||||
try {
|
||||
processResult(span, future);
|
||||
} catch (CancellationException e) {
|
||||
span.setTag(DB_COMMAND_CANCELLED, true);
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof CancellationException) {
|
||||
// Looks like underlying OperationFuture wraps CancellationException into ExecutionException
|
||||
span.setTag(DB_COMMAND_CANCELLED, true);
|
||||
} else {
|
||||
Tags.ERROR.set(span, Boolean.TRUE);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, e.getCause()));
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Avoid swallowing InterruptedException
|
||||
Tags.ERROR.set(span, Boolean.TRUE);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, e));
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
// This should never happen, just in case to make sure we cover all unexpected exceptions
|
||||
Tags.ERROR.set(span, Boolean.TRUE);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, e));
|
||||
} finally {
|
||||
span.finish();
|
||||
}
|
||||
}
|
||||
|
||||
protected void closeSyncSpan(Throwable thrown) {
|
||||
Span span = scope.span();
|
||||
|
||||
if (thrown != null) {
|
||||
Tags.ERROR.set(span, Boolean.TRUE);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, thrown));
|
||||
}
|
||||
|
||||
span.finish();
|
||||
scope.close();
|
||||
}
|
||||
|
||||
protected void processResult(Span span, T future)
|
||||
throws ExecutionException, InterruptedException {
|
||||
throw new UnsupportedOperationException("processResult was not implemented");
|
||||
}
|
||||
|
||||
protected void setResultTag(Span span, boolean hit) {
|
||||
span.setTag(MEMCACHED_RESULT, hit ? HIT : MISS);
|
||||
}
|
||||
|
||||
private static String getOperationName(String methodName) {
|
||||
char chars[] =
|
||||
methodName
|
||||
.replaceFirst("^async", "")
|
||||
// 'CAS' name is special, we have to lowercase whole name
|
||||
.replaceFirst("^CAS", "cas")
|
||||
.toCharArray();
|
||||
|
||||
// Lowercase first letter
|
||||
chars[0] = Character.toLowerCase(chars[0]);
|
||||
return new String(chars);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package datadog.trace.instrumentation.spymemcached;
|
||||
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.Tracer;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import net.spy.memcached.internal.GetFuture;
|
||||
|
||||
public class GetCompletionListener extends CompletionListener<GetFuture<?>>
|
||||
implements net.spy.memcached.internal.GetCompletionListener {
|
||||
public GetCompletionListener(Tracer tracer, String methodName) {
|
||||
super(tracer, methodName, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(GetFuture<?> future) {
|
||||
closeAsyncSpan(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processResult(Span span, GetFuture<?> future)
|
||||
throws ExecutionException, InterruptedException {
|
||||
Object result = future.get();
|
||||
setResultTag(span, result != null);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,169 @@
|
|||
package datadog.trace.instrumentation.spymemcached;
|
||||
|
||||
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClassWithMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.*;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.DDAdvice;
|
||||
import datadog.trace.agent.tooling.DDTransformers;
|
||||
import datadog.trace.agent.tooling.HelperInjector;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.lang.reflect.Method;
|
||||
import net.bytebuddy.agent.builder.AgentBuilder;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.spy.memcached.MemcachedClient;
|
||||
import net.spy.memcached.internal.BulkFuture;
|
||||
import net.spy.memcached.internal.GetFuture;
|
||||
import net.spy.memcached.internal.OperationFuture;
|
||||
|
||||
@AutoService(Instrumenter.class)
|
||||
public final class MemcachedClientInstrumentation extends Instrumenter.Configurable {
|
||||
|
||||
private static final String MEMCACHED_PACKAGE = "net.spy.memcached";
|
||||
private static final String HELPERS_PACKAGE =
|
||||
MemcachedClientInstrumentation.class.getPackage().getName();
|
||||
|
||||
public static final HelperInjector HELPER_INJECTOR =
|
||||
new HelperInjector(
|
||||
HELPERS_PACKAGE + ".CompletionListener",
|
||||
HELPERS_PACKAGE + ".GetCompletionListener",
|
||||
HELPERS_PACKAGE + ".OperationCompletionListener",
|
||||
HELPERS_PACKAGE + ".BulkGetCompletionListener");
|
||||
|
||||
public MemcachedClientInstrumentation() {
|
||||
super("spymemcached");
|
||||
}
|
||||
|
||||
@Override
|
||||
public AgentBuilder apply(final AgentBuilder agentBuilder) {
|
||||
return agentBuilder
|
||||
.type(
|
||||
named(MEMCACHED_PACKAGE + ".MemcachedClient"),
|
||||
// Target 2.12 that has this method
|
||||
classLoaderHasClassWithMethod(
|
||||
MEMCACHED_PACKAGE + ".ConnectionFactoryBuilder",
|
||||
"setListenerExecutorService",
|
||||
"java.util.concurrent.ExecutorService"))
|
||||
.transform(HELPER_INJECTOR)
|
||||
.transform(DDTransformers.defaultTransformers())
|
||||
.transform(
|
||||
DDAdvice.create()
|
||||
.advice(
|
||||
isMethod()
|
||||
.and(isPublic())
|
||||
.and(returns(named(MEMCACHED_PACKAGE + ".internal.OperationFuture")))
|
||||
/*
|
||||
Flush seems to have a bug when listeners may not be always called.
|
||||
Also tracing flush is probably of a very limited value.
|
||||
*/
|
||||
.and(not(named("flush"))),
|
||||
AsyncOperationAdvice.class.getName()))
|
||||
.transform(
|
||||
DDAdvice.create()
|
||||
.advice(
|
||||
isMethod()
|
||||
.and(isPublic())
|
||||
.and(returns(named(MEMCACHED_PACKAGE + ".internal.GetFuture"))),
|
||||
AsyncGetAdvice.class.getName()))
|
||||
.transform(
|
||||
DDAdvice.create()
|
||||
.advice(
|
||||
isMethod()
|
||||
.and(isPublic())
|
||||
.and(returns(named(MEMCACHED_PACKAGE + ".internal.BulkFuture"))),
|
||||
AsyncBulkAdvice.class.getName()))
|
||||
.transform(
|
||||
DDAdvice.create()
|
||||
.advice(
|
||||
isMethod().and(isPublic()).and(named("incr").or(named("decr"))),
|
||||
SyncOperationAdvice.class.getName()))
|
||||
.asDecorator();
|
||||
}
|
||||
|
||||
public static class AsyncOperationAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static Boolean methodEnter() {
|
||||
return CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void methodExit(
|
||||
@Advice.Enter final Boolean shouldInjectListener,
|
||||
@Advice.Origin final Method method,
|
||||
@Advice.Return final OperationFuture future) {
|
||||
if (shouldInjectListener) {
|
||||
OperationCompletionListener listener =
|
||||
new OperationCompletionListener(GlobalTracer.get(), method.getName());
|
||||
future.addListener(listener);
|
||||
CallDepthThreadLocalMap.reset(MemcachedClient.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class AsyncGetAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static Boolean methodEnter() {
|
||||
return CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void methodExit(
|
||||
@Advice.Enter final Boolean shouldInjectListener,
|
||||
@Advice.Origin final Method method,
|
||||
@Advice.Return final GetFuture future) {
|
||||
if (shouldInjectListener) {
|
||||
GetCompletionListener listener =
|
||||
new GetCompletionListener(GlobalTracer.get(), method.getName());
|
||||
future.addListener(listener);
|
||||
CallDepthThreadLocalMap.reset(MemcachedClient.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class AsyncBulkAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static Boolean methodEnter() {
|
||||
return CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void methodExit(
|
||||
@Advice.Enter final Boolean shouldInjectListener,
|
||||
@Advice.Origin final Method method,
|
||||
@Advice.Return final BulkFuture future) {
|
||||
if (shouldInjectListener) {
|
||||
BulkGetCompletionListener listener =
|
||||
new BulkGetCompletionListener(GlobalTracer.get(), method.getName());
|
||||
future.addListener(listener);
|
||||
CallDepthThreadLocalMap.reset(MemcachedClient.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class SyncOperationAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static SyncCompletionListener methodEnter(@Advice.Origin final Method method) {
|
||||
if (CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class) <= 0) {
|
||||
return new SyncCompletionListener(GlobalTracer.get(), method.getName());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void methodExit(
|
||||
@Advice.Enter final SyncCompletionListener listener,
|
||||
@Advice.Thrown final Throwable thrown) {
|
||||
if (listener != null) {
|
||||
listener.done(thrown);
|
||||
CallDepthThreadLocalMap.reset(MemcachedClient.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package datadog.trace.instrumentation.spymemcached;
|
||||
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.Tracer;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import net.spy.memcached.internal.OperationFuture;
|
||||
|
||||
public class OperationCompletionListener
|
||||
extends CompletionListener<OperationFuture<? extends Object>>
|
||||
implements net.spy.memcached.internal.OperationCompletionListener {
|
||||
public OperationCompletionListener(Tracer tracer, String methodName) {
|
||||
super(tracer, methodName, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(OperationFuture<? extends Object> future) {
|
||||
closeAsyncSpan(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processResult(Span span, OperationFuture<? extends Object> future)
|
||||
throws ExecutionException, InterruptedException {
|
||||
future.get();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package datadog.trace.instrumentation.spymemcached;
|
||||
|
||||
import io.opentracing.Tracer;
|
||||
|
||||
public class SyncCompletionListener extends CompletionListener<Void> {
|
||||
public SyncCompletionListener(Tracer tracer, String methodName) {
|
||||
super(tracer, methodName, false);
|
||||
}
|
||||
|
||||
public void done(Throwable thrown) {
|
||||
closeSyncSpan(thrown);
|
||||
}
|
||||
}
|
|
@ -1,28 +1,58 @@
|
|||
package datadog.trace.instrumentation.spymemcached
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.agent.test.TraceAssert
|
||||
import datadog.trace.api.DDTags
|
||||
import io.opentracing.tag.Tags
|
||||
import net.spy.memcached.CASResponse
|
||||
import net.spy.memcached.ConnectionFactory
|
||||
import net.spy.memcached.ConnectionFactoryBuilder
|
||||
import net.spy.memcached.DefaultConnectionFactory
|
||||
import net.spy.memcached.MemcachedClient
|
||||
import net.spy.memcached.internal.CheckedOperationTimeoutException
|
||||
import net.spy.memcached.ops.Operation
|
||||
import net.spy.memcached.ops.OperationQueueFactory
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
|
||||
import spock.lang.Shared
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue
|
||||
import java.util.concurrent.BlockingQueue
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
|
||||
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
|
||||
import static CompletionListener.COMPONENT_NAME
|
||||
import static CompletionListener.OPERATION_NAME
|
||||
import static CompletionListener.SERVICE_NAME
|
||||
import static CompletionListener.SPAN_TYPE
|
||||
import static datadog.trace.agent.test.TestUtils.runUnderTrace
|
||||
import static net.spy.memcached.ConnectionFactoryBuilder.Protocol.BINARY
|
||||
|
||||
class SpymemcachedTest extends AgentTestRunner {
|
||||
|
||||
@Shared
|
||||
def parentOperation = "parent-span"
|
||||
@Shared
|
||||
def expiration = 3600
|
||||
@Shared
|
||||
def keyPrefix = "SpymemcachedTest-" + (Math.abs(new Random().nextInt())) + "-"
|
||||
@Shared
|
||||
def defaultMemcachedPort = 11211
|
||||
@Shared
|
||||
def timingOutMemcachedOpTimeout = 1000
|
||||
|
||||
/*
|
||||
Note: type here has to stay undefined, otherwise tests will fail in CI in Java 7 because
|
||||
'testcontainers' are built for Java 8 and Java 7 cannot load this class.
|
||||
*/
|
||||
def memcachedContainer
|
||||
|
||||
@Shared
|
||||
MemcachedClient memcached
|
||||
def memcachedContainer
|
||||
@Shared
|
||||
InetSocketAddress memcachedAddress = new InetSocketAddress("127.0.0.1", defaultMemcachedPort)
|
||||
|
||||
def setupSpec() {
|
||||
// Setup default hostname and port
|
||||
String ip = "127.0.0.1"
|
||||
int port = defaultMemcachedPort
|
||||
|
||||
/*
|
||||
CI will provide us with memcached container running along side our build.
|
||||
|
@ -33,11 +63,11 @@ class SpymemcachedTest extends AgentTestRunner {
|
|||
memcachedContainer = new GenericContainer('memcached:latest')
|
||||
.withExposedPorts(defaultMemcachedPort)
|
||||
memcachedContainer.start()
|
||||
ip = memcachedContainer.containerIpAddress
|
||||
port = memcachedContainer.getMappedPort(defaultMemcachedPort)
|
||||
memcachedAddress = new InetSocketAddress(
|
||||
memcachedContainer.containerIpAddress,
|
||||
memcachedContainer.getMappedPort(defaultMemcachedPort)
|
||||
)
|
||||
}
|
||||
|
||||
memcached = new MemcachedClient(new InetSocketAddress(ip, port))
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
|
@ -46,11 +76,588 @@ class SpymemcachedTest extends AgentTestRunner {
|
|||
}
|
||||
}
|
||||
|
||||
def "command with no arguments"() {
|
||||
ReentrantLock queueLock
|
||||
MemcachedClient memcached
|
||||
MemcachedClient locableMemcached
|
||||
MemcachedClient timingoutMemcached
|
||||
|
||||
def setup() {
|
||||
queueLock = new ReentrantLock()
|
||||
|
||||
// Use direct executor service so our listeners finish in deterministic order
|
||||
ExecutorService listenerExecutorService = MoreExecutors.newDirectExecutorService()
|
||||
|
||||
ConnectionFactory connectionFactory = (new ConnectionFactoryBuilder())
|
||||
.setListenerExecutorService(listenerExecutorService)
|
||||
.setProtocol(BINARY)
|
||||
.build()
|
||||
memcached = new MemcachedClient(connectionFactory, Arrays.asList(memcachedAddress))
|
||||
|
||||
def lockableQueueFactory = new OperationQueueFactory() {
|
||||
@Override
|
||||
BlockingQueue<Operation> create() {
|
||||
return getLockableQueue(queueLock)
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionFactory lockableConnectionFactory = (new ConnectionFactoryBuilder())
|
||||
.setListenerExecutorService(listenerExecutorService)
|
||||
.setProtocol(BINARY)
|
||||
.setOpQueueFactory(lockableQueueFactory)
|
||||
.build()
|
||||
locableMemcached = new MemcachedClient(lockableConnectionFactory, Arrays.asList(memcachedAddress))
|
||||
|
||||
ConnectionFactory timingoutConnectionFactory = (new ConnectionFactoryBuilder())
|
||||
.setListenerExecutorService(listenerExecutorService)
|
||||
.setProtocol(BINARY)
|
||||
.setOpQueueFactory(lockableQueueFactory)
|
||||
.setOpTimeout(timingOutMemcachedOpTimeout)
|
||||
.build()
|
||||
timingoutMemcached = new MemcachedClient(timingoutConnectionFactory, Arrays.asList(memcachedAddress))
|
||||
|
||||
// Add some keys to test on later:
|
||||
def valuesToSet = [
|
||||
"test-get": "get test",
|
||||
"test-get-2": "get test 2",
|
||||
"test-append": "append test",
|
||||
"test-prepend": "prepend test",
|
||||
"test-delete": "delete test",
|
||||
"test-replace": "replace test",
|
||||
"test-touch": "touch test",
|
||||
"test-cas": "cas test",
|
||||
"test-decr": "200",
|
||||
"test-incr": "100"
|
||||
]
|
||||
runUnderTrace("setup") {
|
||||
valuesToSet.each { k, v -> assert memcached.set(key(k), expiration, v).get() }
|
||||
}
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
TEST_WRITER.clear()
|
||||
}
|
||||
|
||||
def "test get hit"() {
|
||||
when:
|
||||
memcached.set("foo", 3600, "bar").get()
|
||||
runUnderTrace(parentOperation) {
|
||||
assert "get test" == memcached.get(key("test-get"))
|
||||
}
|
||||
|
||||
then:
|
||||
memcached.get("foo") == "bar"
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", null,"hit")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test get miss"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert null == memcached.get(key("test-get-key-that-doesn't-exist"))
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", null,"miss")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test get cancel"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
queueLock.lock()
|
||||
locableMemcached.asyncGet(key("test-get")).cancel(true)
|
||||
queueLock.unlock()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", "canceled")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test get timeout"() {
|
||||
when:
|
||||
/*
|
||||
Not using runUnderTrace since timeouts happen in separate thread
|
||||
and direct executor doesn't help to make sure that parent span finishes last.
|
||||
Instead run without parent span to have only 1 span to test with.
|
||||
*/
|
||||
try {
|
||||
queueLock.lock()
|
||||
timingoutMemcached.asyncGet(key("test-get"))
|
||||
Thread.sleep(timingOutMemcachedOpTimeout + 1000)
|
||||
} finally {
|
||||
queueLock.unlock()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 1) {
|
||||
getSpan(it, 0, "get", "timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test bulk get"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
def expected = [(key("test-get")): "get test", (key("test-get-2")): "get test 2"]
|
||||
assert expected == memcached.getBulk(key("test-get"), key("test-get-2"))
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "getBulk", null,null)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test set"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert memcached.set(key("test-set"), expiration, "bar").get()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "set")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test set cancel"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
queueLock.lock()
|
||||
assert locableMemcached.set(key("test-set-cancel"), expiration, "bar").cancel()
|
||||
queueLock.unlock()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "set", "canceled")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test add"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert memcached.add(key("test-add"), expiration, "add bar").get()
|
||||
assert "add bar" == memcached.get(key("test-add"))
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 3) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", null,"hit")
|
||||
getSpan(it, 2, "add")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test second add"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert memcached.add(key("test-add-2"), expiration, "add bar").get()
|
||||
assert !memcached.add(key("test-add-2"), expiration, "add bar 123").get()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 3) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "add")
|
||||
getSpan(it, 2, "add")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test delete"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert memcached.delete(key("test-delete")).get()
|
||||
assert null == memcached.get(key("test-delete"))
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 3) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", null,"miss")
|
||||
getSpan(it, 2, "delete")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test delete non existent"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert !memcached.delete(key("test-delete-non-existent")).get()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "delete")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test replace"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert memcached.replace(key("test-replace"), expiration, "new value").get()
|
||||
assert "new value" == memcached.get(key("test-replace"))
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 3) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", null,"hit")
|
||||
getSpan(it, 2, "replace")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test replace non existent"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert !memcached.replace(key("test-replace-non-existent"), expiration, "new value").get()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "replace")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test append"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
def cas = memcached.gets(key("test-append"))
|
||||
assert memcached.append(cas.cas, key("test-append"), " appended").get()
|
||||
assert "append test appended" == memcached.get(key("test-append"))
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 4) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", null,"hit")
|
||||
getSpan(it, 2, "append")
|
||||
getSpan(it, 3, "gets")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test prepend"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
def cas = memcached.gets(key("test-prepend"))
|
||||
assert memcached.prepend(cas.cas, key("test-prepend"), "prepended ").get()
|
||||
assert "prepended prepend test" == memcached.get(key("test-prepend"))
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 4) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", null,"hit")
|
||||
getSpan(it, 2, "prepend")
|
||||
getSpan(it, 3, "gets")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test cas"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
def cas = memcached.gets(key("test-cas"))
|
||||
assert CASResponse.OK == memcached.cas(key("test-cas"), cas.cas, expiration, "cas bar")
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 3) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "cas")
|
||||
getSpan(it, 2, "gets")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test cas not found"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert CASResponse.NOT_FOUND == memcached.cas(key("test-cas-doesnt-exist"), 1234, expiration, "cas bar")
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "cas")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test touch"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert memcached.touch(key("test-touch"), expiration).get()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "touch")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test touch non existent"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert !memcached.touch(key("test-touch-non-existent"), expiration).get()
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "touch")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test get and touch"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert "touch test" == memcached.getAndTouch(key("test-touch"), expiration).value
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "getAndTouch")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test get and touch non existent"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert null == memcached.getAndTouch(key("test-touch-non-existent"), expiration)
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "getAndTouch")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test decr"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
/*
|
||||
Memcached is funny in the way it handles incr/decr operations:
|
||||
it needs values to be strings (with digits in them) and it returns actual long from decr/incr
|
||||
*/
|
||||
assert 195 == memcached.decr(key("test-decr"), 5)
|
||||
assert "195" == memcached.get(key("test-decr"))
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 3) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", null,"hit")
|
||||
getSpan(it, 2, "decr")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test decr non existent"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert -1 == memcached.decr(key("test-decr-non-existent"), 5)
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "decr")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test decr exception"() {
|
||||
when:
|
||||
memcached.decr(key("long key: " + longString()), 5)
|
||||
|
||||
then:
|
||||
thrown IllegalArgumentException
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 1) {
|
||||
getSpan(it, 0, "decr", "long key")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test incr"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
/*
|
||||
Memcached is funny in the way it handles incr/decr operations:
|
||||
it needs values to be strings (with digits in them) and it returns actual long from decr/incr
|
||||
*/
|
||||
assert 105 == memcached.incr(key("test-incr"), 5)
|
||||
assert "105" == memcached.get(key("test-incr"))
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 3) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "get", null,"hit")
|
||||
getSpan(it, 2, "incr")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test incr non existent"() {
|
||||
when:
|
||||
runUnderTrace(parentOperation) {
|
||||
assert -1 == memcached.incr(key("test-incr-non-existent"), 5)
|
||||
}
|
||||
|
||||
then:
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 2) {
|
||||
getParentSpan(it, 0)
|
||||
getSpan(it, 1, "incr")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test incr exception"() {
|
||||
when:
|
||||
memcached.incr(key("long key: " + longString()), 5)
|
||||
|
||||
then:
|
||||
thrown IllegalArgumentException
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 1) {
|
||||
getSpan(it, 0, "incr", "long key")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def key(String k) {
|
||||
keyPrefix + k
|
||||
}
|
||||
|
||||
def longString(char c='s' as char) {
|
||||
char[] chars = new char[250]
|
||||
Arrays.fill(chars, 's' as char)
|
||||
return new String(chars)
|
||||
}
|
||||
|
||||
def getLockableQueue(ReentrantLock queueLock) {
|
||||
return new ArrayBlockingQueue<Operation>(DefaultConnectionFactory.DEFAULT_OP_QUEUE_LEN) {
|
||||
|
||||
@Override
|
||||
int drainTo(Collection<? super Operation> c, int maxElements) {
|
||||
try {
|
||||
queueLock.lock()
|
||||
return super.drainTo(c, maxElements)
|
||||
} finally {
|
||||
queueLock.unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def getParentSpan(TraceAssert trace, int index) {
|
||||
return trace.span(index) {
|
||||
operationName parentOperation
|
||||
parent()
|
||||
errored false
|
||||
tags {
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def getSpan(TraceAssert trace, int index, String operation, String error=null, String result=null) {
|
||||
return trace.span(index) {
|
||||
if (index > 0) {
|
||||
childOf(trace.span(0))
|
||||
}
|
||||
|
||||
serviceName SERVICE_NAME
|
||||
operationName OPERATION_NAME
|
||||
resourceName operation
|
||||
spanType SPAN_TYPE
|
||||
errored (error != null && error != "canceled")
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" SPAN_TYPE
|
||||
"${Tags.COMPONENT.getKey()}" COMPONENT_NAME
|
||||
"${Tags.SPAN_KIND.getKey()}" Tags.SPAN_KIND_CLIENT
|
||||
"${Tags.DB_TYPE.getKey()}" CompletionListener.DB_TYPE
|
||||
|
||||
if (error == "canceled") {
|
||||
"${CompletionListener.DB_COMMAND_CANCELLED}" true
|
||||
}
|
||||
|
||||
if (error == "timeout") {
|
||||
errorTags(
|
||||
CheckedOperationTimeoutException,
|
||||
"Operation timed out. - failing node: ${memcachedAddress.address}:${memcachedAddress.port}")
|
||||
}
|
||||
|
||||
if (error == "long key") {
|
||||
errorTags(
|
||||
IllegalArgumentException,
|
||||
"Key is too long (maxlen = 250)")
|
||||
}
|
||||
|
||||
if (result == "hit") {
|
||||
"${CompletionListener.MEMCACHED_RESULT}" CompletionListener.HIT
|
||||
}
|
||||
|
||||
if (result == "miss") {
|
||||
"${CompletionListener.MEMCACHED_RESULT}" CompletionListener.MISS
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue