From 0f4b7676603cd2f7a92c276dc4b36fb0f70ba6b3 Mon Sep 17 00:00:00 2001 From: pandaapo <35672972+pandaapo@users.noreply.github.com> Date: Wed, 18 Jan 2023 07:33:45 +0800 Subject: [PATCH] core: Add grpc-previous-rpc-attempts to the initial response metadata (#9686) Fixes #9641 --- .../io/grpc/internal/RetriableStream.java | 4 ++ .../io/grpc/internal/RetriableStreamTest.java | 52 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 37e8341f85..d6a8ecacc9 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -854,6 +854,10 @@ abstract class RetriableStream implements ClientStream { @Override public void headersRead(final Metadata headers) { + if (substream.previousAttemptCount > 0) { + headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS); + headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount)); + } commitAndRun(substream); if (state.winningSubstream == substream) { if (throttle != null) { diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 4910c0b613..71cde947bf 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -43,6 +43,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.ClientStreamTracer; import io.grpc.Codec; @@ -997,6 +998,57 @@ public class RetriableStreamTest { verify(masterListener).messagesAvailable(messageProducer); } + @Test + public void notAdd0PrevRetryAttemptsToRespHeaders() { + ClientStream mockStream1 = mock(ClientStream.class); + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + + retriableStream.start(masterListener); + + ArgumentCaptor sublistenerCaptor = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream1).start(sublistenerCaptor.capture()); + + sublistenerCaptor.getValue().headersRead(new Metadata()); + + ArgumentCaptor metadataCaptor = + ArgumentCaptor.forClass(Metadata.class); + verify(masterListener).headersRead(metadataCaptor.capture()); + assertEquals(null, metadataCaptor.getValue().get(GRPC_PREVIOUS_RPC_ATTEMPTS)); + } + + @Test + public void addPrevRetryAttemptsToRespHeaders() { + ClientStream mockStream1 = mock(ClientStream.class); + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + + retriableStream.start(masterListener); + + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream1).start(sublistenerCaptor1.capture()); + + // retry + ClientStream mockStream2 = mock(ClientStream.class); + doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); + sublistenerCaptor1.getValue().closed( + Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); + fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream2).start(sublistenerCaptor2.capture()); + Metadata headers = new Metadata(); + headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, "3"); + sublistenerCaptor2.getValue().headersRead(headers); + + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); + verify(masterListener).headersRead(metadataCaptor.capture()); + Iterable iterable = metadataCaptor.getValue().getAll(GRPC_PREVIOUS_RPC_ATTEMPTS); + assertEquals(1, Iterables.size(iterable)); + assertEquals("1", iterable.iterator().next()); + } + @Test public void closedWhileDraining() { ClientStream mockStream1 = mock(ClientStream.class);