mirror of https://github.com/grpc/grpc-java.git
core: set delayedTransport picker to null in idle mode (#4207)
This commit is contained in:
parent
402c1740fa
commit
c6fe4deb33
|
|
@ -71,7 +71,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
|
||||||
private Status shutdownStatus;
|
private Status shutdownStatus;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The last picker that {@link #reprocess} has used.
|
* The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
|
||||||
|
* to idle.
|
||||||
*/
|
*/
|
||||||
@GuardedBy("lock")
|
@GuardedBy("lock")
|
||||||
@Nullable
|
@Nullable
|
||||||
|
|
@ -271,17 +272,17 @@ final class DelayedClientTransport implements ManagedClientTransport {
|
||||||
*
|
*
|
||||||
* <p>This method <strong>must not</strong> be called concurrently with itself.
|
* <p>This method <strong>must not</strong> be called concurrently with itself.
|
||||||
*/
|
*/
|
||||||
final void reprocess(SubchannelPicker picker) {
|
final void reprocess(@Nullable SubchannelPicker picker) {
|
||||||
ArrayList<PendingStream> toProcess;
|
ArrayList<PendingStream> toProcess;
|
||||||
ArrayList<PendingStream> toRemove = new ArrayList<PendingStream>();
|
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
lastPicker = picker;
|
lastPicker = picker;
|
||||||
lastPickerVersion++;
|
lastPickerVersion++;
|
||||||
if (!hasPendingStreams()) {
|
if (picker == null || !hasPendingStreams()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
toProcess = new ArrayList<PendingStream>(pendingStreams);
|
toProcess = new ArrayList<PendingStream>(pendingStreams);
|
||||||
}
|
}
|
||||||
|
ArrayList<PendingStream> toRemove = new ArrayList<PendingStream>();
|
||||||
|
|
||||||
for (final PendingStream stream : toProcess) {
|
for (final PendingStream stream : toProcess) {
|
||||||
PickResult pickResult = picker.pickSubchannel(stream.args);
|
PickResult pickResult = picker.pickSubchannel(stream.args);
|
||||||
|
|
|
||||||
|
|
@ -393,6 +393,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
||||||
// did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
|
// did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
|
||||||
// which are bugs.
|
// which are bugs.
|
||||||
shutdownNameResolverAndLoadBalancer(true);
|
shutdownNameResolverAndLoadBalancer(true);
|
||||||
|
delayedTransport.reprocess(null);
|
||||||
nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
|
nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
|
||||||
channelStateManager.gotoState(IDLE);
|
channelStateManager.gotoState(IDLE);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1850,6 +1850,68 @@ public class ManagedChannelImplTest {
|
||||||
assertEquals(CONNECTING, channel.getState(false));
|
assertEquals(CONNECTING, channel.getState(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void idleMode_resetsDelayedTransportPicker() {
|
||||||
|
ClientStream mockStream = mock(ClientStream.class);
|
||||||
|
Status pickError = Status.UNAVAILABLE.withDescription("pick result error");
|
||||||
|
long idleTimeoutMillis = 1000L;
|
||||||
|
createChannel(
|
||||||
|
new FakeNameResolverFactory.Builder(expectedUri)
|
||||||
|
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
|
||||||
|
.build(),
|
||||||
|
NO_INTERCEPTOR,
|
||||||
|
true,
|
||||||
|
idleTimeoutMillis);
|
||||||
|
assertEquals(IDLE, channel.getState(false));
|
||||||
|
|
||||||
|
// This call will be buffered in delayedTransport
|
||||||
|
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
|
||||||
|
call.start(mockCallListener, new Metadata());
|
||||||
|
|
||||||
|
// Move channel into TRANSIENT_FAILURE, which will fail the pending call
|
||||||
|
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
|
||||||
|
.thenReturn(PickResult.withError(pickError));
|
||||||
|
helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
|
||||||
|
assertEquals(TRANSIENT_FAILURE, channel.getState(false));
|
||||||
|
executor.runDueTasks();
|
||||||
|
verify(mockCallListener).onClose(same(pickError), any(Metadata.class));
|
||||||
|
|
||||||
|
// Move channel to idle
|
||||||
|
timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
|
||||||
|
assertEquals(IDLE, channel.getState(false));
|
||||||
|
|
||||||
|
// This call should be buffered, but will move the channel out of idle
|
||||||
|
ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
|
||||||
|
call2.start(mockCallListener2, new Metadata());
|
||||||
|
executor.runDueTasks();
|
||||||
|
verifyNoMoreInteractions(mockCallListener2);
|
||||||
|
|
||||||
|
// Get the helper created on exiting idle
|
||||||
|
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
|
||||||
|
verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
|
||||||
|
Helper helper2 = helperCaptor.getValue();
|
||||||
|
|
||||||
|
// Establish a connection
|
||||||
|
Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY);
|
||||||
|
subchannel.requestConnection();
|
||||||
|
MockClientTransportInfo transportInfo = transports.poll();
|
||||||
|
ConnectionClientTransport mockTransport = transportInfo.transport;
|
||||||
|
ManagedClientTransport.Listener transportListener = transportInfo.listener;
|
||||||
|
when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
|
||||||
|
.thenReturn(mockStream);
|
||||||
|
transportListener.transportReady();
|
||||||
|
|
||||||
|
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
|
||||||
|
.thenReturn(PickResult.withSubchannel(subchannel));
|
||||||
|
helper2.updateBalancingState(READY, mockPicker);
|
||||||
|
assertEquals(READY, channel.getState(false));
|
||||||
|
executor.runDueTasks();
|
||||||
|
|
||||||
|
// Verify the buffered call was drained
|
||||||
|
verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
|
||||||
|
verify(mockStream).start(any(ClientStreamListener.class));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void enterIdleEntersIdle() {
|
public void enterIdleEntersIdle() {
|
||||||
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
|
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue