From 560a7fb08437e9b538e5e1751f12caf6207df530 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Fri, 11 Feb 2022 13:02:23 -0800 Subject: [PATCH] okhttp: local-only transparent retry for okhttp The OKHttp counterpart for #8878. --- .../io/grpc/okhttp/OkHttpClientTransport.java | 9 +++-- .../okhttp/OkHttpClientTransportTest.java | 36 ++++++++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 121093716d..cf7cae19d8 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -419,7 +419,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep void streamReadyToStart(OkHttpClientStream clientStream) { if (goAwayStatus != null) { clientStream.transportState().transportReportStatus( - goAwayStatus, RpcProgress.REFUSED, true, new Metadata()); + goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata()); } else if (streams.size() >= maxConcurrentStreams) { pendingStreams.add(clientStream); setInUse(clientStream); @@ -811,7 +811,10 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep } for (OkHttpClientStream stream : pendingStreams) { - stream.transportState().transportReportStatus(reason, true, new Metadata()); + // in cases such as the connection fails to ACK keep-alive, pending streams should have a + // chance to retry and be routed to another connection. + stream.transportState().transportReportStatus( + reason, RpcProgress.MISCARRIED, true, new Metadata()); maybeClearInUse(stream); } pendingStreams.clear(); @@ -894,7 +897,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep for (OkHttpClientStream stream : pendingStreams) { stream.transportState().transportReportStatus( - status, RpcProgress.REFUSED, true, new Metadata()); + status, RpcProgress.MISCARRIED, true, new Metadata()); maybeClearInUse(stream); } pendingStreams.clear(); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index b70b832a79..1628df4d3c 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -18,6 +18,7 @@ package io.grpc.okhttp; import static com.google.common.base.Charsets.UTF_8; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.ClientStreamListener.RpcProgress.MISCARRIED; import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; @@ -2080,7 +2081,7 @@ public class OkHttpClientTransportTest { listener3.waitUntilStreamClosed(); assertNull(listener1.rpcProgress); assertEquals(REFUSED, listener2.rpcProgress); - assertEquals(REFUSED, listener3.rpcProgress); + assertEquals(MISCARRIED, listener3.rpcProgress); assertEquals(1, activeStreamCount()); assertContainStream(DEFAULT_START_STREAM_ID); @@ -2133,6 +2134,39 @@ public class OkHttpClientTransportTest { shutdownAndVerify(); } + @Test + public void shutdownNow_streamListenerRpcProgress() throws Exception { + initTransport(); + setMaxConcurrentStreams(2); + MockStreamListener listener1 = new MockStreamListener(); + MockStreamListener listener2 = new MockStreamListener(); + MockStreamListener listener3 = new MockStreamListener(); + OkHttpClientStream stream1 = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); + stream1.start(listener1); + OkHttpClientStream stream2 = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); + stream2.start(listener2); + OkHttpClientStream stream3 = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers); + stream3.start(listener3); + waitForStreamPending(1); + + assertEquals(2, activeStreamCount()); + assertContainStream(DEFAULT_START_STREAM_ID); + assertContainStream(DEFAULT_START_STREAM_ID + 2); + + clientTransport.shutdownNow(Status.INTERNAL); + + listener1.waitUntilStreamClosed(); + listener2.waitUntilStreamClosed(); + listener3.waitUntilStreamClosed(); + + assertEquals(PROCESSED, listener1.rpcProgress); + assertEquals(PROCESSED, listener2.rpcProgress); + assertEquals(MISCARRIED, listener3.rpcProgress); + } + private int activeStreamCount() { return clientTransport.getActiveStreams().length; }