Migrate gRPC to Instrumenter API (#3073)

* Migrate gRPC to Instrumenter API

* Don't unwrap grpc exception

* Fix merge and cleanup
This commit is contained in:
Anuraag Agrawal 2021-05-27 15:40:53 +09:00 committed by GitHub
parent df0d028a32
commit acedbff78d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 545 additions and 259 deletions

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.api.instrumenter.rpc;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Extractor of <a
* href="https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/rpc.md">RPC
* attributes</a>. Instrumentations of RPC libraries should extend this class, defining {@link
* REQUEST} with the actual request type of the instrumented library. If an attribute is not
* available in this library, it is appropriate to return {@code null} from the protected attribute
* methods, but implement as many as possible for best compliance with the OpenTelemetry
* specification.
*/
public abstract class RpcAttributesExtractor<REQUEST, RESPONSE>
extends AttributesExtractor<REQUEST, RESPONSE> {
@Override
protected final void onStart(AttributesBuilder attributes, REQUEST request) {
set(attributes, SemanticAttributes.RPC_SYSTEM, system(request));
set(attributes, SemanticAttributes.RPC_SERVICE, service(request));
set(attributes, SemanticAttributes.RPC_METHOD, method(request));
}
@Override
protected final void onEnd(AttributesBuilder attributes, REQUEST request, RESPONSE response) {
// No response attributes
}
@Nullable
protected abstract String system(REQUEST request);
@Nullable
protected abstract String service(REQUEST request);
@Nullable
protected abstract String method(REQUEST request);
}

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.api.instrumenter.rpc;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
/** A {@link SpanNameExtractor} for RPC requests. */
public final class RpcSpanNameExtractor<REQUEST> implements SpanNameExtractor<REQUEST> {
/**
* Returns a {@link SpanNameExtractor} that constructs the span name according to RPC semantic
* conventions: {@code <rpc.service>/<rpc.method>}.
*/
public static <REQUEST> SpanNameExtractor<REQUEST> create(
RpcAttributesExtractor<REQUEST, ?> attributesExtractor) {
return new RpcSpanNameExtractor<>(attributesExtractor);
}
private final RpcAttributesExtractor<REQUEST, ?> attributesExtractor;
private RpcSpanNameExtractor(RpcAttributesExtractor<REQUEST, ?> attributesExtractor) {
this.attributesExtractor = attributesExtractor;
}
@Override
public String extract(REQUEST request) {
String service = attributesExtractor.service(request);
String method = attributesExtractor.method(request);
if (service == null || method == null) {
return "RPC request";
}
return service + '/' + method;
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.api.instrumenter.rpc;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
class RpcAttributesExtractorTest {
static class TestExtractor extends RpcAttributesExtractor<Map<String, String>, Void> {
@Override
protected String system(Map<String, String> request) {
return "test";
}
@Override
protected String service(Map<String, String> request) {
return request.get("service");
}
@Override
protected String method(Map<String, String> request) {
return request.get("method");
}
}
@Test
void normal() {
Map<String, String> request = new HashMap<>();
request.put("service", "my.Service");
request.put("method", "Method");
TestExtractor extractor = new TestExtractor();
AttributesBuilder attributes = Attributes.builder();
extractor.onStart(attributes, request);
assertThat(attributes.build())
.containsOnly(
entry(SemanticAttributes.RPC_SYSTEM, "test"),
entry(SemanticAttributes.RPC_SERVICE, "my.Service"),
entry(SemanticAttributes.RPC_METHOD, "Method"));
extractor.onEnd(attributes, request, null);
assertThat(attributes.build())
.containsOnly(
entry(SemanticAttributes.RPC_SYSTEM, "test"),
entry(SemanticAttributes.RPC_SERVICE, "my.Service"),
entry(SemanticAttributes.RPC_METHOD, "Method"));
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.api.instrumenter.rpc;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class RpcSpanNameExtractorTest {
@Mock RpcAttributesExtractor<RpcRequest, Void> attributesExtractor;
@Test
void normal() {
RpcRequest request = new RpcRequest();
when(attributesExtractor.service(request)).thenReturn("my.Service");
when(attributesExtractor.method(request)).thenReturn("Method");
SpanNameExtractor<RpcRequest> extractor = RpcSpanNameExtractor.create(attributesExtractor);
assertThat(extractor.extract(request)).isEqualTo("my.Service/Method");
}
@Test
void serviceNull() {
RpcRequest request = new RpcRequest();
when(attributesExtractor.method(request)).thenReturn("Method");
SpanNameExtractor<RpcRequest> extractor = RpcSpanNameExtractor.create(attributesExtractor);
assertThat(extractor.extract(request)).isEqualTo("RPC request");
}
@Test
void methodNull() {
RpcRequest request = new RpcRequest();
when(attributesExtractor.service(request)).thenReturn("my.Service");
SpanNameExtractor<RpcRequest> extractor = RpcSpanNameExtractor.create(attributesExtractor);
assertThat(extractor.extract(request)).isEqualTo("RPC request");
}
static class RpcRequest {}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.Status;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.checkerframework.checker.nullness.qual.Nullable;
final class GrpcAttributesExtractor extends AttributesExtractor<GrpcRequest, Status> {
@Override
protected void onStart(AttributesBuilder attributes, GrpcRequest grpcRequest) {
// No request attributes
}
@Override
protected void onEnd(
AttributesBuilder attributes, GrpcRequest grpcRequest, @Nullable Status status) {
if (status != null) {
attributes.put(SemanticAttributes.RPC_GRPC_STATUS_CODE, status.getCode().value());
}
}
}

View File

@ -1,62 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import io.grpc.Status;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.RpcClientTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
final class GrpcClientTracer extends RpcClientTracer {
GrpcClientTracer(OpenTelemetry openTelemetry) {
super(openTelemetry);
}
@Override
protected String getRpcSystem() {
return "grpc";
}
public Context startSpan(String name) {
Context parentContext = Context.current();
Span span =
spanBuilder(parentContext, name, CLIENT)
.setAttribute(SemanticAttributes.RPC_SYSTEM, getRpcSystem())
.startSpan();
// TODO: withClientSpan()
return parentContext.with(span);
}
public void end(Context context, Status status) {
StatusCode statusCode = GrpcHelper.statusFromGrpcStatus(status);
if (statusCode != StatusCode.UNSET) {
Span.fromContext(context).setStatus(statusCode, status.getDescription());
}
end(context);
}
@Override
public void onException(Context context, Throwable throwable) {
Status grpcStatus = Status.fromThrowable(throwable);
Span span = Span.fromContext(context);
StatusCode statusCode = GrpcHelper.statusFromGrpcStatus(grpcStatus);
if (statusCode != StatusCode.UNSET) {
span.setStatus(statusCode, grpcStatus.getDescription());
}
span.recordException(unwrapThrowable(grpcStatus.getCause()));
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.grpc-1.5";
}
}

View File

@ -7,18 +7,23 @@ package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.Metadata;
import io.opentelemetry.context.propagation.TextMapGetter;
import org.checkerframework.checker.nullness.qual.Nullable;
final class GrpcExtractAdapter implements TextMapGetter<Metadata> {
final class GrpcExtractAdapter implements TextMapGetter<GrpcRequest> {
static final GrpcExtractAdapter GETTER = new GrpcExtractAdapter();
@Override
public Iterable<String> keys(Metadata metadata) {
return metadata.keys();
public Iterable<String> keys(GrpcRequest request) {
return request.getMetadata().keys();
}
@Override
public String get(Metadata carrier, String key) {
return carrier.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
@Nullable
public String get(@Nullable GrpcRequest request, String key) {
if (request == null) {
return null;
}
return request.getMetadata().get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
}
}

View File

@ -5,36 +5,12 @@
package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.Status.Code;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
final class GrpcHelper {
public static final AttributeKey<String> MESSAGE_TYPE = AttributeKey.stringKey("message.type");
public static final AttributeKey<Long> MESSAGE_ID = AttributeKey.longKey("message.id");
public static void prepareSpan(Span span, String fullMethodName) {
int slash = fullMethodName.indexOf('/');
String serviceName = slash == -1 ? fullMethodName : fullMethodName.substring(0, slash);
String methodName = slash == -1 ? null : fullMethodName.substring(slash + 1);
span.setAttribute(SemanticAttributes.RPC_SERVICE, serviceName);
if (methodName != null) {
span.setAttribute(SemanticAttributes.RPC_METHOD, methodName);
}
}
public static StatusCode statusFromGrpcStatus(io.grpc.Status grpcStatus) {
return codeFromGrpcCode(grpcStatus.getCode());
}
private static StatusCode codeFromGrpcCode(Code grpcCode) {
return grpcCode.equals(Code.OK) ? StatusCode.UNSET : StatusCode.ERROR;
}
static final AttributeKey<String> MESSAGE_TYPE = AttributeKey.stringKey("message.type");
static final AttributeKey<Long> MESSAGE_ID = AttributeKey.longKey("message.id");
private GrpcHelper() {}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.Status;
import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.checkerframework.checker.nullness.qual.Nullable;
final class GrpcNetAttributesExtractor
extends InetSocketAddressNetAttributesExtractor<GrpcRequest, Status> {
@Override
@Nullable
public InetSocketAddress getAddress(GrpcRequest grpcRequest, @Nullable Status status) {
SocketAddress address = grpcRequest.getRemoteAddress();
if (address instanceof InetSocketAddress) {
return (InetSocketAddress) address;
}
return null;
}
@Override
@Nullable
public String transport(GrpcRequest grpcRequest) {
return SemanticAttributes.NetTransportValues.IP_TCP;
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.net.SocketAddress;
import org.checkerframework.checker.nullness.qual.Nullable;
final class GrpcRequest {
private final MethodDescriptor<?, ?> method;
@Nullable private final Metadata metadata;
@Nullable private volatile SocketAddress remoteAddress;
GrpcRequest(
MethodDescriptor<?, ?> method,
@Nullable Metadata metadata,
@Nullable SocketAddress remoteAddress) {
this.method = method;
this.metadata = metadata;
this.remoteAddress = remoteAddress;
}
MethodDescriptor<?, ?> getMethod() {
return method;
}
Metadata getMetadata() {
return metadata;
}
SocketAddress getRemoteAddress() {
return remoteAddress;
}
void setRemoteAddress(SocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.Status;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcAttributesExtractor;
import org.checkerframework.checker.nullness.qual.Nullable;
final class GrpcRpcAttributesExtractor extends RpcAttributesExtractor<GrpcRequest, Status> {
@Override
protected String system(GrpcRequest request) {
return "grpc";
}
@Override
@Nullable
protected String service(GrpcRequest request) {
String fullMethodName = request.getMethod().getFullMethodName();
int slashIndex = fullMethodName.lastIndexOf('/');
if (slashIndex == -1) {
return null;
}
return fullMethodName.substring(0, slashIndex);
}
@Override
@Nullable
protected String method(GrpcRequest request) {
String fullMethodName = request.getMethod().getFullMethodName();
int slashIndex = fullMethodName.lastIndexOf('/');
if (slashIndex == -1) {
return null;
}
return fullMethodName.substring(slashIndex + 1);
}
}

View File

@ -1,57 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import io.grpc.Metadata;
import io.grpc.Status;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.instrumentation.api.tracer.RpcServerTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
final class GrpcServerTracer extends RpcServerTracer<Metadata> {
GrpcServerTracer(OpenTelemetry openTelemetry) {
super(openTelemetry);
}
public Context startSpan(String name, Metadata headers) {
Context parentContext = extract(headers, getGetter());
SpanBuilder spanBuilder = spanBuilder(parentContext, name, SERVER);
spanBuilder.setAttribute(SemanticAttributes.RPC_SYSTEM, "grpc");
// TODO: withServerSpan()
return parentContext.with(spanBuilder.startSpan());
}
public void setStatus(Context context, Status status) {
Span span = Span.fromContext(context);
span.setStatus(GrpcHelper.statusFromGrpcStatus(status), status.getDescription());
if (status.getCause() != null) {
span.recordException(unwrapThrowable(status.getCause()));
}
}
@Override
public void onException(Context context, Throwable throwable) {
Status grpcStatus = Status.fromThrowable(throwable);
setStatus(context, grpcStatus);
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.grpc-1.5";
}
@Override
protected TextMapGetter<Metadata> getGetter() {
return GrpcExtractAdapter.GETTER;
}
}

View File

@ -0,0 +1,16 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
// Small optimization to avoid RpcSpanNameExtractor because gRPC provides the span name directly.
final class GrpcSpanNameExtractor implements SpanNameExtractor<GrpcRequest> {
@Override
public String extract(GrpcRequest grpcRequest) {
return grpcRequest.getMethod().getFullMethodName();
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.instrumentation.api.instrumenter.SpanStatusExtractor;
import org.checkerframework.checker.nullness.qual.Nullable;
final class GrpcSpanStatusExtractor implements SpanStatusExtractor<GrpcRequest, Status> {
@Override
public StatusCode extract(GrpcRequest grpcRequest, Status status, @Nullable Throwable error) {
if (status == null) {
if (error instanceof StatusRuntimeException) {
status = ((StatusRuntimeException) error).getStatus();
} else if (error instanceof StatusException) {
status = ((StatusException) error).getStatus();
}
}
if (status != null) {
if (status.isOk()) {
return StatusCode.UNSET;
}
return StatusCode.ERROR;
}
return SpanStatusExtractor.getDefault().extract(grpcRequest, status, error);
}
}

View File

@ -7,7 +7,10 @@ package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.ClientInterceptor;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
/** Entrypoint for tracing gRPC servers or clients. */
public final class GrpcTracing {
@ -22,15 +25,20 @@ public final class GrpcTracing {
return new GrpcTracingBuilder(openTelemetry);
}
private final Instrumenter<GrpcRequest, Status> serverInstrumenter;
private final Instrumenter<GrpcRequest, Status> clientInstrumenter;
private final ContextPropagators propagators;
private final boolean captureExperimentalSpanAttributes;
private final GrpcClientTracer clientTracer;
private final GrpcServerTracer serverTracer;
GrpcTracing(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
GrpcTracing(
Instrumenter<GrpcRequest, Status> serverInstrumenter,
Instrumenter<GrpcRequest, Status> clientInstrumenter,
ContextPropagators propagators,
boolean captureExperimentalSpanAttributes) {
this.serverInstrumenter = serverInstrumenter;
this.clientInstrumenter = clientInstrumenter;
this.propagators = propagators;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
clientTracer = new GrpcClientTracer(openTelemetry);
serverTracer = new GrpcServerTracer(openTelemetry);
}
/**
@ -38,7 +46,7 @@ public final class GrpcTracing {
* io.grpc.ManagedChannelBuilder#intercept(ClientInterceptor...)}.
*/
public ClientInterceptor newClientInterceptor() {
return new TracingClientInterceptor(clientTracer);
return new TracingClientInterceptor(clientInstrumenter, propagators);
}
/**
@ -46,6 +54,6 @@ public final class GrpcTracing {
* io.grpc.ServerBuilder#intercept(ServerInterceptor)}.
*/
public ServerInterceptor newServerInterceptor() {
return new TracingServerInterceptor(serverTracer, captureExperimentalSpanAttributes);
return new TracingServerInterceptor(serverInstrumenter, captureExperimentalSpanAttributes);
}
}

View File

@ -5,19 +5,41 @@
package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.Status;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import java.util.ArrayList;
import java.util.List;
/** A builder of {@link GrpcTracing}. */
public final class GrpcTracingBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.grpc-1.6";
private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<? super GrpcRequest, ? super Status>>
additionalExtractors = new ArrayList<>();
private boolean captureExperimentalSpanAttributes;
GrpcTracingBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}
/**
* Adds an additional {@link AttributesExtractor} to invoke to set attributes to instrumented
* items. The {@link AttributesExtractor} will be executed after all default extractors.
*/
public GrpcTracingBuilder addAttributeExtractor(
AttributesExtractor<? super GrpcRequest, ? super Status> attributesExtractor) {
additionalExtractors.add(attributesExtractor);
return this;
}
/**
* Sets whether experimental attributes should be set to spans. These attributes may be changed or
* removed in the future, so only enable this if you know you do not require attributes filled by
@ -31,6 +53,20 @@ public final class GrpcTracingBuilder {
/** Returns a new {@link GrpcTracing} with the settings of this {@link GrpcTracingBuilder}. */
public GrpcTracing build() {
return new GrpcTracing(openTelemetry, captureExperimentalSpanAttributes);
InstrumenterBuilder<GrpcRequest, Status> instrumenterBuilder =
Instrumenter.newBuilder(openTelemetry, INSTRUMENTATION_NAME, new GrpcSpanNameExtractor());
instrumenterBuilder
.setSpanStatusExtractor(new GrpcSpanStatusExtractor())
.addAttributesExtractors(
new GrpcNetAttributesExtractor(),
new GrpcRpcAttributesExtractor(),
new GrpcAttributesExtractor());
return new GrpcTracing(
instrumenterBuilder.newServerInstrumenter(GrpcExtractAdapter.GETTER),
// gRPC client interceptors require two phases, one to set up request and one to execute.
// So we go ahead and inject manually in this instrumentation.
instrumenterBuilder.newInstrumenter(SpanKindExtractor.alwaysClient()),
openTelemetry.getPropagators(),
captureExperimentalSpanAttributes);
}
}

View File

@ -22,65 +22,64 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import java.net.InetSocketAddress;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicLong;
final class TracingClientInterceptor implements ClientInterceptor {
private final GrpcClientTracer tracer;
private final Instrumenter<GrpcRequest, Status> instrumenter;
private final ContextPropagators propagators;
TracingClientInterceptor(GrpcClientTracer tracer) {
this.tracer = tracer;
TracingClientInterceptor(
Instrumenter<GrpcRequest, Status> instrumenter, ContextPropagators propagators) {
this.instrumenter = instrumenter;
this.propagators = propagators;
}
@Override
public <REQUEST, RESPONSE> ClientCall<REQUEST, RESPONSE> interceptCall(
MethodDescriptor<REQUEST, RESPONSE> method, CallOptions callOptions, Channel next) {
String methodName = method.getFullMethodName();
Context context = tracer.startSpan(methodName);
Span span = Span.fromContext(context);
GrpcHelper.prepareSpan(span, methodName);
GrpcRequest request = new GrpcRequest(method, null, null);
Context context = instrumenter.start(Context.current(), request);
final ClientCall<REQUEST, RESPONSE> result;
try (Scope ignored = context.makeCurrent()) {
try {
// call other interceptors
result = next.newCall(method, callOptions);
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
}
SocketAddress address = result.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
if (address instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) address;
NetPeerAttributes.INSTANCE.setNetPeer(span, inetSocketAddress);
}
request.setRemoteAddress(address);
return new TracingClientCall<>(result, span, context);
return new TracingClientCall<>(result, context, request);
}
final class TracingClientCall<REQUEST, RESPONSE>
extends ForwardingClientCall.SimpleForwardingClientCall<REQUEST, RESPONSE> {
private final Span span;
private final Context context;
private final GrpcRequest request;
TracingClientCall(ClientCall<REQUEST, RESPONSE> delegate, Span span, Context context) {
TracingClientCall(
ClientCall<REQUEST, RESPONSE> delegate, Context context, GrpcRequest request) {
super(delegate);
this.span = span;
this.context = context;
this.request = request;
}
@Override
public void start(Listener<RESPONSE> responseListener, Metadata headers) {
tracer.inject(context, headers, SETTER);
propagators.getTextMapPropagator().inject(context, headers, SETTER);
try (Scope ignored = context.makeCurrent()) {
super.start(new TracingClientCallListener<>(responseListener, context), headers);
super.start(new TracingClientCallListener<>(responseListener, context, request), headers);
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
}
@ -90,7 +89,7 @@ final class TracingClientInterceptor implements ClientInterceptor {
try (Scope ignored = context.makeCurrent()) {
super.sendMessage(message);
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
}
@ -99,12 +98,14 @@ final class TracingClientInterceptor implements ClientInterceptor {
final class TracingClientCallListener<RESPONSE>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RESPONSE> {
private final Context context;
private final GrpcRequest request;
private final AtomicLong messageId = new AtomicLong();
TracingClientCallListener(Listener<RESPONSE> delegate, Context context) {
TracingClientCallListener(Listener<RESPONSE> delegate, Context context, GrpcRequest request) {
super(delegate);
this.context = context;
this.request = request;
}
@Override
@ -117,7 +118,7 @@ final class TracingClientInterceptor implements ClientInterceptor {
try (Scope ignored = context.makeCurrent()) {
delegate().onMessage(message);
} catch (Throwable e) {
tracer.onException(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
}
@ -127,10 +128,10 @@ final class TracingClientInterceptor implements ClientInterceptor {
try (Scope ignored = context.makeCurrent()) {
delegate().onClose(status, trailers);
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, status, e);
throw e;
}
tracer.end(context, status);
instrumenter.end(context, request, status, status.getCause());
}
@Override
@ -138,7 +139,7 @@ final class TracingClientInterceptor implements ClientInterceptor {
try (Scope ignored = context.makeCurrent()) {
delegate().onReady();
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
}

View File

@ -19,18 +19,17 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.concurrent.atomic.AtomicLong;
final class TracingServerInterceptor implements ServerInterceptor {
private final GrpcServerTracer tracer;
private final Instrumenter<GrpcRequest, Status> instrumenter;
private final boolean captureExperimentalSpanAttributes;
TracingServerInterceptor(GrpcServerTracer tracer, boolean captureExperimentalSpanAttributes) {
this.tracer = tracer;
TracingServerInterceptor(
Instrumenter<GrpcRequest, Status> instrumenter, boolean captureExperimentalSpanAttributes) {
this.instrumenter = instrumenter;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
}
@ -39,27 +38,24 @@ final class TracingServerInterceptor implements ServerInterceptor {
ServerCall<REQUEST, RESPONSE> call,
Metadata headers,
ServerCallHandler<REQUEST, RESPONSE> next) {
String methodName = call.getMethodDescriptor().getFullMethodName();
Context context = tracer.startSpan(methodName, headers);
Span span = Span.fromContext(context);
SocketAddress address = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
if (address instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) address;
span.setAttribute(SemanticAttributes.NET_PEER_PORT, inetSocketAddress.getPort());
span.setAttribute(
SemanticAttributes.NET_PEER_IP, inetSocketAddress.getAddress().getHostAddress());
}
GrpcHelper.prepareSpan(span, methodName);
GrpcRequest request =
new GrpcRequest(
call.getMethodDescriptor(),
headers,
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
Context context = instrumenter.start(Context.current(), request);
try (Scope ignored = context.makeCurrent()) {
return new TracingServerCallListener<>(
Contexts.interceptCall(
io.grpc.Context.current(), new TracingServerCall<>(call, context), headers, next),
context);
io.grpc.Context.current(),
new TracingServerCall<>(call, context, request),
headers,
next),
context,
request);
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
}
@ -67,33 +63,38 @@ final class TracingServerInterceptor implements ServerInterceptor {
final class TracingServerCall<REQUEST, RESPONSE>
extends ForwardingServerCall.SimpleForwardingServerCall<REQUEST, RESPONSE> {
private final Context context;
private final GrpcRequest request;
TracingServerCall(ServerCall<REQUEST, RESPONSE> delegate, Context context) {
TracingServerCall(
ServerCall<REQUEST, RESPONSE> delegate, Context context, GrpcRequest request) {
super(delegate);
this.context = context;
this.request = request;
}
@Override
public void close(Status status, Metadata trailers) {
tracer.setStatus(context, status);
try {
delegate().close(status, trailers);
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, status, e);
throw e;
}
instrumenter.end(context, request, status, status.getCause());
}
}
final class TracingServerCallListener<REQUEST>
extends ForwardingServerCallListener.SimpleForwardingServerCallListener<REQUEST> {
private final Context context;
private final GrpcRequest request;
private final AtomicLong messageId = new AtomicLong();
TracingServerCallListener(Listener<REQUEST> delegate, Context context) {
TracingServerCallListener(Listener<REQUEST> delegate, Context context, GrpcRequest request) {
super(delegate);
this.context = context;
this.request = request;
}
@Override
@ -114,7 +115,7 @@ final class TracingServerInterceptor implements ServerInterceptor {
try {
delegate().onHalfClose();
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
}
@ -127,10 +128,10 @@ final class TracingServerInterceptor implements ServerInterceptor {
Span.fromContext(context).setAttribute("grpc.canceled", true);
}
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
tracer.end(context);
instrumenter.end(context, request, null, null);
}
@Override
@ -138,10 +139,9 @@ final class TracingServerInterceptor implements ServerInterceptor {
try {
delegate().onComplete();
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
tracer.end(context);
}
@Override
@ -149,7 +149,7 @@ final class TracingServerInterceptor implements ServerInterceptor {
try {
delegate().onReady();
} catch (Throwable e) {
tracer.endExceptionally(context, e);
instrumenter.end(context, request, null, e);
throw e;
}
}

View File

@ -1,38 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6
import io.grpc.Status
import io.opentelemetry.api.trace.StatusCode
import spock.lang.Specification
class GrpcHelperTest extends Specification {
def "test status from #grpcStatus.code"() {
when:
def status = GrpcHelper.statusFromGrpcStatus(grpcStatus)
then:
if (grpcStatus == Status.OK) {
status == StatusCode.UNSET
} else {
status == StatusCode.ERROR
}
// Considering history of status, if we compare all values of the gRPC status by name, we will
// probably find any new mismatches with the OpenTelemetry spec.
where:
grpcStatus << Status.Code.values().collect { Status.fromCode(it) }
}
def "test status has grpc description"() {
when:
def status = GrpcHelper.statusFromGrpcStatus(Status.INVALID_ARGUMENT.withDescription("bad argument"))
then:
status == StatusCode.ERROR
}
}

View File

@ -15,6 +15,7 @@ import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.grpc.Server
import io.grpc.ServerBuilder
import io.grpc.Status
import io.grpc.stub.StreamObserver
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
@ -112,6 +113,8 @@ abstract class AbstractGrpcStreamingTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "Conversation"
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.OK.code.value()
}
(1..(clientMessageCount * serverMessageCount)).each {
def messageId = it
@ -133,7 +136,10 @@ abstract class AbstractGrpcStreamingTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "Conversation"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.OK.code.value()
}
clientRange.each {
def messageId = it

View File

@ -101,6 +101,8 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value()
}
}
span(2) {
@ -119,7 +121,10 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value()
}
}
}
@ -175,6 +180,8 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" grpcStatus.code.value()
}
}
span(1) {
@ -197,7 +204,10 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE}" grpcStatus.code.value()
}
}
}
@ -262,6 +272,8 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.UNKNOWN.code.value()
}
}
span(1) {
@ -276,15 +288,15 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"message.id" 1
}
}
if (grpcStatus.cause != null) {
errorEvent grpcStatus.cause.class, grpcStatus.cause.message, 1
}
errorEvent grpcStatus.asRuntimeException().class, grpcStatus.asRuntimeException().message, 1
attributes {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
}
}
}
@ -423,6 +435,8 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.OK.code.value()
}
}
span(2) {
@ -441,7 +455,10 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.OK.code.value()
}
}
}
@ -530,6 +547,7 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
}
}
span(2) {
@ -548,7 +566,9 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
}
}
}
@ -625,6 +645,8 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "grpc.reflection.v1alpha.ServerReflection"
"${SemanticAttributes.RPC_METHOD.key}" "ServerReflectionInfo"
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.OK.code.value()
}
}
span(1) {
@ -643,7 +665,10 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"${SemanticAttributes.RPC_SERVICE.key}" "grpc.reflection.v1alpha.ServerReflection"
"${SemanticAttributes.RPC_METHOD.key}" "ServerReflectionInfo"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.OK.code.value()
}
}
}