From db64c36af2d828f339e40180f10c0118c55f3f30 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Mon, 26 Mar 2018 14:57:12 -0700 Subject: [PATCH] core: no retry on dropped PickResult Address #3553 for normal retry. Not covering the unimplemented hedging case. --- .../io/grpc/internal/FailingClientStream.java | 14 +++- .../grpc/internal/FailingClientTransport.java | 7 +- .../main/java/io/grpc/internal/GrpcUtil.java | 10 ++- .../io/grpc/internal/RetriableStream.java | 54 ++++++++------- .../internal/DelayedClientTransportTest.java | 10 ++- .../internal/FailingClientStreamTest.java | 56 ++++++++++++++++ .../internal/FailingClientTransportTest.java | 51 ++++++++++++++ .../java/io/grpc/internal/GrpcUtilTest.java | 67 +++++++++++++++++++ .../io/grpc/internal/RetriableStreamTest.java | 26 +++++++ 9 files changed, 262 insertions(+), 33 deletions(-) create mode 100644 core/src/test/java/io/grpc/internal/FailingClientStreamTest.java create mode 100644 core/src/test/java/io/grpc/internal/FailingClientTransportTest.java diff --git a/core/src/main/java/io/grpc/internal/FailingClientStream.java b/core/src/main/java/io/grpc/internal/FailingClientStream.java index 3ce7a95f2c..93155f4f44 100644 --- a/core/src/main/java/io/grpc/internal/FailingClientStream.java +++ b/core/src/main/java/io/grpc/internal/FailingClientStream.java @@ -16,9 +16,11 @@ package io.grpc.internal; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.internal.ClientStreamListener.RpcProgress; /** * An implementation of {@link ClientStream} that fails (by calling {@link @@ -27,22 +29,32 @@ import io.grpc.Status; public final class FailingClientStream extends NoopClientStream { private boolean started; private final Status error; + private final RpcProgress rpcProgress; /** * Creates a {@code FailingClientStream} that would fail with the given error. */ public FailingClientStream(Status error) { + this(error, RpcProgress.PROCESSED); + } + + /** + * Creates a {@code FailingClientStream} that would fail with the given error. + */ + public FailingClientStream(Status error, RpcProgress rpcProgress) { Preconditions.checkArgument(!error.isOk(), "error must not be OK"); this.error = error; + this.rpcProgress = rpcProgress; } @Override public void start(ClientStreamListener listener) { Preconditions.checkState(!started, "already started"); started = true; - listener.closed(error, new Metadata()); + listener.closed(error, rpcProgress, new Metadata()); } + @VisibleForTesting Status getError() { return error; } diff --git a/core/src/main/java/io/grpc/internal/FailingClientTransport.java b/core/src/main/java/io/grpc/internal/FailingClientTransport.java index a0698853bd..9890e73e27 100644 --- a/core/src/main/java/io/grpc/internal/FailingClientTransport.java +++ b/core/src/main/java/io/grpc/internal/FailingClientTransport.java @@ -25,6 +25,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.ClientStreamListener.RpcProgress; import java.util.concurrent.Executor; /** @@ -33,16 +34,18 @@ import java.util.concurrent.Executor; class FailingClientTransport implements ClientTransport { @VisibleForTesting final Status error; + private final RpcProgress rpcProgress; - FailingClientTransport(Status error) { + FailingClientTransport(Status error, RpcProgress rpcProgress) { Preconditions.checkArgument(!error.isOk(), "error must not be OK"); this.error = error; + this.rpcProgress = rpcProgress; } @Override public ClientStream newStream( MethodDescriptor method, Metadata headers, CallOptions callOptions) { - return new FailingClientStream(error); + return new FailingClientStream(error, rpcProgress); } @Override diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 55f35b5bc8..e986586277 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -37,6 +37,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.SharedResourceHolder.Resource; import io.grpc.internal.StreamListener.MessageProducer; import java.io.IOException; @@ -697,8 +698,13 @@ public final class GrpcUtil { } }; } - if (!result.getStatus().isOk() && (result.isDrop() || !isWaitForReady)) { - return new FailingClientTransport(result.getStatus()); + if (!result.getStatus().isOk()) { + if (result.isDrop()) { + return new FailingClientTransport(result.getStatus(), RpcProgress.DROPPED); + } + if (!isWaitForReady) { + return new FailingClientTransport(result.getStatus(), RpcProgress.PROCESSED); + } } return null; } diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 1a66a750fb..c1ca2e61a5 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -566,31 +566,35 @@ abstract class RetriableStream implements ClientStream { } }); return; - } // TODO(zdapeng): else if (rpcProgress == RpcProgress.DROPPED) - - noMoreTransparentRetry = true; - RetryPlan retryPlan = makeRetryDecision(retryPolicy, status, trailers); - if (retryPlan.shouldRetry) { - // The check state.winningSubstream == null, checking if is not already committed, is - // racy, but is still safe b/c the retry will also handle committed/cancellation - scheduledRetry = scheduledExecutorService.schedule( - new Runnable() { - @Override - public void run() { - scheduledRetry = null; - callExecutor.execute(new Runnable() { - @Override - public void run() { - // retry - Substream newSubstream = createSubstream(substream.previousAttempts + 1); - drain(newSubstream); - } - }); - } - }, - retryPlan.backoffInMillis, - TimeUnit.MILLISECONDS); - return; + } else if (rpcProgress == RpcProgress.DROPPED) { + // For normal retry, nothing need be done here, will just commit. + // For hedging: + // TODO(zdapeng): cancel all scheduled hedges (TBD) + } else { + noMoreTransparentRetry = true; + RetryPlan retryPlan = makeRetryDecision(retryPolicy, status, trailers); + if (retryPlan.shouldRetry) { + // The check state.winningSubstream == null, checking if is not already committed, is + // racy, but is still safe b/c the retry will also handle committed/cancellation + scheduledRetry = scheduledExecutorService.schedule( + new Runnable() { + @Override + public void run() { + scheduledRetry = null; + callExecutor.execute(new Runnable() { + @Override + public void run() { + // retry + Substream newSubstream = createSubstream(substream.previousAttempts + 1); + drain(newSubstream); + } + }); + } + }, + retryPlan.backoffInMillis, + TimeUnit.MILLISECONDS); + return; + } } } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index f24f190557..79ca3f6354 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -43,6 +43,7 @@ import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; import io.grpc.StringMarshaller; +import io.grpc.internal.ClientStreamListener.RpcProgress; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -228,7 +229,8 @@ public class DelayedClientTransportTest { stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); verify(streamListener, never()).closed(any(Status.class), any(Metadata.class)); stream.start(streamListener); - verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + verify(streamListener).closed( + statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); assertEquals(0, delayedTransport.getPendingStreamsCount()); @@ -255,7 +257,8 @@ public class DelayedClientTransportTest { verify(transportListener).transportTerminated(); ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); stream.start(streamListener); - verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + verify(streamListener).closed( + statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); } @@ -275,7 +278,8 @@ public class DelayedClientTransportTest { verify(transportListener).transportTerminated(); ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); stream.start(streamListener); - verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + verify(streamListener).closed( + statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); } diff --git a/core/src/test/java/io/grpc/internal/FailingClientStreamTest.java b/core/src/test/java/io/grpc/internal/FailingClientStreamTest.java new file mode 100644 index 0000000000..e8dd0089dc --- /dev/null +++ b/core/src/test/java/io/grpc/internal/FailingClientStreamTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.internal.ClientStreamListener.RpcProgress; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link FailingClientStream}. + */ +@RunWith(JUnit4.class) +public class FailingClientStreamTest { + + @Test + public void processedRpcProgressPopulatedToListener() { + ClientStreamListener listener = mock(ClientStreamListener.class); + Status status = Status.UNAVAILABLE; + + ClientStream stream = new FailingClientStream(status); + stream.start(listener); + verify(listener).closed(eq(status), eq(RpcProgress.PROCESSED), any(Metadata.class)); + } + + @Test + public void droppedRpcProgressPopulatedToListener() { + ClientStreamListener listener = mock(ClientStreamListener.class); + Status status = Status.UNAVAILABLE; + + ClientStream stream = new FailingClientStream(status, RpcProgress.DROPPED); + stream.start(listener); + verify(listener).closed(eq(status), eq(RpcProgress.DROPPED), any(Metadata.class)); + } +} diff --git a/core/src/test/java/io/grpc/internal/FailingClientTransportTest.java b/core/src/test/java/io/grpc/internal/FailingClientTransportTest.java new file mode 100644 index 0000000000..83e0caccc9 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/FailingClientTransportTest.java @@ -0,0 +1,51 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.grpc.CallOptions; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.internal.ClientStreamListener.RpcProgress; +import io.grpc.testing.TestMethodDescriptors; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link FailingClientTransport}. + */ +@RunWith(JUnit4.class) +public class FailingClientTransportTest { + + @Test + public void newStreamStart() { + Status error = Status.UNAVAILABLE; + RpcProgress rpcProgress = RpcProgress.DROPPED; + FailingClientTransport transport = new FailingClientTransport(error, rpcProgress); + ClientStream stream = transport + .newStream(TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT); + ClientStreamListener listener = mock(ClientStreamListener.class); + stream.start(listener); + + verify(listener).closed(eq(error), eq(rpcProgress), any(Metadata.class)); + } +} diff --git a/core/src/test/java/io/grpc/internal/GrpcUtilTest.java b/core/src/test/java/io/grpc/internal/GrpcUtilTest.java index 66e052c575..51942c9070 100644 --- a/core/src/test/java/io/grpc/internal/GrpcUtilTest.java +++ b/core/src/test/java/io/grpc/internal/GrpcUtilTest.java @@ -18,12 +18,22 @@ package io.grpc.internal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import io.grpc.CallOptions; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.GrpcUtil.Http2Error; +import io.grpc.testing.TestMethodDescriptors; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -215,4 +225,61 @@ public class GrpcUtilTest { assertFalse(GrpcUtil.httpStatusToGrpcStatus(i).isOk()); } } + + @Test + public void getTransportFromPickResult_errorPickResult_waitForReady() { + Status status = Status.UNAVAILABLE; + PickResult pickResult = PickResult.withError(status); + ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, true); + + assertNull(transport); + } + + @Test + public void getTransportFromPickResult_errorPickResult_failFast() { + Status status = Status.UNAVAILABLE; + PickResult pickResult = PickResult.withError(status); + ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, false); + + assertNotNull(transport); + + ClientStream stream = transport + .newStream(TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT); + ClientStreamListener listener = mock(ClientStreamListener.class); + stream.start(listener); + + verify(listener).closed(eq(status), eq(RpcProgress.PROCESSED), any(Metadata.class)); + } + + @Test + public void getTransportFromPickResult_dropPickResult_waitForReady() { + Status status = Status.UNAVAILABLE; + PickResult pickResult = PickResult.withDrop(status); + ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, true); + + assertNotNull(transport); + + ClientStream stream = transport + .newStream(TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT); + ClientStreamListener listener = mock(ClientStreamListener.class); + stream.start(listener); + + verify(listener).closed(eq(status), eq(RpcProgress.DROPPED), any(Metadata.class)); + } + + @Test + public void getTransportFromPickResult_dropPickResult_failFast() { + Status status = Status.UNAVAILABLE; + PickResult pickResult = PickResult.withDrop(status); + ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, false); + + assertNotNull(transport); + + ClientStream stream = transport + .newStream(TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT); + ClientStreamListener listener = mock(ClientStreamListener.class); + stream.start(listener); + + verify(listener).closed(eq(status), eq(RpcProgress.DROPPED), any(Metadata.class)); + } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 20d7c070f2..5df1f0d4bd 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -17,6 +17,7 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.ClientStreamListener.RpcProgress.DROPPED; import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED; import static io.grpc.internal.RetriableStream.GRPC_PREVIOUS_RPC_ATTEMPTS; @@ -28,6 +29,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -1506,6 +1508,30 @@ public class RetriableStreamTest { verify(retriableStreamRecorder).postCommit(); } + @Test + public void droppedShouldNeverRetry() { + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = mock(ClientStream.class); + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); + + // start + retriableStream.start(masterListener); + + verify(retriableStreamRecorder).newSubstream(0); + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream1).start(sublistenerCaptor1.capture()); + + // drop and verify no retry + Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1); + sublistenerCaptor1.getValue().closed(status, DROPPED, new Metadata()); + + verifyNoMoreInteractions(mockStream1, mockStream2); + verify(retriableStreamRecorder).postCommit(); + verify(masterListener).closed(same(status), any(Metadata.class)); + } + /** * Used to stub a retriable stream as well as to record methods of the retriable stream being * called.