all: implement retry stats (#8362)

ZHANG Dapeng 2021-08-11 10:24:37 -07:00 committed by GitHub
parent 1eb1d157a7
commit fd2a58a55e
16 changed files with 863 additions and 191 deletions

View File

@ -97,11 +97,15 @@ public abstract class ClientStreamTracer extends StreamTracer {
public static final class StreamInfo {
private final Attributes transportAttrs;
private final CallOptions callOptions;
private final int previousAttempts;
private final boolean isTransparentRetry;
StreamInfo(Attributes transportAttrs, CallOptions callOptions, boolean isTransparentRetry) {
StreamInfo(
Attributes transportAttrs, CallOptions callOptions, int previousAttempts,
boolean isTransparentRetry) {
this.transportAttrs = checkNotNull(transportAttrs, "transportAttrs");
this.callOptions = checkNotNull(callOptions, "callOptions");
this.previousAttempts = previousAttempts;
this.isTransparentRetry = isTransparentRetry;
}
@ -124,6 +128,15 @@ public abstract class ClientStreamTracer extends StreamTracer {
return callOptions;
}
/**
* Returns the number of preceding attempts for the RPC.
*
* @since 1.40.0
*/
public int getPreviousAttempts() {
return previousAttempts;
}
/**
* Whether the stream is a transparent retry.
*
@ -142,6 +155,7 @@ public abstract class ClientStreamTracer extends StreamTracer {
return new Builder()
.setCallOptions(callOptions)
.setTransportAttrs(transportAttrs)
.setPreviousAttempts(previousAttempts)
.setIsTransparentRetry(isTransparentRetry);
}
@ -159,6 +173,7 @@ public abstract class ClientStreamTracer extends StreamTracer {
return MoreObjects.toStringHelper(this)
.add("transportAttrs", transportAttrs)
.add("callOptions", callOptions)
.add("previousAttempts", previousAttempts)
.add("isTransparentRetry", isTransparentRetry)
.toString();
}
@ -171,6 +186,7 @@ public abstract class ClientStreamTracer extends StreamTracer {
public static final class Builder {
private Attributes transportAttrs = Attributes.EMPTY;
private CallOptions callOptions = CallOptions.DEFAULT;
private int previousAttempts;
private boolean isTransparentRetry;
Builder() {
@ -197,6 +213,16 @@ public abstract class ClientStreamTracer extends StreamTracer {
return this;
}
/**
* Sets the number of preceding attempts for the RPC.
*
* @since 1.40.0
*/
public Builder setPreviousAttempts(int previousAttempts) {
this.previousAttempts = previousAttempts;
return this;
}
/**
* Sets whether the stream is a transparent retry.
*
@ -211,7 +237,7 @@ public abstract class ClientStreamTracer extends StreamTracer {
* Builds a new StreamInfo.
*/
public StreamInfo build() {
return new StreamInfo(transportAttrs, callOptions, isTransparentRetry);
return new StreamInfo(transportAttrs, callOptions, previousAttempts, isTransparentRetry);
}
}
}
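
To illustrate how the new StreamInfo fields might be consumed, here is a minimal sketch of a custom ClientStreamTracer.Factory that logs the attempt number; the factory name and the logging are hypothetical, not part of this change.

import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import java.util.logging.Logger;

// Hypothetical factory; it could be attached per call via CallOptions.withStreamTracerFactory(...).
final class AttemptLoggingTracerFactory extends ClientStreamTracer.Factory {
  private static final Logger log =
      Logger.getLogger(AttemptLoggingTracerFactory.class.getName());

  @Override
  public ClientStreamTracer newClientStreamTracer(
      ClientStreamTracer.StreamInfo info, Metadata headers) {
    // getPreviousAttempts() is the number of preceding attempts of this RPC;
    // transparent retries are flagged separately via isTransparentRetry().
    log.info("Starting attempt #" + (info.getPreviousAttempts() + 1)
        + (info.isTransparentRetry() ? " (transparent retry)" : ""));
    return new ClientStreamTracer() {};
  }
}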

View File

@ -479,9 +479,6 @@ public abstract class ManagedChannelBuilder<T extends ManagedChannelBuilder<T>>
* transparent retries, which are safe for non-idempotent RPCs. Service config is ideally provided
* by the name resolver, but may also be specified via {@link #defaultServiceConfig}.
*
* <p>For the current release, this method may have a side effect that disables Census stats and
* tracing.
*
* @return this
* @since 1.11.0
*/
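
Now that enableRetry() no longer disables Census stats and tracing, retry stats are recorded whenever retries are configured. Below is a minimal sketch of enabling retries through the default service config; the target, service name, and policy values are placeholders, not defaults.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Map;

final class RetryChannelExample {
  static ManagedChannel newRetryingChannel() {
    // Service-config numbers must be Doubles; durations are strings such as "0.1s".
    Map<String, Object> retryPolicy = ImmutableMap.<String, Object>of(
        "maxAttempts", 4.0,
        "initialBackoff", "0.1s",
        "maxBackoff", "1s",
        "backoffMultiplier", 2.0,
        "retryableStatusCodes", ImmutableList.of("UNAVAILABLE"));
    Map<String, Object> methodConfig = ImmutableMap.<String, Object>of(
        "name", ImmutableList.of(ImmutableMap.of("service", "example.Service")),
        "retryPolicy", retryPolicy);
    Map<String, Object> serviceConfig = ImmutableMap.<String, Object>of(
        "methodConfig", ImmutableList.of(methodConfig));
    return ManagedChannelBuilder.forTarget("example.com:443")
        .defaultServiceConfig(serviceConfig)
        .enableRetry()
        .build();
  }
}

The same config could instead come from the name resolver, as the javadoc above notes; enableRetry() only switches the retry machinery on.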

View File

@ -17,7 +17,6 @@
package io.grpc.census;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
@ -28,16 +27,20 @@ import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StreamTracer;
import io.grpc.census.internal.DeprecatedCensusConstants;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.Measure.MeasureLong;
import io.opencensus.stats.MeasureMap;
@ -51,9 +54,11 @@ import io.opencensus.tags.propagation.TagContextBinarySerializer;
import io.opencensus.tags.propagation.TagContextSerializationException;
import io.opencensus.tags.unsafe.ContextUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -61,9 +66,10 @@ import javax.annotation.Nullable;
/**
* Provides factories for {@link StreamTracer} that records stats to Census.
*
* <p>On the client-side, a factory is created for each call, because ClientCall starts earlier than
* the ClientStream, and in some cases may even not create a ClientStream at all. Therefore, it's
* the factory that reports the summary to Census.
* <p>On the client-side, a factory is created for each call, and the factory creates a stream
* tracer for each attempt. If there is no stream created when the call is ended, we still create a
* tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
* of the overall RPC, such as RETRIES_PER_CALL, to Census.
*
* <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
* starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call and
@ -168,7 +174,6 @@ final class CensusStatsModule {
}
private static final class ClientTracer extends ClientStreamTracer {
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
@ -222,21 +227,31 @@ final class CensusStatsModule {
inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater;
}
private final CensusStatsModule module;
final Stopwatch stopwatch;
final CallAttemptsTracerFactory attemptsState;
final AtomicBoolean inboundReceivedOrClosed = new AtomicBoolean();
final CensusStatsModule module;
final TagContext parentCtx;
private final TagContext startCtx;
final TagContext startCtx;
final StreamInfo info;
volatile long outboundMessageCount;
volatile long inboundMessageCount;
volatile long outboundWireSize;
volatile long inboundWireSize;
volatile long outboundUncompressedSize;
volatile long inboundUncompressedSize;
long roundtripNanos;
Code statusCode;
ClientTracer(CensusStatsModule module, TagContext parentCtx, TagContext startCtx) {
this.module = checkNotNull(module, "module");
ClientTracer(
CallAttemptsTracerFactory attemptsState, CensusStatsModule module, TagContext parentCtx,
TagContext startCtx, StreamInfo info) {
this.attemptsState = attemptsState;
this.module = module;
this.parentCtx = parentCtx;
this.startCtx = checkNotNull(startCtx, "startCtx");
this.startCtx = startCtx;
this.info = info;
this.stopwatch = module.stopwatchSupplier.get().start();
}
@Override
@ -296,6 +311,11 @@ final class CensusStatsModule {
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundMessage(int seqNo) {
if (inboundReceivedOrClosed.compareAndSet(false, true)) {
// Because inboundUncompressedSize() might be called after streamClosed(),
// we will report stats in callEnded(). Note that this attempt is already committed.
attemptsState.inboundMetricTracer = this;
}
if (inboundMessageCountUpdater != null) {
inboundMessageCountUpdater.getAndIncrement(this);
} else {
@ -316,14 +336,74 @@ final class CensusStatsModule {
module.recordRealTimeMetric(
startCtx, RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1);
}
@Override
public void streamClosed(Status status) {
attemptsState.attemptEnded();
stopwatch.stop();
roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
Deadline deadline = info.getCallOptions().getDeadline();
statusCode = status.getCode();
if (statusCode == Status.Code.CANCELLED && deadline != null) {
// When the server's deadline expires, it can only reset the stream with CANCEL and no
// description. Since our timer may be delayed in firing, we double-check the deadline and
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
if (deadline.isExpired()) {
statusCode = Code.DEADLINE_EXCEEDED;
}
}
if (inboundReceivedOrClosed.compareAndSet(false, true)) {
if (module.recordFinishedRpcs) {
// Stream is closed early. So no need to record metrics for any inbound events after this
// point.
recordFinishedRpc();
}
} // Otherwise will report stats in callEnded() to guarantee all inbound metrics are recorded.
}
void recordFinishedRpc() {
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// TODO(songya): remove the deprecated measure constants once they are completely removed.
.put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
// The latency is a double value
.put(
DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY,
roundtripNanos / NANOS_PER_MILLI)
.put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, outboundMessageCount)
.put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, inboundMessageCount)
.put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, outboundWireSize)
.put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, inboundWireSize)
.put(
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES,
outboundUncompressedSize)
.put(
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES,
inboundUncompressedSize);
if (statusCode != Code.OK) {
measureMap.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1);
}
TagValue statusTag = TagValue.create(statusCode.toString());
measureMap.record(
module
.tagger
.toBuilder(startCtx)
.putLocal(RpcMeasureConstants.GRPC_CLIENT_STATUS, statusTag)
.build());
}
}
@VisibleForTesting
static final class CallAttemptsTracerFactory extends
ClientStreamTracer.InternalLimitedInfoFactory {
@Nullable
private static final AtomicReferenceFieldUpdater<CallAttemptsTracerFactory, ClientTracer>
streamTracerUpdater;
static final MeasureLong RETRIES_PER_CALL =
Measure.MeasureLong.create(
"grpc.io/client/retries_per_call", "Number of retries per call", "1");
static final MeasureLong TRANSPARENT_RETRIES_PER_CALL =
Measure.MeasureLong.create(
"grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1");
static final MeasureDouble RETRY_DELAY_PER_CALL =
Measure.MeasureDouble.create(
"grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
@Nullable
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
@ -334,40 +414,45 @@ final class CensusStatsModule {
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicReferenceFieldUpdater<CallAttemptsTracerFactory, ClientTracer> tmpStreamTracerUpdater;
AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
try {
tmpStreamTracerUpdater =
AtomicReferenceFieldUpdater.newUpdater(
CallAttemptsTracerFactory.class, ClientTracer.class, "streamTracer");
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpStreamTracerUpdater = null;
tmpCallEndedUpdater = null;
}
streamTracerUpdater = tmpStreamTracerUpdater;
callEndedUpdater = tmpCallEndedUpdater;
}
ClientTracer inboundMetricTracer;
private final CensusStatsModule module;
private final Stopwatch stopwatch;
private volatile ClientTracer streamTracer;
private volatile int callEnded;
private final TagContext parentCtx;
private final TagContext startCtx;
private final String fullMethodName;
// TODO(zdapeng): optimize memory allocation using AtomicFieldUpdater.
private final AtomicLong attemptsPerCall = new AtomicLong();
private final AtomicLong transparentRetriesPerCall = new AtomicLong();
private final AtomicLong retryDelayNanos = new AtomicLong();
private final AtomicLong lastInactiveTimeStamp = new AtomicLong();
private final AtomicInteger activeStreams = new AtomicInteger();
private final AtomicBoolean activated = new AtomicBoolean();
CallAttemptsTracerFactory(
CensusStatsModule module, TagContext parentCtx, String fullMethodName) {
this.module = checkNotNull(module);
this.parentCtx = checkNotNull(parentCtx);
this.module = checkNotNull(module, "module");
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.stopwatch = module.stopwatchSupplier.get().start();
TagValue methodTag = TagValue.create(fullMethodName);
this.startCtx = module.tagger.toBuilder(parentCtx)
startCtx = module.tagger.toBuilder(parentCtx)
.putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag)
.build();
this.stopwatch = module.stopwatchSupplier.get().start();
if (module.recordStartedRpcs) {
// Record here in case newClientStreamTracer() is never called.
module.statsRecorder.newMeasureMap()
.put(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT, 1)
.record(startCtx);
@ -375,30 +460,37 @@ final class CensusStatsModule {
}
@Override
public ClientStreamTracer newClientStreamTracer(
ClientStreamTracer.StreamInfo info, Metadata headers) {
ClientTracer tracer = new ClientTracer(module, parentCtx, startCtx);
// TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
// one streams. We will need to update this file to support them.
if (streamTracerUpdater != null) {
checkState(
streamTracerUpdater.compareAndSet(this, null, tracer),
"Are you creating multiple streams per call? This class doesn't yet support this case");
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, info);
if (activeStreams.incrementAndGet() == 1) {
if (!activated.compareAndSet(false, true)) {
retryDelayNanos.addAndGet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
}
}
if (module.recordStartedRpcs && attemptsPerCall.get() > 0) {
module.statsRecorder.newMeasureMap()
.put(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT, 1)
.record(startCtx);
}
if (info.isTransparentRetry()) {
transparentRetriesPerCall.incrementAndGet();
} else {
checkState(
streamTracer == null,
"Are you creating multiple streams per call? This class doesn't yet support this case");
streamTracer = tracer;
attemptsPerCall.incrementAndGet();
}
return tracer;
}
/**
* Record a finished call and mark the current time as the end time.
*
* <p>Can be called from any thread without synchronization. Calling it the second time or more
* is a no-op.
*/
// Called whenever each attempt is ended.
void attemptEnded() {
if (activeStreams.decrementAndGet() == 0) {
// Race condition between two extremely close events does not matter because the difference
// in the result would be very small.
long lastInactiveTimeStamp =
this.lastInactiveTimeStamp.getAndSet(stopwatch.elapsed(TimeUnit.NANOSECONDS));
retryDelayNanos.addAndGet(-lastInactiveTimeStamp);
}
}
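
In effect, retryDelayNanos accumulates the stopwatch time between one attempt ending and the next attempt starting (the activated flag keeps the interval before the very first attempt out of the sum). For reference, the fake-retry timeline exercised by CensusModulesTest.recordRetryStats further down maps onto this accounting as follows:

  attempt 1 (initial)          active     0 ms .. 154 ms
  idle gap (counted)                      1000 ms
  attempt 2 (retry)            active  1154 ms .. 1254 ms
  idle gap (counted)                        10 ms
  attempt 3 (transparent)      opens and closes at 1264 ms
  idle gap (counted)                        10 ms
  attempt 4 (transparent)      active  1274 ms .. 1314 ms, ends OK

  RETRY_DELAY_PER_CALL         = 1000 + 10 + 10 = 1020 ms
  RETRIES_PER_CALL             = 2 non-transparent attempts - 1 = 1
  TRANSPARENT_RETRIES_PER_CALL = 2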
void callEnded(Status status) {
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
@ -414,36 +506,30 @@ final class CensusStatsModule {
return;
}
stopwatch.stop();
long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
ClientTracer tracer = streamTracer;
if (tracer == null) {
tracer = new ClientTracer(module, parentCtx, startCtx);
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = new ClientTracer(this, module, parentCtx, startCtx, null);
tracer.roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
tracer.statusCode = status.getCode();
tracer.recordFinishedRpc();
} else if (inboundMetricTracer != null) {
inboundMetricTracer.recordFinishedRpc();
}
long retriesPerCall = 0;
long attempts = attemptsPerCall.get();
if (attempts > 0) {
retriesPerCall = attempts - 1;
}
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// TODO(songya): remove the deprecated measure constants once they are completely removed.
.put(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT, 1)
// The latency is a double value
.put(
DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY,
roundtripNanos / NANOS_PER_MILLI)
.put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount)
.put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount)
.put(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize)
.put(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize)
.put(
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES,
tracer.outboundUncompressedSize)
.put(
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES,
tracer.inboundUncompressedSize);
if (!status.isOk()) {
measureMap.put(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT, 1);
}
.put(RETRIES_PER_CALL, retriesPerCall)
.put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get())
.put(RETRY_DELAY_PER_CALL, retryDelayNanos.get() / NANOS_PER_MILLI);
TagValue methodTag = TagValue.create(fullMethodName);
TagValue statusTag = TagValue.create(status.getCode().toString());
measureMap.record(
module
.tagger
.toBuilder(startCtx)
module.tagger
.toBuilder(parentCtx)
.putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag)
.putLocal(RpcMeasureConstants.GRPC_CLIENT_STATUS, statusTag)
.build());
}
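
The three per-call measures above are recorded in callEnded(), but they only surface if matching views are registered with OpenCensus. The following is a minimal sketch, assuming views are matched to the recorded measures by identical name/description/unit; the view names, descriptions, and bucket boundaries are illustrative, and transparent_retries_per_call would follow the same pattern.

import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
import io.opencensus.stats.Aggregation;
import io.opencensus.stats.BucketBoundaries;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.Measure.MeasureLong;
import io.opencensus.stats.Stats;
import io.opencensus.stats.View;
import io.opencensus.stats.ViewManager;
import java.util.Arrays;

final class RetryViewsExample {
  // Redeclared with the same name/description/unit as in CensusStatsModule, which keeps its
  // constants package-private.
  private static final MeasureLong RETRIES_PER_CALL = MeasureLong.create(
      "grpc.io/client/retries_per_call", "Number of retries per call", "1");
  private static final MeasureDouble RETRY_DELAY_PER_CALL = MeasureDouble.create(
      "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");

  static void registerRetryViews() {
    ViewManager viewManager = Stats.getViewManager();
    viewManager.registerView(View.create(
        View.Name.create("grpc.io/client/retries_per_call"),
        "Distribution of retries per call",
        RETRIES_PER_CALL,
        Aggregation.Distribution.create(
            BucketBoundaries.create(Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0))),
        Arrays.asList(RpcMeasureConstants.GRPC_CLIENT_METHOD)));
    viewManager.registerView(View.create(
        View.Name.create("grpc.io/client/retry_delay_per_call"),
        "Distribution of per-call retry delay, in ms",
        RETRY_DELAY_PER_CALL,
        Aggregation.Distribution.create(
            BucketBoundaries.create(Arrays.asList(1.0, 10.0, 100.0, 1000.0, 10000.0))),
        Arrays.asList(RpcMeasureConstants.GRPC_CLIENT_METHOD)));
  }
}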

View File

@ -32,6 +32,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.grpc.StreamTracer;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.BlankSpan;
import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.MessageEvent;
@ -60,7 +61,8 @@ import javax.annotation.Nullable;
final class CensusTracingModule {
private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName());
@Nullable private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater;
@Nullable
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
@Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
@ -70,11 +72,11 @@ final class CensusTracingModule {
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<ClientCallTracer> tmpCallEndedUpdater;
AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
try {
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
tmpStreamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
} catch (Throwable t) {
@ -116,11 +118,12 @@ final class CensusTracingModule {
}
/**
* Creates a {@link ClientCallTracer} for a new call.
* Creates a {@link CallAttemptsTracerFactory} for a new call.
*/
@VisibleForTesting
ClientCallTracer newClientCallTracer(@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
return new ClientCallTracer(parentSpan, method);
CallAttemptsTracerFactory newClientCallTracer(
@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
return new CallAttemptsTracerFactory(parentSpan, method);
}
/**
@ -223,19 +226,21 @@ final class CensusTracingModule {
}
@VisibleForTesting
final class ClientCallTracer extends ClientStreamTracer.InternalLimitedInfoFactory {
final class CallAttemptsTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory {
volatile int callEnded;
private final boolean isSampledToLocalTracing;
private final Span span;
private final String fullMethodName;
ClientCallTracer(@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
CallAttemptsTracerFactory(@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
checkNotNull(method, "method");
this.isSampledToLocalTracing = method.isSampledToLocalTracing();
this.fullMethodName = method.getFullMethodName();
this.span =
censusTracer
.spanBuilderWithExplicitParent(
generateTraceSpanName(false, method.getFullMethodName()),
generateTraceSpanName(false, fullMethodName),
parentSpan)
.setRecordEvents(true)
.startSpan();
@ -244,7 +249,17 @@ final class CensusTracingModule {
@Override
public ClientStreamTracer newClientStreamTracer(
ClientStreamTracer.StreamInfo info, Metadata headers) {
return new ClientTracer(span, tracingHeader);
Span attemptSpan = censusTracer
.spanBuilderWithExplicitParent(
"Attempt." + fullMethodName.replace('/', '.'),
span)
.setRecordEvents(true)
.startSpan();
attemptSpan.putAttribute(
"previous-rpc-attempts", AttributeValue.longAttributeValue(info.getPreviousAttempts()));
attemptSpan.putAttribute(
"transparent-retry", AttributeValue.booleanAttributeValue(info.isTransparentRetry()));
return new ClientTracer(attemptSpan, tracingHeader, isSampledToLocalTracing);
}
/**
@ -271,10 +286,13 @@ final class CensusTracingModule {
private static final class ClientTracer extends ClientStreamTracer {
private final Span span;
final Metadata.Key<SpanContext> tracingHeader;
final boolean isSampledToLocalTracing;
ClientTracer(Span span, Metadata.Key<SpanContext> tracingHeader) {
ClientTracer(
Span span, Metadata.Key<SpanContext> tracingHeader, boolean isSampledToLocalTracing) {
this.span = checkNotNull(span, "span");
this.tracingHeader = tracingHeader;
this.isSampledToLocalTracing = isSampledToLocalTracing;
}
@Override
@ -298,6 +316,11 @@ final class CensusTracingModule {
recordMessageEvent(
span, MessageEvent.Type.RECEIVED, seqNo, optionalWireSize, optionalUncompressedSize);
}
@Override
public void streamClosed(io.grpc.Status status) {
span.end(createEndSpanOptions(status, isSampledToLocalTracing));
}
}
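
Each attempt now gets its own child span, named "Attempt." plus the method name with '/' replaced by '.', carrying the previous-rpc-attempts and transparent-retry attributes set above; the attempt span, rather than the overall call span, is ended when the stream closes. Below is a minimal sketch of an exporter handler that surfaces these attributes, assuming the spans are sampled for export; the handler name is hypothetical.

import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.export.SpanData;
import io.opencensus.trace.export.SpanExporter;
import java.util.Collection;
import java.util.Map;

final class AttemptSpanLogger extends SpanExporter.Handler {
  static void register() {
    Tracing.getExportComponent().getSpanExporter()
        .registerHandler("attempt-span-logger", new AttemptSpanLogger());
  }

  @Override
  public void export(Collection<SpanData> spans) {
    for (SpanData span : spans) {
      if (!span.getName().startsWith("Attempt.")) {
        continue;
      }
      // Log the attempt span name along with its retry-related attributes.
      Map<String, AttributeValue> attrs = span.getAttributes().getAttributeMap();
      System.out.println(span.getName()
          + " previous-rpc-attempts=" + attrs.get("previous-rpc-attempts")
          + " transparent-retry=" + attrs.get("transparent-retry"));
    }
  }
}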
@ -388,7 +411,7 @@ final class CensusTracingModule {
// Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value
// as Tracer.getCurrentSpan() except when no value available when the return value is null
// for the direct access and BlankSpan when Tracer API is used.
final ClientCallTracer tracerFactory =
final CallAttemptsTracerFactory tracerFactory =
newClientCallTracer(ContextUtils.getValue(Context.current()), method);
ClientCall<ReqT, RespT> call =
next.newCall(

View File

@ -18,6 +18,9 @@ package io.grpc.census;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -58,6 +61,7 @@ import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
import io.grpc.ServerStreamTracer.ServerCallInfo;
import io.grpc.Status;
import io.grpc.census.CensusTracingModule.CallAttemptsTracerFactory;
import io.grpc.census.internal.DeprecatedCensusConstants;
import io.grpc.internal.FakeClock;
import io.grpc.internal.testing.StatsTestUtils;
@ -81,6 +85,7 @@ import io.opencensus.stats.StatsComponent;
import io.opencensus.stats.View;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagValue;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.BlankSpan;
import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.MessageEvent;
@ -173,10 +178,12 @@ public class CensusModulesTest {
private final Random random = new Random(1234);
private final Span fakeClientParentSpan = MockableSpan.generateRandomSpan(random);
private final Span spyClientSpan = spy(MockableSpan.generateRandomSpan(random));
private final SpanContext fakeClientSpanContext = spyClientSpan.getContext();
private final Span spyAttemptSpan = spy(MockableSpan.generateRandomSpan(random));
private final SpanContext fakeAttemptSpanContext = spyAttemptSpan.getContext();
private final Span spyServerSpan = spy(MockableSpan.generateRandomSpan(random));
private final byte[] binarySpanContext = new byte[]{3, 1, 5};
private final SpanBuilder spyClientSpanBuilder = spy(new MockableSpan.Builder());
private final SpanBuilder spyAttemptSpanBuilder = spy(new MockableSpan.Builder());
private final SpanBuilder spyServerSpanBuilder = spy(new MockableSpan.Builder());
@Rule
@ -201,15 +208,20 @@ public class CensusModulesTest {
@Before
public void setUp() throws Exception {
when(spyClientSpanBuilder.startSpan()).thenReturn(spyClientSpan);
when(tracer.spanBuilderWithExplicitParent(anyString(), ArgumentMatchers.<Span>any()))
when(spyAttemptSpanBuilder.startSpan()).thenReturn(spyAttemptSpan);
when(tracer.spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), ArgumentMatchers.<Span>any()))
.thenReturn(spyClientSpanBuilder);
when(tracer.spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), ArgumentMatchers.<Span>any()))
.thenReturn(spyAttemptSpanBuilder);
when(spyServerSpanBuilder.startSpan()).thenReturn(spyServerSpan);
when(tracer.spanBuilderWithRemoteParent(anyString(), ArgumentMatchers.<SpanContext>any()))
.thenReturn(spyServerSpanBuilder);
when(mockTracingPropagationHandler.toByteArray(any(SpanContext.class)))
.thenReturn(binarySpanContext);
when(mockTracingPropagationHandler.fromByteArray(any(byte[].class)))
.thenReturn(fakeClientSpanContext);
.thenReturn(fakeAttemptSpanContext);
censusStats =
new CensusStatsModule(
tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(),
@ -292,7 +304,7 @@ public class CensusModulesTest {
assertEquals(2, capturedCallOptions.get().getStreamTracerFactories().size());
assertTrue(
capturedCallOptions.get().getStreamTracerFactories().get(0)
instanceof CensusTracingModule.ClientCallTracer);
instanceof CallAttemptsTracerFactory);
assertTrue(
capturedCallOptions.get().getStreamTracerFactories().get(1)
instanceof CensusStatsModule.CallAttemptsTracerFactory);
@ -355,6 +367,7 @@ public class CensusModulesTest {
.setSampleToLocalSpanStore(false)
.build());
verify(spyClientSpan, never()).end();
assertZeroRetryRecorded();
}
@Test
@ -489,11 +502,200 @@ public class CensusModulesTest {
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(30 + 100 + 16 + 24,
record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
assertZeroRetryRecorded();
} else {
assertNull(statsRecorder.pollRecord());
}
}
// This test is only unit-testing the stat recording logic. The retry behavior is faked.
@Test
public void recordRetryStats() {
CensusStatsModule localCensusStats =
new CensusStatsModule(
tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(),
true, true, true, true);
CensusStatsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new CensusStatsModule.CallAttemptsTracerFactory(
localCensusStats, tagger.empty(), method.getFullMethodName());
ClientStreamTracer tracer =
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertEquals(1, record.tags.size());
TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
assertEquals(
1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT));
fakeClock.forwardTime(30, MILLISECONDS);
tracer.outboundHeaders();
fakeClock.forwardTime(100, MILLISECONDS);
tracer.outboundMessage(0);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
tracer.outboundMessage(1);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
tracer.outboundWireSize(1028);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true);
tracer.outboundUncompressedSize(1128);
fakeClock.forwardTime(24, MILLISECONDS);
tracer.streamClosed(Status.UNAVAILABLE);
record = statsRecorder.pollRecord();
methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
assertEquals(Status.Code.UNAVAILABLE.toString(), statusTag.asString());
assertEquals(
1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT));
assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT));
assertEquals(
2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT));
assertEquals(
1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES));
assertEquals(
1128,
record.getMetricAsLongOrFail(
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(
30 + 100 + 24,
record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
// faking retry
fakeClock.forwardTime(1000, MILLISECONDS);
tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
record = statsRecorder.pollRecord();
assertEquals(1, record.tags.size());
methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
assertEquals(
1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT));
tracer.outboundHeaders();
tracer.outboundMessage(0);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
tracer.outboundMessage(1);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
tracer.outboundWireSize(1028);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true);
tracer.outboundUncompressedSize(1128);
fakeClock.forwardTime(100, MILLISECONDS);
tracer.streamClosed(Status.NOT_FOUND);
record = statsRecorder.pollRecord();
methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
assertEquals(Status.Code.NOT_FOUND.toString(), statusTag.asString());
assertEquals(
1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT));
assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT));
assertEquals(
2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT));
assertEquals(
1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES));
assertEquals(
1128,
record.getMetricAsLongOrFail(
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(
100,
record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
// fake transparent retry
fakeClock.forwardTime(10, MILLISECONDS);
tracer = callAttemptsTracerFactory.newClientStreamTracer(
STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata());
record = statsRecorder.pollRecord();
assertEquals(1, record.tags.size());
methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
assertEquals(
1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT));
tracer.streamClosed(Status.UNAVAILABLE);
record = statsRecorder.pollRecord();
statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
assertEquals(Status.Code.UNAVAILABLE.toString(), statusTag.asString());
assertEquals(
1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT));
assertEquals(1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT));
assertEquals(
0, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT));
assertEquals(
0, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES));
// fake another transparent retry
fakeClock.forwardTime(10, MILLISECONDS);
tracer = callAttemptsTracerFactory.newClientStreamTracer(
STREAM_INFO.toBuilder().setIsTransparentRetry(true).build(), new Metadata());
record = statsRecorder.pollRecord();
assertEquals(1, record.tags.size());
assertEquals(
1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT));
tracer.outboundHeaders();
tracer.outboundMessage(0);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
tracer.outboundMessage(1);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, true, true);
tracer.outboundWireSize(1028);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, true, true);
tracer.outboundUncompressedSize(1128);
fakeClock.forwardTime(16, MILLISECONDS);
tracer.inboundMessage(0);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD, 1, true, true);
tracer.inboundWireSize(33);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD, 33, true, true);
tracer.inboundUncompressedSize(67);
fakeClock.forwardTime(24, MILLISECONDS);
// RPC succeeded
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);
record = statsRecorder.pollRecord();
statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
assertEquals(Status.Code.OK.toString(), statusTag.asString());
assertEquals(
1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT));
assertThat(record.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)).isNull();
assertEquals(
2, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT));
assertEquals(
1028, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_BYTES));
assertEquals(
1128,
record.getMetricAsLongOrFail(
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(
1, record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_COUNT));
assertEquals(
33,
record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_RESPONSE_BYTES));
assertEquals(
67,
record.getMetricAsLongOrFail(
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(
16 + 24,
record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
record = statsRecorder.pollRecord();
methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
assertEquals(Status.Code.OK.toString(), statusTag.asString());
assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(1);
assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(2);
assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(1000D + 10 + 10);
}
private void assertRealTimeMetric(
Measure measure, long expectedValue, boolean recordRealTimeMetrics, boolean clientSide) {
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
@ -517,16 +719,28 @@ public class CensusModulesTest {
assertEquals(expectedValue, record.getMetricAsLongOrFail(measure));
}
private void assertZeroRetryRecorded() {
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(0);
assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0);
assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D);
}
@Test
public void clientBasicTracingDefaultSpan() {
CensusTracingModule.ClientCallTracer callTracer =
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), ArgumentMatchers.<Span>isNull());
verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), eq(spyClientSpan));
verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
verify(spyAttemptSpan, never()).end(any(EndSpanOptions.class));
clientStreamTracer.outboundMessage(0);
clientStreamTracer.outboundMessageSent(0, 882, -1);
@ -538,8 +752,12 @@ public class CensusModulesTest {
clientStreamTracer.streamClosed(Status.OK);
callTracer.callEnded(Status.OK);
InOrder inOrder = inOrder(spyClientSpan);
inOrder.verify(spyClientSpan, times(3)).addMessageEvent(messageEventCaptor.capture());
InOrder inOrder = inOrder(spyClientSpan, spyAttemptSpan);
inOrder.verify(spyAttemptSpan)
.putAttribute("previous-rpc-attempts", AttributeValue.longAttributeValue(0));
inOrder.verify(spyAttemptSpan)
.putAttribute("transparent-retry", AttributeValue.booleanAttributeValue(false));
inOrder.verify(spyAttemptSpan, times(3)).addMessageEvent(messageEventCaptor.capture());
List<MessageEvent> events = messageEventCaptor.getAllValues();
assertEquals(
MessageEvent.builder(MessageEvent.Type.SENT, 0).setCompressedMessageSize(882).build(),
@ -553,18 +771,23 @@ public class CensusModulesTest {
.setUncompressedMessageSize(90)
.build(),
events.get(2));
inOrder.verify(spyAttemptSpan).end(
EndSpanOptions.builder()
.setStatus(io.opencensus.trace.Status.OK)
.setSampleToLocalSpanStore(false)
.build());
inOrder.verify(spyClientSpan).end(
EndSpanOptions.builder()
.setStatus(io.opencensus.trace.Status.OK)
.setSampleToLocalSpanStore(false)
.build());
verifyNoMoreInteractions(spyClientSpan);
inOrder.verifyNoMoreInteractions();
verifyNoMoreInteractions(tracer);
}
@Test
public void clientTracingSampledToLocalSpanStore() {
CensusTracingModule.ClientCallTracer callTracer =
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(null, sampledMethod);
callTracer.callEnded(Status.OK);
@ -631,11 +854,12 @@ public class CensusModulesTest {
3000,
record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
assertNull(record.getMetric(DeprecatedCensusConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
assertZeroRetryRecorded();
}
@Test
public void clientStreamNeverCreatedStillRecordTracing() {
CensusTracingModule.ClientCallTracer callTracer =
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
@ -770,6 +994,7 @@ public class CensusModulesTest {
assertNull(clientRecord.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT));
TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra-tag-value-897", clientPropagatedTag.asString());
assertZeroRetryRecorded();
}
if (!recordStats) {
@ -812,16 +1037,18 @@ public class CensusModulesTest {
@Test
public void traceHeadersPropagateSpanContext() throws Exception {
CensusTracingModule.ClientCallTracer callTracer =
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer streamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
streamTracer.streamCreated(Attributes.EMPTY, headers);
verify(mockTracingPropagationHandler).toByteArray(same(fakeClientSpanContext));
verify(mockTracingPropagationHandler).toByteArray(same(fakeAttemptSpanContext));
verifyNoMoreInteractions(mockTracingPropagationHandler);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), same(spyClientSpan));
verify(spyClientSpanBuilder).setRecordEvents(eq(true));
verifyNoMoreInteractions(tracer);
assertTrue(headers.containsKey(censusTracing.tracingHeader));
@ -831,7 +1058,7 @@ public class CensusModulesTest {
method.getFullMethodName(), headers);
verify(mockTracingPropagationHandler).fromByteArray(same(binarySpanContext));
verify(tracer).spanBuilderWithRemoteParent(
eq("Recv.package1.service2.method3"), same(spyClientSpan.getContext()));
eq("Recv.package1.service2.method3"), same(spyAttemptSpan.getContext()));
verify(spyServerSpanBuilder).setRecordEvents(eq(true));
Context filteredContext = serverTracer.filterContext(Context.ROOT);
@ -840,7 +1067,7 @@ public class CensusModulesTest {
@Test
public void traceHeaders_propagateSpanContext() throws Exception {
CensusTracingModule.ClientCallTracer callTracer =
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
Metadata headers = new Metadata();
@ -854,10 +1081,12 @@ public class CensusModulesTest {
public void traceHeaders_missingCensusImpl_notPropagateSpanContext()
throws Exception {
reset(spyClientSpanBuilder);
reset(spyAttemptSpanBuilder);
when(spyClientSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE);
when(spyAttemptSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE);
Metadata headers = new Metadata();
CensusTracingModule.ClientCallTracer callTracer =
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method);
callTracer.newClientStreamTracer(STREAM_INFO, headers).streamCreated(Attributes.EMPTY, headers);
@ -867,14 +1096,16 @@ public class CensusModulesTest {
@Test
public void traceHeaders_clientMissingCensusImpl_preservingHeaders() throws Exception {
reset(spyClientSpanBuilder);
reset(spyAttemptSpanBuilder);
when(spyClientSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE);
when(spyAttemptSpanBuilder.startSpan()).thenReturn(BlankSpan.INSTANCE);
Metadata headers = new Metadata();
headers.put(
Metadata.Key.of("never-used-key-bin", Metadata.BINARY_BYTE_MARSHALLER),
new byte[] {});
Set<String> originalHeaderKeys = new HashSet<>(headers.keys());
CensusTracingModule.ClientCallTracer callTracer =
CallAttemptsTracerFactory callTracer =
censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method);
callTracer.newClientStreamTracer(STREAM_INFO, headers).streamCreated(Attributes.EMPTY, headers);
@ -885,9 +1116,9 @@ public class CensusModulesTest {
public void traceHeaderMalformed() throws Exception {
// As comparison, normal header parsing
Metadata headers = new Metadata();
headers.put(censusTracing.tracingHeader, fakeClientSpanContext);
headers.put(censusTracing.tracingHeader, fakeAttemptSpanContext);
// mockTracingPropagationHandler was stubbed to always return fakeServerParentSpanContext
assertSame(spyClientSpan.getContext(), headers.get(censusTracing.tracingHeader));
assertSame(spyAttemptSpan.getContext(), headers.get(censusTracing.tracingHeader));
// Make BinaryPropagationHandler always throw when parsing the header
when(mockTracingPropagationHandler.fromByteArray(any(byte[].class)))
@ -895,7 +1126,7 @@ public class CensusModulesTest {
headers = new Metadata();
assertNull(headers.get(censusTracing.tracingHeader));
headers.put(censusTracing.tracingHeader, fakeClientSpanContext);
headers.put(censusTracing.tracingHeader, fakeAttemptSpanContext);
assertSame(SpanContext.INVALID, headers.get(censusTracing.tracingHeader));
assertNotSame(spyClientSpan.getContext(), SpanContext.INVALID);

View File

@ -255,7 +255,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
} else {
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(callOptions, headers, false);
ClientStreamTracer[] tracers =
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
stream = new FailingClientStream(
DEADLINE_EXCEEDED.withDescription(
"ClientCall started after deadline exceeded: " + effectiveDeadline),

View File

@ -757,11 +757,12 @@ public final class GrpcUtil {
/** Gets stream tracers based on CallOptions. */
public static ClientStreamTracer[] getClientStreamTracers(
CallOptions callOptions, Metadata headers, boolean isTransparentRetry) {
CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) {
List<ClientStreamTracer.Factory> factories = callOptions.getStreamTracerFactories();
ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1];
StreamInfo streamInfo = StreamInfo.newBuilder()
.setCallOptions(callOptions)
.setPreviousAttempts(previousAttempts)
.setIsTransparentRetry(isTransparentRetry)
.build();
for (int i = 0; i < factories.size(); i++) {

View File

@ -533,7 +533,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
Context origContext = context.attach();
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, /* isTransparentRetry= */ false);
callOptions, headers, 0, /* isTransparentRetry= */ false);
try {
return transport.newStream(method, headers, callOptions, tracers);
} finally {
@ -572,10 +572,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
@Override
ClientStream newSubstream(
Metadata newHeaders, ClientStreamTracer.Factory factory, boolean isTransparentRetry) {
Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
boolean isTransparentRetry) {
CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
ClientStreamTracer[] tracers =
GrpcUtil.getClientStreamTracers(newOptions, newHeaders, isTransparentRetry);
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
newOptions, newHeaders, previousAttempts, isTransparentRetry);
ClientTransport transport =
getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
Context origContext = context.attach();
@ -624,7 +625,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
ProxyDetector proxyDetector =
builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
this.retryEnabled = builder.retryEnabled;
this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
this.offloadExecutorHolder =
new ExecutorHolder(

View File

@ -143,10 +143,6 @@ public final class ManagedChannelImplBuilder
long retryBufferSize = DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES;
long perRpcBufferLimit = DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES;
boolean retryEnabled = false; // TODO(zdapeng): default to true
// Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know
// what should be the desired behavior for retry + stats/tracing.
// TODO(zdapeng): delete me
boolean temporarilyDisableRetry;
InternalChannelz channelz = InternalChannelz.instance();
int maxTraceEvents;
@ -460,8 +456,6 @@ public final class ManagedChannelImplBuilder
@Override
public ManagedChannelImplBuilder enableRetry() {
retryEnabled = true;
statsEnabled = false;
tracingEnabled = false;
return this;
}
@ -592,9 +586,6 @@ public final class ManagedChannelImplBuilder
/**
* Disable or enable tracing features. Enabled by default.
*
* <p>For the current release, calling {@code setTracingEnabled(true)} may have a side effect that
* disables retry.
*/
public void setTracingEnabled(boolean value) {
tracingEnabled = value;
@ -642,9 +633,7 @@ public final class ManagedChannelImplBuilder
List<ClientInterceptor> getEffectiveInterceptors() {
List<ClientInterceptor> effectiveInterceptors =
new ArrayList<>(this.interceptors);
temporarilyDisableRetry = false;
if (statsEnabled) {
temporarilyDisableRetry = true;
ClientInterceptor statsInterceptor = null;
try {
Class<?> censusStatsAccessor =
@ -679,7 +668,6 @@ public final class ManagedChannelImplBuilder
}
}
if (tracingEnabled) {
temporarilyDisableRetry = true;
ClientInterceptor tracingInterceptor = null;
try {
Class<?> censusTracingAccessor =

View File

@ -88,7 +88,7 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented<Ch
public ClientStream newStream(MethodDescriptor<?, ?> method,
CallOptions callOptions, Metadata headers, Context context) {
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, /* isTransparentRetry= */ false);
callOptions, headers, 0, /* isTransparentRetry= */ false);
Context origContext = context.attach();
// delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
// matter here because OOB communication should be sparse, and it's not on application RPC's

View File

@ -218,7 +218,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
// NOTICE: This set _must_ be done before stream.start() and it actually is.
sub.stream = newSubstream(newHeaders, tracerFactory, isTransparentRetry);
sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry);
return sub;
}
@ -227,7 +227,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
* Client stream is not yet started.
*/
abstract ClientStream newSubstream(
Metadata headers, ClientStreamTracer.Factory tracerFactory, boolean isTransparentRetry);
Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
boolean isTransparentRetry);
/** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
@VisibleForTesting
@ -869,24 +870,26 @@ abstract class RetriableStream<ReqT> implements ClientStream {
synchronized (lock) {
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
}
scheduledRetryCopy.setFuture(
scheduledExecutorService.schedule(
class RetryBackoffRunnable implements Runnable {
@Override
public void run() {
callExecutor.execute(
new Runnable() {
@Override
public void run() {
callExecutor.execute(
new Runnable() {
@Override
public void run() {
// retry
Substream newSubstream = createSubstream(
substream.previousAttemptCount + 1,
false);
drain(newSubstream);
}
});
// retry
Substream newSubstream = createSubstream(
substream.previousAttemptCount + 1,
false);
drain(newSubstream);
}
},
});
}
}
scheduledRetryCopy.setFuture(
scheduledExecutorService.schedule(
new RetryBackoffRunnable(),
retryPlan.backoffNanos,
TimeUnit.NANOSECONDS));
return;

View File

@ -59,7 +59,7 @@ final class SubchannelChannel extends Channel {
transport = notReadyTransport;
}
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, /* isTransparentRetry= */ false);
callOptions, headers, 0, /* isTransparentRetry= */ false);
Context origContext = context.attach();
try {
return transport.newStream(method, headers, callOptions, tracers);

View File

@ -164,7 +164,8 @@ public class RetriableStreamTest {
@Override
ClientStream newSubstream(
Metadata metadata, ClientStreamTracer.Factory tracerFactory, boolean isTransparentRetry) {
Metadata metadata, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
boolean isTransparentRetry) {
bufferSizeTracer =
tracerFactory.newClientStreamTracer(STREAM_INFO, metadata);
int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null

View File

@ -44,6 +44,7 @@ dependencies {
project(':grpc-grpclb')
testImplementation project(':grpc-context').sourceSets.test.output,
project(':grpc-api').sourceSets.test.output,
project(':grpc-core').sourceSets.test.output,
libraries.mockito
alpnagent libraries.jetty_alpn_agent
}

View File

@ -92,6 +92,9 @@ import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.Measure.MeasureLong;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.trace.Span;
@ -152,6 +155,15 @@ public abstract class AbstractInteropTest {
* SETTINGS/WINDOW_UPDATE exchange.
*/
public static final int TEST_FLOW_CONTROL_WINDOW = 65 * 1024;
private static final MeasureLong RETRIES_PER_CALL =
Measure.MeasureLong.create(
"grpc.io/client/retries_per_call", "Number of retries per call", "1");
private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL =
Measure.MeasureLong.create(
"grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1");
private static final MeasureDouble RETRY_DELAY_PER_CALL =
Measure.MeasureDouble.create(
"grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
private static final FakeTagger tagger = new FakeTagger();
private static final FakeTagContextBinarySerializer tagContextBinarySerializer =
@ -1234,6 +1246,7 @@ public abstract class AbstractInteropTest {
checkEndTags(
clientEndRecord, "grpc.testing.TestService/EmptyCall",
Status.DEADLINE_EXCEEDED.getCode(), true);
assertZeroRetryRecorded();
}
// warm up the channel
@ -1243,6 +1256,7 @@ public abstract class AbstractInteropTest {
clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
// clientEndRecord
clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
assertZeroRetryRecorded();
}
try {
blockingStub
@ -1261,6 +1275,7 @@ public abstract class AbstractInteropTest {
checkEndTags(
clientEndRecord, "grpc.testing.TestService/EmptyCall",
Status.DEADLINE_EXCEEDED.getCode(), true);
assertZeroRetryRecorded();
}
}
@ -1978,6 +1993,13 @@ public abstract class AbstractInteropTest {
assertStatsTrace(method, status, null, null);
}
private void assertZeroRetryRecorded() {
MetricsRecord retryRecord = clientStatsRecorder.pollRecord();
assertThat(retryRecord.getMetric(RETRIES_PER_CALL)).isEqualTo(0);
assertThat(retryRecord.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0);
assertThat(retryRecord.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D);
}
private void assertClientStatsTrace(String method, Status.Code code,
Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) {
// Tracer-based stats
@ -2007,6 +2029,7 @@ public abstract class AbstractInteropTest {
if (requests != null && responses != null) {
checkCensus(clientEndRecord, false, requests, responses);
}
assertZeroRetryRecorded();
}
}

View File

@ -17,13 +17,21 @@
package io.grpc.testing.integration;
import static com.google.common.truth.Truth.assertThat;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableMap;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.Deadline;
import io.grpc.Deadline.Ticker;
import io.grpc.IntegerMarshaller;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
@ -36,7 +44,15 @@ import io.grpc.ServerCallHandler;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StringMarshaller;
import io.grpc.census.InternalCensusStatsAccessor;
import io.grpc.census.internal.DeprecatedCensusConstants;
import io.grpc.internal.FakeClock;
import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer;
import io.grpc.internal.testing.StatsTestUtils.FakeTagger;
import io.grpc.internal.testing.StatsTestUtils.MetricsRecord;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.testing.GrpcCleanupRule;
@ -45,11 +61,20 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.ScheduledFuture;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.Measure.MeasureLong;
import io.opencensus.tags.TagValue;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -61,65 +86,111 @@ import org.mockito.junit.MockitoRule;
@RunWith(JUnit4.class)
public class RetryTest {
private static final FakeTagger tagger = new FakeTagger();
private static final FakeTagContextBinarySerializer tagContextBinarySerializer =
new FakeTagContextBinarySerializer();
private static final MeasureLong RETRIES_PER_CALL =
Measure.MeasureLong.create(
"grpc.io/client/retries_per_call", "Number of retries per call", "1");
private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL =
Measure.MeasureLong.create(
"grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1");
private static final MeasureDouble RETRY_DELAY_PER_CALL =
Measure.MeasureDouble.create(
"grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private final FakeClock fakeClock = new FakeClock();
@Mock
private ClientCall.Listener<Integer> mockCallListener;
@Test
public void retryUntilBufferLimitExceeded() throws Exception {
String message = "String of length 20.";
int bufferLimit = message.length() * 2 - 1; // Can buffer no more than 1 message.
MethodDescriptor<String, Integer> clientStreamingMethod =
MethodDescriptor.<String, Integer>newBuilder()
.setType(MethodType.CLIENT_STREAMING)
.setFullMethodName("service/method")
.setRequestMarshaller(new StringMarshaller())
.setResponseMarshaller(new IntegerMarshaller())
.build();
final LinkedBlockingQueue<ServerCall<String, Integer>> serverCalls =
new LinkedBlockingQueue<>();
ServerMethodDefinition<String, Integer> methodDefinition = ServerMethodDefinition.create(
clientStreamingMethod,
new ServerCallHandler<String, Integer>() {
@Override
public Listener<String> startCall(ServerCall<String, Integer> call, Metadata headers) {
serverCalls.offer(call);
return new Listener<String>() {};
private CountDownLatch backoffLatch = new CountDownLatch(1);
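  // An event loop group that intercepts the internal RetryBackoffRunnable: the backoff task is
  // re-scheduled on fakeClock so the test decides when the next attempt starts, and backoffLatch
  // is counted down to signal that a backoff has been scheduled.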
private final EventLoopGroup group = new DefaultEventLoopGroup() {
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public ScheduledFuture<?> schedule(
final Runnable command, final long delay, final TimeUnit unit) {
if (!command.getClass().getName().contains("RetryBackoffRunnable")) {
return super.schedule(command, delay, unit);
}
fakeClock.getScheduledExecutorService().schedule(
new Runnable() {
@Override
public void run() {
group.execute(command);
}
},
delay,
unit);
backoffLatch.countDown();
return super.schedule(
new Runnable() {
@Override
public void run() {} // no-op
},
0,
TimeUnit.NANOSECONDS);
}
};
private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder();
private final ClientInterceptor statsInterceptor =
InternalCensusStatsAccessor.getClientInterceptor(
tagger, tagContextBinarySerializer, clientStatsRecorder,
fakeClock.getStopwatchSupplier(), true, true, true,
/* recordRealTimeMetrics= */ true);
private final MethodDescriptor<String, Integer> clientStreamingMethod =
MethodDescriptor.<String, Integer>newBuilder()
.setType(MethodType.CLIENT_STREAMING)
.setFullMethodName("service/method")
.setRequestMarshaller(new StringMarshaller())
.setResponseMarshaller(new IntegerMarshaller())
.build();
private final LinkedBlockingQueue<ServerCall<String, Integer>> serverCalls =
new LinkedBlockingQueue<>();
private final ServerMethodDefinition<String, Integer> methodDefinition =
ServerMethodDefinition.create(
clientStreamingMethod,
new ServerCallHandler<String, Integer>() {
@Override
public Listener<String> startCall(ServerCall<String, Integer> call, Metadata headers) {
serverCalls.offer(call);
return new Listener<String>() {};
}
}
}
);
ServerServiceDefinition serviceDefinition =
ServerServiceDefinition.builder(clientStreamingMethod.getServiceName())
.addMethod(methodDefinition)
.build();
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress localAddress = new LocalAddress("RetryTest.retryUntilBufferLimitExceeded");
Server localServer = cleanupRule.register(NettyServerBuilder.forAddress(localAddress)
);
private final ServerServiceDefinition serviceDefinition =
ServerServiceDefinition.builder(clientStreamingMethod.getServiceName())
.addMethod(methodDefinition)
.build();
private final LocalAddress localAddress = new LocalAddress(this.getClass().getName());
private Server localServer;
private ManagedChannel channel;
private Map<String, Object> retryPolicy = null;
private long bufferLimit = 1L << 20; // 1M
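  // Starts (or restarts) the in-process server over Netty's LocalChannel transport.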
private void startNewServer() throws Exception {
localServer = cleanupRule.register(NettyServerBuilder.forAddress(localAddress)
.channelType(LocalServerChannel.class)
.bossEventLoopGroup(group)
.workerEventLoopGroup(group)
.addService(serviceDefinition)
.build());
localServer.start();
}
Map<String, Object> retryPolicy = new HashMap<>();
retryPolicy.put("maxAttempts", 4D);
retryPolicy.put("initialBackoff", "10s");
retryPolicy.put("maxBackoff", "10s");
retryPolicy.put("backoffMultiplier", 1D);
retryPolicy.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"));
private void createNewChannel() {
Map<String, Object> methodConfig = new HashMap<>();
Map<String, Object> name = new HashMap<>();
name.put("service", "service");
methodConfig.put("name", Arrays.<Object>asList(name));
methodConfig.put("retryPolicy", retryPolicy);
if (retryPolicy != null) {
methodConfig.put("retryPolicy", retryPolicy);
}
Map<String, Object> rawServiceConfig = new HashMap<>();
rawServiceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig));
ManagedChannel channel = cleanupRule.register(
channel = cleanupRule.register(
NettyChannelBuilder.forAddress(localAddress)
.channelType(LocalChannel.class)
.eventLoopGroup(group)
@ -127,23 +198,100 @@ public class RetryTest {
.enableRetry()
.perRpcBufferLimit(bufferLimit)
.defaultServiceConfig(rawServiceConfig)
.intercept(statsInterceptor)
.build());
}
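  // Waits until a retry backoff has been scheduled, then advances the fake clock so the next
  // attempt is released.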
private void elapseBackoff(long time, TimeUnit unit) throws Exception {
assertThat(backoffLatch.await(5, SECONDS)).isTrue();
backoffLatch = new CountDownLatch(1);
fakeClock.forwardTime(time, unit);
}
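  // Each assert*Recorded() helper below polls exactly one record from the fake stats recorder
  // and checks the measure of interest.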
private void assertRpcStartedRecorded() throws Exception {
MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT))
.isEqualTo(1);
}
private void assertOutboundMessageRecorded() throws Exception {
MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
assertThat(
record.getMetricAsLongOrFail(
RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD))
.isEqualTo(1);
}
private void assertInboundMessageRecorded() throws Exception {
MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
assertThat(
record.getMetricAsLongOrFail(
RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD))
.isEqualTo(1);
}
private void assertOutboundWireSizeRecorded(long length) throws Exception {
MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD))
.isEqualTo(length);
}
private void assertInboundWireSizeRecorded(long length) throws Exception {
MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
assertThat(
record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD))
.isEqualTo(length);
}
private void assertRpcStatusRecorded(
Status.Code code, long roundtripLatencyMs, long outboundMessages) throws Exception {
MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
assertThat(statusTag.asString()).isEqualTo(code.toString());
assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT))
.isEqualTo(1);
assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_ROUNDTRIP_LATENCY))
.isEqualTo(roundtripLatencyMs);
assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_REQUEST_COUNT))
.isEqualTo(outboundMessages);
}
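  // Asserts the new per-call retry measures: retries, transparent retries and total retry delay.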
private void assertRetryStatsRecorded(
int numRetries, int numTransparentRetries, long retryDelayMs) throws Exception {
MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
assertThat(record.getMetricAsLongOrFail(RETRIES_PER_CALL)).isEqualTo(numRetries);
assertThat(record.getMetricAsLongOrFail(TRANSPARENT_RETRIES_PER_CALL))
.isEqualTo(numTransparentRetries);
assertThat(record.getMetricAsLongOrFail(RETRY_DELAY_PER_CALL)).isEqualTo(retryDelayMs);
}
@Test
public void retryUntilBufferLimitExceeded() throws Exception {
String message = "String of length 20.";
startNewServer();
bufferLimit = message.length() * 2L - 1; // Can buffer no more than 1 message.
retryPolicy = ImmutableMap.<String, Object>builder()
.put("maxAttempts", 4D)
.put("initialBackoff", "10s")
.put("maxBackoff", "10s")
.put("backoffMultiplier", 1D)
.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"))
.build();
createNewChannel();
ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
call.sendMessage(message);
ServerCall<String, Integer> serverCall = serverCalls.poll(5, TimeUnit.SECONDS);
ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
// trigger retry
Metadata pushBackMetadata = new Metadata();
pushBackMetadata.put(
Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER),
"0"); // retry immediately
serverCall.close(
Status.UNAVAILABLE.withDescription("original attempt failed"),
pushBackMetadata);
new Metadata());
elapseBackoff(10, SECONDS);
// 2nd attempt received
serverCall = serverCalls.poll(5, TimeUnit.SECONDS);
serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
// send one more message, should exceed buffer limit
@ -157,4 +305,146 @@ public class RetryTest {
verify(mockCallListener, timeout(5000)).onClose(statusCaptor.capture(), any(Metadata.class));
assertThat(statusCaptor.getValue().getDescription()).contains("2nd attempt failed");
}
@Test
public void statsRecorded() throws Exception {
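    // The original attempt fails with UNAVAILABLE and a single retry succeeds after a 10s
    // backoff; per-attempt stats are recorded for each attempt and retry stats once per call.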
startNewServer();
retryPolicy = ImmutableMap.<String, Object>builder()
.put("maxAttempts", 4D)
.put("initialBackoff", "10s")
.put("maxBackoff", "10s")
.put("backoffMultiplier", 1D)
.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"))
.build();
createNewChannel();
ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
assertRpcStartedRecorded();
String message = "String of length 20.";
call.sendMessage(message);
assertOutboundMessageRecorded();
ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
// original attempt latency
fakeClock.forwardTime(1, SECONDS);
// trigger retry
serverCall.close(
Status.UNAVAILABLE.withDescription("original attempt failed"),
new Metadata());
assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1);
elapseBackoff(10, SECONDS);
assertRpcStartedRecorded();
assertOutboundMessageRecorded();
serverCall = serverCalls.poll(5, SECONDS);
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
message = "new message";
call.sendMessage(message);
assertOutboundMessageRecorded();
assertOutboundWireSizeRecorded(message.length());
// retry attempt latency
fakeClock.forwardTime(2, SECONDS);
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(3);
call.request(1);
assertInboundMessageRecorded();
assertInboundWireSizeRecorded(1);
serverCall.close(Status.OK, new Metadata());
assertRpcStatusRecorded(Status.Code.OK, 2000, 2);
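    // One retry, no transparent retries, and a retry delay equal to the 10s backoff.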
assertRetryStatsRecorded(1, 0, 10_000);
}
@Test
public void serverCancelledAndClientDeadlineExceeded() throws Exception {
startNewServer();
createNewChannel();
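    // A tracer that advances the fake clock past the 10s deadline while the stream is being
    // closed, so the server's CANCELLED turns into DEADLINE_EXCEEDED by the time the call ends.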
class CloseDelayedTracer extends ClientStreamTracer {
@Override
public void streamClosed(Status status) {
fakeClock.forwardTime(10, SECONDS);
}
}
class CloseDelayedTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory {
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return new CloseDelayedTracer();
}
}
CallOptions callOptions = CallOptions.DEFAULT
.withDeadline(Deadline.after(
10,
SECONDS,
new Ticker() {
@Override
public long nanoTime() {
return fakeClock.getTicker().read();
}
}))
.withStreamTracerFactory(new CloseDelayedTracerFactory());
ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, callOptions);
call.start(mockCallListener, new Metadata());
assertRpcStartedRecorded();
ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
serverCall.close(Status.CANCELLED, new Metadata());
assertRpcStatusRecorded(Code.DEADLINE_EXCEEDED, 10_000, 0);
assertRetryStatsRecorded(0, 0, 0);
}
@Ignore("flaky because old transportReportStatus() is not completely migrated yet")
@Test
public void transparentRetryStatsRecorded() throws Exception {
startNewServer();
createNewChannel();
final AtomicBoolean transparentRetryTriggered = new AtomicBoolean();
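    // A tracer that shuts down the server as soon as the first stream is created, so that stream
    // fails before any data is written and the channel can perform a transparent retry; when the
    // first stream closes, the server is restarted for the retried attempt.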
class TransparentRetryTriggeringTracer extends ClientStreamTracer {
@Override
public void streamCreated(Attributes transportAttrs, Metadata metadata) {
if (transparentRetryTriggered.get()) {
return;
}
localServer.shutdownNow();
}
@Override
public void streamClosed(Status status) {
if (transparentRetryTriggered.get()) {
return;
}
transparentRetryTriggered.set(true);
try {
startNewServer();
channel.resetConnectBackoff();
channel.getState(true);
} catch (Exception e) {
          throw new AssertionError("local server cannot be restarted", e);
}
}
}
class TransparentRetryTracerFactory extends ClientStreamTracer.InternalLimitedInfoFactory {
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return new TransparentRetryTriggeringTracer();
}
}
CallOptions callOptions = CallOptions.DEFAULT
.withWaitForReady()
.withStreamTracerFactory(new TransparentRetryTracerFactory());
ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, callOptions);
call.start(mockCallListener, new Metadata());
assertRpcStartedRecorded();
assertRpcStatusRecorded(Code.UNAVAILABLE, 0, 0);
assertRpcStartedRecorded();
call.cancel("cancel", null);
assertRpcStatusRecorded(Code.CANCELLED, 0, 0);
assertRetryStatsRecorded(0, 1, 0);
}
}