From 9e5fdcebd59a23b00de3274a6b45884584827e77 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 9 Mar 2022 14:37:59 +0900 Subject: [PATCH] Migrate gRPC tests to Java (#5521) * Migrate gRPC tests to Java * check * clean --- .../grpc/v1_6/GrpcStreamingTest.groovy | 23 - .../instrumentation/grpc/v1_6/GrpcTest.groovy | 23 - .../grpc/v1_6/GrpcStreamingTest.java | 34 + .../instrumentation/grpc/v1_6/GrpcTest.java | 34 + .../grpc/v1_6/GrpcStreamingTest.groovy | 22 - .../instrumentation/grpc/v1_6/GrpcTest.groovy | 22 - .../grpc/v1_6/GrpcStreamingTest.java | 33 + .../instrumentation/grpc/v1_6/GrpcTest.java | 33 + .../grpc-1.6/testing/build.gradle.kts | 3 +- .../v1_6/AbstractGrpcStreamingTest.groovy | 180 --- .../grpc/v1_6/AbstractGrpcTest.groovy | 1156 -------------- .../grpc/v1_6/AbstractGrpcStreamingTest.java | 241 +++ .../grpc/v1_6/AbstractGrpcTest.java | 1395 +++++++++++++++++ 13 files changed, 1771 insertions(+), 1428 deletions(-) delete mode 100644 instrumentation/grpc-1.6/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcStreamingTest.groovy delete mode 100644 instrumentation/grpc-1.6/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcTest.groovy create mode 100644 instrumentation/grpc-1.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcStreamingTest.java create mode 100644 instrumentation/grpc-1.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcTest.java delete mode 100644 instrumentation/grpc-1.6/library/src/test/groovy/io/opentelemetry/instrumentation/grpc/v1_6/GrpcStreamingTest.groovy delete mode 100644 instrumentation/grpc-1.6/library/src/test/groovy/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.groovy create mode 100644 instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcStreamingTest.java create mode 100644 instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.java delete mode 100644 instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcStreamingTest.groovy delete mode 100644 instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy create mode 100644 instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcStreamingTest.java create mode 100644 instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java diff --git a/instrumentation/grpc-1.6/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcStreamingTest.groovy b/instrumentation/grpc-1.6/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcStreamingTest.groovy deleted file mode 100644 index 82133c6ab7..0000000000 --- a/instrumentation/grpc-1.6/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcStreamingTest.groovy +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.grpc.v1_6 - -import io.grpc.ManagedChannelBuilder -import io.grpc.ServerBuilder -import io.opentelemetry.instrumentation.grpc.v1_6.AbstractGrpcStreamingTest -import io.opentelemetry.instrumentation.test.AgentTestTrait - -class GrpcStreamingTest extends AbstractGrpcStreamingTest implements AgentTestTrait { - @Override - ServerBuilder configureServer(ServerBuilder server) { - return server - } - - @Override - ManagedChannelBuilder configureClient(ManagedChannelBuilder client) { - return client - } -} diff --git a/instrumentation/grpc-1.6/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcTest.groovy b/instrumentation/grpc-1.6/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcTest.groovy deleted file mode 100644 index c3ce414011..0000000000 --- a/instrumentation/grpc-1.6/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcTest.groovy +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.grpc.v1_6 - -import io.grpc.ManagedChannelBuilder -import io.grpc.ServerBuilder -import io.opentelemetry.instrumentation.grpc.v1_6.AbstractGrpcTest -import io.opentelemetry.instrumentation.test.AgentTestTrait - -class GrpcTest extends AbstractGrpcTest implements AgentTestTrait { - @Override - ServerBuilder configureServer(ServerBuilder server) { - return server - } - - @Override - ManagedChannelBuilder configureClient(ManagedChannelBuilder client) { - return client - } -} diff --git a/instrumentation/grpc-1.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcStreamingTest.java b/instrumentation/grpc-1.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcStreamingTest.java new file mode 100644 index 0000000000..cf8b4954ca --- /dev/null +++ b/instrumentation/grpc-1.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcStreamingTest.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.grpc.v1_6; + +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.opentelemetry.instrumentation.grpc.v1_6.AbstractGrpcStreamingTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class GrpcStreamingTest extends AbstractGrpcStreamingTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected ServerBuilder configureServer(ServerBuilder server) { + return server; + } + + @Override + protected ManagedChannelBuilder configureClient(ManagedChannelBuilder client) { + return client; + } + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/grpc-1.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcTest.java b/instrumentation/grpc-1.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcTest.java new file mode 100644 index 0000000000..ab81c6dbec --- /dev/null +++ b/instrumentation/grpc-1.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcTest.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.grpc.v1_6; + +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.opentelemetry.instrumentation.grpc.v1_6.AbstractGrpcTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class GrpcTest extends AbstractGrpcTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected ServerBuilder configureServer(ServerBuilder server) { + return server; + } + + @Override + protected ManagedChannelBuilder configureClient(ManagedChannelBuilder client) { + return client; + } + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/grpc-1.6/library/src/test/groovy/io/opentelemetry/instrumentation/grpc/v1_6/GrpcStreamingTest.groovy b/instrumentation/grpc-1.6/library/src/test/groovy/io/opentelemetry/instrumentation/grpc/v1_6/GrpcStreamingTest.groovy deleted file mode 100644 index f2c23762f4..0000000000 --- a/instrumentation/grpc-1.6/library/src/test/groovy/io/opentelemetry/instrumentation/grpc/v1_6/GrpcStreamingTest.groovy +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.grpc.v1_6 - -import io.grpc.ManagedChannelBuilder -import io.grpc.ServerBuilder -import io.opentelemetry.instrumentation.test.LibraryTestTrait - -class GrpcStreamingTest extends AbstractGrpcStreamingTest implements LibraryTestTrait { - @Override - ServerBuilder configureServer(ServerBuilder server) { - return server.intercept(GrpcTracing.create(getOpenTelemetry()).newServerInterceptor()) - } - - @Override - ManagedChannelBuilder configureClient(ManagedChannelBuilder client) { - return client.intercept(GrpcTracing.create(getOpenTelemetry()).newClientInterceptor()) - } -} diff --git a/instrumentation/grpc-1.6/library/src/test/groovy/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.groovy b/instrumentation/grpc-1.6/library/src/test/groovy/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.groovy deleted file mode 100644 index 5663043b41..0000000000 --- a/instrumentation/grpc-1.6/library/src/test/groovy/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.groovy +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.grpc.v1_6 - -import io.grpc.ManagedChannelBuilder -import io.grpc.ServerBuilder -import io.opentelemetry.instrumentation.test.LibraryTestTrait - -class GrpcTest extends AbstractGrpcTest implements LibraryTestTrait { - @Override - ServerBuilder configureServer(ServerBuilder server) { - return server.intercept(GrpcTracing.create(getOpenTelemetry()).newServerInterceptor()) - } - - @Override - ManagedChannelBuilder configureClient(ManagedChannelBuilder client) { - return client.intercept(GrpcTracing.create(getOpenTelemetry()).newClientInterceptor()) - } -} diff --git a/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcStreamingTest.java b/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcStreamingTest.java new file mode 100644 index 0000000000..6642a31e26 --- /dev/null +++ b/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcStreamingTest.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.grpc.v1_6; + +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class GrpcStreamingTest extends AbstractGrpcStreamingTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected ServerBuilder configureServer(ServerBuilder server) { + return server.intercept(GrpcTracing.create(testing.getOpenTelemetry()).newServerInterceptor()); + } + + @Override + protected ManagedChannelBuilder configureClient(ManagedChannelBuilder client) { + return client.intercept(GrpcTracing.create(testing.getOpenTelemetry()).newClientInterceptor()); + } + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.java b/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.java new file mode 100644 index 0000000000..a8b7d70ee4 --- /dev/null +++ b/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.grpc.v1_6; + +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class GrpcTest extends AbstractGrpcTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected ServerBuilder configureServer(ServerBuilder server) { + return server.intercept(GrpcTracing.create(testing.getOpenTelemetry()).newServerInterceptor()); + } + + @Override + protected ManagedChannelBuilder configureClient(ManagedChannelBuilder client) { + return client.intercept(GrpcTracing.create(testing.getOpenTelemetry()).newClientInterceptor()); + } + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/grpc-1.6/testing/build.gradle.kts b/instrumentation/grpc-1.6/testing/build.gradle.kts index abea116b59..53e988c167 100644 --- a/instrumentation/grpc-1.6/testing/build.gradle.kts +++ b/instrumentation/grpc-1.6/testing/build.gradle.kts @@ -17,9 +17,8 @@ dependencies { implementation("com.google.guava:guava") - implementation("org.codehaus.groovy:groovy-all") + api("org.junit-pioneer:junit-pioneer") implementation("io.opentelemetry:opentelemetry-api") - implementation("org.spockframework:spock-core") } tasks { diff --git a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcStreamingTest.groovy b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcStreamingTest.groovy deleted file mode 100644 index ea3d891bb1..0000000000 --- a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcStreamingTest.groovy +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.grpc.v1_6 - -import example.GreeterGrpc -import example.Helloworld -import io.grpc.BindableService -import io.grpc.ManagedChannel -import io.grpc.ManagedChannelBuilder -import io.grpc.Server -import io.grpc.ServerBuilder -import io.grpc.Status -import io.grpc.stub.StreamObserver -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import spock.lang.Unroll - -import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.SERVER - -@Unroll -abstract class AbstractGrpcStreamingTest extends InstrumentationSpecification { - - abstract ServerBuilder configureServer(ServerBuilder server) - - abstract ManagedChannelBuilder configureClient(ManagedChannelBuilder client) - - def "test conversation #paramName"() { - setup: - def msgCount = serverMessageCount - def serverReceived = new CopyOnWriteArrayList<>() - def clientReceived = new CopyOnWriteArrayList<>() - def error = new AtomicReference() - - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - StreamObserver conversation(StreamObserver observer) { - return new StreamObserver() { - @Override - void onNext(Helloworld.Response value) { - - serverReceived << value.message - - (1..msgCount).each { - observer.onNext(value) - } - } - - @Override - void onError(Throwable t) { - error.set(t) - observer.onError(t) - } - - @Override - void onCompleted() { - observer.onCompleted() - } - } - } - } - def port = PortUtils.findOpenPort() - Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() - ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException e) { - channelBuilder.usePlaintext(true) - } - ManagedChannel channel = channelBuilder.build() - GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady() - - when: - def observer2 = client.conversation(new StreamObserver() { - @Override - void onNext(Helloworld.Response value) { - clientReceived << value.message - } - - @Override - void onError(Throwable t) { - error.set(t) - } - - @Override - void onCompleted() { - } - }) - - clientRange.each { - def message = Helloworld.Response.newBuilder().setMessage("call $it").build() - observer2.onNext(message) - } - observer2.onCompleted() - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "example.Greeter/Conversation" - kind CLIENT - hasNoParent() - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "Conversation" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.OK.code.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - (1..(clientMessageCount * serverMessageCount + clientMessageCount)).each { - def messageId = it - event(it - 1) { - eventName "message" - attributes { - "message.type" { it == "SENT" || it == "RECEIVED" } - "message.id" messageId - } - } - } - } - span(1) { - name "example.Greeter/Conversation" - kind SERVER - childOf span(0) - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "Conversation" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.OK.code.value() - } - (1..(clientMessageCount * serverMessageCount + clientMessageCount)).each { - def messageId = it - event(it - 1) { - eventName "message" - attributes { - "message.type" { it == "RECEIVED" || it == "SENT" } - "message.id" messageId - } - } - } - } - } - } - error.get() == null - serverReceived == clientRange.collect { "call $it" } - clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort() - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - - where: - paramName | clientMessageCount | serverMessageCount - "A" | 1 | 1 - "B" | 2 | 1 - "C" | 1 | 2 - "D" | 2 | 2 - "E" | 3 | 3 - - clientRange = 1..clientMessageCount - serverRange = 1..serverMessageCount - } -} diff --git a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy deleted file mode 100644 index 772a7840ff..0000000000 --- a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy +++ /dev/null @@ -1,1156 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.grpc.v1_6 - -import com.google.common.util.concurrent.Futures -import com.google.common.util.concurrent.MoreExecutors -import example.GreeterGrpc -import example.Helloworld -import io.grpc.BindableService -import io.grpc.CallOptions -import io.grpc.Channel -import io.grpc.ClientCall -import io.grpc.ClientInterceptor -import io.grpc.Context -import io.grpc.Contexts -import io.grpc.ManagedChannel -import io.grpc.ManagedChannelBuilder -import io.grpc.Metadata -import io.grpc.MethodDescriptor -import io.grpc.Server -import io.grpc.ServerBuilder -import io.grpc.ServerCall -import io.grpc.ServerCallHandler -import io.grpc.ServerInterceptor -import io.grpc.Status -import io.grpc.StatusRuntimeException -import io.grpc.protobuf.services.ProtoReflectionService -import io.grpc.reflection.v1alpha.ServerReflectionGrpc -import io.grpc.reflection.v1alpha.ServerReflectionRequest -import io.grpc.reflection.v1alpha.ServerReflectionResponse -import io.grpc.stub.StreamObserver -import io.opentelemetry.api.trace.SpanKind -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import spock.lang.Unroll - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.SERVER -import static io.opentelemetry.api.trace.StatusCode.ERROR - -@Unroll -abstract class AbstractGrpcTest extends InstrumentationSpecification { - - abstract ServerBuilder configureServer(ServerBuilder server) - - abstract ManagedChannelBuilder configureClient(ManagedChannelBuilder client) - - def "test request-response #paramName"() { - setup: - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - void sayHello( - final Helloworld.Request req, final StreamObserver responseObserver) { - final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() - responseObserver.onNext(reply) - responseObserver.onCompleted() - } - } - def port = PortUtils.findOpenPort() - Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() - ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException e) { - channelBuilder.usePlaintext(true) - } - ManagedChannel channel = channelBuilder.build() - GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel) - - when: - def response = runWithSpan("parent") { - client.sayHello(Helloworld.Request.newBuilder().setName(paramName).build()) - } - - then: - response.message == "Hello $paramName" - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - } - span(1) { - name "example.Greeter/SayHello" - kind CLIENT - childOf span(0) - event(0) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.Code.OK.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - } - span(2) { - name "example.Greeter/SayHello" - kind SERVER - childOf span(1) - event(0) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.Code.OK.value() - } - } - } - } - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - - where: - paramName << ["some name", "some other name"] - } - - def "test ListenableFuture callback"() { - setup: - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - void sayHello( - final Helloworld.Request req, final StreamObserver responseObserver) { - final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() - responseObserver.onNext(reply) - responseObserver.onCompleted() - } - } - def port = PortUtils.findOpenPort() - Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() - ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException e) { - channelBuilder.usePlaintext(true) - } - ManagedChannel channel = channelBuilder.build() - GreeterGrpc.GreeterFutureStub client = GreeterGrpc.newFutureStub(channel) - - when: - AtomicReference response = new AtomicReference<>() - AtomicReference error = new AtomicReference<>() - runWithSpan("parent") { - def future = Futures.transform( - client.sayHello(Helloworld.Request.newBuilder().setName("test").build()), - { - runWithSpan("child") {} - return it - }, - MoreExecutors.directExecutor()) - try { - response.set(future.get()) - } catch (Exception e) { - error.set(e) - } - } - - then: - error.get() == null - response.get().message == "Hello test" - - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - } - span(1) { - name "example.Greeter/SayHello" - kind CLIENT - childOf span(0) - event(0) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.Code.OK.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - } - span(2) { - name "example.Greeter/SayHello" - kind SERVER - childOf span(1) - event(0) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.Code.OK.value() - } - } - span(3) { - name "child" - kind SpanKind.INTERNAL - childOf span(0) - } - } - } - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - } - - - def "test onCompleted callback"() { - setup: - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - void sayHello( - final Helloworld.Request req, final StreamObserver responseObserver) { - final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() - responseObserver.onNext(reply) - responseObserver.onCompleted() - } - } - def port = PortUtils.findOpenPort() - Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() - ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException e) { - channelBuilder.usePlaintext(true) - } - ManagedChannel channel = channelBuilder.build() - GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel) - - when: - AtomicReference response = new AtomicReference<>() - AtomicReference error = new AtomicReference<>() - CountDownLatch latch = new CountDownLatch(1) - runWithSpan("parent") { - client.sayHello(Helloworld.Request.newBuilder().setName("test").build(), - new StreamObserver() { - @Override - void onNext(Helloworld.Response r) { - response.set(r) - } - - @Override - void onError(Throwable throwable) { - error.set(throwable) - } - - @Override - void onCompleted() { - runWithSpan("child") {} - latch.countDown() - } - } - ) - } - - latch.await(10, TimeUnit.SECONDS) - - then: - error.get() == null - response.get().message == "Hello test" - - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - } - span(1) { - name "example.Greeter/SayHello" - kind CLIENT - childOf span(0) - event(0) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.Code.OK.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - } - span(2) { - name "example.Greeter/SayHello" - kind SERVER - childOf span(1) - event(0) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.Code.OK.value() - } - } - span(3) { - name "child" - kind SpanKind.INTERNAL - childOf span(0) - } - } - } - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - } - - def "test error - #paramName"() { - setup: - def error = grpcStatus.asException() - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - void sayHello( - final Helloworld.Request req, final StreamObserver responseObserver) { - responseObserver.onError(error) - } - } - def port = PortUtils.findOpenPort() - Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() - ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException ignored) { - channelBuilder.usePlaintext(true) - } - ManagedChannel channel = channelBuilder.build() - GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel) - - when: - client.sayHello(Helloworld.Request.newBuilder().setName(paramName).build()) - - then: - def e = thrown(StatusRuntimeException) - e.status.code == grpcStatus.code - e.status.description == grpcStatus.description - - assertTraces(1) { - trace(0, 2) { - span(0) { - name "example.Greeter/SayHello" - kind CLIENT - hasNoParent() - status ERROR - event(0) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 1 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" grpcStatus.code.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - } - span(1) { - name "example.Greeter/SayHello" - kind SERVER - childOf span(0) - status ERROR - event(0) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 1 - } - } - if (grpcStatus.cause != null) { - errorEvent grpcStatus.cause.class, grpcStatus.cause.message, 1 - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" grpcStatus.code.value() - } - } - } - } - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - - where: - paramName | grpcStatus - "Runtime - cause" | Status.UNKNOWN.withCause(new RuntimeException("some error")) - "Status - cause" | Status.PERMISSION_DENIED.withCause(new RuntimeException("some error")) - "StatusRuntime - cause" | Status.UNIMPLEMENTED.withCause(new RuntimeException("some error")) - "Runtime - description" | Status.UNKNOWN.withDescription("some description") - "Status - description" | Status.PERMISSION_DENIED.withDescription("some description") - "StatusRuntime - description" | Status.UNIMPLEMENTED.withDescription("some description") - } - - def "test error thrown - #paramName"() { - setup: - def error = grpcStatus.asRuntimeException() - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - void sayHello( - final Helloworld.Request req, final StreamObserver responseObserver) { - throw error - } - } - def port = PortUtils.findOpenPort() - Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() - ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException ignored) { - channelBuilder.usePlaintext(true) - } - ManagedChannel channel = channelBuilder.build() - GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel) - - when: - client.sayHello(Helloworld.Request.newBuilder().setName(paramName).build()) - - then: - def e = thrown(StatusRuntimeException) - // gRPC doesn't appear to propagate server exceptions that are thrown, not onError. - e.status.code == Status.UNKNOWN.code - e.status.description == null - - assertTraces(1) { - trace(0, 2) { - span(0) { - name "example.Greeter/SayHello" - kind CLIENT - hasNoParent() - status ERROR - event(0) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 1 - } - } - // NB: Exceptions thrown on the server don't appear to be propagated to the client, at - // least for the version we test against. - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.UNKNOWN.code.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - } - span(1) { - name "example.Greeter/SayHello" - kind SERVER - childOf span(0) - status ERROR - event(0) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 1 - } - } - errorEvent grpcStatus.asRuntimeException().class, grpcStatus.asRuntimeException().message, 1 - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - } - } - } - } - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - - where: - paramName | grpcStatus - "Runtime - cause" | Status.UNKNOWN.withCause(new RuntimeException("some error")) - "Status - cause" | Status.PERMISSION_DENIED.withCause(new RuntimeException("some error")) - "StatusRuntime - cause" | Status.UNIMPLEMENTED.withCause(new RuntimeException("some error")) - "Runtime - description" | Status.UNKNOWN.withDescription("some description") - "Status - description" | Status.PERMISSION_DENIED.withDescription("some description") - "StatusRuntime - description" | Status.UNIMPLEMENTED.withDescription("some description") - } - - def "test user context preserved"() { - setup: - Context.Key key = Context.key("cat") - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - void sayHello( - final Helloworld.Request req, final StreamObserver responseObserver) { - if (key.get() != "meow") { - responseObserver.onError(new AssertionError((Object) "context not preserved")) - return - } - if (!io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()).getSpanContext().isValid()) { - responseObserver.onError(new AssertionError((Object) "span not attached")) - return - } - final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() - responseObserver.onNext(reply) - responseObserver.onCompleted() - } - } - def port = PortUtils.findOpenPort() - Server server - server = configureServer(ServerBuilder.forPort(port) - .addService(greeter) - .intercept(new ServerInterceptor() { - @Override - ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { - if (!io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()).getSpanContext().isValid()) { - throw new AssertionError((Object) "span not attached in server interceptor") - } - def ctx = Context.current().withValue(key, "meow") - return Contexts.interceptCall(ctx, call, headers, next) - } - })) - .build().start() - ManagedChannelBuilder channelBuilder - channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - .intercept(new ClientInterceptor() { - @Override - ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { - if (!io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()).getSpanContext().isValid()) { - throw new AssertionError((Object) "span not attached in client interceptor") - } - def ctx = Context.current().withValue(key, "meow") - def oldCtx = ctx.attach() - try { - return next.newCall(method, callOptions) - } finally { - ctx.detach(oldCtx) - } - } - }) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException e) { - channelBuilder.usePlaintext(true) - } - ManagedChannel channel = channelBuilder.build() - def client = GreeterGrpc.newStub(channel) - - when: - AtomicReference response = new AtomicReference<>() - AtomicReference error = new AtomicReference<>() - CountDownLatch latch = new CountDownLatch(1) - runWithSpan("parent") { - client.sayHello( - Helloworld.Request.newBuilder().setName("test").build(), - new StreamObserver() { - @Override - void onNext(Helloworld.Response r) { - if (key.get() != "meow") { - error.set(new AssertionError((Object) "context not preserved")) - return - } - if (!io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()).getSpanContext().isValid()) { - error.set(new AssertionError((Object) "span not attached")) - return - } - response.set(r) - } - - @Override - void onError(Throwable throwable) { - error.set(throwable) - } - - @Override - void onCompleted() { - latch.countDown() - } - }) - } - - latch.await(10, TimeUnit.SECONDS) - - then: - error.get() == null - response.get().message == "Hello test" - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - } - span(1) { - name "example.Greeter/SayHello" - kind CLIENT - childOf span(0) - event(0) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.OK.code.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - } - span(2) { - name "example.Greeter/SayHello" - kind SERVER - childOf span(1) - event(0) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.OK.code.value() - } - } - } - } - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - } - - // Regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2285 - def "client error thrown"() { - setup: - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - void sayHello( - final Helloworld.Request req, final StreamObserver responseObserver) { - // Send a response but don't complete so client can fail itself - responseObserver.onNext(Helloworld.Response.getDefaultInstance()) - } - } - def port = PortUtils.findOpenPort() - Server server - server = configureServer(ServerBuilder.forPort(port) - .addService(greeter)) - .build().start() - ManagedChannelBuilder channelBuilder - channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException e) { - channelBuilder.usePlaintext(true) - } - ManagedChannel channel = channelBuilder.build() - def client = GreeterGrpc.newStub(channel) - - when: - AtomicReference error = new AtomicReference<>() - CountDownLatch latch = new CountDownLatch(1) - runWithSpan("parent") { - client.sayHello( - Helloworld.Request.newBuilder().setName("test").build(), - new StreamObserver() { - @Override - void onNext(Helloworld.Response r) { - throw new IllegalStateException("illegal") - } - - @Override - void onError(Throwable throwable) { - error.set(throwable) - latch.countDown() - } - - @Override - void onCompleted() { - latch.countDown() - } - }) - } - - latch.await(10, TimeUnit.SECONDS) - - then: - error.get() != null - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - } - span(1) { - name "example.Greeter/SayHello" - kind CLIENT - childOf span(0) - status ERROR - event(0) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 2 - } - } - errorEvent(IllegalStateException, "illegal", 2) - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.CANCELLED.code.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - } - span(2) { - name "example.Greeter/SayHello" - kind SERVER - childOf span(1) - event(0) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - } - } - } - } - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - } - - def "test reflection service"() { - setup: - def service = ProtoReflectionService.newInstance() - def port = PortUtils.findOpenPort() - Server server = configureServer(ServerBuilder.forPort(port).addService(service)).build().start() - ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException e) { - channelBuilder.usePlaintext(true) - } - ManagedChannel channel = channelBuilder.build() - ServerReflectionGrpc.ServerReflectionStub client = ServerReflectionGrpc.newStub(channel) - - when: - AtomicReference error = new AtomicReference<>() - AtomicReference response = new AtomicReference<>() - CountDownLatch latch = new CountDownLatch(1) - def request = client.serverReflectionInfo(new StreamObserver() { - @Override - void onNext(ServerReflectionResponse serverReflectionResponse) { - response.set(serverReflectionResponse) - } - - @Override - void onError(Throwable throwable) { - error.set(throwable) - latch.countDown() - } - - @Override - void onCompleted() { - latch.countDown() - } - }) - - request.onNext(ServerReflectionRequest.newBuilder() - .setListServices("The content will not be checked?") - .build()) - request.onCompleted() - - latch.await(10, TimeUnit.SECONDS) - - then: - error.get() == null - response.get().listServicesResponse.getService(0).name == "grpc.reflection.v1alpha.ServerReflection" - - assertTraces(1) { - trace(0, 2) { - span(0) { - name "grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo" - kind CLIENT - hasNoParent() - event(0) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "grpc.reflection.v1alpha.ServerReflection" - "$SemanticAttributes.RPC_METHOD" "ServerReflectionInfo" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.OK.code.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - } - span(1) { - name "grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo" - kind SERVER - childOf span(0) - event(0) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "grpc.reflection.v1alpha.ServerReflection" - "$SemanticAttributes.RPC_METHOD" "ServerReflectionInfo" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.OK.code.value() - } - } - } - } - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - } - - def "test reuse builders"() { - setup: - BindableService greeter = new GreeterGrpc.GreeterImplBase() { - @Override - void sayHello( - final Helloworld.Request req, final StreamObserver responseObserver) { - final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() - responseObserver.onNext(reply) - responseObserver.onCompleted() - } - } - def port = PortUtils.findOpenPort() - ServerBuilder serverBuilder = configureServer(ServerBuilder.forPort(port).addService(greeter)) - // Multiple calls to build on same builder - serverBuilder.build() - Server server = serverBuilder.build().start() - ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) - - // Depending on the version of gRPC usePlainText may or may not take an argument. - try { - channelBuilder.usePlaintext() - } catch (MissingMethodException e) { - channelBuilder.usePlaintext(true) - } - // Multiple calls to build on the same builder - channelBuilder.build() - ManagedChannel channel = channelBuilder.build() - GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel) - - when: - def response = runWithSpan("parent") { - client.sayHello(Helloworld.Request.newBuilder().setName(paramName).build()) - } - - then: - response.message == "Hello $paramName" - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind SpanKind.INTERNAL - hasNoParent() - } - span(1) { - name "example.Greeter/SayHello" - kind CLIENT - childOf span(0) - event(0) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.Code.OK.value() - "$SemanticAttributes.NET_PEER_NAME" "localhost" - "$SemanticAttributes.NET_PEER_PORT" port - } - } - span(2) { - name "example.Greeter/SayHello" - kind SERVER - childOf span(1) - event(0) { - eventName "message" - attributes { - "message.type" "RECEIVED" - "message.id" 1 - } - } - event(1) { - eventName "message" - attributes { - "message.type" "SENT" - "message.id" 2 - } - } - attributes { - "$SemanticAttributes.RPC_SYSTEM" "grpc" - "$SemanticAttributes.RPC_SERVICE" "example.Greeter" - "$SemanticAttributes.RPC_METHOD" "SayHello" - "$SemanticAttributes.NET_PEER_IP" "127.0.0.1" - // net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured - "$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null } - "$SemanticAttributes.NET_PEER_PORT" Long - "$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP - "$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.Code.OK.value() - } - } - } - } - - cleanup: - channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) - server?.shutdownNow()?.awaitTermination() - - where: - paramName << ["some name", "some other name"] - } -} diff --git a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcStreamingTest.java b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcStreamingTest.java new file mode 100644 index 0000000000..f0a6bd6b69 --- /dev/null +++ b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcStreamingTest.java @@ -0,0 +1,241 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.grpc.v1_6; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; + +import example.GreeterGrpc; +import example.Helloworld; +import io.grpc.BindableService; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable; +import io.opentelemetry.sdk.testing.assertj.EventDataAssert; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; +import org.junitpioneer.jupiter.cartesian.CartesianTest; + +public abstract class AbstractGrpcStreamingTest { + + protected abstract ServerBuilder configureServer(ServerBuilder server); + + protected abstract ManagedChannelBuilder configureClient(ManagedChannelBuilder client); + + protected abstract InstrumentationExtension testing(); + + private final Queue> closer = new ConcurrentLinkedQueue<>(); + + @AfterEach + void tearDown() throws Throwable { + while (!closer.isEmpty()) { + closer.poll().run(); + } + } + + @CartesianTest + @SuppressWarnings({"unchecked", "rawtypes"}) + void conversation( + @CartesianTest.Values(ints = {1, 2, 3}) int clientMessageCount, + @CartesianTest.Values(ints = {1, 2, 3}) int serverMessageCount) + throws Exception { + Queue serverReceived = new ConcurrentLinkedQueue<>(); + Queue clientReceived = new ConcurrentLinkedQueue<>(); + AtomicReference error = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(2); + + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public StreamObserver conversation( + StreamObserver observer) { + return new StreamObserver() { + @Override + public void onNext(Helloworld.Response value) { + serverReceived.add(value.getMessage()); + + for (int i = 1; i <= serverMessageCount; i++) { + observer.onNext(value); + } + } + + @Override + public void onError(Throwable t) { + error.set(t); + observer.onError(t); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + latch.countDown(); + } + }; + } + }; + + Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start(); + ManagedChannel channel = createChannel(server); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady(); + + StreamObserver observer2 = + client.conversation( + new StreamObserver() { + @Override + public void onNext(Helloworld.Response value) { + clientReceived.add(value.getMessage()); + } + + @Override + public void onError(Throwable t) { + error.set(t); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }); + + for (int i = 1; i <= clientMessageCount; i++) { + Helloworld.Response message = + Helloworld.Response.newBuilder().setMessage("call " + i).build(); + observer2.onNext(message); + } + observer2.onCompleted(); + + latch.await(10, TimeUnit.SECONDS); + + assertThat(error).hasValue(null); + assertThat(serverReceived) + .containsExactlyElementsOf( + IntStream.rangeClosed(1, clientMessageCount) + .mapToObj(i -> "call " + i) + .collect(Collectors.toList())); + assertThat(clientReceived) + .containsExactlyElementsOf( + IntStream.rangeClosed(1, serverMessageCount) + .boxed() + .flatMap( + unused -> + IntStream.rangeClosed(1, clientMessageCount).mapToObj(i -> "call " + i)) + .sorted() + .collect(Collectors.toList())); + + List> events = new ArrayList<>(); + for (int i = 1; i <= clientMessageCount * serverMessageCount + clientMessageCount; i++) { + long messageId = i; + events.add( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .hasSize(2) + .hasEntrySatisfying( + SemanticAttributes.MESSAGE_TYPE, + val -> + assertThat(val) + .satisfiesAnyOf( + v -> assertThat(v).isEqualTo("RECEIVED"), + v -> assertThat(v).isEqualTo("SENT"))) + .containsEntry(SemanticAttributes.MESSAGE_ID, messageId))); + } + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("example.Greeter/Conversation") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "Conversation"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfyingExactly(events.toArray(new Consumer[0])), + span -> + span.hasName("example.Greeter/Conversation") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "Conversation"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value())) + .hasEventsSatisfyingExactly(events.toArray(new Consumer[0])))); + } + + private ManagedChannel createChannel(Server server) throws Exception { + ManagedChannelBuilder channelBuilder = + configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort())); + return createChannel(channelBuilder); + } + + static ManagedChannel createChannel(ManagedChannelBuilder channelBuilder) throws Exception { + usePlainText(channelBuilder); + return channelBuilder.build(); + } + + private static void usePlainText(ManagedChannelBuilder channelBuilder) throws Exception { + // Depending on the version of gRPC usePlainText may or may not take an argument. + try { + channelBuilder + .getClass() + .getMethod("usePlaintext", boolean.class) + .invoke(channelBuilder, true); + } catch (NoSuchMethodException unused) { + channelBuilder.getClass().getMethod("usePlaintext").invoke(channelBuilder); + } + } +} diff --git a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java new file mode 100644 index 0000000000..7d5d151059 --- /dev/null +++ b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java @@ -0,0 +1,1395 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.grpc.v1_6; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.entry; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import example.GreeterGrpc; +import example.Helloworld; +import io.grpc.BindableService; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.services.ProtoReflectionService; +import io.grpc.reflection.v1alpha.ServerReflectionGrpc; +import io.grpc.reflection.v1alpha.ServerReflectionRequest; +import io.grpc.reflection.v1alpha.ServerReflectionResponse; +import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.ValueSource; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class AbstractGrpcTest { + + protected abstract ServerBuilder configureServer(ServerBuilder server); + + protected abstract ManagedChannelBuilder configureClient(ManagedChannelBuilder client); + + protected abstract InstrumentationExtension testing(); + + private final Queue> closer = new ConcurrentLinkedQueue<>(); + + @AfterEach + void tearDown() throws Throwable { + while (!closer.isEmpty()) { + closer.poll().run(); + } + } + + @ParameterizedTest + @ValueSource(strings = {"some name", "some other name"}) + void successBlockingStub(String paramName) throws Exception { + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request req, StreamObserver responseObserver) { + Helloworld.Response reply = + Helloworld.Response.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + }; + Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start(); + ManagedChannel channel = createChannel(server); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel); + + Helloworld.Response response = + testing() + .runWithSpan( + "parent", + () -> client.sayHello(Helloworld.Request.newBuilder().setName(paramName).build())); + + assertThat(response.getMessage()).isEqualTo("Hello " + paramName); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 2L)))), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry( + SemanticAttributes.MESSAGE_ID, 2L)))))); + } + + @Test + void listenableFuture() throws Exception { + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request req, StreamObserver responseObserver) { + Helloworld.Response reply = + Helloworld.Response.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + }; + + Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start(); + ManagedChannel channel = createChannel(server); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterFutureStub client = GreeterGrpc.newFutureStub(channel); + + AtomicReference response = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + testing() + .runWithSpan( + "parent", + () -> { + ListenableFuture future = + Futures.transform( + client.sayHello(Helloworld.Request.newBuilder().setName("test").build()), + resp -> { + testing().runWithSpan("child", () -> {}); + return resp; + }, + MoreExecutors.directExecutor()); + try { + response.set(Futures.getUnchecked(future)); + } catch (Throwable t) { + error.set(t); + } + }); + + assertThat(error).hasValue(null); + assertThat(response.get().getMessage()).isEqualTo("Hello test"); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 2L)))), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 2L)))), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + + @Test + void streamingStub() throws Exception { + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request req, StreamObserver responseObserver) { + Helloworld.Response reply = + Helloworld.Response.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + }; + + Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start(); + ManagedChannel channel = createChannel(server); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel); + + AtomicReference response = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + testing() + .runWithSpan( + "parent", + () -> + client.sayHello( + Helloworld.Request.newBuilder().setName("test").build(), + new StreamObserver() { + @Override + public void onNext(Helloworld.Response r) { + response.set(r); + } + + @Override + public void onError(Throwable throwable) { + error.set(throwable); + } + + @Override + public void onCompleted() { + testing().runWithSpan("child", () -> {}); + latch.countDown(); + } + })); + + latch.await(10, TimeUnit.SECONDS); + + assertThat(error).hasValue(null); + assertThat(response.get().getMessage()).isEqualTo("Hello test"); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 2L)))), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 2L)))), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + + @ParameterizedTest + @ArgumentsSource(ErrorProvider.class) + void errorReturned(Status status) throws Exception { + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request req, StreamObserver responseObserver) { + responseObserver.onError(status.asException()); + } + }; + Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start(); + ManagedChannel channel = createChannel(server); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel); + + assertThatThrownBy( + () -> client.sayHello(Helloworld.Request.newBuilder().setName("error").build())) + .isInstanceOfSatisfying( + StatusRuntimeException.class, + t -> { + assertThat(t.getStatus().getCode()).isEqualTo(status.getCode()); + assertThat(t.getStatus().getDescription()).isEqualTo(status.getDescription()); + }); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) status.getCode().value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 1L)))), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) status.getCode().value())) + .hasEventsSatisfying( + events -> { + assertThat(events).isNotEmpty(); + assertThat(events.get(0)) + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 1L))); + if (status.getCause() == null) { + assertThat(events).hasSize(1); + } else { + assertThat(events).hasSize(2); + span.hasException(status.getCause()); + } + }))); + } + + @ParameterizedTest + @ArgumentsSource(ErrorProvider.class) + void errorThrown(Status status) throws Exception { + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request req, StreamObserver responseObserver) { + throw status.asRuntimeException(); + } + }; + Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start(); + ManagedChannel channel = createChannel(server); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel); + + assertThatThrownBy( + () -> client.sayHello(Helloworld.Request.newBuilder().setName("error").build())) + .isInstanceOfSatisfying( + StatusRuntimeException.class, + t -> { + // gRPC doesn't appear to propagate server exceptions that are thrown, not onError. + assertThat(t.getStatus().getCode()).isEqualTo(Status.UNKNOWN.getCode()); + assertThat(t.getStatus().getDescription()).isNull(); + }); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + // NB: Exceptions thrown on the server don't appear to be propagated to the + // client, at + // least for the version we test against, so the client gets an UNKNOWN + // status and the server + // doesn't record one at all. + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.UNKNOWN.getCode().value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 1L)))), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP)) + .hasEventsSatisfying( + events -> { + assertThat(events).hasSize(2); + assertThat(events.get(0)) + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 1L))); + span.hasException(status.asRuntimeException()); + }))); + } + + static class ErrorProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext context) { + return Stream.of( + arguments(Status.UNKNOWN.withCause(new RuntimeException("some error"))), + arguments(Status.PERMISSION_DENIED.withCause(new RuntimeException("some error"))), + arguments(Status.UNIMPLEMENTED.withCause(new RuntimeException("some error"))), + arguments(Status.UNKNOWN.withDescription("some description")), + arguments(Status.PERMISSION_DENIED.withDescription("some description")), + arguments(Status.UNIMPLEMENTED.withDescription("some description"))); + } + } + + @Test + void userContextPreserved() throws Exception { + Context.Key key = Context.key("cat"); + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request req, StreamObserver responseObserver) { + if (!key.get().equals("meow")) { + responseObserver.onError(new AssertionError("context not preserved")); + return; + } + if (!Span.fromContext(io.opentelemetry.context.Context.current()) + .getSpanContext() + .isValid()) { + responseObserver.onError(new AssertionError("span not attached")); + return; + } + Helloworld.Response reply = + Helloworld.Response.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + }; + Server server = + configureServer( + ServerBuilder.forPort(0) + .addService(greeter) + .intercept( + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + if (!Span.fromContext(io.opentelemetry.context.Context.current()) + .getSpanContext() + .isValid()) { + throw new AssertionError("span not attached in server interceptor"); + } + Context ctx = Context.current().withValue(key, "meow"); + return Contexts.interceptCall(ctx, call, headers, next); + } + })) + .build() + .start(); + ManagedChannel channel = + createChannel( + configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort())) + .intercept( + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, + CallOptions callOptions, + Channel next) { + if (!Span.fromContext(io.opentelemetry.context.Context.current()) + .getSpanContext() + .isValid()) { + throw new AssertionError("span not attached in client interceptor"); + } + Context ctx = Context.current().withValue(key, "meow"); + Context oldCtx = ctx.attach(); + try { + return next.newCall(method, callOptions); + } finally { + ctx.detach(oldCtx); + } + } + })); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel); + + AtomicReference response = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + testing() + .runWithSpan( + "parent", + () -> + client.sayHello( + Helloworld.Request.newBuilder().setName("test").build(), + new StreamObserver() { + @Override + public void onNext(Helloworld.Response r) { + if (!key.get().equals("meow")) { + error.set(new AssertionError("context not preserved")); + return; + } + if (!Span.fromContext(io.opentelemetry.context.Context.current()) + .getSpanContext() + .isValid()) { + error.set(new AssertionError("span not attached")); + return; + } + response.set(r); + } + + @Override + public void onError(Throwable throwable) { + error.set(throwable); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + })); + + latch.await(10, TimeUnit.SECONDS); + + assertThat(error).hasValue(null); + assertThat(response.get().getMessage()).isEqualTo("Hello test"); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 2L)))), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry( + SemanticAttributes.MESSAGE_ID, 2L)))))); + } + + @Test + void clientErrorThrown() throws Exception { + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request req, StreamObserver responseObserver) { + // Send a response but don't complete so client can fail itself + responseObserver.onNext(Helloworld.Response.getDefaultInstance()); + } + }; + + Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start(); + ManagedChannel channel = createChannel(server); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel); + + IllegalStateException thrown = new IllegalStateException("illegal"); + AtomicReference response = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + testing() + .runWithSpan( + "parent", + () -> + client.sayHello( + Helloworld.Request.newBuilder().setName("test").build(), + new StreamObserver() { + @Override + public void onNext(Helloworld.Response r) { + throw thrown; + } + + @Override + public void onError(Throwable throwable) { + error.set(throwable); + latch.countDown(); + } + + @Override + public void onCompleted() { + testing().runWithSpan("child", () -> {}); + latch.countDown(); + } + })); + + latch.await(10, TimeUnit.SECONDS); + + assertThat(error.get()).isNotNull(); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.CANCELLED.value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfying( + events -> { + assertThat(events).hasSize(3); + assertThat(events.get(0)) + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 1L))); + assertThat(events.get(1)) + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 2L))); + span.hasException(thrown); + }), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP)) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry( + SemanticAttributes.MESSAGE_ID, 2L)))))); + } + + @Test + void reflectionService() throws Exception { + Server server = + configureServer(ServerBuilder.forPort(0).addService(ProtoReflectionService.newInstance())) + .build() + .start(); + ManagedChannel channel = createChannel(server); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + ServerReflectionGrpc.ServerReflectionStub client = ServerReflectionGrpc.newStub(channel); + + AtomicReference response = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + + StreamObserver request = + client.serverReflectionInfo( + new StreamObserver() { + @Override + public void onNext(ServerReflectionResponse serverReflectionResponse) { + response.set(serverReflectionResponse); + } + + @Override + public void onError(Throwable throwable) { + error.set(throwable); + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }); + + request.onNext( + ServerReflectionRequest.newBuilder() + .setListServices("The content will not be checked?") + .build()); + request.onCompleted(); + + latch.await(10, TimeUnit.SECONDS); + + assertThat(error).hasValue(null); + assertThat(response.get().getListServicesResponse().getService(0).getName()) + .isEqualTo("grpc.reflection.v1alpha.ServerReflection"); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName( + "grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo( + SemanticAttributes.RPC_SERVICE, + "grpc.reflection.v1alpha.ServerReflection"), + equalTo(SemanticAttributes.RPC_METHOD, "ServerReflectionInfo"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 2L)))), + span -> + span.hasName( + "grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo( + SemanticAttributes.RPC_SERVICE, + "grpc.reflection.v1alpha.ServerReflection"), + equalTo(SemanticAttributes.RPC_METHOD, "ServerReflectionInfo"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry( + SemanticAttributes.MESSAGE_ID, 2L)))))); + } + + @Test + void reuseBuilders() throws Exception { + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request req, StreamObserver responseObserver) { + Helloworld.Response reply = + Helloworld.Response.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + }; + ServerBuilder serverBuilder = configureServer(ServerBuilder.forPort(0).addService(greeter)); + // Multiple calls to build on same builder + serverBuilder.build(); + Server server = serverBuilder.build().start(); + ManagedChannelBuilder channelBuilder = + configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort())); + usePlainText(channelBuilder); + // Multiple calls to build on the same builder + channelBuilder.build(); + ManagedChannel channel = channelBuilder.build(); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel); + + Helloworld.Response response = + testing() + .runWithSpan( + "parent", + () -> client.sayHello(Helloworld.Request.newBuilder().setName("test").build())); + + assertThat(response.getMessage()).isEqualTo("Hello test"); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value()), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 2L)))), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"), + equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"), + equalTo(SemanticAttributes.RPC_METHOD, "SayHello"), + equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"), + // net.peer.name resolves to "127.0.0.1" on windows which is same as + // net.peer.ip so then not captured + satisfies( + SemanticAttributes.NET_PEER_NAME, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNull(), + v -> assertThat(v).isEqualTo("localhost"))), + satisfies( + SemanticAttributes.NET_PEER_PORT, + val -> assertThat(val).isNotNull()), + equalTo( + SemanticAttributes.NET_TRANSPORT, + SemanticAttributes.NetTransportValues.IP_TCP), + equalTo( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + (long) Status.Code.OK.value())) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "RECEIVED"), + entry(SemanticAttributes.MESSAGE_ID, 1L))), + event -> + event + .hasName("message") + .hasAttributesSatisfying( + attrs -> + assertThat(attrs) + .containsOnly( + entry( + SemanticAttributes.MESSAGE_TYPE, + "SENT"), + entry( + SemanticAttributes.MESSAGE_ID, 2L)))))); + } + + private ManagedChannel createChannel(Server server) throws Exception { + ManagedChannelBuilder channelBuilder = + configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort())); + return createChannel(channelBuilder); + } + + static ManagedChannel createChannel(ManagedChannelBuilder channelBuilder) throws Exception { + usePlainText(channelBuilder); + return channelBuilder.build(); + } + + private static void usePlainText(ManagedChannelBuilder channelBuilder) throws Exception { + // Depending on the version of gRPC usePlainText may or may not take an argument. + try { + channelBuilder + .getClass() + .getMethod("usePlaintext", boolean.class) + .invoke(channelBuilder, true); + } catch (NoSuchMethodException unused) { + channelBuilder.getClass().getMethod("usePlaintext").invoke(channelBuilder); + } + } +}