core: Add grpc-previous-rpc-attempts to the initial response metadata (#9686)

Fixes #9641
This commit is contained in:
pandaapo 2023-01-18 07:33:45 +08:00 committed by GitHub
parent f2533f4fd8
commit 0f4b767660
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 0 deletions

View File

@ -854,6 +854,10 @@ abstract class RetriableStream<ReqT> implements ClientStream {
@Override
public void headersRead(final Metadata headers) {
if (substream.previousAttemptCount > 0) {
headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
}
commitAndRun(substream);
if (state.winningSubstream == substream) {
if (throttle != null) {

View File

@ -43,6 +43,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ClientStreamTracer;
import io.grpc.Codec;
@ -997,6 +998,57 @@ public class RetriableStreamTest {
verify(masterListener).messagesAvailable(messageProducer);
}
@Test
public void notAdd0PrevRetryAttemptsToRespHeaders() {
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor.capture());
sublistenerCaptor.getValue().headersRead(new Metadata());
ArgumentCaptor<Metadata> metadataCaptor =
ArgumentCaptor.forClass(Metadata.class);
verify(masterListener).headersRead(metadataCaptor.capture());
assertEquals(null, metadataCaptor.getValue().get(GRPC_PREVIOUS_RPC_ATTEMPTS));
}
@Test
public void addPrevRetryAttemptsToRespHeaders() {
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
// retry
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture());
Metadata headers = new Metadata();
headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, "3");
sublistenerCaptor2.getValue().headersRead(headers);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(masterListener).headersRead(metadataCaptor.capture());
Iterable<String> iterable = metadataCaptor.getValue().getAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
assertEquals(1, Iterables.size(iterable));
assertEquals("1", iterable.iterator().next());
}
@Test
public void closedWhileDraining() {
ClientStream mockStream1 = mock(ClientStream.class);