From fc4410f159a60edf19ebb57802e069b257a1c18c Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 11 Apr 2023 13:49:33 -0700 Subject: [PATCH] api, census: add new pendingStreamCreated on clientStreamTracer and new tracer annotation (#10014) --- api/src/main/java/io/grpc/ClientStreamTracer.java | 12 ++++++++++++ .../java/io/grpc/census/CensusTracingModule.java | 9 +++++++++ .../test/java/io/grpc/census/CensusModulesTest.java | 2 ++ .../io/grpc/internal/DelayedClientTransport.java | 3 +++ .../grpc/internal/ForwardingClientStreamTracer.java | 5 +++++ .../io/grpc/util/ForwardingClientStreamTracer.java | 5 +++++ 6 files changed, 36 insertions(+) diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 53909f2374..8b3520a8dc 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -39,6 +39,18 @@ public abstract class ClientStreamTracer extends StreamTracer { public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadata headers) { } + /** + * Name resolution is completed and the connection starts getting established. This method is only + * invoked on the streams that encounter such delay. + * + *

gRPC buffers the client call if the remote address and configurations, e.g. timeouts and + * retry policy, are not ready. Asynchronously gRPC internally does the name resolution to get + * this information. The streams that are processed immediately on ready transports by the time + * the RPC comes do not go through the pending process, thus this callback will not be invoked. + */ + public void createPendingStream() { + } + /** * Headers has been sent to the socket. */ diff --git a/census/src/main/java/io/grpc/census/CensusTracingModule.java b/census/src/main/java/io/grpc/census/CensusTracingModule.java index 4fa9e28747..dfe437780b 100644 --- a/census/src/main/java/io/grpc/census/CensusTracingModule.java +++ b/census/src/main/java/io/grpc/census/CensusTracingModule.java @@ -299,6 +299,7 @@ final class CensusTracingModule { final Metadata.Key tracingHeader; final boolean isSampledToLocalTracing; volatile int seqNo; + boolean isPendingStream; ClientTracer( Span span, Span parentSpan, Metadata.Key tracingHeader, @@ -315,6 +316,14 @@ final class CensusTracingModule { headers.discardAll(tracingHeader); headers.put(tracingHeader, span.getContext()); } + if (isPendingStream) { + span.addAnnotation("Delayed LB pick complete"); + } + } + + @Override + public void createPendingStream() { + isPendingStream = true; } @Override diff --git a/census/src/test/java/io/grpc/census/CensusModulesTest.java b/census/src/test/java/io/grpc/census/CensusModulesTest.java index 692395b362..2447b2c01f 100644 --- a/census/src/test/java/io/grpc/census/CensusModulesTest.java +++ b/census/src/test/java/io/grpc/census/CensusModulesTest.java @@ -746,6 +746,7 @@ public class CensusModulesTest { censusTracing.newClientCallTracer(spyClientSpan, method); Metadata headers = new Metadata(); ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); + clientStreamTracer.createPendingStream(); clientStreamTracer.streamCreated(Attributes.EMPTY, headers); verify(tracer).spanBuilderWithExplicitParent( eq("Attempt.package1.service2.method3"), eq(spyClientSpan)); @@ -767,6 +768,7 @@ public class CensusModulesTest { .putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0)); inOrder.verify(spyAttemptSpan) .putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false)); + inOrder.verify(spyAttemptSpan).addAnnotation("Delayed LB pick complete"); inOrder.verify(spyAttemptSpan, times(2)).addMessageEvent(messageEventCaptor.capture()); List events = messageEventCaptor.getAllValues(); assertEquals( diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 2b1145d1c4..d71de1f5d5 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -183,6 +183,9 @@ final class DelayedClientTransport implements ManagedClientTransport { if (getPendingStreamsCount() == 1) { syncContext.executeLater(reportTransportInUse); } + for (ClientStreamTracer streamTracer : tracers) { + streamTracer.createPendingStream(); + } return pendingStream; } diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index fd03564d39..4740a811f3 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -34,6 +34,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer { delegate().streamCreated(transportAttrs, headers); } + @Override + public void createPendingStream() { + delegate().createPendingStream(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/core/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 7bb9d8cf71..7317917887 100644 --- a/core/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -33,6 +33,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer { delegate().streamCreated(transportAttrs, headers); } + @Override + public void createPendingStream() { + delegate().createPendingStream(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders();