From 2c4022450a901c3daf6aa02a83828d2fd2abdd54 Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Fri, 29 Sep 2023 14:04:15 -0400 Subject: [PATCH] Convert lettuce-5.0 tests from groovy to java (#9547) --- .../test/groovy/LettuceAsyncClientTest.groovy | 544 ------------------ .../groovy/LettuceReactiveClientTest.groovy | 430 -------------- .../test/groovy/LettuceSyncClientTest.groovy | 323 ----------- .../v5_0/AbstractLettuceClientTest.java | 78 +++ .../lettuce/v5_0/LettuceAsyncClientTest.java | 496 ++++++++++++++++ .../v5_0/LettuceReactiveClientTest.java | 373 ++++++++++++ .../lettuce/v5_0/LettuceSyncClientTest.java | 291 ++++++++++ 7 files changed, 1238 insertions(+), 1297 deletions(-) delete mode 100644 instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceAsyncClientTest.groovy delete mode 100644 instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceReactiveClientTest.groovy delete mode 100644 instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceSyncClientTest.groovy create mode 100644 instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/AbstractLettuceClientTest.java create mode 100644 instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceAsyncClientTest.java create mode 100644 instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceReactiveClientTest.java create mode 100644 instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceSyncClientTest.java diff --git a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceAsyncClientTest.groovy b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceAsyncClientTest.groovy deleted file mode 100644 index 574a2f63c3..0000000000 --- a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceAsyncClientTest.groovy +++ /dev/null @@ -1,544 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.lettuce.core.ClientOptions -import io.lettuce.core.ConnectionFuture -import io.lettuce.core.RedisClient -import io.lettuce.core.RedisFuture -import io.lettuce.core.RedisURI -import io.lettuce.core.api.StatefulConnection -import io.lettuce.core.api.async.RedisAsyncCommands -import io.lettuce.core.api.sync.RedisCommands -import io.lettuce.core.codec.StringCodec -import io.lettuce.core.protocol.AsyncCommand -import io.netty.channel.AbstractChannel -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.semconv.SemanticAttributes -import org.testcontainers.containers.GenericContainer -import spock.lang.Shared -import spock.util.concurrent.AsyncConditions - -import java.util.concurrent.CancellationException -import java.util.concurrent.ExecutionException -import java.util.concurrent.TimeUnit -import java.util.function.BiConsumer -import java.util.function.BiFunction -import java.util.function.Consumer -import java.util.function.Function - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.StatusCode.ERROR - -class LettuceAsyncClientTest extends AgentInstrumentationSpecification { - public static final int DB_INDEX = 0 - // Disable autoreconnect so we do not get stray traces popping up on server shutdown - public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build() - - private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379) - - @Shared - String host - @Shared - int port - @Shared - int incorrectPort - @Shared - String dbAddr - @Shared - String dbAddrNonExistent - @Shared - String dbUriNonExistent - @Shared - String embeddedDbUri - - @Shared - Map testHashMap = [ - firstname: "John", - lastname : "Doe", - age : "53" - ] - - RedisClient redisClient - StatefulConnection connection - RedisAsyncCommands asyncCommands - RedisCommands syncCommands - - def setup() { - redisServer.start() - - host = redisServer.getHost() - port = redisServer.getMappedPort(6379) - dbAddr = host + ":" + port + "/" + DB_INDEX - embeddedDbUri = "redis://" + dbAddr - - incorrectPort = PortUtils.findOpenPort() - dbAddrNonExistent = host + ":" + incorrectPort + "/" + DB_INDEX - dbUriNonExistent = "redis://" + dbAddrNonExistent - - redisClient = RedisClient.create(embeddedDbUri) - - redisClient.setOptions(CLIENT_OPTIONS) - - connection = redisClient.connect() - asyncCommands = connection.async() - syncCommands = connection.sync() - - syncCommands.set("TESTKEY", "TESTVAL") - - // 1 set + 1 connect trace - ignoreTracesAndClear(2) - } - - def cleanup() { - connection.close() - redisServer.stop() - } - - def "connect using get on ConnectionFuture"() { - setup: - RedisClient testConnectionClient = RedisClient.create(embeddedDbUri) - testConnectionClient.setOptions(CLIENT_OPTIONS) - - when: - ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8, - new RedisURI(host, port, 3, TimeUnit.SECONDS)) - StatefulConnection connection = connectionFuture.get() - - then: - connection != null - assertTraces(1) { - trace(0, 1) { - span(0) { - name "CONNECT" - kind CLIENT - attributes { - "$SemanticAttributes.NET_PEER_NAME" host - "$SemanticAttributes.NET_PEER_PORT" port - "$SemanticAttributes.DB_SYSTEM" "redis" - } - } - } - } - - cleanup: - connection.close() - } - - def "connect exception inside the connection future"() { - setup: - RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent) - testConnectionClient.setOptions(CLIENT_OPTIONS) - - when: - ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8, - new RedisURI(host, incorrectPort, 3, TimeUnit.SECONDS)) - StatefulConnection connection = connectionFuture.get() - - then: - connection == null - thrown ExecutionException - assertTraces(1) { - trace(0, 1) { - span(0) { - name "CONNECT" - kind CLIENT - status ERROR - errorEvent AbstractChannel.AnnotatedConnectException, String - attributes { - "$SemanticAttributes.NET_PEER_NAME" host - "$SemanticAttributes.NET_PEER_PORT" incorrectPort - "$SemanticAttributes.DB_SYSTEM" "redis" - } - } - } - } - } - - def "set command using Future get with timeout"() { - setup: - RedisFuture redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL") - String res = redisFuture.get(3, TimeUnit.SECONDS) - - expect: - res == "OK" - assertTraces(1) { - trace(0, 1) { - span(0) { - name "SET" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SET TESTSETKEY ?" - "$SemanticAttributes.DB_OPERATION" "SET" - } - } - } - } - } - - def "get command chained with thenAccept"() { - setup: - def conds = new AsyncConditions() - Consumer consumer = new Consumer() { - @Override - void accept(String res) { - runWithSpan("callback") { - conds.evaluate { - assert res == "TESTVAL" - } - } - } - } - - when: - runWithSpan("parent") { - RedisFuture redisFuture = asyncCommands.get("TESTKEY") - redisFuture.thenAccept(consumer) - } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "GET" - kind CLIENT - childOf(span(0)) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET TESTKEY" - "$SemanticAttributes.DB_OPERATION" "GET" - } - } - span(2) { - name "callback" - kind INTERNAL - childOf(span(0)) - } - } - } - } - - // to make sure instrumentation's chained completion stages won't interfere with user's, while still - // recording metrics - def "get non existent key command with handleAsync and chained with thenApply"() { - setup: - def conds = new AsyncConditions() - String successStr = "KEY MISSING" - BiFunction firstStage = new BiFunction() { - @Override - String apply(String res, Throwable error) { - runWithSpan("callback1") { - conds.evaluate { - assert res == null - assert error == null - } - } - return (res == null ? successStr : res) - } - } - Function secondStage = new Function() { - @Override - Object apply(String input) { - runWithSpan("callback2") { - conds.evaluate { - assert input == successStr - } - } - return null - } - } - - when: - runWithSpan("parent") { - RedisFuture redisFuture = asyncCommands.get("NON_EXISTENT_KEY") - redisFuture.handleAsync(firstStage).thenApply(secondStage) - } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "GET" - kind CLIENT - childOf(span(0)) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET NON_EXISTENT_KEY" - "$SemanticAttributes.DB_OPERATION" "GET" - } - } - span(2) { - name "callback1" - kind INTERNAL - childOf(span(0)) - } - span(3) { - name "callback2" - kind INTERNAL - childOf(span(0)) - } - } - } - } - - def "command with no arguments using a biconsumer"() { - setup: - def conds = new AsyncConditions() - BiConsumer biConsumer = new BiConsumer() { - @Override - void accept(String keyRetrieved, Throwable error) { - runWithSpan("callback") { - conds.evaluate { - assert keyRetrieved != null - } - } - } - } - - when: - runWithSpan("parent") { - RedisFuture redisFuture = asyncCommands.randomkey() - redisFuture.whenCompleteAsync(biConsumer) - } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "RANDOMKEY" - kind CLIENT - childOf(span(0)) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "RANDOMKEY" - "$SemanticAttributes.DB_OPERATION" "RANDOMKEY" - } - } - span(2) { - name "callback" - kind INTERNAL - childOf(span(0)) - } - } - } - } - - def "hash set and then nest apply to hash getall"() { - setup: - def conds = new AsyncConditions() - - when: - RedisFuture hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap) - hmsetFuture.thenApplyAsync(new Function() { - @Override - Object apply(String setResult) { - conds.evaluate { - assert setResult == "OK" - } - RedisFuture> hmGetAllFuture = asyncCommands.hgetall("TESTHM") - hmGetAllFuture.exceptionally(new Function>() { - @Override - Map apply(Throwable error) { - println("unexpected:" + error.toString()) - error.printStackTrace() - assert false - return null - } - }) - hmGetAllFuture.thenAccept(new Consumer>() { - @Override - void accept(Map hmGetAllResult) { - conds.evaluate { - assert testHashMap == hmGetAllResult - } - } - }) - return null - } - }) - - then: - conds.await(10) - assertTraces(2) { - trace(0, 1) { - span(0) { - name "HMSET" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "HMSET TESTHM firstname ? lastname ? age ?" - "$SemanticAttributes.DB_OPERATION" "HMSET" - } - } - } - trace(1, 1) { - span(0) { - name "HGETALL" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "HGETALL TESTHM" - "$SemanticAttributes.DB_OPERATION" "HGETALL" - } - } - } - } - } - - def "command completes exceptionally"() { - setup: - // turn off auto flush to complete the command exceptionally manually - asyncCommands.setAutoFlushCommands(false) - def conds = new AsyncConditions() - RedisFuture redisFuture = asyncCommands.del("key1", "key2") - boolean completedExceptionally = ((AsyncCommand) redisFuture).completeExceptionally(new IllegalStateException("TestException")) - redisFuture.exceptionally({ - error -> - conds.evaluate { - assert error != null - assert error instanceof IllegalStateException - assert error.getMessage() == "TestException" - } - throw error - }) - - when: - // now flush and execute the command - asyncCommands.flushCommands() - redisFuture.get() - - then: - conds.await(10) - completedExceptionally == true - thrown Exception - assertTraces(1) { - trace(0, 1) { - span(0) { - name "DEL" - kind CLIENT - status ERROR - errorEvent(IllegalStateException, "TestException") - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "DEL key1 key2" - "$SemanticAttributes.DB_OPERATION" "DEL" - } - } - } - } - } - - def "cancel command before it finishes"() { - setup: - asyncCommands.setAutoFlushCommands(false) - def conds = new AsyncConditions() - RedisFuture redisFuture = runWithSpan("parent") { - asyncCommands.sadd("SKEY", "1", "2") - } - redisFuture.whenCompleteAsync({ - res, error -> - runWithSpan("callback") { - conds.evaluate { - assert error != null - assert error instanceof CancellationException - } - } - }) - - when: - boolean cancelSuccess = redisFuture.cancel(true) - asyncCommands.flushCommands() - - then: - conds.await(10) - cancelSuccess == true - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "SADD" - kind CLIENT - childOf(span(0)) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SADD SKEY ? ?" - "$SemanticAttributes.DB_OPERATION" "SADD" - "lettuce.command.cancelled" true - } - } - span(2) { - name "callback" - kind INTERNAL - childOf(span(0)) - } - } - } - } - - def "debug segfault command (returns void) with no argument should produce span"() { - setup: - asyncCommands.debugSegfault() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "DEBUG" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "DEBUG SEGFAULT" - "$SemanticAttributes.DB_OPERATION" "DEBUG" - } - } - } - } - } - - - def "shutdown command (returns void) should produce a span"() { - setup: - asyncCommands.shutdown(false) - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "SHUTDOWN" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SHUTDOWN NOSAVE" - "$SemanticAttributes.DB_OPERATION" "SHUTDOWN" - } - } - } - } - } -} diff --git a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceReactiveClientTest.groovy b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceReactiveClientTest.groovy deleted file mode 100644 index fa6e40350e..0000000000 --- a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceReactiveClientTest.groovy +++ /dev/null @@ -1,430 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.lettuce.core.ClientOptions -import io.lettuce.core.RedisClient -import io.lettuce.core.api.StatefulConnection -import io.lettuce.core.api.reactive.RedisReactiveCommands -import io.lettuce.core.api.sync.RedisCommands -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.semconv.SemanticAttributes -import org.testcontainers.containers.GenericContainer -import reactor.core.scheduler.Schedulers -import spock.lang.Shared -import spock.util.concurrent.AsyncConditions - -import java.util.function.Consumer - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.INTERNAL - -class LettuceReactiveClientTest extends AgentInstrumentationSpecification { - public static final int DB_INDEX = 0 - // Disable autoreconnect so we do not get stray traces popping up on server shutdown - public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build() - - private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379) - - @Shared - String embeddedDbUri - - - RedisClient redisClient - StatefulConnection connection - RedisReactiveCommands reactiveCommands - RedisCommands syncCommands - - def setupSpec() { - } - - def setup() { - redisServer.start() - - String host = redisServer.getHost() - int port = redisServer.getMappedPort(6379) - String dbAddr = host + ":" + port + "/" + DB_INDEX - embeddedDbUri = "redis://" + dbAddr - - redisClient = RedisClient.create(embeddedDbUri) - - redisClient.setOptions(CLIENT_OPTIONS) - - connection = redisClient.connect() - reactiveCommands = connection.reactive() - syncCommands = connection.sync() - - syncCommands.set("TESTKEY", "TESTVAL") - - // 1 set + 1 connect trace - ignoreTracesAndClear(2) - } - - def cleanup() { - connection.close() - redisClient.shutdown() - redisServer.stop() - } - - def "set command with subscribe on a defined consumer"() { - setup: - def conds = new AsyncConditions() - Consumer consumer = new Consumer() { - @Override - void accept(String res) { - runWithSpan("callback") { - conds.evaluate { - assert res == "OK" - } - } - } - } - - when: - runWithSpan("parent") { - reactiveCommands.set("TESTSETKEY", "TESTSETVAL").subscribe(consumer) - } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "SET" - kind CLIENT - childOf(span(0)) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SET TESTSETKEY ?" - "$SemanticAttributes.DB_OPERATION" "SET" - } - } - span(2) { - name "callback" - kind INTERNAL - childOf(span(0)) - } - } - } - } - - def "get command with lambda function"() { - setup: - def conds = new AsyncConditions() - - when: - reactiveCommands.get("TESTKEY").subscribe { res -> conds.evaluate { assert res == "TESTVAL" } } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 1) { - span(0) { - name "GET" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET TESTKEY" - "$SemanticAttributes.DB_OPERATION" "GET" - } - } - } - } - } - - // to make sure instrumentation's chained completion stages won't interfere with user's, while still - // recording metrics - def "get non existent key command"() { - setup: - def conds = new AsyncConditions() - final defaultVal = "NOT THIS VALUE" - - when: - runWithSpan("parent") { - reactiveCommands.get("NON_EXISTENT_KEY").defaultIfEmpty(defaultVal).subscribe { - res -> - runWithSpan("callback") { - conds.evaluate { - assert res == defaultVal - } - } - } - } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "GET" - kind CLIENT - childOf(span(0)) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET NON_EXISTENT_KEY" - "$SemanticAttributes.DB_OPERATION" "GET" - } - } - span(2) { - name "callback" - kind INTERNAL - childOf(span(0)) - } - } - } - - } - - def "command with no arguments"() { - setup: - def conds = new AsyncConditions() - - when: - reactiveCommands.randomkey().subscribe { - res -> - conds.evaluate { - assert res == "TESTKEY" - } - } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 1) { - span(0) { - name "RANDOMKEY" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "RANDOMKEY" - "$SemanticAttributes.DB_OPERATION" "RANDOMKEY" - } - } - } - } - } - - def "command flux publisher "() { - setup: - reactiveCommands.command().subscribe() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "COMMAND" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "COMMAND" - "$SemanticAttributes.DB_OPERATION" "COMMAND" - "lettuce.command.results.count" { it > 100 } - } - } - } - } - } - - def "command cancel after 2 on flux publisher "() { - setup: - reactiveCommands.command().take(2).subscribe() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "COMMAND" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "COMMAND" - "$SemanticAttributes.DB_OPERATION" "COMMAND" - "lettuce.command.cancelled" true - "lettuce.command.results.count" 2 - } - } - } - } - } - - def "non reactive command should not produce span"() { - when: - def res = reactiveCommands.digest(null) - - then: - res != null - traces.size() == 0 - } - - def "debug segfault command (returns mono void) with no argument should produce span"() { - setup: - reactiveCommands.debugSegfault().subscribe() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "DEBUG" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "DEBUG SEGFAULT" - "$SemanticAttributes.DB_OPERATION" "DEBUG" - } - } - } - } - } - - def "shutdown command (returns void) with argument should produce span"() { - setup: - reactiveCommands.shutdown(false).subscribe() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "SHUTDOWN" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SHUTDOWN NOSAVE" - "$SemanticAttributes.DB_OPERATION" "SHUTDOWN" - } - } - } - } - } - - def "blocking subscriber"() { - when: - runWithSpan("test-parent") { - reactiveCommands.set("a", "1") - .then(reactiveCommands.get("a")) - .block() - } - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "test-parent" - attributes { - } - } - span(1) { - name "SET" - kind CLIENT - childOf span(0) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SET a ?" - "$SemanticAttributes.DB_OPERATION" "SET" - } - } - span(2) { - name "GET" - kind CLIENT - childOf span(0) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET a" - "$SemanticAttributes.DB_OPERATION" "GET" - } - } - } - } - } - - def "async subscriber"() { - when: - runWithSpan("test-parent") { - reactiveCommands.set("a", "1") - .then(reactiveCommands.get("a")) - .subscribe() - } - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "test-parent" - attributes { - } - } - span(1) { - name "SET" - kind CLIENT - childOf span(0) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SET a ?" - "$SemanticAttributes.DB_OPERATION" "SET" - } - } - span(2) { - name "GET" - kind CLIENT - childOf span(0) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET a" - "$SemanticAttributes.DB_OPERATION" "GET" - } - } - } - } - } - - def "async subscriber with specific thread pool"() { - when: - runWithSpan("test-parent") { - reactiveCommands.set("a", "1") - .then(reactiveCommands.get("a")) - .subscribeOn(Schedulers.elastic()) - .subscribe() - } - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "test-parent" - attributes { - } - } - span(1) { - name "SET" - kind CLIENT - childOf span(0) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SET a ?" - "$SemanticAttributes.DB_OPERATION" "SET" - } - } - span(2) { - name "GET" - kind CLIENT - childOf span(0) - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET a" - "$SemanticAttributes.DB_OPERATION" "GET" - } - } - } - } - } -} diff --git a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceSyncClientTest.groovy b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceSyncClientTest.groovy deleted file mode 100644 index 75c0af254a..0000000000 --- a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/groovy/LettuceSyncClientTest.groovy +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.lettuce.core.ClientOptions -import io.lettuce.core.RedisClient -import io.lettuce.core.RedisConnectionException -import io.lettuce.core.api.StatefulConnection -import io.lettuce.core.api.sync.RedisCommands -import io.netty.channel.AbstractChannel -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.semconv.SemanticAttributes -import org.testcontainers.containers.GenericContainer -import spock.lang.Shared - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.StatusCode.ERROR - -class LettuceSyncClientTest extends AgentInstrumentationSpecification { - public static final int DB_INDEX = 0 - // Disable autoreconnect so we do not get stray traces popping up on server shutdown - public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build() - - private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379) - - @Shared - String host - @Shared - int port - @Shared - int incorrectPort - @Shared - String dbAddr - @Shared - String dbAddrNonExistent - @Shared - String dbUriNonExistent - @Shared - String embeddedDbUri - - @Shared - Map testHashMap = [ - firstname: "John", - lastname : "Doe", - age : "53" - ] - - RedisClient redisClient - StatefulConnection connection - RedisCommands syncCommands - - def setup() { - redisServer.start() - - host = redisServer.getHost() - port = redisServer.getMappedPort(6379) - dbAddr = host + ":" + port + "/" + DB_INDEX - embeddedDbUri = "redis://" + dbAddr - - incorrectPort = PortUtils.findOpenPort() - dbAddrNonExistent = host + ":" + incorrectPort + "/" + DB_INDEX - dbUriNonExistent = "redis://" + dbAddrNonExistent - - redisClient = RedisClient.create(embeddedDbUri) - - connection = redisClient.connect() - syncCommands = connection.sync() - - syncCommands.set("TESTKEY", "TESTVAL") - syncCommands.hmset("TESTHM", testHashMap) - - // 2 sets + 1 connect trace - ignoreTracesAndClear(3) - } - - def cleanup() { - connection.close() - redisServer.stop() - } - - def "connect"() { - setup: - RedisClient testConnectionClient = RedisClient.create(embeddedDbUri) - testConnectionClient.setOptions(CLIENT_OPTIONS) - - when: - StatefulConnection connection = testConnectionClient.connect() - - then: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "CONNECT" - kind CLIENT - attributes { - "$SemanticAttributes.NET_PEER_NAME" host - "$SemanticAttributes.NET_PEER_PORT" port - "$SemanticAttributes.DB_SYSTEM" "redis" - } - } - } - } - - cleanup: - connection.close() - } - - def "connect exception"() { - setup: - RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent) - testConnectionClient.setOptions(CLIENT_OPTIONS) - - when: - testConnectionClient.connect() - - then: - thrown RedisConnectionException - assertTraces(1) { - trace(0, 1) { - span(0) { - name "CONNECT" - kind CLIENT - status ERROR - errorEvent AbstractChannel.AnnotatedConnectException, String - attributes { - "$SemanticAttributes.NET_PEER_NAME" host - "$SemanticAttributes.NET_PEER_PORT" incorrectPort - "$SemanticAttributes.DB_SYSTEM" "redis" - } - } - } - } - } - - def "set command"() { - setup: - String res = syncCommands.set("TESTSETKEY", "TESTSETVAL") - - expect: - res == "OK" - assertTraces(1) { - trace(0, 1) { - span(0) { - name "SET" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SET TESTSETKEY ?" - "$SemanticAttributes.DB_OPERATION" "SET" - } - } - } - } - } - - def "get command"() { - setup: - String res = syncCommands.get("TESTKEY") - - expect: - res == "TESTVAL" - assertTraces(1) { - trace(0, 1) { - span(0) { - name "GET" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET TESTKEY" - "$SemanticAttributes.DB_OPERATION" "GET" - } - } - } - } - } - - def "get non existent key command"() { - setup: - String res = syncCommands.get("NON_EXISTENT_KEY") - - expect: - res == null - assertTraces(1) { - trace(0, 1) { - span(0) { - name "GET" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET NON_EXISTENT_KEY" - "$SemanticAttributes.DB_OPERATION" "GET" - } - } - } - } - } - - def "command with no arguments"() { - setup: - def keyRetrieved = syncCommands.randomkey() - - expect: - keyRetrieved != null - assertTraces(1) { - trace(0, 1) { - span(0) { - name "RANDOMKEY" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "RANDOMKEY" - "$SemanticAttributes.DB_OPERATION" "RANDOMKEY" - } - } - } - } - } - - def "list command"() { - setup: - long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT") - - expect: - res == 1 - assertTraces(1) { - trace(0, 1) { - span(0) { - name "LPUSH" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "LPUSH TESTLIST ?" - "$SemanticAttributes.DB_OPERATION" "LPUSH" - } - } - } - } - } - - def "hash set command"() { - setup: - def res = syncCommands.hmset("user", testHashMap) - - expect: - res == "OK" - assertTraces(1) { - trace(0, 1) { - span(0) { - name "HMSET" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "HMSET user firstname ? lastname ? age ?" - "$SemanticAttributes.DB_OPERATION" "HMSET" - } - } - } - } - } - - def "hash getall command"() { - setup: - Map res = syncCommands.hgetall("TESTHM") - - expect: - res == testHashMap - assertTraces(1) { - trace(0, 1) { - span(0) { - name "HGETALL" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "HGETALL TESTHM" - "$SemanticAttributes.DB_OPERATION" "HGETALL" - } - } - } - } - } - - def "debug segfault command (returns void) with no argument should produce span"() { - setup: - syncCommands.debugSegfault() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "DEBUG" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "DEBUG SEGFAULT" - "$SemanticAttributes.DB_OPERATION" "DEBUG" - } - } - } - } - } - - def "shutdown command (returns void) should produce a span"() { - setup: - syncCommands.shutdown(false) - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "SHUTDOWN" - kind CLIENT - attributes { - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SHUTDOWN NOSAVE" - "$SemanticAttributes.DB_OPERATION" "SHUTDOWN" - } - } - } - } - } -} diff --git a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/AbstractLettuceClientTest.java b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/AbstractLettuceClientTest.java new file mode 100644 index 0000000000..255833f38e --- /dev/null +++ b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/AbstractLettuceClientTest.java @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +abstract class AbstractLettuceClientTest { + + protected static final Logger logger = LoggerFactory.getLogger(AbstractLettuceClientTest.class); + + @RegisterExtension + protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + protected static final int DB_INDEX = 0; + + // Disable autoreconnect so we do not get stray traces popping up on server shutdown + protected static final ClientOptions CLIENT_OPTIONS = + ClientOptions.builder().autoReconnect(false).build(); + + static final DockerImageName containerImage = DockerImageName.parse("redis:6.2.3-alpine"); + + protected static final GenericContainer redisServer = + new GenericContainer<>(containerImage) + .withExposedPorts(6379) + .withLogConsumer(new Slf4jLogConsumer(logger)) + .waitingFor(Wait.forLogMessage(".*Ready to accept connections.*", 1)); + + protected static RedisClient redisClient; + + protected static StatefulRedisConnection connection; + + protected static String host; + protected static int port; + + protected static String embeddedDbUri; + + protected static StatefulRedisConnection newContainerConnection() { + GenericContainer server = + new GenericContainer<>(containerImage) + .withExposedPorts(6379) + .withLogConsumer(new Slf4jLogConsumer(logger)) + .waitingFor(Wait.forLogMessage(".*Ready to accept connections.*", 1)); + server.start(); + cleanup.deferCleanup(server::stop); + + long serverPort = server.getMappedPort(6379); + + RedisClient client = RedisClient.create("redis://" + host + ":" + serverPort + "/" + DB_INDEX); + client.setOptions(CLIENT_OPTIONS); + cleanup.deferCleanup(client::shutdown); + + StatefulRedisConnection statefulConnection = client.connect(); + cleanup.deferCleanup(statefulConnection); + + // 1 connect trace + testing.waitForTraces(1); + testing.clearData(); + + return statefulConnection; + } +} diff --git a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceAsyncClientTest.java b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceAsyncClientTest.java new file mode 100644 index 0000000000..def0d0b1ca --- /dev/null +++ b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceAsyncClientTest.java @@ -0,0 +1,496 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0; + +import static io.opentelemetry.api.common.AttributeKey.booleanKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchException; +import static org.assertj.core.api.Assertions.catchThrowable; +import static org.awaitility.Awaitility.await; + +import com.google.common.collect.ImmutableMap; +import io.lettuce.core.ConnectionFuture; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.codec.Utf8StringCodec; +import io.lettuce.core.protocol.AsyncCommand; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.SemanticAttributes; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import org.assertj.core.api.AbstractAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0 +class LettuceAsyncClientTest extends AbstractLettuceClientTest { + private static int incorrectPort; + private static String dbUriNonExistent; + + private static final ImmutableMap testHashMap = + ImmutableMap.of( + "firstname", "John", + "lastname", "Doe", + "age", "53"); + + private static RedisAsyncCommands asyncCommands; + + @BeforeAll + static void setUp() { + redisServer.start(); + host = redisServer.getHost(); + port = redisServer.getMappedPort(6379); + embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX; + + incorrectPort = PortUtils.findOpenPort(); + dbUriNonExistent = "redis://" + host + ":" + incorrectPort + "/" + DB_INDEX; + + redisClient = RedisClient.create(embeddedDbUri); + redisClient.setOptions(CLIENT_OPTIONS); + + connection = redisClient.connect(); + asyncCommands = connection.async(); + RedisCommands syncCommands = connection.sync(); + + syncCommands.set("TESTKEY", "TESTVAL"); + + // 1 set + 1 connect trace + testing.waitForTraces(2); + testing.clearData(); + } + + @AfterAll + static void cleanUp() { + connection.close(); + redisClient.shutdown(); + redisServer.stop(); + } + + @Test + void testConnectUsingGetOnConnectionFuture() throws ExecutionException, InterruptedException { + RedisClient testConnectionClient = RedisClient.create(embeddedDbUri); + testConnectionClient.setOptions(CLIENT_OPTIONS); + + ConnectionFuture> connectionFuture = + testConnectionClient.connectAsync( + new Utf8StringCodec(), new RedisURI(host, port, 3, TimeUnit.SECONDS)); + StatefulRedisConnection connection1 = connectionFuture.get(); + cleanup.deferCleanup(connection1); + cleanup.deferCleanup(testConnectionClient::shutdown); + + assertThat(connection1).isNotNull(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("CONNECT") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_PEER_NAME, host), + equalTo(SemanticAttributes.NET_PEER_PORT, port), + equalTo(SemanticAttributes.DB_SYSTEM, "redis")))); + } + + @Test + void testConnectExceptionInsideTheConnectionFuture() { + RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent); + testConnectionClient.setOptions(CLIENT_OPTIONS); + + Exception exception = + catchException( + () -> { + ConnectionFuture> connectionFuture = + testConnectionClient.connectAsync( + new Utf8StringCodec(), + new RedisURI(host, incorrectPort, 3, TimeUnit.SECONDS)); + connectionFuture.get(); + }); + + assertThat(exception).isInstanceOf(ExecutionException.class); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("CONNECT") + .hasKind(SpanKind.CLIENT) + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_PEER_NAME, host), + equalTo(SemanticAttributes.NET_PEER_PORT, incorrectPort), + equalTo(SemanticAttributes.DB_SYSTEM, "redis")) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("exception") + .hasAttributesSatisfyingExactly( + equalTo( + AttributeKey.stringKey("exception.type"), + "io.netty.channel.AbstractChannel.AnnotatedConnectException"), + equalTo( + AttributeKey.stringKey("exception.message"), + "Connection refused: localhost/127.0.0.1:" + + incorrectPort), + satisfies( + AttributeKey.stringKey("exception.stacktrace"), + AbstractAssert::isNotNull))))); + } + + @Test + void testSetCommandUsingFutureGetWithTimeout() + throws ExecutionException, InterruptedException, TimeoutException { + RedisFuture redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL"); + String res = redisFuture.get(3, TimeUnit.SECONDS); + + assertThat(res).isEqualTo("OK"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SET TESTSETKEY ?"), + equalTo(SemanticAttributes.DB_OPERATION, "SET")))); + } + + @Test + void testGetCommandChainedWithThenAccept() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = new CompletableFuture<>(); + Consumer consumer = + res -> { + testing.runWithSpan("callback", () -> assertThat(res).isEqualTo("TESTVAL")); + future.complete(res); + }; + + testing.runWithSpan( + "parent", + () -> { + RedisFuture redisFuture = asyncCommands.get("TESTKEY"); + redisFuture.thenAccept(consumer); + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("TESTVAL"); + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET TESTKEY"), + equalTo(SemanticAttributes.DB_OPERATION, "GET")), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + + // to make sure instrumentation's chained completion stages won't interfere with user's, while + // still recording spans + @Test + void testGetNonExistentKeyCommandWithHandleAsyncAndChainedWithThenApply() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = new CompletableFuture<>(); + + String successStr = "KEY MISSING"; + + BiFunction firstStage = + (res, error) -> { + testing.runWithSpan( + "callback1", + () -> { + assertThat(res).isNull(); + assertThat(error).isNull(); + }); + return (res == null ? successStr : res); + }; + Function secondStage = + input -> { + testing.runWithSpan( + "callback2", + () -> { + assertThat(input).isEqualTo(successStr); + future.complete(successStr); + }); + return null; + }; + + testing.runWithSpan( + "parent", + () -> { + RedisFuture redisFuture = asyncCommands.get("NON_EXISTENT_KEY"); + redisFuture.handle(firstStage).thenApply(secondStage); + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(successStr); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET NON_EXISTENT_KEY"), + equalTo(SemanticAttributes.DB_OPERATION, "GET")), + span -> + span.hasName("callback1") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("callback2") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + + @Test + void testCommandWithNoArgumentsUsingBiconsumer() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = new CompletableFuture<>(); + BiConsumer biConsumer = + (keyRetrieved, error) -> + testing.runWithSpan( + "callback", + () -> { + assertThat(keyRetrieved).isNotNull(); + future.complete(keyRetrieved); + }); + + testing.runWithSpan( + "parent", + () -> { + RedisFuture redisFuture = asyncCommands.randomkey(); + redisFuture.whenCompleteAsync(biConsumer); + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isNotNull(); + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("RANDOMKEY") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "RANDOMKEY"), + equalTo(SemanticAttributes.DB_OPERATION, "RANDOMKEY")), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + + @Test + void testHashSetAndThenNestApplyToHashGetall() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture> future = new CompletableFuture<>(); + + RedisFuture hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap); + hmsetFuture.thenApplyAsync( + setResult -> { + // Wait for 'hmset' trace to get written + testing.waitForTraces(1); + + if (!"OK".equals(setResult)) { + future.completeExceptionally(new AssertionError("Wrong hmset result " + setResult)); + return null; + } + + RedisFuture> hmGetAllFuture = asyncCommands.hgetall("TESTHM"); + hmGetAllFuture.whenComplete( + (result, exception) -> { + if (exception != null) { + future.completeExceptionally(exception); + } else { + future.complete(result); + } + }); + return null; + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(testHashMap); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("HMSET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo( + SemanticAttributes.DB_STATEMENT, + "HMSET TESTHM firstname ? lastname ? age ?"), + equalTo(SemanticAttributes.DB_OPERATION, "HMSET"))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("HGETALL") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "HGETALL TESTHM"), + equalTo(SemanticAttributes.DB_OPERATION, "HGETALL")))); + } + + @Test + void testCommandCompletesExceptionally() { + // turn off auto flush to complete the command exceptionally manually + asyncCommands.setAutoFlushCommands(false); + cleanup.deferCleanup(() -> asyncCommands.setAutoFlushCommands(true)); + + RedisFuture redisFuture = asyncCommands.del("key1", "key2"); + boolean completedExceptionally = + ((AsyncCommand) redisFuture) + .completeExceptionally(new IllegalStateException("TestException")); + + redisFuture.exceptionally( + error -> { + assertThat(error).isNotNull(); + assertThat(error).isInstanceOf(IllegalStateException.class); + assertThat(error.getMessage()).isEqualTo("TestException"); + throw new RuntimeException(error); + }); + + asyncCommands.flushCommands(); + Throwable thrown = catchThrowable(redisFuture::get); + + await() + .untilAsserted( + () -> { + assertThat(thrown).isInstanceOf(ExecutionException.class); + assertThat(completedExceptionally).isTrue(); + }); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("DEL") + .hasKind(SpanKind.CLIENT) + .hasStatus(StatusData.error()) + .hasException(new IllegalStateException("TestException")) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "DEL key1 key2"), + equalTo(SemanticAttributes.DB_OPERATION, "DEL")))); + } + + @Test + void testCancelCommandBeforeItFinishes() { + asyncCommands.setAutoFlushCommands(false); + cleanup.deferCleanup(() -> asyncCommands.setAutoFlushCommands(true)); + + RedisFuture redisFuture = + testing.runWithSpan("parent", () -> asyncCommands.sadd("SKEY", "1", "2")); + redisFuture.whenCompleteAsync( + (res, error) -> + testing.runWithSpan( + "callback", + () -> { + assertThat(error).isNotNull(); + assertThat(error).isInstanceOf(CancellationException.class); + })); + + boolean cancelSuccess = redisFuture.cancel(true); + asyncCommands.flushCommands(); + + await().untilAsserted(() -> assertThat(cancelSuccess).isTrue()); + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasKind(SpanKind.INTERNAL) + .hasNoParent() + .hasAttributes(Attributes.empty()), + span -> + span.hasName("SADD") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SADD SKEY ? ?"), + equalTo(SemanticAttributes.DB_OPERATION, "SADD"), + equalTo(booleanKey("lettuce.command.cancelled"), true)), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + + @Test + void testDebugSegfaultCommandWithNoArgumentShouldProduceSpan() { + // Test Causes redis to crash therefore it needs its own container + try (StatefulRedisConnection statefulConnection = newContainerConnection()) { + RedisAsyncCommands commands = statefulConnection.async(); + commands.debugSegfault(); + } + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("DEBUG") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "DEBUG SEGFAULT"), + equalTo(SemanticAttributes.DB_OPERATION, "DEBUG")))); + } + + @Test + void testShutdownCommandShouldProduceSpan() { + // Test Causes redis to crash therefore it needs its own container + try (StatefulRedisConnection statefulConnection = newContainerConnection()) { + RedisAsyncCommands commands = statefulConnection.async(); + commands.shutdown(false); + } + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SHUTDOWN") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SHUTDOWN NOSAVE"), + equalTo(SemanticAttributes.DB_OPERATION, "SHUTDOWN")))); + } +} diff --git a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceReactiveClientTest.java b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceReactiveClientTest.java new file mode 100644 index 0000000000..3d05ccf3b8 --- /dev/null +++ b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceReactiveClientTest.java @@ -0,0 +1,373 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.reactive.RedisReactiveCommands; +import io.lettuce.core.api.sync.RedisCommands; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.semconv.SemanticAttributes; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.assertj.core.api.AbstractBooleanAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import reactor.core.scheduler.Schedulers; + +class LettuceReactiveClientTest extends AbstractLettuceClientTest { + private static RedisReactiveCommands reactiveCommands; + + @BeforeAll + static void setUp() { + redisServer.start(); + + host = redisServer.getHost(); + port = redisServer.getMappedPort(6379); + embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX; + + redisClient = RedisClient.create(embeddedDbUri); + redisClient.setOptions(CLIENT_OPTIONS); + + connection = redisClient.connect(); + reactiveCommands = connection.reactive(); + RedisCommands syncCommands = connection.sync(); + + syncCommands.set("TESTKEY", "TESTVAL"); + + // 1 set + 1 connect trace + testing.waitForTraces(2); + testing.clearData(); + } + + @AfterAll + static void cleanUp() { + connection.close(); + redisClient.shutdown(); + redisServer.stop(); + } + + @Test + void testSetCommandWithSubscribeOnDefinedConsumer() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = new CompletableFuture<>(); + + Consumer consumer = + res -> + testing.runWithSpan( + "callback", + () -> { + assertThat(res).isEqualTo("OK"); + future.complete(res); + }); + + testing.runWithSpan( + "parent", () -> reactiveCommands.set("TESTSETKEY", "TESTSETVAL").subscribe(consumer)); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("OK"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("SET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SET TESTSETKEY ?"), + equalTo(SemanticAttributes.DB_OPERATION, "SET")), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + + @Test + void testGetCommandWithLambdaFunction() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = new CompletableFuture<>(); + + reactiveCommands + .get("TESTKEY") + .subscribe( + res -> { + assertThat(res).isEqualTo("TESTVAL"); + future.complete(res); + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("TESTVAL"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET TESTKEY"), + equalTo(SemanticAttributes.DB_OPERATION, "GET")))); + } + + // to make sure instrumentation's chained completion stages won't interfere with user's, while + // still recording spans + @Test + void testGetNonExistentKeyCommand() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = new CompletableFuture<>(); + String defaultVal = "NOT THIS VALUE"; + + testing.runWithSpan( + "parent", + () -> { + reactiveCommands + .get("NON_EXISTENT_KEY") + .defaultIfEmpty(defaultVal) + .subscribe( + res -> + testing.runWithSpan( + "callback", + () -> { + assertThat(res).isEqualTo(defaultVal); + future.complete(res); + })); + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(defaultVal); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET NON_EXISTENT_KEY"), + equalTo(SemanticAttributes.DB_OPERATION, "GET")), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + + @Test + void testCommandWithNoArguments() + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = new CompletableFuture<>(); + + reactiveCommands + .randomkey() + .subscribe( + res -> { + assertThat(res).isEqualTo("TESTKEY"); + future.complete(res); + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("TESTKEY"); + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("RANDOMKEY") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "RANDOMKEY"), + equalTo(SemanticAttributes.DB_OPERATION, "RANDOMKEY")))); + } + + @Test + void testCommandFluxPublisher() { + reactiveCommands.command().subscribe(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("COMMAND") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "COMMAND"), + equalTo(SemanticAttributes.DB_OPERATION, "COMMAND"), + satisfies( + AttributeKey.longKey("lettuce.command.results.count"), + val -> val.isGreaterThan(100))))); + } + + @Test + void testCommandCancelAfter2OnFluxPublisher() { + reactiveCommands.command().take(2).subscribe(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("COMMAND") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "COMMAND"), + equalTo(SemanticAttributes.DB_OPERATION, "COMMAND"), + satisfies( + AttributeKey.booleanKey("lettuce.command.cancelled"), + AbstractBooleanAssert::isTrue), + satisfies( + AttributeKey.longKey("lettuce.command.results.count"), + val -> val.isEqualTo(2))))); + } + + @Test + void testNonReactiveCommandShouldNotProduceSpan() { + String res = reactiveCommands.digest(null); + + assertThat(res).isNotNull(); + assertThat(testing.spans().size()).isEqualTo(0); + } + + @Test + void testDebugSegfaultCommandReturnsMonoVoidWithNoArgumentShouldProduceSpan() { + // Test Causes redis to crash therefore it needs its own container + try (StatefulRedisConnection statefulConnection = newContainerConnection()) { + RedisReactiveCommands commands = statefulConnection.reactive(); + commands.debugSegfault().subscribe(); + } + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("DEBUG") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "DEBUG SEGFAULT"), + equalTo(SemanticAttributes.DB_OPERATION, "DEBUG")))); + } + + @Test + void testShutdownCommandShouldProduceSpan() { + // Test Causes redis to crash therefore it needs its own container + try (StatefulRedisConnection statefulConnection = newContainerConnection()) { + RedisReactiveCommands commands = statefulConnection.reactive(); + commands.shutdown(false).subscribe(); + } + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SHUTDOWN") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SHUTDOWN NOSAVE"), + equalTo(SemanticAttributes.DB_OPERATION, "SHUTDOWN")))); + } + + @Test + void testBlockingSubscriber() { + testing.runWithSpan( + "test-parent", + () -> reactiveCommands.set("a", "1").then(reactiveCommands.get("a")).block()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test-parent").hasAttributes(Attributes.empty()), + span -> + span.hasName("SET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SET a ?"), + equalTo(SemanticAttributes.DB_OPERATION, "SET")), + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET a"), + equalTo(SemanticAttributes.DB_OPERATION, "GET")))); + } + + @Test + void testAsyncSubscriber() { + testing.runWithSpan( + "test-parent", + () -> reactiveCommands.set("a", "1").then(reactiveCommands.get("a")).subscribe()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test-parent").hasAttributes(Attributes.empty()), + span -> + span.hasName("SET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SET a ?"), + equalTo(SemanticAttributes.DB_OPERATION, "SET")), + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET a"), + equalTo(SemanticAttributes.DB_OPERATION, "GET")))); + } + + @Test + void testAsyncSubscriberWithSpecificThreadPool() { + testing.runWithSpan( + "test-parent", + () -> + reactiveCommands + .set("a", "1") + .then(reactiveCommands.get("a")) + .subscribeOn(Schedulers.elastic()) + .subscribe()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test-parent").hasAttributes(Attributes.empty()), + span -> + span.hasName("SET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SET a ?"), + equalTo(SemanticAttributes.DB_OPERATION, "SET")), + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET a"), + equalTo(SemanticAttributes.DB_OPERATION, "GET")))); + } +} diff --git a/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceSyncClientTest.java b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceSyncClientTest.java new file mode 100644 index 0000000000..4b040a8584 --- /dev/null +++ b/instrumentation/lettuce/lettuce-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_0/LettuceSyncClientTest.java @@ -0,0 +1,291 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchException; + +import com.google.common.collect.ImmutableMap; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.SemanticAttributes; +import java.util.Map; +import org.assertj.core.api.AbstractAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0 +class LettuceSyncClientTest extends AbstractLettuceClientTest { + private static int incorrectPort; + private static String dbUriNonExistent; + + private static final ImmutableMap testHashMap = + ImmutableMap.of( + "firstname", "John", + "lastname", "Doe", + "age", "53"); + + private static RedisCommands syncCommands; + + @BeforeAll + static void setUp() { + redisServer.start(); + host = redisServer.getHost(); + port = redisServer.getMappedPort(6379); + embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX; + + incorrectPort = PortUtils.findOpenPort(); + dbUriNonExistent = "redis://" + host + ":" + incorrectPort + "/" + DB_INDEX; + + redisClient = RedisClient.create(embeddedDbUri); + redisClient.setOptions(CLIENT_OPTIONS); + + connection = redisClient.connect(); + syncCommands = connection.sync(); + + syncCommands.set("TESTKEY", "TESTVAL"); + syncCommands.hmset("TESTHM", testHashMap); + + // 2 sets + 1 connect trace + testing.waitForTraces(3); + testing.clearData(); + } + + @AfterAll + static void cleanUp() { + connection.close(); + redisClient.shutdown(); + redisServer.stop(); + } + + @Test + void testConnect() { + RedisClient testConnectionClient = RedisClient.create(embeddedDbUri); + testConnectionClient.setOptions(CLIENT_OPTIONS); + + StatefulRedisConnection testConnection = testConnectionClient.connect(); + cleanup.deferCleanup(testConnection); + cleanup.deferCleanup(testConnectionClient::shutdown); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("CONNECT") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_PEER_NAME, host), + equalTo(SemanticAttributes.NET_PEER_PORT, port), + equalTo(SemanticAttributes.DB_SYSTEM, "redis")))); + } + + @Test + void testConnectException() { + RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent); + testConnectionClient.setOptions(CLIENT_OPTIONS); + + Exception exception = catchException(testConnectionClient::connect); + + assertThat(exception).isInstanceOf(RedisConnectionException.class); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("CONNECT") + .hasKind(SpanKind.CLIENT) + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NET_PEER_NAME, host), + equalTo(SemanticAttributes.NET_PEER_PORT, incorrectPort), + equalTo(SemanticAttributes.DB_SYSTEM, "redis")) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("exception") + .hasAttributesSatisfyingExactly( + equalTo( + AttributeKey.stringKey("exception.type"), + "io.netty.channel.AbstractChannel.AnnotatedConnectException"), + equalTo( + AttributeKey.stringKey("exception.message"), + "Connection refused: localhost/127.0.0.1:" + + incorrectPort), + satisfies( + AttributeKey.stringKey("exception.stacktrace"), + AbstractAssert::isNotNull))))); + } + + @Test + void testSetCommand() { + String res = syncCommands.set("TESTSETKEY", "TESTSETVAL"); + assertThat(res).isEqualTo("OK"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SET TESTSETKEY ?"), + equalTo(SemanticAttributes.DB_OPERATION, "SET")))); + } + + @Test + void testGetCommand() { + String res = syncCommands.get("TESTKEY"); + assertThat(res).isEqualTo("TESTVAL"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET TESTKEY"), + equalTo(SemanticAttributes.DB_OPERATION, "GET")))); + } + + @Test + void testGetNonExistentKeyCommand() { + String res = syncCommands.get("NON_EXISTENT_KEY"); + assertThat(res).isNull(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET NON_EXISTENT_KEY"), + equalTo(SemanticAttributes.DB_OPERATION, "GET")))); + } + + @Test + void testCommandWithNoArguments() { + String res = syncCommands.randomkey(); + assertThat(res).isNotNull(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("RANDOMKEY") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "RANDOMKEY"), + equalTo(SemanticAttributes.DB_OPERATION, "RANDOMKEY")))); + } + + @Test + void testListCommand() { + long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT"); + assertThat(res).isEqualTo(1); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("LPUSH") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "LPUSH TESTLIST ?"), + equalTo(SemanticAttributes.DB_OPERATION, "LPUSH")))); + } + + @Test + void testHashSetCommand() { + String res = syncCommands.hmset("user", testHashMap); + assertThat(res).isEqualTo("OK"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("HMSET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo( + SemanticAttributes.DB_STATEMENT, + "HMSET user firstname ? lastname ? age ?"), + equalTo(SemanticAttributes.DB_OPERATION, "HMSET")))); + } + + @Test + void testHashGetallCommand() { + Map res = syncCommands.hgetall("TESTHM"); + assertThat(res).isEqualTo(testHashMap); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("HGETALL") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "HGETALL TESTHM"), + equalTo(SemanticAttributes.DB_OPERATION, "HGETALL")))); + } + + @Test + void testDebugSegfaultCommandWithNoArgumentShouldProduceSpan() { + // Test causes redis to crash therefore it needs its own container + try (StatefulRedisConnection statefulConnection = newContainerConnection()) { + RedisCommands commands = statefulConnection.sync(); + commands.debugSegfault(); + } + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("DEBUG") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "DEBUG SEGFAULT"), + equalTo(SemanticAttributes.DB_OPERATION, "DEBUG")))); + } + + @Test + void testShutdownCommandShouldProduceSpan() { + // Test causes redis to crash therefore it needs its own container + try (StatefulRedisConnection statefulConnection = newContainerConnection()) { + RedisCommands commands = statefulConnection.sync(); + commands.shutdown(false); + } + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SHUTDOWN") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SHUTDOWN NOSAVE"), + equalTo(SemanticAttributes.DB_OPERATION, "SHUTDOWN")))); + } +}