Add lettuce 5.2 instrumentation that uses lettuce's native tracing functionality. (#535)

* Copy Lettuce 5.0 to start 5.1 instrumentation

* Begin tracing adapter implementation

Co-authored-by: Dustin Neray <dustin.neray@gmail.com>

* Set floor to 5.2 instead

* Move around

* Finish

* Cleanups

* Instrument 5.1+ instead

* Cleanup

* 5.1

* Remove latestDepTest from lettuce-5.0 since we have a newer lettuce-5.1.

* Remove

* Remove package check

* Spotless

Co-authored-by: Dustin Neray <dustin.neray@gmail.com>
This commit is contained in:
Anuraag Agrawal 2020-06-19 00:22:45 +09:00 committed by GitHub
parent c1c02ac949
commit 9a2a0b5de9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1731 additions and 12 deletions

View File

@ -315,7 +315,6 @@ ruleset {
regex = '^[a-z][\\$_a-zA-Z0-9]*$|^.*\\s.*$'
}
ObjectOverrideMisspelledMethodName
PackageName
ParameterName
PropertyName
VariableName {

View File

@ -1,7 +1,6 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
maxJavaVersionForTests = JavaVersion.VERSION_1_8
}
apply from: "$rootDir/gradle/instrumentation.gradle"
@ -11,24 +10,15 @@ muzzle {
pass {
group = "io.lettuce"
module = "lettuce-core"
versions = "[5.0.0.RELEASE,)"
versions = "[5.0.0.RELEASE,5.1.0.RELEASE)"
assertInverse = true
}
}
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'io.lettuce', name: 'lettuce-core', version: '5.0.0.RELEASE'
compileOnly group: 'io.lettuce', name: 'lettuce-core', version: '5.0.0.RELEASE'
testCompile group: 'com.github.kstyrc', name: 'embedded-redis', version: '0.6'
testCompile group: 'io.lettuce', name: 'lettuce-core', version: '5.0.0.RELEASE'
testCompile project(':instrumentation:reactor-3.1')
latestDepTestCompile group: 'io.lettuce', name: 'lettuce-core', version: '5.+'
}

View File

@ -15,9 +15,11 @@
*/
package io.opentelemetry.auto.instrumentation.lettuce.v5_0;
import static io.opentelemetry.auto.tooling.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
@ -34,6 +36,11 @@ public class LettuceAsyncCommandsInstrumentation extends Instrumenter.Default {
super("lettuce", "lettuce-5", "lettuce-5-async");
}
@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
return not(hasClassesNamed("io.lettuce.core.tracing.Tracing"));
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.lettuce.core.AbstractRedisAsyncCommands");

View File

@ -15,12 +15,14 @@
*/
package io.opentelemetry.auto.instrumentation.lettuce.v5_0;
import static io.opentelemetry.auto.tooling.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
@ -38,6 +40,11 @@ public final class LettuceClientInstrumentation extends Instrumenter.Default {
super("lettuce", "lettuce-5");
}
@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
return not(hasClassesNamed("io.lettuce.core.tracing.Tracing"));
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.lettuce.core.RedisClient");

View File

@ -15,11 +15,13 @@
*/
package io.opentelemetry.auto.instrumentation.lettuce.v5_0;
import static io.opentelemetry.auto.tooling.ClassLoaderMatcher.hasClassesNamed;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
@ -38,6 +40,11 @@ public class LettuceReactiveCommandsInstrumentation extends Instrumenter.Default
super("lettuce", "lettuce-5", "lettuce-5-rx");
}
@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
return not(hasClassesNamed("io.lettuce.core.tracing.Tracing"));
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.lettuce.core.AbstractRedisReactiveCommands");

View File

@ -0,0 +1,33 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}
apply from: "${rootDir}/gradle/instrumentation.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
muzzle {
pass {
group = "io.lettuce"
module = "lettuce-core"
versions = "[5.1.0.RELEASE,)"
assertInverse = true
}
}
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'io.lettuce', name: 'lettuce-core', version: '5.1.0.RELEASE'
testCompile group: 'com.github.kstyrc', name: 'embedded-redis', version: '0.6'
// Only 5.2+ will have command arguments in the db.statement tag.
testCompile group: 'io.lettuce', name: 'lettuce-core', version: '5.2.0.RELEASE'
testCompile project(':instrumentation:reactor-3.1')
latestDepTestCompile group: 'io.lettuce', name: 'lettuce-core', version: '5.+'
}

View File

@ -0,0 +1,78 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.lettuce.v5_1;
import static io.opentelemetry.auto.tooling.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import io.lettuce.core.resource.DefaultClientResources;
import io.opentelemetry.auto.tooling.Instrumenter;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class LettuceClientResourcesInstrumentation extends Instrumenter.Default {
public LettuceClientResourcesInstrumentation() {
super("lettuce", "lettuce-5", "lettuce-5.1");
}
@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
return hasClassesNamed("io.lettuce.core.tracing.Tracing");
}
@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("io.lettuce.core.resource.DefaultClientResources");
}
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".OpenTelemetryTracing",
packageName + ".OpenTelemetryTracing$OpenTelemetryTracerProvider",
packageName + ".OpenTelemetryTracing$OpenTelemetryTraceContextProvider",
packageName + ".OpenTelemetryTracing$OpenTelemetryTraceContext",
packageName + ".OpenTelemetryTracing$OpenTelemetryEndpoint",
packageName + ".OpenTelemetryTracing$OpenTelemetryTracer",
packageName + ".OpenTelemetryTracing$OpenTelemetrySpan",
};
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(isPublic()).and(isStatic()).and(named("builder")),
LettuceClientResourcesInstrumentation.class.getName() + "$DefaultClientResourcesAdvice");
}
public static class DefaultClientResourcesAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodEnter(@Advice.Return final DefaultClientResources.Builder builder) {
builder.tracing(OpenTelemetryTracing.INSTANCE);
}
}
}

View File

@ -0,0 +1,315 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.lettuce.v5_1;
import io.grpc.Context;
import io.lettuce.core.tracing.TraceContext;
import io.lettuce.core.tracing.TraceContextProvider;
import io.lettuce.core.tracing.Tracer;
import io.lettuce.core.tracing.TracerProvider;
import io.lettuce.core.tracing.Tracing;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Span.Kind;
import io.opentelemetry.trace.Status;
import io.opentelemetry.trace.TracingContextUtils;
import io.opentelemetry.trace.attributes.SemanticAttributes;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import reactor.util.annotation.Nullable;
public enum OpenTelemetryTracing implements Tracing {
INSTANCE;
public static final io.opentelemetry.trace.Tracer TRACER =
OpenTelemetry.getTracerProvider().get("io.opentelemetry.auto.lettuce-5.1");
@Override
public TracerProvider getTracerProvider() {
return OpenTelemetryTracerProvider.INSTANCE;
}
@Override
public TraceContextProvider initialTraceContextProvider() {
return OpenTelemetryTraceContextProvider.INSTANCE;
}
@Override
public boolean isEnabled() {
return true;
}
// Added in lettuce 5.2
// @Override
public boolean includeCommandArgsInSpanTags() {
return true;
}
@Override
public Endpoint createEndpoint(SocketAddress socketAddress) {
if (socketAddress instanceof InetSocketAddress) {
InetSocketAddress address = (InetSocketAddress) socketAddress;
return new OpenTelemetryEndpoint(
address.getAddress().getHostAddress(), address.getPort(), address.getHostString());
}
return null;
}
private enum OpenTelemetryTracerProvider implements TracerProvider {
INSTANCE;
private final Tracer openTelemetryTracer = new OpenTelemetryTracer();
@Override
public Tracer getTracer() {
return openTelemetryTracer;
}
}
private enum OpenTelemetryTraceContextProvider implements TraceContextProvider {
INSTANCE;
@Override
public TraceContext getTraceContext() {
return new OpenTelemetryTraceContext();
}
}
private static class OpenTelemetryTraceContext implements TraceContext {
private final Context context;
OpenTelemetryTraceContext() {
this.context = Context.current();
}
public Context getContext() {
return context;
}
}
private static class OpenTelemetryEndpoint implements Endpoint {
final String ip;
final int port;
@Nullable final String name;
OpenTelemetryEndpoint(String ip, int port, @Nullable String name) {
this.ip = ip;
this.port = port;
if (!ip.equals(name)) {
this.name = name;
} else {
this.name = null;
}
}
}
private static class OpenTelemetryTracer extends Tracer {
OpenTelemetryTracer() {}
@Override
public OpenTelemetrySpan nextSpan() {
return new OpenTelemetrySpan(TRACER.getCurrentSpan());
}
@Override
public OpenTelemetrySpan nextSpan(TraceContext traceContext) {
if (!(traceContext instanceof OpenTelemetryTraceContext)) {
return nextSpan();
}
Context context = ((OpenTelemetryTraceContext) traceContext).getContext();
final io.opentelemetry.trace.Span parent = TracingContextUtils.getSpan(context);
return new OpenTelemetrySpan(parent);
}
}
// The order that callbacks will be called in or which thread they are called from is not well
// defined. We go ahead and buffer all data until we know we have a span. This implementation is
// particularly safe, synchronizing all accesses. Relying on implementation details would allow
// reducing synchronization but the impact should be minimal.
private static class OpenTelemetrySpan extends Tracer.Span {
private final Span.Builder spanBuilder;
@Nullable private String name;
@Nullable private List<Object> events;
@Nullable private Status status;
@Nullable private Span span;
@Nullable private String args;
OpenTelemetrySpan(Span parent) {
// Name will be updated later, we create with an arbitrary one here to store other data before
// the span starts.
spanBuilder =
TRACER
.spanBuilder("REDIS")
.setSpanKind(Kind.CLIENT)
.setParent(parent)
.setAttribute(SemanticAttributes.DB_TYPE.key(), "redis");
}
@Override
public synchronized Tracer.Span name(String name) {
if (span != null) {
span.updateName(name);
}
this.name = name;
return this;
}
@Override
public synchronized Tracer.Span remoteEndpoint(Endpoint endpoint) {
if (endpoint instanceof OpenTelemetryEndpoint) {
if (span != null) {
fillEndpoint(span, (OpenTelemetryEndpoint) endpoint);
} else {
fillEndpoint(spanBuilder, (OpenTelemetryEndpoint) endpoint);
}
}
return this;
}
@Override
public synchronized Tracer.Span start() {
span = spanBuilder.startSpan();
if (name != null) {
span.updateName(name);
}
if (events != null) {
for (int i = 0; i < events.size(); i += 2) {
span.addEvent((String) events.get(i), (long) events.get(i + 1));
}
events = null;
}
if (status != null) {
span.setStatus(status);
status = null;
}
return this;
}
@Override
public synchronized Tracer.Span annotate(String value) {
if (span != null) {
span.addEvent(value);
} else {
if (events == null) {
events = new ArrayList<>();
}
events.add(value);
final Instant now = Instant.now();
events.add(TimeUnit.SECONDS.toNanos(now.getEpochSecond()) + now.getNano());
}
return this;
}
@Override
public synchronized Tracer.Span tag(String key, String value) {
if (key.equals("redis.args")) {
args = value;
return this;
}
if (span != null) {
span.setAttribute(key, value);
} else {
spanBuilder.setAttribute(key, value);
}
return this;
}
@Override
public synchronized Tracer.Span error(Throwable throwable) {
// TODO(anuraaga): Check if any lettuce exceptions map well to a Status and try mapping.
final Status status =
Status.UNKNOWN.withDescription(throwable.getClass() + ": " + throwable.getMessage());
if (span != null) {
span.setStatus(status);
} else {
this.status = status;
}
return this;
}
@Override
public synchronized void finish() {
if (span != null) {
if (name != null) {
final String statement = args != null && !args.isEmpty() ? name + " " + args : name;
SemanticAttributes.DB_STATEMENT.set(span, statement);
}
span.end();
}
}
private static void fillEndpoint(Span.Builder span, OpenTelemetryEndpoint endpoint) {
span.setAttribute(SemanticAttributes.NET_TRANSPORT.key(), "IP.TCP");
span.setAttribute(SemanticAttributes.NET_PEER_IP.key(), endpoint.ip);
final StringBuilder redisUrl = new StringBuilder("redis://");
if (endpoint.name != null) {
span.setAttribute(SemanticAttributes.NET_PEER_NAME.key(), endpoint.name);
redisUrl.append(endpoint.name);
} else {
redisUrl.append(endpoint.ip);
}
if (endpoint.port != 0) {
span.setAttribute(SemanticAttributes.NET_PEER_PORT.key(), endpoint.port);
redisUrl.append(":").append(endpoint.port);
}
span.setAttribute(SemanticAttributes.DB_URL.key(), redisUrl.toString());
}
private static void fillEndpoint(Span span, OpenTelemetryEndpoint endpoint) {
span.setAttribute(SemanticAttributes.NET_TRANSPORT.key(), "IP.TCP");
span.setAttribute(SemanticAttributes.NET_PEER_IP.key(), endpoint.ip);
final StringBuilder redisUrl = new StringBuilder("redis://");
if (endpoint.name != null) {
span.setAttribute(SemanticAttributes.NET_PEER_NAME.key(), endpoint.name);
redisUrl.append(endpoint.name);
} else {
redisUrl.append(endpoint.ip);
}
if (endpoint.port != 0) {
span.setAttribute(SemanticAttributes.NET_PEER_PORT.key(), endpoint.port);
redisUrl.append(":").append(endpoint.port);
}
span.setAttribute(SemanticAttributes.DB_URL.key(), redisUrl.toString());
}
}
}

View File

@ -0,0 +1,407 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.lettuce.v5_1
import io.lettuce.core.ClientOptions
import io.lettuce.core.ConnectionFuture
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisFuture
import io.lettuce.core.RedisURI
import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.api.sync.RedisCommands
import io.lettuce.core.codec.StringCodec
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.PortUtils
import redis.embedded.RedisServer
import spock.lang.Shared
import spock.util.concurrent.AsyncConditions
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.function.BiConsumer
import java.util.function.BiFunction
import java.util.function.Consumer
import java.util.function.Function
import static io.opentelemetry.trace.Span.Kind.CLIENT
class LettuceAsyncClientTest extends AgentTestRunner {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
@Shared
int port
@Shared
int incorrectPort
@Shared
String dbAddr
@Shared
String dbAddrNonExistent
@Shared
String dbUriNonExistent
@Shared
String embeddedDbUri
@Shared
RedisServer redisServer
@Shared
Map<String, String> testHashMap = [
firstname: "John",
lastname : "Doe",
age : "53"
]
RedisClient redisClient
StatefulConnection connection
RedisAsyncCommands<String, ?> asyncCommands
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
dbAddr = HOST + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = HOST + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent
embeddedDbUri = "redis://" + dbAddr
redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(port).build()
}
def setup() {
redisClient = RedisClient.create(embeddedDbUri)
println "Using redis: $redisServer.args"
redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
connection = redisClient.connect()
asyncCommands = connection.async()
syncCommands = connection.sync()
syncCommands.set("TESTKEY", "TESTVAL")
// 1 set
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
}
def cleanup() {
connection.close()
redisServer.stop()
}
def "connect using get on ConnectionFuture"() {
setup:
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, port, 3, TimeUnit.SECONDS))
StatefulConnection connection = connectionFuture.get()
then:
connection != null
// Lettuce tracing does not trace connect
assertTraces(0) {}
cleanup:
connection.close()
}
def "connect exception inside the connection future"() {
setup:
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, incorrectPort, 3, TimeUnit.SECONDS))
StatefulConnection connection = connectionFuture.get()
then:
connection == null
thrown ExecutionException
// Lettuce tracing does not trace connect
assertTraces(0) {}
}
def "set command using Future get with timeout"() {
setup:
RedisFuture<String> redisFuture = asyncCommands.set("TESTSETKEY", "TESTSETVAL")
String res = redisFuture.get(3, TimeUnit.SECONDS)
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "SET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "SET key<TESTSETKEY> value<TESTSETVAL>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "get command chained with thenAccept"() {
setup:
def conds = new AsyncConditions()
Consumer<String> consumer = new Consumer<String>() {
@Override
void accept(String res) {
conds.evaluate {
assert res == "TESTVAL"
}
}
}
when:
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY")
redisFuture.thenAccept(consumer)
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "GET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "GET key<TESTKEY>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
// to make sure instrumentation's chained completion stages won't interfere with user's, while still
// recording metrics
def "get non existent key command with handleAsync and chained with thenApply"() {
setup:
def conds = new AsyncConditions()
final String successStr = "KEY MISSING"
BiFunction<String, Throwable, String> firstStage = new BiFunction<String, Throwable, String>() {
@Override
String apply(String res, Throwable throwable) {
conds.evaluate {
assert res == null
assert throwable == null
}
return (res == null ? successStr : res)
}
}
Function<String, Object> secondStage = new Function<String, Object>() {
@Override
Object apply(String input) {
conds.evaluate {
assert input == successStr
}
return null
}
}
when:
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY")
redisFuture.handleAsync(firstStage).thenApply(secondStage)
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "GET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "GET key<NON_EXISTENT_KEY>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "command with no arguments using a biconsumer"() {
setup:
def conds = new AsyncConditions()
BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() {
@Override
void accept(String keyRetrieved, Throwable throwable) {
conds.evaluate {
assert keyRetrieved != null
}
}
}
when:
RedisFuture<String> redisFuture = asyncCommands.randomkey()
redisFuture.whenCompleteAsync(biConsumer)
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "RANDOMKEY"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.statement" "RANDOMKEY"
"db.type" "redis"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "hash set and then nest apply to hash getall"() {
setup:
def conds = new AsyncConditions()
when:
RedisFuture<String> hmsetFuture = asyncCommands.hmset("TESTHM", testHashMap)
hmsetFuture.thenApplyAsync(new Function<String, Object>() {
@Override
Object apply(String setResult) {
conds.evaluate {
assert setResult == "OK"
}
RedisFuture<Map<String, String>> hmGetAllFuture = asyncCommands.hgetall("TESTHM")
hmGetAllFuture.exceptionally(new Function<Throwable, Map<String, String>>() {
@Override
Map<String, String> apply(Throwable throwable) {
println("unexpected:" + throwable.toString())
throwable.printStackTrace()
assert false
return null
}
})
hmGetAllFuture.thenAccept(new Consumer<Map<String, String>>() {
@Override
void accept(Map<String, String> hmGetAllResult) {
conds.evaluate {
assert testHashMap == hmGetAllResult
}
}
})
return null
}
})
then:
conds.await()
assertTraces(2) {
trace(0, 1) {
span(0) {
operationName "HMSET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "HMSET key<TESTHM> key<firstname> value<John> key<lastname> value<Doe> key<age> value<53>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
trace(1, 1) {
span(0) {
operationName "HGETALL"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "HGETALL key<TESTHM>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
}

View File

@ -0,0 +1,474 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.lettuce.v5_1
import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient
import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.reactive.RedisReactiveCommands
import io.lettuce.core.api.sync.RedisCommands
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.PortUtils
import reactor.core.scheduler.Schedulers
import redis.embedded.RedisServer
import spock.lang.Shared
import spock.util.concurrent.AsyncConditions
import java.util.function.Consumer
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import static io.opentelemetry.trace.Span.Kind.CLIENT
class LettuceReactiveClientTest extends AgentTestRunner {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
@Shared
int port
@Shared
String embeddedDbUri
@Shared
RedisServer redisServer
RedisClient redisClient
StatefulConnection connection
RedisReactiveCommands<String, ?> reactiveCommands
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
String dbAddr = HOST + ":" + port + "/" + DB_INDEX
embeddedDbUri = "redis://" + dbAddr
redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(port).build()
}
def setup() {
redisClient = RedisClient.create(embeddedDbUri)
println "Using redis: $redisServer.args"
redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
connection = redisClient.connect()
reactiveCommands = connection.reactive()
syncCommands = connection.sync()
syncCommands.set("TESTKEY", "TESTVAL")
// 1 set
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
}
def cleanup() {
connection.close()
redisClient.shutdown()
redisServer.stop()
}
def "set command with subscribe on a defined consumer"() {
setup:
def conds = new AsyncConditions()
Consumer<String> consumer = new Consumer<String>() {
@Override
void accept(String res) {
conds.evaluate {
assert res == "OK"
}
}
}
when:
reactiveCommands.set("TESTSETKEY", "TESTSETVAL").subscribe(consumer)
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "SET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "SET key<TESTSETKEY> value<TESTSETVAL>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "get command with lambda function"() {
setup:
def conds = new AsyncConditions()
when:
reactiveCommands.get("TESTKEY").subscribe { res -> conds.evaluate { assert res == "TESTVAL" } }
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "GET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "GET key<TESTKEY>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
// to make sure instrumentation's chained completion stages won't interfere with user's, while still
// recording metrics
def "get non existent key command"() {
setup:
def conds = new AsyncConditions()
final defaultVal = "NOT THIS VALUE"
when:
reactiveCommands.get("NON_EXISTENT_KEY").defaultIfEmpty(defaultVal).subscribe {
res ->
conds.evaluate {
assert res == defaultVal
}
}
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "GET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "GET key<NON_EXISTENT_KEY>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "command with no arguments"() {
setup:
def conds = new AsyncConditions()
when:
reactiveCommands.randomkey().subscribe {
res ->
conds.evaluate {
assert res == "TESTKEY"
}
}
then:
conds.await()
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "RANDOMKEY"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.statement" "RANDOMKEY"
"db.type" "redis"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "command flux publisher "() {
setup:
reactiveCommands.command().subscribe()
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "COMMAND"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.statement" "COMMAND"
"db.type" "redis"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "non reactive command should not produce span"() {
setup:
String res = null
when:
res = reactiveCommands.digest()
then:
res != null
TEST_WRITER.traces.size() == 0
}
def "blocking subscriber"() {
when:
runUnderTrace("test-parent") {
reactiveCommands.set("a", "1")
.then(reactiveCommands.get("a"))
.block()
}
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
operationName "test-parent"
errored false
tags {
}
}
span(1) {
operationName "SET"
spanKind CLIENT
errored false
childOf span(0)
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "SET key<a> value<1>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
span(2) {
operationName "GET"
spanKind CLIENT
errored false
childOf span(0)
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "GET key<a>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "async subscriber"() {
when:
runUnderTrace("test-parent") {
reactiveCommands.set("a", "1")
.then(reactiveCommands.get("a"))
.subscribe()
}
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
operationName "test-parent"
errored false
tags {
}
}
span(1) {
operationName "SET"
spanKind CLIENT
errored false
childOf span(0)
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "SET key<a> value<1>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
span(2) {
operationName "GET"
spanKind CLIENT
errored false
childOf span(0)
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "GET key<a>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "async subscriber with specific thread pool"() {
when:
runUnderTrace("test-parent") {
reactiveCommands.set("a", "1")
.then(reactiveCommands.get("a"))
.subscribeOn(Schedulers.elastic())
.subscribe()
}
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
operationName "test-parent"
errored false
tags {
}
}
span(1) {
operationName "SET"
spanKind CLIENT
errored false
childOf span(0)
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "SET key<a> value<1>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
span(2) {
operationName "GET"
spanKind CLIENT
errored false
childOf span(0)
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "GET key<a>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
}

View File

@ -0,0 +1,401 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.lettuce.v5_1
import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisConnectionException
import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.sync.RedisCommands
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.PortUtils
import redis.embedded.RedisServer
import spock.lang.Shared
import static io.opentelemetry.trace.Span.Kind.CLIENT
class LettuceSyncClientTest extends AgentTestRunner {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
@Shared
int port
@Shared
int incorrectPort
@Shared
String dbAddr
@Shared
String dbAddrNonExistent
@Shared
String dbUriNonExistent
@Shared
String embeddedDbUri
@Shared
String embeddedDbLocalhostUri
@Shared
RedisServer redisServer
@Shared
Map<String, String> testHashMap = [
firstname: "John",
lastname : "Doe",
age : "53"
]
RedisClient redisClient
StatefulConnection connection
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
dbAddr = HOST + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = HOST + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent
embeddedDbUri = "redis://" + dbAddr
embeddedDbLocalhostUri = "redis://localhost:" + port + "/" + DB_INDEX
redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(port).build()
}
def setup() {
redisClient = RedisClient.create(embeddedDbUri)
redisServer.start()
connection = redisClient.connect()
syncCommands = connection.sync()
syncCommands.set("TESTKEY", "TESTVAL")
syncCommands.hmset("TESTHM", testHashMap)
// 2 sets
TEST_WRITER.waitForTraces(2)
TEST_WRITER.clear()
}
def cleanup() {
connection.close()
redisServer.stop()
}
def "connect"() {
setup:
RedisClient testConnectionClient = RedisClient.create(embeddedDbUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
StatefulConnection connection = testConnectionClient.connect()
then:
// Lettuce tracing does not trace connect
assertTraces(0) {}
cleanup:
connection.close()
}
def "connect exception"() {
setup:
RedisClient testConnectionClient = RedisClient.create(dbUriNonExistent)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
testConnectionClient.connect()
then:
thrown RedisConnectionException
// Lettuce tracing does not trace connect
assertTraces(0) {}
}
def "set command"() {
setup:
String res = syncCommands.set("TESTSETKEY", "TESTSETVAL")
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "SET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "SET key<TESTSETKEY> value<TESTSETVAL>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "set command localhost"() {
setup:
RedisClient testConnectionClient = RedisClient.create(embeddedDbLocalhostUri)
testConnectionClient.setOptions(CLIENT_OPTIONS)
StatefulConnection connection = testConnectionClient.connect()
String res = connection.sync().set("TESTSETKEY", "TESTSETVAL")
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "SET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.name" "localhost"
"net.peer.port" port
"db.url" "redis://localhost:$port"
"db.type" "redis"
"db.statement" "SET key<TESTSETKEY> value<TESTSETVAL>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "get command"() {
setup:
String res = syncCommands.get("TESTKEY")
expect:
res == "TESTVAL"
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "GET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "GET key<TESTKEY>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "get non existent key command"() {
setup:
String res = syncCommands.get("NON_EXISTENT_KEY")
expect:
res == null
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "GET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "GET key<NON_EXISTENT_KEY>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "command with no arguments"() {
setup:
def keyRetrieved = syncCommands.randomkey()
expect:
keyRetrieved != null
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "RANDOMKEY"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.statement" "RANDOMKEY"
"db.type" "redis"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "list command"() {
setup:
long res = syncCommands.lpush("TESTLIST", "TESTLIST ELEMENT")
expect:
res == 1
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "LPUSH"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "LPUSH key<TESTLIST> value<TESTLIST ELEMENT>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "hash set command"() {
setup:
def res = syncCommands.hmset("user", testHashMap)
expect:
res == "OK"
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "HMSET"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "HMSET key<user> key<firstname> value<John> key<lastname> value<Doe> key<age> value<53>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "hash getall command"() {
setup:
Map<String, String> res = syncCommands.hgetall("TESTHM")
expect:
res == testHashMap
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "HGETALL"
spanKind CLIENT
errored false
tags {
"net.transport" "IP.TCP"
"net.peer.ip" "127.0.0.1"
"net.peer.port" port
"db.url" "redis://127.0.0.1:$port"
"db.type" "redis"
"db.statement" "HGETALL key<TESTHM>"
}
event(0) {
eventName "redis.encode.start"
}
event(1) {
eventName "redis.encode.end"
}
}
}
}
}
def "debug segfault command (returns void) with no argument produces no span"() {
setup:
syncCommands.debugSegfault()
expect:
// lettuce tracing does not trace debug
assertTraces(0) {}
}
def "shutdown command (returns void) produces no span"() {
setup:
syncCommands.shutdown(false)
expect:
// lettuce tracing does not trace shutdown
assertTraces(0) {}
}
}

View File

@ -104,6 +104,7 @@ include ':instrumentation:kafka-streams-0.11'
include ':instrumentation:khttp-0.1'
include ':instrumentation:lettuce:lettuce-4.0'
include ':instrumentation:lettuce:lettuce-5.0'
include ':instrumentation:lettuce:lettuce-5.1'
include ':instrumentation:log4j:log4j-1.1'
include ':instrumentation:log4j:log4j-2.0'
include ':instrumentation:logback-1.0'