Merge pull request #1301 from DataDog/tyler/grpc

Fix grpc tests with java-concurrent.
This commit is contained in:
Tyler Benson 2020-03-06 10:48:03 -08:00 committed by GitHub
commit caabb55ed2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 124 additions and 78 deletions

View File

@ -47,6 +47,9 @@ dependencies {
testCompile group: 'io.grpc', name: 'grpc-netty', version: grpcVersion
testCompile group: 'io.grpc', name: 'grpc-protobuf', version: grpcVersion
testCompile group: 'io.grpc', name: 'grpc-stub', version: grpcVersion
testCompile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2'
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
latestDepTestCompile sourceSets.test.output // include the protobuf generated classes
latestDepTestCompile group: 'io.grpc', name: 'grpc-netty', version: '+'

View File

@ -64,7 +64,7 @@ public class TracingClientInterceptor implements ClientInterceptor {
propagate().inject(span, headers, SETTER);
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
// Don't async propagate otherwise the span gets tied up with a timeout handler.
super.start(new TracingClientCallListener<>(span, responseListener), headers);
} catch (final Throwable e) {
DECORATE.onError(span, e);

View File

@ -0,0 +1,57 @@
package datadog.trace.instrumentation.grpc.server;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.context.TraceScope;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
/**
* The InProcessTransport calls the client response in process, so we have to disable async
* propagation to allow spans to complete and be reported properly.
*/
@AutoService(Instrumenter.class)
public class InProcessServerStreamInstrumentation extends Instrumenter.Default {
public InProcessServerStreamInstrumentation() {
super("grpc", "grpc-server");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(isPublic(), getClass().getName() + "$DisableAsyncPropagationAdvice");
}
public static class DisableAsyncPropagationAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static TraceScope enter() {
final TraceScope scope = activeScope();
if (scope != null && scope.isAsyncPropagating()) {
scope.setAsyncPropagation(false);
return scope;
}
return null;
}
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(@Advice.Enter final TraceScope scopeToReenable) {
if (scopeToReenable != null) {
scopeToReenable.setAsyncPropagation(true);
}
}
}
}

View File

@ -74,9 +74,9 @@ public class TracingServerInterceptor implements ServerInterceptor {
public void close(final Status status, final Metadata trailers) {
DECORATE.onClose(span, status);
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
// Using async propagate here breaks the tests which use InProcessTransport.
// It also seems logical to not need it at all, so removing it.
delegate().close(status, trailers);
scope.setAsyncPropagation(false);
} catch (final Throwable e) {
DECORATE.onError(span, e);
throw e;

View File

@ -1,3 +1,4 @@
import datadog.opentracing.scopemanager.ContinuableScope
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
@ -29,22 +30,35 @@ class GrpcStreamingTest extends AgentTestRunner {
return new StreamObserver<Helloworld.Response>() {
@Override
void onNext(Helloworld.Response value) {
serverReceived << value.message
(1..msgCount).each {
observer.onNext(value)
if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) {
observer.onNext(value)
} else {
observer.onError(new IllegalStateException("not async propagating!"))
}
}
}
@Override
void onError(Throwable t) {
error.set(t)
observer.onError(t)
if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) {
error.set(t)
observer.onError(t)
} else {
observer.onError(new IllegalStateException("not async propagating!"))
}
}
@Override
void onCompleted() {
observer.onCompleted()
if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) {
observer.onCompleted()
} else {
observer.onError(new IllegalStateException("not async propagating!"))
}
}
}
}
@ -58,17 +72,29 @@ class GrpcStreamingTest extends AgentTestRunner {
def observer = client.conversation(new StreamObserver<Helloworld.Response>() {
@Override
void onNext(Helloworld.Response value) {
clientReceived << value.message
if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) {
clientReceived << value.message
} else {
error.set(new IllegalStateException("not async propagating!"))
}
}
@Override
void onError(Throwable t) {
error.set(t)
if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) {
error.set(t)
} else {
error.set(new IllegalStateException("not async propagating!"))
}
}
@Override
void onCompleted() {
TEST_WRITER.waitForTraces(1)
if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) {
TEST_WRITER.waitForTraces(1)
} else {
error.set(new IllegalStateException("not async propagating!"))
}
}
})
@ -80,6 +106,10 @@ class GrpcStreamingTest extends AgentTestRunner {
then:
error.get() == null
TEST_WRITER.waitForTraces(2)
error.get() == null
serverReceived == clientRange.collect { "call $it" }
clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort()
assertTraces(2) {
trace(0, clientMessageCount + 1) {
@ -148,9 +178,6 @@ class GrpcStreamingTest extends AgentTestRunner {
}
}
serverReceived == clientRange.collect { "call $it" }
clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort()
cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()

View File

@ -1,3 +1,4 @@
import datadog.common.exec.CommonTaskExecutor
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
@ -16,6 +17,9 @@ import io.grpc.stub.StreamObserver
import java.util.concurrent.TimeUnit
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
class GrpcTest extends AgentTestRunner {
def "test request-response"() {
@ -25,8 +29,14 @@ class GrpcTest extends AgentTestRunner {
void sayHello(
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build()
responseObserver.onNext(reply)
responseObserver.onCompleted()
CommonTaskExecutor.INSTANCE.execute {
if (testTracer.activeSpan() == null) {
responseObserver.onError(new IllegalStateException("no active span"))
} else {
responseObserver.onNext(reply)
responseObserver.onCompleted()
}
}
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
@ -35,7 +45,11 @@ class GrpcTest extends AgentTestRunner {
GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel)
when:
def response = client.sayHello(Helloworld.Request.newBuilder().setName(name).build())
def response = runUnderTrace("parent") {
def resp = client.sayHello(Helloworld.Request.newBuilder().setName(name).build())
TEST_WRITER.waitForTraces(1) // Wait for the server span to be reported.
return resp
}
then:
response.message == "Hello $name"
@ -46,7 +60,7 @@ class GrpcTest extends AgentTestRunner {
operationName "grpc.server"
resourceName "example.Greeter/SayHello"
spanType DDSpanTypes.RPC
childOf trace(1).get(0)
childOf trace(1).get(1)
errored false
tags {
"$Tags.COMPONENT" "grpc-server"
@ -70,13 +84,14 @@ class GrpcTest extends AgentTestRunner {
}
}
}
trace(1, 2) {
span(0) {
trace(1, 3) {
basicSpan(it, 0, "parent")
span(1) {
serviceName "unnamed-java-app"
operationName "grpc.client"
resourceName "example.Greeter/SayHello"
spanType DDSpanTypes.RPC
parent()
childOf span(0)
errored false
tags {
"$Tags.COMPONENT" "grpc-client"
@ -85,12 +100,12 @@ class GrpcTest extends AgentTestRunner {
defaultTags()
}
}
span(1) {
span(2) {
serviceName "unnamed-java-app"
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
childOf span(1)
errored false
tags {
"$Tags.COMPONENT" "grpc-client"

View File

@ -1,33 +0,0 @@
package util
import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientCall
import io.grpc.ClientInterceptor
import io.grpc.ForwardingClientCall
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import java.util.concurrent.Phaser
/**
* Interceptor that blocks client from returning until server trace is reported.
*/
class BlockingInterceptor implements ClientInterceptor {
private final Phaser phaser
BlockingInterceptor(Phaser phaser) {
this.phaser = phaser
phaser.register()
}
@Override
<ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) {
@Override
void start(final ClientCall.Listener responseListener, final Metadata headers) {
super.start(new BlockingListener(responseListener, phaser), headers)
}
}
}
}

View File

@ -1,23 +0,0 @@
package util
import io.grpc.ClientCall
import io.grpc.ForwardingClientCallListener
import io.grpc.Metadata
import io.grpc.Status
import java.util.concurrent.Phaser
class BlockingListener<RespT> extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
private final Phaser phaser
BlockingListener(ClientCall.Listener<RespT> delegate, Phaser phaser) {
super(delegate)
this.phaser = phaser
}
@Override
void onClose(final Status status, final Metadata trailers) {
delegate().onClose(status, trailers)
phaser.arriveAndAwaitAdvance()
}
}