lettuce reactive api for mono publishers completed.
This commit is contained in:
parent
989a259ec2
commit
bfdf4c858a
|
@ -0,0 +1,44 @@
|
||||||
|
package datadog.trace.instrumentation.lettuce.rx;
|
||||||
|
|
||||||
|
import static net.bytebuddy.matcher.ElementMatchers.*;
|
||||||
|
|
||||||
|
import com.google.auto.service.AutoService;
|
||||||
|
import datadog.trace.agent.tooling.DDAdvice;
|
||||||
|
import datadog.trace.agent.tooling.DDTransformers;
|
||||||
|
import datadog.trace.agent.tooling.HelperInjector;
|
||||||
|
import datadog.trace.agent.tooling.Instrumenter;
|
||||||
|
import net.bytebuddy.agent.builder.AgentBuilder;
|
||||||
|
|
||||||
|
@AutoService(Instrumenter.class)
|
||||||
|
public class RedisReactiveCommandsInstrumentation extends Instrumenter.Configurable {
|
||||||
|
|
||||||
|
private static final HelperInjector REDIS_ASYNC_HELPERS =
|
||||||
|
new HelperInjector(
|
||||||
|
RedisReactiveCommandsInstrumentation.class.getPackage().getName() + ".MonoCreationAdvice",
|
||||||
|
RedisReactiveCommandsInstrumentation.class.getPackage().getName() + ".MonoDualConsumer");
|
||||||
|
|
||||||
|
public RedisReactiveCommandsInstrumentation() {
|
||||||
|
super("redis");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean defaultEnabled() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AgentBuilder apply(AgentBuilder agentBuilder) {
|
||||||
|
return agentBuilder
|
||||||
|
.type(named("io.lettuce.core.AbstractRedisReactiveCommands"))
|
||||||
|
.transform(REDIS_ASYNC_HELPERS)
|
||||||
|
.transform(DDTransformers.defaultTransformers())
|
||||||
|
.transform(
|
||||||
|
DDAdvice.create()
|
||||||
|
.advice(
|
||||||
|
isMethod()
|
||||||
|
.and(named("createMono"))
|
||||||
|
.and(takesArgument(0, named("java.util.function.Supplier"))),
|
||||||
|
MonoCreationAdvice.class.getName()))
|
||||||
|
.asDecorator();
|
||||||
|
}
|
||||||
|
}
|
|
@ -59,6 +59,7 @@ public class RedisAsyncCommandsAdvice {
|
||||||
final Span span = scope.span();
|
final Span span = scope.span();
|
||||||
Tags.ERROR.set(span, true);
|
Tags.ERROR.set(span, true);
|
||||||
span.log(Collections.singletonMap("error.object", throwable));
|
span.log(Collections.singletonMap("error.object", throwable));
|
||||||
|
span.finish();
|
||||||
scope.close();
|
scope.close();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
package datadog.trace.instrumentation.lettuce.rx;
|
||||||
|
|
||||||
|
import io.lettuce.core.protocol.RedisCommand;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import net.bytebuddy.asm.Advice;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
public class MonoCreationAdvice {
|
||||||
|
|
||||||
|
public static final String SERVICE_NAME = "redis";
|
||||||
|
public static final String COMPONENT_NAME = SERVICE_NAME + "-client";
|
||||||
|
public static final String MAP_KEY_CMD_NAME = "CMD_NAME";
|
||||||
|
public static final String MAP_KEY_CMD_ARGS = "CMD_ARGS";
|
||||||
|
|
||||||
|
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||||
|
public static Map<String, String> extractCommand(
|
||||||
|
@Advice.Argument(0) final Supplier<RedisCommand> supplier) {
|
||||||
|
RedisCommand command = supplier.get();
|
||||||
|
|
||||||
|
String commandName = "Redis Command";
|
||||||
|
String commandArgs = null;
|
||||||
|
Map<String, String> commandMap = new HashMap<>();
|
||||||
|
if (command != null) {
|
||||||
|
// get the arguments passed into the redis command
|
||||||
|
if (command.getArgs() != null) {
|
||||||
|
commandArgs = command.getArgs().toCommandString();
|
||||||
|
}
|
||||||
|
// get the redis command name (i.e. GET, SET, HMSET, etc)
|
||||||
|
if (command.getType() != null) {
|
||||||
|
commandName = command.getType().name();
|
||||||
|
// if it is an AUTH command, then remove the extracted command arguments since it is the password
|
||||||
|
if ("AUTH".equals(commandName)) {
|
||||||
|
commandArgs = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
commandMap.put(MAP_KEY_CMD_NAME, commandName);
|
||||||
|
commandMap.put(MAP_KEY_CMD_ARGS, commandArgs);
|
||||||
|
return commandMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
// throwables wouldn't matter here, because no spans have been started due to redis command not being
|
||||||
|
// run until the user subscribes to the Mono publisher
|
||||||
|
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||||
|
public static void monitorSpan(
|
||||||
|
@Advice.Enter final Map<String, String> commandMap,
|
||||||
|
@Advice.Return(readOnly = false) Mono<?> publisher) {
|
||||||
|
|
||||||
|
MonoDualConsumer mdc = new MonoDualConsumer(commandMap);
|
||||||
|
publisher = publisher.doOnSubscribe(mdc);
|
||||||
|
publisher = publisher.doOnSuccessOrError(mdc);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
package datadog.trace.instrumentation.lettuce.rx;
|
||||||
|
|
||||||
|
import datadog.trace.api.DDTags;
|
||||||
|
import io.opentracing.Scope;
|
||||||
|
import io.opentracing.Span;
|
||||||
|
import io.opentracing.tag.Tags;
|
||||||
|
import io.opentracing.util.GlobalTracer;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
public class MonoDualConsumer<R, T, U extends Throwable>
|
||||||
|
implements Consumer<R>, BiConsumer<T, Throwable> {
|
||||||
|
|
||||||
|
private Span span = null;
|
||||||
|
private final Map<String, String> commandMap;
|
||||||
|
|
||||||
|
public MonoDualConsumer(Map<String, String> commandMap) {
|
||||||
|
this.commandMap = commandMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(T t, Throwable throwable) {
|
||||||
|
if (this.span != null) {
|
||||||
|
if (throwable != null) {
|
||||||
|
Tags.ERROR.set(this.span, true);
|
||||||
|
this.span.log(Collections.singletonMap("error.object", throwable));
|
||||||
|
}
|
||||||
|
this.span.finish();
|
||||||
|
} else {
|
||||||
|
LoggerFactory.getLogger(Mono.class)
|
||||||
|
.error(
|
||||||
|
"Failed to finish this.span, BiConsumer cannot find this.span because "
|
||||||
|
+ "it probably wasn't started.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(R r) {
|
||||||
|
final Scope scope =
|
||||||
|
GlobalTracer.get().buildSpan(MonoCreationAdvice.SERVICE_NAME + ".query").startActive(false);
|
||||||
|
this.span = scope.span();
|
||||||
|
|
||||||
|
Tags.DB_TYPE.set(this.span, MonoCreationAdvice.SERVICE_NAME);
|
||||||
|
Tags.SPAN_KIND.set(this.span, Tags.SPAN_KIND_CLIENT);
|
||||||
|
Tags.COMPONENT.set(this.span, MonoCreationAdvice.COMPONENT_NAME);
|
||||||
|
|
||||||
|
this.span.setTag(
|
||||||
|
DDTags.RESOURCE_NAME, this.commandMap.get(MonoCreationAdvice.MAP_KEY_CMD_NAME));
|
||||||
|
this.span.setTag("db.command.args", this.commandMap.get(MonoCreationAdvice.MAP_KEY_CMD_ARGS));
|
||||||
|
this.span.setTag(DDTags.SERVICE_NAME, MonoCreationAdvice.SERVICE_NAME);
|
||||||
|
this.span.setTag(DDTags.SPAN_TYPE, MonoCreationAdvice.SERVICE_NAME);
|
||||||
|
scope.close();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue