core: no retry on dropped PickResult

Address #3553 for normal retry. Not covering the unimplemented hedging case.
This commit is contained in:
ZHANG Dapeng 2018-03-26 14:57:12 -07:00 committed by GitHub
parent 8fe313f598
commit db64c36af2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 262 additions and 33 deletions

View File

@ -16,9 +16,11 @@
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
/**
* An implementation of {@link ClientStream} that fails (by calling {@link
@ -27,22 +29,32 @@ import io.grpc.Status;
public final class FailingClientStream extends NoopClientStream {
private boolean started;
private final Status error;
private final RpcProgress rpcProgress;
/**
* Creates a {@code FailingClientStream} that would fail with the given error.
*/
public FailingClientStream(Status error) {
this(error, RpcProgress.PROCESSED);
}
/**
* Creates a {@code FailingClientStream} that would fail with the given error.
*/
public FailingClientStream(Status error, RpcProgress rpcProgress) {
Preconditions.checkArgument(!error.isOk(), "error must not be OK");
this.error = error;
this.rpcProgress = rpcProgress;
}
@Override
public void start(ClientStreamListener listener) {
Preconditions.checkState(!started, "already started");
started = true;
listener.closed(error, new Metadata());
listener.closed(error, rpcProgress, new Metadata());
}
@VisibleForTesting
Status getError() {
return error;
}

View File

@ -25,6 +25,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.Channelz.SocketStats;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.util.concurrent.Executor;
/**
@ -33,16 +34,18 @@ import java.util.concurrent.Executor;
class FailingClientTransport implements ClientTransport {
@VisibleForTesting
final Status error;
private final RpcProgress rpcProgress;
FailingClientTransport(Status error) {
FailingClientTransport(Status error, RpcProgress rpcProgress) {
Preconditions.checkArgument(!error.isOk(), "error must not be OK");
this.error = error;
this.rpcProgress = rpcProgress;
}
@Override
public ClientStream newStream(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
return new FailingClientStream(error);
return new FailingClientStream(error, rpcProgress);
}
@Override

View File

@ -37,6 +37,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.Channelz.SocketStats;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.internal.StreamListener.MessageProducer;
import java.io.IOException;
@ -697,8 +698,13 @@ public final class GrpcUtil {
}
};
}
if (!result.getStatus().isOk() && (result.isDrop() || !isWaitForReady)) {
return new FailingClientTransport(result.getStatus());
if (!result.getStatus().isOk()) {
if (result.isDrop()) {
return new FailingClientTransport(result.getStatus(), RpcProgress.DROPPED);
}
if (!isWaitForReady) {
return new FailingClientTransport(result.getStatus(), RpcProgress.PROCESSED);
}
}
return null;
}

View File

@ -566,31 +566,35 @@ abstract class RetriableStream<ReqT> implements ClientStream {
}
});
return;
} // TODO(zdapeng): else if (rpcProgress == RpcProgress.DROPPED)
noMoreTransparentRetry = true;
RetryPlan retryPlan = makeRetryDecision(retryPolicy, status, trailers);
if (retryPlan.shouldRetry) {
// The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation
scheduledRetry = scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
scheduledRetry = null;
callExecutor.execute(new Runnable() {
@Override
public void run() {
// retry
Substream newSubstream = createSubstream(substream.previousAttempts + 1);
drain(newSubstream);
}
});
}
},
retryPlan.backoffInMillis,
TimeUnit.MILLISECONDS);
return;
} else if (rpcProgress == RpcProgress.DROPPED) {
// For normal retry, nothing need be done here, will just commit.
// For hedging:
// TODO(zdapeng): cancel all scheduled hedges (TBD)
} else {
noMoreTransparentRetry = true;
RetryPlan retryPlan = makeRetryDecision(retryPolicy, status, trailers);
if (retryPlan.shouldRetry) {
// The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation
scheduledRetry = scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
scheduledRetry = null;
callExecutor.execute(new Runnable() {
@Override
public void run() {
// retry
Substream newSubstream = createSubstream(substream.previousAttempts + 1);
drain(newSubstream);
}
});
}
},
retryPlan.backoffInMillis,
TimeUnit.MILLISECONDS);
return;
}
}
}

View File

@ -43,6 +43,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -228,7 +229,8 @@ public class DelayedClientTransportTest {
stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
verify(streamListener, never()).closed(any(Status.class), any(Metadata.class));
stream.start(streamListener);
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
verify(streamListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
assertEquals(0, delayedTransport.getPendingStreamsCount());
@ -255,7 +257,8 @@ public class DelayedClientTransportTest {
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(streamListener);
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
verify(streamListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
}
@ -275,7 +278,8 @@ public class DelayedClientTransportTest {
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(streamListener);
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
verify(streamListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Unit tests for {@link FailingClientStream}.
*/
@RunWith(JUnit4.class)
public class FailingClientStreamTest {
@Test
public void processedRpcProgressPopulatedToListener() {
ClientStreamListener listener = mock(ClientStreamListener.class);
Status status = Status.UNAVAILABLE;
ClientStream stream = new FailingClientStream(status);
stream.start(listener);
verify(listener).closed(eq(status), eq(RpcProgress.PROCESSED), any(Metadata.class));
}
@Test
public void droppedRpcProgressPopulatedToListener() {
ClientStreamListener listener = mock(ClientStreamListener.class);
Status status = Status.UNAVAILABLE;
ClientStream stream = new FailingClientStream(status, RpcProgress.DROPPED);
stream.start(listener);
verify(listener).closed(eq(status), eq(RpcProgress.DROPPED), any(Metadata.class));
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.testing.TestMethodDescriptors;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Unit tests for {@link FailingClientTransport}.
*/
@RunWith(JUnit4.class)
public class FailingClientTransportTest {
@Test
public void newStreamStart() {
Status error = Status.UNAVAILABLE;
RpcProgress rpcProgress = RpcProgress.DROPPED;
FailingClientTransport transport = new FailingClientTransport(error, rpcProgress);
ClientStream stream = transport
.newStream(TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT);
ClientStreamListener listener = mock(ClientStreamListener.class);
stream.start(listener);
verify(listener).closed(eq(error), eq(rpcProgress), any(Metadata.class));
}
}

View File

@ -18,12 +18,22 @@ package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.grpc.CallOptions;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.GrpcUtil.Http2Error;
import io.grpc.testing.TestMethodDescriptors;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -215,4 +225,61 @@ public class GrpcUtilTest {
assertFalse(GrpcUtil.httpStatusToGrpcStatus(i).isOk());
}
}
@Test
public void getTransportFromPickResult_errorPickResult_waitForReady() {
Status status = Status.UNAVAILABLE;
PickResult pickResult = PickResult.withError(status);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, true);
assertNull(transport);
}
@Test
public void getTransportFromPickResult_errorPickResult_failFast() {
Status status = Status.UNAVAILABLE;
PickResult pickResult = PickResult.withError(status);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, false);
assertNotNull(transport);
ClientStream stream = transport
.newStream(TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT);
ClientStreamListener listener = mock(ClientStreamListener.class);
stream.start(listener);
verify(listener).closed(eq(status), eq(RpcProgress.PROCESSED), any(Metadata.class));
}
@Test
public void getTransportFromPickResult_dropPickResult_waitForReady() {
Status status = Status.UNAVAILABLE;
PickResult pickResult = PickResult.withDrop(status);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, true);
assertNotNull(transport);
ClientStream stream = transport
.newStream(TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT);
ClientStreamListener listener = mock(ClientStreamListener.class);
stream.start(listener);
verify(listener).closed(eq(status), eq(RpcProgress.DROPPED), any(Metadata.class));
}
@Test
public void getTransportFromPickResult_dropPickResult_failFast() {
Status status = Status.UNAVAILABLE;
PickResult pickResult = PickResult.withDrop(status);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, false);
assertNotNull(transport);
ClientStream stream = transport
.newStream(TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT);
ClientStreamListener listener = mock(ClientStreamListener.class);
stream.start(listener);
verify(listener).closed(eq(status), eq(RpcProgress.DROPPED), any(Metadata.class));
}
}

View File

@ -17,6 +17,7 @@
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientStreamListener.RpcProgress.DROPPED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED;
import static io.grpc.internal.RetriableStream.GRPC_PREVIOUS_RPC_ATTEMPTS;
@ -28,6 +29,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@ -1506,6 +1508,30 @@ public class RetriableStreamTest {
verify(retriableStreamRecorder).postCommit();
}
@Test
public void droppedShouldNeverRetry() {
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
// start
retriableStream.start(masterListener);
verify(retriableStreamRecorder).newSubstream(0);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
// drop and verify no retry
Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
sublistenerCaptor1.getValue().closed(status, DROPPED, new Metadata());
verifyNoMoreInteractions(mockStream1, mockStream2);
verify(retriableStreamRecorder).postCommit();
verify(masterListener).closed(same(status), any(Metadata.class));
}
/**
* Used to stub a retriable stream as well as to record methods of the retriable stream being
* called.