From 098aee06c67a581e931c94714868797338081cf9 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Mon, 13 Sep 2021 08:55:46 -0700 Subject: [PATCH] Fix grpc instrumentation of callbacks (#4097) * Fix grpc instrumentation of callbacks * Add ListenableFuture test * Futures.transform --- .../grpc/v1_6/TracingClientInterceptor.java | 30 ++- .../grpc/v1_6/AbstractGrpcTest.groovy | 229 ++++++++++++++++++ 2 files changed, 248 insertions(+), 11 deletions(-) diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java index 1edd68e152..b7607cf298 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java @@ -46,7 +46,8 @@ final class TracingClientInterceptor implements ClientInterceptor { public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { GrpcRequest request = new GrpcRequest(method, null, null); - Context context = instrumenter.start(Context.current(), request); + Context parentContext = Context.current(); + Context context = instrumenter.start(parentContext, request); final ClientCall result; try (Scope ignored = context.makeCurrent()) { try { @@ -61,18 +62,23 @@ final class TracingClientInterceptor implements ClientInterceptor { SocketAddress address = result.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); request.setRemoteAddress(address); - return new TracingClientCall<>(result, context, request); + return new TracingClientCall<>(result, parentContext, context, request); } final class TracingClientCall extends ForwardingClientCall.SimpleForwardingClientCall { + private final Context parentContext; private final Context context; private final GrpcRequest request; TracingClientCall( - ClientCall delegate, Context context, GrpcRequest request) { + ClientCall delegate, + Context parentContext, + Context context, + GrpcRequest request) { super(delegate); + this.parentContext = parentContext; this.context = context; this.request = request; } @@ -81,7 +87,9 @@ final class TracingClientInterceptor implements ClientInterceptor { public void start(Listener responseListener, Metadata headers) { propagators.getTextMapPropagator().inject(context, headers, SETTER); try (Scope ignored = context.makeCurrent()) { - super.start(new TracingClientCallListener<>(responseListener, context, request), headers); + super.start( + new TracingClientCallListener<>(responseListener, parentContext, context, request), + headers); } catch (Throwable e) { instrumenter.end(context, request, null, e); throw e; @@ -102,6 +110,7 @@ final class TracingClientInterceptor implements ClientInterceptor { final class TracingClientCallListener extends ForwardingClientCallListener.SimpleForwardingClientCallListener { + private final Context parentContext; private final Context context; private final GrpcRequest request; @@ -109,8 +118,10 @@ final class TracingClientInterceptor implements ClientInterceptor { @SuppressWarnings("UnusedVariable") volatile long messageId; - TracingClientCallListener(Listener delegate, Context context, GrpcRequest request) { + TracingClientCallListener( + Listener delegate, Context parentContext, Context context, GrpcRequest request) { super(delegate); + this.parentContext = parentContext; this.context = context; this.request = request; } @@ -135,13 +146,10 @@ final class TracingClientInterceptor implements ClientInterceptor { @Override public void onClose(Status status, Metadata trailers) { - try (Scope ignored = context.makeCurrent()) { - delegate().onClose(status, trailers); - } catch (Throwable e) { - instrumenter.end(context, request, status, e); - throw e; - } instrumenter.end(context, request, status, status.getCause()); + try (Scope ignored = parentContext.makeCurrent()) { + delegate().onClose(status, trailers); + } } @Override diff --git a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy index bbb5f8d0b7..3618e57325 100644 --- a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy +++ b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.grpc.v1_6 +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.MoreExecutors import example.GreeterGrpc import example.Helloworld import io.grpc.BindableService @@ -143,6 +145,233 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification { paramName << ["some name", "some other name"] } + def "test ListenableFuture callback"() { + setup: + BindableService greeter = new GreeterGrpc.GreeterImplBase() { + @Override + void sayHello( + final Helloworld.Request req, final StreamObserver responseObserver) { + final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() + responseObserver.onNext(reply) + responseObserver.onCompleted() + } + } + def port = PortUtils.findOpenPort() + Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() + ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) + + // Depending on the version of gRPC usePlainText may or may not take an argument. + try { + channelBuilder.usePlaintext() + } catch (MissingMethodException e) { + channelBuilder.usePlaintext(true) + } + ManagedChannel channel = channelBuilder.build() + GreeterGrpc.GreeterFutureStub client = GreeterGrpc.newFutureStub(channel) + + when: + AtomicReference response = new AtomicReference<>() + AtomicReference error = new AtomicReference<>() + runWithSpan("parent") { + def future = Futures.transform( + client.sayHello(Helloworld.Request.newBuilder().setName("test").build()), + { + runWithSpan("child") {} + return it + }, + MoreExecutors.directExecutor()) + try { + response.set(future.get()) + } catch (Exception e) { + error.set(e) + } + } + + then: + error.get() == null + response.get().message == "Hello test" + + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind SpanKind.INTERNAL + hasNoParent() + } + span(1) { + name "example.Greeter/SayHello" + kind CLIENT + childOf span(0) + event(0) { + eventName "message" + attributes { + "message.type" "SENT" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key}" "SayHello" + "${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP + "${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value() + } + } + span(2) { + name "example.Greeter/SayHello" + kind SERVER + childOf span(1) + event(0) { + eventName "message" + attributes { + "message.type" "RECEIVED" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key}" "SayHello" + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + // "localhost" on linux, "127.0.0.1" on windows + "${SemanticAttributes.NET_PEER_NAME.key}" { it == "localhost" || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_PORT.key}" Long + "${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP + "${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value() + } + } + span(3) { + name "child" + kind SpanKind.INTERNAL + childOf span(0) + } + } + } + + cleanup: + channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) + server?.shutdownNow()?.awaitTermination() + } + + + def "test onCompleted callback"() { + setup: + BindableService greeter = new GreeterGrpc.GreeterImplBase() { + @Override + void sayHello( + final Helloworld.Request req, final StreamObserver responseObserver) { + final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() + responseObserver.onNext(reply) + responseObserver.onCompleted() + } + } + def port = PortUtils.findOpenPort() + Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() + ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) + + // Depending on the version of gRPC usePlainText may or may not take an argument. + try { + channelBuilder.usePlaintext() + } catch (MissingMethodException e) { + channelBuilder.usePlaintext(true) + } + ManagedChannel channel = channelBuilder.build() + GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel) + + when: + AtomicReference response = new AtomicReference<>() + AtomicReference error = new AtomicReference<>() + CountDownLatch latch = new CountDownLatch(1) + runWithSpan("parent") { + client.sayHello(Helloworld.Request.newBuilder().setName("test").build(), + new StreamObserver() { + @Override + void onNext(Helloworld.Response r) { + response.set(r) + } + + @Override + void onError(Throwable throwable) { + error.set(throwable) + } + + @Override + void onCompleted() { + runWithSpan("child") {} + latch.countDown() + } + } + ) + } + + latch.await(10, TimeUnit.SECONDS) + + then: + error.get() == null + response.get().message == "Hello test" + + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind SpanKind.INTERNAL + hasNoParent() + } + span(1) { + name "example.Greeter/SayHello" + kind CLIENT + childOf span(0) + event(0) { + eventName "message" + attributes { + "message.type" "SENT" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key}" "SayHello" + "${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP + "${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value() + } + } + span(2) { + name "example.Greeter/SayHello" + kind SERVER + childOf span(1) + event(0) { + eventName "message" + attributes { + "message.type" "RECEIVED" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key}" "SayHello" + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + // "localhost" on linux, "127.0.0.1" on windows + "${SemanticAttributes.NET_PEER_NAME.key}" { it == "localhost" || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_PORT.key}" Long + "${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP + "${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value() + } + } + span(3) { + name "child" + kind SpanKind.INTERNAL + childOf span(0) + } + } + } + + cleanup: + channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) + server?.shutdownNow()?.awaitTermination() + } + def "test error - #paramName"() { setup: def error = grpcStatus.asException()