Made gRPC instrumenter compliant with semantic conventions (#202)

* Added service name and corrected span name

* Support peer address for clients

* Added server peer address resolution

* Added constants for new tag names

* Cosmetic change

* Fixed muzzle issue

* Better handling of addresses

* Addressed PR comments
This commit is contained in:
Pontus Rydin 2020-03-03 13:01:26 -05:00 committed by GitHub
parent 81cd6dbb23
commit d3246121cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 198 additions and 39 deletions

View File

@ -29,4 +29,10 @@ public class MoreTags {
public static final String ERROR_MSG = "error.msg"; // string representing the error message
public static final String ERROR_TYPE = "error.type"; // string representing the type of the error
public static final String ERROR_STACK = "error.stack"; // human readable version of the stack
public static final String RPC_SERVICE = "rpc.service";
public static final String NET_PEER_NAME = "net.peer.name";
public static final String NET_PEER_IP = "net.peer.ip";
public static final String NET_PEER_PORT = "net.peer.port";
}

View File

@ -15,19 +15,26 @@
*/
package io.opentelemetry.auto.instrumentation.grpc.client;
import static java.util.Collections.singletonMap;
import static io.opentelemetry.auto.tooling.bytebuddy.matcher.AgentElementMatchers.extendsClass;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.auto.bootstrap.ContextStore;
import io.opentelemetry.auto.bootstrap.InstrumentationContext;
import io.opentelemetry.auto.tooling.Instrumenter;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import net.bytebuddy.matcher.ElementMatchers;
@AutoService(Instrumenter.class)
public class GrpcClientBuilderInstrumentation extends Instrumenter.Default {
@ -38,7 +45,7 @@ public class GrpcClientBuilderInstrumentation extends Instrumenter.Default {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.grpc.internal.AbstractManagedChannelImplBuilder");
return extendsClass(named("io.grpc.ManagedChannelBuilder"));
}
@Override
@ -48,23 +55,36 @@ public class GrpcClientBuilderInstrumentation extends Instrumenter.Default {
"io.opentelemetry.auto.instrumentation.grpc.client.TracingClientInterceptor",
"io.opentelemetry.auto.instrumentation.grpc.client.TracingClientInterceptor$TracingClientCall",
"io.opentelemetry.auto.instrumentation.grpc.client.TracingClientInterceptor$TracingClientCallListener",
"io.opentelemetry.auto.instrumentation.grpc.common.GrpcHelper",
"io.opentelemetry.auto.decorator.BaseDecorator",
"io.opentelemetry.auto.decorator.ClientDecorator",
packageName + ".GrpcClientDecorator",
};
}
@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(
"io.grpc.ManagedChannelBuilder", InetSocketAddress.class.getName());
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
final Map<ElementMatcher<? super MethodDescription>, String> map = new HashMap<>(2);
map.put(
isMethod().and(named("build")),
GrpcClientBuilderInstrumentation.class.getName() + "$AddInterceptorAdvice");
map.put(
isMethod().and(named("forAddress").and(ElementMatchers.takesArguments(2))),
GrpcClientBuilderInstrumentation.class.getName() + "$ForAddressAdvice");
return map;
}
public static class AddInterceptorAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void addInterceptor(
@Advice.This final ManagedChannelBuilder thiz,
@Advice.FieldValue("interceptors") final List<ClientInterceptor> interceptors) {
boolean shouldRegister = true;
for (final ClientInterceptor interceptor : interceptors) {
@ -74,8 +94,23 @@ public class GrpcClientBuilderInstrumentation extends Instrumenter.Default {
}
}
if (shouldRegister) {
interceptors.add(0, TracingClientInterceptor.INSTANCE);
final ContextStore<ManagedChannelBuilder, InetSocketAddress> contextStore =
InstrumentationContext.get(ManagedChannelBuilder.class, InetSocketAddress.class);
final InetSocketAddress sockAddr = contextStore.get(thiz);
interceptors.add(0, new TracingClientInterceptor(sockAddr));
}
}
}
public static class ForAddressAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static final void forAddress(
@Advice.Argument(0) final String address,
@Advice.Argument(1) final int port,
@Advice.Return final ManagedChannelBuilder builder) {
final ContextStore<ManagedChannelBuilder, InetSocketAddress> contextStore =
InstrumentationContext.get(ManagedChannelBuilder.class, InetSocketAddress.class);
contextStore.put(builder, new InetSocketAddress(address, port));
}
}
}

View File

@ -29,17 +29,21 @@ import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.opentelemetry.auto.instrumentation.api.MoreTags;
import io.opentelemetry.auto.instrumentation.grpc.common.GrpcHelper;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.AttributeValue;
import io.opentelemetry.trace.Span;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class TracingClientInterceptor implements ClientInterceptor {
private final InetSocketAddress peerAddress;
public static final TracingClientInterceptor INSTANCE = new TracingClientInterceptor();
public TracingClientInterceptor(final InetSocketAddress peerAddress) {
this.peerAddress = peerAddress;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
@ -47,10 +51,11 @@ public class TracingClientInterceptor implements ClientInterceptor {
final CallOptions callOptions,
final Channel next) {
final Span span = TRACER.spanBuilder("grpc.client").setSpanKind(CLIENT).startSpan();
span.setAttribute(MoreTags.RESOURCE_NAME, method.getFullMethodName());
final String methodName = method.getFullMethodName();
final Span span = TRACER.spanBuilder(methodName).setSpanKind(CLIENT).startSpan();
try (final Scope scope = TRACER.withSpan(span)) {
DECORATE.afterStart(span);
GrpcHelper.prepareSpan(span, methodName, peerAddress);
final ClientCall<ReqT, RespT> result;
try {

View File

@ -0,0 +1,47 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.grpc.common;
import io.opentelemetry.auto.instrumentation.api.MoreTags;
import io.opentelemetry.trace.Span;
import java.net.InetSocketAddress;
public class GrpcHelper {
public static void prepareSpan(
final Span span, final String methodName, final InetSocketAddress peerAddress) {
String serviceName =
"(unknown)"; // Spec says it's mandatory, so populate even if we couldn't determine it.
final int slash = methodName.indexOf('/');
if (slash != -1) {
final String fullServiceName = methodName.substring(0, slash);
final int dot = fullServiceName.lastIndexOf('.');
if (dot != -1) {
serviceName = fullServiceName.substring(dot + 1);
}
}
span.setAttribute(MoreTags.RPC_SERVICE, serviceName);
if (peerAddress != null) {
span.setAttribute(MoreTags.NET_PEER_PORT, peerAddress.getPort());
span.setAttribute(MoreTags.NET_PEER_IP, peerAddress.getAddress().getHostAddress());
span.setAttribute(MoreTags.NET_PEER_NAME, peerAddress.getAddress().getHostName());
} else {
// The spec says these fields must be populated, so put some values in even if we don't have
// an address recorded.
span.setAttribute(MoreTags.NET_PEER_PORT, 0);
span.setAttribute(MoreTags.NET_PEER_NAME, "(unknown)");
}
}
}

View File

@ -47,6 +47,7 @@ public class GrpcServerBuilderInstrumentation extends Instrumenter.Default {
"io.opentelemetry.auto.instrumentation.grpc.server.TracingServerInterceptor",
"io.opentelemetry.auto.instrumentation.grpc.server.TracingServerInterceptor$TracingServerCall",
"io.opentelemetry.auto.instrumentation.grpc.server.TracingServerInterceptor$TracingServerCallListener",
"io.opentelemetry.auto.instrumentation.grpc.common.GrpcHelper",
"io.opentelemetry.auto.decorator.BaseDecorator",
"io.opentelemetry.auto.decorator.ServerDecorator",
packageName + ".GrpcServerDecorator",

View File

@ -22,16 +22,19 @@ import static io.opentelemetry.trace.Span.Kind.SERVER;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.opentelemetry.auto.instrumentation.api.MoreTags;
import io.opentelemetry.auto.instrumentation.grpc.common.GrpcHelper;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.AttributeValue;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@ -48,7 +51,8 @@ public class TracingServerInterceptor implements ServerInterceptor {
final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {
final Span.Builder spanBuilder = TRACER.spanBuilder("grpc.server").setSpanKind(SERVER);
final String methodName = call.getMethodDescriptor().getFullMethodName();
final Span.Builder spanBuilder = TRACER.spanBuilder(methodName).setSpanKind(SERVER);
try {
final SpanContext extractedContext = TRACER.getHttpTextFormat().extract(headers, GETTER);
spanBuilder.setParent(extractedContext);
@ -57,7 +61,10 @@ public class TracingServerInterceptor implements ServerInterceptor {
spanBuilder.setNoParent();
}
final Span span = spanBuilder.startSpan();
span.setAttribute(MoreTags.RESOURCE_NAME, call.getMethodDescriptor().getFullMethodName());
final SocketAddress addr = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
final InetSocketAddress iAddr =
addr instanceof InetSocketAddress ? (InetSocketAddress) addr : null;
GrpcHelper.prepareSpan(span, methodName, iAddr);
DECORATE.afterStart(span);

View File

@ -17,14 +17,15 @@ import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.grpc.Server
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver
import io.opentelemetry.auto.instrumentation.api.MoreTags
import io.opentelemetry.auto.instrumentation.api.SpanTypes
import io.opentelemetry.auto.instrumentation.api.Tags
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.PortUtils
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
@ -68,9 +69,17 @@ class GrpcStreamingTest extends AgentTestRunner {
}
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
def port = PortUtils.randomOpenPort()
Server server = ServerBuilder.forPort(port).addService(greeter).build().start()
ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress("localhost", port)
ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
// Depending on the version of gRPC usePlainText may or may not take an argument.
try {
channelBuilder.usePlaintext()
} catch (MissingMethodException e) {
channelBuilder.usePlaintext(true)
}
ManagedChannel channel = channelBuilder.build()
GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady()
when:
@ -102,14 +111,17 @@ class GrpcStreamingTest extends AgentTestRunner {
assertTraces(1) {
trace(0, 2) {
span(0) {
operationName "grpc.client"
operationName "example.Greeter/Conversation"
spanKind CLIENT
parent()
errored false
tags {
"$MoreTags.RESOURCE_NAME" "example.Greeter/Conversation"
"$MoreTags.SPAN_TYPE" SpanTypes.RPC
"$MoreTags.RPC_SERVICE" "Greeter"
"$Tags.COMPONENT" "grpc-client"
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_IP" "127.0.0.1"
"$MoreTags.NET_PEER_PORT" port
"status.code" "OK"
}
(1..(clientMessageCount * serverMessageCount)).each {
@ -124,14 +136,17 @@ class GrpcStreamingTest extends AgentTestRunner {
}
}
span(1) {
operationName "grpc.server"
operationName "example.Greeter/Conversation"
spanKind SERVER
childOf span(0)
errored false
tags {
"$MoreTags.RESOURCE_NAME" "example.Greeter/Conversation"
"$MoreTags.SPAN_TYPE" SpanTypes.RPC
"$MoreTags.RPC_SERVICE" "Greeter"
"$Tags.COMPONENT" "grpc-server"
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_IP" "127.0.0.1"
"$MoreTags.NET_PEER_PORT" Long
"status.code" "OK"
}
clientRange.each {

View File

@ -17,16 +17,17 @@ import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.grpc.Server
import io.grpc.ServerBuilder
import io.grpc.Status
import io.grpc.StatusRuntimeException
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.stub.StreamObserver
import io.opentelemetry.auto.instrumentation.api.MoreTags
import io.opentelemetry.auto.instrumentation.api.SpanTypes
import io.opentelemetry.auto.instrumentation.api.Tags
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.PortUtils
import java.util.concurrent.TimeUnit
@ -46,9 +47,17 @@ class GrpcTest extends AgentTestRunner {
responseObserver.onCompleted()
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
def port = PortUtils.randomOpenPort()
Server server = ServerBuilder.forPort(port).addService(greeter).build().start()
ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress("localhost", port)
ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
// Depending on the version of gRPC usePlainText may or may not take an argument.
try {
channelBuilder.usePlaintext()
} catch (MissingMethodException e) {
channelBuilder.usePlaintext(true)
}
ManagedChannel channel = channelBuilder.build()
GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel)
when:
@ -60,7 +69,7 @@ class GrpcTest extends AgentTestRunner {
assertTraces(1) {
trace(0, 2) {
span(0) {
operationName "grpc.client"
operationName "example.Greeter/SayHello"
spanKind CLIENT
parent()
errored false
@ -72,14 +81,17 @@ class GrpcTest extends AgentTestRunner {
}
}
tags {
"$MoreTags.RESOURCE_NAME" "example.Greeter/SayHello"
"$MoreTags.SPAN_TYPE" SpanTypes.RPC
"$MoreTags.RPC_SERVICE" "Greeter"
"$Tags.COMPONENT" "grpc-client"
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_IP" "127.0.0.1"
"$MoreTags.NET_PEER_PORT" port
"status.code" "OK"
}
}
span(1) {
operationName "grpc.server"
operationName "example.Greeter/SayHello"
spanKind SERVER
childOf span(0)
errored false
@ -91,9 +103,12 @@ class GrpcTest extends AgentTestRunner {
}
}
tags {
"$MoreTags.RESOURCE_NAME" "example.Greeter/SayHello"
"$MoreTags.SPAN_TYPE" SpanTypes.RPC
"$MoreTags.RPC_SERVICE" "Greeter"
"$Tags.COMPONENT" "grpc-server"
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_IP" "127.0.0.1"
"$MoreTags.NET_PEER_PORT" Long
"status.code" "OK"
}
}
@ -118,9 +133,17 @@ class GrpcTest extends AgentTestRunner {
responseObserver.onError(error)
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
def port = PortUtils.randomOpenPort()
Server server = ServerBuilder.forPort(port).addService(greeter).build().start()
ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress("localhost", port)
ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
// Depending on the version of gRPC usePlainText may or may not take an argument.
try {
channelBuilder.usePlaintext()
} catch (MissingMethodException e) {
channelBuilder.usePlaintext(true)
}
ManagedChannel channel = channelBuilder.build()
GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel)
when:
@ -132,20 +155,23 @@ class GrpcTest extends AgentTestRunner {
assertTraces(1) {
trace(0, 2) {
span(0) {
operationName "grpc.client"
operationName "example.Greeter/SayHello"
spanKind CLIENT
parent()
errored true
tags {
"$MoreTags.RESOURCE_NAME" "example.Greeter/SayHello"
"$MoreTags.SPAN_TYPE" SpanTypes.RPC
"$MoreTags.RPC_SERVICE" "Greeter"
"$Tags.COMPONENT" "grpc-client"
"status.code" "${status.code.name()}"
"status.description" description
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_IP" "127.0.0.1"
"$MoreTags.NET_PEER_PORT" port
}
}
span(1) {
operationName "grpc.server"
operationName "example.Greeter/SayHello"
spanKind SERVER
childOf span(0)
errored true
@ -157,11 +183,14 @@ class GrpcTest extends AgentTestRunner {
}
}
tags {
"$MoreTags.RESOURCE_NAME" "example.Greeter/SayHello"
"$MoreTags.SPAN_TYPE" SpanTypes.RPC
"$Tags.COMPONENT" "grpc-server"
"$MoreTags.RPC_SERVICE" "Greeter"
"status.code" "${status.code.name()}"
"status.description" description
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_IP" "127.0.0.1"
"$MoreTags.NET_PEER_PORT" Long
if (status.cause != null) {
errorTags status.cause.class, status.cause.message
}
@ -194,9 +223,17 @@ class GrpcTest extends AgentTestRunner {
throw error
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
def port = PortUtils.randomOpenPort()
Server server = ServerBuilder.forPort(port).addService(greeter).build().start()
ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress("localhost", port)
ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
// Depending on the version of gRPC usePlainText may or may not take an argument.
try {
channelBuilder.usePlaintext()
} catch (MissingMethodException e) {
channelBuilder.usePlaintext(true)
}
ManagedChannel channel = channelBuilder.build()
GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel)
when:
@ -208,19 +245,22 @@ class GrpcTest extends AgentTestRunner {
assertTraces(1) {
trace(0, 2) {
span(0) {
operationName "grpc.client"
operationName "example.Greeter/SayHello"
spanKind CLIENT
parent()
errored true
tags {
"$MoreTags.RESOURCE_NAME" "example.Greeter/SayHello"
"$MoreTags.SPAN_TYPE" SpanTypes.RPC
"$MoreTags.RPC_SERVICE" "Greeter"
"$Tags.COMPONENT" "grpc-client"
"status.code" "UNKNOWN"
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_IP" "127.0.0.1"
"$MoreTags.NET_PEER_PORT" Long
}
}
span(1) {
operationName "grpc.server"
operationName "example.Greeter/SayHello"
spanKind SERVER
childOf span(0)
errored true
@ -232,9 +272,12 @@ class GrpcTest extends AgentTestRunner {
}
}
tags {
"$MoreTags.RESOURCE_NAME" "example.Greeter/SayHello"
"$MoreTags.SPAN_TYPE" SpanTypes.RPC
"$Tags.COMPONENT" "grpc-server"
"$MoreTags.RPC_SERVICE" "Greeter"
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_IP" "127.0.0.1"
"$MoreTags.NET_PEER_PORT" Long
errorTags error.class, error.message
}
}