Support Lettuce 6 (#2589)
* Add support for Lettuce 6 * Finish * Remove unnecessary null check
This commit is contained in:
parent
6ea1e8d0ef
commit
2fd933b578
|
@ -4,7 +4,7 @@ muzzle {
|
|||
pass {
|
||||
group = "io.lettuce"
|
||||
module = "lettuce-core"
|
||||
versions = "[5.1.0.RELEASE,6.0.0.RELEASE)"
|
||||
versions = "[5.1.0.RELEASE,)"
|
||||
assertInverse = true
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,8 @@ dependencies {
|
|||
// Only 5.2+ will have command arguments in the db.statement tag.
|
||||
testLibrary group: 'io.lettuce', name: 'lettuce-core', version: '5.2.0.RELEASE'
|
||||
testInstrumentation project(':instrumentation:reactor-3.1:javaagent')
|
||||
|
||||
latestDepTestLibrary group: 'io.lettuce', name: 'lettuce-core', version: '5.+'
|
||||
}
|
||||
|
||||
test {
|
||||
systemProperty "testLatestDeps", testLatestDeps
|
||||
}
|
||||
|
|
|
@ -6,8 +6,10 @@ dependencies {
|
|||
|
||||
implementation project(':instrumentation:lettuce:lettuce-common:library')
|
||||
|
||||
latestDepTestLibrary group: 'io.lettuce', name: 'lettuce-core', version: '5.+'
|
||||
|
||||
testImplementation project(':instrumentation:lettuce:lettuce-5.1:testing')
|
||||
testImplementation project(':instrumentation:reactor-3.1:library')
|
||||
}
|
||||
|
||||
test {
|
||||
systemProperty "testLatestDeps", testLatestDeps
|
||||
}
|
||||
|
|
|
@ -7,6 +7,9 @@ package io.opentelemetry.instrumentation.lettuce.v5_1;
|
|||
|
||||
import static io.opentelemetry.instrumentation.lettuce.common.LettuceArgSplitter.splitArgs;
|
||||
|
||||
import io.lettuce.core.output.CommandOutput;
|
||||
import io.lettuce.core.protocol.CompleteableCommand;
|
||||
import io.lettuce.core.protocol.RedisCommand;
|
||||
import io.lettuce.core.tracing.TraceContext;
|
||||
import io.lettuce.core.tracing.TraceContextProvider;
|
||||
import io.lettuce.core.tracing.Tracer;
|
||||
|
@ -52,7 +55,7 @@ final class OpenTelemetryTracing implements Tracing {
|
|||
return true;
|
||||
}
|
||||
|
||||
// Added in lettuce 5.2
|
||||
// Added in lettuce 5.2, ignored in 6.0+
|
||||
// @Override
|
||||
public boolean includeCommandArgsInSpanTags() {
|
||||
return true;
|
||||
|
@ -196,6 +199,45 @@ final class OpenTelemetryTracing implements Tracing {
|
|||
return this;
|
||||
}
|
||||
|
||||
// Added and called in 6.0+
|
||||
// @Override
|
||||
public synchronized Tracer.Span start(RedisCommand<?, ?, ?> command) {
|
||||
start();
|
||||
|
||||
Span span = this.span;
|
||||
if (span == null) {
|
||||
throw new IllegalStateException("Span started but null, this is a programming error.");
|
||||
}
|
||||
span.updateName(command.getType().name());
|
||||
|
||||
if (command.getArgs() != null) {
|
||||
args = command.getArgs().toCommandString();
|
||||
}
|
||||
|
||||
if (command instanceof CompleteableCommand) {
|
||||
CompleteableCommand<?> completeableCommand = (CompleteableCommand<?>) command;
|
||||
completeableCommand.onComplete(
|
||||
(o, throwable) -> {
|
||||
if (throwable != null) {
|
||||
span.recordException(throwable);
|
||||
}
|
||||
|
||||
CommandOutput<?, ?, ?> output = command.getOutput();
|
||||
if (output != null) {
|
||||
String error = output.getError();
|
||||
if (error != null) {
|
||||
span.setStatus(StatusCode.ERROR, error);
|
||||
}
|
||||
}
|
||||
|
||||
finish(span);
|
||||
});
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
// Not called by Lettuce in 6.0+ (though we call it ourselves above).
|
||||
@Override
|
||||
public synchronized Tracer.Span start() {
|
||||
span = spanBuilder.startSpan();
|
||||
|
@ -260,13 +302,17 @@ final class OpenTelemetryTracing implements Tracing {
|
|||
@Override
|
||||
public synchronized void finish() {
|
||||
if (span != null) {
|
||||
if (name != null) {
|
||||
String statement = RedisCommandSanitizer.sanitize(name, splitArgs(args));
|
||||
span.setAttribute(SemanticAttributes.DB_STATEMENT, statement);
|
||||
}
|
||||
span.end();
|
||||
finish(span);
|
||||
}
|
||||
}
|
||||
|
||||
private void finish(Span span) {
|
||||
if (name != null) {
|
||||
String statement = RedisCommandSanitizer.sanitize(name, splitArgs(args));
|
||||
span.setAttribute(SemanticAttributes.DB_STATEMENT, statement);
|
||||
}
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
private static void fillEndpoint(AttributeSetter span, OpenTelemetryEndpoint endpoint) {
|
||||
|
|
|
@ -7,7 +7,6 @@ package io.opentelemetry.instrumentation.lettuce.v5_1
|
|||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
|
||||
import io.lettuce.core.ClientOptions
|
||||
import io.lettuce.core.ConnectionFuture
|
||||
import io.lettuce.core.RedisClient
|
||||
import io.lettuce.core.RedisFuture
|
||||
|
@ -32,8 +31,6 @@ import spock.util.concurrent.AsyncConditions
|
|||
abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecification {
|
||||
public static final String HOST = "127.0.0.1"
|
||||
public static final int DB_INDEX = 0
|
||||
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
|
||||
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
|
||||
|
||||
abstract RedisClient createClient(String uri)
|
||||
|
||||
|
@ -86,7 +83,7 @@ abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecificati
|
|||
|
||||
println "Using redis: $redisServer.args"
|
||||
redisServer.start()
|
||||
redisClient.setOptions(CLIENT_OPTIONS)
|
||||
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
||||
|
||||
connection = redisClient.connect()
|
||||
asyncCommands = connection.async()
|
||||
|
@ -106,11 +103,11 @@ abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecificati
|
|||
def "connect using get on ConnectionFuture"() {
|
||||
setup:
|
||||
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
|
||||
testConnectionClient.setOptions(CLIENT_OPTIONS)
|
||||
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
||||
|
||||
when:
|
||||
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
|
||||
new RedisURI(HOST, port, 3, TimeUnit.SECONDS))
|
||||
RedisURI.create("redis://${HOST}:${port}?timeout=3s"))
|
||||
StatefulConnection connection = connectionFuture.get()
|
||||
|
||||
then:
|
||||
|
@ -125,11 +122,11 @@ abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecificati
|
|||
def "connect exception inside the connection future"() {
|
||||
setup:
|
||||
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
|
||||
testConnectionClient.setOptions(CLIENT_OPTIONS)
|
||||
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
||||
|
||||
when:
|
||||
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
|
||||
new RedisURI(HOST, incorrectPort, 3, TimeUnit.SECONDS))
|
||||
RedisURI.create("redis://${HOST}:${incorrectPort}?timeout=3s"))
|
||||
StatefulConnection connection = connectionFuture.get()
|
||||
|
||||
then:
|
||||
|
|
|
@ -8,7 +8,6 @@ package io.opentelemetry.instrumentation.lettuce.v5_1
|
|||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
|
||||
|
||||
import io.lettuce.core.ClientOptions
|
||||
import io.lettuce.core.RedisClient
|
||||
import io.lettuce.core.api.StatefulConnection
|
||||
import io.lettuce.core.api.reactive.RedisReactiveCommands
|
||||
|
@ -24,8 +23,6 @@ import spock.util.concurrent.AsyncConditions
|
|||
abstract class AbstractLettuceReactiveClientTest extends InstrumentationSpecification {
|
||||
public static final String HOST = "127.0.0.1"
|
||||
public static final int DB_INDEX = 0
|
||||
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
|
||||
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
|
||||
|
||||
abstract RedisClient createClient(String uri)
|
||||
|
||||
|
@ -60,7 +57,7 @@ abstract class AbstractLettuceReactiveClientTest extends InstrumentationSpecific
|
|||
|
||||
println "Using redis: $redisServer.args"
|
||||
redisServer.start()
|
||||
redisClient.setOptions(CLIENT_OPTIONS)
|
||||
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
||||
|
||||
connection = redisClient.connect()
|
||||
reactiveCommands = connection.reactive()
|
||||
|
|
|
@ -7,7 +7,6 @@ package io.opentelemetry.instrumentation.lettuce.v5_1
|
|||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
|
||||
import io.lettuce.core.ClientOptions
|
||||
import io.lettuce.core.RedisClient
|
||||
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
|
||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
||||
|
@ -18,8 +17,6 @@ import spock.lang.Shared
|
|||
abstract class AbstractLettuceSyncClientAuthTest extends InstrumentationSpecification {
|
||||
public static final String HOST = "127.0.0.1"
|
||||
public static final int DB_INDEX = 0
|
||||
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
|
||||
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
|
||||
|
||||
abstract RedisClient createClient(String uri)
|
||||
|
||||
|
@ -55,7 +52,7 @@ abstract class AbstractLettuceSyncClientAuthTest extends InstrumentationSpecific
|
|||
|
||||
def setup() {
|
||||
redisClient = createClient(embeddedDbUri)
|
||||
redisClient.setOptions(CLIENT_OPTIONS)
|
||||
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
||||
redisServer.start()
|
||||
}
|
||||
|
||||
|
|
|
@ -8,9 +8,9 @@ package io.opentelemetry.instrumentation.lettuce.v5_1
|
|||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
import static java.nio.charset.StandardCharsets.UTF_8
|
||||
|
||||
import io.lettuce.core.ClientOptions
|
||||
import io.lettuce.core.RedisClient
|
||||
import io.lettuce.core.RedisConnectionException
|
||||
import io.lettuce.core.RedisException
|
||||
import io.lettuce.core.ScriptOutputType
|
||||
import io.lettuce.core.api.StatefulConnection
|
||||
import io.lettuce.core.api.sync.RedisCommands
|
||||
|
@ -23,8 +23,6 @@ import spock.lang.Shared
|
|||
abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecification {
|
||||
public static final String HOST = "127.0.0.1"
|
||||
public static final int DB_INDEX = 0
|
||||
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
|
||||
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
|
||||
|
||||
abstract RedisClient createClient(String uri)
|
||||
|
||||
|
@ -76,6 +74,7 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
|
|||
|
||||
def setup() {
|
||||
redisClient = createClient(embeddedDbUri)
|
||||
redisClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
||||
|
||||
redisServer.start()
|
||||
connection = redisClient.connect()
|
||||
|
@ -96,7 +95,7 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
|
|||
def "connect"() {
|
||||
setup:
|
||||
RedisClient testConnectionClient = createClient(embeddedDbUri)
|
||||
testConnectionClient.setOptions(CLIENT_OPTIONS)
|
||||
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
||||
|
||||
when:
|
||||
StatefulConnection connection = testConnectionClient.connect()
|
||||
|
@ -112,7 +111,7 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
|
|||
def "connect exception"() {
|
||||
setup:
|
||||
RedisClient testConnectionClient = createClient(dbUriNonExistent)
|
||||
testConnectionClient.setOptions(CLIENT_OPTIONS)
|
||||
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
||||
|
||||
when:
|
||||
testConnectionClient.connect()
|
||||
|
@ -157,7 +156,7 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
|
|||
def "set command localhost"() {
|
||||
setup:
|
||||
RedisClient testConnectionClient = createClient(embeddedDbLocalhostUri)
|
||||
testConnectionClient.setOptions(CLIENT_OPTIONS)
|
||||
testConnectionClient.setOptions(LettuceTestUtil.CLIENT_OPTIONS)
|
||||
StatefulConnection connection = testConnectionClient.connect()
|
||||
String res = connection.sync().set("TESTSETKEY", "TESTSETVAL")
|
||||
|
||||
|
@ -451,8 +450,33 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
|
|||
syncCommands.debugSegfault()
|
||||
|
||||
expect:
|
||||
// lettuce tracing does not trace debug
|
||||
assertTraces(0) {}
|
||||
assertTraces(1) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
name "DEBUG"
|
||||
// Disconnect not an actual error even though an exception is recorded.
|
||||
errored false
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" "IP.TCP"
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" port
|
||||
"${SemanticAttributes.DB_CONNECTION_STRING.key}" "redis://127.0.0.1:$port"
|
||||
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
|
||||
"${SemanticAttributes.DB_STATEMENT.key}" "DEBUG SEGFAULT"
|
||||
}
|
||||
event(0) {
|
||||
eventName "redis.encode.start"
|
||||
}
|
||||
event(1) {
|
||||
eventName "redis.encode.end"
|
||||
}
|
||||
if (Boolean.getBoolean("testLatestDeps")) {
|
||||
// Seems to only be recorded with Lettuce 6+
|
||||
errorEvent(RedisException, "Connection disconnected", 2)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "shutdown command (returns void) produces no span"() {
|
||||
|
@ -460,7 +484,38 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
|
|||
syncCommands.shutdown(false)
|
||||
|
||||
expect:
|
||||
// lettuce tracing does not trace shutdown
|
||||
assertTraces(0) {}
|
||||
assertTraces(1) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
name "SHUTDOWN"
|
||||
if (Boolean.getBoolean("testLatestDeps")) {
|
||||
// Seems to only be treated as an error with Lettuce 6+
|
||||
errored true
|
||||
}
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" "IP.TCP"
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" port
|
||||
"${SemanticAttributes.DB_CONNECTION_STRING.key}" "redis://127.0.0.1:$port"
|
||||
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
|
||||
"${SemanticAttributes.DB_STATEMENT.key}" "SHUTDOWN NOSAVE"
|
||||
if (!Boolean.getBoolean("testLatestDeps")) {
|
||||
// Lettuce adds this tag before 6.0
|
||||
// TODO(anuraaga): Filter this out?
|
||||
"error" "Connection disconnected"
|
||||
}
|
||||
}
|
||||
event(0) {
|
||||
eventName "redis.encode.start"
|
||||
}
|
||||
event(1) {
|
||||
eventName "redis.encode.end"
|
||||
}
|
||||
if (Boolean.getBoolean("testLatestDeps")) {
|
||||
errorEvent(RedisException, "Connection disconnected", 2)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.lettuce.v5_1
|
||||
|
||||
import groovy.transform.PackageScope
|
||||
import io.lettuce.core.ClientOptions
|
||||
|
||||
@PackageScope
|
||||
final class LettuceTestUtil {
|
||||
|
||||
static final ClientOptions CLIENT_OPTIONS
|
||||
|
||||
static {
|
||||
def options = ClientOptions.builder()
|
||||
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
|
||||
.autoReconnect(false)
|
||||
if (Boolean.getBoolean("testLatestDeps")) {
|
||||
// Force RESP2 on 6+ for consistency in tests
|
||||
options
|
||||
.pingBeforeActivateConnection(false)
|
||||
.protocolVersion(Class.forName("io.lettuce.core.protocol.ProtocolVersion").getField("RESP2").get(null))
|
||||
}
|
||||
CLIENT_OPTIONS = options.build()
|
||||
}
|
||||
|
||||
private LettuceTestUtil() {}
|
||||
}
|
Loading…
Reference in New Issue