Convert lettuce 5.1 Async client tests from groovy to java (#9994)
This commit is contained in:
parent
2eb5974ecd
commit
d6d6528526
|
@ -1,17 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.opentelemetry.javaagent.instrumentation.lettuce.v5_1
|
|
||||||
|
|
||||||
import io.lettuce.core.RedisClient
|
|
||||||
import io.opentelemetry.instrumentation.lettuce.v5_1.AbstractLettuceAsyncClientTest
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
|
||||||
|
|
||||||
class LettuceAsyncClientTest extends AbstractLettuceAsyncClientTest implements AgentTestTrait {
|
|
||||||
@Override
|
|
||||||
RedisClient createClient(String uri) {
|
|
||||||
return RedisClient.create(uri)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.lettuce.v5_1;
|
||||||
|
|
||||||
|
import io.lettuce.core.RedisClient;
|
||||||
|
import io.opentelemetry.instrumentation.lettuce.v5_1.AbstractLettuceAsyncClientTest;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class LettuceAsyncClientTest extends AbstractLettuceAsyncClientTest {
|
||||||
|
@RegisterExtension
|
||||||
|
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InstrumentationExtension getInstrumentationExtension() {
|
||||||
|
return testing;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RedisClient createClient(String uri) {
|
||||||
|
return RedisClient.create(uri);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,27 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.opentelemetry.instrumentation.lettuce.v5_1
|
|
||||||
|
|
||||||
import io.lettuce.core.RedisClient
|
|
||||||
import io.lettuce.core.resource.ClientResources
|
|
||||||
import io.opentelemetry.instrumentation.test.LibraryTestTrait
|
|
||||||
|
|
||||||
class LettuceAsyncClientTest extends AbstractLettuceAsyncClientTest implements LibraryTestTrait {
|
|
||||||
@Override
|
|
||||||
RedisClient createClient(String uri) {
|
|
||||||
return RedisClient.create(
|
|
||||||
ClientResources.builder()
|
|
||||||
.tracing(LettuceTelemetry.create(getOpenTelemetry()).newTracing())
|
|
||||||
.build(),
|
|
||||||
uri)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
boolean testCallback() {
|
|
||||||
// context is not propagated into callbacks
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.instrumentation.lettuce.v5_1;
|
||||||
|
|
||||||
|
import io.lettuce.core.RedisClient;
|
||||||
|
import io.lettuce.core.resource.ClientResources;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class LettuceAsyncClientTest extends AbstractLettuceAsyncClientTest {
|
||||||
|
@RegisterExtension
|
||||||
|
static InstrumentationExtension testing = LibraryInstrumentationExtension.create();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InstrumentationExtension getInstrumentationExtension() {
|
||||||
|
return testing;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RedisClient createClient(String uri) {
|
||||||
|
return RedisClient.create(
|
||||||
|
ClientResources.builder()
|
||||||
|
.tracing(
|
||||||
|
LettuceTelemetry.create(getInstrumentationExtension().getOpenTelemetry())
|
||||||
|
.newTracing())
|
||||||
|
.build(),
|
||||||
|
uri);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean testCallback() {
|
||||||
|
// context is not propagated into callbacks
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,450 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.opentelemetry.instrumentation.lettuce.v5_1
|
|
||||||
|
|
||||||
import io.lettuce.core.ConnectionFuture
|
|
||||||
import io.lettuce.core.RedisClient
|
|
||||||
import io.lettuce.core.RedisFuture
|
|
||||||
import io.lettuce.core.RedisURI
|
|
||||||
import io.lettuce.core.api.StatefulConnection
|
|
||||||
import io.lettuce.core.api.async.RedisAsyncCommands
|
|
||||||
import io.lettuce.core.api.sync.RedisCommands
|
|
||||||
import io.lettuce.core.codec.StringCodec
|
|
||||||
import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes
|
|
||||||
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
|
|
||||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
|
||||||
import io.opentelemetry.semconv.SemanticAttributes
|
|
||||||
import org.testcontainers.containers.GenericContainer
|
|
||||||
import spock.lang.Shared
|
|
||||||
import spock.util.concurrent.AsyncConditions
|
|
||||||
|
|
||||||
import java.util.concurrent.ExecutionException
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import java.util.function.BiConsumer
|
|
||||||
import java.util.function.BiFunction
|
|
||||||
import java.util.function.Consumer
|
|
||||||
import java.util.function.Function
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
|
||||||
|
|
||||||
abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecification {
|
|
||||||
public static final int DB_INDEX = 0
|
|
||||||
|
|
||||||
private static GenericContainer redisServer = new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379)
|
|
||||||
|
|
||||||
abstract RedisClient createClient(String uri)
|
|
||||||
|
|
||||||
@Shared
|
|
||||||
String host
|
|
||||||
@Shared
|
|
||||||
String expectedHostAttributeValue
|
|
||||||
@Shared
|
|
||||||
int port
|
|
||||||
@Shared
|
|
||||||
int incorrectPort
|
|
||||||
@Shared
|
|
||||||
String dbUriNonExistent
|
|
||||||
@Shared
|
|
||||||
String embeddedDbUri
|
|
||||||
|
|
||||||
@Shared
|
|
||||||
Map<String, String> testHashMap = [
|
|
||||||
firstname: "John",
|
|
||||||
lastname : "Doe",
|
|
||||||
age : "53"
|
|
||||||
]
|
|
||||||
|
|
||||||
RedisClient redisClient
|
|
||||||
StatefulConnection connection
|
|
||||||
RedisAsyncCommands<String, ?> asyncCommands
|
|
||||||
RedisCommands<String, ?> syncCommands
|
|
||||||
|
|
||||||
def setup() {
|
|
||||||
redisServer.start()
|
|
||||||
|
|
||||||
port = redisServer.getMappedPort(6379)
|
|
||||||
host = redisServer.getHost()
|
|
||||||
String dbAddr = host + ":" + port + "/" + DB_INDEX
|
|
||||||
embeddedDbUri = "redis://" + dbAddr
|
|
||||||
expectedHostAttributeValue = host == "127.0.0.1" ? null : host
|
|
||||||
|
|
||||||
incorrectPort = PortUtils.findOpenPort()
|
|
||||||
String dbAddrNonExistent = host + ":" + incorrectPort + "/" + DB_INDEX
|
|
||||||
dbUriNonExistent = "redis://" + dbAddrNonExistent
|
|
||||||
|
|
||||||
redisClient = createClient(embeddedDbUri)
|
|
||||||
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
|
||||||
|
|
||||||
connection = redisClient.connect()
|
|
||||||
asyncCommands = connection.async()
|
|
||||||
syncCommands = connection.sync()
|
|
||||||
|
|
||||||
syncCommands.set("TESTKEY", "TESTVAL")
|
|
||||||
|
|
||||||
// 1 set
|
|
||||||
ignoreTracesAndClear(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
def cleanup() {
|
|
||||||
connection.close()
|
|
||||||
redisClient.shutdown()
|
|
||||||
redisServer.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean testCallback() {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
def <T> T runWithCallbackSpan(String spanName, Closure callback) {
|
|
||||||
if (testCallback()) {
|
|
||||||
return runWithSpan(spanName, callback)
|
|
||||||
}
|
|
||||||
return callback.call()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "connect using get on ConnectionFuture"() {
|
|
||||||
setup:
|
|
||||||
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
|
|
||||||
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
|
||||||
|
|
||||||
when:
|
|
||||||
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
|
|
||||||
RedisURI.create("redis://${host}:${port}?timeout=3s"))
|
|
||||||
StatefulConnection connection = connectionFuture.get()
|
|
||||||
|
|
||||||
then:
|
|
||||||
connection != null
|
|
||||||
// Lettuce tracing does not trace connect
|
|
||||||
assertTraces(0) {}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
connection.close()
|
|
||||||
testConnectionClient.shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "connect exception inside the connection future"() {
|
|
||||||
setup:
|
|
||||||
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
|
|
||||||
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
|
||||||
|
|
||||||
when:
|
|
||||||
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
|
|
||||||
RedisURI.create("redis://${host}:${incorrectPort}?timeout=3s"))
|
|
||||||
StatefulConnection connection = connectionFuture.get()
|
|
||||||
|
|
||||||
then:
|
|
||||||
connection == null
|
|
||||||
thrown ExecutionException
|
|
||||||
// Lettuce tracing does not trace connect
|
|
||||||
assertTraces(0) {}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
testConnectionClient.shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "set command using Future get with timeout"() {
|
|
||||||
setup:
|
|
||||||
RedisFuture<String> redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL")
|
|
||||||
String res = redisFuture.get(3, TimeUnit.SECONDS)
|
|
||||||
|
|
||||||
expect:
|
|
||||||
res == "OK"
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "SET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_PORT" port
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "SET TESTSETKEY ?"
|
|
||||||
}
|
|
||||||
event(0) {
|
|
||||||
eventName "redis.encode.start"
|
|
||||||
}
|
|
||||||
event(1) {
|
|
||||||
eventName "redis.encode.end"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "get command chained with thenAccept"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
Consumer<String> consumer = new Consumer<String>() {
|
|
||||||
@Override
|
|
||||||
void accept(String res) {
|
|
||||||
runWithCallbackSpan("callback") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert res == "TESTVAL"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
when:
|
|
||||||
runWithSpan("parent") {
|
|
||||||
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY")
|
|
||||||
redisFuture.thenAccept(consumer)
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 2 + (testCallback() ? 1 : 0)) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_PORT" port
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET TESTKEY"
|
|
||||||
}
|
|
||||||
event(0) {
|
|
||||||
eventName "redis.encode.start"
|
|
||||||
}
|
|
||||||
event(1) {
|
|
||||||
eventName "redis.encode.end"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (testCallback()) {
|
|
||||||
span(2) {
|
|
||||||
name "callback"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// to make sure instrumentation's chained completion stages won't interfere with user's, while still
|
|
||||||
// recording metrics
|
|
||||||
def "get non existent key command with handleAsync and chained with thenApply"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
String successStr = "KEY MISSING"
|
|
||||||
BiFunction<String, Throwable, String> firstStage = new BiFunction<String, Throwable, String>() {
|
|
||||||
@Override
|
|
||||||
String apply(String res, Throwable throwable) {
|
|
||||||
runWithCallbackSpan("callback1") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert res == null
|
|
||||||
assert throwable == null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return (res == null ? successStr : res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Function<String, Object> secondStage = new Function<String, Object>() {
|
|
||||||
@Override
|
|
||||||
Object apply(String input) {
|
|
||||||
runWithCallbackSpan("callback2") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert input == successStr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
when:
|
|
||||||
runWithSpan("parent") {
|
|
||||||
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY")
|
|
||||||
redisFuture.handleAsync(firstStage).thenApply(secondStage)
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 2 + (testCallback() ? 2 : 0)) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "GET"
|
|
||||||
kind CLIENT
|
|
||||||
childOf(span(0))
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_PORT" port
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "GET NON_EXISTENT_KEY"
|
|
||||||
}
|
|
||||||
event(0) {
|
|
||||||
eventName "redis.encode.start"
|
|
||||||
}
|
|
||||||
event(1) {
|
|
||||||
eventName "redis.encode.end"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (testCallback()) {
|
|
||||||
span(2) {
|
|
||||||
name "callback1"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
span(3) {
|
|
||||||
name "callback2"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "command with no arguments using a biconsumer"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() {
|
|
||||||
@Override
|
|
||||||
void accept(String keyRetrieved, Throwable throwable) {
|
|
||||||
runWithCallbackSpan("callback") {
|
|
||||||
conds.evaluate {
|
|
||||||
assert keyRetrieved != null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
when:
|
|
||||||
runWithSpan("parent") {
|
|
||||||
RedisFuture<String> redisFuture = asyncCommands.randomkey()
|
|
||||||
redisFuture.whenCompleteAsync(biConsumer)
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 2 + (testCallback() ? 1 : 0)) {
|
|
||||||
span(0) {
|
|
||||||
name "parent"
|
|
||||||
kind INTERNAL
|
|
||||||
hasNoParent()
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "RANDOMKEY"
|
|
||||||
kind CLIENT
|
|
||||||
childOf(span(0))
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_PORT" port
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "RANDOMKEY"
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
}
|
|
||||||
event(0) {
|
|
||||||
eventName "redis.encode.start"
|
|
||||||
}
|
|
||||||
event(1) {
|
|
||||||
eventName "redis.encode.end"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (testCallback()) {
|
|
||||||
span(2) {
|
|
||||||
name "callback"
|
|
||||||
kind INTERNAL
|
|
||||||
childOf(span(0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def "hash set and then nest apply to hash getall"() {
|
|
||||||
setup:
|
|
||||||
def conds = new AsyncConditions()
|
|
||||||
|
|
||||||
when:
|
|
||||||
RedisFuture<String> hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap)
|
|
||||||
hmsetFuture.thenApplyAsync(new Function<String, Object>() {
|
|
||||||
@Override
|
|
||||||
Object apply(String setResult) {
|
|
||||||
conds.evaluate {
|
|
||||||
assert setResult == "OK"
|
|
||||||
}
|
|
||||||
RedisFuture<Map<String, String>> hmGetAllFuture = asyncCommands.hgetall("TESTHM")
|
|
||||||
hmGetAllFuture.exceptionally(new Function<Throwable, Map<String, String>>() {
|
|
||||||
@Override
|
|
||||||
Map<String, String> apply(Throwable throwable) {
|
|
||||||
println("unexpected:" + throwable.toString())
|
|
||||||
throwable.printStackTrace()
|
|
||||||
assert false
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
})
|
|
||||||
hmGetAllFuture.thenAccept(new Consumer<Map<String, String>>() {
|
|
||||||
@Override
|
|
||||||
void accept(Map<String, String> hmGetAllResult) {
|
|
||||||
conds.evaluate {
|
|
||||||
assert testHashMap == hmGetAllResult
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
then:
|
|
||||||
conds.await(10)
|
|
||||||
assertTraces(2) {
|
|
||||||
trace(0, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "HMSET"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_PORT" port
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "HMSET TESTHM firstname ? lastname ? age ?"
|
|
||||||
}
|
|
||||||
event(0) {
|
|
||||||
eventName "redis.encode.start"
|
|
||||||
}
|
|
||||||
event(1) {
|
|
||||||
eventName "redis.encode.end"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
trace(1, 1) {
|
|
||||||
span(0) {
|
|
||||||
name "HGETALL"
|
|
||||||
kind CLIENT
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.NETWORK_TYPE" "ipv4"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_ADDRESS" "127.0.0.1"
|
|
||||||
"$NetworkAttributes.NETWORK_PEER_PORT" port
|
|
||||||
"$SemanticAttributes.DB_SYSTEM" "redis"
|
|
||||||
"$SemanticAttributes.DB_STATEMENT" "HGETALL TESTHM"
|
|
||||||
}
|
|
||||||
event(0) {
|
|
||||||
eventName "redis.encode.start"
|
|
||||||
}
|
|
||||||
event(1) {
|
|
||||||
eventName "redis.encode.end"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,411 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.instrumentation.lettuce.v5_1;
|
||||||
|
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.catchThrowable;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import io.lettuce.core.ConnectionFuture;
|
||||||
|
import io.lettuce.core.RedisClient;
|
||||||
|
import io.lettuce.core.RedisFuture;
|
||||||
|
import io.lettuce.core.RedisURI;
|
||||||
|
import io.lettuce.core.api.StatefulRedisConnection;
|
||||||
|
import io.lettuce.core.api.async.RedisAsyncCommands;
|
||||||
|
import io.lettuce.core.api.sync.RedisCommands;
|
||||||
|
import io.lettuce.core.codec.StringCodec;
|
||||||
|
import io.opentelemetry.api.trace.SpanKind;
|
||||||
|
import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes;
|
||||||
|
import io.opentelemetry.instrumentation.test.utils.PortUtils;
|
||||||
|
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
|
||||||
|
import io.opentelemetry.semconv.SemanticAttributes;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
@SuppressWarnings({"InterruptedExceptionSwallowed"})
|
||||||
|
public abstract class AbstractLettuceAsyncClientTest extends AbstractLettuceClientTest {
|
||||||
|
private static String dbUriNonExistent;
|
||||||
|
private static int incorrectPort;
|
||||||
|
|
||||||
|
private static final ImmutableMap<String, String> testHashMap =
|
||||||
|
ImmutableMap.of(
|
||||||
|
"firstname", "John",
|
||||||
|
"lastname", "Doe",
|
||||||
|
"age", "53");
|
||||||
|
|
||||||
|
private static RedisAsyncCommands<String, String> asyncCommands;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
void setUp() {
|
||||||
|
redisServer.start();
|
||||||
|
host = redisServer.getHost();
|
||||||
|
port = redisServer.getMappedPort(6379);
|
||||||
|
embeddedDbUri = "redis://" + host + ":" + port + "/" + DB_INDEX;
|
||||||
|
|
||||||
|
incorrectPort = PortUtils.findOpenPort();
|
||||||
|
dbUriNonExistent = "redis://" + host + ":" + incorrectPort + "/" + DB_INDEX;
|
||||||
|
|
||||||
|
redisClient = createClient(embeddedDbUri);
|
||||||
|
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS);
|
||||||
|
|
||||||
|
connection = redisClient.connect();
|
||||||
|
asyncCommands = connection.async();
|
||||||
|
RedisCommands<String, String> syncCommands = connection.sync();
|
||||||
|
|
||||||
|
syncCommands.set("TESTKEY", "TESTVAL");
|
||||||
|
|
||||||
|
// 1 set trace
|
||||||
|
getInstrumentationExtension().waitForTraces(1);
|
||||||
|
getInstrumentationExtension().clearData();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
static void cleanUp() {
|
||||||
|
connection.close();
|
||||||
|
redisClient.shutdown();
|
||||||
|
redisServer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean testCallback() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConnectUsingGetOnConnectionFuture() throws Exception {
|
||||||
|
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri);
|
||||||
|
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS);
|
||||||
|
|
||||||
|
ConnectionFuture<StatefulRedisConnection<String, String>> connectionFuture =
|
||||||
|
testConnectionClient.connectAsync(
|
||||||
|
StringCodec.UTF8, RedisURI.create("redis://" + host + ":" + port + "?timeout=3s"));
|
||||||
|
StatefulRedisConnection<String, String> connection1 = connectionFuture.get();
|
||||||
|
cleanup.deferCleanup(connection1);
|
||||||
|
cleanup.deferCleanup(testConnectionClient::shutdown);
|
||||||
|
|
||||||
|
assertThat(connection1).isNotNull();
|
||||||
|
// Lettuce tracing does not trace connect
|
||||||
|
assertThat(getInstrumentationExtension().spans()).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConnectExceptionInsideTheConnectionFuture() {
|
||||||
|
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent);
|
||||||
|
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS);
|
||||||
|
cleanup.deferCleanup(testConnectionClient::shutdown);
|
||||||
|
|
||||||
|
Throwable thrown =
|
||||||
|
catchThrowable(
|
||||||
|
() -> {
|
||||||
|
ConnectionFuture<StatefulRedisConnection<String, String>> connectionFuture =
|
||||||
|
testConnectionClient.connectAsync(
|
||||||
|
StringCodec.UTF8,
|
||||||
|
RedisURI.create("redis://" + host + ":" + incorrectPort + "?timeout=3s"));
|
||||||
|
StatefulRedisConnection<String, String> connection1 = connectionFuture.get();
|
||||||
|
cleanup.deferCleanup(connection1);
|
||||||
|
assertThat(connection1).isNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(thrown).isInstanceOf(ExecutionException.class);
|
||||||
|
|
||||||
|
// Lettuce tracing does not trace connect
|
||||||
|
assertThat(getInstrumentationExtension().spans()).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSetCommandUsingFutureGetWithTimeout() throws Exception {
|
||||||
|
RedisFuture<String> redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL");
|
||||||
|
String res = redisFuture.get(3, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
assertThat(res).isEqualTo("OK");
|
||||||
|
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("SET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_PORT, port),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "SET TESTSETKEY ?"))
|
||||||
|
.hasEventsSatisfyingExactly(
|
||||||
|
event -> event.hasName("redis.encode.start"),
|
||||||
|
event -> event.hasName("redis.encode.end"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGetCommandChainedWithThenAccept() throws Exception {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
Consumer<String> consumer =
|
||||||
|
res -> {
|
||||||
|
if (testCallback()) {
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.runWithSpan("callback", () -> assertThat(res).isEqualTo("TESTVAL"));
|
||||||
|
}
|
||||||
|
future.complete(res);
|
||||||
|
};
|
||||||
|
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.runWithSpan(
|
||||||
|
"parent",
|
||||||
|
() -> {
|
||||||
|
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY");
|
||||||
|
redisFuture.thenAccept(consumer);
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("TESTVAL");
|
||||||
|
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.waitAndAssertTraces(
|
||||||
|
trace -> {
|
||||||
|
List<Consumer<SpanDataAssert>> spanAsserts =
|
||||||
|
new ArrayList<>(
|
||||||
|
Arrays.asList(
|
||||||
|
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_PORT, port),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "GET TESTKEY"))
|
||||||
|
.hasEventsSatisfyingExactly(
|
||||||
|
event -> event.hasName("redis.encode.start"),
|
||||||
|
event -> event.hasName("redis.encode.end"))));
|
||||||
|
|
||||||
|
if (testCallback()) {
|
||||||
|
spanAsserts.add(
|
||||||
|
span ->
|
||||||
|
span.hasName("callback")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0)));
|
||||||
|
}
|
||||||
|
trace.hasSpansSatisfyingExactly(spanAsserts);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// to make sure instrumentation's chained completion stages won't interfere with user's, while
|
||||||
|
// still recording spans
|
||||||
|
@Test
|
||||||
|
void testGetNonExistentKeyCommandWithHandleAsyncAndChainedWithThenApply() throws Exception {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
String successStr = "KEY MISSING";
|
||||||
|
|
||||||
|
BiFunction<String, Throwable, String> firstStage =
|
||||||
|
(res, error) -> {
|
||||||
|
if (testCallback()) {
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.runWithSpan(
|
||||||
|
"callback1",
|
||||||
|
() -> {
|
||||||
|
assertThat(res).isNull();
|
||||||
|
assertThat(error).isNull();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return (res == null ? successStr : res);
|
||||||
|
};
|
||||||
|
Function<String, Object> secondStage =
|
||||||
|
input -> {
|
||||||
|
if (testCallback()) {
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.runWithSpan(
|
||||||
|
"callback2",
|
||||||
|
() -> {
|
||||||
|
assertThat(input).isEqualTo(successStr);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
future.complete(successStr);
|
||||||
|
return null;
|
||||||
|
};
|
||||||
|
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.runWithSpan(
|
||||||
|
"parent",
|
||||||
|
() -> {
|
||||||
|
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY");
|
||||||
|
redisFuture.handleAsync(firstStage).thenApply(secondStage);
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(successStr);
|
||||||
|
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.waitAndAssertTraces(
|
||||||
|
trace -> {
|
||||||
|
List<Consumer<SpanDataAssert>> spanAsserts =
|
||||||
|
new ArrayList<>(
|
||||||
|
Arrays.asList(
|
||||||
|
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||||
|
span ->
|
||||||
|
span.hasName("GET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_PORT, port),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(
|
||||||
|
SemanticAttributes.DB_STATEMENT, "GET NON_EXISTENT_KEY"))
|
||||||
|
.hasEventsSatisfyingExactly(
|
||||||
|
event -> event.hasName("redis.encode.start"),
|
||||||
|
event -> event.hasName("redis.encode.end"))));
|
||||||
|
|
||||||
|
if (testCallback()) {
|
||||||
|
spanAsserts.addAll(
|
||||||
|
Arrays.asList(
|
||||||
|
span ->
|
||||||
|
span.hasName("callback1")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0)),
|
||||||
|
span ->
|
||||||
|
span.hasName("callback2")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0))));
|
||||||
|
}
|
||||||
|
trace.hasSpansSatisfyingExactly(spanAsserts);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCommandWithNoArgumentsUsingBiconsumer() throws Exception {
|
||||||
|
CompletableFuture<String> future = new CompletableFuture<>();
|
||||||
|
BiConsumer<String, Throwable> biConsumer =
|
||||||
|
(keyRetrieved, error) -> {
|
||||||
|
if (testCallback()) {
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.runWithSpan(
|
||||||
|
"callback",
|
||||||
|
() -> {
|
||||||
|
assertThat(keyRetrieved).isNotNull();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
future.complete(keyRetrieved);
|
||||||
|
};
|
||||||
|
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.runWithSpan(
|
||||||
|
"parent",
|
||||||
|
() -> {
|
||||||
|
RedisFuture<String> redisFuture = asyncCommands.randomkey();
|
||||||
|
redisFuture.whenCompleteAsync(biConsumer);
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isNotNull();
|
||||||
|
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.waitAndAssertTraces(
|
||||||
|
trace -> {
|
||||||
|
List<Consumer<SpanDataAssert>> spanAsserts =
|
||||||
|
new ArrayList<>(
|
||||||
|
Arrays.asList(
|
||||||
|
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||||
|
span ->
|
||||||
|
span.hasName("RANDOMKEY")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_PORT, port),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "RANDOMKEY"))
|
||||||
|
.hasEventsSatisfyingExactly(
|
||||||
|
event -> event.hasName("redis.encode.start"),
|
||||||
|
event -> event.hasName("redis.encode.end"))));
|
||||||
|
|
||||||
|
if (testCallback()) {
|
||||||
|
spanAsserts.add(
|
||||||
|
span ->
|
||||||
|
span.hasName("callback")
|
||||||
|
.hasKind(SpanKind.INTERNAL)
|
||||||
|
.hasParent(trace.getSpan(0)));
|
||||||
|
}
|
||||||
|
trace.hasSpansSatisfyingExactly(spanAsserts);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testHashSetAndThenNestApplyToHashGetall() throws Exception {
|
||||||
|
CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
|
||||||
|
|
||||||
|
RedisFuture<String> hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap);
|
||||||
|
hmsetFuture.thenApplyAsync(
|
||||||
|
setResult -> {
|
||||||
|
// Wait for 'hmset' trace to get written
|
||||||
|
getInstrumentationExtension().waitForTraces(1);
|
||||||
|
|
||||||
|
if (!"OK".equals(setResult)) {
|
||||||
|
future.completeExceptionally(new AssertionError("Wrong hmset result " + setResult));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
RedisFuture<Map<String, String>> hmGetAllFuture = asyncCommands.hgetall("TESTHM");
|
||||||
|
hmGetAllFuture.whenComplete(
|
||||||
|
(result, exception) -> {
|
||||||
|
if (exception != null) {
|
||||||
|
future.completeExceptionally(exception);
|
||||||
|
} else {
|
||||||
|
future.complete(result);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo(testHashMap);
|
||||||
|
|
||||||
|
getInstrumentationExtension()
|
||||||
|
.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("HMSET")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_PORT, port),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(
|
||||||
|
SemanticAttributes.DB_STATEMENT,
|
||||||
|
"HMSET TESTHM firstname ? lastname ? age ?"))
|
||||||
|
.hasEventsSatisfyingExactly(
|
||||||
|
event -> event.hasName("redis.encode.start"),
|
||||||
|
event -> event.hasName("redis.encode.end"))),
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("HGETALL")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
|
||||||
|
equalTo(NetworkAttributes.NETWORK_PEER_PORT, port),
|
||||||
|
equalTo(SemanticAttributes.DB_SYSTEM, "redis"),
|
||||||
|
equalTo(SemanticAttributes.DB_STATEMENT, "HGETALL TESTHM"))
|
||||||
|
.hasEventsSatisfyingExactly(
|
||||||
|
event -> event.hasName("redis.encode.start"),
|
||||||
|
event -> event.hasName("redis.encode.end"))));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue