add OpenTelemetryTracingModule (#11477)

This commit is contained in:
yifeizhuang 2024-08-30 12:17:28 -07:00 committed by GitHub
parent c63e354883
commit 421e2371e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 990 additions and 0 deletions

View File

@ -0,0 +1,408 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.opentelemetry;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Provides factories for {@link io.grpc.StreamTracer} that records tracing to OpenTelemetry.
*/
final class OpenTelemetryTracingModule {
private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName());
@VisibleForTesting
static final String OTEL_TRACING_SCOPE_NAME = "grpc-java";
@Nullable
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
@Nullable
private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
/*
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK
* reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
try {
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
tmpStreamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpCallEndedUpdater = null;
tmpStreamClosedUpdater = null;
}
callEndedUpdater = tmpCallEndedUpdater;
streamClosedUpdater = tmpStreamClosedUpdater;
}
private final Tracer otelTracer;
private final ContextPropagators contextPropagators;
private final MetadataGetter metadataGetter = MetadataGetter.getInstance();
private final MetadataSetter metadataSetter = MetadataSetter.getInstance();
private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
OpenTelemetryTracingModule(OpenTelemetry openTelemetry) {
this.otelTracer = checkNotNull(openTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME), "otelTracer");
this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators");
}
/**
* Creates a {@link CallAttemptsTracerFactory} for a new call.
*/
@VisibleForTesting
CallAttemptsTracerFactory newClientCallTracer(Span clientSpan, MethodDescriptor<?, ?> method) {
return new CallAttemptsTracerFactory(clientSpan, method);
}
/**
* Returns the server tracer factory.
*/
ServerStreamTracer.Factory getServerTracerFactory() {
return serverTracerFactory;
}
/**
* Returns the client interceptor that facilitates otel tracing reporting.
*/
ClientInterceptor getClientInterceptor() {
return clientInterceptor;
}
@VisibleForTesting
final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
volatile int callEnded;
private final Span clientSpan;
private final String fullMethodName;
CallAttemptsTracerFactory(Span clientSpan, MethodDescriptor<?, ?> method) {
checkNotNull(method, "method");
this.fullMethodName = checkNotNull(method.getFullMethodName(), "fullMethodName");
this.clientSpan = checkNotNull(clientSpan, "clientSpan");
}
@Override
public ClientStreamTracer newClientStreamTracer(
ClientStreamTracer.StreamInfo info, Metadata headers) {
Span attemptSpan = otelTracer.spanBuilder(
"Attempt." + fullMethodName.replace('/', '.'))
.setParent(Context.current().with(clientSpan))
.startSpan();
attemptSpan.setAttribute(
"previous-rpc-attempts", info.getPreviousAttempts());
attemptSpan.setAttribute(
"transparent-retry",info.isTransparentRetry());
if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED) != null) {
clientSpan.addEvent("Delayed name resolution complete");
}
return new ClientTracer(attemptSpan, clientSpan);
}
/**
* Record a finished call and mark the current time as the end time.
*
* <p>Can be called from any thread without synchronization. Calling it the second time or more
* is a no-op.
*/
void callEnded(io.grpc.Status status) {
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (callEnded != 0) {
return;
}
callEnded = 1;
}
endSpanWithStatus(clientSpan, status);
}
}
private final class ClientTracer extends ClientStreamTracer {
private final Span span;
private final Span parentSpan;
volatile int seqNo;
boolean isPendingStream;
ClientTracer(Span span, Span parentSpan) {
this.span = checkNotNull(span, "span");
this.parentSpan = checkNotNull(parentSpan, "parent span");
}
@Override
public void streamCreated(Attributes transportAtts, Metadata headers) {
contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers,
metadataSetter);
if (isPendingStream) {
span.addEvent("Delayed LB pick complete");
}
}
@Override
public void createPendingStream() {
isPendingStream = true;
}
@Override
public void outboundMessageSent(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void inboundMessageRead(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
//TODO(yifeizhuang): needs support from message deframer.
if (optionalWireSize != optionalUncompressedSize) {
recordInboundCompressedMessage(span, seqNo, optionalWireSize);
}
}
@Override
public void inboundMessage(int seqNo) {
this.seqNo = seqNo;
}
@Override
public void inboundUncompressedSize(long bytes) {
recordInboundMessageSize(parentSpan, seqNo, bytes);
}
@Override
public void streamClosed(io.grpc.Status status) {
endSpanWithStatus(span, status);
}
}
private final class ServerTracer extends ServerStreamTracer {
private final Span span;
volatile int streamClosed;
private int seqNo;
ServerTracer(String fullMethodName, @Nullable Span remoteSpan) {
checkNotNull(fullMethodName, "fullMethodName");
this.span =
otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName))
.setParent(remoteSpan == null ? null : Context.current().with(remoteSpan))
.startSpan();
}
/**
* Record a finished stream and mark the current time as the end time.
*
* <p>Can be called from any thread without synchronization. Calling it the second time or more
* is a no-op.
*/
@Override
public void streamClosed(io.grpc.Status status) {
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (streamClosed != 0) {
return;
}
streamClosed = 1;
}
endSpanWithStatus(span, status);
}
@Override
public void outboundMessageSent(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void inboundMessageRead(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
if (optionalWireSize != optionalUncompressedSize) {
recordInboundCompressedMessage(span, seqNo, optionalWireSize);
}
}
@Override
public void inboundMessage(int seqNo) {
this.seqNo = seqNo;
}
@Override
public void inboundUncompressedSize(long bytes) {
recordInboundMessageSize(span, seqNo, bytes);
}
}
@VisibleForTesting
final class ServerTracerFactory extends ServerStreamTracer.Factory {
@SuppressWarnings("ReferenceEquality")
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
Context context = contextPropagators.getTextMapPropagator().extract(
Context.current(), headers, metadataGetter
);
Span remoteSpan = Span.fromContext(context);
if (remoteSpan == Span.getInvalid()) {
remoteSpan = null;
}
return new ServerTracer(fullMethodName, remoteSpan);
}
}
@VisibleForTesting
final class TracingClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
Span clientSpan = otelTracer.spanBuilder(
generateTraceSpanName(false, method.getFullMethodName()))
.startSpan();
final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method);
ClientCall<ReqT, RespT> call =
next.newCall(
method,
callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
delegate().start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(io.grpc.Status status, Metadata trailers) {
tracerFactory.callEnded(status);
super.onClose(status, trailers);
}
},
headers);
}
};
}
}
// Attribute named "message-size" always means the message size the application sees.
// If there was compression, additional event reports "message-size-compressed".
//
// An example trace with message compression:
//
// Sending:
// |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854,
// 'message-size-compressed' = 5493) ----|
//
// Receiving:
// |-- Event 'Inbound compressed message', attributes('sequence-numer' = 0,
// 'message-size-compressed' = 5493 ) ----|
// |-- Event 'Inbound message received', attributes('sequence-numer' = 0,
// 'message-size' = 7854) ----|
//
// An example trace with no message compression:
//
// Sending:
// |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854) ---|
//
// Receiving:
// |-- Event 'Inbound message received', attributes('sequence-numer' = 0,
// 'message-size' = 7854) ----|
private void recordOutboundMessageSentEvent(Span span,
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
attributesBuilder.put("sequence-number", seqNo);
if (optionalUncompressedSize != -1) {
attributesBuilder.put("message-size", optionalUncompressedSize);
}
if (optionalWireSize != -1 && optionalWireSize != optionalUncompressedSize) {
attributesBuilder.put("message-size-compressed", optionalWireSize);
}
span.addEvent("Outbound message sent", attributesBuilder.build());
}
private void recordInboundCompressedMessage(Span span, int seqNo, long optionalWireSize) {
AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
attributesBuilder.put("sequence-number", seqNo);
attributesBuilder.put("message-size-compressed", optionalWireSize);
span.addEvent("Inbound compressed message", attributesBuilder.build());
}
private void recordInboundMessageSize(Span span, int seqNo, long bytes) {
AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
attributesBuilder.put("sequence-number", seqNo);
attributesBuilder.put("message-size", bytes);
span.addEvent("Inbound message received", attributesBuilder.build());
}
private String generateErrorStatusDescription(io.grpc.Status status) {
if (status.getDescription() != null) {
return status.getCode() + ": " + status.getDescription();
} else {
return status.getCode().toString();
}
}
private void endSpanWithStatus(Span span, io.grpc.Status status) {
if (status.isOk()) {
span.setStatus(StatusCode.OK);
} else {
span.setStatus(StatusCode.ERROR, generateErrorStatusDescription(status));
}
span.end();
}
/**
* Convert a full method name to a tracing span name.
*
* @param isServer {@code false} if the span is on the client-side, {@code true} if on the
* server-side
* @param fullMethodName the method name as returned by
* {@link MethodDescriptor#getFullMethodName}.
*/
@VisibleForTesting
static String generateTraceSpanName(boolean isServer, String fullMethodName) {
String prefix = isServer ? "Recv" : "Sent";
return prefix + "." + fullMethodName.replace('/', '.');
}
}

View File

@ -0,0 +1,582 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.opentelemetry;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.opentelemetry.OpenTelemetryTracingModule.OTEL_TRACING_SCOPE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableSet;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.opentelemetry.OpenTelemetryTracingModule.CallAttemptsTracerFactory;
import io.grpc.testing.GrpcServerRule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.EventData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
@RunWith(JUnit4.class)
public class OpenTelemetryTracingModuleTest {
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
ClientStreamTracer.StreamInfo.newBuilder()
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build();
private static final CallOptions.Key<String> CUSTOM_OPTION =
CallOptions.Key.createWithDefault("option1", "default");
private static final CallOptions CALL_OPTIONS =
CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue");
private static class StringInputStream extends InputStream {
final String string;
StringInputStream(String string) {
this.string = string;
}
@Override
public int read() {
// InProcessTransport doesn't actually read bytes from the InputStream. The InputStream is
// passed to the InProcess server and consumed by MARSHALLER.parse().
throw new UnsupportedOperationException("Should not be called");
}
}
private static final MethodDescriptor.Marshaller<String> MARSHALLER =
new MethodDescriptor.Marshaller<String>() {
@Override
public InputStream stream(String value) {
return new StringInputStream(value);
}
@Override
public String parse(InputStream stream) {
return ((StringInputStream) stream).string;
}
};
private final MethodDescriptor<String, String> method =
MethodDescriptor.<String, String>newBuilder()
.setType(MethodDescriptor.MethodType.UNKNOWN)
.setRequestMarshaller(MARSHALLER)
.setResponseMarshaller(MARSHALLER)
.setFullMethodName("package1.service2/method3")
.build();
@Rule
public final OpenTelemetryRule openTelemetryRule = OpenTelemetryRule.create();
@Rule
public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
private Tracer tracerRule;
@Mock
private Tracer mockTracer;
@Mock
TextMapPropagator mockPropagator;
@Mock
private Span mockClientSpan;
@Mock
private Span mockAttemptSpan;
@Mock
private ServerCall.Listener<String> mockServerCallListener;
@Mock
private ClientCall.Listener<String> mockClientCallListener;
@Mock
private SpanBuilder mockSpanBuilder;
@Mock
private OpenTelemetry mockOpenTelemetry;
@Captor
private ArgumentCaptor<String> eventNameCaptor;
@Captor
private ArgumentCaptor<io.opentelemetry.api.common.Attributes> attributesCaptor;
@Captor
private ArgumentCaptor<Status> statusCaptor;
@Before
public void setUp() {
tracerRule = openTelemetryRule.getOpenTelemetry().getTracer(OTEL_TRACING_SCOPE_NAME);
when(mockOpenTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME)).thenReturn(mockTracer);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(mockPropagator));
when(mockSpanBuilder.startSpan()).thenReturn(mockAttemptSpan);
when(mockSpanBuilder.setParent(any())).thenReturn(mockSpanBuilder);
when(mockTracer.spanBuilder(any())).thenReturn(mockSpanBuilder);
}
// Use mock instead of OpenTelemetryRule to verify inOrder and propagator.
@Test
public void clientBasicTracingMocking() {
OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry);
CallAttemptsTracerFactory callTracer =
tracingModule.newClientCallTracer(mockClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.createPendingStream();
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
verify(mockTracer).spanBuilder(eq("Attempt.package1.service2.method3"));
verify(mockPropagator).inject(any(), eq(headers), eq(MetadataSetter.getInstance()));
verify(mockClientSpan, never()).end();
verify(mockAttemptSpan, never()).end();
clientStreamTracer.outboundMessage(0);
clientStreamTracer.outboundMessageSent(0, 882, -1);
clientStreamTracer.inboundMessage(0);
clientStreamTracer.outboundMessage(1);
clientStreamTracer.outboundMessageSent(1, -1, 27);
clientStreamTracer.inboundMessageRead(0, 255, 90);
clientStreamTracer.streamClosed(Status.OK);
callTracer.callEnded(Status.OK);
InOrder inOrder = inOrder(mockClientSpan, mockAttemptSpan);
inOrder.verify(mockAttemptSpan)
.setAttribute("previous-rpc-attempts", 0);
inOrder.verify(mockAttemptSpan)
.setAttribute("transparent-retry", false);
inOrder.verify(mockClientSpan).addEvent("Delayed name resolution complete");
inOrder.verify(mockAttemptSpan).addEvent("Delayed LB pick complete");
inOrder.verify(mockAttemptSpan, times(3)).addEvent(
eventNameCaptor.capture(), attributesCaptor.capture()
);
List<String> events = eventNameCaptor.getAllValues();
List<io.opentelemetry.api.common.Attributes> attributes = attributesCaptor.getAllValues();
assertEquals(
"Outbound message sent" ,
events.get(0));
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 0)
.put("message-size-compressed", 882)
.build(),
attributes.get(0));
assertEquals(
"Outbound message sent" ,
events.get(1));
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 1)
.put("message-size", 27)
.build(),
attributes.get(1));
assertEquals(
"Inbound compressed message" ,
events.get(2));
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 0)
.put("message-size-compressed", 255)
.build(),
attributes.get(2));
inOrder.verify(mockAttemptSpan).setStatus(StatusCode.OK);
inOrder.verify(mockAttemptSpan).end();
inOrder.verify(mockClientSpan).setStatus(StatusCode.OK);
inOrder.verify(mockClientSpan).end();
inOrder.verifyNoMoreInteractions();
}
@Test
public void clientBasicTracingRule() {
OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(
openTelemetryRule.getOpenTelemetry());
Span clientSpan = tracerRule.spanBuilder("test-client-span").startSpan();
CallAttemptsTracerFactory callTracer =
tracingModule.newClientCallTracer(clientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.createPendingStream();
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
clientStreamTracer.outboundMessage(0);
clientStreamTracer.outboundMessageSent(0, 882, -1);
clientStreamTracer.inboundMessage(0);
clientStreamTracer.outboundMessage(1);
clientStreamTracer.outboundMessageSent(1, -1, 27);
clientStreamTracer.inboundMessageRead(0, 255, -1);
clientStreamTracer.inboundUncompressedSize(288);
clientStreamTracer.inboundMessageRead(1, 128, 128);
clientStreamTracer.inboundMessage(1);
clientStreamTracer.inboundUncompressedSize(128);
clientStreamTracer.streamClosed(Status.OK);
callTracer.callEnded(Status.OK);
List<SpanData> spans = openTelemetryRule.getSpans();
assertEquals(spans.size(), 2);
SpanData attemptSpanData = spans.get(0);
SpanData clientSpanData = spans.get(1);
assertEquals(attemptSpanData.getName(), "Attempt.package1.service2.method3");
assertEquals(clientSpanData.getName(), "test-client-span");
assertEquals(headers.keys(), ImmutableSet.of("traceparent"));
String spanContext = headers.get(
Metadata.Key.of("traceparent", Metadata.ASCII_STRING_MARSHALLER));
assertEquals(spanContext.substring(3, 3 + TraceId.getLength()),
spans.get(1).getSpanContext().getTraceId());
// parent(client) span data
List<EventData> clientSpanEvents = clientSpanData.getEvents();
assertEquals(clientSpanEvents.size(), 3);
assertEquals(
"Delayed name resolution complete",
clientSpanEvents.get(0).getName());
assertTrue(clientSpanEvents.get(0).getAttributes().isEmpty());
assertEquals(
"Inbound message received" ,
clientSpanEvents.get(1).getName());
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 0)
.put("message-size", 288)
.build(),
clientSpanEvents.get(1).getAttributes());
assertEquals(
"Inbound message received" ,
clientSpanEvents.get(2).getName());
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 1)
.put("message-size", 128)
.build(),
clientSpanEvents.get(2).getAttributes());
assertEquals(clientSpanData.hasEnded(), true);
// child(attempt) span data
List<EventData> attemptSpanEvents = attemptSpanData.getEvents();
assertEquals(clientSpanEvents.size(), 3);
assertEquals(
"Delayed LB pick complete",
attemptSpanEvents.get(0).getName());
assertTrue(clientSpanEvents.get(0).getAttributes().isEmpty());
assertEquals(
"Outbound message sent" ,
attemptSpanEvents.get(1).getName());
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 0)
.put("message-size-compressed", 882)
.build(),
attemptSpanEvents.get(1).getAttributes());
assertEquals(
"Outbound message sent" ,
attemptSpanEvents.get(2).getName());
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 1)
.put("message-size", 27)
.build(),
attemptSpanEvents.get(2).getAttributes());
assertEquals(
"Inbound compressed message" ,
attemptSpanEvents.get(3).getName());
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 0)
.put("message-size-compressed", 255)
.build(),
attemptSpanEvents.get(3).getAttributes());
assertEquals(attemptSpanData.hasEnded(), true);
}
@Test
public void clientInterceptor() {
testClientInterceptors(false);
}
@Test
public void clientInterceptorNonDefaultOtelContext() {
testClientInterceptors(true);
}
private void testClientInterceptors(boolean nonDefaultOtelContext) {
final AtomicReference<Metadata> capturedMetadata = new AtomicReference<>();
grpcServerRule.getServiceRegistry().addService(
ServerServiceDefinition.builder("package1.service2").addMethod(
method, new ServerCallHandler<String, String>() {
@Override
public ServerCall.Listener<String> startCall(
ServerCall<String, String> call, Metadata headers) {
capturedMetadata.set(headers);
call.sendHeaders(new Metadata());
call.sendMessage("Hello");
call.close(
Status.PERMISSION_DENIED.withDescription("No you don't"), new Metadata());
return mockServerCallListener;
}
}).build());
final AtomicReference<CallOptions> capturedCallOptions = new AtomicReference<>();
ClientInterceptor callOptionsCaptureInterceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
capturedCallOptions.set(callOptions);
return next.newCall(method, callOptions);
}
};
OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(
openTelemetryRule.getOpenTelemetry());
Channel interceptedChannel =
ClientInterceptors.intercept(
grpcServerRule.getChannel(), callOptionsCaptureInterceptor,
tracingModule.getClientInterceptor());
Span parentSpan = tracerRule.spanBuilder("test-parent-span").startSpan();
ClientCall<String, String> call;
if (nonDefaultOtelContext) {
try (Scope scope = io.opentelemetry.context.Context.current().with(parentSpan)
.makeCurrent()) {
call = interceptedChannel.newCall(method, CALL_OPTIONS);
}
} else {
call = interceptedChannel.newCall(method, CALL_OPTIONS);
}
assertEquals("customvalue", capturedCallOptions.get().getOption(CUSTOM_OPTION));
assertEquals(1, capturedCallOptions.get().getStreamTracerFactories().size());
assertTrue(
capturedCallOptions.get().getStreamTracerFactories().get(0)
instanceof CallAttemptsTracerFactory);
// Make the call
Metadata headers = new Metadata();
call.start(mockClientCallListener, headers);
// End the call
call.halfClose();
call.request(1);
parentSpan.end();
verify(mockClientCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
Status status = statusCaptor.getValue();
assertEquals(Status.Code.PERMISSION_DENIED, status.getCode());
assertEquals("No you don't", status.getDescription());
List<SpanData> spans = openTelemetryRule.getSpans();
assertEquals(spans.size(), 3);
SpanData clientSpan = spans.get(1);
SpanData attemptSpan = spans.get(0);
if (nonDefaultOtelContext) {
assertEquals(clientSpan.getParentSpanContext(), parentSpan.getSpanContext());
} else {
assertEquals(clientSpan.getParentSpanContext(),
Span.fromContext(Context.root()).getSpanContext());
}
String spanContext = capturedMetadata.get().get(
Metadata.Key.of("traceparent", Metadata.ASCII_STRING_MARSHALLER));
// W3C format: 00-<trace id>-<span id>-<trace flag>
assertEquals(spanContext.substring(3, 3 + TraceId.getLength()),
attemptSpan.getSpanContext().getTraceId());
assertEquals(spanContext.substring(3 + TraceId.getLength() + 1,
3 + TraceId.getLength() + 1 + SpanId.getLength()),
attemptSpan.getSpanContext().getSpanId());
assertEquals(attemptSpan.getParentSpanContext(), clientSpan.getSpanContext());
assertTrue(clientSpan.hasEnded());
assertEquals(clientSpan.getStatus().getStatusCode(), StatusCode.ERROR);
assertEquals(clientSpan.getStatus().getDescription(), "PERMISSION_DENIED: No you don't");
assertTrue(attemptSpan.hasEnded());
assertTrue(attemptSpan.hasEnded());
assertEquals(attemptSpan.getStatus().getStatusCode(), StatusCode.ERROR);
assertEquals(attemptSpan.getStatus().getDescription(), "PERMISSION_DENIED: No you don't");
}
@Test
public void clientStreamNeverCreatedStillRecordTracing() {
OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry);
CallAttemptsTracerFactory callTracer =
tracingModule.newClientCallTracer(mockClientSpan, method);
callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
verify(mockClientSpan).end();
verify(mockClientSpan).setStatus(eq(StatusCode.ERROR),
eq("DEADLINE_EXCEEDED: 3 seconds"));
verifyNoMoreInteractions(mockClientSpan);
}
@Test
public void serverBasicTracingNoHeaders() {
OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(
openTelemetryRule.getOpenTelemetry());
ServerStreamTracer.Factory tracerFactory = tracingModule.getServerTracerFactory();
ServerStreamTracer serverStreamTracer =
tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
assertSame(Span.fromContext(Context.current()), Span.getInvalid());
serverStreamTracer.outboundMessage(0);
serverStreamTracer.outboundMessageSent(0, 882, 998);
serverStreamTracer.inboundMessage(0);
serverStreamTracer.outboundMessage(1);
serverStreamTracer.outboundMessageSent(1, -1, 27);
serverStreamTracer.inboundMessageRead(0, 90, -1);
serverStreamTracer.inboundUncompressedSize(255);
serverStreamTracer.streamClosed(Status.CANCELLED);
List<SpanData> spans = openTelemetryRule.getSpans();
assertEquals(spans.size(), 1);
assertEquals(spans.get(0).getName(), "Recv.package1.service2.method3");
assertEquals(spans.get(0).getParentSpanContext(), Span.getInvalid().getSpanContext());
List<EventData> events = spans.get(0).getEvents();
assertEquals(events.size(), 4);
assertEquals(
"Outbound message sent" ,
events.get(0).getName());
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 0)
.put("message-size-compressed", 882)
.put("message-size", 998)
.build(),
events.get(0).getAttributes());
assertEquals(
"Outbound message sent" ,
events.get(1).getName());
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 1)
.put("message-size", 27)
.build(),
events.get(1).getAttributes());
assertEquals(
"Inbound compressed message" ,
events.get(2).getName());
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 0)
.put("message-size-compressed", 90)
.build(),
events.get(2).getAttributes());
assertEquals(
"Inbound message received" ,
events.get(3).getName());
assertEquals(
io.opentelemetry.api.common.Attributes.builder()
.put("sequence-number", 0)
.put("message-size", 255)
.build(),
events.get(3).getAttributes());
assertEquals(spans.get(0).hasEnded(), true);
}
@Test
public void grpcTraceBinPropagator() {
when(mockOpenTelemetry.getPropagators()).thenReturn(
ContextPropagators.create(GrpcTraceBinContextPropagator.defaultInstance()));
ArgumentCaptor<Context> contextArgumentCaptor = ArgumentCaptor.forClass(Context.class);
OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry);
Span testClientSpan = tracerRule.spanBuilder("test-client-span").startSpan();
CallAttemptsTracerFactory callTracer =
tracingModule.newClientCallTracer(testClientSpan, method);
Span testAttemptSpan = tracerRule.spanBuilder("test-attempt-span").startSpan();
when(mockSpanBuilder.startSpan()).thenReturn(testAttemptSpan);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
clientStreamTracer.streamClosed(Status.CANCELLED);
Metadata.Key<byte[]> key = Metadata.Key.of(
GrpcTraceBinContextPropagator.GRPC_TRACE_BIN_HEADER, Metadata.BINARY_BYTE_MARSHALLER);
assertTrue(Arrays.equals(BinaryFormat.getInstance().toBytes(testAttemptSpan.getSpanContext()),
headers.get(key)
));
verify(mockSpanBuilder).setParent(contextArgumentCaptor.capture());
assertEquals(testClientSpan, Span.fromContext(contextArgumentCaptor.getValue()));
Span serverSpan = tracerRule.spanBuilder("test-server-span").startSpan();
when(mockSpanBuilder.startSpan()).thenReturn(serverSpan);
ServerStreamTracer.Factory tracerFactory = tracingModule.getServerTracerFactory();
ServerStreamTracer serverStreamTracer =
tracerFactory.newServerStreamTracer(method.getFullMethodName(), headers);
serverStreamTracer.streamClosed(Status.CANCELLED);
verify(mockSpanBuilder, times(2))
.setParent(contextArgumentCaptor.capture());
assertEquals(testAttemptSpan.getSpanContext(),
Span.fromContext(contextArgumentCaptor.getValue()).getSpanContext());
}
@Test
public void generateTraceSpanName() {
assertEquals(
"Sent.io.grpc.Foo", OpenTelemetryTracingModule.generateTraceSpanName(
false, "io.grpc/Foo"));
assertEquals(
"Recv.io.grpc.Bar", OpenTelemetryTracingModule.generateTraceSpanName(
true, "io.grpc/Bar"));
}
}