Only instrument builders once (#3544)
This commit is contained in:
parent
df89691ca5
commit
9219fb6067
|
@ -12,8 +12,11 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
|||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
|
||||
import java.util.List;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
|
@ -43,8 +46,14 @@ public class GrpcClientBuilderBuildInstrumentation implements TypeInstrumentatio
|
|||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void addInterceptor(
|
||||
@Advice.This ManagedChannelBuilder<?> builder,
|
||||
@Advice.FieldValue("interceptors") List<ClientInterceptor> interceptors) {
|
||||
ContextStore<ManagedChannelBuilder<?>, Boolean> instrumented =
|
||||
InstrumentationContext.get(ManagedChannelBuilder.class, Boolean.class);
|
||||
if (!Boolean.TRUE.equals(instrumented.get(builder))) {
|
||||
interceptors.add(0, GrpcSingletons.CLIENT_INTERCEPTOR);
|
||||
instrumented.put(builder, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@ import io.grpc.ServerBuilder;
|
|||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
@ -48,7 +50,12 @@ public class GrpcServerBuilderInstrumentation implements TypeInstrumentation {
|
|||
@Advice.Local("otelCallDepth") CallDepth callDepth) {
|
||||
callDepth = CallDepth.forClass(ServerBuilder.class);
|
||||
if (callDepth.getAndIncrement() == 0) {
|
||||
ContextStore<ServerBuilder<?>, Boolean> instrumented =
|
||||
InstrumentationContext.get(ServerBuilder.class, Boolean.class);
|
||||
if (!Boolean.TRUE.equals(instrumented.get(serverBuilder))) {
|
||||
serverBuilder.intercept(GrpcSingletons.SERVER_INTERCEPTOR);
|
||||
instrumented.put(serverBuilder, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -678,4 +678,96 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
|
|||
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
|
||||
server?.shutdownNow()?.awaitTermination()
|
||||
}
|
||||
|
||||
def "test reuse builders"() {
|
||||
setup:
|
||||
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
|
||||
@Override
|
||||
void sayHello(
|
||||
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
|
||||
final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build()
|
||||
responseObserver.onNext(reply)
|
||||
responseObserver.onCompleted()
|
||||
}
|
||||
}
|
||||
def port = PortUtils.findOpenPort()
|
||||
ServerBuilder serverBuilder = configureServer(ServerBuilder.forPort(port).addService(greeter))
|
||||
// Multiple calls to build on same builder
|
||||
serverBuilder.build()
|
||||
Server server = serverBuilder.build().start()
|
||||
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))
|
||||
|
||||
// Depending on the version of gRPC usePlainText may or may not take an argument.
|
||||
try {
|
||||
channelBuilder.usePlaintext()
|
||||
} catch (MissingMethodException e) {
|
||||
channelBuilder.usePlaintext(true)
|
||||
}
|
||||
// Multiple calls to build on the same builder
|
||||
channelBuilder.build()
|
||||
ManagedChannel channel = channelBuilder.build()
|
||||
GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel)
|
||||
|
||||
when:
|
||||
def response = runUnderTrace("parent") {
|
||||
client.sayHello(Helloworld.Request.newBuilder().setName(paramName).build())
|
||||
}
|
||||
|
||||
then:
|
||||
response.message == "Hello $paramName"
|
||||
|
||||
assertTraces(1) {
|
||||
trace(0, 3) {
|
||||
basicSpan(it, 0, "parent")
|
||||
span(1) {
|
||||
name "example.Greeter/SayHello"
|
||||
kind CLIENT
|
||||
childOf span(0)
|
||||
event(0) {
|
||||
eventName "message"
|
||||
attributes {
|
||||
"message.type" "SENT"
|
||||
"message.id" 1
|
||||
}
|
||||
}
|
||||
attributes {
|
||||
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
|
||||
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
|
||||
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
|
||||
"${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP
|
||||
"${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value()
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "example.Greeter/SayHello"
|
||||
kind SERVER
|
||||
childOf span(1)
|
||||
event(0) {
|
||||
eventName "message"
|
||||
attributes {
|
||||
"message.type" "RECEIVED"
|
||||
"message.id" 1
|
||||
}
|
||||
}
|
||||
attributes {
|
||||
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
|
||||
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
|
||||
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" Long
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
|
||||
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
|
||||
server?.shutdownNow()?.awaitTermination()
|
||||
|
||||
where:
|
||||
paramName << ["some name", "some other name"]
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue