Use Context more in DatabaseClientTracer (#1836)

This commit is contained in:
Trask Stalnaker 2020-12-06 23:15:15 -08:00 committed by GitHub
parent 1ca562ca9c
commit 4cbfb361e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 398 additions and 354 deletions

View File

@ -13,7 +13,6 @@ import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
@ -28,12 +27,17 @@ public abstract class DatabaseClientTracer<CONNECTION, QUERY> extends BaseTracer
tracer = OpenTelemetry.getGlobalTracer(getInstrumentationName(), getVersion());
}
public Span startSpan(CONNECTION connection, QUERY query) {
public boolean shouldStartSpan(Context parentContext) {
return parentContext.get(CONTEXT_CLIENT_SPAN_KEY) == null;
}
public Context startSpan(Context parentContext, CONNECTION connection, QUERY query) {
String normalizedQuery = normalizeQuery(query);
Span span =
tracer
.spanBuilder(spanName(connection, query, normalizedQuery))
.setParent(parentContext)
.setSpanKind(CLIENT)
.setAttribute(SemanticAttributes.DB_SYSTEM, dbSystem(connection))
.startSpan();
@ -44,20 +48,7 @@ public abstract class DatabaseClientTracer<CONNECTION, QUERY> extends BaseTracer
}
onStatement(span, normalizedQuery);
return span;
}
/**
* Creates new scoped context with the given span.
*
* <p>Attaches new context to the request to avoid creating duplicate client spans.
*/
@Override
public Scope startScope(Span span) {
// TODO we could do this in one go, but TracingContextUtils.CONTEXT_SPAN_KEY is private
Context clientSpanContext = Context.current().with(CONTEXT_CLIENT_SPAN_KEY, span);
Context newContext = clientSpanContext.with(span);
return newContext.makeCurrent();
return parentContext.with(span).with(CONTEXT_CLIENT_SPAN_KEY, span);
}
@Override
@ -70,13 +61,12 @@ public abstract class DatabaseClientTracer<CONNECTION, QUERY> extends BaseTracer
return context.get(CONTEXT_CLIENT_SPAN_KEY);
}
@Override
public void end(Span span) {
span.end();
public void end(Context context) {
Span.fromContext(context).end();
}
@Override
public void endExceptionally(Span span, Throwable throwable) {
public void endExceptionally(Context context, Throwable throwable) {
Span span = Span.fromContext(context);
onError(span, throwable);
end(span);
}

View File

@ -10,6 +10,7 @@ import com.datastax.driver.core.Host;
import com.datastax.driver.core.Session;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import io.opentelemetry.javaagent.instrumentation.api.db.DbSystem;
@ -53,9 +54,10 @@ public class CassandraDatabaseClientTracer extends DatabaseClientTracer<Session,
return null;
}
public void end(Span span, ExecutionInfo executionInfo) {
public void end(Context context, ExecutionInfo executionInfo) {
Span span = Span.fromContext(context);
Host host = executionInfo.getQueriedHost();
NetPeerUtils.INSTANCE.setNetPeer(span, host.getSocketAddress());
end(span);
end(context);
}
}

View File

@ -21,7 +21,7 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.Map;
@ -58,86 +58,86 @@ public class TracingSession implements Session {
@Override
public ResultSet execute(String query) {
Span span = tracer().startSpan(session, query);
Context context = tracer().startSpan(Context.current(), session, query);
ResultSet resultSet;
try (Scope ignored = tracer().startScope(span)) {
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query);
} catch (Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
throw t;
}
tracer().end(span, resultSet.getExecutionInfo());
tracer().end(context, resultSet.getExecutionInfo());
return resultSet;
}
@Override
public ResultSet execute(String query, Object... values) {
Span span = tracer().startSpan(session, query);
Context context = tracer().startSpan(Context.current(), session, query);
ResultSet resultSet;
try (Scope ignored = tracer().startScope(span)) {
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query, values);
} catch (Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
throw t;
}
tracer().end(span, resultSet.getExecutionInfo());
tracer().end(context, resultSet.getExecutionInfo());
return resultSet;
}
@Override
public ResultSet execute(String query, Map<String, Object> values) {
Span span = tracer().startSpan(session, query);
Context context = tracer().startSpan(Context.current(), session, query);
ResultSet resultSet;
try (Scope ignored = tracer().startScope(span)) {
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query, values);
} catch (Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
throw t;
}
tracer().end(span, resultSet.getExecutionInfo());
tracer().end(context, resultSet.getExecutionInfo());
return resultSet;
}
@Override
public ResultSet execute(Statement statement) {
Span span = tracer().startSpan(session, getQuery(statement));
Context context = tracer().startSpan(Context.current(), session, getQuery(statement));
ResultSet resultSet;
try (Scope ignored = tracer().startScope(span)) {
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(statement);
} catch (Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
throw t;
}
tracer().end(span, resultSet.getExecutionInfo());
tracer().end(context, resultSet.getExecutionInfo());
return resultSet;
}
@Override
public ResultSetFuture executeAsync(String query) {
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(query);
addCallbackToEndSpan(future, span);
addCallbackToEndSpan(future, context);
return future;
}
}
@Override
public ResultSetFuture executeAsync(String query, Object... values) {
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(query, values);
addCallbackToEndSpan(future, span);
addCallbackToEndSpan(future, context);
return future;
}
}
@Override
public ResultSetFuture executeAsync(String query, Map<String, Object> values) {
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(query, values);
addCallbackToEndSpan(future, span);
addCallbackToEndSpan(future, context);
return future;
}
}
@ -145,10 +145,10 @@ public class TracingSession implements Session {
@Override
public ResultSetFuture executeAsync(Statement statement) {
String query = getQuery(statement);
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
ResultSetFuture future = session.executeAsync(statement);
addCallbackToEndSpan(future, span);
addCallbackToEndSpan(future, context);
return future;
}
}
@ -209,18 +209,18 @@ public class TracingSession implements Session {
return query == null ? "" : query;
}
private void addCallbackToEndSpan(ResultSetFuture future, final Span span) {
private void addCallbackToEndSpan(ResultSetFuture future, Context context) {
Futures.addCallback(
future,
new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
tracer().end(span, result.getExecutionInfo());
tracer().end(context, result.getExecutionInfo());
}
@Override
public void onFailure(Throwable t) {
tracer().endExceptionally(span, t);
tracer().endExceptionally(context, t);
}
},
directExecutor());

View File

@ -10,6 +10,7 @@ import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.Node;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import io.opentelemetry.javaagent.instrumentation.api.db.DbSystem;
@ -48,11 +49,12 @@ public class CassandraDatabaseClientTracer extends DatabaseClientTracer<CqlSessi
return null;
}
public void onResponse(Span span, ExecutionInfo executionInfo) {
public void onResponse(Context context, ExecutionInfo executionInfo) {
Node coordinator = executionInfo.getCoordinator();
if (coordinator != null) {
SocketAddress socketAddress = coordinator.getEndPoint().resolve();
if (socketAddress instanceof InetSocketAddress) {
Span span = Span.fromContext(context);
NetPeerUtils.INSTANCE.setNetPeer(span, ((InetSocketAddress) socketAddress));
}
}

View File

@ -23,7 +23,7 @@ import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
@ -174,17 +174,17 @@ public class TracingCqlSession implements CqlSession {
@NonNull
public ResultSet execute(@NonNull String query) {
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
try {
ResultSet resultSet = session.execute(query);
tracer().onResponse(span, resultSet.getExecutionInfo());
tracer().onResponse(context, resultSet.getExecutionInfo());
return resultSet;
} catch (RuntimeException e) {
tracer().endExceptionally(span, e);
tracer().endExceptionally(context, e);
throw e;
} finally {
tracer().end(span);
tracer().end(context);
}
}
}
@ -194,17 +194,17 @@ public class TracingCqlSession implements CqlSession {
public ResultSet execute(@NonNull Statement<?> statement) {
String query = getQuery(statement);
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
try {
ResultSet resultSet = session.execute(statement);
tracer().onResponse(span, resultSet.getExecutionInfo());
tracer().onResponse(context, resultSet.getExecutionInfo());
return resultSet;
} catch (RuntimeException e) {
tracer().endExceptionally(span, e);
tracer().endExceptionally(context, e);
throw e;
} finally {
tracer().end(span);
tracer().end(context);
}
}
}
@ -214,16 +214,16 @@ public class TracingCqlSession implements CqlSession {
public CompletionStage<AsyncResultSet> executeAsync(@NonNull Statement<?> statement) {
String query = getQuery(statement);
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
CompletionStage<AsyncResultSet> stage = session.executeAsync(statement);
return stage.whenComplete(
(asyncResultSet, throwable) -> {
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
tracer().onResponse(span, asyncResultSet.getExecutionInfo());
tracer().end(span);
tracer().onResponse(context, asyncResultSet.getExecutionInfo());
tracer().end(context);
}
});
}
@ -232,16 +232,16 @@ public class TracingCqlSession implements CqlSession {
@Override
@NonNull
public CompletionStage<AsyncResultSet> executeAsync(@NonNull String query) {
Span span = tracer().startSpan(session, query);
try (Scope ignored = tracer().startScope(span)) {
Context context = tracer().startSpan(Context.current(), session, query);
try (Scope ignored = context.makeCurrent()) {
CompletionStage<AsyncResultSet> stage = session.executeAsync(query);
return stage.whenComplete(
(asyncResultSet, throwable) -> {
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
tracer().onResponse(span, asyncResultSet.getExecutionInfo());
tracer().end(span);
tracer().onResponse(context, asyncResultSet.getExecutionInfo());
tracer().end(context);
}
});
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.v5_0;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchRestClientTracer.tracer;
import static io.opentelemetry.javaagent.tooling.matcher.NameMatchers.namedOneOf;
import static java.util.Collections.singletonList;
@ -15,7 +16,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
@ -64,24 +65,24 @@ public class Elasticsearch5RestClientInstrumentationModule extends Instrumentati
public static void onEnter(
@Advice.Argument(0) String method,
@Advice.Argument(1) String endpoint,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Argument(value = 5, readOnly = false) ResponseListener responseListener) {
span = tracer().startSpan(null, method + " " + endpoint);
scope = tracer().startScope(span);
context = tracer().startSpan(currentContext(), null, method + " " + endpoint);
scope = context.makeCurrent();
responseListener = new RestResponseListener(responseListener, span);
responseListener = new RestResponseListener(responseListener, context);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
}
}
}

View File

@ -7,33 +7,33 @@ package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.v5_0;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchRestClientTracer.tracer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
public class RestResponseListener implements ResponseListener {
private final ResponseListener listener;
private final Span span;
private final Context context;
public RestResponseListener(ResponseListener listener, Span span) {
public RestResponseListener(ResponseListener listener, Context context) {
this.listener = listener;
this.span = span;
this.context = context;
}
@Override
public void onSuccess(Response response) {
if (response.getHost() != null) {
tracer().onResponse(span, response);
tracer().onResponse(context, response);
}
tracer().end(span);
tracer().end(context);
listener.onSuccess(response);
}
@Override
public void onFailure(Exception e) {
tracer().endExceptionally(span, e);
tracer().endExceptionally(context, e);
listener.onFailure(e);
}
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.v6_4;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchRestClientTracer.tracer;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@ -14,7 +15,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
@ -63,23 +64,25 @@ public class Elasticsearch6RestClientInstrumentationModule extends Instrumentati
public static void onEnter(
@Advice.Argument(0) Request request,
@Advice.Argument(value = 1, readOnly = false) ResponseListener responseListener,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(null, request.getMethod() + " " + request.getEndpoint());
scope = tracer().startScope(span);
context =
tracer()
.startSpan(currentContext(), null, request.getMethod() + " " + request.getEndpoint());
scope = context.makeCurrent();
responseListener = new RestResponseListener(responseListener, span);
responseListener = new RestResponseListener(responseListener, context);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
}
}
}

View File

@ -7,33 +7,33 @@ package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.v6_4;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchRestClientTracer.tracer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
public class RestResponseListener implements ResponseListener {
private final ResponseListener listener;
private final Span span;
private final Context context;
public RestResponseListener(ResponseListener listener, Span span) {
public RestResponseListener(ResponseListener listener, Context context) {
this.listener = listener;
this.span = span;
this.context = context;
}
@Override
public void onSuccess(Response response) {
if (response.getHost() != null) {
tracer().onResponse(span, response);
tracer().onResponse(context, response);
}
tracer().end(span);
tracer().end(context);
listener.onSuccess(response);
}
@Override
public void onFailure(Exception e) {
tracer().endExceptionally(span, e);
tracer().endExceptionally(context, e);
listener.onFailure(e);
}
}

View File

@ -7,6 +7,7 @@ package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import java.net.InetSocketAddress;
@ -19,12 +20,12 @@ public class ElasticsearchRestClientTracer extends DatabaseClientTracer<Void, St
return TRACER;
}
public Span onResponse(Span span, Response response) {
public void onResponse(Context context, Response response) {
if (response != null && response.getHost() != null) {
Span span = Span.fromContext(context);
NetPeerUtils.INSTANCE.setNetPeer(span, response.getHost().getHostName(), null);
span.setAttribute(SemanticAttributes.NET_PEER_PORT, (long) response.getHost().getPort());
}
return span;
}
@Override

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.elasticsearch.transport.v5_0;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport.ElasticsearchTransportClientTracer.tracer;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@ -13,7 +14,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
@ -66,27 +67,27 @@ public class Elasticsearch5TransportClientInstrumentationModule extends Instrume
public static void onEnter(
@Advice.Argument(0) Action action,
@Advice.Argument(1) ActionRequest actionRequest,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Argument(value = 2, readOnly = false)
ActionListener<ActionResponse> actionListener) {
span = tracer().startSpan(null, action);
scope = tracer().startScope(span);
context = tracer().startSpan(currentContext(), null, action);
scope = context.makeCurrent();
tracer().onRequest(span, action.getClass(), actionRequest.getClass());
actionListener = new TransportActionListener<>(actionRequest, actionListener, span);
tracer().onRequest(context, action.getClass(), actionRequest.getClass());
actionListener = new TransportActionListener<>(actionRequest, actionListener, context);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
}
}
}

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -26,16 +27,18 @@ import org.elasticsearch.action.support.replication.ReplicationResponse;
public class TransportActionListener<T extends ActionResponse> implements ActionListener<T> {
private final ActionListener<T> listener;
private final Span span;
private final Context context;
public TransportActionListener(
ActionRequest actionRequest, ActionListener<T> listener, Span span) {
ActionRequest actionRequest, ActionListener<T> listener, Context context) {
this.listener = listener;
this.span = span;
this.context = context;
onRequest(actionRequest);
}
private void onRequest(ActionRequest request) {
Span span = Span.fromContext(context);
if (request instanceof IndicesRequest) {
IndicesRequest req = (IndicesRequest) request;
String[] indices = req.indices();
@ -59,6 +62,8 @@ public class TransportActionListener<T extends ActionResponse> implements Action
@Override
public void onResponse(T response) {
Span span = Span.fromContext(context);
if (response.remoteAddress() != null) {
NetPeerUtils.INSTANCE.setNetPeer(
span, response.remoteAddress().getHost(), response.remoteAddress().getAddress());
@ -113,7 +118,7 @@ public class TransportActionListener<T extends ActionResponse> implements Action
@Override
public void onFailure(Exception e) {
tracer().endExceptionally(span, e);
tracer().endExceptionally(context, e);
listener.onFailure(e);
}
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.elasticsearch.transport.v5_3;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport.ElasticsearchTransportClientTracer.tracer;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@ -13,7 +14,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
@ -67,27 +68,27 @@ public class Elasticsearch53TransportClientInstrumentationModule extends Instrum
public static void onEnter(
@Advice.Argument(0) Action action,
@Advice.Argument(1) ActionRequest actionRequest,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Argument(value = 2, readOnly = false)
ActionListener<ActionResponse> actionListener) {
span = tracer().startSpan(null, action);
scope = tracer().startScope(span);
context = tracer().startSpan(currentContext(), null, action);
scope = context.makeCurrent();
tracer().onRequest(span, action.getClass(), actionRequest.getClass());
actionListener = new TransportActionListener<>(actionRequest, actionListener, span);
tracer().onRequest(context, action.getClass(), actionRequest.getClass());
actionListener = new TransportActionListener<>(actionRequest, actionListener, context);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
}
}
}

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -26,16 +27,18 @@ import org.elasticsearch.action.support.replication.ReplicationResponse;
public class TransportActionListener<T extends ActionResponse> implements ActionListener<T> {
private final ActionListener<T> listener;
private final Span span;
private final Context context;
public TransportActionListener(
ActionRequest actionRequest, ActionListener<T> listener, Span span) {
ActionRequest actionRequest, ActionListener<T> listener, Context context) {
this.listener = listener;
this.span = span;
this.context = context;
onRequest(actionRequest);
}
private void onRequest(ActionRequest request) {
Span span = Span.fromContext(context);
if (request instanceof IndicesRequest) {
IndicesRequest req = (IndicesRequest) request;
String[] indices = req.indices();
@ -60,6 +63,8 @@ public class TransportActionListener<T extends ActionResponse> implements Action
@Override
public void onResponse(T response) {
Span span = Span.fromContext(context);
if (response.remoteAddress() != null) {
NetPeerUtils.INSTANCE.setNetPeer(
span, response.remoteAddress().getHost(), response.remoteAddress().getAddress());
@ -114,7 +119,7 @@ public class TransportActionListener<T extends ActionResponse> implements Action
@Override
public void onFailure(Exception e) {
tracer().endExceptionally(span, e);
tracer().endExceptionally(context, e);
listener.onFailure(e);
}
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.elasticsearch.transport.v6_0;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport.ElasticsearchTransportClientTracer.tracer;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@ -13,7 +14,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
@ -70,26 +71,26 @@ public class Elasticsearch6TransportClientInstrumentationModule extends Instrume
public static void onEnter(
@Advice.Argument(0) Action action,
@Advice.Argument(1) ActionRequest actionRequest,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Argument(value = 2, readOnly = false)
ActionListener<ActionResponse> actionListener) {
span = tracer().startSpan(null, action);
scope = tracer().startScope(span);
tracer().onRequest(span, action.getClass(), actionRequest.getClass());
actionListener = new TransportActionListener<>(actionRequest, actionListener, span);
context = tracer().startSpan(currentContext(), null, action);
scope = context.makeCurrent();
tracer().onRequest(context, action.getClass(), actionRequest.getClass());
actionListener = new TransportActionListener<>(actionRequest, actionListener, context);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
}
}
}

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -30,16 +31,17 @@ import org.elasticsearch.action.support.replication.ReplicationResponse;
public class TransportActionListener<T extends ActionResponse> implements ActionListener<T> {
private final ActionListener<T> listener;
private final Span span;
private final Context context;
public TransportActionListener(
ActionRequest actionRequest, ActionListener<T> listener, Span span) {
ActionRequest actionRequest, ActionListener<T> listener, Context context) {
this.listener = listener;
this.span = span;
this.context = context;
onRequest(actionRequest);
}
private void onRequest(ActionRequest request) {
Span span = Span.fromContext(context);
if (request instanceof IndicesRequest) {
IndicesRequest req = (IndicesRequest) request;
String[] indices = req.indices();
@ -64,6 +66,8 @@ public class TransportActionListener<T extends ActionResponse> implements Action
@Override
public void onResponse(T response) {
Span span = Span.fromContext(context);
if (response.remoteAddress() != null) {
NetPeerUtils.INSTANCE.setNetPeer(
span,
@ -114,13 +118,13 @@ public class TransportActionListener<T extends ActionResponse> implements Action
span.setAttribute("elasticsearch.node.cluster.name", resp.getClusterName().value());
}
tracer().end(span);
tracer().end(context);
listener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
tracer().endExceptionally(span, e);
tracer().endExceptionally(context, e);
listener.onFailure(e);
}
}

View File

@ -7,6 +7,7 @@ package io.opentelemetry.javaagent.instrumentation.elasticsearch.transport;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
import java.net.InetSocketAddress;
import org.elasticsearch.action.Action;
@ -20,10 +21,10 @@ public class ElasticsearchTransportClientTracer
return TRACER;
}
public Span onRequest(Span span, Class action, Class request) {
public void onRequest(Context context, Class action, Class request) {
Span span = Span.fromContext(context);
span.setAttribute("elasticsearch.action", action.getSimpleName());
span.setAttribute("elasticsearch.request", request.getSimpleName());
return span;
}
@Override

View File

@ -8,7 +8,7 @@ package io.opentelemetry.javaagent.instrumentation.jdbc;
import static io.opentelemetry.javaagent.instrumentation.jdbc.JdbcUtils.connectionFromStatement;
import static io.opentelemetry.javaagent.instrumentation.jdbc.JdbcUtils.normalizeAndExtractInfo;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.DatabaseClientTracer;
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
import io.opentelemetry.javaagent.instrumentation.api.CallDepthThreadLocalMap;
@ -66,15 +66,16 @@ public class JdbcTracer extends DatabaseClientTracer<DbInfo, SqlStatementInfo> {
return CallDepthThreadLocalMap.getCallDepth(Statement.class);
}
public Span startSpan(PreparedStatement statement) {
return startSpan(statement, JdbcMaps.preparedStatements.get(statement));
public Context startSpan(Context parentContext, PreparedStatement statement) {
return startSpan(parentContext, statement, JdbcMaps.preparedStatements.get(statement));
}
public Span startSpan(Statement statement, String query) {
return startSpan(statement, normalizeAndExtractInfo(query));
public Context startSpan(Context parentContext, Statement statement, String query) {
return startSpan(parentContext, statement, normalizeAndExtractInfo(query));
}
public Span startSpan(Statement statement, SqlStatementInfo queryInfo) {
private Context startSpan(
Context parentContext, Statement statement, SqlStatementInfo queryInfo) {
Connection connection = connectionFromStatement(statement);
if (connection == null) {
return null;
@ -82,7 +83,7 @@ public class JdbcTracer extends DatabaseClientTracer<DbInfo, SqlStatementInfo> {
DbInfo dbInfo = extractDbInfo(connection);
return startSpan(dbInfo, queryInfo);
return startSpan(parentContext, dbInfo, queryInfo);
}
@Override

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.jdbc;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.jdbc.JdbcTracer.tracer;
import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
@ -14,9 +15,8 @@ import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.sql.PreparedStatement;
import java.util.Map;
@ -49,32 +49,31 @@ public class PreparedStatementInstrumentation implements TypeInstrumentation {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This PreparedStatement statement,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
callDepth = tracer().getCallDepth();
if (callDepth.getAndIncrement() == 0) {
span = tracer().startSpan(statement);
if (span != null) {
scope = tracer().startScope(span);
}
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
Context parentContext = currentContext();
if (!tracer().shouldStartSpan(parentContext)) {
return;
}
context = tracer().startSpan(parentContext, statement);
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
if (callDepth.decrementAndGet() == 0 && scope != null) {
scope.close();
if (throwable == null) {
tracer().end(span);
} else {
tracer().endExceptionally(span, throwable);
}
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
scope.close();
if (throwable == null) {
tracer().end(context);
} else {
tracer().endExceptionally(context, throwable);
}
}
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.jdbc;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.jdbc.JdbcTracer.tracer;
import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
@ -14,9 +15,8 @@ import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.sql.Statement;
import java.util.Map;
@ -50,32 +50,31 @@ public class StatementInstrumentation implements TypeInstrumentation {
public static void onEnter(
@Advice.Argument(0) String sql,
@Advice.This Statement statement,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
callDepth = tracer().getCallDepth();
if (callDepth.getAndIncrement() == 0) {
span = tracer().startSpan(statement, sql);
if (span != null) {
scope = tracer().startScope(span);
}
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
Context parentContext = currentContext();
if (!tracer().shouldStartSpan(parentContext)) {
return;
}
context = tracer().startSpan(parentContext, statement, sql);
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
if (callDepth.decrementAndGet() == 0 && scope != null) {
scope.close();
if (throwable == null) {
tracer().end(span);
} else {
tracer().endExceptionally(span, throwable);
}
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
scope.close();
if (throwable == null) {
tracer().end(context);
} else {
tracer().endExceptionally(context, throwable);
}
}
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.jedis.v1_4;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.jedis.v1_4.JedisClientTracer.tracer;
import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonList;
@ -16,9 +17,8 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.CallDepthThreadLocalMap;
import io.opentelemetry.javaagent.instrumentation.jedis.v1_4.JedisClientTracer.CommandWithArgs;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
@ -84,32 +84,31 @@ public class JedisInstrumentationModule extends InstrumentationModule {
public static void onEnter(
@Advice.This Connection connection,
@Advice.Argument(0) Command command,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
int callDepth = CallDepthThreadLocalMap.incrementCallDepth(Connection.class);
if (callDepth > 0) {
Context parentContext = currentContext();
if (!tracer().shouldStartSpan(parentContext)) {
return;
}
span = tracer().startSpan(connection, new CommandWithArgs(command));
scope = tracer().startScope(span);
context = tracer().startSpan(parentContext, connection, new CommandWithArgs(command));
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
scope.close();
CallDepthThreadLocalMap.reset(Connection.class);
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
tracer().end(span);
tracer().end(context);
}
}
}
@ -121,32 +120,31 @@ public class JedisInstrumentationModule extends InstrumentationModule {
@Advice.This Connection connection,
@Advice.Argument(0) Command command,
@Advice.Argument(1) byte[][] args,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
int callDepth = CallDepthThreadLocalMap.incrementCallDepth(Connection.class);
if (callDepth > 0) {
Context parentContext = currentContext();
if (!tracer().shouldStartSpan(parentContext)) {
return;
}
span = tracer().startSpan(connection, new CommandWithArgs(command, args));
scope = tracer().startScope(span);
context = tracer().startSpan(parentContext, connection, new CommandWithArgs(command, args));
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
scope.close();
CallDepthThreadLocalMap.reset(Connection.class);
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
tracer().end(span);
tracer().end(context);
}
}
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.jedis.v3_0;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.jedis.v3_0.JedisClientTracer.tracer;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@ -15,7 +16,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.jedis.v3_0.JedisClientTracer.CommandWithArgs;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
@ -68,16 +69,17 @@ public class JedisInstrumentationModule extends InstrumentationModule {
@Advice.This Connection connection,
@Advice.Argument(0) ProtocolCommand command,
@Advice.Argument(1) byte[][] args,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(connection, new CommandWithArgs(command, args));
scope = tracer().startScope(span);
context =
tracer().startSpan(currentContext(), connection, new CommandWithArgs(command, args));
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
@ -85,9 +87,9 @@ public class JedisInstrumentationModule extends InstrumentationModule {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
tracer().end(span);
tracer().end(context);
}
}
}

View File

@ -15,6 +15,7 @@ import com.lambdaworks.redis.protocol.CommandType;
import com.lambdaworks.redis.protocol.ProtocolKeyword;
import com.lambdaworks.redis.protocol.RedisCommand;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CancellationException;
@ -25,27 +26,28 @@ public final class InstrumentationPoints {
public static void afterCommand(
RedisCommand<?, ?, ?> command,
Span span,
Context context,
Throwable throwable,
AsyncCommand<?, ?, ?> asyncCommand) {
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else if (expectsResponse(command)) {
asyncCommand.handleAsync(
(value, ex) -> {
if (ex == null) {
tracer().end(span);
tracer().end(context);
} else if (ex instanceof CancellationException) {
Span span = Span.fromContext(context);
span.setAttribute("lettuce.command.cancelled", true);
tracer().end(span);
tracer().end(context);
} else {
tracer().endExceptionally(span, ex);
tracer().endExceptionally(context, ex);
}
return null;
});
} else {
// No response is expected, so we must finish the span now.
tracer().end(span);
tracer().end(context);
}
}

View File

@ -5,11 +5,12 @@
package io.opentelemetry.javaagent.instrumentation.lettuce.v4_0;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.lettuce.v4_0.LettuceDatabaseClientTracer.tracer;
import com.lambdaworks.redis.protocol.AsyncCommand;
import com.lambdaworks.redis.protocol.RedisCommand;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import net.bytebuddy.asm.Advice;
@ -18,10 +19,10 @@ public class LettuceAsyncCommandsAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) RedisCommand<?, ?, ?> command,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(null, command);
scope = tracer().startScope(span);
context = tracer().startSpan(currentContext(), null, command);
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
@ -29,9 +30,9 @@ public class LettuceAsyncCommandsAdvice {
@Advice.Argument(0) RedisCommand<?, ?, ?> command,
@Advice.Thrown Throwable throwable,
@Advice.Return AsyncCommand<?, ?, ?> asyncCommand,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
InstrumentationPoints.afterCommand(command, span, throwable, asyncCommand);
InstrumentationPoints.afterCommand(command, context, throwable, asyncCommand);
}
}

View File

@ -5,10 +5,11 @@
package io.opentelemetry.javaagent.instrumentation.lettuce.v4_0;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.lettuce.v4_0.LettuceConnectionDatabaseClientTracer.tracer;
import com.lambdaworks.redis.RedisURI;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import net.bytebuddy.asm.Advice;
@ -17,22 +18,22 @@ public class RedisConnectionAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(1) RedisURI redisUri,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(redisUri, "CONNECT");
scope = tracer().startScope(span);
context = tracer().startSpan(currentContext(), redisUri, "CONNECT");
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
tracer().end(span);
tracer().end(context);
}
}
}

View File

@ -5,11 +5,12 @@
package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.lettuce.v5_0.LettuceConnectionDatabaseClientTracer.tracer;
import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.RedisURI;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import net.bytebuddy.asm.Advice;
@ -18,24 +19,24 @@ public class ConnectionFutureAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(1) RedisURI redisUri,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(redisUri, "CONNECT");
scope = tracer().startScope(span);
context = tracer().startSpan(currentContext(), redisUri, "CONNECT");
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Return ConnectionFuture<?> connectionFuture,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
return;
}
connectionFuture.handleAsync(new LettuceAsyncBiFunction<>(span));
connectionFuture.handleAsync(new LettuceAsyncBiFunction<>(context));
}
}

View File

@ -8,6 +8,7 @@ package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0;
import static io.opentelemetry.javaagent.instrumentation.lettuce.v5_0.LettuceDatabaseClientTracer.tracer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.concurrent.CancellationException;
import java.util.function.BiFunction;
@ -23,19 +24,20 @@ import java.util.function.BiFunction;
public class LettuceAsyncBiFunction<T, U extends Throwable, R>
implements BiFunction<T, Throwable, R> {
private final Span span;
private final Context context;
public LettuceAsyncBiFunction(Span span) {
this.span = span;
public LettuceAsyncBiFunction(Context context) {
this.context = context;
}
@Override
public R apply(T t, Throwable throwable) {
if (throwable instanceof CancellationException) {
Span span = Span.fromContext(context);
span.setAttribute("lettuce.command.cancelled", true);
tracer().end(span);
tracer().end(context);
} else {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
}
return null;
}

View File

@ -5,12 +5,13 @@
package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.lettuce.v5_0.LettuceDatabaseClientTracer.tracer;
import static io.opentelemetry.javaagent.instrumentation.lettuce.v5_0.LettuceInstrumentationUtil.expectsResponse;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.RedisCommand;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import net.bytebuddy.asm.Advice;
@ -19,11 +20,11 @@ public class LettuceAsyncCommandsAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) RedisCommand<?, ?, ?> command,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(null, command);
scope = tracer().startScope(span);
context = tracer().startSpan(currentContext(), null, command);
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
@ -31,20 +32,20 @@ public class LettuceAsyncCommandsAdvice {
@Advice.Argument(0) RedisCommand<?, ?, ?> command,
@Advice.Thrown Throwable throwable,
@Advice.Return AsyncCommand<?, ?, ?> asyncCommand,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
return;
}
// close spans on error or normal completion
if (expectsResponse(command)) {
asyncCommand.handleAsync(new LettuceAsyncBiFunction<>(span));
asyncCommand.handleAsync(new LettuceAsyncBiFunction<>(context));
} else {
tracer().end(span);
tracer().end(context);
}
}
}

View File

@ -25,14 +25,14 @@ public class LettuceFluxCreationAdvice {
public static void monitorSpan(
@Advice.Enter RedisCommand command, @Advice.Return(readOnly = false) Flux<?> publisher) {
boolean finishSpanOnClose = !expectsResponse(command);
boolean expectsResponse = expectsResponse(command);
LettuceFluxTerminationRunnable handler =
new LettuceFluxTerminationRunnable(command, finishSpanOnClose);
new LettuceFluxTerminationRunnable(command, expectsResponse);
publisher = publisher.doOnSubscribe(handler.getOnSubscribeConsumer());
// don't register extra callbacks to finish the spans if the command being instrumented is one
// of those that return
// Mono<Void> (In here a flux is created first and then converted to Mono<Void>)
if (!finishSpanOnClose) {
if (expectsResponse) {
publisher = publisher.doOnEach(handler);
publisher = publisher.doOnCancel(handler);
}

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.lettuce.v5_0.LettuceDat
import io.lettuce.core.protocol.RedisCommand;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import org.slf4j.LoggerFactory;
@ -18,12 +19,12 @@ import reactor.core.publisher.SignalType;
public class LettuceFluxTerminationRunnable implements Consumer<Signal<?>>, Runnable {
private Span span = null;
private int numResults = 0;
private Context context;
private int numResults;
private final FluxOnSubscribeConsumer onSubscribeConsumer;
public LettuceFluxTerminationRunnable(RedisCommand<?, ?, ?> command, boolean finishSpanOnClose) {
onSubscribeConsumer = new FluxOnSubscribeConsumer(this, command, finishSpanOnClose);
public LettuceFluxTerminationRunnable(RedisCommand<?, ?, ?> command, boolean expectsResponse) {
onSubscribeConsumer = new FluxOnSubscribeConsumer(this, command, expectsResponse);
}
public FluxOnSubscribeConsumer getOnSubscribeConsumer() {
@ -31,7 +32,8 @@ public class LettuceFluxTerminationRunnable implements Consumer<Signal<?>>, Runn
}
private void finishSpan(boolean isCommandCancelled, Throwable throwable) {
if (span != null) {
if (context != null) {
Span span = Span.fromContext(context);
span.setAttribute("lettuce.command.results.count", numResults);
if (isCommandCancelled) {
span.setAttribute("lettuce.command.cancelled", true);
@ -68,22 +70,22 @@ public class LettuceFluxTerminationRunnable implements Consumer<Signal<?>>, Runn
private final LettuceFluxTerminationRunnable owner;
private final RedisCommand<?, ?, ?> command;
private final boolean finishSpanOnClose;
private final boolean expectsResponse;
public FluxOnSubscribeConsumer(
LettuceFluxTerminationRunnable owner,
RedisCommand<?, ?, ?> command,
boolean finishSpanOnClose) {
boolean expectsResponse) {
this.owner = owner;
this.command = command;
this.finishSpanOnClose = finishSpanOnClose;
this.expectsResponse = expectsResponse;
}
@Override
public void accept(Subscription subscription) {
owner.span = tracer().startSpan(null, command);
if (finishSpanOnClose) {
tracer().end(owner.span);
owner.context = tracer().startSpan(Context.current(), null, command);
if (!expectsResponse) {
tracer().end(owner.context);
}
}
}

View File

@ -8,7 +8,7 @@ package io.opentelemetry.javaagent.instrumentation.lettuce.v5_0.rx;
import static io.opentelemetry.javaagent.instrumentation.lettuce.v5_0.LettuceDatabaseClientTracer.tracer;
import io.lettuce.core.protocol.RedisCommand;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.LoggerFactory;
@ -16,7 +16,7 @@ import reactor.core.publisher.Mono;
public class LettuceMonoDualConsumer<R, T> implements Consumer<R>, BiConsumer<T, Throwable> {
private Span span = null;
private Context context;
private final RedisCommand<?, ?, ?> command;
private final boolean finishSpanOnClose;
@ -27,19 +27,19 @@ public class LettuceMonoDualConsumer<R, T> implements Consumer<R>, BiConsumer<T,
@Override
public void accept(R r) {
span = tracer().startSpan(null, command);
context = tracer().startSpan(Context.current(), null, command);
if (finishSpanOnClose) {
tracer().end(span);
tracer().end(context);
}
}
@Override
public void accept(T t, Throwable throwable) {
if (span != null) {
if (context != null) {
if (throwable == null) {
tracer().end(span);
tracer().end(context);
} else {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
}
} else {
LoggerFactory.getLogger(Mono.class)

View File

@ -11,33 +11,33 @@ import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TracingCommandListener implements CommandListener {
private final Map<Integer, Span> spanMap = new ConcurrentHashMap<>();
private final Map<Integer, Context> contextMap = new ConcurrentHashMap<>();
@Override
public void commandStarted(CommandStartedEvent event) {
Span span = tracer().startSpan(event, event.getCommand());
spanMap.put(event.getRequestId(), span);
Context context = tracer().startSpan(Context.current(), event, event.getCommand());
contextMap.put(event.getRequestId(), context);
}
@Override
public void commandSucceeded(CommandSucceededEvent event) {
Span span = spanMap.remove(event.getRequestId());
if (span != null) {
tracer().end(span);
Context context = contextMap.remove(event.getRequestId());
if (context != null) {
tracer().end(context);
}
}
@Override
public void commandFailed(CommandFailedEvent event) {
Span span = spanMap.remove(event.getRequestId());
if (span != null) {
tracer().endExceptionally(span, event.getThrowable());
Context context = contextMap.remove(event.getRequestId());
if (context != null) {
tracer().endExceptionally(context, event.getThrowable());
}
}
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.rediscala;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.rediscala.RediscalaClientTracer.tracer;
import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.safeHasSuperType;
@ -18,7 +19,7 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
@ -79,15 +80,15 @@ public class RediscalaInstrumentationModule extends InstrumentationModule {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) RedisCommand<?, ?> cmd,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(cmd, cmd);
scope = tracer().startScope(span);
context = tracer().startSpan(currentContext(), cmd, cmd);
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Thrown Throwable throwable,
@Advice.FieldValue("executionContext") ExecutionContext ctx,
@ -95,26 +96,26 @@ public class RediscalaInstrumentationModule extends InstrumentationModule {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
responseFuture.onComplete(new OnCompleteHandler(span), ctx);
responseFuture.onComplete(new OnCompleteHandler(context), ctx);
}
}
}
public static class OnCompleteHandler extends AbstractFunction1<Try<Object>, Void> {
private final Span span;
private final Context context;
public OnCompleteHandler(Span span) {
this.span = span;
public OnCompleteHandler(Context context) {
this.context = context;
}
@Override
public Void apply(Try<Object> result) {
if (result.isFailure()) {
tracer().endExceptionally(span, result.failed().get());
tracer().endExceptionally(context, result.failed().get());
} else {
tracer().end(span);
tracer().end(context);
}
return null;
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.redisson;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.redisson.RedissonClientTracer.tracer;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@ -12,7 +13,7 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
@ -56,23 +57,23 @@ public class RedissonInstrumentation extends InstrumentationModule {
public static void onEnter(
@Advice.This RedisConnection connection,
@Advice.Argument(0) Object arg,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(connection, arg);
scope = tracer().startScope(span);
context = tracer().startSpan(currentContext(), connection, arg);
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(span, throwable);
tracer().endExceptionally(context, throwable);
} else {
tracer().end(span);
tracer().end(context);
}
}
}

View File

@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.spymemcached;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.concurrent.ExecutionException;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.internal.BulkGetFuture;
@ -13,8 +14,9 @@ import net.spy.memcached.internal.BulkGetFuture;
public class BulkGetCompletionListener extends CompletionListener<BulkGetFuture<?>>
implements net.spy.memcached.internal.BulkGetCompletionListener {
public BulkGetCompletionListener(MemcachedConnection connection, String methodName) {
super(connection, methodName);
public BulkGetCompletionListener(
Context parentContext, MemcachedConnection connection, String methodName) {
super(parentContext, connection, methodName);
}
@Override

View File

@ -8,6 +8,7 @@ package io.opentelemetry.javaagent.instrumentation.spymemcached;
import static io.opentelemetry.javaagent.instrumentation.spymemcached.MemcacheClientTracer.tracer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import net.spy.memcached.MemcachedConnection;
@ -19,13 +20,15 @@ public abstract class CompletionListener<T> {
static final String HIT = "hit";
static final String MISS = "miss";
private final Span span;
private final Context context;
public CompletionListener(MemcachedConnection connection, String methodName) {
span = tracer().startSpan(connection, methodName);
public CompletionListener(
Context parentContext, MemcachedConnection connection, String methodName) {
context = tracer().startSpan(parentContext, connection, methodName);
}
protected void closeAsyncSpan(T future) {
Span span = Span.fromContext(context);
try {
processResult(span, future);
} catch (CancellationException e) {
@ -51,7 +54,7 @@ public abstract class CompletionListener<T> {
}
protected void closeSyncSpan(Throwable thrown) {
tracer().endExceptionally(span, thrown);
tracer().endExceptionally(context, thrown);
}
protected abstract void processResult(Span span, T future)

View File

@ -6,14 +6,16 @@
package io.opentelemetry.javaagent.instrumentation.spymemcached;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.concurrent.ExecutionException;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.internal.GetFuture;
public class GetCompletionListener extends CompletionListener<GetFuture<?>>
implements net.spy.memcached.internal.GetCompletionListener {
public GetCompletionListener(MemcachedConnection connection, String methodName) {
super(connection, methodName);
public GetCompletionListener(
Context parentContext, MemcachedConnection connection, String methodName) {
super(parentContext, connection, methodName);
}
@Override

View File

@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.spymemcached;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.concurrent.ExecutionException;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.internal.OperationFuture;
@ -13,8 +14,9 @@ import net.spy.memcached.internal.OperationFuture;
public class OperationCompletionListener
extends CompletionListener<OperationFuture<? extends Object>>
implements net.spy.memcached.internal.OperationCompletionListener {
public OperationCompletionListener(MemcachedConnection connection, String methodName) {
super(connection, methodName);
public OperationCompletionListener(
Context parentContext, MemcachedConnection connection, String methodName) {
super(parentContext, connection, methodName);
}
@Override

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.spymemcached;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.tooling.matcher.NameMatchers.namedOneOf;
import static java.util.Collections.singletonList;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@ -14,6 +15,7 @@ import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
import io.opentelemetry.javaagent.instrumentation.api.CallDepthThreadLocalMap;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
@ -76,24 +78,24 @@ public class SpymemcachedInstrumentationModule extends InstrumentationModule {
public static class AsyncOperationAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static int trackCallDepth() {
return CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class);
public static void trackCallDepth(@Advice.Local("otelCallDepth") CallDepth callDepth) {
callDepth = CallDepthThreadLocalMap.getCallDepth(MemcachedClient.class);
callDepth.getAndIncrement();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Enter int callDepth,
@Advice.This MemcachedClient client,
@Advice.Origin("#m") String methodName,
@Advice.Return OperationFuture<?> future) {
if (callDepth > 0) {
@Advice.Return OperationFuture<?> future,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
if (callDepth.decrementAndGet() > 0) {
return;
}
CallDepthThreadLocalMap.reset(MemcachedClient.class);
if (future != null) {
OperationCompletionListener listener =
new OperationCompletionListener(client.getConnection(), methodName);
new OperationCompletionListener(currentContext(), client.getConnection(), methodName);
future.addListener(listener);
}
}
@ -102,24 +104,24 @@ public class SpymemcachedInstrumentationModule extends InstrumentationModule {
public static class AsyncGetAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static int trackCallDepth() {
return CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class);
public static void trackCallDepth(@Advice.Local("otelCallDepth") CallDepth callDepth) {
callDepth = CallDepthThreadLocalMap.getCallDepth(MemcachedClient.class);
callDepth.getAndIncrement();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Enter int callDepth,
@Advice.This MemcachedClient client,
@Advice.Origin("#m") String methodName,
@Advice.Return GetFuture<?> future) {
if (callDepth > 0) {
@Advice.Return GetFuture<?> future,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
if (callDepth.decrementAndGet() > 0) {
return;
}
CallDepthThreadLocalMap.reset(MemcachedClient.class);
if (future != null) {
GetCompletionListener listener =
new GetCompletionListener(client.getConnection(), methodName);
new GetCompletionListener(currentContext(), client.getConnection(), methodName);
future.addListener(listener);
}
}
@ -128,24 +130,24 @@ public class SpymemcachedInstrumentationModule extends InstrumentationModule {
public static class AsyncBulkAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static int trackCallDepth() {
return CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class);
public static void trackCallDepth(@Advice.Local("otelCallDepth") CallDepth callDepth) {
callDepth = CallDepthThreadLocalMap.getCallDepth(MemcachedClient.class);
callDepth.getAndIncrement();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Enter int callDepth,
@Advice.This MemcachedClient client,
@Advice.Origin("#m") String methodName,
@Advice.Return BulkFuture<?> future) {
if (callDepth > 0) {
@Advice.Return BulkFuture<?> future,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
if (callDepth.decrementAndGet() > 0) {
return;
}
CallDepthThreadLocalMap.reset(MemcachedClient.class);
if (future != null) {
BulkGetCompletionListener listener =
new BulkGetCompletionListener(client.getConnection(), methodName);
new BulkGetCompletionListener(currentContext(), client.getConnection(), methodName);
future.addListener(listener);
}
}
@ -155,22 +157,25 @@ public class SpymemcachedInstrumentationModule extends InstrumentationModule {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static SyncCompletionListener methodEnter(
@Advice.This MemcachedClient client, @Advice.Origin("#m") String methodName) {
int callDepth = CallDepthThreadLocalMap.incrementCallDepth(MemcachedClient.class);
if (callDepth > 0) {
@Advice.This MemcachedClient client,
@Advice.Origin("#m") String methodName,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
callDepth = CallDepthThreadLocalMap.getCallDepth(MemcachedClient.class);
if (callDepth.getAndIncrement() > 0) {
return null;
}
return new SyncCompletionListener(client.getConnection(), methodName);
return new SyncCompletionListener(currentContext(), client.getConnection(), methodName);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Enter SyncCompletionListener listener, @Advice.Thrown Throwable thrown) {
if (listener == null) {
@Advice.Enter SyncCompletionListener listener,
@Advice.Thrown Throwable thrown,
@Advice.Local("otelCallDepth") CallDepth callDepth) {
if (callDepth.decrementAndGet() > 0) {
return;
}
CallDepthThreadLocalMap.reset(MemcachedClient.class);
listener.done(thrown);
}

View File

@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.spymemcached;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import net.spy.memcached.MemcachedConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -14,8 +15,9 @@ public class SyncCompletionListener extends CompletionListener<Void> {
private static final Logger log = LoggerFactory.getLogger(SyncCompletionListener.class);
public SyncCompletionListener(MemcachedConnection connection, String methodName) {
super(connection, methodName);
public SyncCompletionListener(
Context parentContext, MemcachedConnection connection, String methodName) {
super(parentContext, connection, methodName);
}
@Override