From fd2a58a55e54c04250f77533c44d1e05c0e5985b Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 11 Aug 2021 10:24:37 -0700 Subject: [PATCH] all: implement retry stats (#8362) --- .../main/java/io/grpc/ClientStreamTracer.java | 30 +- .../java/io/grpc/ManagedChannelBuilder.java | 3 - .../io/grpc/census/CensusStatsModule.java | 228 +++++++---- .../io/grpc/census/CensusTracingModule.java | 47 ++- .../io/grpc/census/CensusModulesTest.java | 269 +++++++++++- .../java/io/grpc/internal/ClientCallImpl.java | 3 +- .../main/java/io/grpc/internal/GrpcUtil.java | 3 +- .../io/grpc/internal/ManagedChannelImpl.java | 11 +- .../internal/ManagedChannelImplBuilder.java | 12 - .../java/io/grpc/internal/OobChannel.java | 2 +- .../io/grpc/internal/RetriableStream.java | 35 +- .../io/grpc/internal/SubchannelChannel.java | 2 +- .../io/grpc/internal/RetriableStreamTest.java | 3 +- interop-testing/build.gradle | 1 + .../integration/AbstractInteropTest.java | 23 ++ .../grpc/testing/integration/RetryTest.java | 382 +++++++++++++++--- 16 files changed, 863 insertions(+), 191 deletions(-) diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 6a5d3cc339..bb836ac82e 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -97,11 +97,15 @@ public abstract class ClientStreamTracer extends StreamTracer { public static final class StreamInfo { private final Attributes transportAttrs; private final CallOptions callOptions; + private final int previousAttempts; private final boolean isTransparentRetry; - StreamInfo(Attributes transportAttrs, CallOptions callOptions, boolean isTransparentRetry) { + StreamInfo( + Attributes transportAttrs, CallOptions callOptions, int previousAttempts, + boolean isTransparentRetry) { this.transportAttrs = checkNotNull(transportAttrs, "transportAttrs"); this.callOptions = checkNotNull(callOptions, "callOptions"); + this.previousAttempts = previousAttempts; this.isTransparentRetry = isTransparentRetry; } @@ -124,6 +128,15 @@ public abstract class ClientStreamTracer extends StreamTracer { return callOptions; } + /** + * Returns the number of preceding attempts for the RPC. + * + * @since 1.40.0 + */ + public int getPreviousAttempts() { + return previousAttempts; + } + /** * Whether the stream is a transparent retry. * @@ -142,6 +155,7 @@ public abstract class ClientStreamTracer extends StreamTracer { return new Builder() .setCallOptions(callOptions) .setTransportAttrs(transportAttrs) + .setPreviousAttempts(previousAttempts) .setIsTransparentRetry(isTransparentRetry); } @@ -159,6 +173,7 @@ public abstract class ClientStreamTracer extends StreamTracer { return MoreObjects.toStringHelper(this) .add("transportAttrs", transportAttrs) .add("callOptions", callOptions) + .add("previousAttempts", previousAttempts) .add("isTransparentRetry", isTransparentRetry) .toString(); } @@ -171,6 +186,7 @@ public abstract class ClientStreamTracer extends StreamTracer { public static final class Builder { private Attributes transportAttrs = Attributes.EMPTY; private CallOptions callOptions = CallOptions.DEFAULT; + private int previousAttempts; private boolean isTransparentRetry; Builder() { @@ -197,6 +213,16 @@ public abstract class ClientStreamTracer extends StreamTracer { return this; } + /** + * Set the number of preceding attempts of the RPC. + * + * @since 1.40.0 + */ + public Builder setPreviousAttempts(int previousAttempts) { + this.previousAttempts = previousAttempts; + return this; + } + /** * Sets whether the stream is a transparent retry. * @@ -211,7 +237,7 @@ public abstract class ClientStreamTracer extends StreamTracer { * Builds a new StreamInfo. */ public StreamInfo build() { - return new StreamInfo(transportAttrs, callOptions, isTransparentRetry); + return new StreamInfo(transportAttrs, callOptions, previousAttempts, isTransparentRetry); } } } diff --git a/api/src/main/java/io/grpc/ManagedChannelBuilder.java b/api/src/main/java/io/grpc/ManagedChannelBuilder.java index e4a4611541..73e66ed6dc 100644 --- a/api/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/api/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -479,9 +479,6 @@ public abstract class ManagedChannelBuilder> * transparent retries, which are safe for non-idempotent RPCs. Service config is ideally provided * by the name resolver, but may also be specified via {@link #defaultServiceConfig}. * - *

For the current release, this method may have a side effect that disables Census stats and - * tracing. - * * @return this * @since 1.11.0 */ diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java index ac5f4e705e..6faeb575cc 100644 --- a/census/src/main/java/io/grpc/census/CensusStatsModule.java +++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java @@ -17,7 +17,6 @@ package io.grpc.census; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -28,16 +27,20 @@ import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientStreamTracer; +import io.grpc.ClientStreamTracer.StreamInfo; import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StreamTracer; import io.grpc.census.internal.DeprecatedCensusConstants; import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.Measure; import io.opencensus.stats.Measure.MeasureDouble; import io.opencensus.stats.Measure.MeasureLong; import io.opencensus.stats.MeasureMap; @@ -51,9 +54,11 @@ import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.tags.propagation.TagContextSerializationException; import io.opencensus.tags.unsafe.ContextUtils; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -61,9 +66,10 @@ import javax.annotation.Nullable; /** * Provides factories for {@link StreamTracer} that records stats to Census. * - *

On the client-side, a factory is created for each call, because ClientCall starts earlier than - * the ClientStream, and in some cases may even not create a ClientStream at all. Therefore, it's - * the factory that reports the summary to Census. + *

On the client-side, a factory is created for each call, and the factory creates a stream + * tracer for each attempt. If there is no stream created when the call is ended, we still create a + * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats + * of the overall RPC, such as RETRIES_PER_CALL, to Census. * *

On the server-side, there is only one ServerStream per each ServerCall, and ServerStream * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call and @@ -168,7 +174,6 @@ final class CensusStatsModule { } private static final class ClientTracer extends ClientStreamTracer { - @Nullable private static final AtomicLongFieldUpdater outboundMessageCountUpdater; @Nullable private static final AtomicLongFieldUpdater inboundMessageCountUpdater; @Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater; @@ -222,21 +227,31 @@ final class CensusStatsModule { inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater; } - private final CensusStatsModule module; + final Stopwatch stopwatch; + final CallAttemptsTracerFactory attemptsState; + final AtomicBoolean inboundReceivedOrClosed = new AtomicBoolean(); + final CensusStatsModule module; final TagContext parentCtx; - private final TagContext startCtx; - + final TagContext startCtx; + final StreamInfo info; volatile long outboundMessageCount; volatile long inboundMessageCount; volatile long outboundWireSize; volatile long inboundWireSize; volatile long outboundUncompressedSize; volatile long inboundUncompressedSize; + long roundtripNanos; + Code statusCode; - ClientTracer(CensusStatsModule module, TagContext parentCtx, TagContext startCtx) { - this.module = checkNotNull(module, "module"); + ClientTracer( + CallAttemptsTracerFactory attemptsState, CensusStatsModule module, TagContext parentCtx, + TagContext startCtx, StreamInfo info) { + this.attemptsState = attemptsState; + this.module = module; this.parentCtx = parentCtx; - this.startCtx = checkNotNull(startCtx, "startCtx"); + this.startCtx = startCtx; + this.info = info; + this.stopwatch = module.stopwatchSupplier.get().start(); } @Override @@ -296,6 +311,11 @@ final class CensusStatsModule { @Override @SuppressWarnings("NonAtomicVolatileUpdate") public void inboundMessage(int seqNo) { + if (inboundReceivedOrClosed.compareAndSet(false, true)) { + // Because inboundUncompressedSize() might be called after streamClosed(), + // we will report stats in callEnded(). Note that this attempt is already committed. + attemptsState.inboundMetricTracer = this; + } if (inboundMessageCountUpdater != null) { inboundMessageCountUpdater.getAndIncrement(this); } else { @@ -316,14 +336,74 @@ final class CensusStatsModule { module.recordRealTimeMetric( startCtx, RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1); } + + @Override + public void streamClosed(Status status) { + attemptsState.attemptEnded(); + stopwatch.stop(); + roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); + Deadline deadline = info.getCallOptions().getDeadline(); + statusCode = status.getCode(); + if (statusCode == Status.Code.CANCELLED && deadline != null) { + // When the server's deadline expires, it can only reset the stream with CANCEL and no + // description. Since our timer may be delayed in firing, we double-check the deadline and + // turn the failure into the likely more helpful DEADLINE_EXCEEDED status. + if (deadline.isExpired()) { + statusCode = Code.DEADLINE_EXCEEDED; + } + } + if (inboundReceivedOrClosed.compareAndSet(false, true)) { + if (module.recordFinishedRpcs) { + // Stream is closed early. So no need to record metrics for any inbound events after this + // point. + recordFinishedRpc(); + } + } // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded. + } + + void recordFinishedRpc() { + MeasureMap measureMap = module.statsRecorder.newMeasureMap() + // TODO(songya): remove the deprecated measure constants once they are completed removed. + .put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1) + // The latency is double value + .put( + DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, + roundtripNanos / NANOS_PER_MILLI) + .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, outboundMessageCount) + .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, inboundMessageCount) + .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, outboundWireSize) + .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, inboundWireSize) + .put( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, + outboundUncompressedSize) + .put( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, + inboundUncompressedSize); + if (statusCode != Code.OK) { + measureMap.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1); + } + TagValue statusTag = TagValue.create(statusCode.toString()); + measureMap.record( + module + .tagger + .toBuilder(startCtx) + .putLocal(RpcMeasureConstants.GRPC_CLIENT_STATUS, statusTag) + .build()); + } } @VisibleForTesting static final class CallAttemptsTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory { - @Nullable - private static final AtomicReferenceFieldUpdater - streamTracerUpdater; + static final MeasureLong RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/retries_per_call", "Number of retries per call", "1"); + static final MeasureLong TRANSPARENT_RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1"); + static final MeasureDouble RETRY_DELAY_PER_CALL = + Measure.MeasureDouble.create( + "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms"); @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater; @@ -334,40 +414,45 @@ final class CensusStatsModule { * (potentially racy) direct updates of the volatile variables. */ static { - AtomicReferenceFieldUpdater tmpStreamTracerUpdater; AtomicIntegerFieldUpdater tmpCallEndedUpdater; try { - tmpStreamTracerUpdater = - AtomicReferenceFieldUpdater.newUpdater( - CallAttemptsTracerFactory.class, ClientTracer.class, "streamTracer"); tmpCallEndedUpdater = AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded"); } catch (Throwable t) { logger.log(Level.SEVERE, "Creating atomic field updaters failed", t); - tmpStreamTracerUpdater = null; tmpCallEndedUpdater = null; } - streamTracerUpdater = tmpStreamTracerUpdater; callEndedUpdater = tmpCallEndedUpdater; } + ClientTracer inboundMetricTracer; private final CensusStatsModule module; private final Stopwatch stopwatch; - private volatile ClientTracer streamTracer; private volatile int callEnded; private final TagContext parentCtx; private final TagContext startCtx; + private final String fullMethodName; + + // TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater. + private final AtomicLong attemptsPerCall = new AtomicLong(); + private final AtomicLong transparentRetriesPerCall = new AtomicLong(); + private final AtomicLong retryDelayNanos = new AtomicLong(); + private final AtomicLong lastInactiveTimeStamp = new AtomicLong(); + private final AtomicInteger activeStreams = new AtomicInteger(); + private final AtomicBoolean activated = new AtomicBoolean(); CallAttemptsTracerFactory( CensusStatsModule module, TagContext parentCtx, String fullMethodName) { - this.module = checkNotNull(module); - this.parentCtx = checkNotNull(parentCtx); + this.module = checkNotNull(module, "module"); + this.parentCtx = checkNotNull(parentCtx, "parentCtx"); + this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); + this.stopwatch = module.stopwatchSupplier.get().start(); TagValue methodTag = TagValue.create(fullMethodName); - this.startCtx = module.tagger.toBuilder(parentCtx) + startCtx = module.tagger.toBuilder(parentCtx) .putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag) .build(); - this.stopwatch = module.stopwatchSupplier.get().start(); if (module.recordStartedRpcs) { + // Record here in case newClientStreamTracer() would never be called. module.statsRecorder.newMeasureMap() .put(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT, 1) .record(startCtx); @@ -375,30 +460,37 @@ final class CensusStatsModule { } @Override - public ClientStreamTracer newClientStreamTracer( - ClientStreamTracer.StreamInfo info, Metadata headers) { - ClientTracer tracer = new ClientTracer(module, parentCtx, startCtx); - // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than - // one streams. We will need to update this file to support them. - if (streamTracerUpdater != null) { - checkState( - streamTracerUpdater.compareAndSet(this, null, tracer), - "Are you creating multiple streams per call? This class doesn't yet support this case"); + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) { + ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info); + if (activeStreams.incrementAndGet() == 1) { + if (!activated.compareAndSet(false, true)) { + retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + } + } + if (module.recordStartedRpcs && attemptsPerCall.get() > 0) { + module.statsRecorder.newMeasureMap() + .put(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT, 1) + .record(startCtx); + } + if (info.isTransparentRetry()) { + transparentRetriesPerCall.incrementAndGet(); } else { - checkState( - streamTracer == null, - "Are you creating multiple streams per call? This class doesn't yet support this case"); - streamTracer = tracer; + attemptsPerCall.incrementAndGet(); } return tracer; } - /** - * Record a finished call and mark the current time as the end time. - * - *

Can be called from any thread without synchronization. Calling it the second time or more - * is a no-op. - */ + // Called whenever each attempt is ended. + void attemptEnded() { + if (activeStreams.decrementAndGet() == 0) { + // Race condition between two extremely close events does not matter because the difference + // in the result would be very small. + long lastInactiveTimeStamp = + this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + retryDelayNanos.addAndGet(-lastInactiveTimeStamp); + } + } + void callEnded(Status status) { if (callEndedUpdater != null) { if (callEndedUpdater.getAndSet(this, 1) != 0) { @@ -414,36 +506,30 @@ final class CensusStatsModule { return; } stopwatch.stop(); - long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); - ClientTracer tracer = streamTracer; - if (tracer == null) { - tracer = new ClientTracer(module, parentCtx, startCtx); + if (attemptsPerCall.get() == 0) { + ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, null); + tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); + tracer.statusCode = status.getCode(); + tracer.recordFinishedRpc(); + } else if (inboundMetricTracer != null) { + inboundMetricTracer.recordFinishedRpc(); + } + + long retriesPerCall = 0; + long attempts = attemptsPerCall.get(); + if (attempts > 0) { + retriesPerCall = attempts - 1; } MeasureMap measureMap = module.statsRecorder.newMeasureMap() - // TODO(songya): remove the deprecated measure constants once they are completed removed. - .put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1) - // The latency is double value - .put( - DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, - roundtripNanos / NANOS_PER_MILLI) - .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) - .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) - .put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize) - .put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize) - .put( - DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, - tracer.outboundUncompressedSize) - .put( - DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, - tracer.inboundUncompressedSize); - if (!status.isOk()) { - measureMap.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1); - } + .put(RETRIES_PER_CALL, retriesPerCall) + .put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get()) + .put(RETRY_DELAY_PER_CALL, retryDelayNanos.get() / NANOS_PER_MILLI); + TagValue methodTag = TagValue.create(fullMethodName); TagValue statusTag = TagValue.create(status.getCode().toString()); measureMap.record( - module - .tagger - .toBuilder(startCtx) + module.tagger + .toBuilder(parentCtx) + .putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag) .putLocal(RpcMeasureConstants.GRPC_CLIENT_STATUS, statusTag) .build()); } diff --git a/census/src/main/java/io/grpc/census/CensusTracingModule.java b/census/src/main/java/io/grpc/census/CensusTracingModule.java index dac62206fd..08d5fe3ca9 100644 --- a/census/src/main/java/io/grpc/census/CensusTracingModule.java +++ b/census/src/main/java/io/grpc/census/CensusTracingModule.java @@ -32,6 +32,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; import io.grpc.StreamTracer; +import io.opencensus.trace.AttributeValue; import io.opencensus.trace.BlankSpan; import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.MessageEvent; @@ -60,7 +61,8 @@ import javax.annotation.Nullable; final class CensusTracingModule { private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName()); - @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater; + @Nullable + private static final AtomicIntegerFieldUpdater callEndedUpdater; @Nullable private static final AtomicIntegerFieldUpdater streamClosedUpdater; @@ -70,11 +72,11 @@ final class CensusTracingModule { * (potentially racy) direct updates of the volatile variables. */ static { - AtomicIntegerFieldUpdater tmpCallEndedUpdater; + AtomicIntegerFieldUpdater tmpCallEndedUpdater; AtomicIntegerFieldUpdater tmpStreamClosedUpdater; try { tmpCallEndedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); + AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded"); tmpStreamClosedUpdater = AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); } catch (Throwable t) { @@ -116,11 +118,12 @@ final class CensusTracingModule { } /** - * Creates a {@link ClientCallTracer} for a new call. + * Creates a {@link CallAttemptsTracerFactory} for a new call. */ @VisibleForTesting - ClientCallTracer newClientCallTracer(@Nullable Span parentSpan, MethodDescriptor method) { - return new ClientCallTracer(parentSpan, method); + CallAttemptsTracerFactory newClientCallTracer( + @Nullable Span parentSpan, MethodDescriptor method) { + return new CallAttemptsTracerFactory(parentSpan, method); } /** @@ -223,19 +226,21 @@ final class CensusTracingModule { } @VisibleForTesting - final class ClientCallTracer extends ClientStreamTracer.InternalLimitedInfoFactory { + final class CallAttemptsTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory { volatile int callEnded; private final boolean isSampledToLocalTracing; private final Span span; + private final String fullMethodName; - ClientCallTracer(@Nullable Span parentSpan, MethodDescriptor method) { + CallAttemptsTracerFactory(@Nullable Span parentSpan, MethodDescriptor method) { checkNotNull(method, "method"); this.isSampledToLocalTracing = method.isSampledToLocalTracing(); + this.fullMethodName = method.getFullMethodName(); this.span = censusTracer .spanBuilderWithExplicitParent( - generateTraceSpanName(false, method.getFullMethodName()), + generateTraceSpanName(false, fullMethodName), parentSpan) .setRecordEvents(true) .startSpan(); @@ -244,7 +249,17 @@ final class CensusTracingModule { @Override public ClientStreamTracer newClientStreamTracer( ClientStreamTracer.StreamInfo info, Metadata headers) { - return new ClientTracer(span, tracingHeader); + Span attemptSpan = censusTracer + .spanBuilderWithExplicitParent( + "Attempt." + fullMethodName.replace('/', '.'), + span) + .setRecordEvents(true) + .startSpan(); + attemptSpan.putAttribute( + "previous-rpc-attempts", AttributeValue.longAttributeValue(info.getPreviousAttempts())); + attemptSpan.putAttribute( + "transparent-retry", AttributeValue.booleanAttributeValue(info.isTransparentRetry())); + return new ClientTracer(attemptSpan, tracingHeader, isSampledToLocalTracing); } /** @@ -271,10 +286,13 @@ final class CensusTracingModule { private static final class ClientTracer extends ClientStreamTracer { private final Span span; final Metadata.Key tracingHeader; + final boolean isSampledToLocalTracing; - ClientTracer(Span span, Metadata.Key tracingHeader) { + ClientTracer( + Span span, Metadata.Key tracingHeader, boolean isSampledToLocalTracing) { this.span = checkNotNull(span, "span"); this.tracingHeader = tracingHeader; + this.isSampledToLocalTracing = isSampledToLocalTracing; } @Override @@ -298,6 +316,11 @@ final class CensusTracingModule { recordMessageEvent( span, MessageEvent.Type.RECEIVED, seqNo, optionalWireSize, optionalUncompressedSize); } + + @Override + public void streamClosed(io.grpc.Status status) { + span.end(createEndSpanOptions(status, isSampledToLocalTracing)); + } } @@ -388,7 +411,7 @@ final class CensusTracingModule { // Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value // as Tracer.getCurrentSpan() except when no value available when the return value is null // for the direct access and BlankSpan when Tracer API is used. - final ClientCallTracer tracerFactory = + final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(ContextUtils.getValue(Context.current()), method); ClientCall call = next.newCall( diff --git a/census/src/test/java/io/grpc/census/CensusModulesTest.java b/census/src/test/java/io/grpc/census/CensusModulesTest.java index fd3a049f7a..d285c8fe8c 100644 --- a/census/src/test/java/io/grpc/census/CensusModulesTest.java +++ b/census/src/test/java/io/grpc/census/CensusModulesTest.java @@ -18,6 +18,9 @@ package io.grpc.census; import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL; +import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL; +import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -58,6 +61,7 @@ import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer.ServerCallInfo; import io.grpc.Status; +import io.grpc.census.CensusTracingModule.CallAttemptsTracerFactory; import io.grpc.census.internal.DeprecatedCensusConstants; import io.grpc.internal.FakeClock; import io.grpc.internal.testing.StatsTestUtils; @@ -81,6 +85,7 @@ import io.opencensus.stats.StatsComponent; import io.opencensus.stats.View; import io.opencensus.tags.TagContext; import io.opencensus.tags.TagValue; +import io.opencensus.trace.AttributeValue; import io.opencensus.trace.BlankSpan; import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.MessageEvent; @@ -173,10 +178,12 @@ public class CensusModulesTest { private final Random random = new Random(1234); private final Span fakeClientParentSpan = MockableSpan.generateRandomSpan(random); private final Span spyClientSpan = spy(MockableSpan.generateRandomSpan(random)); - private final SpanContext fakeClientSpanContext = spyClientSpan.getContext(); + private final Span spyAttemptSpan = spy(MockableSpan.generateRandomSpan(random)); + private final SpanContext fakeAttemptSpanContext = spyAttemptSpan.getContext(); private final Span spyServerSpan = spy(MockableSpan.generateRandomSpan(random)); private final byte[] binarySpanContext = new byte[]{3, 1, 5}; private final SpanBuilder spyClientSpanBuilder = spy(new MockableSpan.Builder()); + private final SpanBuilder spyAttemptSpanBuilder = spy(new MockableSpan.Builder()); private final SpanBuilder spyServerSpanBuilder = spy(new MockableSpan.Builder()); @Rule @@ -201,15 +208,20 @@ public class CensusModulesTest { @Before public void setUp() throws Exception { when(spyClientSpanBuilder.startSpan()).thenReturn(spyClientSpan); - when(tracer.spanBuilderWithExplicitParent(anyString(), ArgumentMatchers.any())) + when(spyAttemptSpanBuilder.startSpan()).thenReturn(spyAttemptSpan); + when(tracer.spanBuilderWithExplicitParent( + eq("Sent.package1.service2.method3"), ArgumentMatchers.any())) .thenReturn(spyClientSpanBuilder); + when(tracer.spanBuilderWithExplicitParent( + eq("Attempt.package1.service2.method3"), ArgumentMatchers.any())) + .thenReturn(spyAttemptSpanBuilder); when(spyServerSpanBuilder.startSpan()).thenReturn(spyServerSpan); when(tracer.spanBuilderWithRemoteParent(anyString(), ArgumentMatchers.any())) .thenReturn(spyServerSpanBuilder); when(mockTracingPropagationHandler.toByteArray(any(SpanContext.class))) .thenReturn(binarySpanContext); when(mockTracingPropagationHandler.fromByteArray(any(byte[].class))) - .thenReturn(fakeClientSpanContext); + .thenReturn(fakeAttemptSpanContext); censusStats = new CensusStatsModule( tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), @@ -292,7 +304,7 @@ public class CensusModulesTest { assertEquals(2, capturedCallOptions.get().getStreamTracerFactories().size()); assertTrue( capturedCallOptions.get().getStreamTracerFactories().get(0) - instanceof CensusTracingModule.ClientCallTracer); + instanceof CallAttemptsTracerFactory); assertTrue( capturedCallOptions.get().getStreamTracerFactories().get(1) instanceof CensusStatsModule.CallAttemptsTracerFactory); @@ -355,6 +367,7 @@ public class CensusModulesTest { .setSampleToLocalSpanStore(false) .build()); verify(spyClientSpan, never()).end(); + assertZeroRetryRecorded(); } @Test @@ -489,11 +502,200 @@ public class CensusModulesTest { DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); assertEquals(30 + 100 + 16 + 24, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertZeroRetryRecorded(); } else { assertNull(statsRecorder.pollRecord()); } } + // This test is only unit-testing the stat recording logic. The retry behavior is faked. + @Test + public void recordRetryStats() { + CensusStatsModule localCensusStats = + new CensusStatsModule( + tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), + true, true, true, true); + CensusStatsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new CensusStatsModule.CallAttemptsTracerFactory( + localCensusStats, tagger.empty(), method.getFullMethodName()); + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + assertEquals(1, record.tags.size()); + TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)); + + fakeClock.forwardTime(30, MILLISECONDS); + tracer.outboundHeaders(); + fakeClock.forwardTime(100, MILLISECONDS); + tracer.outboundMessage(0); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundMessage(1); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundWireSize(1028); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true); + tracer.outboundUncompressedSize(1128); + fakeClock.forwardTime(24, MILLISECONDS); + tracer.streamClosed(Status.UNAVAILABLE); + record = statsRecorder.pollRecord(); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.UNAVAILABLE.toString(), statusTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)); + assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals( + 2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128, + record.getMetricAsLongOrFail( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals( + 30 + 100 + 24, + record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + + // faking retry + fakeClock.forwardTime(1000, MILLISECONDS); + tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + record = statsRecorder.pollRecord(); + assertEquals(1, record.tags.size()); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)); + tracer.outboundHeaders(); + tracer.outboundMessage(0); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundMessage(1); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundWireSize(1028); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true); + tracer.outboundUncompressedSize(1128); + fakeClock.forwardTime(100, MILLISECONDS); + tracer.streamClosed(Status.NOT_FOUND); + record = statsRecorder.pollRecord(); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.NOT_FOUND.toString(), statusTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)); + assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals( + 2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128, + record.getMetricAsLongOrFail( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals( + 100 , + record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + + // fake transparent retry + fakeClock.forwardTime(10, MILLISECONDS); + tracer = callAttemptsTracerFactory.newClientStreamTracer( + STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata()); + record = statsRecorder.pollRecord(); + assertEquals(1, record.tags.size()); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)); + tracer.streamClosed(Status.UNAVAILABLE); + record = statsRecorder.pollRecord(); + statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.UNAVAILABLE.toString(), statusTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)); + assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals( + 0, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 0, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES)); + + // fake another transparent retry + fakeClock.forwardTime(10, MILLISECONDS); + tracer = callAttemptsTracerFactory.newClientStreamTracer( + STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata()); + record = statsRecorder.pollRecord(); + assertEquals(1, record.tags.size()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)); + tracer.outboundHeaders(); + tracer.outboundMessage(0); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundMessage(1); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true); + tracer.outboundWireSize(1028); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true); + tracer.outboundUncompressedSize(1128); + fakeClock.forwardTime(16, MILLISECONDS); + tracer.inboundMessage(0); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD, 1, true, true); + tracer.inboundWireSize(33); + assertRealTimeMetric( + RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD, 33, true, true); + tracer.inboundUncompressedSize(67); + fakeClock.forwardTime(24, MILLISECONDS); + // RPC succeeded + tracer.streamClosed(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK); + + record = statsRecorder.pollRecord(); + statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.OK.toString(), statusTag.asString()); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)); + assertThat(record.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)).isNull(); + assertEquals( + 2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128, + record.getMetricAsLongOrFail( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals( + 1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals( + 33, + record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertEquals( + 67, + record.getMetricAsLongOrFail( + DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals( + 16 + 24 , + record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + + record = statsRecorder.pollRecord(); + methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertEquals(Status.Code.OK.toString(), statusTag.asString()); + assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(1); + assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(2); + assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(1000D + 10 + 10); + } + private void assertRealTimeMetric( Measure measure, long expectedValue, boolean recordRealTimeMetrics, boolean clientSide) { StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); @@ -517,16 +719,28 @@ public class CensusModulesTest { assertEquals(expectedValue, record.getMetricAsLongOrFail(measure)); } + private void assertZeroRetryRecorded() { + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(0); + assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0); + assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D); + } + @Test public void clientBasicTracingDefaultSpan() { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(null, method); Metadata headers = new Metadata(); ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); clientStreamTracer.streamCreated(Attributes.EMPTY, headers); verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), ArgumentMatchers.isNull()); + verify(tracer).spanBuilderWithExplicitParent( + eq("Attempt.package1.service2.method3"), eq(spyClientSpan)); verify(spyClientSpan, never()).end(any(EndSpanOptions.class)); + verify(spyAttemptSpan, never()).end(any(EndSpanOptions.class)); clientStreamTracer.outboundMessage(0); clientStreamTracer.outboundMessageSent(0, 882, -1); @@ -538,8 +752,12 @@ public class CensusModulesTest { clientStreamTracer.streamClosed(Status.OK); callTracer.callEnded(Status.OK); - InOrder inOrder = inOrder(spyClientSpan); - inOrder.verify(spyClientSpan, times(3)).addMessageEvent(messageEventCaptor.capture()); + InOrder inOrder = inOrder(spyClientSpan, spyAttemptSpan); + inOrder.verify(spyAttemptSpan) + .putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0)); + inOrder.verify(spyAttemptSpan) + .putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false)); + inOrder.verify(spyAttemptSpan, times(3)).addMessageEvent(messageEventCaptor.capture()); List events = messageEventCaptor.getAllValues(); assertEquals( MessageEvent.builder(MessageEvent.Type.SENT, 0).setCompressedMessageSize(882).build(), @@ -553,18 +771,23 @@ public class CensusModulesTest { .setUncompressedMessageSize(90) .build(), events.get(2)); + inOrder.verify(spyAttemptSpan).end( + EndSpanOptions.builder() + .setStatus(io.opencensus.trace.Status.OK) + .setSampleToLocalSpanStore(false) + .build()); inOrder.verify(spyClientSpan).end( EndSpanOptions.builder() .setStatus(io.opencensus.trace.Status.OK) .setSampleToLocalSpanStore(false) .build()); - verifyNoMoreInteractions(spyClientSpan); + inOrder.verifyNoMoreInteractions(); verifyNoMoreInteractions(tracer); } @Test public void clientTracingSampledToLocalSpanStore() { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(null, sampledMethod); callTracer.callEnded(Status.OK); @@ -631,11 +854,12 @@ public class CensusModulesTest { 3000, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); assertNull(record.getMetric(DeprecatedCensusConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + assertZeroRetryRecorded(); } @Test public void clientStreamNeverCreatedStillRecordTracing() { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(fakeClientParentSpan, method); verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), same(fakeClientParentSpan)); @@ -770,6 +994,7 @@ public class CensusModulesTest { assertNull(clientRecord.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); assertEquals("extra-tag-value-897", clientPropagatedTag.asString()); + assertZeroRetryRecorded(); } if (!recordStats) { @@ -812,16 +1037,18 @@ public class CensusModulesTest { @Test public void traceHeadersPropagateSpanContext() throws Exception { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(fakeClientParentSpan, method); Metadata headers = new Metadata(); ClientStreamTracer streamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); streamTracer.streamCreated(Attributes.EMPTY, headers); - verify(mockTracingPropagationHandler).toByteArray(same(fakeClientSpanContext)); + verify(mockTracingPropagationHandler).toByteArray(same(fakeAttemptSpanContext)); verifyNoMoreInteractions(mockTracingPropagationHandler); verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), same(fakeClientParentSpan)); + verify(tracer).spanBuilderWithExplicitParent( + eq("Attempt.package1.service2.method3"), same(spyClientSpan)); verify(spyClientSpanBuilder).setRecordEvents(eq(true)); verifyNoMoreInteractions(tracer); assertTrue(headers.containsKey(censusTracing.tracingHeader)); @@ -831,7 +1058,7 @@ public class CensusModulesTest { method.getFullMethodName(), headers); verify(mockTracingPropagationHandler).fromByteArray(same(binarySpanContext)); verify(tracer).spanBuilderWithRemoteParent( - eq("Recv.package1.service2.method3"), same(spyClientSpan.getContext())); + eq("Recv.package1.service2.method3"), same(spyAttemptSpan.getContext())); verify(spyServerSpanBuilder).setRecordEvents(eq(true)); Context filteredContext = serverTracer.filterContext(Context.ROOT); @@ -840,7 +1067,7 @@ public class CensusModulesTest { @Test public void traceHeaders_propagateSpanContext() throws Exception { - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(fakeClientParentSpan, method); Metadata headers = new Metadata(); @@ -854,10 +1081,12 @@ public class CensusModulesTest { public void traceHeaders_missingCensusImpl_notPropagateSpanContext() throws Exception { reset(spyClientSpanBuilder); + reset(spyAttemptSpanBuilder); when(spyClientSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE); + when(spyAttemptSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE); Metadata headers = new Metadata(); - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method); callTracer.newClientStreamTracer(STREAM_INFO, headers).streamCreated(Attributes.EMPTY, headers); @@ -867,14 +1096,16 @@ public class CensusModulesTest { @Test public void traceHeaders_clientMissingCensusImpl_preservingHeaders() throws Exception { reset(spyClientSpanBuilder); + reset(spyAttemptSpanBuilder); when(spyClientSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE); + when(spyAttemptSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE); Metadata headers = new Metadata(); headers.put( Metadata.Key.of("never-used-key-bin", Metadata.BINARY_BYTE_MARSHALLER), new byte[] {}); Set originalHeaderKeys = new HashSet<>(headers.keys()); - CensusTracingModule.ClientCallTracer callTracer = + CallAttemptsTracerFactory callTracer = censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method); callTracer.newClientStreamTracer(STREAM_INFO, headers).streamCreated(Attributes.EMPTY, headers); @@ -885,9 +1116,9 @@ public class CensusModulesTest { public void traceHeaderMalformed() throws Exception { // As comparison, normal header parsing Metadata headers = new Metadata(); - headers.put(censusTracing.tracingHeader, fakeClientSpanContext); + headers.put(censusTracing.tracingHeader, fakeAttemptSpanContext); // mockTracingPropagationHandler was stubbed to always return fakeServerParentSpanContext - assertSame(spyClientSpan.getContext(), headers.get(censusTracing.tracingHeader)); + assertSame(spyAttemptSpan.getContext(), headers.get(censusTracing.tracingHeader)); // Make BinaryPropagationHandler always throw when parsing the header when(mockTracingPropagationHandler.fromByteArray(any(byte[].class))) @@ -895,7 +1126,7 @@ public class CensusModulesTest { headers = new Metadata(); assertNull(headers.get(censusTracing.tracingHeader)); - headers.put(censusTracing.tracingHeader, fakeClientSpanContext); + headers.put(censusTracing.tracingHeader, fakeAttemptSpanContext); assertSame(SpanContext.INVALID, headers.get(censusTracing.tracingHeader)); assertNotSame(spyClientSpan.getContext(), SpanContext.INVALID); diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 28cd335120..dd17244e2a 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -255,7 +255,8 @@ final class ClientCallImpl extends ClientCall { effectiveDeadline, context.getDeadline(), callOptions.getDeadline()); stream = clientStreamProvider.newStream(method, callOptions, headers, context); } else { - ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(callOptions, headers, false); + ClientStreamTracer[] tracers = + GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false); stream = new FailingClientStream( DEADLINE_EXCEEDED.withDescription( "ClientCall started after deadline exceeded: " + effectiveDeadline), diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 5b5a062e95..54f6d2f41d 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -757,11 +757,12 @@ public final class GrpcUtil { /** Gets stream tracers based on CallOptions. */ public static ClientStreamTracer[] getClientStreamTracers( - CallOptions callOptions, Metadata headers, boolean isTransparentRetry) { + CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) { List factories = callOptions.getStreamTracerFactories(); ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1]; StreamInfo streamInfo = StreamInfo.newBuilder() .setCallOptions(callOptions) + .setPreviousAttempts(previousAttempts) .setIsTransparentRetry(isTransparentRetry) .build(); for (int i = 0; i < factories.size(); i++) { diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 87162d9aba..6cd5598e2a 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -533,7 +533,7 @@ final class ManagedChannelImpl extends ManagedChannel implements getTransport(new PickSubchannelArgsImpl(method, headers, callOptions)); Context origContext = context.attach(); ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, /* isTransparentRetry= */ false); + callOptions, headers, 0, /* isTransparentRetry= */ false); try { return transport.newStream(method, headers, callOptions, tracers); } finally { @@ -572,10 +572,11 @@ final class ManagedChannelImpl extends ManagedChannel implements @Override ClientStream newSubstream( - Metadata newHeaders, ClientStreamTracer.Factory factory, boolean isTransparentRetry) { + Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts, + boolean isTransparentRetry) { CallOptions newOptions = callOptions.withStreamTracerFactory(factory); - ClientStreamTracer[] tracers = - GrpcUtil.getClientStreamTracers(newOptions, newHeaders, isTransparentRetry); + ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( + newOptions, newHeaders, previousAttempts, isTransparentRetry); ClientTransport transport = getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); Context origContext = context.attach(); @@ -624,7 +625,7 @@ final class ManagedChannelImpl extends ManagedChannel implements channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider); ProxyDetector proxyDetector = builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR; - this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry; + this.retryEnabled = builder.retryEnabled; this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy); this.offloadExecutorHolder = new ExecutorHolder( diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java index d42b383213..cad4ece233 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java @@ -143,10 +143,6 @@ public final class ManagedChannelImplBuilder long retryBufferSize = DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES; long perRpcBufferLimit = DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES; boolean retryEnabled = false; // TODO(zdapeng): default to true - // Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know - // what should be the desired behavior for retry + stats/tracing. - // TODO(zdapeng): delete me - boolean temporarilyDisableRetry; InternalChannelz channelz = InternalChannelz.instance(); int maxTraceEvents; @@ -460,8 +456,6 @@ public final class ManagedChannelImplBuilder @Override public ManagedChannelImplBuilder enableRetry() { retryEnabled = true; - statsEnabled = false; - tracingEnabled = false; return this; } @@ -592,9 +586,6 @@ public final class ManagedChannelImplBuilder /** * Disable or enable tracing features. Enabled by default. - * - *

For the current release, calling {@code setTracingEnabled(true)} may have a side effect that - * disables retry. */ public void setTracingEnabled(boolean value) { tracingEnabled = value; @@ -642,9 +633,7 @@ public final class ManagedChannelImplBuilder List getEffectiveInterceptors() { List effectiveInterceptors = new ArrayList<>(this.interceptors); - temporarilyDisableRetry = false; if (statsEnabled) { - temporarilyDisableRetry = true; ClientInterceptor statsInterceptor = null; try { Class censusStatsAccessor = @@ -679,7 +668,6 @@ public final class ManagedChannelImplBuilder } } if (tracingEnabled) { - temporarilyDisableRetry = true; ClientInterceptor tracingInterceptor = null; try { Class censusTracingAccessor = diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index b628842efe..589824ae10 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -88,7 +88,7 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented method, CallOptions callOptions, Metadata headers, Context context) { ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, /* isTransparentRetry= */ false); + callOptions, headers, 0, /* isTransparentRetry= */ false); Context origContext = context.attach(); // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't // matter here because OOB communication should be sparse, and it's not on application RPC's diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index d19a260049..3d277bbe2f 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -218,7 +218,7 @@ abstract class RetriableStream implements ClientStream { Metadata newHeaders = updateHeaders(headers, previousAttemptCount); // NOTICE: This set _must_ be done before stream.start() and it actually is. - sub.stream = newSubstream(newHeaders, tracerFactory, isTransparentRetry); + sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry); return sub; } @@ -227,7 +227,8 @@ abstract class RetriableStream implements ClientStream { * Client stream is not yet started. */ abstract ClientStream newSubstream( - Metadata headers, ClientStreamTracer.Factory tracerFactory, boolean isTransparentRetry); + Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts, + boolean isTransparentRetry); /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */ @VisibleForTesting @@ -869,24 +870,26 @@ abstract class RetriableStream implements ClientStream { synchronized (lock) { scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); } - scheduledRetryCopy.setFuture( - scheduledExecutorService.schedule( + class RetryBackoffRunnable implements Runnable { + @Override + public void run() { + callExecutor.execute( new Runnable() { @Override public void run() { - callExecutor.execute( - new Runnable() { - @Override - public void run() { - // retry - Substream newSubstream = createSubstream( - substream.previousAttemptCount + 1, - false); - drain(newSubstream); - } - }); + // retry + Substream newSubstream = createSubstream( + substream.previousAttemptCount + 1, + false); + drain(newSubstream); } - }, + }); + } + } + + scheduledRetryCopy.setFuture( + scheduledExecutorService.schedule( + new RetryBackoffRunnable(), retryPlan.backoffNanos, TimeUnit.NANOSECONDS)); return; diff --git a/core/src/main/java/io/grpc/internal/SubchannelChannel.java b/core/src/main/java/io/grpc/internal/SubchannelChannel.java index 1380a6bc71..a1d454ed2f 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelChannel.java +++ b/core/src/main/java/io/grpc/internal/SubchannelChannel.java @@ -59,7 +59,7 @@ final class SubchannelChannel extends Channel { transport = notReadyTransport; } ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, /* isTransparentRetry= */ false); + callOptions, headers, 0, /* isTransparentRetry= */ false); Context origContext = context.attach(); try { return transport.newStream(method, headers, callOptions, tracers); diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 95d2c2ba8b..c9ea504e18 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -164,7 +164,8 @@ public class RetriableStreamTest { @Override ClientStream newSubstream( - Metadata metadata, ClientStreamTracer.Factory tracerFactory, boolean isTransparentRetry) { + Metadata metadata, ClientStreamTracer.Factory tracerFactory, int previousAttempts, + boolean isTransparentRetry) { bufferSizeTracer = tracerFactory.newClientStreamTracer(STREAM_INFO, metadata); int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 14c92a9fd1..852d5882cc 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -44,6 +44,7 @@ dependencies { project(':grpc-grpclb') testImplementation project(':grpc-context').sourceSets.test.output, project(':grpc-api').sourceSets.test.output, + project(':grpc-core').sourceSets.test.output, libraries.mockito alpnagent libraries.jetty_alpn_agent } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 8a6e41722a..693d9b2af7 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -92,6 +92,9 @@ import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; import io.opencensus.tags.TagKey; import io.opencensus.tags.TagValue; import io.opencensus.trace.Span; @@ -152,6 +155,15 @@ public abstract class AbstractInteropTest { * SETTINGS/WINDOW_UPDATE exchange. */ public static final int TEST_FLOW_CONTROL_WINDOW = 65 * 1024; + private static final MeasureLong RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/retries_per_call", "Number of retries per call", "1"); + private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1"); + private static final MeasureDouble RETRY_DELAY_PER_CALL = + Measure.MeasureDouble.create( + "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms"); private static final FakeTagger tagger = new FakeTagger(); private static final FakeTagContextBinarySerializer tagContextBinarySerializer = @@ -1234,6 +1246,7 @@ public abstract class AbstractInteropTest { checkEndTags( clientEndRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode(), true); + assertZeroRetryRecorded(); } // warm up the channel @@ -1243,6 +1256,7 @@ public abstract class AbstractInteropTest { clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); // clientEndRecord clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + assertZeroRetryRecorded(); } try { blockingStub @@ -1261,6 +1275,7 @@ public abstract class AbstractInteropTest { checkEndTags( clientEndRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode(), true); + assertZeroRetryRecorded(); } } @@ -1978,6 +1993,13 @@ public abstract class AbstractInteropTest { assertStatsTrace(method, status, null, null); } + private void assertZeroRetryRecorded() { + MetricsRecord retryRecord = clientStatsRecorder.pollRecord(); + assertThat(retryRecord.getMetric(RETRIES_PER_CALL)).isEqualTo(0); + assertThat(retryRecord.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0); + assertThat(retryRecord.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D); + } + private void assertClientStatsTrace(String method, Status.Code code, Collection requests, Collection responses) { // Tracer-based stats @@ -2007,6 +2029,7 @@ public abstract class AbstractInteropTest { if (requests != null && responses != null) { checkCensus(clientEndRecord, false, requests, responses); } + assertZeroRetryRecorded(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index 4824f05313..bdf39e8546 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -17,13 +17,21 @@ package io.grpc.testing.integration; import static com.google.common.truth.Truth.assertThat; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import com.google.common.collect.ImmutableMap; +import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientStreamTracer; +import io.grpc.ClientStreamTracer.StreamInfo; +import io.grpc.Deadline; +import io.grpc.Deadline.Ticker; import io.grpc.IntegerMarshaller; import io.grpc.ManagedChannel; import io.grpc.Metadata; @@ -36,7 +44,15 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StringMarshaller; +import io.grpc.census.InternalCensusStatsAccessor; +import io.grpc.census.internal.DeprecatedCensusConstants; +import io.grpc.internal.FakeClock; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; +import io.grpc.internal.testing.StatsTestUtils.MetricsRecord; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; import io.grpc.testing.GrpcCleanupRule; @@ -45,11 +61,20 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; +import io.netty.util.concurrent.ScheduledFuture; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.tags.TagValue; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -61,65 +86,111 @@ import org.mockito.junit.MockitoRule; @RunWith(JUnit4.class) public class RetryTest { + private static final FakeTagger tagger = new FakeTagger(); + private static final FakeTagContextBinarySerializer tagContextBinarySerializer = + new FakeTagContextBinarySerializer(); + private static final MeasureLong RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/retries_per_call", "Number of retries per call", "1"); + private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL = + Measure.MeasureLong.create( + "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1"); + private static final MeasureDouble RETRY_DELAY_PER_CALL = + Measure.MeasureDouble.create( + "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms"); + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + private final FakeClock fakeClock = new FakeClock(); @Mock private ClientCall.Listener mockCallListener; - - @Test - public void retryUntilBufferLimitExceeded() throws Exception { - String message = "String of length 20."; - int bufferLimit = message.length() * 2 - 1; // Can buffer no more than 1 message. - - MethodDescriptor clientStreamingMethod = - MethodDescriptor.newBuilder() - .setType(MethodType.CLIENT_STREAMING) - .setFullMethodName("service/method") - .setRequestMarshaller(new StringMarshaller()) - .setResponseMarshaller(new IntegerMarshaller()) - .build(); - final LinkedBlockingQueue> serverCalls = - new LinkedBlockingQueue<>(); - ServerMethodDefinition methodDefinition = ServerMethodDefinition.create( - clientStreamingMethod, - new ServerCallHandler() { - @Override - public Listener startCall(ServerCall call, Metadata headers) { - serverCalls.offer(call); - return new Listener() {}; + private CountDownLatch backoffLatch = new CountDownLatch(1); + private final EventLoopGroup group = new DefaultEventLoopGroup() { + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public ScheduledFuture schedule( + final Runnable command, final long delay, final TimeUnit unit) { + if (!command.getClass().getName().contains("RetryBackoffRunnable")) { + return super.schedule(command, delay, unit); + } + fakeClock.getScheduledExecutorService().schedule( + new Runnable() { + @Override + public void run() { + group.execute(command); + } + }, + delay, + unit); + backoffLatch.countDown(); + return super.schedule( + new Runnable() { + @Override + public void run() {} // no-op + }, + 0, + TimeUnit.NANOSECONDS); + } + }; + private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder(); + private final ClientInterceptor statsInterceptor = + InternalCensusStatsAccessor.getClientInterceptor( + tagger, tagContextBinarySerializer, clientStatsRecorder, + fakeClock.getStopwatchSupplier(), true, true, true, + /* recordRealTimeMetrics= */ true); + private final MethodDescriptor clientStreamingMethod = + MethodDescriptor.newBuilder() + .setType(MethodType.CLIENT_STREAMING) + .setFullMethodName("service/method") + .setRequestMarshaller(new StringMarshaller()) + .setResponseMarshaller(new IntegerMarshaller()) + .build(); + private final LinkedBlockingQueue> serverCalls = + new LinkedBlockingQueue<>(); + private final ServerMethodDefinition methodDefinition = + ServerMethodDefinition.create( + clientStreamingMethod, + new ServerCallHandler() { + @Override + public Listener startCall(ServerCall call, Metadata headers) { + serverCalls.offer(call); + return new Listener() {}; + } } - } - ); - ServerServiceDefinition serviceDefinition = - ServerServiceDefinition.builder(clientStreamingMethod.getServiceName()) - .addMethod(methodDefinition) - .build(); - EventLoopGroup group = new DefaultEventLoopGroup(); - LocalAddress localAddress = new LocalAddress("RetryTest.retryUntilBufferLimitExceeded"); - Server localServer = cleanupRule.register(NettyServerBuilder.forAddress(localAddress) + ); + private final ServerServiceDefinition serviceDefinition = + ServerServiceDefinition.builder(clientStreamingMethod.getServiceName()) + .addMethod(methodDefinition) + .build(); + private final LocalAddress localAddress = new LocalAddress(this.getClass().getName()); + private Server localServer; + private ManagedChannel channel; + private Map retryPolicy = null; + private long bufferLimit = 1L << 20; // 1M + + private void startNewServer() throws Exception { + localServer = cleanupRule.register(NettyServerBuilder.forAddress(localAddress) .channelType(LocalServerChannel.class) .bossEventLoopGroup(group) .workerEventLoopGroup(group) .addService(serviceDefinition) .build()); localServer.start(); + } - Map retryPolicy = new HashMap<>(); - retryPolicy.put("maxAttempts", 4D); - retryPolicy.put("initialBackoff", "10s"); - retryPolicy.put("maxBackoff", "10s"); - retryPolicy.put("backoffMultiplier", 1D); - retryPolicy.put("retryableStatusCodes", Arrays.asList("UNAVAILABLE")); + private void createNewChannel() { Map methodConfig = new HashMap<>(); Map name = new HashMap<>(); name.put("service", "service"); methodConfig.put("name", Arrays.asList(name)); - methodConfig.put("retryPolicy", retryPolicy); + if (retryPolicy != null) { + methodConfig.put("retryPolicy", retryPolicy); + } Map rawServiceConfig = new HashMap<>(); rawServiceConfig.put("methodConfig", Arrays.asList(methodConfig)); - ManagedChannel channel = cleanupRule.register( + channel = cleanupRule.register( NettyChannelBuilder.forAddress(localAddress) .channelType(LocalChannel.class) .eventLoopGroup(group) @@ -127,23 +198,100 @@ public class RetryTest { .enableRetry() .perRpcBufferLimit(bufferLimit) .defaultServiceConfig(rawServiceConfig) + .intercept(statsInterceptor) .build()); + } + + private void elapseBackoff(long time, TimeUnit unit) throws Exception { + assertThat(backoffLatch.await(5, SECONDS)).isTrue(); + backoffLatch = new CountDownLatch(1); + fakeClock.forwardTime(time, unit); + } + + private void assertRpcStartedRecorded() throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT)) + .isEqualTo(1); + } + + private void assertOutboundMessageRecorded() throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat( + record.getMetricAsLongOrFail( + RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD)) + .isEqualTo(1); + } + + private void assertInboundMessageRecorded() throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat( + record.getMetricAsLongOrFail( + RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD)) + .isEqualTo(1); + } + + private void assertOutboundWireSizeRecorded(long length) throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD)) + .isEqualTo(length); + } + + private void assertInboundWireSizeRecorded(long length) throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat( + record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD)) + .isEqualTo(length); + } + + private void assertRpcStatusRecorded( + Status.Code code, long roundtripLatencyMs, long outboundMessages) throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + assertThat(statusTag.asString()).isEqualTo(code.toString()); + assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)) + .isEqualTo(1); + assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)) + .isEqualTo(roundtripLatencyMs); + assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT)) + .isEqualTo(outboundMessages); + } + + private void assertRetryStatsRecorded( + int numRetries, int numTransparentRetries, long retryDelayMs) throws Exception { + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat(record.getMetricAsLongOrFail(RETRIES_PER_CALL)).isEqualTo(numRetries); + assertThat(record.getMetricAsLongOrFail(TRANSPARENT_RETRIES_PER_CALL)) + .isEqualTo(numTransparentRetries); + assertThat(record.getMetricAsLongOrFail(RETRY_DELAY_PER_CALL)).isEqualTo(retryDelayMs); + } + + @Test + public void retryUntilBufferLimitExceeded() throws Exception { + String message = "String of length 20."; + + startNewServer(); + bufferLimit = message.length() * 2L - 1; // Can buffer no more than 1 message. + retryPolicy = ImmutableMap.builder() + .put("maxAttempts", 4D) + .put("initialBackoff", "10s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1D) + .put("retryableStatusCodes", Arrays.asList("UNAVAILABLE")) + .build(); + createNewChannel(); ClientCall call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT); call.start(mockCallListener, new Metadata()); call.sendMessage(message); - ServerCall serverCall = serverCalls.poll(5, TimeUnit.SECONDS); + ServerCall serverCall = serverCalls.poll(5, SECONDS); serverCall.request(2); // trigger retry - Metadata pushBackMetadata = new Metadata(); - pushBackMetadata.put( - Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER), - "0"); // retry immediately serverCall.close( Status.UNAVAILABLE.withDescription("original attempt failed"), - pushBackMetadata); + new Metadata()); + elapseBackoff(10, SECONDS); // 2nd attempt received - serverCall = serverCalls.poll(5, TimeUnit.SECONDS); + serverCall = serverCalls.poll(5, SECONDS); serverCall.request(2); verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); // send one more message, should exceed buffer limit @@ -157,4 +305,146 @@ public class RetryTest { verify(mockCallListener, timeout(5000)).onClose(statusCaptor.capture(), any(Metadata.class)); assertThat(statusCaptor.getValue().getDescription()).contains("2nd attempt failed"); } + + @Test + public void statsRecorded() throws Exception { + startNewServer(); + retryPolicy = ImmutableMap.builder() + .put("maxAttempts", 4D) + .put("initialBackoff", "10s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1D) + .put("retryableStatusCodes", Arrays.asList("UNAVAILABLE")) + .build(); + createNewChannel(); + + ClientCall call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + assertRpcStartedRecorded(); + String message = "String of length 20."; + call.sendMessage(message); + assertOutboundMessageRecorded(); + ServerCall serverCall = serverCalls.poll(5, SECONDS); + serverCall.request(2); + assertOutboundWireSizeRecorded(message.length()); + // original attempt latency + fakeClock.forwardTime(1, SECONDS); + // trigger retry + serverCall.close( + Status.UNAVAILABLE.withDescription("original attempt failed"), + new Metadata()); + assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1); + elapseBackoff(10, SECONDS); + assertRpcStartedRecorded(); + assertOutboundMessageRecorded(); + serverCall = serverCalls.poll(5, SECONDS); + serverCall.request(2); + assertOutboundWireSizeRecorded(message.length()); + message = "new message"; + call.sendMessage(message); + assertOutboundMessageRecorded(); + assertOutboundWireSizeRecorded(message.length()); + // retry attempt latency + fakeClock.forwardTime(2, SECONDS); + serverCall.sendHeaders(new Metadata()); + serverCall.sendMessage(3); + call.request(1); + assertInboundMessageRecorded(); + assertInboundWireSizeRecorded(1); + serverCall.close(Status.OK, new Metadata()); + assertRpcStatusRecorded(Status.Code.OK, 2000, 2); + assertRetryStatsRecorded(1, 0, 10_000); + } + + @Test + public void serverCancelledAndClientDeadlineExceeded() throws Exception { + startNewServer(); + createNewChannel(); + + class CloseDelayedTracer extends ClientStreamTracer { + @Override + public void streamClosed(Status status) { + fakeClock.forwardTime(10, SECONDS); + } + } + + class CloseDelayedTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory { + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return new CloseDelayedTracer(); + } + } + + CallOptions callOptions = CallOptions.DEFAULT + .withDeadline(Deadline.after( + 10, + SECONDS, + new Ticker() { + @Override + public long nanoTime() { + return fakeClock.getTicker().read(); + } + })) + .withStreamTracerFactory(new CloseDelayedTracerFactory()); + ClientCall call = channel.newCall(clientStreamingMethod, callOptions); + call.start(mockCallListener, new Metadata()); + assertRpcStartedRecorded(); + ServerCall serverCall = serverCalls.poll(5, SECONDS); + serverCall.close(Status.CANCELLED, new Metadata()); + assertRpcStatusRecorded(Code.DEADLINE_EXCEEDED, 10_000, 0); + assertRetryStatsRecorded(0, 0, 0); + } + + @Ignore("flaky because old transportReportStatus() is not completely migrated yet") + @Test + public void transparentRetryStatsRecorded() throws Exception { + startNewServer(); + createNewChannel(); + + final AtomicBoolean transparentRetryTriggered = new AtomicBoolean(); + class TransparentRetryTriggeringTracer extends ClientStreamTracer { + + @Override + public void streamCreated(Attributes transportAttrs, Metadata metadata) { + if (transparentRetryTriggered.get()) { + return; + } + localServer.shutdownNow(); + } + + @Override + public void streamClosed(Status status) { + if (transparentRetryTriggered.get()) { + return; + } + transparentRetryTriggered.set(true); + try { + startNewServer(); + channel.resetConnectBackoff(); + channel.getState(true); + } catch (Exception e) { + throw new AssertionError("local server can not be restarted", e); + } + } + } + + class TransparentRetryTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory { + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return new TransparentRetryTriggeringTracer(); + } + } + + CallOptions callOptions = CallOptions.DEFAULT + .withWaitForReady() + .withStreamTracerFactory(new TransparentRetryTracerFactory()); + ClientCall call = channel.newCall(clientStreamingMethod, callOptions); + call.start(mockCallListener, new Metadata()); + assertRpcStartedRecorded(); + assertRpcStatusRecorded(Code.UNAVAILABLE, 0, 0); + assertRpcStartedRecorded(); + call.cancel("cancel", null); + assertRpcStatusRecorded(Code.CANCELLED, 0, 0); + assertRetryStatsRecorded(0, 1, 0); + } }