From c6fe4deb33686a7f0e65be6f3f509768ce5d04a6 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 12 Mar 2018 12:21:39 -0700 Subject: [PATCH] core: set delayedTransport picker to null in idle mode (#4207) --- .../grpc/internal/DelayedClientTransport.java | 9 +-- .../io/grpc/internal/ManagedChannelImpl.java | 1 + .../grpc/internal/ManagedChannelImplTest.java | 62 +++++++++++++++++++ 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 076e336880..01b3fb73f9 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -71,7 +71,8 @@ final class DelayedClientTransport implements ManagedClientTransport { private Status shutdownStatus; /** - * The last picker that {@link #reprocess} has used. + * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved + * to idle. */ @GuardedBy("lock") @Nullable @@ -271,17 +272,17 @@ final class DelayedClientTransport implements ManagedClientTransport { * *

This method must not be called concurrently with itself. */ - final void reprocess(SubchannelPicker picker) { + final void reprocess(@Nullable SubchannelPicker picker) { ArrayList toProcess; - ArrayList toRemove = new ArrayList(); synchronized (lock) { lastPicker = picker; lastPickerVersion++; - if (!hasPendingStreams()) { + if (picker == null || !hasPendingStreams()) { return; } toProcess = new ArrayList(pendingStreams); } + ArrayList toRemove = new ArrayList(); for (final PendingStream stream : toProcess) { PickResult pickResult = picker.pickSubchannel(stream.args); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 10fe727910..d8ce7e0880 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -393,6 +393,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + + // Move channel into TRANSIENT_FAILURE, which will fail the pending call + when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withError(pickError)); + helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker); + assertEquals(TRANSIENT_FAILURE, channel.getState(false)); + executor.runDueTasks(); + verify(mockCallListener).onClose(same(pickError), any(Metadata.class)); + + // Move channel to idle + timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); + assertEquals(IDLE, channel.getState(false)); + + // This call should be buffered, but will move the channel out of idle + ClientCall call2 = channel.newCall(method, CallOptions.DEFAULT); + call2.start(mockCallListener2, new Metadata()); + executor.runDueTasks(); + verifyNoMoreInteractions(mockCallListener2); + + // Get the helper created on exiting idle + ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(Helper.class); + verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture()); + Helper helper2 = helperCaptor.getValue(); + + // Establish a connection + Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY); + subchannel.requestConnection(); + MockClientTransportInfo transportInfo = transports.poll(); + ConnectionClientTransport mockTransport = transportInfo.transport; + ManagedClientTransport.Listener transportListener = transportInfo.listener; + when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) + .thenReturn(mockStream); + transportListener.transportReady(); + + when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withSubchannel(subchannel)); + helper2.updateBalancingState(READY, mockPicker); + assertEquals(READY, channel.getState(false)); + executor.runDueTasks(); + + // Verify the buffered call was drained + verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class)); + verify(mockStream).start(any(ClientStreamListener.class)); + } + @Test public void enterIdleEntersIdle() { createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);