diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index cdbc9c5083..18093946de 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -44,6 +44,7 @@ import io.grpc.internal.ConnectionClientTransport; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcUtil; import io.grpc.internal.InUseStateAggregator; +import io.grpc.internal.InsightBuilder; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.NoopClientStream; import io.grpc.internal.ObjectPool; @@ -806,6 +807,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS)); headers.put(TIMEOUT_KEY, effectiveTimeout); } + + @Override + public void appendTimeoutInsight(InsightBuilder insight) { + } } } diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 5c5072710d..79a775c398 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -26,12 +26,14 @@ import static java.lang.Math.max; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; +import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Codec; import io.grpc.Compressor; import io.grpc.Deadline; import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; +import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.ClientStreamListener.RpcProgress; @@ -217,6 +219,12 @@ public abstract class AbstractClientStream extends AbstractStream return super.isReady() && !cancelled; } + @Override + public final void appendTimeoutInsight(InsightBuilder insight) { + Attributes attrs = getAttributes(); + insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + } + protected TransportTracer getTransportTracer() { return transportTracer; } diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 9ff010bbe9..33fb363ace 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -265,7 +265,8 @@ final class ClientCallImpl extends ClientCall { } } else { stream = new FailingClientStream( - DEADLINE_EXCEEDED.withDescription("deadline exceeded: " + effectiveDeadline)); + DEADLINE_EXCEEDED.withDescription( + "ClientCall started after deadline exceeded: " + effectiveDeadline)); } if (callOptions.getAuthority() != null) { @@ -347,10 +348,12 @@ final class ClientCallImpl extends ClientCall { @Override public void run() { + InsightBuilder insight = new InsightBuilder(); + stream.appendTimeoutInsight(insight); // DelayedStream.cancel() is safe to call from a thread that is different from where the // stream is created. stream.cancel(DEADLINE_EXCEEDED.augmentDescription( - String.format("deadline exceeded after %dns", remainingNanos))); + "deadline exceeded after " + remainingNanos + "ns. " + insight)); } } @@ -654,7 +657,10 @@ final class ClientCallImpl extends ClientCall { // description. Since our timer may be delayed in firing, we double-check the deadline and // turn the failure into the likely more helpful DEADLINE_EXCEEDED status. if (deadline.isExpired()) { - status = DEADLINE_EXCEEDED; + InsightBuilder insight = new InsightBuilder(); + stream.appendTimeoutInsight(insight); + status = DEADLINE_EXCEEDED.augmentDescription( + "ClientCall was cancelled at or after deadline. " + insight); // Replace trailers to prevent mixing sources of status and trailers. trailers = new Metadata(); } diff --git a/core/src/main/java/io/grpc/internal/ClientStream.java b/core/src/main/java/io/grpc/internal/ClientStream.java index c10c7c93e7..7dba4d7087 100644 --- a/core/src/main/java/io/grpc/internal/ClientStream.java +++ b/core/src/main/java/io/grpc/internal/ClientStream.java @@ -98,4 +98,11 @@ public interface ClientStream extends Stream { * time, although some attributes are there only after a certain point. */ Attributes getAttributes(); + + /** + * Append information that will be included in the locally generated DEADLINE_EXCEEDED errors to + * the given {@link InsightBuilder}, in order to tell the user about the state of the stream so + * that they can better diagnose the cause of the error. + */ + void appendTimeoutInsight(InsightBuilder insight); } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index c9a8e96b02..944a271658 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -55,6 +55,10 @@ class DelayedStream implements ClientStream { private List pendingCalls = new ArrayList<>(); @GuardedBy("this") private DelayedStreamListener delayedListener; + @GuardedBy("this") + private long startTimeNanos; + @GuardedBy("this") + private long streamSetTimeNanos; @Override public void setMaxInboundMessageSize(final int maxSize) { @@ -94,6 +98,22 @@ class DelayedStream implements ClientStream { }); } + @Override + public void appendTimeoutInsight(InsightBuilder insight) { + synchronized (this) { + if (listener == null) { + return; + } + if (realStream != null) { + insight.appendKeyValue("buffered_nanos", streamSetTimeNanos - startTimeNanos); + realStream.appendTimeoutInsight(insight); + } else { + insight.appendKeyValue("buffered_nanos", System.nanoTime() - startTimeNanos); + insight.append("waiting_for_connection"); + } + } + } + /** * Transfers all pending and future requests and mutations to the given stream. * @@ -106,7 +126,7 @@ class DelayedStream implements ClientStream { if (realStream != null) { return; } - realStream = checkNotNull(stream, "stream"); + setRealStream(checkNotNull(stream, "stream")); } drainPendingCalls(); @@ -192,6 +212,7 @@ class DelayedStream implements ClientStream { if (!savedPassThrough) { listener = delayedListener = new DelayedStreamListener(listener); } + startTimeNanos = System.nanoTime(); } if (savedError != null) { listener.closed(savedError, new Metadata()); @@ -262,9 +283,8 @@ class DelayedStream implements ClientStream { synchronized (this) { // If realStream != null, then either setStream() or cancel() has been called if (realStream == null) { - realStream = NoopClientStream.INSTANCE; + setRealStream(NoopClientStream.INSTANCE); delegateToRealStream = false; - // If listener == null, then start() will later call listener with 'error' listenerToClose = listener; error = reason; @@ -285,6 +305,13 @@ class DelayedStream implements ClientStream { } } + @GuardedBy("this") + private void setRealStream(ClientStream realStream) { + checkState(this.realStream == null, "realStream already set to %s", this.realStream); + this.realStream = realStream; + streamSetTimeNanos = System.nanoTime(); + } + @Override public void halfClose() { delayOrExecute(new Runnable() { diff --git a/core/src/main/java/io/grpc/internal/FailingClientStream.java b/core/src/main/java/io/grpc/internal/FailingClientStream.java index 798779fa42..6d368b6975 100644 --- a/core/src/main/java/io/grpc/internal/FailingClientStream.java +++ b/core/src/main/java/io/grpc/internal/FailingClientStream.java @@ -58,4 +58,9 @@ public final class FailingClientStream extends NoopClientStream { Status getError() { return error; } + + @Override + public void appendTimeoutInsight(InsightBuilder insight) { + insight.appendKeyValue("error", error).appendKeyValue("progress", rpcProgress); + } } diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStream.java b/core/src/main/java/io/grpc/internal/ForwardingClientStream.java index b1d25d5572..d46b0a52f3 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStream.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStream.java @@ -111,4 +111,9 @@ abstract class ForwardingClientStream implements ClientStream { public String toString() { return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); } + + @Override + public void appendTimeoutInsight(InsightBuilder insight) { + delegate().appendTimeoutInsight(insight); + } } diff --git a/core/src/main/java/io/grpc/internal/InsightBuilder.java b/core/src/main/java/io/grpc/internal/InsightBuilder.java new file mode 100644 index 0000000000..94fe44275c --- /dev/null +++ b/core/src/main/java/io/grpc/internal/InsightBuilder.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import java.util.ArrayList; +import javax.annotation.Nullable; + +/** + * Builds a concise and readable string that gives insight of the concerned part of the system. The + * resulted string is made up of a list of short strings, each of which gives out a piece of + * information. + */ +public final class InsightBuilder { + private final ArrayList buffer = new ArrayList(); + + /** + * Appends a piece of information which is a plain string. The given object is immediately + * converted to string and recorded. + */ + public InsightBuilder append(@Nullable Object insight) { + buffer.add(String.valueOf(insight)); + return this; + } + + /** + * Appends a piece of information which is a key-value , which will be formatted into {@code + * "key=value"}. Value's {@code toString()} or {@code null} is immediately recorded. + */ + public InsightBuilder appendKeyValue(String key, @Nullable Object value) { + buffer.add(key + "=" + value); + return this; + } + + /** + * Get the resulting string. + */ + @Override + public String toString() { + return buffer.toString(); + } +} diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 2983957113..9a7822aa1c 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -480,7 +480,6 @@ final class InternalSubchannel implements InternalInstrumented, Tr return channelStatsFuture; } - @VisibleForTesting ConnectivityState getState() { return state.getState(); } diff --git a/core/src/main/java/io/grpc/internal/NoopClientStream.java b/core/src/main/java/io/grpc/internal/NoopClientStream.java index 7e9d9272ec..c5ea847cb2 100644 --- a/core/src/main/java/io/grpc/internal/NoopClientStream.java +++ b/core/src/main/java/io/grpc/internal/NoopClientStream.java @@ -83,4 +83,9 @@ public class NoopClientStream implements ClientStream { @Override public void setDeadline(@Nonnull Deadline deadline) {} + + @Override + public void appendTimeoutInsight(InsightBuilder insight) { + insight.append("noop"); + } } diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index ce7e47a79c..9ca2280f6b 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -79,6 +79,8 @@ abstract class RetriableStream implements ClientStream { private final long channelBufferLimit; @Nullable private final Throttle throttle; + @GuardedBy("lock") + private final InsightBuilder closedSubstreamsInsight = new InsightBuilder(); private volatile State state = new State( new ArrayList(8), Collections.emptyList(), null, null, false, false, @@ -642,6 +644,37 @@ abstract class RetriableStream implements ClientStream { return Attributes.EMPTY; } + @Override + public void appendTimeoutInsight(InsightBuilder insight) { + State currentState; + synchronized (lock) { + insight.appendKeyValue("closed", closedSubstreamsInsight); + currentState = state; + } + if (currentState.winningSubstream != null) { + // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor + // of the winning substream, they may not have received closed() notifications yet, thus they + // may be missing from closedSubstreamsInsight. This may be a little confusing to the user. + // We need to figure out how to include them. + InsightBuilder substreamInsight = new InsightBuilder(); + currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight); + insight.appendKeyValue("committed", substreamInsight); + } else { + InsightBuilder openSubstreamsInsight = new InsightBuilder(); + // drainedSubstreams doesn't include all open substreams. Those which have just been created + // and are still catching up with buffered requests (in other words, still draining) will not + // show up. We think this is benign, because the draining should be typically fast, and it'd + // be indistinguishable from the case where those streams are to be created a little late due + // to delays in the timer. + for (Substream sub : currentState.drainedSubstreams) { + InsightBuilder substreamInsight = new InsightBuilder(); + sub.stream.appendTimeoutInsight(substreamInsight); + openSubstreamsInsight.append(substreamInsight); + } + insight.appendKeyValue("open", openSubstreamsInsight); + } + } + private static Random random = new Random(); @VisibleForTesting @@ -709,6 +742,7 @@ abstract class RetriableStream implements ClientStream { public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { synchronized (lock) { state = state.substreamClosed(substream); + closedSubstreamsInsight.append(status.getCode()); } // handle a race between buffer limit exceeded and closed, when setting @@ -913,8 +947,8 @@ abstract class RetriableStream implements ClientStream { @Nullable final List buffer; /** - * Unmodifiable collection of all the substreams that are drained. Exceptional cases: Singleton - * once passThrough; Empty if committed but not passTrough. + * Unmodifiable collection of all the open substreams that are drained. Singleton once + * passThrough; Empty if committed but not passTrough. */ final Collection drainedSubstreams; diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index c685c89587..7ce6b42116 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -39,6 +39,7 @@ import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Codec; import io.grpc.Deadline; +import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; @@ -50,6 +51,7 @@ import io.grpc.internal.testing.TestClientStreamTracer; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Rule; @@ -77,6 +79,13 @@ public class AbstractClientStreamTest { private final StatsTraceContext statsTraceCtx = StatsTraceContext.NOOP; private final TransportTracer transportTracer = new TransportTracer(); + private static final SocketAddress SERVER_ADDR = new SocketAddress() { + @Override + public String toString() { + return "fake_server_addr"; + } + }; + @Mock private ClientStreamListener mockListener; @Before @@ -464,6 +473,15 @@ public class AbstractClientStreamTest { .isGreaterThan(TimeUnit.MILLISECONDS.toNanos(600)); } + @Test + public void appendTimeoutInsight() { + InsightBuilder insight = new InsightBuilder(); + AbstractClientStream stream = + new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); + stream.appendTimeoutInsight(insight); + assertThat(insight.toString()).isEqualTo("[remote_addr=fake_server_addr]"); + } + /** * No-op base class for testing. */ @@ -525,7 +543,7 @@ public class AbstractClientStreamTest { @Override public Attributes getAttributes() { - return Attributes.EMPTY; + return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, SERVER_ADDR).build(); } } diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 3736699dbe..a27ce0a00c 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -28,6 +28,7 @@ import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; @@ -78,8 +79,10 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; /** * Test for {@link ClientCallImpl}. @@ -136,6 +139,14 @@ public class ClientCallImplTest { when(transport.newStream( any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) .thenReturn(stream); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock in) { + InsightBuilder insight = (InsightBuilder) in.getArguments()[0]; + insight.appendKeyValue("remote_addr", "127.0.0.1:443"); + return null; + } + }).when(stream).appendTimeoutInsight(any(InsightBuilder.class)); baseCallOptions = CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory); } @@ -676,6 +687,8 @@ public class ClientCallImplTest { .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); verify(callListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class)); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); + assertThat(statusCaptor.getValue().getDescription()) + .startsWith("ClientCall started after deadline exceeded"); verifyZeroInteractions(provider); } @@ -807,6 +820,8 @@ public class ClientCallImplTest { verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); + assertThat(statusCaptor.getValue().getDescription()) + .matches("deadline exceeded after [0-9]+ns. \\[remote_addr=127\\.0\\.0\\.1:443\\]"); } @Test @@ -834,6 +849,7 @@ public class ClientCallImplTest { verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); + assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out"); } @Test diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 884553f79b..564b7e59ce 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -16,12 +16,14 @@ package io.grpc.internal; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -47,8 +49,10 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; /** * Tests for {@link DelayedStream}. Most of the state checking is enforced by @@ -341,4 +345,38 @@ public class DelayedStreamTest { delayedListener.closed(status, trailers); verify(listener).closed(status, trailers); } + + @Test + public void appendTimeoutInsight_notStarted() { + InsightBuilder insight = new InsightBuilder(); + stream.appendTimeoutInsight(insight); + assertThat(insight.toString()).isEqualTo("[]"); + } + + @Test + public void appendTimeoutInsight_realStreamNotSet() { + InsightBuilder insight = new InsightBuilder(); + stream.start(listener); + stream.appendTimeoutInsight(insight); + assertThat(insight.toString()).matches("\\[buffered_nanos=[0-9]+\\, waiting_for_connection]"); + } + + @Test + public void appendTimeoutInsight_realStreamSet() { + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock in) { + InsightBuilder insight = (InsightBuilder) in.getArguments()[0]; + insight.appendKeyValue("remote_addr", "127.0.0.1:443"); + return null; + } + }).when(realStream).appendTimeoutInsight(any(InsightBuilder.class)); + stream.start(listener); + stream.setStream(realStream); + + InsightBuilder insight = new InsightBuilder(); + stream.appendTimeoutInsight(insight); + assertThat(insight.toString()) + .matches("\\[buffered_nanos=[0-9]+, remote_addr=127\\.0\\.0\\.1:443\\]"); + } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index cd93ae54dc..77c5fe7be8 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -30,6 +30,7 @@ import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -69,6 +70,8 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** Unit tests for {@link RetriableStream}. */ @RunWith(JUnit4.class) @@ -212,7 +215,17 @@ public class RetriableStreamTest { ClientStream mockStream1 = mock(ClientStream.class); ClientStream mockStream2 = mock(ClientStream.class); ClientStream mockStream3 = mock(ClientStream.class); + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock in) { + InsightBuilder insight = (InsightBuilder) in.getArguments()[0]; + insight.appendKeyValue("remote_addr", "2.2.2.2:443"); + return null; + } + }).when(mockStream3).appendTimeoutInsight(any(InsightBuilder.class)); + InOrder inOrder = inOrder(retriableStreamRecorder, masterListener, mockStream1, mockStream2, mockStream3); @@ -348,6 +361,11 @@ public class RetriableStreamTest { inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class)); inOrder.verifyNoMoreInteractions(); + InsightBuilder insight = new InsightBuilder(); + retriableStream.appendTimeoutInsight(insight); + assertThat(insight.toString()).isEqualTo( + "[closed=[DATA_LOSS, UNAVAILABLE], open=[[remote_addr=2.2.2.2:443]]]"); + // no more retry sublistenerCaptor3.getValue().closed( Status.fromCode(NON_RETRIABLE_STATUS_CODE), new Metadata()); @@ -355,6 +373,11 @@ public class RetriableStreamTest { inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(masterListener).closed(any(Status.class), any(Metadata.class)); inOrder.verifyNoMoreInteractions(); + + insight = new InsightBuilder(); + retriableStream.appendTimeoutInsight(insight); + assertThat(insight.toString()).isEqualTo( + "[closed=[DATA_LOSS, UNAVAILABLE, INTERNAL], committed=[remote_addr=2.2.2.2:443]]"); } @Test @@ -1609,10 +1632,22 @@ public class RetriableStreamTest { ClientStream mockStream2 = mock(ClientStream.class); ClientStream mockStream3 = mock(ClientStream.class); ClientStream mockStream4 = mock(ClientStream.class); - doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); - doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); - doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2); - doReturn(mockStream4).when(retriableStreamRecorder).newSubstream(3); + ClientStream[] mockStreams = + new ClientStream[]{mockStream1, mockStream2, mockStream3, mockStream4}; + + for (int i = 0; i < mockStreams.length; i++) { + doReturn(mockStreams[i]).when(retriableStreamRecorder).newSubstream(i); + final int fakePort = 80 + i; + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock in) { + InsightBuilder insight = (InsightBuilder) in.getArguments()[0]; + insight.appendKeyValue("remote_addr", "2.2.2.2:" + fakePort); + return null; + } + }).when(mockStreams[i]).appendTimeoutInsight(any(InsightBuilder.class)); + } + InOrder inOrder = inOrder( retriableStreamRecorder, masterListener, @@ -1772,6 +1807,12 @@ public class RetriableStreamTest { inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class)); inOrder.verifyNoMoreInteractions(); + InsightBuilder insight = new InsightBuilder(); + hedgingStream.appendTimeoutInsight(insight); + assertThat(insight.toString()).isEqualTo( + "[closed=[UNAVAILABLE], " + + "open=[[remote_addr=2.2.2.2:80], [remote_addr=2.2.2.2:81], [remote_addr=2.2.2.2:83]]]"); + // commit sublistenerCaptor2.getValue().closed( Status.fromCode(FATAL_STATUS_CODE), new Metadata()); @@ -1786,6 +1827,11 @@ public class RetriableStreamTest { inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(masterListener).closed(any(Status.class), any(Metadata.class)); inOrder.verifyNoMoreInteractions(); + + insight = new InsightBuilder(); + hedgingStream.appendTimeoutInsight(insight); + assertThat(insight.toString()).isEqualTo( + "[closed=[UNAVAILABLE, INTERNAL], committed=[remote_addr=2.2.2.2:81]]"); } @Test diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 55e4717a49..be567c91af 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -118,6 +118,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; @@ -1081,7 +1082,7 @@ public abstract class AbstractInteropTest { // warm up the channel and JVM blockingStub.emptyCall(Empty.getDefaultInstance()); TestServiceGrpc.TestServiceBlockingStub stub = - blockingStub.withDeadlineAfter(10, TimeUnit.MILLISECONDS); + blockingStub.withDeadlineAfter(100, TimeUnit.MILLISECONDS); StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() .addResponseParameters(ResponseParameters.newBuilder() .setIntervalUs((int) TimeUnit.SECONDS.toMicros(20))) @@ -1091,6 +1092,14 @@ public abstract class AbstractInteropTest { fail("Expected deadline to be exceeded"); } catch (StatusRuntimeException ex) { assertEquals(Status.DEADLINE_EXCEEDED.getCode(), ex.getStatus().getCode()); + String desc = ex.getStatus().getDescription(); + assertTrue(desc, + // There is a race between client and server-side deadline expiration. + // If client expires first, it'd generate this message + Pattern.matches("deadline exceeded after .*ns. \\[.*\\]", desc) + // If server expires first, it'd reset the stream and client would generate a different + // message + || desc.startsWith("ClientCall was cancelled at or after deadline.")); } assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); @@ -1155,6 +1164,8 @@ public abstract class AbstractInteropTest { fail("Should have thrown"); } catch (StatusRuntimeException ex) { assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); + assertThat(ex.getStatus().getDescription()) + .startsWith("ClientCall started after deadline exceeded"); } // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be @@ -1178,6 +1189,8 @@ public abstract class AbstractInteropTest { fail("Should have thrown"); } catch (StatusRuntimeException ex) { assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); + assertThat(ex.getStatus().getDescription()) + .startsWith("ClientCall started after deadline exceeded"); } assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); if (metricsExpected()) {