Add gRPC request metadata instrumentation (#7011)
Solves. #6991 This PR implements the request portion of the new gRPC metadata instrumentation spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/rpc.md#grpc-request-and-response-metadata The changes include: - new CommonConfig entry for desired gRPC metadata values: 'otel.instrumentation.grpc.capture-metadata.request' (Similar to http headers) - setting the desired metadata values in GrpcTelemetry - new property in GrpcAttributesExtractor that holds a reference to the GrpcRpcAttributesGetter - new property in GrpcAttributesExtractor that stores the desired values so it can iterate them and extract each one from the request - inject the GrpcRpcAttributesGetter to GrpcAttributesExtractor (in GrpcTelemetryBuilder) - logic in GrpcRpcAttributesGetter to safely extract the gRPC metadata value - A new test in GrpcTest that makes sure that when a certain metadata key name is inserted, it also ends up in the span attributes ** Doesn't take care of the response because gRPC response is not implemented in java-instrumentation yet. (This is absolutely necessary but out of scope for this PR) ** "metadataValue" is only implemented inside GrpcRpcAttributesGetter and not in RpcAttributesGetter to avoid providing implementations for every RpcAttributesGetter in the repo as this PR only focuses on gRPC. Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
parent
12619c89c2
commit
b9c10c9607
|
@ -33,5 +33,7 @@ tasks {
|
|||
// The agent context debug mechanism isn't compatible with the bridge approach which may add a
|
||||
// gRPC context to the root.
|
||||
jvmArgs("-Dotel.javaagent.experimental.thread-propagation-debugger.enabled=false")
|
||||
jvmArgs("-Dotel.instrumentation.grpc.capture-metadata.client.request=some-client-key")
|
||||
jvmArgs("-Dotel.instrumentation.grpc.capture-metadata.server.request=some-server-key")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.grpc.v1_6;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.Context;
|
||||
import io.grpc.ServerInterceptor;
|
||||
|
@ -12,6 +14,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry;
|
|||
import io.opentelemetry.instrumentation.grpc.v1_6.GrpcTelemetry;
|
||||
import io.opentelemetry.instrumentation.grpc.v1_6.internal.ContextStorageBridge;
|
||||
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
|
||||
import java.util.List;
|
||||
|
||||
// Holds singleton references.
|
||||
public final class GrpcSingletons {
|
||||
|
@ -27,9 +30,18 @@ public final class GrpcSingletons {
|
|||
InstrumentationConfig.get()
|
||||
.getBoolean("otel.instrumentation.grpc.experimental-span-attributes", false);
|
||||
|
||||
List<String> clientRequestMetadata =
|
||||
InstrumentationConfig.get()
|
||||
.getList("otel.instrumentation.grpc.capture-metadata.client.request", emptyList());
|
||||
List<String> serverRequestMetadata =
|
||||
InstrumentationConfig.get()
|
||||
.getList("otel.instrumentation.grpc.capture-metadata.server.request", emptyList());
|
||||
|
||||
GrpcTelemetry telemetry =
|
||||
GrpcTelemetry.builder(GlobalOpenTelemetry.get())
|
||||
.setCaptureExperimentalSpanAttributes(experimentalSpanAttributes)
|
||||
.setCapturedClientRequestMetadata(clientRequestMetadata)
|
||||
.setCapturedServerRequestMetadata(serverRequestMetadata)
|
||||
.build();
|
||||
|
||||
CLIENT_INTERCEPTOR = telemetry.newClientInterceptor();
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.grpc.v1_6;
|
||||
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
final class CapturedGrpcMetadataUtil {
|
||||
private static final String RPC_REQUEST_METADATA_KEY_ATTRIBUTE_PREFIX =
|
||||
"rpc.grpc.request.metadata.";
|
||||
private static final ConcurrentMap<String, AttributeKey<List<String>>> requestKeysCache =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
static List<String> lowercase(List<String> names) {
|
||||
return unmodifiableList(
|
||||
names.stream().map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
static AttributeKey<List<String>> requestAttributeKey(String metadataKey) {
|
||||
return requestKeysCache.computeIfAbsent(
|
||||
metadataKey, CapturedGrpcMetadataUtil::createRequestKey);
|
||||
}
|
||||
|
||||
private static AttributeKey<List<String>> createRequestKey(String metadataKey) {
|
||||
return AttributeKey.stringArrayKey(RPC_REQUEST_METADATA_KEY_ATTRIBUTE_PREFIX + metadataKey);
|
||||
}
|
||||
|
||||
private CapturedGrpcMetadataUtil() {}
|
||||
}
|
|
@ -5,18 +5,30 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.grpc.v1_6;
|
||||
|
||||
import static io.opentelemetry.instrumentation.grpc.v1_6.CapturedGrpcMetadataUtil.lowercase;
|
||||
import static io.opentelemetry.instrumentation.grpc.v1_6.CapturedGrpcMetadataUtil.requestAttributeKey;
|
||||
|
||||
import io.grpc.Status;
|
||||
import io.opentelemetry.api.common.AttributesBuilder;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import java.util.List;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
final class GrpcAttributesExtractor implements AttributesExtractor<GrpcRequest, Status> {
|
||||
private final GrpcRpcAttributesGetter getter;
|
||||
private final List<String> capturedRequestMetadata;
|
||||
|
||||
GrpcAttributesExtractor(
|
||||
GrpcRpcAttributesGetter getter, List<String> requestMetadataValuesToCapture) {
|
||||
this.getter = getter;
|
||||
this.capturedRequestMetadata = lowercase(requestMetadataValuesToCapture);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStart(
|
||||
AttributesBuilder attributes, Context parentContext, GrpcRequest grpcRequest) {
|
||||
// No request attributes
|
||||
public void onStart(AttributesBuilder attributes, Context parentContext, GrpcRequest request) {
|
||||
// Request attributes captured on request end.
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -29,5 +41,11 @@ final class GrpcAttributesExtractor implements AttributesExtractor<GrpcRequest,
|
|||
if (status != null) {
|
||||
attributes.put(SemanticAttributes.RPC_GRPC_STATUS_CODE, status.getCode().value());
|
||||
}
|
||||
for (String key : capturedRequestMetadata) {
|
||||
List<String> value = getter.metadataValue(request, key);
|
||||
if (!value.isEmpty()) {
|
||||
attributes.put(requestAttributeKey(key), value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,12 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.grpc.v1_6;
|
||||
|
||||
import io.grpc.Metadata;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcAttributesGetter;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
enum GrpcRpcAttributesGetter implements RpcAttributesGetter<GrpcRequest> {
|
||||
|
@ -37,4 +42,23 @@ enum GrpcRpcAttributesGetter implements RpcAttributesGetter<GrpcRequest> {
|
|||
}
|
||||
return fullMethodName.substring(slashIndex + 1);
|
||||
}
|
||||
|
||||
List<String> metadataValue(GrpcRequest request, String key) {
|
||||
if (request.getMetadata() == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
if (key == null || key.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Iterable<String> values =
|
||||
request.getMetadata().getAll(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
|
||||
|
||||
if (values == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return StreamSupport.stream(values.spliterator(), false).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import io.opentelemetry.instrumentation.grpc.v1_6.internal.GrpcNetClientAttribut
|
|||
import io.opentelemetry.instrumentation.grpc.v1_6.internal.GrpcNetServerAttributesGetter;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -52,6 +53,8 @@ public final class GrpcTelemetryBuilder {
|
|||
additionalServerExtractors = new ArrayList<>();
|
||||
|
||||
private boolean captureExperimentalSpanAttributes;
|
||||
private List<String> capturedClientRequestMetadata = Collections.emptyList();
|
||||
private List<String> capturedServerRequestMetadata = Collections.emptyList();
|
||||
|
||||
GrpcTelemetryBuilder(OpenTelemetry openTelemetry) {
|
||||
this.openTelemetry = openTelemetry;
|
||||
|
@ -129,6 +132,22 @@ public final class GrpcTelemetryBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/** Sets which metadata request values should be captured as span attributes on client spans. */
|
||||
@CanIgnoreReturnValue
|
||||
public GrpcTelemetryBuilder setCapturedClientRequestMetadata(
|
||||
List<String> capturedClientRequestMetadata) {
|
||||
this.capturedClientRequestMetadata = capturedClientRequestMetadata;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Sets which metadata request values should be captured as span attributes on server spans. */
|
||||
@CanIgnoreReturnValue
|
||||
public GrpcTelemetryBuilder setCapturedServerRequestMetadata(
|
||||
List<String> capturedServerRequestMetadata) {
|
||||
this.capturedServerRequestMetadata = capturedServerRequestMetadata;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Returns a new {@link GrpcTelemetry} with the settings of this {@link GrpcTelemetryBuilder}. */
|
||||
public GrpcTelemetry build() {
|
||||
SpanNameExtractor<GrpcRequest> originalSpanNameExtractor = new GrpcSpanNameExtractor();
|
||||
|
@ -153,7 +172,6 @@ public final class GrpcTelemetryBuilder {
|
|||
instrumenter ->
|
||||
instrumenter
|
||||
.setSpanStatusExtractor(new GrpcSpanStatusExtractor())
|
||||
.addAttributesExtractor(new GrpcAttributesExtractor())
|
||||
.addAttributesExtractors(additionalExtractors));
|
||||
|
||||
GrpcNetClientAttributesGetter netClientAttributesGetter = new GrpcNetClientAttributesGetter();
|
||||
|
@ -163,11 +181,17 @@ public final class GrpcTelemetryBuilder {
|
|||
.addAttributesExtractor(RpcClientAttributesExtractor.create(rpcAttributesGetter))
|
||||
.addAttributesExtractor(NetClientAttributesExtractor.create(netClientAttributesGetter))
|
||||
.addAttributesExtractors(additionalClientExtractors)
|
||||
.addAttributesExtractor(
|
||||
new GrpcAttributesExtractor(
|
||||
GrpcRpcAttributesGetter.INSTANCE, capturedClientRequestMetadata))
|
||||
.addOperationMetrics(RpcClientMetrics.get());
|
||||
serverInstrumenterBuilder
|
||||
.addAttributesExtractor(RpcServerAttributesExtractor.create(rpcAttributesGetter))
|
||||
.addAttributesExtractor(
|
||||
NetServerAttributesExtractor.create(new GrpcNetServerAttributesGetter()))
|
||||
.addAttributesExtractor(
|
||||
new GrpcAttributesExtractor(
|
||||
GrpcRpcAttributesGetter.INSTANCE, capturedServerRequestMetadata))
|
||||
.addAttributesExtractors(additionalServerExtractors)
|
||||
.addOperationMetrics(RpcServerMetrics.get());
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
|
|||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
|
||||
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.annotation.Nullable;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -43,13 +44,21 @@ class GrpcTest extends AbstractGrpcTest {
|
|||
@Override
|
||||
protected ServerBuilder<?> configureServer(ServerBuilder<?> server) {
|
||||
return server.intercept(
|
||||
GrpcTelemetry.create(testing.getOpenTelemetry()).newServerInterceptor());
|
||||
GrpcTelemetry.builder(testing.getOpenTelemetry())
|
||||
.setCapturedServerRequestMetadata(
|
||||
Collections.singletonList(SERVER_REQUEST_METADATA_KEY))
|
||||
.build()
|
||||
.newServerInterceptor());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ManagedChannelBuilder<?> configureClient(ManagedChannelBuilder<?> client) {
|
||||
return client.intercept(
|
||||
GrpcTelemetry.create(testing.getOpenTelemetry()).newClientInterceptor());
|
||||
GrpcTelemetry.builder(testing.getOpenTelemetry())
|
||||
.setCapturedClientRequestMetadata(
|
||||
Collections.singletonList(CLIENT_REQUEST_METADATA_KEY))
|
||||
.build()
|
||||
.newClientInterceptor());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -39,13 +39,18 @@ import io.grpc.protobuf.services.ProtoReflectionService;
|
|||
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
|
||||
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
|
||||
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
|
||||
import io.grpc.stub.MetadataUtils;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable;
|
||||
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
|
||||
import io.opentelemetry.sdk.trace.data.StatusData;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -66,6 +71,9 @@ import org.junit.jupiter.params.provider.ValueSource;
|
|||
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
public abstract class AbstractGrpcTest {
|
||||
protected static final String CLIENT_REQUEST_METADATA_KEY = "some-client-key";
|
||||
|
||||
protected static final String SERVER_REQUEST_METADATA_KEY = "some-server-key";
|
||||
|
||||
protected abstract ServerBuilder<?> configureServer(ServerBuilder<?> server);
|
||||
|
||||
|
@ -1669,6 +1677,72 @@ public abstract class AbstractGrpcTest {
|
|||
assertThat(error).hasValue(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
void setCapturedRequestMetadata() throws Exception {
|
||||
String metadataAttributePrefix = "rpc.grpc.request.metadata.";
|
||||
AttributeKey<List<String>> clientAttributeKey =
|
||||
AttributeKey.stringArrayKey(metadataAttributePrefix + CLIENT_REQUEST_METADATA_KEY);
|
||||
AttributeKey<List<String>> serverAttributeKey =
|
||||
AttributeKey.stringArrayKey(metadataAttributePrefix + SERVER_REQUEST_METADATA_KEY);
|
||||
String serverMetadataValue = "server-value";
|
||||
String clientMetadataValue = "client-value";
|
||||
|
||||
BindableService greeter =
|
||||
new GreeterGrpc.GreeterImplBase() {
|
||||
@Override
|
||||
public void sayHello(
|
||||
Helloworld.Request req, StreamObserver<Helloworld.Response> responseObserver) {
|
||||
Helloworld.Response reply =
|
||||
Helloworld.Response.newBuilder().setMessage("Hello " + req.getName()).build();
|
||||
responseObserver.onNext(reply);
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
|
||||
Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start();
|
||||
|
||||
ManagedChannel channel = createChannel(server);
|
||||
|
||||
Metadata extraMetadata = new Metadata();
|
||||
extraMetadata.put(
|
||||
Metadata.Key.of(SERVER_REQUEST_METADATA_KEY, Metadata.ASCII_STRING_MARSHALLER),
|
||||
serverMetadataValue);
|
||||
extraMetadata.put(
|
||||
Metadata.Key.of(CLIENT_REQUEST_METADATA_KEY, Metadata.ASCII_STRING_MARSHALLER),
|
||||
clientMetadataValue);
|
||||
|
||||
GreeterGrpc.GreeterBlockingStub client =
|
||||
GreeterGrpc.newBlockingStub(channel)
|
||||
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(extraMetadata));
|
||||
|
||||
Helloworld.Response response =
|
||||
testing()
|
||||
.runWithSpan(
|
||||
"parent",
|
||||
() -> client.sayHello(Helloworld.Request.newBuilder().setName("test").build()));
|
||||
|
||||
OpenTelemetryAssertions.assertThat(response.getMessage()).isEqualTo("Hello test");
|
||||
|
||||
testing()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||
span ->
|
||||
span.hasName("example.Greeter/SayHello")
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttribute(
|
||||
clientAttributeKey, Collections.singletonList(clientMetadataValue)),
|
||||
span ->
|
||||
span.hasName("example.Greeter/SayHello")
|
||||
.hasKind(SpanKind.SERVER)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasAttribute(
|
||||
serverAttributeKey,
|
||||
Collections.singletonList(serverMetadataValue))));
|
||||
}
|
||||
|
||||
private ManagedChannel createChannel(Server server) throws Exception {
|
||||
ManagedChannelBuilder<?> channelBuilder =
|
||||
configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort()));
|
||||
|
|
Loading…
Reference in New Issue