core: Free unused MessageProducer in RetriableStream

This prevents leaking message buffers.

Fixes #9563
This commit is contained in:
Eric Anderson 2023-01-18 13:00:09 -08:00
parent 9de989bd64
commit 5a2c94bca1
2 changed files with 42 additions and 0 deletions

View File

@ -1099,6 +1099,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
checkState(
savedState.winningSubstream != null, "Headers should be received prior to messages.");
if (savedState.winningSubstream != substream) {
GrpcUtil.closeQuietly(producer);
return;
}
listenerSerializeExecutor.execute(

View File

@ -61,6 +61,8 @@ import io.grpc.internal.RetriableStream.Throttle;
import io.grpc.internal.StreamListener.MessageProducer;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
@ -998,6 +1000,27 @@ public class RetriableStreamTest {
verify(masterListener).messagesAvailable(messageProducer);
}
@Test
public void inboundMessagesClosedOnCancel() throws Exception {
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener);
retriableStream.request(1);
retriableStream.cancel(Status.CANCELLED.withDescription("on purpose"));
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
ClientStreamListener listener = sublistenerCaptor1.getValue();
listener.headersRead(new Metadata());
InputStream is = mock(InputStream.class);
listener.messagesAvailable(new FakeMessageProducer(is));
verify(masterListener, never()).messagesAvailable(any(MessageProducer.class));
verify(is).close();
}
@Test
public void notAdd0PrevRetryAttemptsToRespHeaders() {
ClientStream mockStream1 = mock(ClientStream.class);
@ -2786,4 +2809,22 @@ public class RetriableStreamTest {
Status prestart();
}
private static final class FakeMessageProducer implements MessageProducer {
private final Iterator<InputStream> iterator;
public FakeMessageProducer(InputStream... iss) {
this.iterator = Arrays.asList(iss).iterator();
}
@Override
@Nullable
public InputStream next() {
if (iterator.hasNext()) {
return iterator.next();
} else {
return null;
}
}
}
}