Convert Lettuce 4.0 tests from groovy to java (#9419)

This commit is contained in:
Jay DeLuca 2023-09-15 08:59:48 -04:00 committed by GitHub
parent 913bebb979
commit f7b2de72b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 823 additions and 843 deletions

View File

@ -1,529 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.lambdaworks.redis.ClientOptions
import com.lambdaworks.redis.RedisClient
import com.lambdaworks.redis.RedisConnectionException
import com.lambdaworks.redis.RedisFuture
import com.lambdaworks.redis.RedisURI
import com.lambdaworks.redis.api.StatefulConnection
import com.lambdaworks.redis.api.async.RedisAsyncCommands
import com.lambdaworks.redis.api.sync.RedisCommands
import com.lambdaworks.redis.codec.Utf8StringCodec
import com.lambdaworks.redis.protocol.AsyncCommand
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.semconv.SemanticAttributes
import org.testcontainers.containers.GenericContainer
import spock.lang.Shared
import spock.util.concurrent.AsyncConditions
import java.util.concurrent.CancellationException
import java.util.concurrent.TimeUnit
import java.util.function.BiConsumer
import java.util.function.BiFunction
import java.util.function.Consumer
import java.util.function.Function
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR
class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = new ClientOptions.Builder().autoReconnect(false).build()
private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379)
@Shared
String host
@Shared
int port
@Shared
int incorrectPort
@Shared
String dbAddr
@Shared
String dbAddrNonExistent
@Shared
String dbUriNonExistent
@Shared
String embeddedDbUri
@Shared
Map<String, String> testHashMap = [
firstname: "John",
lastname : "Doe",
age : "53"
]
RedisClient redisClient
StatefulConnection connection
RedisAsyncCommands<String, ?> asyncCommands
RedisCommands<String, ?> syncCommands
def setup() {
redisServer.start()
host = redisServer.getHost()
port = redisServer.getMappedPort(6379)
dbAddr = host + ":" + port + "/" + DB_INDEX
embeddedDbUri = "redis://" + dbAddr
incorrectPort = PortUtils.findOpenPort()
dbAddrNonExistent = host + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent
redisClient = RedisClient.create(embeddedDbUri)
redisClient.setOptions(CLIENT_OPTIONS)
connection = redisClient.connect()
asyncCommands = connection.async()
syncCommands = connection.sync()
syncCommands.set("TESTKEY", "TESTVAL")
// 1 set + 1 connect trace
ignoreTracesAndClear(2)
}
def cleanup() {
connection.close()
redisServer.stop()
}
def "connect using get on ConnectionFuture"() {
setup:
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
StatefulConnection connection = testConnectionClient.connect(new Utf8StringCodec(),
new RedisURI(host, port, 3, TimeUnit.SECONDS))
then:
connection != null
assertTraces(1) {
trace(0, 1) {
span(0) {
name "CONNECT"
kind CLIENT
attributes {
"$SemanticAttributes.NET_PEER_NAME" host
"$SemanticAttributes.NET_PEER_PORT" port
"$SemanticAttributes.DB_SYSTEM" "redis"
}
}
}
}
cleanup:
connection.close()
}
def "connect exception inside the connection future"() {
setup:
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
StatefulConnection connection = testConnectionClient.connect(new Utf8StringCodec(),
new RedisURI(host, incorrectPort, 3, TimeUnit.SECONDS))
then:
connection == null
thrown RedisConnectionException
assertTraces(1) {
trace(0, 1) {
span(0) {
name "CONNECT"
kind CLIENT
status ERROR
errorEvent RedisConnectionException, String
attributes {
"$SemanticAttributes.NET_PEER_NAME" host
"$SemanticAttributes.NET_PEER_PORT" incorrectPort
"$SemanticAttributes.DB_SYSTEM" "redis"
}
}
}
}
}
def "set command using Future get with timeout"() {
setup:
RedisFuture<String> redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL")
String res = redisFuture.get(3, TimeUnit.SECONDS)
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
name "SET"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "SET"
}
}
}
}
}
def "get command chained with thenAccept"() {
setup:
def conds = new AsyncConditions()
Consumer<String> consumer = new Consumer<String>() {
@Override
void accept(String res) {
runWithSpan("callback") {
conds.evaluate {
assert res == "TESTVAL"
}
}
}
}
when:
runWithSpan("parent") {
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY")
redisFuture.thenAccept(consumer)
}
then:
conds.await(10)
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GET"
kind CLIENT
childOf(span(0))
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "GET"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
// to make sure instrumentation's chained completion stages won't interfere with user's, while still
// recording metrics
def "get non existent key command with handleAsync and chained with thenApply"() {
setup:
def conds = new AsyncConditions()
String successStr = "KEY MISSING"
BiFunction<String, Throwable, String> firstStage = new BiFunction<String, Throwable, String>() {
@Override
String apply(String res, Throwable error) {
runWithSpan("callback1") {
conds.evaluate {
assert res == null
assert error == null
}
}
return (res == null ? successStr : res)
}
}
Function<String, Object> secondStage = new Function<String, Object>() {
@Override
Object apply(String input) {
runWithSpan("callback2") {
conds.evaluate {
assert input == successStr
}
}
return null
}
}
when:
runWithSpan("parent") {
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY")
redisFuture.handle(firstStage).thenApply(secondStage)
}
then:
conds.await(10)
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GET"
kind CLIENT
childOf(span(0))
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "GET"
}
}
span(2) {
name "callback1"
kind INTERNAL
childOf(span(0))
}
span(3) {
name "callback2"
kind INTERNAL
childOf(span(0))
}
}
}
}
def "command with no arguments using a biconsumer"() {
setup:
def conds = new AsyncConditions()
BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() {
@Override
void accept(String keyRetrieved, Throwable error) {
runWithSpan("callback") {
conds.evaluate {
assert keyRetrieved != null
}
}
}
}
when:
runWithSpan("parent") {
RedisFuture<String> redisFuture = asyncCommands.randomkey()
redisFuture.whenCompleteAsync(biConsumer)
}
then:
conds.await(10)
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "RANDOMKEY"
kind CLIENT
childOf(span(0))
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "RANDOMKEY"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
def "hash set and then nest apply to hash getall"() {
setup:
def conds = new AsyncConditions()
when:
RedisFuture<String> hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap)
hmsetFuture.thenApplyAsync(new Function<String, Object>() {
@Override
Object apply(String setResult) {
waitForTraces(1) // Wait for 'hmset' trace to get written
conds.evaluate {
assert setResult == "OK"
}
RedisFuture<Map<String, String>> hmGetAllFuture = asyncCommands.hgetall("TESTHM")
hmGetAllFuture.exceptionally(new Function<Throwable, Map<String, String>>() {
@Override
Map<String, String> apply(Throwable error) {
println("unexpected:" + error.toString())
error.printStackTrace()
assert false
return null
}
})
hmGetAllFuture.thenAccept(new Consumer<Map<String, String>>() {
@Override
void accept(Map<String, String> hmGetAllResult) {
conds.evaluate {
assert testHashMap == hmGetAllResult
}
}
})
return null
}
})
then:
conds.await(10)
assertTraces(2) {
trace(0, 1) {
span(0) {
name "HMSET"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "HMSET"
}
}
}
trace(1, 1) {
span(0) {
name "HGETALL"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "HGETALL"
}
}
}
}
}
def "command completes exceptionally"() {
setup:
// turn off auto flush to complete the command exceptionally manually
asyncCommands.setAutoFlushCommands(false)
def conds = new AsyncConditions()
RedisFuture redisFuture = asyncCommands.del("key1", "key2")
boolean completedExceptionally = ((AsyncCommand) redisFuture).completeExceptionally(new IllegalStateException("TestException"))
redisFuture.exceptionally({
error ->
conds.evaluate {
assert error != null
assert error instanceof IllegalStateException
assert error.getMessage() == "TestException"
}
throw error
})
when:
// now flush and execute the command
asyncCommands.flushCommands()
redisFuture.get()
then:
conds.await(10)
completedExceptionally == true
thrown Exception
assertTraces(1) {
trace(0, 1) {
span(0) {
name "DEL"
kind CLIENT
status ERROR
errorEvent(IllegalStateException, "TestException")
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "DEL"
}
}
}
}
}
def "cancel command before it finishes"() {
setup:
asyncCommands.setAutoFlushCommands(false)
def conds = new AsyncConditions()
RedisFuture redisFuture = runWithSpan("parent") {
asyncCommands.sadd("SKEY", "1", "2")
}
redisFuture.whenCompleteAsync({
res, error ->
runWithSpan("callback") {
conds.evaluate {
assert error != null
assert error instanceof CancellationException
}
}
})
when:
boolean cancelSuccess = redisFuture.cancel(true)
asyncCommands.flushCommands()
then:
conds.await(10)
cancelSuccess == true
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "SADD"
kind CLIENT
childOf(span(0))
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "SADD"
"lettuce.command.cancelled" true
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
def "debug segfault command (returns void) with no argument should produce span"() {
setup:
asyncCommands.debugSegfault()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "DEBUG"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "DEBUG"
}
}
}
}
}
def "shutdown command (returns void) should produce a span"() {
setup:
asyncCommands.shutdown(false)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "SHUTDOWN"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "SHUTDOWN"
}
}
}
}
}
}

View File

@ -1,314 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.lambdaworks.redis.ClientOptions
import com.lambdaworks.redis.RedisClient
import com.lambdaworks.redis.RedisConnectionException
import com.lambdaworks.redis.api.StatefulConnection
import com.lambdaworks.redis.api.sync.RedisCommands
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.semconv.SemanticAttributes
import org.testcontainers.containers.GenericContainer
import spock.lang.Shared
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.StatusCode.ERROR
class LettuceSyncClientTest extends AgentInstrumentationSpecification {
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = new ClientOptions.Builder().autoReconnect(false).build()
private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379)
@Shared
String host
@Shared
int port
@Shared
int incorrectPort
@Shared
String dbAddr
@Shared
String dbAddrNonExistent
@Shared
String dbUriNonExistent
@Shared
String embeddedDbUri
@Shared
Map<String, String> testHashMap = [
firstname: "John",
lastname : "Doe",
age : "53"
]
RedisClient redisClient
StatefulConnection connection
RedisCommands<String, ?> syncCommands
def setup() {
//TODO do not restart server for every test
redisServer.start()
host = redisServer.getHost()
port = redisServer.getMappedPort(6379)
dbAddr = host + ":" + port + "/" + DB_INDEX
embeddedDbUri = "redis://" + dbAddr
incorrectPort = PortUtils.findOpenPort()
dbAddrNonExistent = host + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent
redisClient = RedisClient.create(embeddedDbUri)
connection = redisClient.connect()
syncCommands = connection.sync()
syncCommands.set("TESTKEY", "TESTVAL")
syncCommands.hmset("TESTHM", testHashMap)
// 2 sets + 1 connect trace
ignoreTracesAndClear(3)
}
def cleanup() {
connection.close()
redisServer.stop()
}
def "connect"() {
setup:
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
StatefulConnection connection = testConnectionClient.connect()
then:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "CONNECT"
kind CLIENT
attributes {
"$SemanticAttributes.NET_PEER_NAME" host
"$SemanticAttributes.NET_PEER_PORT" port
"$SemanticAttributes.DB_SYSTEM" "redis"
}
}
}
}
cleanup:
connection.close()
}
def "connect exception"() {
setup:
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
testConnectionClient.connect()
then:
thrown RedisConnectionException
assertTraces(1) {
trace(0, 1) {
span(0) {
name "CONNECT"
kind CLIENT
status ERROR
errorEvent RedisConnectionException, String
attributes {
"$SemanticAttributes.NET_PEER_NAME" host
"$SemanticAttributes.NET_PEER_PORT" incorrectPort
"$SemanticAttributes.DB_SYSTEM" "redis"
}
}
}
}
}
def "set command"() {
setup:
String res = syncCommands.set("TESTSETKEY", "TESTSETVAL")
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
name "SET"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "SET"
}
}
}
}
}
def "get command"() {
setup:
String res = syncCommands.get("TESTKEY")
expect:
res == "TESTVAL"
assertTraces(1) {
trace(0, 1) {
span(0) {
name "GET"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "GET"
}
}
}
}
}
def "get non existent key command"() {
setup:
String res = syncCommands.get("NON_EXISTENT_KEY")
expect:
res == null
assertTraces(1) {
trace(0, 1) {
span(0) {
name "GET"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "GET"
}
}
}
}
}
def "command with no arguments"() {
setup:
def keyRetrieved = syncCommands.randomkey()
expect:
keyRetrieved != null
assertTraces(1) {
trace(0, 1) {
span(0) {
name "RANDOMKEY"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "RANDOMKEY"
}
}
}
}
}
def "list command"() {
setup:
long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT")
expect:
res == 1
assertTraces(1) {
trace(0, 1) {
span(0) {
name "LPUSH"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "LPUSH"
}
}
}
}
}
def "hash set command"() {
setup:
def res = syncCommands.hmset("user", testHashMap)
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
name "HMSET"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "HMSET"
}
}
}
}
}
def "hash getall command"() {
setup:
Map<String, String> res = syncCommands.hgetall("TESTHM")
expect:
res == testHashMap
assertTraces(1) {
trace(0, 1) {
span(0) {
name "HGETALL"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "HGETALL"
}
}
}
}
}
def "debug segfault command (returns void) with no argument should produce span"() {
setup:
syncCommands.debugSegfault()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "DEBUG"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "DEBUG"
}
}
}
}
}
def "shutdown command (returns void) should produce a span"() {
setup:
syncCommands.shutdown(false)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "SHUTDOWN"
kind CLIENT
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.DB_OPERATION" "SHUTDOWN"
}
}
}
}
}
}

View File

@ -0,0 +1,513 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.lettuce.v4_0;
import static io.opentelemetry.api.common.AttributeKey.booleanKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchException;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.awaitility.Awaitility.await;
import com.google.common.collect.ImmutableMap;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnectionException;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.api.async.RedisAsyncCommands;
import com.lambdaworks.redis.api.sync.RedisCommands;
import com.lambdaworks.redis.codec.Utf8StringCodec;
import com.lambdaworks.redis.protocol.AsyncCommand;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.SemanticAttributes;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0
class LettuceAsyncClientTest {
@RegisterExtension
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
static final DockerImageName containerImage = DockerImageName.parse("redis:6.2.3-alpine");
private static final int DB_INDEX = 0;
// Disable auto reconnect, so we do not get stray traces popping up on server shutdown
private static final ClientOptions CLIENT_OPTIONS =
new ClientOptions.Builder().autoReconnect(false).build();
private static final GenericContainer<?> redisServer =
new GenericContainer<>(containerImage).withExposedPorts(6379);
private static String host;
private static int port;
private static int incorrectPort;
private static String dbUriNonExistent;
private static String embeddedDbUri;
private static final ImmutableMap<String, String> testHashMap =
ImmutableMap.of(
"firstname", "John",
"lastname", "Doe",
"age", "53");
static RedisClient redisClient;
private static StatefulRedisConnection<String, String> connection;
static RedisAsyncCommands<String, String> asyncCommands;
@BeforeAll
static void setUp() {
redisServer.start();
host = redisServer.getHost();
port = redisServer.getMappedPort(6379);
embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX;
incorrectPort = PortUtils.findOpenPort();
dbUriNonExistent = "redis://" + host + ":" + incorrectPort + "/" + DB_INDEX;
redisClient = RedisClient.create(embeddedDbUri);
redisClient.setOptions(CLIENT_OPTIONS);
connection = redisClient.connect();
asyncCommands = connection.async();
RedisCommands<String, String> syncCommands = connection.sync();
syncCommands.set("TESTKEY", "TESTVAL");
// 1 set + 1 connect trace
testing.clearData();
}
@AfterAll
static void cleanUp() {
connection.close();
redisServer.stop();
}
@Test
void testConnectUsingGetOnConnectionFuture() {
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri);
testConnectionClient.setOptions(CLIENT_OPTIONS);
StatefulRedisConnection<String, String> connection1 =
testConnectionClient.connect(
new Utf8StringCodec(), new RedisURI(host, port, 3, TimeUnit.SECONDS));
cleanup.deferCleanup(connection1);
assertThat(connection1).isNotNull();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("CONNECT")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_PEER_NAME, host),
equalTo(SemanticAttributes.NET_PEER_PORT, port),
equalTo(SemanticAttributes.DB_SYSTEM, "redis"))));
}
@Test
void testExceptionInsideTheConnectionFuture() {
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent);
testConnectionClient.setOptions(CLIENT_OPTIONS);
Exception exception =
catchException(
() ->
testConnectionClient.connect(
new Utf8StringCodec(), new RedisURI(host, incorrectPort, 3, TimeUnit.SECONDS)));
assertThat(exception).isInstanceOf(RedisConnectionException.class);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("CONNECT")
.hasKind(SpanKind.CLIENT)
.hasStatus(StatusData.error())
.hasException(exception)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_PEER_NAME, host),
equalTo(SemanticAttributes.NET_PEER_PORT, incorrectPort),
equalTo(SemanticAttributes.DB_SYSTEM, "redis"))));
}
@Test
void testSetCommandUsingFutureGetWithTimeout()
throws ExecutionException, InterruptedException, TimeoutException {
RedisFuture<String> redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL");
String res = redisFuture.get(3, TimeUnit.SECONDS);
assertThat(res).isEqualTo("OK");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("SET")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "SET"))));
}
@Test
void testCommandChainedWithThenAccept() {
CompletableFuture<String> future = new CompletableFuture<>();
Consumer<String> consumer =
res -> {
testing.runWithSpan("callback", () -> assertThat(res).isEqualTo("TESTVAL"));
future.complete(res);
};
testing.runWithSpan(
"parent",
() -> {
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY");
redisFuture.thenAccept(consumer);
});
await().untilAsserted(() -> assertThat(future).isCompletedWithValue("TESTVAL"));
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("GET")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "GET")),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
// to make sure instrumentation's chained completion stages won't interfere with user's, while
// still recording spans
@Test
void getNonExistentKeyCommandWithHandleAsyncAndChainedWithThenApply() {
CompletableFuture<String> future1 = new CompletableFuture<>();
CompletableFuture<String> future2 = new CompletableFuture<>();
String successStr = "KEY MISSING";
BiFunction<String, Throwable, String> firstStage =
(res, error) -> {
testing.runWithSpan(
"callback1",
() -> {
assertThat(res).isNull();
assertThat(error).isNull();
future1.complete(null);
});
return (res == null ? successStr : res);
};
Function<String, Object> secondStage =
input -> {
testing.runWithSpan(
"callback2",
() -> {
assertThat(input).isEqualTo(successStr);
future2.complete(successStr);
});
return null;
};
testing.runWithSpan(
"parent",
() -> {
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY");
redisFuture.handle(firstStage).thenApply(secondStage);
});
await()
.untilAsserted(
() -> {
assertThat(future1).isCompletedWithValue(null);
assertThat(future2).isCompletedWithValue(successStr);
});
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("GET")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "GET")),
span ->
span.hasName("callback1")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("callback2")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
void testCommandWithNoArgumentsUsingBiconsumer() {
CompletableFuture<String> future = new CompletableFuture<>();
BiConsumer<String, Throwable> biConsumer =
(keyRetrieved, error) ->
testing.runWithSpan(
"callback",
() -> {
assertThat(keyRetrieved).isNotNull();
future.complete(keyRetrieved);
});
testing.runWithSpan(
"parent",
() -> {
RedisFuture<String> redisFuture = asyncCommands.randomkey();
redisFuture.whenCompleteAsync(biConsumer);
});
await().untilAsserted(() -> assertThat(future).isCompleted());
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("RANDOMKEY")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "RANDOMKEY")),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
void testHashSetAndThenNestApplyToHashGetall() {
CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
RedisFuture<String> hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap);
hmsetFuture.thenApplyAsync(
setResult -> {
// Wait for 'hmset' trace to get written
testing.waitForTraces(1);
if (!"OK".equals(setResult)) {
future.completeExceptionally(new AssertionError("Wrong hmset result " + setResult));
return null;
}
RedisFuture<Map<String, String>> hmGetAllFuture = asyncCommands.hgetall("TESTHM");
hmGetAllFuture.whenComplete(
(result, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
} else {
future.complete(result);
}
});
return null;
});
await().untilAsserted(() -> assertThat(future).isCompletedWithValue(testHashMap));
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("HMSET")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "HMSET"))),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("HGETALL")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "HGETALL"))));
}
@Test
void testCommandCompletesExceptionally() {
// turn off auto flush to complete the command exceptionally manually
asyncCommands.setAutoFlushCommands(false);
cleanup.deferCleanup(() -> asyncCommands.setAutoFlushCommands(true));
RedisFuture<Long> redisFuture = asyncCommands.del("key1", "key2");
boolean completedExceptionally =
((AsyncCommand<?, ?, ?>) redisFuture)
.completeExceptionally(new IllegalStateException("TestException"));
redisFuture.exceptionally(
error -> {
assertThat(error).isNotNull();
assertThat(error).isInstanceOf(IllegalStateException.class);
assertThat(error.getMessage()).isEqualTo("TestException");
throw new RuntimeException(error);
});
asyncCommands.flushCommands();
Throwable thrown = catchThrowable(redisFuture::get);
await()
.untilAsserted(
() -> {
assertThat(thrown).isInstanceOf(ExecutionException.class);
assertThat(completedExceptionally).isTrue();
});
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("DEL")
.hasKind(SpanKind.CLIENT)
.hasStatus(StatusData.error())
.hasException(new IllegalStateException("TestException"))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "DEL"))));
}
@Test
void testCommandBeforeItFinished() {
asyncCommands.setAutoFlushCommands(false);
cleanup.deferCleanup(() -> asyncCommands.setAutoFlushCommands(true));
RedisFuture<Long> redisFuture =
testing.runWithSpan("parent", () -> asyncCommands.sadd("SKEY", "1", "2"));
redisFuture.whenCompleteAsync(
(res, error) ->
testing.runWithSpan(
"callback",
() -> {
assertThat(error).isNotNull();
assertThat(error).isInstanceOf(CancellationException.class);
}));
boolean cancelSuccess = redisFuture.cancel(true);
asyncCommands.flushCommands();
await().untilAsserted(() -> assertThat(cancelSuccess).isTrue());
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("parent")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasAttributes(Attributes.empty()),
span ->
span.hasName("SADD")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "SADD"),
equalTo(booleanKey("lettuce.command.cancelled"), true)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
@Test
void testDebugSegfaultCommandWithNoArgumentShouldProduceSpan() {
// Test Causes redis to crash therefore it needs its own container
GenericContainer<?> server = new GenericContainer<>(containerImage).withExposedPorts(6379);
server.start();
cleanup.deferCleanup(server::stop);
long serverPort = server.getMappedPort(6379);
RedisClient client = RedisClient.create("redis://" + host + ":" + serverPort + "/" + DB_INDEX);
StatefulRedisConnection<String, String> connection1 = client.connect();
cleanup.deferCleanup(connection1);
RedisAsyncCommands<String, String> commands = connection1.async();
// 1 connect trace
testing.clearData();
commands.debugSegfault();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("DEBUG")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "DEBUG"))));
}
@Test
void testShutdownCommandShouldProduceSpan() {
// Test Causes redis to crash therefore it needs its own container
GenericContainer<?> server = new GenericContainer<>(containerImage).withExposedPorts(6379);
server.start();
cleanup.deferCleanup(server::stop);
long shutdownServerPort = server.getMappedPort(6379);
RedisClient client =
RedisClient.create("redis://" + host + ":" + shutdownServerPort + "/" + DB_INDEX);
StatefulRedisConnection<String, String> connection1 = client.connect();
cleanup.deferCleanup(connection1);
RedisAsyncCommands<String, String> commands = connection1.async();
// 1 connect trace
testing.clearData();
commands.shutdown(false);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("SHUTDOWN")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "SHUTDOWN"))));
}
}

View File

@ -0,0 +1,310 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.lettuce.v4_0;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchException;
import com.google.common.collect.ImmutableMap;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnectionException;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.api.sync.RedisCommands;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.semconv.SemanticAttributes;
import java.util.Map;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0
class LettuceSyncClientTest {
@RegisterExtension
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
static final DockerImageName containerImage = DockerImageName.parse("redis:6.2.3-alpine");
private static final int DB_INDEX = 0;
// Disable auto reconnect, so we do not get stray traces popping up on server shutdown
private static final ClientOptions CLIENT_OPTIONS =
new ClientOptions.Builder().autoReconnect(false).build();
private static final GenericContainer<?> redisServer =
new GenericContainer<>(containerImage).withExposedPorts(6379);
private static String host;
private static int port;
private static int incorrectPort;
private static String dbUriNonExistent;
private static String embeddedDbUri;
private static final ImmutableMap<String, String> testHashMap =
ImmutableMap.of(
"firstname", "John",
"lastname", "Doe",
"age", "53");
static RedisClient redisClient;
private static StatefulRedisConnection<String, String> connection;
static RedisCommands<String, String> syncCommands;
@BeforeAll
static void setUp() {
redisServer.start();
host = redisServer.getHost();
port = redisServer.getMappedPort(6379);
embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX;
incorrectPort = PortUtils.findOpenPort();
dbUriNonExistent = "redis://" + host + ":" + incorrectPort + "/" + DB_INDEX;
redisClient = RedisClient.create(embeddedDbUri);
connection = redisClient.connect();
syncCommands = connection.sync();
syncCommands.set("TESTKEY", "TESTVAL");
syncCommands.hmset("TESTHM", testHashMap);
// 2 sets + 1 connect trace
testing.clearData();
}
@AfterAll
static void cleanUp() {
connection.close();
redisServer.stop();
}
@Test
void testConnect() {
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri);
testConnectionClient.setOptions(CLIENT_OPTIONS);
StatefulRedisConnection<String, String> testConnection = testConnectionClient.connect();
cleanup.deferCleanup(() -> testConnection.close());
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("CONNECT")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_PEER_NAME, host),
equalTo(SemanticAttributes.NET_PEER_PORT, port),
equalTo(SemanticAttributes.DB_SYSTEM, "redis"))));
}
@Test
void testConnectException() {
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent);
testConnectionClient.setOptions(CLIENT_OPTIONS);
Exception exception = catchException(testConnectionClient::connect);
assertThat(exception).isInstanceOf(RedisConnectionException.class);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("CONNECT")
.hasKind(SpanKind.CLIENT)
.hasException(exception)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_PEER_NAME, host),
equalTo(SemanticAttributes.NET_PEER_PORT, incorrectPort),
equalTo(SemanticAttributes.DB_SYSTEM, "redis"))));
}
@Test
void testSetCommand() {
String res = syncCommands.set("TESTSETKEY", "TESTSETVAL");
assertThat(res).isEqualTo("OK");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("SET")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "SET"))));
}
@Test
void testGetCommand() {
String res = syncCommands.get("TESTKEY");
assertThat(res).isEqualTo("TESTVAL");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("GET")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "GET"))));
}
@Test
void testGetNonExistentKeyCommand() {
String res = syncCommands.get("NON_EXISTENT_KEY");
assertThat(res).isNull();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("GET")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "GET"))));
}
@Test
void testCommandWithNoArguments() {
String res = syncCommands.randomkey();
assertThat(res).isNotNull();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("RANDOMKEY")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "RANDOMKEY"))));
}
@Test
void testListCommand() {
long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT");
assertThat(res).isEqualTo(1);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("LPUSH")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "LPUSH"))));
}
@Test
void testHashSetCommand() {
String res = syncCommands.hmset("user", testHashMap);
assertThat(res).isEqualTo("OK");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("HMSET")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "HMSET"))));
}
@Test
void testHashGetallCommand() {
Map<String, String> res = syncCommands.hgetall("TESTHM");
assertThat(res).isEqualTo(testHashMap);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("HGETALL")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "HGETALL"))));
}
@Test
void testDebugSegfaultCommandWithNoArgumentShouldProduceSpan() {
// Test Causes redis to crash therefore it needs its own container
GenericContainer<?> server = new GenericContainer<>(containerImage).withExposedPorts(6379);
server.start();
cleanup.deferCleanup(server::stop);
long serverPort = server.getMappedPort(6379);
RedisClient client = RedisClient.create("redis://" + host + ":" + serverPort + "/" + DB_INDEX);
StatefulRedisConnection<String, String> connection1 = client.connect();
cleanup.deferCleanup(connection1);
RedisCommands<String, String> commands = connection1.sync();
// 1 connect trace
testing.clearData();
commands.debugSegfault();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("DEBUG")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "DEBUG"))));
}
@Test
void testShutdownCommandShouldProduceSpan() {
// Test Causes redis to crash therefore it needs its own container
GenericContainer<?> server = new GenericContainer<>(containerImage).withExposedPorts(6379);
server.start();
cleanup.deferCleanup(server::stop);
long shutdownServerPort = server.getMappedPort(6379);
RedisClient client =
RedisClient.create("redis://" + host + ":" + shutdownServerPort + "/" + DB_INDEX);
StatefulRedisConnection<String, String> connection1 = client.connect();
cleanup.deferCleanup(connection1);
RedisCommands<String, String> commands = connection1.sync();
// 1 connect trace
testing.clearData();
commands.shutdown(false);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("SHUTDOWN")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
equalTo(SemanticAttributes.DB_OPERATION, "SHUTDOWN"))));
}
}