make commands with no return values (void or Mono<Void>) to finish spans early in order to record command ran.

added support for cancellation of commands in async and reactive APIs.
since flux is a stream of events, when the stream is complete, the number of events emitted are recorded in the tag db.command.results.count
created helper class LettuceInstrumentationUtil
This commit is contained in:
Gary Huang 2018-06-04 11:59:48 -04:00 committed by Gary Huang
parent bfdf4c858a
commit 3d61d839c8
16 changed files with 894 additions and 147 deletions

View File

@ -54,4 +54,4 @@ configurations.latestDepTestCompile {
testJava8Only += '**/LettuceAsyncClientTest.class' testJava8Only += '**/LettuceAsyncClientTest.class'
testJava8Only += '**/LettuceSyncClientTest.class' testJava8Only += '**/LettuceSyncClientTest.class'
testJava8Only += '**/LettuceReactiveClientTest.class'

View File

@ -14,7 +14,9 @@ public class RedisAsyncCommandsInstrumentation extends Instrumenter.Configurable
private static final HelperInjector REDIS_ASYNC_HELPERS = private static final HelperInjector REDIS_ASYNC_HELPERS =
new HelperInjector( new HelperInjector(
RedisAsyncCommandsInstrumentation.class.getPackage().getName() + ".RedisAsyncBiFunction"); RedisAsyncCommandsInstrumentation.class.getPackage().getName() + ".RedisAsyncBiFunction",
RedisAsyncCommandsInstrumentation.class.getPackage().getName()
+ ".LettuceInstrumentationUtil");
public RedisAsyncCommandsInstrumentation() { public RedisAsyncCommandsInstrumentation() {
super("redis"); super("redis");

View File

@ -0,0 +1,64 @@
package datadog.trace.instrumentation.lettuce;
import static net.bytebuddy.matcher.ElementMatchers.*;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.DDAdvice;
import datadog.trace.agent.tooling.DDTransformers;
import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.instrumentation.lettuce.rx.FluxCreationAdvice;
import datadog.trace.instrumentation.lettuce.rx.MonoCreationAdvice;
import net.bytebuddy.agent.builder.AgentBuilder;
@AutoService(Instrumenter.class)
public class RedisReactiveCommandsInstrumentation extends Instrumenter.Configurable {
private static final HelperInjector REDIS_ASYNC_HELPERS =
new HelperInjector(
RedisReactiveCommandsInstrumentation.class.getPackage().getName()
+ ".LettuceInstrumentationUtil",
RedisReactiveCommandsInstrumentation.class.getPackage().getName()
+ ".rx.MonoCreationAdvice",
RedisReactiveCommandsInstrumentation.class.getPackage().getName()
+ ".rx.MonoDualConsumer",
RedisReactiveCommandsInstrumentation.class.getPackage().getName()
+ ".rx.FluxCreationAdvice",
RedisReactiveCommandsInstrumentation.class.getPackage().getName()
+ ".rx.FluxTerminationCancellableRunnable",
RedisReactiveCommandsInstrumentation.class.getPackage().getName()
+ ".rx.FluxTerminationCancellableRunnable$FluxOnSubscribeConsumer");
public RedisReactiveCommandsInstrumentation() {
super("redis");
}
@Override
protected boolean defaultEnabled() {
return false;
}
@Override
protected AgentBuilder apply(AgentBuilder agentBuilder) {
return agentBuilder
.type(named("io.lettuce.core.AbstractRedisReactiveCommands"))
.transform(REDIS_ASYNC_HELPERS)
.transform(DDTransformers.defaultTransformers())
.transform(
DDAdvice.create()
.advice(
isMethod()
.and(named("createMono"))
.and(takesArgument(0, named("java.util.function.Supplier")))
.and(returns(named("reactor.core.publisher.Mono"))),
MonoCreationAdvice.class.getName())
.advice(
isMethod()
.and(nameStartsWith("create"))
.and(nameEndsWith("Flux"))
.and(takesArgument(0, named("java.util.function.Supplier")))
.and(returns(named(("reactor.core.publisher.Flux")))),
FluxCreationAdvice.class.getName()))
.asDecorator();
}
}

View File

@ -1,44 +0,0 @@
package datadog.trace.instrumentation.lettuce.rx;
import static net.bytebuddy.matcher.ElementMatchers.*;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.DDAdvice;
import datadog.trace.agent.tooling.DDTransformers;
import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter;
import net.bytebuddy.agent.builder.AgentBuilder;
@AutoService(Instrumenter.class)
public class RedisReactiveCommandsInstrumentation extends Instrumenter.Configurable {
private static final HelperInjector REDIS_ASYNC_HELPERS =
new HelperInjector(
RedisReactiveCommandsInstrumentation.class.getPackage().getName() + ".MonoCreationAdvice",
RedisReactiveCommandsInstrumentation.class.getPackage().getName() + ".MonoDualConsumer");
public RedisReactiveCommandsInstrumentation() {
super("redis");
}
@Override
protected boolean defaultEnabled() {
return false;
}
@Override
protected AgentBuilder apply(AgentBuilder agentBuilder) {
return agentBuilder
.type(named("io.lettuce.core.AbstractRedisReactiveCommands"))
.transform(REDIS_ASYNC_HELPERS)
.transform(DDTransformers.defaultTransformers())
.transform(
DDAdvice.create()
.advice(
isMethod()
.and(named("createMono"))
.and(takesArgument(0, named("java.util.function.Supplier"))),
MonoCreationAdvice.class.getName()))
.asDecorator();
}
}

View File

@ -12,8 +12,6 @@ import net.bytebuddy.asm.Advice;
public class ConnectionFutureAdvice { public class ConnectionFutureAdvice {
public static final String SERVICE_NAME = "redis";
public static final String COMPONENT_NAME = SERVICE_NAME + "-client";
public static final String REDIS_URL_TAG_NAME = "db.redis.url"; public static final String REDIS_URL_TAG_NAME = "db.redis.url";
public static final String REDIS_DB_INDEX_TAG_NAME = "db.redis.dbIndex"; public static final String REDIS_DB_INDEX_TAG_NAME = "db.redis.dbIndex";
public static final String RESOURCE_NAME_PREFIX = "CONNECT:"; public static final String RESOURCE_NAME_PREFIX = "CONNECT:";
@ -23,9 +21,9 @@ public class ConnectionFutureAdvice {
final Scope scope = GlobalTracer.get().buildSpan("redis.query").startActive(false); final Scope scope = GlobalTracer.get().buildSpan("redis.query").startActive(false);
final Span span = scope.span(); final Span span = scope.span();
Tags.DB_TYPE.set(span, SERVICE_NAME); Tags.DB_TYPE.set(span, LettuceInstrumentationUtil.SERVICE_NAME);
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT); Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT);
Tags.COMPONENT.set(span, COMPONENT_NAME); Tags.COMPONENT.set(span, LettuceInstrumentationUtil.COMPONENT_NAME);
final int redisPort = redisURI.getPort(); final int redisPort = redisURI.getPort();
Tags.PEER_PORT.set(span, redisPort); Tags.PEER_PORT.set(span, redisPort);
@ -36,8 +34,8 @@ public class ConnectionFutureAdvice {
span.setTag(REDIS_URL_TAG_NAME, url); span.setTag(REDIS_URL_TAG_NAME, url);
span.setTag(REDIS_DB_INDEX_TAG_NAME, redisURI.getDatabase()); span.setTag(REDIS_DB_INDEX_TAG_NAME, redisURI.getDatabase());
span.setTag(DDTags.RESOURCE_NAME, RESOURCE_NAME_PREFIX + url); span.setTag(DDTags.RESOURCE_NAME, RESOURCE_NAME_PREFIX + url);
span.setTag(DDTags.SERVICE_NAME, SERVICE_NAME); span.setTag(DDTags.SERVICE_NAME, LettuceInstrumentationUtil.SERVICE_NAME);
span.setTag(DDTags.SPAN_TYPE, SERVICE_NAME); span.setTag(DDTags.SPAN_TYPE, LettuceInstrumentationUtil.SERVICE_NAME);
return scope; return scope;
} }

View File

@ -0,0 +1,56 @@
package datadog.trace.instrumentation.lettuce;
import io.lettuce.core.protocol.RedisCommand;
import java.util.*;
public class LettuceInstrumentationUtil {
public static final String SERVICE_NAME = "redis";
public static final String COMPONENT_NAME = SERVICE_NAME + "-client";
public static final String MAP_KEY_CMD_NAME = "CMD_NAME";
public static final String MAP_KEY_CMD_ARGS = "CMD_ARGS";
public static final String[] NON_INSTRUMENTING_COMMAND_WORDS =
new String[] {"SHUTDOWN", "DEBUG", "OOM", "SEGFAULT"};
public static final Set<String> nonInstrumentingCommands =
new HashSet<>(Arrays.asList(NON_INSTRUMENTING_COMMAND_WORDS));
public static boolean doFinishSpanEarly(Map<String, String> commandMap) {
String cmdName = commandMap.get(MAP_KEY_CMD_NAME);
String cmdArgs = commandMap.get(MAP_KEY_CMD_ARGS);
if (cmdName.equals("SHUTDOWN")
|| (nonInstrumentingCommands.contains(cmdName)
&& nonInstrumentingCommands.contains(cmdArgs))) {
return true;
}
return false;
}
public static Map<String, String> getCommandInfo(RedisCommand command) {
String commandName = "Redis Command";
String commandArgs = null;
Map<String, String> commandMap = new HashMap<>();
if (command != null) {
// get the arguments passed into the redis command
if (command.getArgs() != null) {
// standardize to null instead of using empty string
commandArgs = command.getArgs().toCommandString();
if ("".equals(commandArgs)) {
commandArgs = null;
}
}
// get the redis command name (i.e. GET, SET, HMSET, etc)
if (command.getType() != null) {
commandName = command.getType().name();
// if it is an AUTH command, then remove the extracted command arguments since it is the password
if ("AUTH".equals(commandName)) {
commandArgs = null;
}
}
}
commandMap.put(MAP_KEY_CMD_NAME, commandName);
commandMap.put(MAP_KEY_CMD_ARGS, commandArgs);
return commandMap;
}
}

View File

@ -3,6 +3,7 @@ package datadog.trace.instrumentation.lettuce;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.CancellationException;
import java.util.function.BiFunction; import java.util.function.BiFunction;
/** /**
@ -26,8 +27,12 @@ public class RedisAsyncBiFunction<T extends Object, U extends Throwable, R exten
@Override @Override
public R apply(T t, Throwable throwable) { public R apply(T t, Throwable throwable) {
if (throwable != null) { if (throwable != null) {
Tags.ERROR.set(this.span, true); if (throwable instanceof CancellationException) {
this.span.log(Collections.singletonMap("error.object", throwable)); this.span.setTag("db.command.cancelled", true);
} else {
Tags.ERROR.set(this.span, true);
this.span.log(Collections.singletonMap("error.object", throwable));
}
} }
this.span.finish(); this.span.finish();
return null; return null;

View File

@ -8,43 +8,32 @@ import io.opentracing.Span;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer; import io.opentracing.util.GlobalTracer;
import java.util.Collections; import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
public class RedisAsyncCommandsAdvice { public class RedisAsyncCommandsAdvice {
private static final String SERVICE_NAME = "redis";
private static final String COMPONENT_NAME = SERVICE_NAME + "-client";
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan(@Advice.Argument(0) final RedisCommand command) { public static Scope startSpan(@Advice.Argument(0) final RedisCommand command) {
final Scope scope = GlobalTracer.get().buildSpan(SERVICE_NAME + ".query").startActive(false); Map<String, String> commandMap = LettuceInstrumentationUtil.getCommandInfo(command);
String commandName = commandMap.get(LettuceInstrumentationUtil.MAP_KEY_CMD_NAME);
String commandArgs = commandMap.get(LettuceInstrumentationUtil.MAP_KEY_CMD_ARGS);
;
final Scope scope =
GlobalTracer.get()
.buildSpan(LettuceInstrumentationUtil.SERVICE_NAME + ".query")
.startActive(LettuceInstrumentationUtil.doFinishSpanEarly(commandMap));
final Span span = scope.span(); final Span span = scope.span();
Tags.DB_TYPE.set(span, SERVICE_NAME); Tags.DB_TYPE.set(span, LettuceInstrumentationUtil.SERVICE_NAME);
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT); Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT);
Tags.COMPONENT.set(span, COMPONENT_NAME); Tags.COMPONENT.set(span, LettuceInstrumentationUtil.COMPONENT_NAME);
String commandName = "Redis Command";
String commandArgs = null;
if (command != null) {
// get the arguments passed into the redis command
if (command.getArgs() != null) {
commandArgs = command.getArgs().toCommandString();
}
// get the redis command name (i.e. GET, SET, HMSET, etc)
if (command.getType() != null) {
commandName = command.getType().name();
// if it is an AUTH command, then remove the extracted command arguments since it is the password
if ("AUTH".equals(commandName)) {
commandArgs = null;
}
}
}
span.setTag(DDTags.RESOURCE_NAME, commandName); span.setTag(DDTags.RESOURCE_NAME, commandName);
span.setTag("db.command.args", commandArgs); span.setTag("db.command.args", commandArgs);
span.setTag(DDTags.SERVICE_NAME, SERVICE_NAME); span.setTag(DDTags.SERVICE_NAME, LettuceInstrumentationUtil.SERVICE_NAME);
span.setTag(DDTags.SPAN_TYPE, SERVICE_NAME); span.setTag(DDTags.SPAN_TYPE, LettuceInstrumentationUtil.SERVICE_NAME);
return scope; return scope;
} }

View File

@ -0,0 +1,34 @@
package datadog.trace.instrumentation.lettuce.rx;
import datadog.trace.instrumentation.lettuce.LettuceInstrumentationUtil;
import io.lettuce.core.protocol.RedisCommand;
import java.util.Map;
import java.util.function.Supplier;
import net.bytebuddy.asm.Advice;
import reactor.core.publisher.Flux;
public class FluxCreationAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Map<String, String> extractCommand(
@Advice.Argument(0) final Supplier<RedisCommand> supplier) {
return LettuceInstrumentationUtil.getCommandInfo(supplier.get());
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void monitorSpan(
@Advice.Enter final Map<String, String> commandMap,
@Advice.Return(readOnly = false) Flux<?> publisher) {
boolean finishSpanOnClose = LettuceInstrumentationUtil.doFinishSpanEarly(commandMap);
FluxTerminationCancellableRunnable handler =
new FluxTerminationCancellableRunnable(commandMap, finishSpanOnClose);
publisher = publisher.doOnSubscribe(handler.getOnSubscribeConsumer());
// don't register extra callbacks to finish the spans if the command being instrumented is one of those that return
// Mono<Void> (In here a flux is created first and then converted to Mono<Void>)
if (!finishSpanOnClose) {
publisher = publisher.doOnEach(handler);
publisher = publisher.doOnCancel(handler);
}
}
}

View File

@ -0,0 +1,111 @@
package datadog.trace.instrumentation.lettuce.rx;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.lettuce.LettuceInstrumentationUtil;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;
import reactor.core.publisher.SignalType;
public class FluxTerminationCancellableRunnable implements Consumer<Signal>, Runnable {
private Span span = null;
private int numResults = 0;
private FluxOnSubscribeConsumer onSubscribeConsumer = null;
public FluxTerminationCancellableRunnable(
Map<String, String> commandMap, boolean finishSpanOnClose) {
this.onSubscribeConsumer = new FluxOnSubscribeConsumer(this, commandMap, finishSpanOnClose);
}
public FluxOnSubscribeConsumer getOnSubscribeConsumer() {
return onSubscribeConsumer;
}
private void finishSpan(boolean isCommandCancelled, Throwable throwable) {
if (this.span != null) {
this.span.setTag("db.command.results.count", this.numResults);
if (isCommandCancelled) {
this.span.setTag("db.command.cancelled", true);
}
if (throwable != null) {
Tags.ERROR.set(this.span, true);
this.span.log(Collections.singletonMap("error.object", throwable));
}
this.span.finish();
} else {
LoggerFactory.getLogger(Flux.class)
.error(
"Failed to finish this.span, FluxTerminationCancellableRunnable cannot find this.span because "
+ "it probably wasn't started.");
}
}
@Override
public void accept(Signal signal) {
if (SignalType.ON_COMPLETE.equals(signal.getType())
|| SignalType.ON_ERROR.equals(signal.getType())) {
finishSpan(false, signal.getThrowable());
} else if (SignalType.ON_NEXT.equals(signal.getType())) {
++this.numResults;
}
}
@Override
public void run() {
if (this.span != null) {
finishSpan(true, null);
} else {
LoggerFactory.getLogger(Flux.class)
.error(
"Failed to finish this.span to indicate cancellation, FluxTerminationCancellableRunnable cannot find this.span because "
+ "it probably wasn't started.");
}
}
public static class FluxOnSubscribeConsumer implements Consumer<Subscription> {
private final FluxTerminationCancellableRunnable owner;
private final Map<String, String> commandMap;
private final boolean finishSpanOnClose;
public FluxOnSubscribeConsumer(
FluxTerminationCancellableRunnable owner,
Map<String, String> commandMap,
boolean finishSpanOnClose) {
this.owner = owner;
this.commandMap = commandMap;
this.finishSpanOnClose = finishSpanOnClose;
}
@Override
public void accept(Subscription subscription) {
final Scope scope =
GlobalTracer.get()
.buildSpan(LettuceInstrumentationUtil.SERVICE_NAME + ".query")
.startActive(finishSpanOnClose);
final Span span = scope.span();
this.owner.span = span;
Tags.DB_TYPE.set(span, LettuceInstrumentationUtil.SERVICE_NAME);
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT);
Tags.COMPONENT.set(span, LettuceInstrumentationUtil.COMPONENT_NAME);
span.setTag(
DDTags.RESOURCE_NAME, this.commandMap.get(LettuceInstrumentationUtil.MAP_KEY_CMD_NAME));
span.setTag(
"db.command.args", this.commandMap.get(LettuceInstrumentationUtil.MAP_KEY_CMD_ARGS));
span.setTag(DDTags.SERVICE_NAME, LettuceInstrumentationUtil.SERVICE_NAME);
span.setTag(DDTags.SPAN_TYPE, LettuceInstrumentationUtil.SERVICE_NAME);
scope.close();
}
}
}

View File

@ -1,7 +1,7 @@
package datadog.trace.instrumentation.lettuce.rx; package datadog.trace.instrumentation.lettuce.rx;
import datadog.trace.instrumentation.lettuce.LettuceInstrumentationUtil;
import io.lettuce.core.protocol.RedisCommand; import io.lettuce.core.protocol.RedisCommand;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.function.Supplier; import java.util.function.Supplier;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
@ -9,47 +9,28 @@ import reactor.core.publisher.Mono;
public class MonoCreationAdvice { public class MonoCreationAdvice {
public static final String SERVICE_NAME = "redis";
public static final String COMPONENT_NAME = SERVICE_NAME + "-client";
public static final String MAP_KEY_CMD_NAME = "CMD_NAME"; public static final String MAP_KEY_CMD_NAME = "CMD_NAME";
public static final String MAP_KEY_CMD_ARGS = "CMD_ARGS"; public static final String MAP_KEY_CMD_ARGS = "CMD_ARGS";
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static Map<String, String> extractCommand( public static Map<String, String> extractCommand(
@Advice.Argument(0) final Supplier<RedisCommand> supplier) { @Advice.Argument(0) final Supplier<RedisCommand> supplier) {
RedisCommand command = supplier.get(); return LettuceInstrumentationUtil.getCommandInfo(supplier.get());
String commandName = "Redis Command";
String commandArgs = null;
Map<String, String> commandMap = new HashMap<>();
if (command != null) {
// get the arguments passed into the redis command
if (command.getArgs() != null) {
commandArgs = command.getArgs().toCommandString();
}
// get the redis command name (i.e. GET, SET, HMSET, etc)
if (command.getType() != null) {
commandName = command.getType().name();
// if it is an AUTH command, then remove the extracted command arguments since it is the password
if ("AUTH".equals(commandName)) {
commandArgs = null;
}
}
}
commandMap.put(MAP_KEY_CMD_NAME, commandName);
commandMap.put(MAP_KEY_CMD_ARGS, commandArgs);
return commandMap;
} }
// throwables wouldn't matter here, because no spans have been started due to redis command not being // throwables wouldn't matter here, because no spans have been started due to redis command not being
// run until the user subscribes to the Mono publisher // run until the user subscribes to the Mono publisher
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(suppress = Throwable.class)
public static void monitorSpan( public static void monitorSpan(
@Advice.Enter final Map<String, String> commandMap, @Advice.Enter final Map<String, String> commandMap,
@Advice.Return(readOnly = false) Mono<?> publisher) { @Advice.Return(readOnly = false) Mono<?> publisher) {
MonoDualConsumer mdc = new MonoDualConsumer(commandMap); boolean finishSpanOnClose = LettuceInstrumentationUtil.doFinishSpanEarly(commandMap);
MonoDualConsumer mdc = new MonoDualConsumer(commandMap, finishSpanOnClose);
publisher = publisher.doOnSubscribe(mdc); publisher = publisher.doOnSubscribe(mdc);
publisher = publisher.doOnSuccessOrError(mdc); // register the call back to close the span only if necessary
if (!finishSpanOnClose) {
publisher = publisher.doOnSuccessOrError(mdc);
}
} }
} }

View File

@ -1,6 +1,7 @@
package datadog.trace.instrumentation.lettuce.rx; package datadog.trace.instrumentation.lettuce.rx;
import datadog.trace.api.DDTags; import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.lettuce.LettuceInstrumentationUtil;
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
@ -17,9 +18,11 @@ public class MonoDualConsumer<R, T, U extends Throwable>
private Span span = null; private Span span = null;
private final Map<String, String> commandMap; private final Map<String, String> commandMap;
private final boolean finishSpanOnClose;
public MonoDualConsumer(Map<String, String> commandMap) { public MonoDualConsumer(Map<String, String> commandMap, boolean finishSpanOnClose) {
this.commandMap = commandMap; this.commandMap = commandMap;
this.finishSpanOnClose = finishSpanOnClose;
} }
@Override @Override
@ -41,18 +44,21 @@ public class MonoDualConsumer<R, T, U extends Throwable>
@Override @Override
public void accept(R r) { public void accept(R r) {
final Scope scope = final Scope scope =
GlobalTracer.get().buildSpan(MonoCreationAdvice.SERVICE_NAME + ".query").startActive(false); GlobalTracer.get()
.buildSpan(LettuceInstrumentationUtil.SERVICE_NAME + ".query")
.startActive(finishSpanOnClose);
this.span = scope.span(); this.span = scope.span();
Tags.DB_TYPE.set(this.span, MonoCreationAdvice.SERVICE_NAME); Tags.DB_TYPE.set(this.span, LettuceInstrumentationUtil.SERVICE_NAME);
Tags.SPAN_KIND.set(this.span, Tags.SPAN_KIND_CLIENT); Tags.SPAN_KIND.set(this.span, Tags.SPAN_KIND_CLIENT);
Tags.COMPONENT.set(this.span, MonoCreationAdvice.COMPONENT_NAME); Tags.COMPONENT.set(this.span, LettuceInstrumentationUtil.COMPONENT_NAME);
this.span.setTag( this.span.setTag(
DDTags.RESOURCE_NAME, this.commandMap.get(MonoCreationAdvice.MAP_KEY_CMD_NAME)); DDTags.RESOURCE_NAME, this.commandMap.get(LettuceInstrumentationUtil.MAP_KEY_CMD_NAME));
this.span.setTag("db.command.args", this.commandMap.get(MonoCreationAdvice.MAP_KEY_CMD_ARGS)); this.span.setTag(
this.span.setTag(DDTags.SERVICE_NAME, MonoCreationAdvice.SERVICE_NAME); "db.command.args", this.commandMap.get(LettuceInstrumentationUtil.MAP_KEY_CMD_ARGS));
this.span.setTag(DDTags.SPAN_TYPE, MonoCreationAdvice.SERVICE_NAME); this.span.setTag(DDTags.SERVICE_NAME, LettuceInstrumentationUtil.SERVICE_NAME);
this.span.setTag(DDTags.SPAN_TYPE, LettuceInstrumentationUtil.SERVICE_NAME);
scope.close(); scope.close();
} }
} }

View File

@ -2,16 +2,19 @@ import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils import datadog.trace.agent.test.TestUtils
import io.lettuce.core.ConnectionFuture import io.lettuce.core.ConnectionFuture
import io.lettuce.core.RedisClient import io.lettuce.core.RedisClient
import io.lettuce.core.RedisConnectionException
import io.lettuce.core.RedisFuture import io.lettuce.core.RedisFuture
import io.lettuce.core.RedisURI import io.lettuce.core.RedisURI
import io.lettuce.core.api.StatefulConnection import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.async.RedisAsyncCommands import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.codec.StringCodec import io.lettuce.core.codec.StringCodec
import io.lettuce.core.protocol.AsyncCommand
import redis.embedded.RedisServer import redis.embedded.RedisServer
import spock.lang.Shared import spock.lang.Shared
import spock.util.concurrent.AsyncConditions import spock.util.concurrent.AsyncConditions
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.BiConsumer import java.util.function.BiConsumer
import java.util.function.BiFunction import java.util.function.BiFunction
@ -78,7 +81,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8, ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, PORT, 3, TimeUnit.SECONDS)) new RedisURI(HOST, PORT, 3, TimeUnit.SECONDS))
def connection = connectionFuture.get() def connection = connectionFuture.get()
TEST_WRITER.waitForTraces(1)
expect: expect:
connection != null connection != null
@ -111,18 +113,15 @@ class LettuceAsyncClientTest extends AgentTestRunner {
setup: setup:
RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT) RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT)
StatefulConnection connection = null StatefulConnection connection = null
try {
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, INCORRECT_PORT, 3, TimeUnit.SECONDS))
connection = connectionFuture.get()
} catch (Exception rce) {
// do nothing, this is expected
println("caught " + rce.getMessage())
}
expect: when:
TEST_WRITER.waitForTraces(1) ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, INCORRECT_PORT, 3, TimeUnit.SECONDS))
connection = connectionFuture.get()
then:
connection == null connection == null
thrown ExecutionException
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 1) {
trace(0, 1) { trace(0, 1) {
span(0) { span(0) {
@ -138,7 +137,7 @@ class LettuceAsyncClientTest extends AgentTestRunner {
"db.redis.url" DB_ADDR_NON_EXISTENT "db.redis.url" DB_ADDR_NON_EXISTENT
"db.redis.dbIndex" 0 "db.redis.dbIndex" 0
"db.type" "redis" "db.type" "redis"
errorTags(RedisConnectionException, "some error due to incorrect port number") errorTags CompletionException
"peer.hostname" HOST "peer.hostname" HOST
"peer.port" INCORRECT_PORT "peer.port" INCORRECT_PORT
"span.kind" "client" "span.kind" "client"
@ -153,7 +152,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
setup: setup:
RedisFuture<String> redisFuture = asyncCommands.set("TESTKEY", "TESTVAL") RedisFuture<String> redisFuture = asyncCommands.set("TESTKEY", "TESTVAL")
String res = redisFuture.get(3, TimeUnit.SECONDS) String res = redisFuture.get(3, TimeUnit.SECONDS)
TEST_WRITER.waitForTraces(1)
expect: expect:
res == "OK" res == "OK"
@ -196,7 +194,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
redisFuture.thenAccept(consumer) redisFuture.thenAccept(consumer)
then: then:
TEST_WRITER.waitForTraces(1)
conds.await() conds.await()
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 1) {
trace(0, 1) { trace(0, 1) {
@ -251,7 +248,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
redisFuture.handleAsync(firstStage).thenApply(secondStage) redisFuture.handleAsync(firstStage).thenApply(secondStage)
then: then:
TEST_WRITER.waitForTraces(1)
conds.await() conds.await()
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 1) {
trace(0, 1) { trace(0, 1) {
@ -293,7 +289,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
redisFuture.whenCompleteAsync(biConsumer) redisFuture.whenCompleteAsync(biConsumer)
then: then:
TEST_WRITER.waitForTraces(1)
conds.await() conds.await()
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 1) {
trace(0, 1) { trace(0, 1) {
@ -351,7 +346,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
}) })
then: then:
TEST_WRITER.waitForTraces(2)
conds.await() conds.await()
assertTraces(TEST_WRITER, 2) { assertTraces(TEST_WRITER, 2) {
trace(0, 1) { trace(0, 1) {
@ -392,4 +386,166 @@ class LettuceAsyncClientTest extends AgentTestRunner {
} }
} }
} }
def "command completes exceptionally"() {
setup:
// turn off auto flush to complete the command exceptionally manually
asyncCommands.setAutoFlushCommands(false)
def conds = new AsyncConditions()
RedisFuture redisFuture = asyncCommands.del("key1", "key2")
boolean completedExceptionally = ((AsyncCommand) redisFuture).completeExceptionally(new IllegalStateException("TestException"))
redisFuture.exceptionally ({
throwable ->
conds.evaluate {
assert throwable != null
assert throwable instanceof IllegalStateException
assert throwable.getMessage() == "TestException"
}
throw throwable
})
when:
// now flush and execute the command
asyncCommands.flushCommands()
redisFuture.get()
then:
conds.await()
completedExceptionally == true
thrown Exception
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "DEL"
errored true
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "key<key1> key<key2>"
errorTags(IllegalStateException, "TestException")
"span.kind" "client"
"span.type" "redis"
}
}
}
}
cleanup:
asyncCommands.setAutoFlushCommands(true)
}
def "cancel command before it finishes"() {
setup:
asyncCommands.setAutoFlushCommands(false)
def conds = new AsyncConditions()
RedisFuture redisFuture = asyncCommands.sadd("SKEY", "1", "2")
redisFuture.whenCompleteAsync({
res, throwable -> conds.evaluate {
assert throwable != null
assert throwable instanceof CancellationException
}
})
when:
boolean cancelSuccess = redisFuture.cancel(true)
asyncCommands.flushCommands()
then:
conds.await()
cancelSuccess == true
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SADD"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "key<SKEY> value<1> value<2>"
"db.command.cancelled" true
"span.kind" "client"
"span.type" "redis"
}
}
}
}
cleanup:
asyncCommands.setAutoFlushCommands(true)
}
def "debug segfault command (returns void) with no argument should produce span"() {
setup:
asyncCommands.debugSegfault()
expect:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "DEBUG"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "SEGFAULT"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
def "shutdown command (returns void) should produce a span"() {
setup:
asyncCommands.shutdown(false)
expect:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SHUTDOWN"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "NOSAVE"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
} }

View File

@ -0,0 +1,327 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import io.lettuce.core.*
import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.reactive.RedisReactiveCommands
import redis.embedded.RedisServer
import spock.lang.Shared
import spock.util.concurrent.AsyncConditions
import java.util.function.Consumer
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
class LettuceReactiveClientTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.redis.enabled", "true")
}
@Shared
public static final String HOST = "127.0.0.1"
public static final int PORT = TestUtils.randomOpenPort()
public static final int DB_INDEX = 0
@Shared
public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX
public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR
@Shared
RedisServer redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(PORT).build()
@Shared
RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
@Shared
RedisReactiveCommands<String, ?> reactiveCommands = null
def setupSpec() {
println "Using redis: $redisServer.args"
redisServer.start()
StatefulConnection connection = redisClient.connect()
reactiveCommands = connection.reactive()
}
def cleanupSpec() {
redisServer.stop()
}
def "set command with subscribe on a defined consumer"() {
setup:
def conds = new AsyncConditions()
Consumer<String> consumer = new Consumer<String>() {
@Override
void accept(String res) {
conds.evaluate {
assert res == "OK"
}
}
}
when:
reactiveCommands.set("TESTKEY", "TESTVAL").subscribe(consumer)
then:
conds.await()
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "key<TESTKEY> value<TESTVAL>"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
}
def "get command with lambda function"() {
setup:
def conds = new AsyncConditions()
when:
reactiveCommands.get("TESTKEY").subscribe { res -> conds.evaluate { assert res == "TESTVAL"} }
then:
conds.await()
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "GET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "key<TESTKEY>"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
}
// to make sure instrumentation's chained completion stages won't interfere with user's, while still
// recording metrics
def "get non existent key command"() {
setup:
def conds = new AsyncConditions()
final defaultVal = "NOT THIS VALUE"
when:
reactiveCommands.get("NON_EXISTENT_KEY").defaultIfEmpty(defaultVal).subscribe {
res -> conds.evaluate {
assert res == defaultVal
}
}
then:
conds.await()
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "GET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "key<NON_EXISTENT_KEY>"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
}
def "command with no arguments"() {
setup:
def conds = new AsyncConditions()
when:
reactiveCommands.randomkey().subscribe {
res -> conds.evaluate {
assert res == "TESTKEY"
}
}
then:
conds.await()
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "RANDOMKEY"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
}
def "command flux publisher "() {
setup:
reactiveCommands.command().subscribe()
expect:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "COMMAND"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.results.count" 157
"span.kind" "client"
"span.type" "redis"
}
}
}
}
}
def "command cancel after 2 on flux publisher "() {
setup:
reactiveCommands.command().take(2).subscribe()
expect:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "COMMAND"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.cancelled" true
"db.command.results.count" 2
"span.kind" "client"
"span.type" "redis"
}
}
}
}
}
def "non reactive command should not produce span"() {
setup:
String res = null
when:
res = reactiveCommands.digest()
then:
res != null
TEST_WRITER.size() == 0
}
def "debug segfault command (returns mono void) with no argument should produce span"() {
setup:
reactiveCommands.debugSegfault().subscribe()
expect:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "DEBUG"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "SEGFAULT"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
def "shutdown command (returns void) with argument should produce span"() {
setup:
reactiveCommands.shutdown(false).subscribe()
expect:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SHUTDOWN"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "NOSAVE"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
}

View File

@ -7,6 +7,8 @@ import io.lettuce.core.api.sync.RedisCommands
import redis.embedded.RedisServer import redis.embedded.RedisServer
import spock.lang.Shared import spock.lang.Shared
import java.util.concurrent.CompletionException
import static datadog.trace.instrumentation.lettuce.ConnectionFutureAdvice.RESOURCE_NAME_PREFIX import static datadog.trace.instrumentation.lettuce.ConnectionFutureAdvice.RESOURCE_NAME_PREFIX
import static datadog.trace.agent.test.ListWriterAssert.assertTraces import static datadog.trace.agent.test.ListWriterAssert.assertTraces
@ -64,7 +66,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
setup: setup:
RedisClient testConnectionClient = RedisClient.create(EMBEDDED_DB_URI) RedisClient testConnectionClient = RedisClient.create(EMBEDDED_DB_URI)
testConnectionClient.connect() testConnectionClient.connect()
TEST_WRITER.waitForTraces(1)
expect: expect:
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 1) {
@ -95,12 +96,12 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "connect exception"() { def "connect exception"() {
setup: setup:
RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT) RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT)
try {
testConnectionClient.connect()
} catch (RedisConnectionException rce) { }
TEST_WRITER.waitForTraces(1)
expect: when:
testConnectionClient.connect()
then:
thrown RedisConnectionException
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 1) {
trace(0, 1) { trace(0, 1) {
span(0) { span(0) {
@ -116,7 +117,7 @@ class LettuceSyncClientTest extends AgentTestRunner {
"db.redis.url" DB_ADDR_NON_EXISTENT "db.redis.url" DB_ADDR_NON_EXISTENT
"db.redis.dbIndex" 0 "db.redis.dbIndex" 0
"db.type" "redis" "db.type" "redis"
errorTags(RedisConnectionException, "some error due to incorrect port number") errorTags CompletionException
"peer.hostname" HOST "peer.hostname" HOST
"peer.port" INCORRECT_PORT "peer.port" INCORRECT_PORT
"span.kind" "client" "span.kind" "client"
@ -130,7 +131,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "set command"() { def "set command"() {
setup: setup:
String res = syncCommands.set("TESTKEY", "TESTVAL") String res = syncCommands.set("TESTKEY", "TESTVAL")
TEST_WRITER.waitForTraces(1)
expect: expect:
res == "OK" res == "OK"
@ -159,7 +159,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "get command"() { def "get command"() {
setup: setup:
String res = syncCommands.get("TESTKEY") String res = syncCommands.get("TESTKEY")
TEST_WRITER.waitForTraces(1)
expect: expect:
res == "TESTVAL" res == "TESTVAL"
@ -188,7 +187,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "get non existent key command"() { def "get non existent key command"() {
setup: setup:
String res = syncCommands.get("NON_EXISTENT_KEY") String res = syncCommands.get("NON_EXISTENT_KEY")
TEST_WRITER.waitForTraces(1)
expect: expect:
res == null res == null
@ -217,7 +215,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "command with no arguments"() { def "command with no arguments"() {
setup: setup:
def keyRetrieved = syncCommands.randomkey() def keyRetrieved = syncCommands.randomkey()
TEST_WRITER.waitForTraces(1)
expect: expect:
keyRetrieved == "TESTKEY" keyRetrieved == "TESTKEY"
@ -245,7 +242,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "list command"() { def "list command"() {
setup: setup:
long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT") long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT")
TEST_WRITER.waitForTraces(1)
expect: expect:
res == 1 res == 1
@ -274,7 +270,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "hash set command"() { def "hash set command"() {
setup: setup:
def res = syncCommands.hmset("user", testHashMap) def res = syncCommands.hmset("user", testHashMap)
TEST_WRITER.waitForTraces(1)
expect: expect:
res == "OK" res == "OK"
@ -303,7 +298,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "hash getall command"() { def "hash getall command"() {
setup: setup:
Map<String, String> res = syncCommands.hgetall("user") Map<String, String> res = syncCommands.hgetall("user")
TEST_WRITER.waitForTraces(1)
expect: expect:
res == testHashMap res == testHashMap
@ -328,4 +322,68 @@ class LettuceSyncClientTest extends AgentTestRunner {
} }
} }
} }
def "debug segfault command (returns void) with no argument should produce span"() {
setup:
syncCommands.debugSegfault()
expect:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "DEBUG"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "SEGFAULT"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
def "shutdown command (returns void) should produce a span"() {
setup:
syncCommands.shutdown(false)
expect:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SHUTDOWN"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"db.command.args" "NOSAVE"
"span.kind" "client"
"span.type" "redis"
}
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
} }

View File

@ -40,6 +40,10 @@ class TagsAssert {
if (message != null) { if (message != null) {
methodMissing("error.msg", [message].toArray()) methodMissing("error.msg", [message].toArray())
} else {
// don't make the message check mandatory, in case of exception messages that change on every run,
// i.e. random port that is destined to fail every time
assertedTags.add("error.msg")
} }
} }