Convert lettuce-5.0 tests from groovy to java (#9547)
This commit is contained in:
parent
e69d1c234a
commit
2c4022450a
|
@ -1,544 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
import io.lettuce.core.ClientOptions
|
|
||||||
import io.lettuce.core.ConnectionFuture
|
|
||||||
import io.lettuce.core.RedisClient
|
|
||||||
import io.lettuce.core.RedisFuture
|
|
||||||
import io.lettuce.core.RedisURI
|
|
||||||
import io.lettuce.core.api.StatefulConnection
|
|
||||||
import io.lettuce.core.api.async.RedisAsyncCommands
|
|
||||||
import io.lettuce.core.api.sync.RedisCommands
|
|
||||||
import io.lettuce.core.codec.StringCodec
|
|
||||||
import io.lettuce.core.protocol.AsyncCommand
|
|
||||||
import io.netty.channel.AbstractChannel
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
|
||||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
|
||||||
import io.opentelemetry.semconv.SemanticAttributes
|
|
||||||
import org.testcontainers.containers.GenericContainer
|
|
||||||
import spock.lang.Shared
|
|
||||||
import spock.util.concurrent.AsyncConditions
|
|
||||||
|
|
||||||
import java.util.concurrent.CancellationException
|
|
||||||
import java.util.concurrent.ExecutionException
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import java.util.function.BiConsumer
|
|
||||||
import java.util.function.BiFunction
|
|
||||||
import java.util.function.Consumer
|
|
||||||
import java.util.function.Function
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
|
||||||
import static io.opentelemetry.api.trace.StatusCode.ERROR
|
|
||||||
|
|
||||||
class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
|
|
||||||
public static final int DB_INDEX = 0
|
|
||||||
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
|
|
||||||
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
|
|
||||||
|
|
||||||
private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379)
|
|
||||||
|
|
||||||
@Shared
|
|
||||||
String host
|
|
||||||
@Shared
|
|
||||||
int port
|
|
||||||
@Shared
|
|
||||||
int incorrectPort
|
|
||||||
@Shared
|
|
||||||
String dbAddr
|
|
||||||
@Shared
|
|
||||||
String dbAddrNonExistent
|
|
||||||
@Shared
|
|
||||||
String dbUriNonExistent
|
|
||||||
@Shared
|
|
||||||
String embeddedDbUri
|
|
||||||
|
|
||||||
@Shared
|
|
||||||
Map<String, String> testHashMap = [
|
|
||||||
firstname: "John",
|
|
||||||
lastname : "Doe",
|
|
||||||
age : "53"
|
|
||||||
]
|
|
||||||
|
|
||||||
RedisClient redisClient
|
|
||||||
StatefulConnection connection
|
|
||||||
RedisAsyncCommands<String, ?> asyncCommands
|
|
||||||
RedisCommands<String, ?> syncCommands
|
|
||||||
|
|
||||||
def setup() {
|
|
||||||
redisServer.start()
|
|
||||||
|
|
||||||
host = redisServer.getHost()
|
|
||||||
port = redisServer.getMappedPort(6379)
|
|
||||||
dbAddr = host + ":" + port + "/" + DB_INDEX
|
|
||||||
embeddedDbUri = "redis://" + dbAddr
|
|
||||||
|
|
||||||
incorrectPort = PortUtils.findOpenPort()
|
|
||||||
dbAddrNonExistent = host + ":" + incorrectPort + "/" + DB_INDEX
|
|
||||||
dbUriNonExistent = "redis://" + dbAddrNonExistent
|
|
||||||
|
|
||||||
redisClient = RedisClient.create(embeddedDbUri)
|
|
||||||
|
|
||||||
redisClient.setOptions(CLIENT_OPTIONS)
|
|
||||||
|
|
||||||
connection = redisClient.connect()
|
|
||||||
asyncCommands = connection.async()
|
|
||||||
syncCommands = connection.sync()
|
|
||||||
|
|
||||||
syncCommands.set("TESTKEY", "TESTVAL")
|
|
||||||
|
|
||||||
// 1 set + 1 connect trace
|
|
||||||
ignoreTracesAndClear(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
def cleanup() {
|
|
||||||
connection.close()
|
|
||||||
redisServer.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "connect using get on ConnectionFuture"() {
|
|
||||||
setup:
|
|
||||||
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
|
|
||||||
testConnectionClient.setOptions(CLIENT_OPTIONS)
|
|
||||||
|
|
||||||
when:
|
|
||||||
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
|
|
||||||
new RedisURI(host, port, 3, TimeUnit.SECONDS))
|
|
||||||
StatefulConnection connection = connectionFuture.get()
|
|
||||||
|
|
||||||
then:
|
|
||||||
connection != null
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "CONNECT"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NET_PEER_NAME" host
|
|
||||||
"$SemanticAttributes.NET_PEER_PORT" port
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
connection.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "connect exception inside the connection future"() {
|
|
||||||
setup:
|
|
||||||
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
|
|
||||||
testConnectionClient.setOptions(CLIENT_OPTIONS)
|
|
||||||
|
|
||||||
when:
|
|
||||||
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
|
|
||||||
new RedisURI(host, incorrectPort, 3, TimeUnit.SECONDS))
|
|
||||||
StatefulConnection connection = connectionFuture.get()
|
|
||||||
|
|
||||||
then:
|
|
||||||
connection == null
|
|
||||||
thrown ExecutionException
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "CONNECT"
|
|
||||||
kind CLIENT
|
|
||||||
status ERROR
|
|
||||||
errorEvent AbstractChannel.AnnotatedConnectException, String
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NET_PEER_NAME" host
|
|
||||||
"$SemanticAttributes.NET_PEER_PORT" incorrectPort
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "set command using Future get with timeout"() {
|
|
||||||
setup:
|
|
||||||
RedisFuture<String> redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL")
|
|
||||||
String res = redisFuture.get(3, TimeUnit.SECONDS)
|
|
||||||
|
|
||||||
expect:
|
|
||||||
res == "OK"
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "SET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SET TESTSETKEY ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "get command chained with thenAccept"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
Consumer<String> consumer = new Consumer<String>() {
|
|
||||||
@Override
|
|
||||||
void accept(String res) {
|
|
||||||
runWithSpan("callback") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert res == "TESTVAL"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
when:
|
|
||||||
runWithSpan("parent") {
|
|
||||||
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY")
|
|
||||||
redisFuture.thenAccept(consumer)
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf(span(0))
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET TESTKEY"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "GET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "callback"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// to make sure instrumentation's chained completion stages won't interfere with user's, while still
|
|
||||||
// recording metrics
|
|
||||||
def "get non existent key command with handleAsync and chained with thenApply"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
String successStr = "KEY MISSING"
|
|
||||||
BiFunction<String, Throwable, String> firstStage = new BiFunction<String, Throwable, String>() {
|
|
||||||
@Override
|
|
||||||
String apply(String res, Throwable error) {
|
|
||||||
runWithSpan("callback1") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert res == null
|
|
||||||
assert error == null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return (res == null ? successStr : res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Function<String, Object> secondStage = new Function<String, Object>() {
|
|
||||||
@Override
|
|
||||||
Object apply(String input) {
|
|
||||||
runWithSpan("callback2") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert input == successStr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
when:
|
|
||||||
runWithSpan("parent") {
|
|
||||||
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY")
|
|
||||||
redisFuture.handleAsync(firstStage).thenApply(secondStage)
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 4) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf(span(0))
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET NON_EXISTENT_KEY"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "GET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "callback1"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
span(3) {
|
|
||||||
name "callback2"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "command with no arguments using a biconsumer"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() {
|
|
||||||
@Override
|
|
||||||
void accept(String keyRetrieved, Throwable error) {
|
|
||||||
runWithSpan("callback") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert keyRetrieved != null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
when:
|
|
||||||
runWithSpan("parent") {
|
|
||||||
RedisFuture<String> redisFuture = asyncCommands.randomkey()
|
|
||||||
redisFuture.whenCompleteAsync(biConsumer)
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "RANDOMKEY"
|
|
||||||
kind CLIENT
|
|
||||||
childOf(span(0))
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "RANDOMKEY"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "RANDOMKEY"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "callback"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "hash set and then nest apply to hash getall"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
|
|
||||||
when:
|
|
||||||
RedisFuture<String> hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap)
|
|
||||||
hmsetFuture.thenApplyAsync(new Function<String, Object>() {
|
|
||||||
@Override
|
|
||||||
Object apply(String setResult) {
|
|
||||||
conds.evaluate {
|
|
||||||
assert setResult == "OK"
|
|
||||||
}
|
|
||||||
RedisFuture<Map<String, String>> hmGetAllFuture = asyncCommands.hgetall("TESTHM")
|
|
||||||
hmGetAllFuture.exceptionally(new Function<Throwable, Map<String, String>>() {
|
|
||||||
@Override
|
|
||||||
Map<String, String> apply(Throwable error) {
|
|
||||||
println("unexpected:" + error.toString())
|
|
||||||
error.printStackTrace()
|
|
||||||
assert false
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
})
|
|
||||||
hmGetAllFuture.thenAccept(new Consumer<Map<String, String>>() {
|
|
||||||
@Override
|
|
||||||
void accept(Map<String, String> hmGetAllResult) {
|
|
||||||
conds.evaluate {
|
|
||||||
assert testHashMap == hmGetAllResult
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(2) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "HMSET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "HMSET TESTHM firstname ? lastname ? age ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "HMSET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
trace(1, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "HGETALL"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "HGETALL TESTHM"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "HGETALL"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "command completes exceptionally"() {
|
|
||||||
setup:
|
|
||||||
// turn off auto flush to complete the command exceptionally manually
|
|
||||||
asyncCommands.setAutoFlushCommands(false)
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
RedisFuture redisFuture = asyncCommands.del("key1", "key2")
|
|
||||||
boolean completedExceptionally = ((AsyncCommand) redisFuture).completeExceptionally(new IllegalStateException("TestException"))
|
|
||||||
redisFuture.exceptionally({
|
|
||||||
error ->
|
|
||||||
conds.evaluate {
|
|
||||||
assert error != null
|
|
||||||
assert error instanceof IllegalStateException
|
|
||||||
assert error.getMessage() == "TestException"
|
|
||||||
}
|
|
||||||
throw error
|
|
||||||
})
|
|
||||||
|
|
||||||
when:
|
|
||||||
// now flush and execute the command
|
|
||||||
asyncCommands.flushCommands()
|
|
||||||
redisFuture.get()
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
completedExceptionally == true
|
|
||||||
thrown Exception
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "DEL"
|
|
||||||
kind CLIENT
|
|
||||||
status ERROR
|
|
||||||
errorEvent(IllegalStateException, "TestException")
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "DEL key1 key2"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "DEL"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "cancel command before it finishes"() {
|
|
||||||
setup:
|
|
||||||
asyncCommands.setAutoFlushCommands(false)
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
RedisFuture redisFuture = runWithSpan("parent") {
|
|
||||||
asyncCommands.sadd("SKEY", "1", "2")
|
|
||||||
}
|
|
||||||
redisFuture.whenCompleteAsync({
|
|
||||||
res, error ->
|
|
||||||
runWithSpan("callback") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert error != null
|
|
||||||
assert error instanceof CancellationException
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
when:
|
|
||||||
boolean cancelSuccess = redisFuture.cancel(true)
|
|
||||||
asyncCommands.flushCommands()
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
cancelSuccess == true
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "SADD"
|
|
||||||
kind CLIENT
|
|
||||||
childOf(span(0))
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SADD SKEY ? ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SADD"
|
|
||||||
"lettuce.command.cancelled" true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "callback"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "debug segfault command (returns void) with no argument should produce span"() {
|
|
||||||
setup:
|
|
||||||
asyncCommands.debugSegfault()
|
|
||||||
|
|
||||||
expect:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "DEBUG"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "DEBUG SEGFAULT"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "DEBUG"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def "shutdown command (returns void) should produce a span"() {
|
|
||||||
setup:
|
|
||||||
asyncCommands.shutdown(false)
|
|
||||||
|
|
||||||
expect:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "SHUTDOWN"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SHUTDOWN NOSAVE"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SHUTDOWN"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,430 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
import io.lettuce.core.ClientOptions
|
|
||||||
import io.lettuce.core.RedisClient
|
|
||||||
import io.lettuce.core.api.StatefulConnection
|
|
||||||
import io.lettuce.core.api.reactive.RedisReactiveCommands
|
|
||||||
import io.lettuce.core.api.sync.RedisCommands
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
|
||||||
import io.opentelemetry.semconv.SemanticAttributes
|
|
||||||
import org.testcontainers.containers.GenericContainer
|
|
||||||
import reactor.core.scheduler.Schedulers
|
|
||||||
import spock.lang.Shared
|
|
||||||
import spock.util.concurrent.AsyncConditions
|
|
||||||
|
|
||||||
import java.util.function.Consumer
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
|
||||||
|
|
||||||
class LettuceReactiveClientTest extends AgentInstrumentationSpecification {
|
|
||||||
public static final int DB_INDEX = 0
|
|
||||||
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
|
|
||||||
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
|
|
||||||
|
|
||||||
private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379)
|
|
||||||
|
|
||||||
@Shared
|
|
||||||
String embeddedDbUri
|
|
||||||
|
|
||||||
|
|
||||||
RedisClient redisClient
|
|
||||||
StatefulConnection connection
|
|
||||||
RedisReactiveCommands<String, ?> reactiveCommands
|
|
||||||
RedisCommands<String, ?> syncCommands
|
|
||||||
|
|
||||||
def setupSpec() {
|
|
||||||
}
|
|
||||||
|
|
||||||
def setup() {
|
|
||||||
redisServer.start()
|
|
||||||
|
|
||||||
String host = redisServer.getHost()
|
|
||||||
int port = redisServer.getMappedPort(6379)
|
|
||||||
String dbAddr = host + ":" + port + "/" + DB_INDEX
|
|
||||||
embeddedDbUri = "redis://" + dbAddr
|
|
||||||
|
|
||||||
redisClient = RedisClient.create(embeddedDbUri)
|
|
||||||
|
|
||||||
redisClient.setOptions(CLIENT_OPTIONS)
|
|
||||||
|
|
||||||
connection = redisClient.connect()
|
|
||||||
reactiveCommands = connection.reactive()
|
|
||||||
syncCommands = connection.sync()
|
|
||||||
|
|
||||||
syncCommands.set("TESTKEY", "TESTVAL")
|
|
||||||
|
|
||||||
// 1 set + 1 connect trace
|
|
||||||
ignoreTracesAndClear(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
def cleanup() {
|
|
||||||
connection.close()
|
|
||||||
redisClient.shutdown()
|
|
||||||
redisServer.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "set command with subscribe on a defined consumer"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
Consumer<String> consumer = new Consumer<String>() {
|
|
||||||
@Override
|
|
||||||
void accept(String res) {
|
|
||||||
runWithSpan("callback") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert res == "OK"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
when:
|
|
||||||
runWithSpan("parent") {
|
|
||||||
reactiveCommands.set("TESTSETKEY", "TESTSETVAL").subscribe(consumer)
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "SET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf(span(0))
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SET TESTSETKEY ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "callback"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "get command with lambda function"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
|
|
||||||
when:
|
|
||||||
reactiveCommands.get("TESTKEY").subscribe { res -> conds.evaluate { assert res == "TESTVAL" } }
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET TESTKEY"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "GET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// to make sure instrumentation's chained completion stages won't interfere with user's, while still
|
|
||||||
// recording metrics
|
|
||||||
def "get non existent key command"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
final defaultVal = "NOT THIS VALUE"
|
|
||||||
|
|
||||||
when:
|
|
||||||
runWithSpan("parent") {
|
|
||||||
reactiveCommands.get("NON_EXISTENT_KEY").defaultIfEmpty(defaultVal).subscribe {
|
|
||||||
res ->
|
|
||||||
runWithSpan("callback") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert res == defaultVal
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf(span(0))
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET NON_EXISTENT_KEY"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "GET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "callback"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
def "command with no arguments"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
|
|
||||||
when:
|
|
||||||
reactiveCommands.randomkey().subscribe {
|
|
||||||
res ->
|
|
||||||
conds.evaluate {
|
|
||||||
assert res == "TESTKEY"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "RANDOMKEY"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "RANDOMKEY"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "RANDOMKEY"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "command flux publisher "() {
|
|
||||||
setup:
|
|
||||||
reactiveCommands.command().subscribe()
|
|
||||||
|
|
||||||
expect:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "COMMAND"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "COMMAND"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "COMMAND"
|
|
||||||
"lettuce.command.results.count" { it > 100 }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "command cancel after 2 on flux publisher "() {
|
|
||||||
setup:
|
|
||||||
reactiveCommands.command().take(2).subscribe()
|
|
||||||
|
|
||||||
expect:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "COMMAND"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "COMMAND"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "COMMAND"
|
|
||||||
"lettuce.command.cancelled" true
|
|
||||||
"lettuce.command.results.count" 2
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "non reactive command should not produce span"() {
|
|
||||||
when:
|
|
||||||
def res = reactiveCommands.digest(null)
|
|
||||||
|
|
||||||
then:
|
|
||||||
res != null
|
|
||||||
traces.size() == 0
|
|
||||||
}
|
|
||||||
|
|
||||||
def "debug segfault command (returns mono void) with no argument should produce span"() {
|
|
||||||
setup:
|
|
||||||
reactiveCommands.debugSegfault().subscribe()
|
|
||||||
|
|
||||||
expect:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "DEBUG"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "DEBUG SEGFAULT"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "DEBUG"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "shutdown command (returns void) with argument should produce span"() {
|
|
||||||
setup:
|
|
||||||
reactiveCommands.shutdown(false).subscribe()
|
|
||||||
|
|
||||||
expect:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "SHUTDOWN"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SHUTDOWN NOSAVE"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SHUTDOWN"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "blocking subscriber"() {
|
|
||||||
when:
|
|
||||||
runWithSpan("test-parent") {
|
|
||||||
reactiveCommands.set("a", "1")
|
|
||||||
.then(reactiveCommands.get("a"))
|
|
||||||
.block()
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "test-parent"
|
|
||||||
attributes {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "SET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SET a ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET a"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "GET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "async subscriber"() {
|
|
||||||
when:
|
|
||||||
runWithSpan("test-parent") {
|
|
||||||
reactiveCommands.set("a", "1")
|
|
||||||
.then(reactiveCommands.get("a"))
|
|
||||||
.subscribe()
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "test-parent"
|
|
||||||
attributes {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "SET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SET a ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET a"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "GET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "async subscriber with specific thread pool"() {
|
|
||||||
when:
|
|
||||||
runWithSpan("test-parent") {
|
|
||||||
reactiveCommands.set("a", "1")
|
|
||||||
.then(reactiveCommands.get("a"))
|
|
||||||
.subscribeOn(Schedulers.elastic())
|
|
||||||
.subscribe()
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 3) {
|
|
||||||
span(0) {
|
|
||||||
name "test-parent"
|
|
||||||
attributes {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "SET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SET a ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(2) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET a"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "GET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,323 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
import io.lettuce.core.ClientOptions
|
|
||||||
import io.lettuce.core.RedisClient
|
|
||||||
import io.lettuce.core.RedisConnectionException
|
|
||||||
import io.lettuce.core.api.StatefulConnection
|
|
||||||
import io.lettuce.core.api.sync.RedisCommands
|
|
||||||
import io.netty.channel.AbstractChannel
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
|
||||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
|
||||||
import io.opentelemetry.semconv.SemanticAttributes
|
|
||||||
import org.testcontainers.containers.GenericContainer
|
|
||||||
import spock.lang.Shared
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
|
||||||
import static io.opentelemetry.api.trace.StatusCode.ERROR
|
|
||||||
|
|
||||||
class LettuceSyncClientTest extends AgentInstrumentationSpecification {
|
|
||||||
public static final int DB_INDEX = 0
|
|
||||||
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
|
|
||||||
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
|
|
||||||
|
|
||||||
private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379)
|
|
||||||
|
|
||||||
@Shared
|
|
||||||
String host
|
|
||||||
@Shared
|
|
||||||
int port
|
|
||||||
@Shared
|
|
||||||
int incorrectPort
|
|
||||||
@Shared
|
|
||||||
String dbAddr
|
|
||||||
@Shared
|
|
||||||
String dbAddrNonExistent
|
|
||||||
@Shared
|
|
||||||
String dbUriNonExistent
|
|
||||||
@Shared
|
|
||||||
String embeddedDbUri
|
|
||||||
|
|
||||||
@Shared
|
|
||||||
Map<String, String> testHashMap = [
|
|
||||||
firstname: "John",
|
|
||||||
lastname : "Doe",
|
|
||||||
age : "53"
|
|
||||||
]
|
|
||||||
|
|
||||||
RedisClient redisClient
|
|
||||||
StatefulConnection connection
|
|
||||||
RedisCommands<String, ?> syncCommands
|
|
||||||
|
|
||||||
def setup() {
|
|
||||||
redisServer.start()
|
|
||||||
|
|
||||||
host = redisServer.getHost()
|
|
||||||
port = redisServer.getMappedPort(6379)
|
|
||||||
dbAddr = host + ":" + port + "/" + DB_INDEX
|
|
||||||
embeddedDbUri = "redis://" + dbAddr
|
|
||||||
|
|
||||||
incorrectPort = PortUtils.findOpenPort()
|
|
||||||
dbAddrNonExistent = host + ":" + incorrectPort + "/" + DB_INDEX
|
|
||||||
dbUriNonExistent = "redis://" + dbAddrNonExistent
|
|
||||||
|
|
||||||
redisClient = RedisClient.create(embeddedDbUri)
|
|
||||||
|
|
||||||
connection = redisClient.connect()
|
|
||||||
syncCommands = connection.sync()
|
|
||||||
|
|
||||||
syncCommands.set("TESTKEY", "TESTVAL")
|
|
||||||
syncCommands.hmset("TESTHM", testHashMap)
|
|
||||||
|
|
||||||
// 2 sets + 1 connect trace
|
|
||||||
ignoreTracesAndClear(3)
|
|
||||||
}
|
|
||||||
|
|
||||||
def cleanup() {
|
|
||||||
connection.close()
|
|
||||||
redisServer.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "connect"() {
|
|
||||||
setup:
|
|
||||||
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
|
|
||||||
testConnectionClient.setOptions(CLIENT_OPTIONS)
|
|
||||||
|
|
||||||
when:
|
|
||||||
StatefulConnection connection = testConnectionClient.connect()
|
|
||||||
|
|
||||||
then:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "CONNECT"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NET_PEER_NAME" host
|
|
||||||
"$SemanticAttributes.NET_PEER_PORT" port
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
connection.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "connect exception"() {
|
|
||||||
setup:
|
|
||||||
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
|
|
||||||
testConnectionClient.setOptions(CLIENT_OPTIONS)
|
|
||||||
|
|
||||||
when:
|
|
||||||
testConnectionClient.connect()
|
|
||||||
|
|
||||||
then:
|
|
||||||
thrown RedisConnectionException
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "CONNECT"
|
|
||||||
kind CLIENT
|
|
||||||
status ERROR
|
|
||||||
errorEvent AbstractChannel.AnnotatedConnectException, String
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NET_PEER_NAME" host
|
|
||||||
"$SemanticAttributes.NET_PEER_PORT" incorrectPort
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "set command"() {
|
|
||||||
setup:
|
|
||||||
String res = syncCommands.set("TESTSETKEY", "TESTSETVAL")
|
|
||||||
|
|
||||||
expect:
|
|
||||||
res == "OK"
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "SET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SET TESTSETKEY ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "get command"() {
|
|
||||||
setup:
|
|
||||||
String res = syncCommands.get("TESTKEY")
|
|
||||||
|
|
||||||
expect:
|
|
||||||
res == "TESTVAL"
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET TESTKEY"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "GET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "get non existent key command"() {
|
|
||||||
setup:
|
|
||||||
String res = syncCommands.get("NON_EXISTENT_KEY")
|
|
||||||
|
|
||||||
expect:
|
|
||||||
res == null
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET NON_EXISTENT_KEY"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "GET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "command with no arguments"() {
|
|
||||||
setup:
|
|
||||||
def keyRetrieved = syncCommands.randomkey()
|
|
||||||
|
|
||||||
expect:
|
|
||||||
keyRetrieved != null
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "RANDOMKEY"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "RANDOMKEY"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "RANDOMKEY"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "list command"() {
|
|
||||||
setup:
|
|
||||||
long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT")
|
|
||||||
|
|
||||||
expect:
|
|
||||||
res == 1
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "LPUSH"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "LPUSH TESTLIST ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "LPUSH"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "hash set command"() {
|
|
||||||
setup:
|
|
||||||
def res = syncCommands.hmset("user", testHashMap)
|
|
||||||
|
|
||||||
expect:
|
|
||||||
res == "OK"
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "HMSET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "HMSET user firstname ? lastname ? age ?"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "HMSET"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "hash getall command"() {
|
|
||||||
setup:
|
|
||||||
Map<String, String> res = syncCommands.hgetall("TESTHM")
|
|
||||||
|
|
||||||
expect:
|
|
||||||
res == testHashMap
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "HGETALL"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "HGETALL TESTHM"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "HGETALL"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "debug segfault command (returns void) with no argument should produce span"() {
|
|
||||||
setup:
|
|
||||||
syncCommands.debugSegfault()
|
|
||||||
|
|
||||||
expect:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "DEBUG"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "DEBUG SEGFAULT"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "DEBUG"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "shutdown command (returns void) should produce a span"() {
|
|
||||||
setup:
|
|
||||||
syncCommands.shutdown(false)
|
|
||||||
|
|
||||||
expect:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "SHUTDOWN"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SHUTDOWN NOSAVE"
|
|
||||||
"$SemanticAttributes.DB_OPERATION" "SHUTDOWN"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0;
|
||||||
|
|
||||||
|
import io.lettuce.core.ClientOptions;
|
||||||
|
import io.lettuce.core.RedisClient;
|
||||||
|
import io.lettuce.core.api.StatefulRedisConnection;
|
||||||
|
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.testcontainers.containers.GenericContainer;
|
||||||
|
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||||
|
import org.testcontainers.containers.wait.strategy.Wait;
|
||||||
|
import org.testcontainers.utility.DockerImageName;
|
||||||
|
|
||||||
|
abstract class AbstractLettuceClientTest {
|
||||||
|
|
||||||
|
protected static final Logger logger = LoggerFactory.getLogger(AbstractLettuceClientTest.class);
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||||
|
|
||||||
|
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
|
||||||
|
|
||||||
|
protected static final int DB_INDEX = 0;
|
||||||
|
|
||||||
|
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
|
||||||
|
protected static final ClientOptions CLIENT_OPTIONS =
|
||||||
|
ClientOptions.builder().autoReconnect(false).build();
|
||||||
|
|
||||||
|
static final DockerImageName containerImage = DockerImageName.parse("redis:6.2.3-alpine");
|
||||||
|
|
||||||
|
protected static final GenericContainer<?> redisServer =
|
||||||
|
new GenericContainer<>(containerImage)
|
||||||
|
.withExposedPorts(6379)
|
||||||
|
.withLogConsumer(new Slf4jLogConsumer(logger))
|
||||||
|
.waitingFor(Wait.forLogMessage(".*Ready to accept connections.*", 1));
|
||||||
|
|
||||||
|
protected static RedisClient redisClient;
|
||||||
|
|
||||||
|
protected static StatefulRedisConnection<String, String> connection;
|
||||||
|
|
||||||
|
protected static String host;
|
||||||
|
protected static int port;
|
||||||
|
|
||||||
|
protected static String embeddedDbUri;
|
||||||
|
|
||||||
|
protected static StatefulRedisConnection<String, String> newContainerConnection() {
|
||||||
|
GenericContainer<?> server =
|
||||||
|
new GenericContainer<>(containerImage)
|
||||||
|
.withExposedPorts(6379)
|
||||||
|
.withLogConsumer(new Slf4jLogConsumer(logger))
|
||||||
|
.waitingFor(Wait.forLogMessage(".*Ready to accept connections.*", 1));
|
||||||
|
server.start();
|
||||||
|
cleanup.deferCleanup(server::stop);
|
||||||
|
|
||||||
|
long serverPort = server.getMappedPort(6379);
|
||||||
|
|
||||||
|
RedisClient client = RedisClient.create("redis://" + host + ":" + serverPort + "/" + DB_INDEX);
|
||||||
|
client.setOptions(CLIENT_OPTIONS);
|
||||||
|
cleanup.deferCleanup(client::shutdown);
|
||||||
|
|
||||||
|
StatefulRedisConnection<String, String> statefulConnection = client.connect();
|
||||||
|
cleanup.deferCleanup(statefulConnection);
|
||||||
|
|
||||||
|
// 1 connect trace
|
||||||
|
testing.waitForTraces(1);
|
||||||
|
testing.clearData();
|
||||||
|
|
||||||
|
return statefulConnection;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,496 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0;
|
||||||
|
|
||||||
|
import static io.opentelemetry.api.common.AttributeKey.booleanKey;
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.catchException;
|
||||||
|
import static org.assertj.core.api.Assertions.catchThrowable;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import io.lettuce.core.ConnectionFuture;
|
||||||
|
import io.lettuce.core.RedisClient;
|
||||||
|
import io.lettuce.core.RedisFuture;
|
||||||
|
import io.lettuce.core.RedisURI;
|
||||||
|
import io.lettuce.core.api.StatefulRedisConnection;
|
||||||
|
import io.lettuce.core.api.async.RedisAsyncCommands;
|
||||||
|
import io.lettuce.core.api.sync.RedisCommands;
|
||||||
|
import io.lettuce.core.codec.Utf8StringCodec;
|
||||||
|
import io.lettuce.core.protocol.AsyncCommand;
|
||||||
|
import io.opentelemetry.api.common.AttributeKey;
|
||||||
|
import io.opentelemetry.api.common.Attributes;
|
||||||
|
import io.opentelemetry.api.trace.SpanKind;
|
||||||
|
import io.opentelemetry.instrumentation.test.utils.PortUtils;
|
||||||
|
import io.opentelemetry.sdk.trace.data.StatusData;
|
||||||
|
import io.opentelemetry.semconv.SemanticAttributes;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CancellationException;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import org.assertj.core.api.AbstractAssert;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0
|
||||||
|
class LettuceAsyncClientTest extends AbstractLettuceClientTest {
|
||||||
|
private static int incorrectPort;
|
||||||
|
private static String dbUriNonExistent;
|
||||||
|
|
||||||
|
private static final ImmutableMap<String, String> testHashMap =
|
||||||
|
ImmutableMap.of(
|
||||||
|
"firstname", "John",
|
||||||
|
"lastname", "Doe",
|
||||||
|
"age", "53");
|
||||||
|
|
||||||
|
private static RedisAsyncCommands<String, String> asyncCommands;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
static void setUp() {
|
||||||
|
redisServer.start();
|
||||||
|
host = redisServer.getHost();
|
||||||
|
port = redisServer.getMappedPort(6379);
|
||||||
|
embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX;
|
||||||
|
|
||||||
|
incorrectPort = PortUtils.findOpenPort();
|
||||||
|
dbUriNonExistent = "redis://" + host + ":" + incorrectPort + "/" + DB_INDEX;
|
||||||
|
|
||||||
|
redisClient = RedisClient.create(embeddedDbUri);
|
||||||
|
redisClient.setOptions(CLIENT_OPTIONS);
|
||||||
|
|
||||||
|
connection = redisClient.connect();
|
||||||
|
asyncCommands = connection.async();
|
||||||
|
RedisCommands<String, String> syncCommands = connection.sync();
|
||||||
|
|
||||||
|
syncCommands.set("TESTKEY", "TESTVAL");
|
||||||
|
|
||||||
|
// 1 set + 1 connect trace
|
||||||
|
testing.waitForTraces(2);
|
||||||
|
testing.clearData();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
static void cleanUp() {
|
||||||
|
connection.close();
|
||||||
|
redisClient.shutdown();
|
||||||
|
redisServer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConnectUsingGetOnConnectionFuture() throws ExecutionException, InterruptedException {
|
||||||
|
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri);
|
||||||
|
testConnectionClient.setOptions(CLIENT_OPTIONS);
|
||||||
|
|
||||||
|
ConnectionFuture<StatefulRedisConnection<String, String>> connectionFuture =
|
||||||
|
testConnectionClient.connectAsync(
|
||||||
|
new Utf8StringCodec(), new RedisURI(host, port, 3, TimeUnit.SECONDS));
|
||||||
|
StatefulRedisConnection<String, String> connection1 = connectionFuture.get();
|
||||||
|
cleanup.deferCleanup(connection1);
|
||||||
|
cleanup.deferCleanup(testConnectionClient::shutdown);
|
||||||
|
|
||||||
|
assertThat(connection1).isNotNull();
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("CONNECT")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_NAME, host),
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_PORT, port),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConnectExceptionInsideTheConnectionFuture() {
|
||||||
|
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent);
|
||||||
|
testConnectionClient.setOptions(CLIENT_OPTIONS);
|
||||||
|
|
||||||
|
Exception exception =
|
||||||
|
catchException(
|
||||||
|
() -> {
|
||||||
|
ConnectionFuture<StatefulRedisConnection<String, String>> connectionFuture =
|
||||||
|
testConnectionClient.connectAsync(
|
||||||
|
new Utf8StringCodec(),
|
||||||
|
new RedisURI(host, incorrectPort, 3, TimeUnit.SECONDS));
|
||||||
|
connectionFuture.get();
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(exception).isInstanceOf(ExecutionException.class);
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("CONNECT")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasStatus(StatusData.error())
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_NAME, host),
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_PORT, incorrectPort),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"))
|
||||||
|
.hasEventsSatisfyingExactly(
|
||||||
|
event ->
|
||||||
|
event
|
||||||
|
.hasName("exception")
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(
|
||||||
|
AttributeKey.stringKey("exception.type"),
|
||||||
|
"io.netty.channel.AbstractChannel.AnnotatedConnectException"),
|
||||||
|
equalTo(
|
||||||
|
AttributeKey.stringKey("exception.message"),
|
||||||
|
"Connection refused: localhost/127.0.0.1:"
|
||||||
|
+ incorrectPort),
|
||||||
|
satisfies(
|
||||||
|
AttributeKey.stringKey("exception.stacktrace"),
|
||||||
|
AbstractAssert::isNotNull)))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSetCommandUsingFutureGetWithTimeout()
|
||||||
|
throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
RedisFuture<String> redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL");
|
||||||
|
String res = redisFuture.get(3, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
assertThat(res).isEqualTo("OK");
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("SET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SET TESTSETKEY ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SET"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetCommandChainedWithThenAccept()
|
||||||
|
throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
Consumer<String> consumer =
|
||||||
|
res -> {
|
||||||
|
testing.runWithSpan("callback", () -> assertThat(res).isEqualTo("TESTVAL"));
|
||||||
|
future.complete(res);
|
||||||
|
};
|
||||||
|
|
||||||
|
testing.runWithSpan(
|
||||||
|
"parent",
|
||||||
|
() -> {
|
||||||
|
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY");
|
||||||
|
redisFuture.thenAccept(consumer);
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("TESTVAL");
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET TESTKEY"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "GET")),
|
||||||
|
span ->
|
||||||
|
span.hasName("callback")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0))));
|
||||||
|
}
|
||||||
|
|
||||||
|
// to make sure instrumentation's chained completion stages won't interfere with user's, while
|
||||||
|
// still recording spans
|
||||||
|
@Test
|
||||||
|
void testGetNonExistentKeyCommandWithHandleAsyncAndChainedWithThenApply()
|
||||||
|
throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
|
||||||
|
String successStr = "KEY MISSING";
|
||||||
|
|
||||||
|
BiFunction<String, Throwable, String> firstStage =
|
||||||
|
(res, error) -> {
|
||||||
|
testing.runWithSpan(
|
||||||
|
"callback1",
|
||||||
|
() -> {
|
||||||
|
assertThat(res).isNull();
|
||||||
|
assertThat(error).isNull();
|
||||||
|
});
|
||||||
|
return (res == null ? successStr : res);
|
||||||
|
};
|
||||||
|
Function<String, Object> secondStage =
|
||||||
|
input -> {
|
||||||
|
testing.runWithSpan(
|
||||||
|
"callback2",
|
||||||
|
() -> {
|
||||||
|
assertThat(input).isEqualTo(successStr);
|
||||||
|
future.complete(successStr);
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
};
|
||||||
|
|
||||||
|
testing.runWithSpan(
|
||||||
|
"parent",
|
||||||
|
() -> {
|
||||||
|
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY");
|
||||||
|
redisFuture.handle(firstStage).thenApply(secondStage);
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(successStr);
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET NON_EXISTENT_KEY"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "GET")),
|
||||||
|
span ->
|
||||||
|
span.hasName("callback1")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0)),
|
||||||
|
span ->
|
||||||
|
span.hasName("callback2")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCommandWithNoArgumentsUsingBiconsumer()
|
||||||
|
throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
BiConsumer<String, Throwable> biConsumer =
|
||||||
|
(keyRetrieved, error) ->
|
||||||
|
testing.runWithSpan(
|
||||||
|
"callback",
|
||||||
|
() -> {
|
||||||
|
assertThat(keyRetrieved).isNotNull();
|
||||||
|
future.complete(keyRetrieved);
|
||||||
|
});
|
||||||
|
|
||||||
|
testing.runWithSpan(
|
||||||
|
"parent",
|
||||||
|
() -> {
|
||||||
|
RedisFuture<String> redisFuture = asyncCommands.randomkey();
|
||||||
|
redisFuture.whenCompleteAsync(biConsumer);
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isNotNull();
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||||
|
span ->
|
||||||
|
span.hasName("RANDOMKEY")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "RANDOMKEY"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "RANDOMKEY")),
|
||||||
|
span ->
|
||||||
|
span.hasName("callback")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testHashSetAndThenNestApplyToHashGetall()
|
||||||
|
throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
|
||||||
|
|
||||||
|
RedisFuture<String> hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap);
|
||||||
|
hmsetFuture.thenApplyAsync(
|
||||||
|
setResult -> {
|
||||||
|
// Wait for 'hmset' trace to get written
|
||||||
|
testing.waitForTraces(1);
|
||||||
|
|
||||||
|
if (!"OK".equals(setResult)) {
|
||||||
|
future.completeExceptionally(new AssertionError("Wrong hmset result " + setResult));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
RedisFuture<Map<String, String>> hmGetAllFuture = asyncCommands.hgetall("TESTHM");
|
||||||
|
hmGetAllFuture.whenComplete(
|
||||||
|
(result, exception) -> {
|
||||||
|
if (exception != null) {
|
||||||
|
future.completeExceptionally(exception);
|
||||||
|
} else {
|
||||||
|
future.complete(result);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(testHashMap);
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("HMSET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(
|
||||||
|
SemanticAttributes.DB_STATEMENT,
|
||||||
|
"HMSET TESTHM firstname ? lastname ? age ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "HMSET"))),
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("HGETALL")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "HGETALL TESTHM"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "HGETALL"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCommandCompletesExceptionally() {
|
||||||
|
// turn off auto flush to complete the command exceptionally manually
|
||||||
|
asyncCommands.setAutoFlushCommands(false);
|
||||||
|
cleanup.deferCleanup(() -> asyncCommands.setAutoFlushCommands(true));
|
||||||
|
|
||||||
|
RedisFuture<Long> redisFuture = asyncCommands.del("key1", "key2");
|
||||||
|
boolean completedExceptionally =
|
||||||
|
((AsyncCommand<?, ?, ?>) redisFuture)
|
||||||
|
.completeExceptionally(new IllegalStateException("TestException"));
|
||||||
|
|
||||||
|
redisFuture.exceptionally(
|
||||||
|
error -> {
|
||||||
|
assertThat(error).isNotNull();
|
||||||
|
assertThat(error).isInstanceOf(IllegalStateException.class);
|
||||||
|
assertThat(error.getMessage()).isEqualTo("TestException");
|
||||||
|
throw new RuntimeException(error);
|
||||||
|
});
|
||||||
|
|
||||||
|
asyncCommands.flushCommands();
|
||||||
|
Throwable thrown = catchThrowable(redisFuture::get);
|
||||||
|
|
||||||
|
await()
|
||||||
|
.untilAsserted(
|
||||||
|
() -> {
|
||||||
|
assertThat(thrown).isInstanceOf(ExecutionException.class);
|
||||||
|
assertThat(completedExceptionally).isTrue();
|
||||||
|
});
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("DEL")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasStatus(StatusData.error())
|
||||||
|
.hasException(new IllegalStateException("TestException"))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "DEL key1 key2"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "DEL"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCancelCommandBeforeItFinishes() {
|
||||||
|
asyncCommands.setAutoFlushCommands(false);
|
||||||
|
cleanup.deferCleanup(() -> asyncCommands.setAutoFlushCommands(true));
|
||||||
|
|
||||||
|
RedisFuture<Long> redisFuture =
|
||||||
|
testing.runWithSpan("parent", () -> asyncCommands.sadd("SKEY", "1", "2"));
|
||||||
|
redisFuture.whenCompleteAsync(
|
||||||
|
(res, error) ->
|
||||||
|
testing.runWithSpan(
|
||||||
|
"callback",
|
||||||
|
() -> {
|
||||||
|
assertThat(error).isNotNull();
|
||||||
|
assertThat(error).isInstanceOf(CancellationException.class);
|
||||||
|
}));
|
||||||
|
|
||||||
|
boolean cancelSuccess = redisFuture.cancel(true);
|
||||||
|
asyncCommands.flushCommands();
|
||||||
|
|
||||||
|
await().untilAsserted(() -> assertThat(cancelSuccess).isTrue());
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("parent")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasNoParent()
|
||||||
|
.hasAttributes(Attributes.empty()),
|
||||||
|
span ->
|
||||||
|
span.hasName("SADD")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SADD SKEY ? ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SADD"),
|
||||||
|
equalTo(booleanKey("lettuce.command.cancelled"), true)),
|
||||||
|
span ->
|
||||||
|
span.hasName("callback")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testDebugSegfaultCommandWithNoArgumentShouldProduceSpan() {
|
||||||
|
// Test Causes redis to crash therefore it needs its own container
|
||||||
|
try (StatefulRedisConnection<String, String> statefulConnection = newContainerConnection()) {
|
||||||
|
RedisAsyncCommands<String, String> commands = statefulConnection.async();
|
||||||
|
commands.debugSegfault();
|
||||||
|
}
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("DEBUG")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "DEBUG SEGFAULT"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "DEBUG"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testShutdownCommandShouldProduceSpan() {
|
||||||
|
// Test Causes redis to crash therefore it needs its own container
|
||||||
|
try (StatefulRedisConnection<String, String> statefulConnection = newContainerConnection()) {
|
||||||
|
RedisAsyncCommands<String, String> commands = statefulConnection.async();
|
||||||
|
commands.shutdown(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("SHUTDOWN")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SHUTDOWN NOSAVE"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SHUTDOWN"))));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,373 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0;
|
||||||
|
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
import io.lettuce.core.RedisClient;
|
||||||
|
import io.lettuce.core.api.StatefulRedisConnection;
|
||||||
|
import io.lettuce.core.api.reactive.RedisReactiveCommands;
|
||||||
|
import io.lettuce.core.api.sync.RedisCommands;
|
||||||
|
import io.opentelemetry.api.common.AttributeKey;
|
||||||
|
import io.opentelemetry.api.common.Attributes;
|
||||||
|
import io.opentelemetry.api.trace.SpanKind;
|
||||||
|
import io.opentelemetry.semconv.SemanticAttributes;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import org.assertj.core.api.AbstractBooleanAssert;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
|
class LettuceReactiveClientTest extends AbstractLettuceClientTest {
|
||||||
|
private static RedisReactiveCommands<String, String> reactiveCommands;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
static void setUp() {
|
||||||
|
redisServer.start();
|
||||||
|
|
||||||
|
host = redisServer.getHost();
|
||||||
|
port = redisServer.getMappedPort(6379);
|
||||||
|
embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX;
|
||||||
|
|
||||||
|
redisClient = RedisClient.create(embeddedDbUri);
|
||||||
|
redisClient.setOptions(CLIENT_OPTIONS);
|
||||||
|
|
||||||
|
connection = redisClient.connect();
|
||||||
|
reactiveCommands = connection.reactive();
|
||||||
|
RedisCommands<String, String> syncCommands = connection.sync();
|
||||||
|
|
||||||
|
syncCommands.set("TESTKEY", "TESTVAL");
|
||||||
|
|
||||||
|
// 1 set + 1 connect trace
|
||||||
|
testing.waitForTraces(2);
|
||||||
|
testing.clearData();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
static void cleanUp() {
|
||||||
|
connection.close();
|
||||||
|
redisClient.shutdown();
|
||||||
|
redisServer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSetCommandWithSubscribeOnDefinedConsumer()
|
||||||
|
throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
|
||||||
|
Consumer<String> consumer =
|
||||||
|
res ->
|
||||||
|
testing.runWithSpan(
|
||||||
|
"callback",
|
||||||
|
() -> {
|
||||||
|
assertThat(res).isEqualTo("OK");
|
||||||
|
future.complete(res);
|
||||||
|
});
|
||||||
|
|
||||||
|
testing.runWithSpan(
|
||||||
|
"parent", () -> reactiveCommands.set("TESTSETKEY", "TESTSETVAL").subscribe(consumer));
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("OK");
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||||
|
span ->
|
||||||
|
span.hasName("SET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SET TESTSETKEY ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SET")),
|
||||||
|
span ->
|
||||||
|
span.hasName("callback")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetCommandWithLambdaFunction()
|
||||||
|
throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
|
||||||
|
reactiveCommands
|
||||||
|
.get("TESTKEY")
|
||||||
|
.subscribe(
|
||||||
|
res -> {
|
||||||
|
assertThat(res).isEqualTo("TESTVAL");
|
||||||
|
future.complete(res);
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("TESTVAL");
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET TESTKEY"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "GET"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
// to make sure instrumentation's chained completion stages won't interfere with user's, while
|
||||||
|
// still recording spans
|
||||||
|
@Test
|
||||||
|
void testGetNonExistentKeyCommand()
|
||||||
|
throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
String defaultVal = "NOT THIS VALUE";
|
||||||
|
|
||||||
|
testing.runWithSpan(
|
||||||
|
"parent",
|
||||||
|
() -> {
|
||||||
|
reactiveCommands
|
||||||
|
.get("NON_EXISTENT_KEY")
|
||||||
|
.defaultIfEmpty(defaultVal)
|
||||||
|
.subscribe(
|
||||||
|
res ->
|
||||||
|
testing.runWithSpan(
|
||||||
|
"callback",
|
||||||
|
() -> {
|
||||||
|
assertThat(res).isEqualTo(defaultVal);
|
||||||
|
future.complete(res);
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(defaultVal);
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET NON_EXISTENT_KEY"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "GET")),
|
||||||
|
span ->
|
||||||
|
span.hasName("callback")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCommandWithNoArguments()
|
||||||
|
throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
|
||||||
|
reactiveCommands
|
||||||
|
.randomkey()
|
||||||
|
.subscribe(
|
||||||
|
res -> {
|
||||||
|
assertThat(res).isEqualTo("TESTKEY");
|
||||||
|
future.complete(res);
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("TESTKEY");
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("RANDOMKEY")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "RANDOMKEY"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "RANDOMKEY"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCommandFluxPublisher() {
|
||||||
|
reactiveCommands.command().subscribe();
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("COMMAND")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "COMMAND"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "COMMAND"),
|
||||||
|
satisfies(
|
||||||
|
AttributeKey.longKey("lettuce.command.results.count"),
|
||||||
|
val -> val.isGreaterThan(100)))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCommandCancelAfter2OnFluxPublisher() {
|
||||||
|
reactiveCommands.command().take(2).subscribe();
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("COMMAND")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "COMMAND"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "COMMAND"),
|
||||||
|
satisfies(
|
||||||
|
AttributeKey.booleanKey("lettuce.command.cancelled"),
|
||||||
|
AbstractBooleanAssert::isTrue),
|
||||||
|
satisfies(
|
||||||
|
AttributeKey.longKey("lettuce.command.results.count"),
|
||||||
|
val -> val.isEqualTo(2)))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testNonReactiveCommandShouldNotProduceSpan() {
|
||||||
|
String res = reactiveCommands.digest(null);
|
||||||
|
|
||||||
|
assertThat(res).isNotNull();
|
||||||
|
assertThat(testing.spans().size()).isEqualTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testDebugSegfaultCommandReturnsMonoVoidWithNoArgumentShouldProduceSpan() {
|
||||||
|
// Test Causes redis to crash therefore it needs its own container
|
||||||
|
try (StatefulRedisConnection<String, String> statefulConnection = newContainerConnection()) {
|
||||||
|
RedisReactiveCommands<String, String> commands = statefulConnection.reactive();
|
||||||
|
commands.debugSegfault().subscribe();
|
||||||
|
}
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("DEBUG")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "DEBUG SEGFAULT"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "DEBUG"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testShutdownCommandShouldProduceSpan() {
|
||||||
|
// Test Causes redis to crash therefore it needs its own container
|
||||||
|
try (StatefulRedisConnection<String, String> statefulConnection = newContainerConnection()) {
|
||||||
|
RedisReactiveCommands<String, String> commands = statefulConnection.reactive();
|
||||||
|
commands.shutdown(false).subscribe();
|
||||||
|
}
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("SHUTDOWN")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SHUTDOWN NOSAVE"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SHUTDOWN"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testBlockingSubscriber() {
|
||||||
|
testing.runWithSpan(
|
||||||
|
"test-parent",
|
||||||
|
() -> reactiveCommands.set("a", "1").then(reactiveCommands.get("a")).block());
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span -> span.hasName("test-parent").hasAttributes(Attributes.empty()),
|
||||||
|
span ->
|
||||||
|
span.hasName("SET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SET a ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SET")),
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET a"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "GET"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testAsyncSubscriber() {
|
||||||
|
testing.runWithSpan(
|
||||||
|
"test-parent",
|
||||||
|
() -> reactiveCommands.set("a", "1").then(reactiveCommands.get("a")).subscribe());
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span -> span.hasName("test-parent").hasAttributes(Attributes.empty()),
|
||||||
|
span ->
|
||||||
|
span.hasName("SET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SET a ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SET")),
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET a"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "GET"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testAsyncSubscriberWithSpecificThreadPool() {
|
||||||
|
testing.runWithSpan(
|
||||||
|
"test-parent",
|
||||||
|
() ->
|
||||||
|
reactiveCommands
|
||||||
|
.set("a", "1")
|
||||||
|
.then(reactiveCommands.get("a"))
|
||||||
|
.subscribeOn(Schedulers.elastic())
|
||||||
|
.subscribe());
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span -> span.hasName("test-parent").hasAttributes(Attributes.empty()),
|
||||||
|
span ->
|
||||||
|
span.hasName("SET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SET a ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SET")),
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET a"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "GET"))));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,291 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0;
|
||||||
|
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.catchException;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import io.lettuce.core.RedisClient;
|
||||||
|
import io.lettuce.core.RedisConnectionException;
|
||||||
|
import io.lettuce.core.api.StatefulRedisConnection;
|
||||||
|
import io.lettuce.core.api.sync.RedisCommands;
|
||||||
|
import io.opentelemetry.api.common.AttributeKey;
|
||||||
|
import io.opentelemetry.api.trace.SpanKind;
|
||||||
|
import io.opentelemetry.instrumentation.test.utils.PortUtils;
|
||||||
|
import io.opentelemetry.sdk.trace.data.StatusData;
|
||||||
|
import io.opentelemetry.semconv.SemanticAttributes;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.assertj.core.api.AbstractAssert;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0
|
||||||
|
class LettuceSyncClientTest extends AbstractLettuceClientTest {
|
||||||
|
private static int incorrectPort;
|
||||||
|
private static String dbUriNonExistent;
|
||||||
|
|
||||||
|
private static final ImmutableMap<String, String> testHashMap =
|
||||||
|
ImmutableMap.of(
|
||||||
|
"firstname", "John",
|
||||||
|
"lastname", "Doe",
|
||||||
|
"age", "53");
|
||||||
|
|
||||||
|
private static RedisCommands<String, String> syncCommands;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
static void setUp() {
|
||||||
|
redisServer.start();
|
||||||
|
host = redisServer.getHost();
|
||||||
|
port = redisServer.getMappedPort(6379);
|
||||||
|
embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX;
|
||||||
|
|
||||||
|
incorrectPort = PortUtils.findOpenPort();
|
||||||
|
dbUriNonExistent = "redis://" + host + ":" + incorrectPort + "/" + DB_INDEX;
|
||||||
|
|
||||||
|
redisClient = RedisClient.create(embeddedDbUri);
|
||||||
|
redisClient.setOptions(CLIENT_OPTIONS);
|
||||||
|
|
||||||
|
connection = redisClient.connect();
|
||||||
|
syncCommands = connection.sync();
|
||||||
|
|
||||||
|
syncCommands.set("TESTKEY", "TESTVAL");
|
||||||
|
syncCommands.hmset("TESTHM", testHashMap);
|
||||||
|
|
||||||
|
// 2 sets + 1 connect trace
|
||||||
|
testing.waitForTraces(3);
|
||||||
|
testing.clearData();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
static void cleanUp() {
|
||||||
|
connection.close();
|
||||||
|
redisClient.shutdown();
|
||||||
|
redisServer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConnect() {
|
||||||
|
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri);
|
||||||
|
testConnectionClient.setOptions(CLIENT_OPTIONS);
|
||||||
|
|
||||||
|
StatefulRedisConnection<String, String> testConnection = testConnectionClient.connect();
|
||||||
|
cleanup.deferCleanup(testConnection);
|
||||||
|
cleanup.deferCleanup(testConnectionClient::shutdown);
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("CONNECT")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_NAME, host),
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_PORT, port),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConnectException() {
|
||||||
|
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent);
|
||||||
|
testConnectionClient.setOptions(CLIENT_OPTIONS);
|
||||||
|
|
||||||
|
Exception exception = catchException(testConnectionClient::connect);
|
||||||
|
|
||||||
|
assertThat(exception).isInstanceOf(RedisConnectionException.class);
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("CONNECT")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasStatus(StatusData.error())
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_NAME, host),
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_PORT, incorrectPort),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"))
|
||||||
|
.hasEventsSatisfyingExactly(
|
||||||
|
event ->
|
||||||
|
event
|
||||||
|
.hasName("exception")
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(
|
||||||
|
AttributeKey.stringKey("exception.type"),
|
||||||
|
"io.netty.channel.AbstractChannel.AnnotatedConnectException"),
|
||||||
|
equalTo(
|
||||||
|
AttributeKey.stringKey("exception.message"),
|
||||||
|
"Connection refused: localhost/127.0.0.1:"
|
||||||
|
+ incorrectPort),
|
||||||
|
satisfies(
|
||||||
|
AttributeKey.stringKey("exception.stacktrace"),
|
||||||
|
AbstractAssert::isNotNull)))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSetCommand() {
|
||||||
|
String res = syncCommands.set("TESTSETKEY", "TESTSETVAL");
|
||||||
|
assertThat(res).isEqualTo("OK");
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("SET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SET TESTSETKEY ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SET"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetCommand() {
|
||||||
|
String res = syncCommands.get("TESTKEY");
|
||||||
|
assertThat(res).isEqualTo("TESTVAL");
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET TESTKEY"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "GET"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetNonExistentKeyCommand() {
|
||||||
|
String res = syncCommands.get("NON_EXISTENT_KEY");
|
||||||
|
assertThat(res).isNull();
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET NON_EXISTENT_KEY"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "GET"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCommandWithNoArguments() {
|
||||||
|
String res = syncCommands.randomkey();
|
||||||
|
assertThat(res).isNotNull();
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("RANDOMKEY")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "RANDOMKEY"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "RANDOMKEY"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testListCommand() {
|
||||||
|
long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT");
|
||||||
|
assertThat(res).isEqualTo(1);
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("LPUSH")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "LPUSH TESTLIST ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "LPUSH"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testHashSetCommand() {
|
||||||
|
String res = syncCommands.hmset("user", testHashMap);
|
||||||
|
assertThat(res).isEqualTo("OK");
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("HMSET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(
|
||||||
|
SemanticAttributes.DB_STATEMENT,
|
||||||
|
"HMSET user firstname ? lastname ? age ?"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "HMSET"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testHashGetallCommand() {
|
||||||
|
Map<String, String> res = syncCommands.hgetall("TESTHM");
|
||||||
|
assertThat(res).isEqualTo(testHashMap);
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("HGETALL")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "HGETALL TESTHM"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "HGETALL"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testDebugSegfaultCommandWithNoArgumentShouldProduceSpan() {
|
||||||
|
// Test causes redis to crash therefore it needs its own container
|
||||||
|
try (StatefulRedisConnection<String, String> statefulConnection = newContainerConnection()) {
|
||||||
|
RedisCommands<String, String> commands = statefulConnection.sync();
|
||||||
|
commands.debugSegfault();
|
||||||
|
}
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("DEBUG")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "DEBUG SEGFAULT"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "DEBUG"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testShutdownCommandShouldProduceSpan() {
|
||||||
|
// Test causes redis to crash therefore it needs its own container
|
||||||
|
try (StatefulRedisConnection<String, String> statefulConnection = newContainerConnection()) {
|
||||||
|
RedisCommands<String, String> commands = statefulConnection.sync();
|
||||||
|
commands.shutdown(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("SHUTDOWN")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SHUTDOWN NOSAVE"),
|
||||||
|
equalTo(SemanticAttributes.DB_OPERATION, "SHUTDOWN"))));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue