api, census: add new pendingStreamCreated on clientStreamTracer and new tracer annotation (#10014)

This commit is contained in:
yifeizhuang 2023-04-11 13:49:33 -07:00 committed by GitHub
parent 8d98e5ff7f
commit fc4410f159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 36 additions and 0 deletions

View File

@ -39,6 +39,18 @@ public abstract class ClientStreamTracer extends StreamTracer {
public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadata headers) { public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadata headers) {
} }
/**
* Name resolution is completed and the connection starts getting established. This method is only
* invoked on the streams that encounter such delay.
*
* </p>gRPC buffers the client call if the remote address and configurations, e.g. timeouts and
* retry policy, are not ready. Asynchronously gRPC internally does the name resolution to get
* this information. The streams that are processed immediately on ready transports by the time
* the RPC comes do not go through the pending process, thus this callback will not be invoked.
*/
public void createPendingStream() {
}
/** /**
* Headers has been sent to the socket. * Headers has been sent to the socket.
*/ */

View File

@ -299,6 +299,7 @@ final class CensusTracingModule {
final Metadata.Key<SpanContext> tracingHeader; final Metadata.Key<SpanContext> tracingHeader;
final boolean isSampledToLocalTracing; final boolean isSampledToLocalTracing;
volatile int seqNo; volatile int seqNo;
boolean isPendingStream;
ClientTracer( ClientTracer(
Span span, Span parentSpan, Metadata.Key<SpanContext> tracingHeader, Span span, Span parentSpan, Metadata.Key<SpanContext> tracingHeader,
@ -315,6 +316,14 @@ final class CensusTracingModule {
headers.discardAll(tracingHeader); headers.discardAll(tracingHeader);
headers.put(tracingHeader, span.getContext()); headers.put(tracingHeader, span.getContext());
} }
if (isPendingStream) {
span.addAnnotation("Delayed LB pick complete");
}
}
@Override
public void createPendingStream() {
isPendingStream = true;
} }
@Override @Override

View File

@ -746,6 +746,7 @@ public class CensusModulesTest {
censusTracing.newClientCallTracer(spyClientSpan, method); censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata(); Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.createPendingStream();
clientStreamTracer.streamCreated(Attributes.EMPTY, headers); clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
verify(tracer).spanBuilderWithExplicitParent( verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), eq(spyClientSpan)); eq("Attempt.package1.service2.method3"), eq(spyClientSpan));
@ -767,6 +768,7 @@ public class CensusModulesTest {
.putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0)); .putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0));
inOrder.verify(spyAttemptSpan) inOrder.verify(spyAttemptSpan)
.putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false)); .putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false));
inOrder.verify(spyAttemptSpan).addAnnotation("Delayed LB pick complete");
inOrder.verify(spyAttemptSpan, times(2)).addMessageEvent(messageEventCaptor.capture()); inOrder.verify(spyAttemptSpan, times(2)).addMessageEvent(messageEventCaptor.capture());
List<MessageEvent> events = messageEventCaptor.getAllValues(); List<MessageEvent> events = messageEventCaptor.getAllValues();
assertEquals( assertEquals(

View File

@ -183,6 +183,9 @@ final class DelayedClientTransport implements ManagedClientTransport {
if (getPendingStreamsCount() == 1) { if (getPendingStreamsCount() == 1) {
syncContext.executeLater(reportTransportInUse); syncContext.executeLater(reportTransportInUse);
} }
for (ClientStreamTracer streamTracer : tracers) {
streamTracer.createPendingStream();
}
return pendingStream; return pendingStream;
} }

View File

@ -34,6 +34,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer {
delegate().streamCreated(transportAttrs, headers); delegate().streamCreated(transportAttrs, headers);
} }
@Override
public void createPendingStream() {
delegate().createPendingStream();
}
@Override @Override
public void outboundHeaders() { public void outboundHeaders() {
delegate().outboundHeaders(); delegate().outboundHeaders();

View File

@ -33,6 +33,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer {
delegate().streamCreated(transportAttrs, headers); delegate().streamCreated(transportAttrs, headers);
} }
@Override
public void createPendingStream() {
delegate().createPendingStream();
}
@Override @Override
public void outboundHeaders() { public void outboundHeaders() {
delegate().outboundHeaders(); delegate().outboundHeaders();