core: attach debug information about stream to DEADLINE_EXCEEDED (#5892)

Works for #4740 

- Subclasses of `AbstractClientStream` include remote address in insight if available.
- `DelayedStream` adds buffered time, and the insight of real stream if it's set.
- `RetriableStream` insights outputs of Substreams.

Example error message:
```
deadline exceeded after 8112071ns. [buffered_nanos=24763, remote_addr=foo.test.google.fr/127.0.0.1:44749]
```
or
```
deadline exceeded after 8112071ns. [buffered_nanos=22344324763, waiting_for_connection]
```

This is related to #4776 but taking a more usage-specific approach.
This commit is contained in:
Kun Zhang 2019-06-19 17:30:44 -07:00 committed by GitHub
parent 74e945ceb4
commit ddbaf743cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 302 additions and 15 deletions

View File

@ -44,6 +44,7 @@ import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InUseStateAggregator; import io.grpc.internal.InUseStateAggregator;
import io.grpc.internal.InsightBuilder;
import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.NoopClientStream; import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ObjectPool; import io.grpc.internal.ObjectPool;
@ -806,6 +807,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS)); long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
headers.put(TIMEOUT_KEY, effectiveTimeout); headers.put(TIMEOUT_KEY, effectiveTimeout);
} }
@Override
public void appendTimeoutInsight(InsightBuilder insight) {
}
} }
} }

View File

@ -26,12 +26,14 @@ import static java.lang.Math.max;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Codec; import io.grpc.Codec;
import io.grpc.Compressor; import io.grpc.Compressor;
import io.grpc.Deadline; import io.grpc.Deadline;
import io.grpc.Decompressor; import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry; import io.grpc.DecompressorRegistry;
import io.grpc.Grpc;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.ClientStreamListener.RpcProgress;
@ -217,6 +219,12 @@ public abstract class AbstractClientStream extends AbstractStream
return super.isReady() && !cancelled; return super.isReady() && !cancelled;
} }
@Override
public final void appendTimeoutInsight(InsightBuilder insight) {
Attributes attrs = getAttributes();
insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
}
protected TransportTracer getTransportTracer() { protected TransportTracer getTransportTracer() {
return transportTracer; return transportTracer;
} }

View File

@ -265,7 +265,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
} }
} else { } else {
stream = new FailingClientStream( stream = new FailingClientStream(
DEADLINE_EXCEEDED.withDescription("deadline exceeded: " + effectiveDeadline)); DEADLINE_EXCEEDED.withDescription(
"ClientCall started after deadline exceeded: " + effectiveDeadline));
} }
if (callOptions.getAuthority() != null) { if (callOptions.getAuthority() != null) {
@ -347,10 +348,12 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override @Override
public void run() { public void run() {
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
// DelayedStream.cancel() is safe to call from a thread that is different from where the // DelayedStream.cancel() is safe to call from a thread that is different from where the
// stream is created. // stream is created.
stream.cancel(DEADLINE_EXCEEDED.augmentDescription( stream.cancel(DEADLINE_EXCEEDED.augmentDescription(
String.format("deadline exceeded after %dns", remainingNanos))); "deadline exceeded after " + remainingNanos + "ns. " + insight));
} }
} }
@ -654,7 +657,10 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
// description. Since our timer may be delayed in firing, we double-check the deadline and // description. Since our timer may be delayed in firing, we double-check the deadline and
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status. // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
if (deadline.isExpired()) { if (deadline.isExpired()) {
status = DEADLINE_EXCEEDED; InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
status = DEADLINE_EXCEEDED.augmentDescription(
"ClientCall was cancelled at or after deadline. " + insight);
// Replace trailers to prevent mixing sources of status and trailers. // Replace trailers to prevent mixing sources of status and trailers.
trailers = new Metadata(); trailers = new Metadata();
} }

View File

@ -98,4 +98,11 @@ public interface ClientStream extends Stream {
* time, although some attributes are there only after a certain point. * time, although some attributes are there only after a certain point.
*/ */
Attributes getAttributes(); Attributes getAttributes();
/**
* Append information that will be included in the locally generated DEADLINE_EXCEEDED errors to
* the given {@link InsightBuilder}, in order to tell the user about the state of the stream so
* that they can better diagnose the cause of the error.
*/
void appendTimeoutInsight(InsightBuilder insight);
} }

View File

@ -55,6 +55,10 @@ class DelayedStream implements ClientStream {
private List<Runnable> pendingCalls = new ArrayList<>(); private List<Runnable> pendingCalls = new ArrayList<>();
@GuardedBy("this") @GuardedBy("this")
private DelayedStreamListener delayedListener; private DelayedStreamListener delayedListener;
@GuardedBy("this")
private long startTimeNanos;
@GuardedBy("this")
private long streamSetTimeNanos;
@Override @Override
public void setMaxInboundMessageSize(final int maxSize) { public void setMaxInboundMessageSize(final int maxSize) {
@ -94,6 +98,22 @@ class DelayedStream implements ClientStream {
}); });
} }
@Override
public void appendTimeoutInsight(InsightBuilder insight) {
synchronized (this) {
if (listener == null) {
return;
}
if (realStream != null) {
insight.appendKeyValue("buffered_nanos", streamSetTimeNanos - startTimeNanos);
realStream.appendTimeoutInsight(insight);
} else {
insight.appendKeyValue("buffered_nanos", System.nanoTime() - startTimeNanos);
insight.append("waiting_for_connection");
}
}
}
/** /**
* Transfers all pending and future requests and mutations to the given stream. * Transfers all pending and future requests and mutations to the given stream.
* *
@ -106,7 +126,7 @@ class DelayedStream implements ClientStream {
if (realStream != null) { if (realStream != null) {
return; return;
} }
realStream = checkNotNull(stream, "stream"); setRealStream(checkNotNull(stream, "stream"));
} }
drainPendingCalls(); drainPendingCalls();
@ -192,6 +212,7 @@ class DelayedStream implements ClientStream {
if (!savedPassThrough) { if (!savedPassThrough) {
listener = delayedListener = new DelayedStreamListener(listener); listener = delayedListener = new DelayedStreamListener(listener);
} }
startTimeNanos = System.nanoTime();
} }
if (savedError != null) { if (savedError != null) {
listener.closed(savedError, new Metadata()); listener.closed(savedError, new Metadata());
@ -262,9 +283,8 @@ class DelayedStream implements ClientStream {
synchronized (this) { synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called // If realStream != null, then either setStream() or cancel() has been called
if (realStream == null) { if (realStream == null) {
realStream = NoopClientStream.INSTANCE; setRealStream(NoopClientStream.INSTANCE);
delegateToRealStream = false; delegateToRealStream = false;
// If listener == null, then start() will later call listener with 'error' // If listener == null, then start() will later call listener with 'error'
listenerToClose = listener; listenerToClose = listener;
error = reason; error = reason;
@ -285,6 +305,13 @@ class DelayedStream implements ClientStream {
} }
} }
@GuardedBy("this")
private void setRealStream(ClientStream realStream) {
checkState(this.realStream == null, "realStream already set to %s", this.realStream);
this.realStream = realStream;
streamSetTimeNanos = System.nanoTime();
}
@Override @Override
public void halfClose() { public void halfClose() {
delayOrExecute(new Runnable() { delayOrExecute(new Runnable() {

View File

@ -58,4 +58,9 @@ public final class FailingClientStream extends NoopClientStream {
Status getError() { Status getError() {
return error; return error;
} }
@Override
public void appendTimeoutInsight(InsightBuilder insight) {
insight.appendKeyValue("error", error).appendKeyValue("progress", rpcProgress);
}
} }

View File

@ -111,4 +111,9 @@ abstract class ForwardingClientStream implements ClientStream {
public String toString() { public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
} }
@Override
public void appendTimeoutInsight(InsightBuilder insight) {
delegate().appendTimeoutInsight(insight);
}
} }

View File

@ -0,0 +1,55 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import java.util.ArrayList;
import javax.annotation.Nullable;
/**
* Builds a concise and readable string that gives insight of the concerned part of the system. The
* resulted string is made up of a list of short strings, each of which gives out a piece of
* information.
*/
public final class InsightBuilder {
private final ArrayList<String> buffer = new ArrayList<String>();
/**
* Appends a piece of information which is a plain string. The given object is immediately
* converted to string and recorded.
*/
public InsightBuilder append(@Nullable Object insight) {
buffer.add(String.valueOf(insight));
return this;
}
/**
* Appends a piece of information which is a key-value , which will be formatted into {@code
* "key=value"}. Value's {@code toString()} or {@code null} is immediately recorded.
*/
public InsightBuilder appendKeyValue(String key, @Nullable Object value) {
buffer.add(key + "=" + value);
return this;
}
/**
* Get the resulting string.
*/
@Override
public String toString() {
return buffer.toString();
}
}

View File

@ -480,7 +480,6 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
return channelStatsFuture; return channelStatsFuture;
} }
@VisibleForTesting
ConnectivityState getState() { ConnectivityState getState() {
return state.getState(); return state.getState();
} }

View File

@ -83,4 +83,9 @@ public class NoopClientStream implements ClientStream {
@Override @Override
public void setDeadline(@Nonnull Deadline deadline) {} public void setDeadline(@Nonnull Deadline deadline) {}
@Override
public void appendTimeoutInsight(InsightBuilder insight) {
insight.append("noop");
}
} }

View File

@ -79,6 +79,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
private final long channelBufferLimit; private final long channelBufferLimit;
@Nullable @Nullable
private final Throttle throttle; private final Throttle throttle;
@GuardedBy("lock")
private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();
private volatile State state = new State( private volatile State state = new State(
new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false, new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
@ -642,6 +644,37 @@ abstract class RetriableStream<ReqT> implements ClientStream {
return Attributes.EMPTY; return Attributes.EMPTY;
} }
@Override
public void appendTimeoutInsight(InsightBuilder insight) {
State currentState;
synchronized (lock) {
insight.appendKeyValue("closed", closedSubstreamsInsight);
currentState = state;
}
if (currentState.winningSubstream != null) {
// TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
// of the winning substream, they may not have received closed() notifications yet, thus they
// may be missing from closedSubstreamsInsight. This may be a little confusing to the user.
// We need to figure out how to include them.
InsightBuilder substreamInsight = new InsightBuilder();
currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
insight.appendKeyValue("committed", substreamInsight);
} else {
InsightBuilder openSubstreamsInsight = new InsightBuilder();
// drainedSubstreams doesn't include all open substreams. Those which have just been created
// and are still catching up with buffered requests (in other words, still draining) will not
// show up. We think this is benign, because the draining should be typically fast, and it'd
// be indistinguishable from the case where those streams are to be created a little late due
// to delays in the timer.
for (Substream sub : currentState.drainedSubstreams) {
InsightBuilder substreamInsight = new InsightBuilder();
sub.stream.appendTimeoutInsight(substreamInsight);
openSubstreamsInsight.append(substreamInsight);
}
insight.appendKeyValue("open", openSubstreamsInsight);
}
}
private static Random random = new Random(); private static Random random = new Random();
@VisibleForTesting @VisibleForTesting
@ -709,6 +742,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
synchronized (lock) { synchronized (lock) {
state = state.substreamClosed(substream); state = state.substreamClosed(substream);
closedSubstreamsInsight.append(status.getCode());
} }
// handle a race between buffer limit exceeded and closed, when setting // handle a race between buffer limit exceeded and closed, when setting
@ -913,8 +947,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
@Nullable final List<BufferEntry> buffer; @Nullable final List<BufferEntry> buffer;
/** /**
* Unmodifiable collection of all the substreams that are drained. Exceptional cases: Singleton * Unmodifiable collection of all the open substreams that are drained. Singleton once
* once passThrough; Empty if committed but not passTrough. * passThrough; Empty if committed but not passTrough.
*/ */
final Collection<Substream> drainedSubstreams; final Collection<Substream> drainedSubstreams;

View File

@ -39,6 +39,7 @@ import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Codec; import io.grpc.Codec;
import io.grpc.Deadline; import io.grpc.Deadline;
import io.grpc.Grpc;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.Status.Code; import io.grpc.Status.Code;
@ -50,6 +51,7 @@ import io.grpc.internal.testing.TestClientStreamTracer;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -77,6 +79,13 @@ public class AbstractClientStreamTest {
private final StatsTraceContext statsTraceCtx = StatsTraceContext.NOOP; private final StatsTraceContext statsTraceCtx = StatsTraceContext.NOOP;
private final TransportTracer transportTracer = new TransportTracer(); private final TransportTracer transportTracer = new TransportTracer();
private static final SocketAddress SERVER_ADDR = new SocketAddress() {
@Override
public String toString() {
return "fake_server_addr";
}
};
@Mock private ClientStreamListener mockListener; @Mock private ClientStreamListener mockListener;
@Before @Before
@ -464,6 +473,15 @@ public class AbstractClientStreamTest {
.isGreaterThan(TimeUnit.MILLISECONDS.toNanos(600)); .isGreaterThan(TimeUnit.MILLISECONDS.toNanos(600));
} }
@Test
public void appendTimeoutInsight() {
InsightBuilder insight = new InsightBuilder();
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.appendTimeoutInsight(insight);
assertThat(insight.toString()).isEqualTo("[remote_addr=fake_server_addr]");
}
/** /**
* No-op base class for testing. * No-op base class for testing.
*/ */
@ -525,7 +543,7 @@ public class AbstractClientStreamTest {
@Override @Override
public Attributes getAttributes() { public Attributes getAttributes() {
return Attributes.EMPTY; return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, SERVER_ADDR).build();
} }
} }

View File

@ -28,6 +28,7 @@ import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same; import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
@ -78,8 +79,10 @@ import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers; import org.mockito.ArgumentMatchers;
import org.mockito.Captor; import org.mockito.Captor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule; import org.mockito.junit.MockitoRule;
import org.mockito.stubbing.Answer;
/** /**
* Test for {@link ClientCallImpl}. * Test for {@link ClientCallImpl}.
@ -136,6 +139,14 @@ public class ClientCallImplTest {
when(transport.newStream( when(transport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(stream); .thenReturn(stream);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) {
InsightBuilder insight = (InsightBuilder) in.getArguments()[0];
insight.appendKeyValue("remote_addr", "127.0.0.1:443");
return null;
}
}).when(stream).appendTimeoutInsight(any(InsightBuilder.class));
baseCallOptions = CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory); baseCallOptions = CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory);
} }
@ -676,6 +687,8 @@ public class ClientCallImplTest {
.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verify(callListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class)); verify(callListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription())
.startsWith("ClientCall started after deadline exceeded");
verifyZeroInteractions(provider); verifyZeroInteractions(provider);
} }
@ -807,6 +820,8 @@ public class ClientCallImplTest {
verify(stream, times(1)).cancel(statusCaptor.capture()); verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription())
.matches("deadline exceeded after [0-9]+ns. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
} }
@Test @Test
@ -834,6 +849,7 @@ public class ClientCallImplTest {
verify(stream, times(1)).cancel(statusCaptor.capture()); verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
} }
@Test @Test

View File

@ -16,12 +16,14 @@
package io.grpc.internal; package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same; import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -47,8 +49,10 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor; import org.mockito.Captor;
import org.mockito.InOrder; import org.mockito.InOrder;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule; import org.mockito.junit.MockitoRule;
import org.mockito.stubbing.Answer;
/** /**
* Tests for {@link DelayedStream}. Most of the state checking is enforced by * Tests for {@link DelayedStream}. Most of the state checking is enforced by
@ -341,4 +345,38 @@ public class DelayedStreamTest {
delayedListener.closed(status, trailers); delayedListener.closed(status, trailers);
verify(listener).closed(status, trailers); verify(listener).closed(status, trailers);
} }
@Test
public void appendTimeoutInsight_notStarted() {
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
assertThat(insight.toString()).isEqualTo("[]");
}
@Test
public void appendTimeoutInsight_realStreamNotSet() {
InsightBuilder insight = new InsightBuilder();
stream.start(listener);
stream.appendTimeoutInsight(insight);
assertThat(insight.toString()).matches("\\[buffered_nanos=[0-9]+\\, waiting_for_connection]");
}
@Test
public void appendTimeoutInsight_realStreamSet() {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) {
InsightBuilder insight = (InsightBuilder) in.getArguments()[0];
insight.appendKeyValue("remote_addr", "127.0.0.1:443");
return null;
}
}).when(realStream).appendTimeoutInsight(any(InsightBuilder.class));
stream.start(listener);
stream.setStream(realStream);
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
assertThat(insight.toString())
.matches("\\[buffered_nanos=[0-9]+, remote_addr=127\\.0\\.0\\.1:443\\]");
}
} }

View File

@ -30,6 +30,7 @@ import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.same; import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -69,6 +70,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.InOrder; import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/** Unit tests for {@link RetriableStream}. */ /** Unit tests for {@link RetriableStream}. */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
@ -212,7 +215,17 @@ public class RetriableStreamTest {
ClientStream mockStream1 = mock(ClientStream.class); ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class); ClientStream mockStream2 = mock(ClientStream.class);
ClientStream mockStream3 = mock(ClientStream.class); ClientStream mockStream3 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) {
InsightBuilder insight = (InsightBuilder) in.getArguments()[0];
insight.appendKeyValue("remote_addr", "2.2.2.2:443");
return null;
}
}).when(mockStream3).appendTimeoutInsight(any(InsightBuilder.class));
InOrder inOrder = InOrder inOrder =
inOrder(retriableStreamRecorder, masterListener, mockStream1, mockStream2, mockStream3); inOrder(retriableStreamRecorder, masterListener, mockStream1, mockStream2, mockStream3);
@ -348,6 +361,11 @@ public class RetriableStreamTest {
inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class));
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
InsightBuilder insight = new InsightBuilder();
retriableStream.appendTimeoutInsight(insight);
assertThat(insight.toString()).isEqualTo(
"[closed=[DATA_LOSS, UNAVAILABLE], open=[[remote_addr=2.2.2.2:443]]]");
// no more retry // no more retry
sublistenerCaptor3.getValue().closed( sublistenerCaptor3.getValue().closed(
Status.fromCode(NON_RETRIABLE_STATUS_CODE), new Metadata()); Status.fromCode(NON_RETRIABLE_STATUS_CODE), new Metadata());
@ -355,6 +373,11 @@ public class RetriableStreamTest {
inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(retriableStreamRecorder).postCommit();
inOrder.verify(masterListener).closed(any(Status.class), any(Metadata.class)); inOrder.verify(masterListener).closed(any(Status.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
insight = new InsightBuilder();
retriableStream.appendTimeoutInsight(insight);
assertThat(insight.toString()).isEqualTo(
"[closed=[DATA_LOSS, UNAVAILABLE, INTERNAL], committed=[remote_addr=2.2.2.2:443]]");
} }
@Test @Test
@ -1609,10 +1632,22 @@ public class RetriableStreamTest {
ClientStream mockStream2 = mock(ClientStream.class); ClientStream mockStream2 = mock(ClientStream.class);
ClientStream mockStream3 = mock(ClientStream.class); ClientStream mockStream3 = mock(ClientStream.class);
ClientStream mockStream4 = mock(ClientStream.class); ClientStream mockStream4 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); ClientStream[] mockStreams =
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); new ClientStream[]{mockStream1, mockStream2, mockStream3, mockStream4};
doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2);
doReturn(mockStream4).when(retriableStreamRecorder).newSubstream(3); for (int i = 0; i < mockStreams.length; i++) {
doReturn(mockStreams[i]).when(retriableStreamRecorder).newSubstream(i);
final int fakePort = 80 + i;
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) {
InsightBuilder insight = (InsightBuilder) in.getArguments()[0];
insight.appendKeyValue("remote_addr", "2.2.2.2:" + fakePort);
return null;
}
}).when(mockStreams[i]).appendTimeoutInsight(any(InsightBuilder.class));
}
InOrder inOrder = inOrder( InOrder inOrder = inOrder(
retriableStreamRecorder, retriableStreamRecorder,
masterListener, masterListener,
@ -1772,6 +1807,12 @@ public class RetriableStreamTest {
inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class));
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
InsightBuilder insight = new InsightBuilder();
hedgingStream.appendTimeoutInsight(insight);
assertThat(insight.toString()).isEqualTo(
"[closed=[UNAVAILABLE], "
+ "open=[[remote_addr=2.2.2.2:80], [remote_addr=2.2.2.2:81], [remote_addr=2.2.2.2:83]]]");
// commit // commit
sublistenerCaptor2.getValue().closed( sublistenerCaptor2.getValue().closed(
Status.fromCode(FATAL_STATUS_CODE), new Metadata()); Status.fromCode(FATAL_STATUS_CODE), new Metadata());
@ -1786,6 +1827,11 @@ public class RetriableStreamTest {
inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(retriableStreamRecorder).postCommit();
inOrder.verify(masterListener).closed(any(Status.class), any(Metadata.class)); inOrder.verify(masterListener).closed(any(Status.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
insight = new InsightBuilder();
hedgingStream.appendTimeoutInsight(insight);
assertThat(insight.toString()).isEqualTo(
"[closed=[UNAVAILABLE, INTERNAL], committed=[remote_addr=2.2.2.2:81]]");
} }
@Test @Test

View File

@ -118,6 +118,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
@ -1081,7 +1082,7 @@ public abstract class AbstractInteropTest {
// warm up the channel and JVM // warm up the channel and JVM
blockingStub.emptyCall(Empty.getDefaultInstance()); blockingStub.emptyCall(Empty.getDefaultInstance());
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.TestServiceBlockingStub stub =
blockingStub.withDeadlineAfter(10, TimeUnit.MILLISECONDS); blockingStub.withDeadlineAfter(100, TimeUnit.MILLISECONDS);
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder() .addResponseParameters(ResponseParameters.newBuilder()
.setIntervalUs((int) TimeUnit.SECONDS.toMicros(20))) .setIntervalUs((int) TimeUnit.SECONDS.toMicros(20)))
@ -1091,6 +1092,14 @@ public abstract class AbstractInteropTest {
fail("Expected deadline to be exceeded"); fail("Expected deadline to be exceeded");
} catch (StatusRuntimeException ex) { } catch (StatusRuntimeException ex) {
assertEquals(Status.DEADLINE_EXCEEDED.getCode(), ex.getStatus().getCode()); assertEquals(Status.DEADLINE_EXCEEDED.getCode(), ex.getStatus().getCode());
String desc = ex.getStatus().getDescription();
assertTrue(desc,
// There is a race between client and server-side deadline expiration.
// If client expires first, it'd generate this message
Pattern.matches("deadline exceeded after .*ns. \\[.*\\]", desc)
// If server expires first, it'd reset the stream and client would generate a different
// message
|| desc.startsWith("ClientCall was cancelled at or after deadline."));
} }
assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK);
@ -1155,6 +1164,8 @@ public abstract class AbstractInteropTest {
fail("Should have thrown"); fail("Should have thrown");
} catch (StatusRuntimeException ex) { } catch (StatusRuntimeException ex) {
assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode());
assertThat(ex.getStatus().getDescription())
.startsWith("ClientCall started after deadline exceeded");
} }
// CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
@ -1178,6 +1189,8 @@ public abstract class AbstractInteropTest {
fail("Should have thrown"); fail("Should have thrown");
} catch (StatusRuntimeException ex) { } catch (StatusRuntimeException ex) {
assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode());
assertThat(ex.getStatus().getDescription())
.startsWith("ClientCall started after deadline exceeded");
} }
assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK);
if (metricsExpected()) { if (metricsExpected()) {