Fix grpc instrumentation of callbacks (#4097)

* Fix grpc instrumentation of callbacks

* Add ListenableFuture test

* Futures.transform
This commit is contained in:
Trask Stalnaker 2021-09-13 08:55:46 -07:00 committed by GitHub
parent 1db3f657d1
commit 098aee06c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 248 additions and 11 deletions

View File

@ -46,7 +46,8 @@ final class TracingClientInterceptor implements ClientInterceptor {
public <REQUEST, RESPONSE> ClientCall<REQUEST, RESPONSE> interceptCall(
MethodDescriptor<REQUEST, RESPONSE> method, CallOptions callOptions, Channel next) {
GrpcRequest request = new GrpcRequest(method, null, null);
Context context = instrumenter.start(Context.current(), request);
Context parentContext = Context.current();
Context context = instrumenter.start(parentContext, request);
final ClientCall<REQUEST, RESPONSE> result;
try (Scope ignored = context.makeCurrent()) {
try {
@ -61,18 +62,23 @@ final class TracingClientInterceptor implements ClientInterceptor {
SocketAddress address = result.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
request.setRemoteAddress(address);
return new TracingClientCall<>(result, context, request);
return new TracingClientCall<>(result, parentContext, context, request);
}
final class TracingClientCall<REQUEST, RESPONSE>
extends ForwardingClientCall.SimpleForwardingClientCall<REQUEST, RESPONSE> {
private final Context parentContext;
private final Context context;
private final GrpcRequest request;
TracingClientCall(
ClientCall<REQUEST, RESPONSE> delegate, Context context, GrpcRequest request) {
ClientCall<REQUEST, RESPONSE> delegate,
Context parentContext,
Context context,
GrpcRequest request) {
super(delegate);
this.parentContext = parentContext;
this.context = context;
this.request = request;
}
@ -81,7 +87,9 @@ final class TracingClientInterceptor implements ClientInterceptor {
public void start(Listener<RESPONSE> responseListener, Metadata headers) {
propagators.getTextMapPropagator().inject(context, headers, SETTER);
try (Scope ignored = context.makeCurrent()) {
super.start(new TracingClientCallListener<>(responseListener, context, request), headers);
super.start(
new TracingClientCallListener<>(responseListener, parentContext, context, request),
headers);
} catch (Throwable e) {
instrumenter.end(context, request, null, e);
throw e;
@ -102,6 +110,7 @@ final class TracingClientInterceptor implements ClientInterceptor {
final class TracingClientCallListener<RESPONSE>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RESPONSE> {
private final Context parentContext;
private final Context context;
private final GrpcRequest request;
@ -109,8 +118,10 @@ final class TracingClientInterceptor implements ClientInterceptor {
@SuppressWarnings("UnusedVariable")
volatile long messageId;
TracingClientCallListener(Listener<RESPONSE> delegate, Context context, GrpcRequest request) {
TracingClientCallListener(
Listener<RESPONSE> delegate, Context parentContext, Context context, GrpcRequest request) {
super(delegate);
this.parentContext = parentContext;
this.context = context;
this.request = request;
}
@ -135,13 +146,10 @@ final class TracingClientInterceptor implements ClientInterceptor {
@Override
public void onClose(Status status, Metadata trailers) {
try (Scope ignored = context.makeCurrent()) {
delegate().onClose(status, trailers);
} catch (Throwable e) {
instrumenter.end(context, request, status, e);
throw e;
}
instrumenter.end(context, request, status, status.getCause());
try (Scope ignored = parentContext.makeCurrent()) {
delegate().onClose(status, trailers);
}
}
@Override

View File

@ -5,6 +5,8 @@
package io.opentelemetry.instrumentation.grpc.v1_6
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.MoreExecutors
import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
@ -143,6 +145,233 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
paramName << ["some name", "some other name"]
}
def "test ListenableFuture callback"() {
setup:
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
@Override
void sayHello(
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build()
responseObserver.onNext(reply)
responseObserver.onCompleted()
}
}
def port = PortUtils.findOpenPort()
Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start()
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))
// Depending on the version of gRPC usePlainText may or may not take an argument.
try {
channelBuilder.usePlaintext()
} catch (MissingMethodException e) {
channelBuilder.usePlaintext(true)
}
ManagedChannel channel = channelBuilder.build()
GreeterGrpc.GreeterFutureStub client = GreeterGrpc.newFutureStub(channel)
when:
AtomicReference<Helloworld.Response> response = new AtomicReference<>()
AtomicReference<Throwable> error = new AtomicReference<>()
runWithSpan("parent") {
def future = Futures.transform(
client.sayHello(Helloworld.Request.newBuilder().setName("test").build()),
{
runWithSpan("child") {}
return it
},
MoreExecutors.directExecutor())
try {
response.set(future.get())
} catch (Exception e) {
error.set(e)
}
}
then:
error.get() == null
response.get().message == "Hello test"
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "example.Greeter/SayHello"
kind CLIENT
childOf span(0)
event(0) {
eventName "message"
attributes {
"message.type" "SENT"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value()
}
}
span(2) {
name "example.Greeter/SayHello"
kind SERVER
childOf span(1)
event(0) {
eventName "message"
attributes {
"message.type" "RECEIVED"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
// "localhost" on linux, "127.0.0.1" on windows
"${SemanticAttributes.NET_PEER_NAME.key}" { it == "localhost" || it == "127.0.0.1" }
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value()
}
}
span(3) {
name "child"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}
cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()
}
def "test onCompleted callback"() {
setup:
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
@Override
void sayHello(
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build()
responseObserver.onNext(reply)
responseObserver.onCompleted()
}
}
def port = PortUtils.findOpenPort()
Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start()
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))
// Depending on the version of gRPC usePlainText may or may not take an argument.
try {
channelBuilder.usePlaintext()
} catch (MissingMethodException e) {
channelBuilder.usePlaintext(true)
}
ManagedChannel channel = channelBuilder.build()
GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel)
when:
AtomicReference<Helloworld.Response> response = new AtomicReference<>()
AtomicReference<Throwable> error = new AtomicReference<>()
CountDownLatch latch = new CountDownLatch(1)
runWithSpan("parent") {
client.sayHello(Helloworld.Request.newBuilder().setName("test").build(),
new StreamObserver<Helloworld.Response>() {
@Override
void onNext(Helloworld.Response r) {
response.set(r)
}
@Override
void onError(Throwable throwable) {
error.set(throwable)
}
@Override
void onCompleted() {
runWithSpan("child") {}
latch.countDown()
}
}
)
}
latch.await(10, TimeUnit.SECONDS)
then:
error.get() == null
response.get().message == "Hello test"
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "example.Greeter/SayHello"
kind CLIENT
childOf span(0)
event(0) {
eventName "message"
attributes {
"message.type" "SENT"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value()
}
}
span(2) {
name "example.Greeter/SayHello"
kind SERVER
childOf span(1)
event(0) {
eventName "message"
attributes {
"message.type" "RECEIVED"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
// "localhost" on linux, "127.0.0.1" on windows
"${SemanticAttributes.NET_PEER_NAME.key}" { it == "localhost" || it == "127.0.0.1" }
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value()
}
}
span(3) {
name "child"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}
cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()
}
def "test error - #paramName"() {
setup:
def error = grpcStatus.asException()