get lettuce 4.0 sync and async working

This commit is contained in:
Richard Startin 2020-04-23 17:19:21 +01:00
parent 63ea57e7ca
commit 3f8bc96e8d
11 changed files with 1312 additions and 0 deletions

View File

@ -0,0 +1,34 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
maxJavaVersionForTests = JavaVersion.VERSION_1_8
}
muzzle {
pass {
group = "biz.paluch.redis"
module = "lettuce"
versions = "[4.0.Final,4.5.0.Final]"
assertInverse = true
}
}
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'biz.paluch.redis', name: 'lettuce', version: '4.0.Final'
main_java8CompileOnly group: 'biz.paluch.redis', name: 'lettuce', version: '4.5.0.Final'
testCompile group: 'com.github.kstyrc', name: 'embedded-redis', version: '0.6'
testCompile group: 'biz.paluch.redis', name: 'lettuce', version: '4.0.Final'
latestDepTestCompile group: 'biz.paluch.redis', name: 'lettuce', version: '4.+'
}

View File

@ -0,0 +1,45 @@
package datadog.trace.instrumentation.lettuce;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class LettuceAsyncCommandsInstrumentation extends Instrumenter.Default {
public LettuceAsyncCommandsInstrumentation() {
super("lettuce", "lettuce-4-async");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("com.lambdaworks.redis.AbstractRedisAsyncCommands");
}
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".LettuceClientDecorator",
packageName + ".LettuceAsyncBiFunction",
packageName + ".LettuceInstrumentationUtil"
};
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(named("dispatch"))
.and(takesArgument(0, named("com.lambdaworks.redis.protocol.RedisCommand"))),
// Cannot reference class directly here because it would lead to class load failure on Java7
packageName + ".LettuceAsyncCommandsAdvice");
}
}

View File

@ -0,0 +1,40 @@
package datadog.trace.instrumentation.lettuce;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class LettuceClientInstrumentation extends Instrumenter.Default {
public LettuceClientInstrumentation() {
super("lettuce");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("com.lambdaworks.redis.RedisClient");
}
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".LettuceClientDecorator", packageName + ".LettuceInstrumentationUtil"
};
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("connectStandalone")),
// Cannot reference class directly here because it would lead to class load failure on Java7
packageName + ".RedisConnectionAdvice");
}
}

View File

@ -0,0 +1,39 @@
package datadog.trace.instrumentation.lettuce;
import static datadog.trace.instrumentation.lettuce.LettuceClientDecorator.DECORATE;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.concurrent.CancellationException;
import java.util.function.BiFunction;
/**
* Callback class to close the span on an error or a success in the RedisFuture returned by the
* lettuce async API
*
* @param <T> the normal completion result
* @param <U> the error
* @param <R> the return type, should be null since nothing else should happen from tracing
* standpoint after the span is closed
*/
public class LettuceAsyncBiFunction<T extends Object, U extends Throwable, R extends Object>
implements BiFunction<T, Throwable, R> {
private final AgentSpan span;
public LettuceAsyncBiFunction(final AgentSpan span) {
this.span = span;
}
@Override
public R apply(final T t, final Throwable throwable) {
if (throwable instanceof CancellationException) {
span.setTag("db.command.cancelled", true);
} else {
DECORATE.onError(span, throwable);
}
DECORATE.beforeFinish(span);
span.finish();
return null;
}
}

View File

@ -0,0 +1,45 @@
package datadog.trace.instrumentation.lettuce;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.lettuce.LettuceClientDecorator.DECORATE;
import static datadog.trace.instrumentation.lettuce.LettuceInstrumentationUtil.doFinishSpanEarly;
import com.lambdaworks.redis.protocol.AsyncCommand;
import com.lambdaworks.redis.protocol.RedisCommand;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
public class LettuceAsyncCommandsAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(@Advice.Argument(0) final RedisCommand command) {
final AgentSpan span = startSpan("redis.query");
DECORATE.afterStart(span);
DECORATE.onCommand(span, command);
return activateSpan(span, doFinishSpanEarly(command));
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Argument(0) final RedisCommand command,
@Advice.Enter final AgentScope scope,
@Advice.Thrown final Throwable throwable,
@Advice.Return final AsyncCommand<?, ?, ?> asyncCommand) {
final AgentSpan span = scope.span();
if (throwable != null) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
scope.close();
return;
}
// close spans on error or normal completion
if (!doFinishSpanEarly(command)) {
asyncCommand.handleAsync(new LettuceAsyncBiFunction<>(span));
}
scope.close();
}
}

View File

@ -0,0 +1,76 @@
package datadog.trace.instrumentation.lettuce;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.protocol.RedisCommand;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.bootstrap.instrumentation.decorator.DatabaseClientDecorator;
public class LettuceClientDecorator extends DatabaseClientDecorator<RedisURI> {
public static final LettuceClientDecorator DECORATE = new LettuceClientDecorator();
@Override
protected String[] instrumentationNames() {
return new String[] {"lettuce"};
}
@Override
protected String service() {
return "redis";
}
@Override
protected String component() {
return "redis-client";
}
@Override
protected String spanType() {
return DDSpanTypes.REDIS;
}
@Override
protected String dbType() {
return "redis";
}
@Override
protected String dbUser(final RedisURI connection) {
return null;
}
@Override
protected String dbInstance(final RedisURI connection) {
return null;
}
@Override
public AgentSpan onConnection(final AgentSpan span, final RedisURI connection) {
if (connection != null) {
span.setTag(Tags.PEER_HOSTNAME, connection.getHost());
span.setTag(Tags.PEER_PORT, connection.getPort());
span.setTag("db.redis.dbIndex", connection.getDatabase());
span.setTag(
DDTags.RESOURCE_NAME,
"CONNECT:"
+ connection.getHost()
+ ":"
+ connection.getPort()
+ "/"
+ connection.getDatabase());
}
return super.onConnection(span, connection);
}
@SuppressWarnings("rawtypes")
public AgentSpan onCommand(final AgentSpan span, final RedisCommand command) {
String commandName = LettuceInstrumentationUtil.getCommandName(command);
span.setTag(
DDTags.RESOURCE_NAME, LettuceInstrumentationUtil.getCommandResourceName(commandName));
return span;
}
}

View File

@ -0,0 +1,71 @@
package datadog.trace.instrumentation.lettuce;
import com.lambdaworks.redis.protocol.RedisCommand;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
public class LettuceInstrumentationUtil {
public static final String AGENT_CRASHING_COMMAND_PREFIX = "COMMAND-NAME:";
public static final Set<String> nonInstrumentingCommands =
new HashSet<>(Arrays.asList("SHUTDOWN", "DEBUG", "OOM", "SEGFAULT"));
public static final Set<String> agentCrashingCommands =
new HashSet<>(Arrays.asList("CLIENT", "CLUSTER", "COMMAND", "CONFIG", "DEBUG", "SCRIPT"));
/**
* Determines whether a redis command should finish its relevant span early (as soon as tags are
* added and the command is executed) because these commands have no return values/call backs, so
* we must close the span early in order to provide info for the users
*
* @param command
* @return true if finish the span early (the command will not have a return value)
*/
public static boolean doFinishSpanEarly(final RedisCommand command) {
final String commandName = LettuceInstrumentationUtil.getCommandName(command);
return nonInstrumentingCommands.contains(commandName);
}
// Workaround to keep trace agent from crashing
// Currently the commands in AGENT_CRASHING_COMMANDS_WORDS will crash the trace agent and
// traces with these commands as the resource name will not be processed by the trace agent
// https://github.com/DataDog/datadog-trace-agent/blob/master/quantizer/redis.go#L18 has
// list of commands that will currently fail at the trace agent level.
/**
* Workaround to keep trace agent from crashing Currently the commands in
* AGENT_CRASHING_COMMANDS_WORDS will crash the trace agent and traces with these commands as the
* resource name will not be processed by the trace agent
* https://github.com/DataDog/datadog-trace-agent/blob/master/quantizer/redis.go#L18 has list of
* commands that will currently fail at the trace agent level.
*
* @param actualCommandName the actual redis command
* @return the redis command with a prefix if it is a command that will crash the trace agent,
* otherwise, the original command is returned.
*/
public static String getCommandResourceName(final String actualCommandName) {
if (agentCrashingCommands.contains(actualCommandName)) {
return AGENT_CRASHING_COMMAND_PREFIX + actualCommandName;
}
return actualCommandName;
}
/**
* Retrieves the actual redis command name from a RedisCommand object
*
* @param command the lettuce RedisCommand object
* @return the redis command as a string
*/
public static String getCommandName(final RedisCommand command) {
String commandName = "Redis Command";
if (command != null) {
// get the redis command name (i.e. GET, SET, HMSET, etc)
if (command.getType() != null) {
commandName = command.getType().name().trim();
}
}
return commandName;
}
}

View File

@ -0,0 +1,35 @@
package datadog.trace.instrumentation.lettuce;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.lettuce.LettuceClientDecorator.DECORATE;
import com.lambdaworks.redis.RedisURI;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
public class RedisConnectionAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(@Advice.Argument(1) RedisURI redisURI) {
AgentSpan span = startSpan("redis.query");
DECORATE.afterStart(span);
DECORATE.onConnection(span, redisURI);
return activateSpan(span, false);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onReturn(@Advice.Enter AgentScope scope, @Advice.Thrown Throwable throwable) {
AgentSpan span = scope.span();
try {
if (throwable != null) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
}
} finally {
span.finish();
scope.close();
}
}
}

View File

@ -0,0 +1,542 @@
import com.lambdaworks.redis.ClientOptions
import com.lambdaworks.redis.RedisClient
import com.lambdaworks.redis.RedisFuture
import com.lambdaworks.redis.RedisURI
import com.lambdaworks.redis.api.StatefulConnection
import com.lambdaworks.redis.api.async.RedisAsyncCommands
import com.lambdaworks.redis.api.sync.RedisCommands
import com.lambdaworks.redis.protocol.AsyncCommand
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.utils.PortUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
import redis.embedded.RedisServer
import spock.lang.Shared
import spock.util.concurrent.AsyncConditions
import com.lambdaworks.redis.codec.Utf8StringCodec
import java.util.concurrent.CancellationException
import java.util.concurrent.TimeUnit
import java.util.function.BiConsumer
import java.util.function.BiFunction
import java.util.function.Consumer
import java.util.function.Function
import com.lambdaworks.redis.RedisConnectionException
import static datadog.trace.instrumentation.lettuce.LettuceInstrumentationUtil.AGENT_CRASHING_COMMAND_PREFIX
class LettuceAsyncClientTest extends AgentTestRunner {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = new ClientOptions.Builder().autoReconnect(false).build()
@Shared
int port
@Shared
int incorrectPort
@Shared
String dbAddr
@Shared
String dbAddrNonExistent
@Shared
String dbUriNonExistent
@Shared
String embeddedDbUri
@Shared
RedisServer redisServer
@Shared
Map<String, String> testHashMap = [
firstname: "John",
lastname : "Doe",
age : "53"
]
RedisClient redisClient
StatefulConnection connection
RedisAsyncCommands<String, ?> asyncCommands
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
dbAddr = HOST + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = HOST + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent
embeddedDbUri = "redis://" + dbAddr
redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(port).build()
}
def setup() {
redisClient = RedisClient.create(embeddedDbUri)
println "Using redis: $redisServer.args"
redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
connection = redisClient.connect()
asyncCommands = connection.async()
syncCommands = connection.sync()
syncCommands.set("TESTKEY", "TESTVAL")
// 1 set + 1 connect trace
TEST_WRITER.waitForTraces(2)
TEST_WRITER.clear()
}
def cleanup() {
connection.close()
redisServer.stop()
}
def "connect using get on ConnectionFuture"() {
setup:
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
StatefulConnection connection = testConnectionClient.connect(new Utf8StringCodec(),
new RedisURI(HOST, port, 3, TimeUnit.SECONDS))
then:
connection != null
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "CONNECT:" + dbAddr
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME" HOST
"$Tags.PEER_PORT" port
"$Tags.DB_TYPE" "redis"
"db.redis.dbIndex" 0
defaultTags()
}
}
}
}
cleanup:
connection.close()
}
def "connect exception inside the connection future"() {
setup:
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
StatefulConnection connection = testConnectionClient.connect(new Utf8StringCodec(),
new RedisURI(HOST, incorrectPort, 3, TimeUnit.SECONDS))
then:
connection == null
thrown RedisConnectionException
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "CONNECT:" + dbAddrNonExistent
errored true
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME" HOST
"$Tags.PEER_PORT" incorrectPort
"$Tags.DB_TYPE" "redis"
"db.redis.dbIndex" 0
errorTags RedisConnectionException, String
defaultTags()
}
}
}
}
}
def "set command using Future get with timeout"() {
setup:
RedisFuture<String> redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL")
String res = redisFuture.get(3, TimeUnit.SECONDS)
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "SET"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "get command chained with thenAccept"() {
setup:
def conds = new AsyncConditions()
Consumer<String> consumer = new Consumer<String>() {
@Override
void accept(String res) {
conds.evaluate {
assert res == "TESTVAL"
}
}
}
when:
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY")
redisFuture.thenAccept(consumer)
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "GET"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
// to make sure instrumentation's chained completion stages won't interfere with user's, while still
// recording metrics
def "get non existent key command with handleAsync and chained with thenApply"() {
setup:
def conds = new AsyncConditions()
final String successStr = "KEY MISSING"
BiFunction<String, Throwable, String> firstStage = new BiFunction<String, Throwable, String>() {
@Override
String apply(String res, Throwable throwable) {
conds.evaluate {
assert res == null
assert throwable == null
}
return (res == null ? successStr : res)
}
}
Function<String, Object> secondStage = new Function<String, Object>() {
@Override
Object apply(String input) {
conds.evaluate {
assert input == successStr
}
return null
}
}
when:
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY")
redisFuture.handleAsync(firstStage).thenApply(secondStage)
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "GET"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "command with no arguments using a biconsumer"() {
setup:
def conds = new AsyncConditions()
BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() {
@Override
void accept(String keyRetrieved, Throwable throwable) {
conds.evaluate {
assert keyRetrieved != null
}
}
}
when:
RedisFuture<String> redisFuture = asyncCommands.randomkey()
redisFuture.whenCompleteAsync(biConsumer)
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "RANDOMKEY"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "hash set and then nest apply to hash getall"() {
setup:
def conds = new AsyncConditions()
when:
RedisFuture<String> hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap)
hmsetFuture.thenApplyAsync(new Function<String, Object>() {
@Override
Object apply(String setResult) {
TEST_WRITER.waitForTraces(1) // Wait for 'hmset' trace to get written
conds.evaluate {
assert setResult == "OK"
}
RedisFuture<Map<String, String>> hmGetAllFuture = asyncCommands.hgetall("TESTHM")
hmGetAllFuture.exceptionally(new Function<Throwable, Map<String, String>>() {
@Override
Map<String, String> apply(Throwable throwable) {
println("unexpected:" + throwable.toString())
throwable.printStackTrace()
assert false
return null
}
})
hmGetAllFuture.thenAccept(new Consumer<Map<String, String>>() {
@Override
void accept(Map<String, String> hmGetAllResult) {
conds.evaluate {
assert testHashMap == hmGetAllResult
}
}
})
return null
}
})
then:
conds.await()
assertTraces(2) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "HMSET"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "HGETALL"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "command completes exceptionally"() {
setup:
// turn off auto flush to complete the command exceptionally manually
asyncCommands.setAutoFlushCommands(false)
def conds = new AsyncConditions()
RedisFuture redisFuture = asyncCommands.del("key1", "key2")
boolean completedExceptionally = ((AsyncCommand) redisFuture).completeExceptionally(new IllegalStateException("TestException"))
redisFuture.exceptionally({
throwable ->
conds.evaluate {
assert throwable != null
assert throwable instanceof IllegalStateException
assert throwable.getMessage() == "TestException"
}
throw throwable
})
when:
// now flush and execute the command
asyncCommands.flushCommands()
redisFuture.get()
then:
conds.await()
completedExceptionally == true
thrown Exception
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "DEL"
errored true
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
errorTags(IllegalStateException, "TestException")
defaultTags()
}
}
}
}
}
def "cancel command before it finishes"() {
setup:
asyncCommands.setAutoFlushCommands(false)
def conds = new AsyncConditions()
RedisFuture redisFuture = asyncCommands.sadd("SKEY", "1", "2")
redisFuture.whenCompleteAsync({
res, throwable ->
conds.evaluate {
assert throwable != null
assert throwable instanceof CancellationException
}
})
when:
boolean cancelSuccess = redisFuture.cancel(true)
asyncCommands.flushCommands()
then:
conds.await()
cancelSuccess == true
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "SADD"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
"db.command.cancelled" true
defaultTags()
}
}
}
}
}
def "debug segfault command (returns void) with no argument should produce span"() {
setup:
asyncCommands.debugSegfault()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName AGENT_CRASHING_COMMAND_PREFIX + "DEBUG"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "shutdown command (returns void) should produce a span"() {
setup:
asyncCommands.shutdown(false)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "SHUTDOWN"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
}

View File

@ -0,0 +1,384 @@
import com.lambdaworks.redis.ClientOptions
import com.lambdaworks.redis.RedisClient
import com.lambdaworks.redis.RedisConnectionException
import com.lambdaworks.redis.api.StatefulConnection
import com.lambdaworks.redis.api.sync.RedisCommands
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.utils.PortUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
import redis.embedded.RedisServer
import spock.lang.Shared
import static datadog.trace.instrumentation.lettuce.LettuceInstrumentationUtil.AGENT_CRASHING_COMMAND_PREFIX
class LettuceSyncClientTest extends AgentTestRunner {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = new ClientOptions.Builder().autoReconnect(false).build()
@Shared
int port
@Shared
int incorrectPort
@Shared
String dbAddr
@Shared
String dbAddrNonExistent
@Shared
String dbUriNonExistent
@Shared
String embeddedDbUri
@Shared
RedisServer redisServer
@Shared
Map<String, String> testHashMap = [
firstname: "John",
lastname : "Doe",
age : "53"
]
RedisClient redisClient
StatefulConnection connection
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
dbAddr = HOST + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = HOST + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent
embeddedDbUri = "redis://" + dbAddr
redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(port).build()
}
def setup() {
redisClient = RedisClient.create(embeddedDbUri)
redisServer.start()
connection = redisClient.connect()
syncCommands = connection.sync()
syncCommands.set("TESTKEY", "TESTVAL")
syncCommands.hmset("TESTHM", testHashMap)
// 2 sets + 1 connect trace
TEST_WRITER.waitForTraces(3)
TEST_WRITER.clear()
}
def cleanup() {
connection.close()
redisServer.stop()
}
def "connect"() {
setup:
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
StatefulConnection connection = testConnectionClient.connect()
then:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "CONNECT:" + dbAddr
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME" HOST
"$Tags.PEER_PORT" port
"$Tags.DB_TYPE" "redis"
"db.redis.dbIndex" 0
defaultTags()
}
}
}
}
cleanup:
connection.close()
}
def "connect exception"() {
setup:
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
testConnectionClient.connect()
then:
thrown RedisConnectionException
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "CONNECT:" + dbAddrNonExistent
errored true
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME" HOST
"$Tags.PEER_PORT" incorrectPort
"$Tags.DB_TYPE" "redis"
"db.redis.dbIndex" 0
errorTags RedisConnectionException, String
defaultTags()
}
}
}
}
}
def "set command"() {
setup:
String res = syncCommands.set("TESTSETKEY", "TESTSETVAL")
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "SET"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "get command"() {
setup:
String res = syncCommands.get("TESTKEY")
expect:
res == "TESTVAL"
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "GET"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "get non existent key command"() {
setup:
String res = syncCommands.get("NON_EXISTENT_KEY")
expect:
res == null
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "GET"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "command with no arguments"() {
setup:
def keyRetrieved = syncCommands.randomkey()
expect:
keyRetrieved != null
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "RANDOMKEY"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "list command"() {
setup:
long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT")
expect:
res == 1
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "LPUSH"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "hash set command"() {
setup:
def res = syncCommands.hmset("user", testHashMap)
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "HMSET"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "hash getall command"() {
setup:
Map<String, String> res = syncCommands.hgetall("TESTHM")
expect:
res == testHashMap
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "HGETALL"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "debug segfault command (returns void) with no argument should produce span"() {
setup:
syncCommands.debugSegfault()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName AGENT_CRASHING_COMMAND_PREFIX + "DEBUG"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
def "shutdown command (returns void) should produce a span"() {
setup:
syncCommands.shutdown(false)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType DDSpanTypes.REDIS
resourceName "SHUTDOWN"
errored false
tags {
"$Tags.COMPONENT" "redis-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.DB_TYPE" "redis"
defaultTags()
}
}
}
}
}
}

View File

@ -107,6 +107,7 @@ include ':dd-java-agent:instrumentation:jms'
include ':dd-java-agent:instrumentation:jsp-2.3'
include ':dd-java-agent:instrumentation:kafka-clients-0.11'
include ':dd-java-agent:instrumentation:kafka-streams-0.11'
include ':dd-java-agent:instrumentation:lettuce-4.0'
include ':dd-java-agent:instrumentation:lettuce-5'
include ':dd-java-agent:instrumentation:log4j1'
include ':dd-java-agent:instrumentation:log4j2'