check pending stream completion at delayed transport lifecycle (#7720)

add onTransferComplete() at delayedStream and wait for all pending streams to complete transfer when shutting down delayedClientTransport
This commit is contained in:
Yifei Zhuang 2020-12-21 11:56:51 -08:00 committed by GitHub
parent da939ca762
commit 90850128a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 104 additions and 31 deletions

View File

@ -66,6 +66,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
@Nonnull @Nonnull
@GuardedBy("lock") @GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<>(); private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
@GuardedBy("lock")
private int pendingCompleteStreams;
/** /**
* When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
@ -175,6 +177,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
private PendingStream createPendingStream(PickSubchannelArgs args) { private PendingStream createPendingStream(PickSubchannelArgs args) {
PendingStream pendingStream = new PendingStream(args); PendingStream pendingStream = new PendingStream(args);
pendingStreams.add(pendingStream); pendingStreams.add(pendingStream);
pendingCompleteStreams++;
if (getPendingStreamsCount() == 1) { if (getPendingStreamsCount() == 1) {
syncContext.executeLater(reportTransportInUse); syncContext.executeLater(reportTransportInUse);
} }
@ -211,7 +214,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
listener.transportShutdown(status); listener.transportShutdown(status);
} }
}); });
if (!hasPendingStreams() && reportTransportTerminated != null) { if (pendingCompleteStreams == 0 && reportTransportTerminated != null) {
syncContext.executeLater(reportTransportTerminated); syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null; reportTransportTerminated = null;
} }
@ -227,23 +230,15 @@ final class DelayedClientTransport implements ManagedClientTransport {
public final void shutdownNow(Status status) { public final void shutdownNow(Status status) {
shutdown(status); shutdown(status);
Collection<PendingStream> savedPendingStreams; Collection<PendingStream> savedPendingStreams;
Runnable savedReportTransportTerminated;
synchronized (lock) { synchronized (lock) {
savedPendingStreams = pendingStreams; savedPendingStreams = pendingStreams;
savedReportTransportTerminated = reportTransportTerminated;
reportTransportTerminated = null;
if (!pendingStreams.isEmpty()) { if (!pendingStreams.isEmpty()) {
pendingStreams = Collections.emptyList(); pendingStreams = Collections.emptyList();
} }
} }
if (savedReportTransportTerminated != null) {
for (PendingStream stream : savedPendingStreams) { for (PendingStream stream : savedPendingStreams) {
stream.cancel(status); stream.cancel(status);
} }
syncContext.execute(savedReportTransportTerminated);
}
// If savedReportTransportTerminated == null, transportTerminated() has already been called in
// shutdown().
} }
public final boolean hasPendingStreams() { public final boolean hasPendingStreams() {
@ -259,6 +254,13 @@ final class DelayedClientTransport implements ManagedClientTransport {
} }
} }
@VisibleForTesting
final int getPendingCompleteStreamsCount() {
synchronized (lock) {
return pendingCompleteStreams;
}
}
/** /**
* Use the picker to try picking a transport for every pending stream, proceed the stream if the * Use the picker to try picking a transport for every pending stream, proceed the stream if the
* pick is successful, otherwise keep it pending. * pick is successful, otherwise keep it pending.
@ -324,10 +326,6 @@ final class DelayedClientTransport implements ManagedClientTransport {
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
syncContext.executeLater(reportTransportNotInUse); syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null && reportTransportTerminated != null) {
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
} }
} }
syncContext.drain(); syncContext.drain();
@ -341,6 +339,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
private class PendingStream extends DelayedStream { private class PendingStream extends DelayedStream {
private final PickSubchannelArgs args; private final PickSubchannelArgs args;
private final Context context = Context.current(); private final Context context = Context.current();
@GuardedBy("lock")
private boolean transferCompleted;
private PendingStream(PickSubchannelArgs args) { private PendingStream(PickSubchannelArgs args) {
this.args = args; this.args = args;
@ -362,17 +362,27 @@ final class DelayedClientTransport implements ManagedClientTransport {
public void cancel(Status reason) { public void cancel(Status reason) {
super.cancel(reason); super.cancel(reason);
synchronized (lock) { synchronized (lock) {
if (reportTransportTerminated != null) {
boolean justRemovedAnElement = pendingStreams.remove(this); boolean justRemovedAnElement = pendingStreams.remove(this);
if (!hasPendingStreams() && justRemovedAnElement) { if (!hasPendingStreams() && justRemovedAnElement && reportTransportTerminated != null) {
syncContext.executeLater(reportTransportNotInUse); syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null) { }
}
syncContext.drain();
}
@Override
public void onTransferComplete() {
synchronized (lock) {
if (transferCompleted) {
return;
}
transferCompleted = true;
pendingCompleteStreams--;
if (shutdownStatus != null && pendingCompleteStreams == 0) {
syncContext.executeLater(reportTransportTerminated); syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null; reportTransportTerminated = null;
} }
} }
}
}
syncContext.drain(); syncContext.drain();
} }
} }

View File

@ -39,7 +39,7 @@ import javax.annotation.concurrent.GuardedBy;
* DelayedStream} may be internally altered by different threads, thus internal synchronization is * DelayedStream} may be internally altered by different threads, thus internal synchronization is
* necessary. * necessary.
*/ */
class DelayedStream implements ClientStream { abstract class DelayedStream implements ClientStream {
/** {@code true} once realStream is valid and all pending calls have been drained. */ /** {@code true} once realStream is valid and all pending calls have been drained. */
private volatile boolean passThrough; private volatile boolean passThrough;
/** /**
@ -221,12 +221,14 @@ class DelayedStream implements ClientStream {
if (savedPassThrough) { if (savedPassThrough) {
realStream.start(listener); realStream.start(listener);
onTransferComplete();
} else { } else {
final ClientStreamListener finalListener = listener; final ClientStreamListener finalListener = listener;
delayOrExecute(new Runnable() { delayOrExecute(new Runnable() {
@Override @Override
public void run() { public void run() {
realStream.start(finalListener); realStream.start(finalListener);
onTransferComplete();
} }
}); });
} }
@ -302,6 +304,7 @@ class DelayedStream implements ClientStream {
listenerToClose.closed(reason, new Metadata()); listenerToClose.closed(reason, new Metadata());
} }
drainPendingCalls(); drainPendingCalls();
onTransferComplete();
} }
} }
@ -407,6 +410,12 @@ class DelayedStream implements ClientStream {
return realStream; return realStream;
} }
/**
* Provides the place to define actions at the point when transfer is done.
* Call this method to trigger those transfer completion activities.
*/
abstract void onTransferComplete();
private static class DelayedStreamListener implements ClientStreamListener { private static class DelayedStreamListener implements ClientStreamListener {
private final ClientStreamListener realListener; private final ClientStreamListener realListener;
private volatile boolean passThrough; private volatile boolean passThrough;

View File

@ -47,7 +47,7 @@ final class MetadataApplierImpl extends MetadataApplier {
boolean finalized; boolean finalized;
// not null if returnStream() was called before apply() // not null if returnStream() was called before apply()
DelayedStream delayedStream; ApplierDelayedStream delayedStream;
MetadataApplierImpl( MetadataApplierImpl(
ClientTransport transport, MethodDescriptor<?, ?> method, Metadata origHeaders, ClientTransport transport, MethodDescriptor<?, ?> method, Metadata origHeaders,
@ -105,11 +105,16 @@ final class MetadataApplierImpl extends MetadataApplier {
synchronized (lock) { synchronized (lock) {
if (returnedStream == null) { if (returnedStream == null) {
// apply() has not been called, needs to buffer the requests. // apply() has not been called, needs to buffer the requests.
delayedStream = new DelayedStream(); delayedStream = new ApplierDelayedStream();
return returnedStream = delayedStream; return returnedStream = delayedStream;
} else { } else {
return returnedStream; return returnedStream;
} }
} }
} }
private static class ApplierDelayedStream extends DelayedStream {
@Override
void onTransferComplete() {}
}
} }

View File

@ -158,12 +158,14 @@ public class DelayedClientTransportTest {
delayedTransport.reprocess(mockPicker); delayedTransport.reprocess(mockPicker);
assertEquals(0, delayedTransport.getPendingStreamsCount()); assertEquals(0, delayedTransport.getPendingStreamsCount());
delayedTransport.shutdown(SHUTDOWN_STATUS); delayedTransport.shutdown(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated();
verify(transportListener).transportTerminated();
assertEquals(1, fakeExecutor.runDueTasks()); assertEquals(1, fakeExecutor.runDueTasks());
verify(transportListener, never()).transportTerminated();
verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions));
stream.start(streamListener); stream.start(streamListener);
verify(mockRealStream).start(same(streamListener)); verify(mockRealStream).start(same(streamListener));
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener).transportTerminated();
} }
@Test public void transportTerminatedThenAssignTransport() { @Test public void transportTerminatedThenAssignTransport() {
@ -201,8 +203,10 @@ public class DelayedClientTransportTest {
@Test public void cancelStreamWithoutSetTransport() { @Test public void cancelStreamWithoutSetTransport() {
ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
assertEquals(1, delayedTransport.getPendingStreamsCount()); assertEquals(1, delayedTransport.getPendingStreamsCount());
assertEquals(1, delayedTransport.getPendingCompleteStreamsCount());
stream.cancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
assertEquals(0, delayedTransport.getPendingStreamsCount()); assertEquals(0, delayedTransport.getPendingStreamsCount());
assertEquals(0, delayedTransport.getPendingCompleteStreamsCount());
verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream); verifyNoMoreInteractions(mockRealStream);
} }
@ -211,13 +215,34 @@ public class DelayedClientTransportTest {
ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(streamListener); stream.start(streamListener);
assertEquals(1, delayedTransport.getPendingStreamsCount()); assertEquals(1, delayedTransport.getPendingStreamsCount());
assertEquals(1, delayedTransport.getPendingCompleteStreamsCount());
stream.cancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
assertEquals(0, delayedTransport.getPendingStreamsCount()); assertEquals(0, delayedTransport.getPendingStreamsCount());
assertEquals(0, delayedTransport.getPendingCompleteStreamsCount());
verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class)); verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class));
verifyNoMoreInteractions(mockRealTransport); verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream); verifyNoMoreInteractions(mockRealStream);
} }
@Test
public void cancelStreamShutdownThenStart() {
ClientStream stream = delayedTransport.newStream(method, headers, callOptions);
delayedTransport.shutdown(Status.UNAVAILABLE);
assertEquals(1, delayedTransport.getPendingStreamsCount());
assertEquals(1, delayedTransport.getPendingCompleteStreamsCount());
delayedTransport.reprocess(mockPicker);
assertEquals(1, fakeExecutor.runDueTasks());
assertEquals(0, delayedTransport.getPendingStreamsCount());
assertEquals(1, delayedTransport.getPendingCompleteStreamsCount());
stream.cancel(Status.CANCELLED);
verify(mockRealStream).cancel(same(Status.CANCELLED));
verify(transportListener, never()).transportTerminated();
stream.start(streamListener);
assertEquals(0, delayedTransport.getPendingCompleteStreamsCount());
verify(mockRealStream).start(streamListener);
verify(transportListener).transportTerminated();
}
@Test public void newStreamThenShutdownTransportThenAssignTransport() { @Test public void newStreamThenShutdownTransportThenAssignTransport() {
ClientStream stream = delayedTransport.newStream(method, headers, callOptions); ClientStream stream = delayedTransport.newStream(method, headers, callOptions);
stream.start(streamListener); stream.start(streamListener);
@ -353,6 +378,7 @@ public class DelayedClientTransportTest {
waitForReadyCallOptions); waitForReadyCallOptions);
assertEquals(8, delayedTransport.getPendingStreamsCount()); assertEquals(8, delayedTransport.getPendingStreamsCount());
assertEquals(8, delayedTransport.getPendingCompleteStreamsCount());
// First reprocess(). Some will proceed, some will fail and the rest will stay buffered. // First reprocess(). Some will proceed, some will fail and the rest will stay buffered.
SubchannelPicker picker = mock(SubchannelPicker.class); SubchannelPicker picker = mock(SubchannelPicker.class);
@ -370,6 +396,7 @@ public class DelayedClientTransportTest {
delayedTransport.reprocess(picker); delayedTransport.reprocess(picker);
assertEquals(5, delayedTransport.getPendingStreamsCount()); assertEquals(5, delayedTransport.getPendingStreamsCount());
assertEquals(8, delayedTransport.getPendingCompleteStreamsCount());
inOrder.verify(picker).pickSubchannel(ff1args); inOrder.verify(picker).pickSubchannel(ff1args);
inOrder.verify(picker).pickSubchannel(ff2args); inOrder.verify(picker).pickSubchannel(ff2args);
inOrder.verify(picker).pickSubchannel(ff3args); inOrder.verify(picker).pickSubchannel(ff3args);
@ -385,8 +412,12 @@ public class DelayedClientTransportTest {
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verify(mockRealTransport2, never()).newStream( verify(mockRealTransport2, never()).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
ff1.start(streamListener);
ff2.start(streamListener);
fakeExecutor.runDueTasks(); fakeExecutor.runDueTasks();
assertEquals(0, fakeExecutor.numPendingTasks()); assertEquals(0, fakeExecutor.numPendingTasks());
// 8 - 2(runDueTask with start)
assertEquals(6, delayedTransport.getPendingCompleteStreamsCount());
// ff1 and wfr1 went through // ff1 and wfr1 went through
verify(mockRealTransport).newStream(method, headers, failFastCallOptions); verify(mockRealTransport).newStream(method, headers, failFastCallOptions);
verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions); verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions);
@ -394,6 +425,8 @@ public class DelayedClientTransportTest {
assertSame(mockRealStream2, wfr1.getRealStream()); assertSame(mockRealStream2, wfr1.getRealStream());
// The ff2 has failed due to picker returning an error // The ff2 has failed due to picker returning an error
assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError()); assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError());
wfr1.start(streamListener);
assertEquals(5, delayedTransport.getPendingCompleteStreamsCount());
// Other streams are still buffered // Other streams are still buffered
assertNull(ff3.getRealStream()); assertNull(ff3.getRealStream());
assertNull(ff4.getRealStream()); assertNull(ff4.getRealStream());
@ -414,8 +447,14 @@ public class DelayedClientTransportTest {
assertEquals(0, wfr3Executor.numPendingTasks()); assertEquals(0, wfr3Executor.numPendingTasks());
verify(transportListener, never()).transportInUse(false); verify(transportListener, never()).transportInUse(false);
ff3.start(streamListener);
ff4.start(streamListener);
wfr2.start(streamListener);
wfr3.start(streamListener);
wfr4.start(streamListener);
delayedTransport.reprocess(picker); delayedTransport.reprocess(picker);
assertEquals(0, delayedTransport.getPendingStreamsCount()); assertEquals(0, delayedTransport.getPendingStreamsCount());
assertEquals(5, delayedTransport.getPendingCompleteStreamsCount());
verify(transportListener).transportInUse(false); verify(transportListener).transportInUse(false);
inOrder.verify(picker).pickSubchannel(ff3args); // ff3 inOrder.verify(picker).pickSubchannel(ff3args); // ff3
inOrder.verify(picker).pickSubchannel(ff4args); // ff4 inOrder.verify(picker).pickSubchannel(ff4args); // ff4
@ -423,8 +462,9 @@ public class DelayedClientTransportTest {
inOrder.verify(picker).pickSubchannel(wfr3args); // wfr3 inOrder.verify(picker).pickSubchannel(wfr3args); // wfr3
inOrder.verify(picker).pickSubchannel(wfr4args); // wfr4 inOrder.verify(picker).pickSubchannel(wfr4args); // wfr4
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeExecutor.runDueTasks(); assertEquals(4, fakeExecutor.runDueTasks());
assertEquals(0, fakeExecutor.numPendingTasks()); assertEquals(0, fakeExecutor.numPendingTasks());
assertEquals(1, delayedTransport.getPendingCompleteStreamsCount());
assertSame(mockRealStream, ff3.getRealStream()); assertSame(mockRealStream, ff3.getRealStream());
assertSame(mockRealStream2, ff4.getRealStream()); assertSame(mockRealStream2, ff4.getRealStream());
assertSame(mockRealStream2, wfr2.getRealStream()); assertSame(mockRealStream2, wfr2.getRealStream());
@ -434,15 +474,18 @@ public class DelayedClientTransportTest {
assertNull(wfr3.getRealStream()); assertNull(wfr3.getRealStream());
wfr3Executor.runDueTasks(); wfr3Executor.runDueTasks();
assertSame(mockRealStream, wfr3.getRealStream()); assertSame(mockRealStream, wfr3.getRealStream());
assertEquals(0, delayedTransport.getPendingCompleteStreamsCount());
// New streams will use the last picker // New streams will use the last picker
DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream( DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream(
method, headers, waitForReadyCallOptions); method, headers, waitForReadyCallOptions);
wfr5.start(streamListener);
assertNull(wfr5.getRealStream()); assertNull(wfr5.getRealStream());
inOrder.verify(picker).pickSubchannel( inOrder.verify(picker).pickSubchannel(
new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions));
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
assertEquals(1, delayedTransport.getPendingStreamsCount()); assertEquals(1, delayedTransport.getPendingStreamsCount());
assertEquals(1, delayedTransport.getPendingCompleteStreamsCount());
// wfr5 will stop delayed transport from terminating // wfr5 will stop delayed transport from terminating
delayedTransport.shutdown(SHUTDOWN_STATUS); delayedTransport.shutdown(SHUTDOWN_STATUS);

View File

@ -65,7 +65,7 @@ public class DelayedStreamTest {
@Mock private ClientStreamListener listener; @Mock private ClientStreamListener listener;
@Mock private ClientStream realStream; @Mock private ClientStream realStream;
@Captor private ArgumentCaptor<ClientStreamListener> listenerCaptor; @Captor private ArgumentCaptor<ClientStreamListener> listenerCaptor;
private DelayedStream stream = new DelayedStream(); private DelayedStream stream = new SimpleDelayedStream();
@Test @Test
public void setStream_setAuthority() { public void setStream_setAuthority() {
@ -378,4 +378,10 @@ public class DelayedStreamTest {
assertThat(insight.toString()) assertThat(insight.toString())
.matches("\\[buffered_nanos=[0-9]+, remote_addr=127\\.0\\.0\\.1:443\\]"); .matches("\\[buffered_nanos=[0-9]+, remote_addr=127\\.0\\.0\\.1:443\\]");
} }
private static class SimpleDelayedStream extends DelayedStream {
@Override
void onTransferComplete() {
}
}
} }