okhttp: local-only transparent retry for okhttp

The OKHttp counterpart for #8878.
This commit is contained in:
ZHANG Dapeng 2022-02-11 13:02:23 -08:00 committed by GitHub
parent fbb1dbf7a5
commit 560a7fb084
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 4 deletions

View File

@ -419,7 +419,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
void streamReadyToStart(OkHttpClientStream clientStream) { void streamReadyToStart(OkHttpClientStream clientStream) {
if (goAwayStatus != null) { if (goAwayStatus != null) {
clientStream.transportState().transportReportStatus( clientStream.transportState().transportReportStatus(
goAwayStatus, RpcProgress.REFUSED, true, new Metadata()); goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
} else if (streams.size() >= maxConcurrentStreams) { } else if (streams.size() >= maxConcurrentStreams) {
pendingStreams.add(clientStream); pendingStreams.add(clientStream);
setInUse(clientStream); setInUse(clientStream);
@ -811,7 +811,10 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
} }
for (OkHttpClientStream stream : pendingStreams) { for (OkHttpClientStream stream : pendingStreams) {
stream.transportState().transportReportStatus(reason, true, new Metadata()); // in cases such as the connection fails to ACK keep-alive, pending streams should have a
// chance to retry and be routed to another connection.
stream.transportState().transportReportStatus(
reason, RpcProgress.MISCARRIED, true, new Metadata());
maybeClearInUse(stream); maybeClearInUse(stream);
} }
pendingStreams.clear(); pendingStreams.clear();
@ -894,7 +897,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
for (OkHttpClientStream stream : pendingStreams) { for (OkHttpClientStream stream : pendingStreams) {
stream.transportState().transportReportStatus( stream.transportState().transportReportStatus(
status, RpcProgress.REFUSED, true, new Metadata()); status, RpcProgress.MISCARRIED, true, new Metadata());
maybeClearInUse(stream); maybeClearInUse(stream);
} }
pendingStreams.clear(); pendingStreams.clear();

View File

@ -18,6 +18,7 @@ package io.grpc.okhttp;
import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientStreamListener.RpcProgress.MISCARRIED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED; import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
@ -2080,7 +2081,7 @@ public class OkHttpClientTransportTest {
listener3.waitUntilStreamClosed(); listener3.waitUntilStreamClosed();
assertNull(listener1.rpcProgress); assertNull(listener1.rpcProgress);
assertEquals(REFUSED, listener2.rpcProgress); assertEquals(REFUSED, listener2.rpcProgress);
assertEquals(REFUSED, listener3.rpcProgress); assertEquals(MISCARRIED, listener3.rpcProgress);
assertEquals(1, activeStreamCount()); assertEquals(1, activeStreamCount());
assertContainStream(DEFAULT_START_STREAM_ID); assertContainStream(DEFAULT_START_STREAM_ID);
@ -2133,6 +2134,39 @@ public class OkHttpClientTransportTest {
shutdownAndVerify(); shutdownAndVerify();
} }
@Test
public void shutdownNow_streamListenerRpcProgress() throws Exception {
initTransport();
setMaxConcurrentStreams(2);
MockStreamListener listener1 = new MockStreamListener();
MockStreamListener listener2 = new MockStreamListener();
MockStreamListener listener3 = new MockStreamListener();
OkHttpClientStream stream1 =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
stream1.start(listener1);
OkHttpClientStream stream2 =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
stream2.start(listener2);
OkHttpClientStream stream3 =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
stream3.start(listener3);
waitForStreamPending(1);
assertEquals(2, activeStreamCount());
assertContainStream(DEFAULT_START_STREAM_ID);
assertContainStream(DEFAULT_START_STREAM_ID + 2);
clientTransport.shutdownNow(Status.INTERNAL);
listener1.waitUntilStreamClosed();
listener2.waitUntilStreamClosed();
listener3.waitUntilStreamClosed();
assertEquals(PROCESSED, listener1.rpcProgress);
assertEquals(PROCESSED, listener2.rpcProgress);
assertEquals(MISCARRIED, listener3.rpcProgress);
}
private int activeStreamCount() { private int activeStreamCount() {
return clientTransport.getActiveStreams().length; return clientTransport.getActiveStreams().length;
} }