diff --git a/instrumentation/lettuce/lettuce-5.1/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.groovy b/instrumentation/lettuce/lettuce-5.1/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.groovy deleted file mode 100644 index 04fa6160c8..0000000000 --- a/instrumentation/lettuce/lettuce-5.1/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.groovy +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.lettuce.v5_1 - -import io.lettuce.core.RedisClient -import io.opentelemetry.instrumentation.lettuce.v5_1.AbstractLettuceAsyncClientTest -import io.opentelemetry.instrumentation.test.AgentTestTrait - -class LettuceAsyncClientTest extends AbstractLettuceAsyncClientTest implements AgentTestTrait { - @Override - RedisClient createClient(String uri) { - return RedisClient.create(uri) - } -} diff --git a/instrumentation/lettuce/lettuce-5.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.java b/instrumentation/lettuce/lettuce-5.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.java new file mode 100644 index 0000000000..595699bdee --- /dev/null +++ b/instrumentation/lettuce/lettuce-5.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.lettuce.v5_1; + +import io.lettuce.core.RedisClient; +import io.opentelemetry.instrumentation.lettuce.v5_1.AbstractLettuceAsyncClientTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class LettuceAsyncClientTest extends AbstractLettuceAsyncClientTest { + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + public InstrumentationExtension getInstrumentationExtension() { + return testing; + } + + @Override + protected RedisClient createClient(String uri) { + return RedisClient.create(uri); + } +} diff --git a/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.groovy b/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.groovy deleted file mode 100644 index ce86466732..0000000000 --- a/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.groovy +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.lettuce.v5_1 - -import io.lettuce.core.RedisClient -import io.lettuce.core.resource.ClientResources -import io.opentelemetry.instrumentation.test.LibraryTestTrait - -class LettuceAsyncClientTest extends AbstractLettuceAsyncClientTest implements LibraryTestTrait { - @Override - RedisClient createClient(String uri) { - return RedisClient.create( - ClientResources.builder() - .tracing(LettuceTelemetry.create(getOpenTelemetry()).newTracing()) - .build(), - uri) - } - - @Override - boolean testCallback() { - // context is not propagated into callbacks - return false - } -} diff --git a/instrumentation/lettuce/lettuce-5.1/library/src/test/java/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.java b/instrumentation/lettuce/lettuce-5.1/library/src/test/java/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.java new file mode 100644 index 0000000000..38bf96282f --- /dev/null +++ b/instrumentation/lettuce/lettuce-5.1/library/src/test/java/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceAsyncClientTest.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.lettuce.v5_1; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.resource.ClientResources; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class LettuceAsyncClientTest extends AbstractLettuceAsyncClientTest { + @RegisterExtension + static InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + public InstrumentationExtension getInstrumentationExtension() { + return testing; + } + + @Override + protected RedisClient createClient(String uri) { + return RedisClient.create( + ClientResources.builder() + .tracing( + LettuceTelemetry.create(getInstrumentationExtension().getOpenTelemetry()) + .newTracing()) + .build(), + uri); + } + + @Override + boolean testCallback() { + // context is not propagated into callbacks + return false; + } +} diff --git a/instrumentation/lettuce/lettuce-5.1/testing/src/main/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/AbstractLettuceAsyncClientTest.groovy b/instrumentation/lettuce/lettuce-5.1/testing/src/main/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/AbstractLettuceAsyncClientTest.groovy deleted file mode 100644 index 2e60c5e5e4..0000000000 --- a/instrumentation/lettuce/lettuce-5.1/testing/src/main/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/AbstractLettuceAsyncClientTest.groovy +++ /dev/null @@ -1,450 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.lettuce.v5_1 - -import io.lettuce.core.ConnectionFuture -import io.lettuce.core.RedisClient -import io.lettuce.core.RedisFuture -import io.lettuce.core.RedisURI -import io.lettuce.core.api.StatefulConnection -import io.lettuce.core.api.async.RedisAsyncCommands -import io.lettuce.core.api.sync.RedisCommands -import io.lettuce.core.codec.StringCodec -import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.semconv.SemanticAttributes -import org.testcontainers.containers.GenericContainer -import spock.lang.Shared -import spock.util.concurrent.AsyncConditions - -import java.util.concurrent.ExecutionException -import java.util.concurrent.TimeUnit -import java.util.function.BiConsumer -import java.util.function.BiFunction -import java.util.function.Consumer -import java.util.function.Function - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.INTERNAL - -abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecification { - public static final int DB_INDEX = 0 - - private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379) - - abstract RedisClient createClient(String uri) - - @Shared - String host - @Shared - String expectedHostAttributeValue - @Shared - int port - @Shared - int incorrectPort - @Shared - String dbUriNonExistent - @Shared - String embeddedDbUri - - @Shared - Map testHashMap = [ - firstname: "John", - lastname : "Doe", - age : "53" - ] - - RedisClient redisClient - StatefulConnection connection - RedisAsyncCommands asyncCommands - RedisCommands syncCommands - - def setup() { - redisServer.start() - - port = redisServer.getMappedPort(6379) - host = redisServer.getHost() - String dbAddr = host + ":" + port + "/" + DB_INDEX - embeddedDbUri = "redis://" + dbAddr - expectedHostAttributeValue = host == "127.0.0.1" ? null : host - - incorrectPort = PortUtils.findOpenPort() - String dbAddrNonExistent = host + ":" + incorrectPort + "/" + DB_INDEX - dbUriNonExistent = "redis://" + dbAddrNonExistent - - redisClient = createClient(embeddedDbUri) - redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS) - - connection = redisClient.connect() - asyncCommands = connection.async() - syncCommands = connection.sync() - - syncCommands.set("TESTKEY", "TESTVAL") - - // 1 set - ignoreTracesAndClear(1) - } - - def cleanup() { - connection.close() - redisClient.shutdown() - redisServer.stop() - } - - boolean testCallback() { - return true - } - - def T runWithCallbackSpan(String spanName, Closure callback) { - if (testCallback()) { - return runWithSpan(spanName, callback) - } - return callback.call() - } - - def "connect using get on ConnectionFuture"() { - setup: - RedisClient testConnectionClient = RedisClient.create(embeddedDbUri) - testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS) - - when: - ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8, - RedisURI.create("redis://${host}:${port}?timeout=3s")) - StatefulConnection connection = connectionFuture.get() - - then: - connection != null - // Lettuce tracing does not trace connect - assertTraces(0) {} - - cleanup: - connection.close() - testConnectionClient.shutdown() - } - - def "connect exception inside the connection future"() { - setup: - RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent) - testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS) - - when: - ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8, - RedisURI.create("redis://${host}:${incorrectPort}?timeout=3s")) - StatefulConnection connection = connectionFuture.get() - - then: - connection == null - thrown ExecutionException - // Lettuce tracing does not trace connect - assertTraces(0) {} - - cleanup: - testConnectionClient.shutdown() - } - - def "set command using Future get with timeout"() { - setup: - RedisFuture redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL") - String res = redisFuture.get(3, TimeUnit.SECONDS) - - expect: - res == "OK" - assertTraces(1) { - trace(0, 1) { - span(0) { - name "SET" - kind CLIENT - attributes { - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" port - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "SET TESTSETKEY ?" - } - event(0) { - eventName "redis.encode.start" - } - event(1) { - eventName "redis.encode.end" - } - } - } - } - } - - def "get command chained with thenAccept"() { - setup: - def conds = new AsyncConditions() - Consumer consumer = new Consumer() { - @Override - void accept(String res) { - runWithCallbackSpan("callback") { - conds.evaluate { - assert res == "TESTVAL" - } - } - } - } - - when: - runWithSpan("parent") { - RedisFuture redisFuture = asyncCommands.get("TESTKEY") - redisFuture.thenAccept(consumer) - } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 2 + (testCallback() ? 1 : 0)) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "GET" - kind CLIENT - attributes { - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" port - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET TESTKEY" - } - event(0) { - eventName "redis.encode.start" - } - event(1) { - eventName "redis.encode.end" - } - } - if (testCallback()) { - span(2) { - name "callback" - kind INTERNAL - childOf(span(0)) - } - } - } - } - } - - // to make sure instrumentation's chained completion stages won't interfere with user's, while still - // recording metrics - def "get non existent key command with handleAsync and chained with thenApply"() { - setup: - def conds = new AsyncConditions() - String successStr = "KEY MISSING" - BiFunction firstStage = new BiFunction() { - @Override - String apply(String res, Throwable throwable) { - runWithCallbackSpan("callback1") { - conds.evaluate { - assert res == null - assert throwable == null - } - } - return (res == null ? successStr : res) - } - } - Function secondStage = new Function() { - @Override - Object apply(String input) { - runWithCallbackSpan("callback2") { - conds.evaluate { - assert input == successStr - } - } - return null - } - } - - when: - runWithSpan("parent") { - RedisFuture redisFuture = asyncCommands.get("NON_EXISTENT_KEY") - redisFuture.handleAsync(firstStage).thenApply(secondStage) - } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 2 + (testCallback() ? 2 : 0)) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "GET" - kind CLIENT - childOf(span(0)) - attributes { - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" port - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "GET NON_EXISTENT_KEY" - } - event(0) { - eventName "redis.encode.start" - } - event(1) { - eventName "redis.encode.end" - } - } - if (testCallback()) { - span(2) { - name "callback1" - kind INTERNAL - childOf(span(0)) - } - span(3) { - name "callback2" - kind INTERNAL - childOf(span(0)) - } - } - } - } - } - - def "command with no arguments using a biconsumer"() { - setup: - def conds = new AsyncConditions() - BiConsumer biConsumer = new BiConsumer() { - @Override - void accept(String keyRetrieved, Throwable throwable) { - runWithCallbackSpan("callback") { - conds.evaluate { - assert keyRetrieved != null - } - } - } - } - - when: - runWithSpan("parent") { - RedisFuture redisFuture = asyncCommands.randomkey() - redisFuture.whenCompleteAsync(biConsumer) - } - - then: - conds.await(10) - assertTraces(1) { - trace(0, 2 + (testCallback() ? 1 : 0)) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name "RANDOMKEY" - kind CLIENT - childOf(span(0)) - attributes { - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" port - "$SemanticAttributes.DB_STATEMENT" "RANDOMKEY" - "$SemanticAttributes.DB_SYSTEM" "redis" - } - event(0) { - eventName "redis.encode.start" - } - event(1) { - eventName "redis.encode.end" - } - } - if (testCallback()) { - span(2) { - name "callback" - kind INTERNAL - childOf(span(0)) - } - } - } - } - } - - def "hash set and then nest apply to hash getall"() { - setup: - def conds = new AsyncConditions() - - when: - RedisFuture hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap) - hmsetFuture.thenApplyAsync(new Function() { - @Override - Object apply(String setResult) { - conds.evaluate { - assert setResult == "OK" - } - RedisFuture> hmGetAllFuture = asyncCommands.hgetall("TESTHM") - hmGetAllFuture.exceptionally(new Function>() { - @Override - Map apply(Throwable throwable) { - println("unexpected:" + throwable.toString()) - throwable.printStackTrace() - assert false - return null - } - }) - hmGetAllFuture.thenAccept(new Consumer>() { - @Override - void accept(Map hmGetAllResult) { - conds.evaluate { - assert testHashMap == hmGetAllResult - } - } - }) - return null - } - }) - - then: - conds.await(10) - assertTraces(2) { - trace(0, 1) { - span(0) { - name "HMSET" - kind CLIENT - attributes { - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" port - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "HMSET TESTHM firstname ? lastname ? age ?" - } - event(0) { - eventName "redis.encode.start" - } - event(1) { - eventName "redis.encode.end" - } - } - } - trace(1, 1) { - span(0) { - name "HGETALL" - kind CLIENT - attributes { - "$SemanticAttributes.NETWORK_TYPE" "ipv4" - "$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1" - "$NetworkAttributes.NETWORK_PEER_PORT" port - "$SemanticAttributes.DB_SYSTEM" "redis" - "$SemanticAttributes.DB_STATEMENT" "HGETALL TESTHM" - } - event(0) { - eventName "redis.encode.start" - } - event(1) { - eventName "redis.encode.end" - } - } - } - } - } -} diff --git a/instrumentation/lettuce/lettuce-5.1/testing/src/main/java/io/opentelemetry/instrumentation/lettuce/v5_1/AbstractLettuceAsyncClientTest.java b/instrumentation/lettuce/lettuce-5.1/testing/src/main/java/io/opentelemetry/instrumentation/lettuce/v5_1/AbstractLettuceAsyncClientTest.java new file mode 100644 index 0000000000..e50398fbc0 --- /dev/null +++ b/instrumentation/lettuce/lettuce-5.1/testing/src/main/java/io/opentelemetry/instrumentation/lettuce/v5_1/AbstractLettuceAsyncClientTest.java @@ -0,0 +1,411 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.lettuce.v5_1; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +import com.google.common.collect.ImmutableMap; +import io.lettuce.core.ConnectionFuture; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.codec.StringCodec; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; +import io.opentelemetry.semconv.SemanticAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +@SuppressWarnings({"InterruptedExceptionSwallowed"}) +public abstract class AbstractLettuceAsyncClientTest extends AbstractLettuceClientTest { + private static String dbUriNonExistent; + private static int incorrectPort; + + private static final ImmutableMap testHashMap = + ImmutableMap.of( + "firstname", "John", + "lastname", "Doe", + "age", "53"); + + private static RedisAsyncCommands asyncCommands; + + @BeforeAll + void setUp() { + redisServer.start(); + host = redisServer.getHost(); + port = redisServer.getMappedPort(6379); + embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX; + + incorrectPort = PortUtils.findOpenPort(); + dbUriNonExistent = "redis://" + host + ":" + incorrectPort + "/" + DB_INDEX; + + redisClient = createClient(embeddedDbUri); + redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS); + + connection = redisClient.connect(); + asyncCommands = connection.async(); + RedisCommands syncCommands = connection.sync(); + + syncCommands.set("TESTKEY", "TESTVAL"); + + // 1 set trace + getInstrumentationExtension().waitForTraces(1); + getInstrumentationExtension().clearData(); + } + + @AfterAll + static void cleanUp() { + connection.close(); + redisClient.shutdown(); + redisServer.stop(); + } + + boolean testCallback() { + return true; + } + + @Test + void testConnectUsingGetOnConnectionFuture() throws Exception { + RedisClient testConnectionClient = RedisClient.create(embeddedDbUri); + testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS); + + ConnectionFuture> connectionFuture = + testConnectionClient.connectAsync( + StringCodec.UTF8, RedisURI.create("redis://" + host + ":" + port + "?timeout=3s")); + StatefulRedisConnection connection1 = connectionFuture.get(); + cleanup.deferCleanup(connection1); + cleanup.deferCleanup(testConnectionClient::shutdown); + + assertThat(connection1).isNotNull(); + // Lettuce tracing does not trace connect + assertThat(getInstrumentationExtension().spans()).isEmpty(); + } + + @Test + void testConnectExceptionInsideTheConnectionFuture() { + RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent); + testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS); + cleanup.deferCleanup(testConnectionClient::shutdown); + + Throwable thrown = + catchThrowable( + () -> { + ConnectionFuture> connectionFuture = + testConnectionClient.connectAsync( + StringCodec.UTF8, + RedisURI.create("redis://" + host + ":" + incorrectPort + "?timeout=3s")); + StatefulRedisConnection connection1 = connectionFuture.get(); + cleanup.deferCleanup(connection1); + assertThat(connection1).isNull(); + }); + + assertThat(thrown).isInstanceOf(ExecutionException.class); + + // Lettuce tracing does not trace connect + assertThat(getInstrumentationExtension().spans()).isEmpty(); + } + + @Test + void testSetCommandUsingFutureGetWithTimeout() throws Exception { + RedisFuture redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL"); + String res = redisFuture.get(3, TimeUnit.SECONDS); + + assertThat(res).isEqualTo("OK"); + + getInstrumentationExtension() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, port), + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "SET TESTSETKEY ?")) + .hasEventsSatisfyingExactly( + event -> event.hasName("redis.encode.start"), + event -> event.hasName("redis.encode.end")))); + } + + @Test + void testGetCommandChainedWithThenAccept() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + Consumer consumer = + res -> { + if (testCallback()) { + getInstrumentationExtension() + .runWithSpan("callback", () -> assertThat(res).isEqualTo("TESTVAL")); + } + future.complete(res); + }; + + getInstrumentationExtension() + .runWithSpan( + "parent", + () -> { + RedisFuture redisFuture = asyncCommands.get("TESTKEY"); + redisFuture.thenAccept(consumer); + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("TESTVAL"); + + getInstrumentationExtension() + .waitAndAssertTraces( + trace -> { + List> spanAsserts = + new ArrayList<>( + Arrays.asList( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, port), + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "GET TESTKEY")) + .hasEventsSatisfyingExactly( + event -> event.hasName("redis.encode.start"), + event -> event.hasName("redis.encode.end")))); + + if (testCallback()) { + spanAsserts.add( + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0))); + } + trace.hasSpansSatisfyingExactly(spanAsserts); + }); + } + + // to make sure instrumentation's chained completion stages won't interfere with user's, while + // still recording spans + @Test + void testGetNonExistentKeyCommandWithHandleAsyncAndChainedWithThenApply() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + String successStr = "KEY MISSING"; + + BiFunction firstStage = + (res, error) -> { + if (testCallback()) { + getInstrumentationExtension() + .runWithSpan( + "callback1", + () -> { + assertThat(res).isNull(); + assertThat(error).isNull(); + }); + } + return (res == null ? successStr : res); + }; + Function secondStage = + input -> { + if (testCallback()) { + getInstrumentationExtension() + .runWithSpan( + "callback2", + () -> { + assertThat(input).isEqualTo(successStr); + }); + } + future.complete(successStr); + return null; + }; + + getInstrumentationExtension() + .runWithSpan( + "parent", + () -> { + RedisFuture redisFuture = asyncCommands.get("NON_EXISTENT_KEY"); + redisFuture.handleAsync(firstStage).thenApply(secondStage); + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(successStr); + + getInstrumentationExtension() + .waitAndAssertTraces( + trace -> { + List> spanAsserts = + new ArrayList<>( + Arrays.asList( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("GET") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, port), + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo( + SemanticAttributes.DB_STATEMENT, "GET NON_EXISTENT_KEY")) + .hasEventsSatisfyingExactly( + event -> event.hasName("redis.encode.start"), + event -> event.hasName("redis.encode.end")))); + + if (testCallback()) { + spanAsserts.addAll( + Arrays.asList( + span -> + span.hasName("callback1") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("callback2") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + trace.hasSpansSatisfyingExactly(spanAsserts); + }); + } + + @Test + void testCommandWithNoArgumentsUsingBiconsumer() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + BiConsumer biConsumer = + (keyRetrieved, error) -> { + if (testCallback()) { + getInstrumentationExtension() + .runWithSpan( + "callback", + () -> { + assertThat(keyRetrieved).isNotNull(); + }); + } + future.complete(keyRetrieved); + }; + + getInstrumentationExtension() + .runWithSpan( + "parent", + () -> { + RedisFuture redisFuture = asyncCommands.randomkey(); + redisFuture.whenCompleteAsync(biConsumer); + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isNotNull(); + + getInstrumentationExtension() + .waitAndAssertTraces( + trace -> { + List> spanAsserts = + new ArrayList<>( + Arrays.asList( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("RANDOMKEY") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, port), + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "RANDOMKEY")) + .hasEventsSatisfyingExactly( + event -> event.hasName("redis.encode.start"), + event -> event.hasName("redis.encode.end")))); + + if (testCallback()) { + spanAsserts.add( + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0))); + } + trace.hasSpansSatisfyingExactly(spanAsserts); + }); + } + + @Test + void testHashSetAndThenNestApplyToHashGetall() throws Exception { + CompletableFuture> future = new CompletableFuture<>(); + + RedisFuture hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap); + hmsetFuture.thenApplyAsync( + setResult -> { + // Wait for 'hmset' trace to get written + getInstrumentationExtension().waitForTraces(1); + + if (!"OK".equals(setResult)) { + future.completeExceptionally(new AssertionError("Wrong hmset result " + setResult)); + return null; + } + + RedisFuture> hmGetAllFuture = asyncCommands.hgetall("TESTHM"); + hmGetAllFuture.whenComplete( + (result, exception) -> { + if (exception != null) { + future.completeExceptionally(exception); + } else { + future.complete(result); + } + }); + return null; + }); + + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(testHashMap); + + getInstrumentationExtension() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("HMSET") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, port), + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo( + SemanticAttributes.DB_STATEMENT, + "HMSET TESTHM firstname ? lastname ? age ?")) + .hasEventsSatisfyingExactly( + event -> event.hasName("redis.encode.start"), + event -> event.hasName("redis.encode.end"))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("HGETALL") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"), + equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"), + equalTo(NetworkAttributes.NETWORK_PEER_PORT, port), + equalTo(SemanticAttributes.DB_SYSTEM, "redis"), + equalTo(SemanticAttributes.DB_STATEMENT, "HGETALL TESTHM")) + .hasEventsSatisfyingExactly( + event -> event.hasName("redis.encode.start"), + event -> event.hasName("redis.encode.end")))); + } +}