Migrate gRPC tests to Java (#5521)
* Migrate gRPC tests to Java * check * clean
This commit is contained in:
parent
b3496381f1
commit
9e5fdcebd5
|
@ -1,23 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.opentelemetry.javaagent.instrumentation.grpc.v1_6
|
|
||||||
|
|
||||||
import io.grpc.ManagedChannelBuilder
|
|
||||||
import io.grpc.ServerBuilder
|
|
||||||
import io.opentelemetry.instrumentation.grpc.v1_6.AbstractGrpcStreamingTest
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
|
||||||
|
|
||||||
class GrpcStreamingTest extends AbstractGrpcStreamingTest implements AgentTestTrait {
|
|
||||||
@Override
|
|
||||||
ServerBuilder configureServer(ServerBuilder server) {
|
|
||||||
return server
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
ManagedChannelBuilder configureClient(ManagedChannelBuilder client) {
|
|
||||||
return client
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,23 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.opentelemetry.javaagent.instrumentation.grpc.v1_6
|
|
||||||
|
|
||||||
import io.grpc.ManagedChannelBuilder
|
|
||||||
import io.grpc.ServerBuilder
|
|
||||||
import io.opentelemetry.instrumentation.grpc.v1_6.AbstractGrpcTest
|
|
||||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
|
||||||
|
|
||||||
class GrpcTest extends AbstractGrpcTest implements AgentTestTrait {
|
|
||||||
@Override
|
|
||||||
ServerBuilder configureServer(ServerBuilder server) {
|
|
||||||
return server
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
ManagedChannelBuilder configureClient(ManagedChannelBuilder client) {
|
|
||||||
return client
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.grpc.v1_6;
|
||||||
|
|
||||||
|
import io.grpc.ManagedChannelBuilder;
|
||||||
|
import io.grpc.ServerBuilder;
|
||||||
|
import io.opentelemetry.instrumentation.grpc.v1_6.AbstractGrpcStreamingTest;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class GrpcStreamingTest extends AbstractGrpcStreamingTest {
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ServerBuilder<?> configureServer(ServerBuilder<?> server) {
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ManagedChannelBuilder<?> configureClient(ManagedChannelBuilder<?> client) {
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InstrumentationExtension testing() {
|
||||||
|
return testing;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.grpc.v1_6;
|
||||||
|
|
||||||
|
import io.grpc.ManagedChannelBuilder;
|
||||||
|
import io.grpc.ServerBuilder;
|
||||||
|
import io.opentelemetry.instrumentation.grpc.v1_6.AbstractGrpcTest;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class GrpcTest extends AbstractGrpcTest {
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ServerBuilder<?> configureServer(ServerBuilder<?> server) {
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ManagedChannelBuilder<?> configureClient(ManagedChannelBuilder<?> client) {
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InstrumentationExtension testing() {
|
||||||
|
return testing;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,22 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.opentelemetry.instrumentation.grpc.v1_6
|
|
||||||
|
|
||||||
import io.grpc.ManagedChannelBuilder
|
|
||||||
import io.grpc.ServerBuilder
|
|
||||||
import io.opentelemetry.instrumentation.test.LibraryTestTrait
|
|
||||||
|
|
||||||
class GrpcStreamingTest extends AbstractGrpcStreamingTest implements LibraryTestTrait {
|
|
||||||
@Override
|
|
||||||
ServerBuilder configureServer(ServerBuilder server) {
|
|
||||||
return server.intercept(GrpcTracing.create(getOpenTelemetry()).newServerInterceptor())
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
ManagedChannelBuilder configureClient(ManagedChannelBuilder client) {
|
|
||||||
return client.intercept(GrpcTracing.create(getOpenTelemetry()).newClientInterceptor())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,22 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.opentelemetry.instrumentation.grpc.v1_6
|
|
||||||
|
|
||||||
import io.grpc.ManagedChannelBuilder
|
|
||||||
import io.grpc.ServerBuilder
|
|
||||||
import io.opentelemetry.instrumentation.test.LibraryTestTrait
|
|
||||||
|
|
||||||
class GrpcTest extends AbstractGrpcTest implements LibraryTestTrait {
|
|
||||||
@Override
|
|
||||||
ServerBuilder configureServer(ServerBuilder server) {
|
|
||||||
return server.intercept(GrpcTracing.create(getOpenTelemetry()).newServerInterceptor())
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
ManagedChannelBuilder configureClient(ManagedChannelBuilder client) {
|
|
||||||
return client.intercept(GrpcTracing.create(getOpenTelemetry()).newClientInterceptor())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.instrumentation.grpc.v1_6;
|
||||||
|
|
||||||
|
import io.grpc.ManagedChannelBuilder;
|
||||||
|
import io.grpc.ServerBuilder;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class GrpcStreamingTest extends AbstractGrpcStreamingTest {
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ServerBuilder<?> configureServer(ServerBuilder<?> server) {
|
||||||
|
return server.intercept(GrpcTracing.create(testing.getOpenTelemetry()).newServerInterceptor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ManagedChannelBuilder<?> configureClient(ManagedChannelBuilder<?> client) {
|
||||||
|
return client.intercept(GrpcTracing.create(testing.getOpenTelemetry()).newClientInterceptor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InstrumentationExtension testing() {
|
||||||
|
return testing;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.instrumentation.grpc.v1_6;
|
||||||
|
|
||||||
|
import io.grpc.ManagedChannelBuilder;
|
||||||
|
import io.grpc.ServerBuilder;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class GrpcTest extends AbstractGrpcTest {
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ServerBuilder<?> configureServer(ServerBuilder<?> server) {
|
||||||
|
return server.intercept(GrpcTracing.create(testing.getOpenTelemetry()).newServerInterceptor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ManagedChannelBuilder<?> configureClient(ManagedChannelBuilder<?> client) {
|
||||||
|
return client.intercept(GrpcTracing.create(testing.getOpenTelemetry()).newClientInterceptor());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InstrumentationExtension testing() {
|
||||||
|
return testing;
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,9 +17,8 @@ dependencies {
|
||||||
|
|
||||||
implementation("com.google.guava:guava")
|
implementation("com.google.guava:guava")
|
||||||
|
|
||||||
implementation("org.codehaus.groovy:groovy-all")
|
api("org.junit-pioneer:junit-pioneer")
|
||||||
implementation("io.opentelemetry:opentelemetry-api")
|
implementation("io.opentelemetry:opentelemetry-api")
|
||||||
implementation("org.spockframework:spock-core")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks {
|
tasks {
|
||||||
|
|
|
@ -1,180 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright The OpenTelemetry Authors
|
|
||||||
* SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.opentelemetry.instrumentation.grpc.v1_6
|
|
||||||
|
|
||||||
import example.GreeterGrpc
|
|
||||||
import example.Helloworld
|
|
||||||
import io.grpc.BindableService
|
|
||||||
import io.grpc.ManagedChannel
|
|
||||||
import io.grpc.ManagedChannelBuilder
|
|
||||||
import io.grpc.Server
|
|
||||||
import io.grpc.ServerBuilder
|
|
||||||
import io.grpc.Status
|
|
||||||
import io.grpc.stub.StreamObserver
|
|
||||||
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
|
|
||||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
|
||||||
import spock.lang.Unroll
|
|
||||||
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
|
||||||
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
|
||||||
import static io.opentelemetry.api.trace.SpanKind.SERVER
|
|
||||||
|
|
||||||
@Unroll
|
|
||||||
abstract class AbstractGrpcStreamingTest extends InstrumentationSpecification {
|
|
||||||
|
|
||||||
abstract ServerBuilder configureServer(ServerBuilder server)
|
|
||||||
|
|
||||||
abstract ManagedChannelBuilder configureClient(ManagedChannelBuilder client)
|
|
||||||
|
|
||||||
def "test conversation #paramName"() {
|
|
||||||
setup:
|
|
||||||
def msgCount = serverMessageCount
|
|
||||||
def serverReceived = new CopyOnWriteArrayList<>()
|
|
||||||
def clientReceived = new CopyOnWriteArrayList<>()
|
|
||||||
def error = new AtomicReference()
|
|
||||||
|
|
||||||
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
|
|
||||||
@Override
|
|
||||||
StreamObserver<Helloworld.Response> conversation(StreamObserver<Helloworld.Response> observer) {
|
|
||||||
return new StreamObserver<Helloworld.Response>() {
|
|
||||||
@Override
|
|
||||||
void onNext(Helloworld.Response value) {
|
|
||||||
|
|
||||||
serverReceived << value.message
|
|
||||||
|
|
||||||
(1..msgCount).each {
|
|
||||||
observer.onNext(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void onError(Throwable t) {
|
|
||||||
error.set(t)
|
|
||||||
observer.onError(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void onCompleted() {
|
|
||||||
observer.onCompleted()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
def port = PortUtils.findOpenPort()
|
|
||||||
Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start()
|
|
||||||
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))
|
|
||||||
|
|
||||||
// Depending on the version of gRPC usePlainText may or may not take an argument.
|
|
||||||
try {
|
|
||||||
channelBuilder.usePlaintext()
|
|
||||||
} catch (MissingMethodException e) {
|
|
||||||
channelBuilder.usePlaintext(true)
|
|
||||||
}
|
|
||||||
ManagedChannel channel = channelBuilder.build()
|
|
||||||
GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady()
|
|
||||||
|
|
||||||
when:
|
|
||||||
def observer2 = client.conversation(new StreamObserver<Helloworld.Response>() {
|
|
||||||
@Override
|
|
||||||
void onNext(Helloworld.Response value) {
|
|
||||||
clientReceived << value.message
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void onError(Throwable t) {
|
|
||||||
error.set(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void onCompleted() {
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
clientRange.each {
|
|
||||||
def message = Helloworld.Response.newBuilder().setMessage("call $it").build()
|
|
||||||
observer2.onNext(message)
|
|
||||||
}
|
|
||||||
observer2.onCompleted()
|
|
||||||
|
|
||||||
then:
|
|
||||||
assertTraces(1) {
|
|
||||||
trace(0, 2) {
|
|
||||||
span(0) {
|
|
||||||
name "example.Greeter/Conversation"
|
|
||||||
kind CLIENT
|
|
||||||
hasNoParent()
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.RPC_SYSTEM" "grpc"
|
|
||||||
"$SemanticAttributes.RPC_SERVICE" "example.Greeter"
|
|
||||||
"$SemanticAttributes.RPC_METHOD" "Conversation"
|
|
||||||
"$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP
|
|
||||||
"$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.OK.code.value()
|
|
||||||
"$SemanticAttributes.NET_PEER_NAME" "localhost"
|
|
||||||
"$SemanticAttributes.NET_PEER_PORT" port
|
|
||||||
}
|
|
||||||
(1..(clientMessageCount * serverMessageCount + clientMessageCount)).each {
|
|
||||||
def messageId = it
|
|
||||||
event(it - 1) {
|
|
||||||
eventName "message"
|
|
||||||
attributes {
|
|
||||||
"message.type" { it == "SENT" || it == "RECEIVED" }
|
|
||||||
"message.id" messageId
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
span(1) {
|
|
||||||
name "example.Greeter/Conversation"
|
|
||||||
kind SERVER
|
|
||||||
childOf span(0)
|
|
||||||
attributes {
|
|
||||||
"$SemanticAttributes.RPC_SYSTEM" "grpc"
|
|
||||||
"$SemanticAttributes.RPC_SERVICE" "example.Greeter"
|
|
||||||
"$SemanticAttributes.RPC_METHOD" "Conversation"
|
|
||||||
"$SemanticAttributes.NET_PEER_IP" "127.0.0.1"
|
|
||||||
// net.peer.name resolves to "127.0.0.1" on windows which is same as net.peer.ip so then not captured
|
|
||||||
"$SemanticAttributes.NET_PEER_NAME" { it == "localhost" || it == null }
|
|
||||||
"$SemanticAttributes.NET_PEER_PORT" Long
|
|
||||||
"$SemanticAttributes.NET_TRANSPORT" SemanticAttributes.NetTransportValues.IP_TCP
|
|
||||||
"$SemanticAttributes.RPC_GRPC_STATUS_CODE" Status.OK.code.value()
|
|
||||||
}
|
|
||||||
(1..(clientMessageCount * serverMessageCount + clientMessageCount)).each {
|
|
||||||
def messageId = it
|
|
||||||
event(it - 1) {
|
|
||||||
eventName "message"
|
|
||||||
attributes {
|
|
||||||
"message.type" { it == "RECEIVED" || it == "SENT" }
|
|
||||||
"message.id" messageId
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
error.get() == null
|
|
||||||
serverReceived == clientRange.collect { "call $it" }
|
|
||||||
clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort()
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
|
|
||||||
server?.shutdownNow()?.awaitTermination()
|
|
||||||
|
|
||||||
where:
|
|
||||||
paramName | clientMessageCount | serverMessageCount
|
|
||||||
"A" | 1 | 1
|
|
||||||
"B" | 2 | 1
|
|
||||||
"C" | 1 | 2
|
|
||||||
"D" | 2 | 2
|
|
||||||
"E" | 3 | 3
|
|
||||||
|
|
||||||
clientRange = 1..clientMessageCount
|
|
||||||
serverRange = 1..serverMessageCount
|
|
||||||
}
|
|
||||||
}
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,241 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.instrumentation.grpc.v1_6;
|
||||||
|
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||||
|
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
||||||
|
|
||||||
|
import example.GreeterGrpc;
|
||||||
|
import example.Helloworld;
|
||||||
|
import io.grpc.BindableService;
|
||||||
|
import io.grpc.ManagedChannel;
|
||||||
|
import io.grpc.ManagedChannelBuilder;
|
||||||
|
import io.grpc.Server;
|
||||||
|
import io.grpc.ServerBuilder;
|
||||||
|
import io.grpc.Status;
|
||||||
|
import io.grpc.stub.StreamObserver;
|
||||||
|
import io.opentelemetry.api.trace.SpanKind;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable;
|
||||||
|
import io.opentelemetry.sdk.testing.assertj.EventDataAssert;
|
||||||
|
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junitpioneer.jupiter.cartesian.CartesianTest;
|
||||||
|
|
||||||
|
public abstract class AbstractGrpcStreamingTest {
|
||||||
|
|
||||||
|
protected abstract ServerBuilder<?> configureServer(ServerBuilder<?> server);
|
||||||
|
|
||||||
|
protected abstract ManagedChannelBuilder<?> configureClient(ManagedChannelBuilder<?> client);
|
||||||
|
|
||||||
|
protected abstract InstrumentationExtension testing();
|
||||||
|
|
||||||
|
private final Queue<ThrowingRunnable<?>> closer = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
void tearDown() throws Throwable {
|
||||||
|
while (!closer.isEmpty()) {
|
||||||
|
closer.poll().run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@CartesianTest
|
||||||
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
|
void conversation(
|
||||||
|
@CartesianTest.Values(ints = {1, 2, 3}) int clientMessageCount,
|
||||||
|
@CartesianTest.Values(ints = {1, 2, 3}) int serverMessageCount)
|
||||||
|
throws Exception {
|
||||||
|
Queue<String> serverReceived = new ConcurrentLinkedQueue<>();
|
||||||
|
Queue<String> clientReceived = new ConcurrentLinkedQueue<>();
|
||||||
|
AtomicReference<Throwable> error = new AtomicReference<>();
|
||||||
|
CountDownLatch latch = new CountDownLatch(2);
|
||||||
|
|
||||||
|
BindableService greeter =
|
||||||
|
new GreeterGrpc.GreeterImplBase() {
|
||||||
|
@Override
|
||||||
|
public StreamObserver<Helloworld.Response> conversation(
|
||||||
|
StreamObserver<Helloworld.Response> observer) {
|
||||||
|
return new StreamObserver<Helloworld.Response>() {
|
||||||
|
@Override
|
||||||
|
public void onNext(Helloworld.Response value) {
|
||||||
|
serverReceived.add(value.getMessage());
|
||||||
|
|
||||||
|
for (int i = 1; i <= serverMessageCount; i++) {
|
||||||
|
observer.onNext(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
error.set(t);
|
||||||
|
observer.onError(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
observer.onCompleted();
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start();
|
||||||
|
ManagedChannel channel = createChannel(server);
|
||||||
|
closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS));
|
||||||
|
closer.add(() -> server.shutdownNow().awaitTermination());
|
||||||
|
|
||||||
|
GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady();
|
||||||
|
|
||||||
|
StreamObserver<Helloworld.Response> observer2 =
|
||||||
|
client.conversation(
|
||||||
|
new StreamObserver<Helloworld.Response>() {
|
||||||
|
@Override
|
||||||
|
public void onNext(Helloworld.Response value) {
|
||||||
|
clientReceived.add(value.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
error.set(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (int i = 1; i <= clientMessageCount; i++) {
|
||||||
|
Helloworld.Response message =
|
||||||
|
Helloworld.Response.newBuilder().setMessage("call " + i).build();
|
||||||
|
observer2.onNext(message);
|
||||||
|
}
|
||||||
|
observer2.onCompleted();
|
||||||
|
|
||||||
|
latch.await(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
assertThat(error).hasValue(null);
|
||||||
|
assertThat(serverReceived)
|
||||||
|
.containsExactlyElementsOf(
|
||||||
|
IntStream.rangeClosed(1, clientMessageCount)
|
||||||
|
.mapToObj(i -> "call " + i)
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
assertThat(clientReceived)
|
||||||
|
.containsExactlyElementsOf(
|
||||||
|
IntStream.rangeClosed(1, serverMessageCount)
|
||||||
|
.boxed()
|
||||||
|
.flatMap(
|
||||||
|
unused ->
|
||||||
|
IntStream.rangeClosed(1, clientMessageCount).mapToObj(i -> "call " + i))
|
||||||
|
.sorted()
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
|
||||||
|
List<Consumer<EventDataAssert>> events = new ArrayList<>();
|
||||||
|
for (int i = 1; i <= clientMessageCount * serverMessageCount + clientMessageCount; i++) {
|
||||||
|
long messageId = i;
|
||||||
|
events.add(
|
||||||
|
event ->
|
||||||
|
event
|
||||||
|
.hasName("message")
|
||||||
|
.hasAttributesSatisfying(
|
||||||
|
attrs ->
|
||||||
|
assertThat(attrs)
|
||||||
|
.hasSize(2)
|
||||||
|
.hasEntrySatisfying(
|
||||||
|
SemanticAttributes.MESSAGE_TYPE,
|
||||||
|
val ->
|
||||||
|
assertThat(val)
|
||||||
|
.satisfiesAnyOf(
|
||||||
|
v -> assertThat(v).isEqualTo("RECEIVED"),
|
||||||
|
v -> assertThat(v).isEqualTo("SENT")))
|
||||||
|
.containsEntry(SemanticAttributes.MESSAGE_ID, messageId)));
|
||||||
|
}
|
||||||
|
|
||||||
|
testing()
|
||||||
|
.waitAndAssertTraces(
|
||||||
|
trace ->
|
||||||
|
trace.hasSpansSatisfyingExactly(
|
||||||
|
span ->
|
||||||
|
span.hasName("example.Greeter/Conversation")
|
||||||
|
.hasKind(SpanKind.CLIENT)
|
||||||
|
.hasNoParent()
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"),
|
||||||
|
equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"),
|
||||||
|
equalTo(SemanticAttributes.RPC_METHOD, "Conversation"),
|
||||||
|
equalTo(
|
||||||
|
SemanticAttributes.NET_TRANSPORT,
|
||||||
|
SemanticAttributes.NetTransportValues.IP_TCP),
|
||||||
|
equalTo(
|
||||||
|
SemanticAttributes.RPC_GRPC_STATUS_CODE,
|
||||||
|
(long) Status.Code.OK.value()),
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_PORT, (long) server.getPort()))
|
||||||
|
.hasEventsSatisfyingExactly(events.toArray(new Consumer[0])),
|
||||||
|
span ->
|
||||||
|
span.hasName("example.Greeter/Conversation")
|
||||||
|
.hasKind(SpanKind.SERVER)
|
||||||
|
.hasParent(trace.getSpan(0))
|
||||||
|
.hasAttributesSatisfyingExactly(
|
||||||
|
equalTo(SemanticAttributes.RPC_SYSTEM, "grpc"),
|
||||||
|
equalTo(SemanticAttributes.RPC_SERVICE, "example.Greeter"),
|
||||||
|
equalTo(SemanticAttributes.RPC_METHOD, "Conversation"),
|
||||||
|
equalTo(SemanticAttributes.NET_PEER_IP, "127.0.0.1"),
|
||||||
|
// net.peer.name resolves to "127.0.0.1" on windows which is same as
|
||||||
|
// net.peer.ip so then not captured
|
||||||
|
satisfies(
|
||||||
|
SemanticAttributes.NET_PEER_NAME,
|
||||||
|
val ->
|
||||||
|
val.satisfiesAnyOf(
|
||||||
|
v -> assertThat(v).isNull(),
|
||||||
|
v -> assertThat(v).isEqualTo("localhost"))),
|
||||||
|
satisfies(
|
||||||
|
SemanticAttributes.NET_PEER_PORT,
|
||||||
|
val -> assertThat(val).isNotNull()),
|
||||||
|
equalTo(
|
||||||
|
SemanticAttributes.NET_TRANSPORT,
|
||||||
|
SemanticAttributes.NetTransportValues.IP_TCP),
|
||||||
|
equalTo(
|
||||||
|
SemanticAttributes.RPC_GRPC_STATUS_CODE,
|
||||||
|
(long) Status.Code.OK.value()))
|
||||||
|
.hasEventsSatisfyingExactly(events.toArray(new Consumer[0]))));
|
||||||
|
}
|
||||||
|
|
||||||
|
private ManagedChannel createChannel(Server server) throws Exception {
|
||||||
|
ManagedChannelBuilder<?> channelBuilder =
|
||||||
|
configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort()));
|
||||||
|
return createChannel(channelBuilder);
|
||||||
|
}
|
||||||
|
|
||||||
|
static ManagedChannel createChannel(ManagedChannelBuilder<?> channelBuilder) throws Exception {
|
||||||
|
usePlainText(channelBuilder);
|
||||||
|
return channelBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void usePlainText(ManagedChannelBuilder<?> channelBuilder) throws Exception {
|
||||||
|
// Depending on the version of gRPC usePlainText may or may not take an argument.
|
||||||
|
try {
|
||||||
|
channelBuilder
|
||||||
|
.getClass()
|
||||||
|
.getMethod("usePlaintext", boolean.class)
|
||||||
|
.invoke(channelBuilder, true);
|
||||||
|
} catch (NoSuchMethodException unused) {
|
||||||
|
channelBuilder.getClass().getMethod("usePlaintext").invoke(channelBuilder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue