diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 9374280762..f542036330 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -156,24 +156,29 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans } @Override - public synchronized void shutdown() { + public synchronized void shutdown(Status reason) { // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport. if (shutdown) { return; } - shutdownStatus = Status.UNAVAILABLE.withDescription("transport was requested to shut down"); - notifyShutdown(shutdownStatus); + shutdownStatus = reason; + notifyShutdown(reason); if (streams.isEmpty()) { notifyTerminated(); } } + @Override + public synchronized void shutdown() { + shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side")); + } + @Override public void shutdownNow(Status reason) { checkNotNull(reason, "reason"); List streamsCopy; synchronized (this) { - shutdown(); + shutdown(reason); if (terminated) { return; } diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 3ba3b32bc0..4a707b661b 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -51,17 +51,18 @@ final class DelayedClientTransport implements ManagedClientTransport { private Runnable reportTransportInUse; private Runnable reportTransportNotInUse; - private Runnable reportTransportShutdown; private Runnable reportTransportTerminated; + private Listener listener; @GuardedBy("lock") private Collection pendingStreams = new LinkedHashSet(); /** - * When shutdown == true and pendingStreams == null, then the transport is considered terminated. + * When shutdownStatus != null and pendingStreams == null, then the transport is considered + * terminated. */ @GuardedBy("lock") - private boolean shutdown; + private Status shutdownStatus; /** * The last picker that {@link #reprocess} has used. @@ -89,6 +90,7 @@ final class DelayedClientTransport implements ManagedClientTransport { @Override public final Runnable start(final Listener listener) { + this.listener = listener; reportTransportInUse = new Runnable() { @Override public void run() { @@ -101,13 +103,6 @@ final class DelayedClientTransport implements ManagedClientTransport { listener.transportInUse(false); } }; - reportTransportShutdown = new Runnable() { - @Override - public void run() { - listener.transportShutdown( - Status.UNAVAILABLE.withDescription("Channel requested transport to shut down")); - } - }; reportTransportTerminated = new Runnable() { @Override public void run() { @@ -128,44 +123,41 @@ final class DelayedClientTransport implements ManagedClientTransport { public final ClientStream newStream( MethodDescriptor method, Metadata headers, CallOptions callOptions) { try { - SubchannelPicker picker = null; + SubchannelPicker picker; PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions); long pickerVersion = -1; synchronized (lock) { - if (!shutdown) { + if (shutdownStatus == null) { if (lastPicker == null) { return createPendingStream(args); } picker = lastPicker; pickerVersion = lastPickerVersion; + } else { + return new FailingClientStream(shutdownStatus); + } + } + while (true) { + PickResult pickResult = picker.pickSubchannel(args); + ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, + callOptions.isWaitForReady()); + if (transport != null) { + return transport.newStream( + args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions()); + } + // This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible + // race with reprocess()), we will buffer it. Otherwise, will try with the new picker. + synchronized (lock) { + if (shutdownStatus != null) { + return new FailingClientStream(shutdownStatus); + } + if (pickerVersion == lastPickerVersion) { + return createPendingStream(args); + } + picker = lastPicker; + pickerVersion = lastPickerVersion; } } - if (picker != null) { - while (true) { - PickResult pickResult = picker.pickSubchannel(args); - ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, - callOptions.isWaitForReady()); - if (transport != null) { - return transport.newStream( - args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions()); - } - // This picker's conclusion is "buffer". If there hasn't been a newer picker set - // (possible race with reprocess()), we will buffer it. Otherwise, will try with the new - // picker. - synchronized (lock) { - if (shutdown) { - break; - } - if (pickerVersion == lastPickerVersion) { - return createPendingStream(args); - } - picker = lastPicker; - pickerVersion = lastPickerVersion; - } - } - } - return new FailingClientStream(Status.UNAVAILABLE.withDescription( - "Channel has shutdown (reported by delayed transport)")); } finally { channelExecutor.drain(); } @@ -196,13 +188,18 @@ final class DelayedClientTransport implements ManagedClientTransport { * more buffered streams. */ @Override - public final void shutdown() { + public final void shutdown(final Status status) { synchronized (lock) { - if (shutdown) { + if (shutdownStatus != null) { return; } - shutdown = true; - channelExecutor.executeLater(reportTransportShutdown); + shutdownStatus = status; + channelExecutor.executeLater(new Runnable() { + @Override + public void run() { + listener.transportShutdown(status); + } + }); if (pendingStreams == null || pendingStreams.isEmpty()) { pendingStreams = null; channelExecutor.executeLater(reportTransportTerminated); @@ -217,7 +214,7 @@ final class DelayedClientTransport implements ManagedClientTransport { */ @Override public final void shutdownNow(Status status) { - shutdown(); + shutdown(status); Collection savedPendingStreams = null; synchronized (lock) { if (pendingStreams != null) { @@ -307,7 +304,7 @@ final class DelayedClientTransport implements ManagedClientTransport { // (which would shutdown the transports and LoadBalancer) because the gap should be shorter // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). channelExecutor.executeLater(reportTransportNotInUse); - if (shutdown) { + if (shutdownStatus != null) { pendingStreams = null; channelExecutor.executeLater(reportTransportTerminated); } else { @@ -354,7 +351,7 @@ final class DelayedClientTransport implements ManagedClientTransport { boolean justRemovedAnElement = pendingStreams.remove(this); if (pendingStreams.isEmpty() && justRemovedAnElement) { channelExecutor.executeLater(reportTransportNotInUse); - if (shutdown) { + if (shutdownStatus != null) { pendingStreams = null; channelExecutor.executeLater(reportTransportTerminated); } diff --git a/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java b/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java index 83d73f7bd7..0fc3191f8f 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java @@ -30,8 +30,8 @@ abstract class ForwardingConnectionClientTransport implements ConnectionClientTr } @Override - public void shutdown() { - delegate().shutdown(); + public void shutdown(Status status) { + delegate().shutdown(status); } @Override diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 91bda51009..dd9fc8fab8 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -137,6 +137,9 @@ final class InternalSubchannel implements WithLogId { @GuardedBy("lock") private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE); + @GuardedBy("lock") + private Status shutdownReason; + InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, @@ -302,11 +305,13 @@ final class InternalSubchannel implements WithLogId { channelExecutor.drain(); } if (savedTransport != null) { - savedTransport.shutdown(); + savedTransport.shutdown( + Status.UNAVAILABLE.withDescription( + "InternalSubchannel closed transport due to address change")); } } - public void shutdown() { + public void shutdown(Status reason) { ManagedClientTransport savedActiveTransport; ConnectionClientTransport savedPendingTransport; try { @@ -314,6 +319,7 @@ final class InternalSubchannel implements WithLogId { if (state.getState() == SHUTDOWN) { return; } + shutdownReason = reason; gotoNonErrorState(SHUTDOWN); savedActiveTransport = activeTransport; savedPendingTransport = pendingTransport; @@ -332,10 +338,10 @@ final class InternalSubchannel implements WithLogId { channelExecutor.drain(); } if (savedActiveTransport != null) { - savedActiveTransport.shutdown(); + savedActiveTransport.shutdown(reason); } if (savedPendingTransport != null) { - savedPendingTransport.shutdown(); + savedPendingTransport.shutdown(reason); } } @@ -360,7 +366,7 @@ final class InternalSubchannel implements WithLogId { } void shutdownNow(Status reason) { - shutdown(); + shutdown(reason); Collection transportsCopy; try { synchronized (lock) { @@ -424,12 +430,12 @@ final class InternalSubchannel implements WithLogId { log.log(Level.FINE, "[{0}] {1} for {2} is ready", new Object[] {logId, transport.getLogId(), address}); } - ConnectivityState savedState; + Status savedShutdownReason; try { synchronized (lock) { - savedState = state.getState(); + savedShutdownReason = shutdownReason; reconnectPolicy = null; - if (savedState == SHUTDOWN) { + if (savedShutdownReason != null) { // activeTransport should have already been set to null by shutdown(). We keep it null. Preconditions.checkState(activeTransport == null, "Unexpected non-null activeTransport"); @@ -442,8 +448,8 @@ final class InternalSubchannel implements WithLogId { } finally { channelExecutor.drain(); } - if (savedState == SHUTDOWN) { - transport.shutdown(); + if (savedShutdownReason != null) { + transport.shutdown(savedShutdownReason); } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index bcd681e8ea..04fed50632 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -84,6 +84,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI static final Status SHUTDOWN_NOW_STATUS = Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked"); + @VisibleForTesting + static final Status SHUTDOWN_STATUS = + Status.UNAVAILABLE.withDescription("Channel shutdown invoked"); + + @VisibleForTesting + static final Status SUBCHANNEL_SHUTDOWN_STATUS = + Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked"); + private final String target; private final NameResolver.Factory nameResolverFactory; private final Attributes nameResolverParams; @@ -470,7 +478,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI } }); - delayedTransport.shutdown(); + delayedTransport.shutdown(SHUTDOWN_STATUS); channelExecutor.executeLater(new Runnable() { @Override public void run() { @@ -659,7 +667,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI // shutdown even if "terminating" is already true. The subchannel will not be used in // this case, because delayed transport has terminated when "terminating" becomes // true, and no more requests will be sent to balancer beyond this point. - internalSubchannel.shutdown(); + internalSubchannel.shutdown(SHUTDOWN_STATUS); } if (!terminated) { // If channel has not terminated, it will track the subchannel and block termination @@ -904,7 +912,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI new Runnable() { @Override public void run() { - subchannel.shutdown(); + subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS); } }), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); return; @@ -912,7 +920,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI } // When terminating == true, no more real streams will be created. It's safe and also // desirable to shutdown timely. - subchannel.shutdown(); + subchannel.shutdown(SHUTDOWN_STATUS); } @Override diff --git a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java index 8eb246e6d8..30f7f7103a 100644 --- a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java @@ -56,7 +56,7 @@ public interface ManagedClientTransport extends ClientTransport, WithLogId { * {@link Listener#transportShutdown} callback called), or be transferred off this transport (in * which case they may succeed). This method may only be called once. */ - void shutdown(); + void shutdown(Status reason); /** * Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index 018a7e71fc..a993a4083e 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -109,7 +109,7 @@ final class OobChannel extends ManagedChannel implements WithLogId { subchannelImpl = new AbstractSubchannel() { @Override public void shutdown() { - subchannel.shutdown(); + subchannel.shutdown(Status.UNAVAILABLE.withDescription("OobChannel is shutdown")); } @Override @@ -179,7 +179,7 @@ final class OobChannel extends ManagedChannel implements WithLogId { @Override public ManagedChannel shutdown() { shutdown = true; - delayedTransport.shutdown(); + delayedTransport.shutdown(Status.UNAVAILABLE.withDescription("OobChannel.shutdown() called")); return this; } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 9b53bf9aa2..8ddd194e45 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -78,6 +78,8 @@ public class DelayedClientTransportTest { @Captor private ArgumentCaptor listenerCaptor; private static final CallOptions.Key SHARD_ID = CallOptions.Key.of("shard-id", -1); + private static final Status SHUTDOWN_STATUS = + Status.UNAVAILABLE.withDescription("shutdown called"); private final MethodDescriptor method = MethodDescriptor.newBuilder() @@ -141,8 +143,8 @@ public class DelayedClientTransportTest { assertTrue(stream instanceof DelayedStream); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - delayedTransport.shutdown(); - verify(transportListener).transportShutdown(any(Status.class)); + delayedTransport.shutdown(SHUTDOWN_STATUS); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener).transportTerminated(); assertEquals(1, fakeExecutor.runDueTasks()); verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); @@ -151,8 +153,8 @@ public class DelayedClientTransportTest { } @Test public void transportTerminatedThenAssignTransport() { - delayedTransport.shutdown(); - verify(transportListener).transportShutdown(any(Status.class)); + delayedTransport.shutdown(SHUTDOWN_STATUS); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener).transportTerminated(); delayedTransport.reprocess(mockPicker); verifyNoMoreInteractions(transportListener); @@ -160,8 +162,8 @@ public class DelayedClientTransportTest { @Test public void assignTransportThenShutdownThenNewStream() { delayedTransport.reprocess(mockPicker); - delayedTransport.shutdown(); - verify(transportListener).transportShutdown(any(Status.class)); + delayedTransport.shutdown(SHUTDOWN_STATUS); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener).transportTerminated(); ClientStream stream = delayedTransport.newStream(method, headers, callOptions); assertEquals(0, delayedTransport.getPendingStreamsCount()); @@ -205,10 +207,10 @@ public class DelayedClientTransportTest { @Test public void newStreamThenShutdownTransportThenAssignTransport() { ClientStream stream = delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); - delayedTransport.shutdown(); + delayedTransport.shutdown(SHUTDOWN_STATUS); // Stream is still buffered - verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, times(0)).transportTerminated(); assertEquals(1, delayedTransport.getPendingStreamsCount()); @@ -236,8 +238,8 @@ public class DelayedClientTransportTest { @Test public void newStreamThenShutdownTransportThenCancelStream() { ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); - delayedTransport.shutdown(); - verify(transportListener).transportShutdown(any(Status.class)); + delayedTransport.shutdown(SHUTDOWN_STATUS); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, times(0)).transportTerminated(); assertEquals(1, delayedTransport.getPendingStreamsCount()); stream.cancel(Status.CANCELLED); @@ -248,8 +250,8 @@ public class DelayedClientTransportTest { } @Test public void shutdownThenNewStream() { - delayedTransport.shutdown(); - verify(transportListener).transportShutdown(any(Status.class)); + delayedTransport.shutdown(SHUTDOWN_STATUS); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener).transportTerminated(); ClientStream stream = delayedTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); stream.start(streamListener); @@ -426,8 +428,8 @@ public class DelayedClientTransportTest { assertEquals(1, delayedTransport.getPendingStreamsCount()); // wfr5 will stop delayed transport from terminating - delayedTransport.shutdown(); - verify(transportListener).transportShutdown(any(Status.class)); + delayedTransport.shutdown(SHUTDOWN_STATUS); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); // ... until it's gone picker = mock(SubchannelPicker.class); diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index bc71c3b344..719b61cd5f 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -65,6 +65,7 @@ public class InternalSubchannelTest { ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); private static final ConnectivityStateInfo RESOURCE_EXHAUSTED_STATE = ConnectivityStateInfo.forTransientFailure(Status.RESOURCE_EXHAUSTED); + private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test"); // For scheduled executor private final FakeClock fakeClock = new FakeClock(); @@ -392,7 +393,7 @@ public class InternalSubchannelTest { internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))); assertNoCallbackInvoke(); assertEquals(READY, internalSubchannel.getState()); - verify(transports.peek().transport, never()).shutdown(); + verify(transports.peek().transport, never()).shutdown(any(Status.class)); verify(transports.peek().transport, never()).shutdownNow(any(Status.class)); // And new addresses chosen when re-connecting @@ -433,7 +434,7 @@ public class InternalSubchannelTest { internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))); assertNoCallbackInvoke(); assertEquals(CONNECTING, internalSubchannel.getState()); - verify(transports.peek().transport, never()).shutdown(); + verify(transports.peek().transport, never()).shutdown(any(Status.class)); verify(transports.peek().transport, never()).shutdownNow(any(Status.class)); // And new addresses chosen when re-connecting @@ -507,7 +508,7 @@ public class InternalSubchannelTest { internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))); assertExactCallbackInvokes("onStateChange:IDLE"); assertEquals(IDLE, internalSubchannel.getState()); - verify(transports.peek().transport).shutdown(); + verify(transports.peek().transport).shutdown(any(Status.class)); // And new addresses chosen when re-connecting transports.poll().listener.transportShutdown(Status.UNAVAILABLE); @@ -551,7 +552,7 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); // And new addresses chosen immediately - verify(transports.poll().transport).shutdown(); + verify(transports.poll().transport).shutdown(any(Status.class)); assertNoCallbackInvoke(); assertEquals(CONNECTING, internalSubchannel.getState()); @@ -622,8 +623,8 @@ public class InternalSubchannelTest { transportInfo.listener.transportReady(); assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); - internalSubchannel.shutdown(); - verify(transportInfo.transport).shutdown(); + internalSubchannel.shutdown(SHUTDOWN_REASON); + verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON)); assertExactCallbackInvokes("onStateChange:SHUTDOWN"); transportInfo.listener.transportTerminated(); @@ -661,7 +662,7 @@ public class InternalSubchannelTest { assertNotNull("There should be at least one reconnectTask", reconnectTask); // Shut down InternalSubchannel before the transport is created. - internalSubchannel.shutdown(); + internalSubchannel.shutdown(SHUTDOWN_REASON); assertTrue(reconnectTask.isCancelled()); // InternalSubchannel terminated promptly. assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated"); @@ -694,11 +695,11 @@ public class InternalSubchannelTest { // Shutdown the InternalSubchannel before the pending transport is ready assertNull(internalSubchannel.obtainActiveTransport()); - internalSubchannel.shutdown(); + internalSubchannel.shutdown(SHUTDOWN_REASON); assertExactCallbackInvokes("onStateChange:SHUTDOWN"); // The transport should've been shut down even though it's not the active transport yet. - verify(transportInfo.transport).shutdown(); + verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON)); transportInfo.listener.transportShutdown(Status.UNAVAILABLE); assertNoCallbackInvoke(); transportInfo.listener.transportTerminated(); @@ -735,7 +736,7 @@ public class InternalSubchannelTest { SocketAddress addr = mock(SocketAddress.class); createInternalSubchannel(addr); - internalSubchannel.shutdown(); + internalSubchannel.shutdown(SHUTDOWN_REASON); assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated"); assertEquals(SHUTDOWN, internalSubchannel.getState()); assertNull(internalSubchannel.obtainActiveTransport()); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 5d3c6133a4..067e5e5895 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -399,7 +399,11 @@ public class ManagedChannelImplTest { } // LoadBalancer should shutdown the subchannel subchannel.shutdown(); - verify(mockTransport).shutdown(); + if (shutdownNow) { + verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS)); + } else { + verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); + } // Killing the remaining real transport will terminate the channel transportListener.transportShutdown(Status.UNAVAILABLE); @@ -417,10 +421,6 @@ public class ManagedChannelImplTest { verifyNoMoreInteractions(mockTransport); } - @Test - public void shutdownNowWithMultipleOobChannels() { - } - @Test public void interceptor() throws Exception { final AtomicLong atomic = new AtomicLong(); @@ -745,18 +745,18 @@ public class ManagedChannelImplTest { sub1.shutdown(); timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS); sub1.shutdown(); - verify(transportInfo1.transport, never()).shutdown(); + verify(transportInfo1.transport, never()).shutdown(any(Status.class)); timer.forwardTime(1, TimeUnit.SECONDS); - verify(transportInfo1.transport).shutdown(); + verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS)); // ... but not after Channel is terminating verify(mockLoadBalancer, never()).shutdown(); channel.shutdown(); verify(mockLoadBalancer).shutdown(); - verify(transportInfo2.transport, never()).shutdown(); + verify(transportInfo2.transport, never()).shutdown(any(Status.class)); sub2.shutdown(); - verify(transportInfo2.transport).shutdown(); + verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); } @Test diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 2295b17eb1..93b25dbb84 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -540,8 +540,7 @@ class NettyClientHandler extends AbstractNettyHandler { private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise) throws Exception { - lifecycleManager.notifyShutdown( - Status.UNAVAILABLE.withDescription("Channel requested transport to shut down")); + lifecycleManager.notifyShutdown(msg.getStatus()); close(ctx, promise); connection().forEachActiveStream(new Http2StreamVisitor() { @Override diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 29e30b0650..94c31663f5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -248,16 +248,14 @@ class NettyClientTransport implements ConnectionClientTransport { } @Override - public void shutdown() { + public void shutdown(Status reason) { // start() could have failed if (channel == null) { return; } // Notifying of termination is automatically done when the channel closes. if (channel.isOpen()) { - Status status - = Status.UNAVAILABLE.withDescription("Channel requested transport to shut down"); - handler.getWriteQueue().enqueue(new GracefulCloseCommand(status), true); + handler.getWriteQueue().enqueue(new GracefulCloseCommand(reason), true); } } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index e805cdd803..5c73624479 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -118,7 +118,7 @@ public class NettyClientTransportTest { public void teardown() throws Exception { Context.ROOT.attach(); for (NettyClientTransport transport : transports) { - transport.shutdown(); + transport.shutdown(Status.UNAVAILABLE); } if (server != null) { diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index a9cc79abce..5ffc3bc6d5 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -587,13 +587,13 @@ class OkHttpClientTransport implements ConnectionClientTransport { } @Override - public void shutdown() { + public void shutdown(Status reason) { synchronized (lock) { if (goAwayStatus != null) { return; } - goAwayStatus = Status.UNAVAILABLE.withDescription("Transport stopped"); + goAwayStatus = reason; listener.transportShutdown(goAwayStatus); stopIfNecessary(); } @@ -601,7 +601,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { @Override public void shutdownNow(Status reason) { - shutdown(); + shutdown(reason); synchronized (lock) { Iterator> it = streams.entrySet().iterator(); while (it.hasNext()) { diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 7b406a876c..df37ec3549 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -34,6 +35,7 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -112,6 +114,7 @@ public class OkHttpClientTransportTest { private static final String ERROR_MESSAGE = "simulated error"; // The gRPC header length, which includes 1 byte compression flag and 4 bytes message length. private static final int HEADER_LENGTH = 5; + private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test"); @Rule public Timeout globalTimeout = new Timeout(10 * 1000); @@ -700,10 +703,10 @@ public class OkHttpClientTransportTest { clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); stream2.start(listener2); assertEquals(2, activeStreamCount()); - clientTransport.shutdown(); + clientTransport.shutdown(SHUTDOWN_REASON); assertEquals(2, activeStreamCount()); - verify(transportListener).transportShutdown(isA(Status.class)); + verify(transportListener).transportShutdown(same(SHUTDOWN_REASON)); stream1.cancel(Status.CANCELLED); stream2.cancel(Status.CANCELLED); @@ -912,7 +915,7 @@ public class OkHttpClientTransportTest { stream.start(listener); waitForStreamPending(1); - clientTransport.shutdown(); + clientTransport.shutdown(SHUTDOWN_REASON); setMaxConcurrentStreams(1); verify(frameWriter, timeout(TIME_OUT_MS)) .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); @@ -1261,20 +1264,18 @@ public class OkHttpClientTransportTest { clientTransport.ping(callback, MoreExecutors.directExecutor()); assertEquals(0, callback.invocationCount); - clientTransport.shutdown(); + clientTransport.shutdown(SHUTDOWN_REASON); // ping failed on channel shutdown assertEquals(1, callback.invocationCount); assertTrue(callback.failureCause instanceof StatusException); - assertEquals(Status.Code.UNAVAILABLE, - ((StatusException) callback.failureCause).getStatus().getCode()); + assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus()); // now that handler is in terminal state, all future pings fail immediately callback = new PingCallbackImpl(); clientTransport.ping(callback, MoreExecutors.directExecutor()); assertEquals(1, callback.invocationCount); assertTrue(callback.failureCause instanceof StatusException); - assertEquals(Status.Code.UNAVAILABLE, - ((StatusException) callback.failureCause).getStatus().getCode()); + assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus()); shutdownAndVerify(); } @@ -1354,7 +1355,7 @@ public class OkHttpClientTransportTest { OkHttpClientStream stream = clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); stream.start(listener); - clientTransport.shutdown(); + clientTransport.shutdown(SHUTDOWN_REASON); allowTransportConnected(); // The new stream should be failed, but not the pending stream. @@ -1846,7 +1847,7 @@ public class OkHttpClientTransportTest { } private void shutdownAndVerify() { - clientTransport.shutdown(); + clientTransport.shutdown(SHUTDOWN_REASON); assertEquals(0, activeStreamCount()); try { verify(frameWriter, timeout(TIME_OUT_MS)).close(); diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 114ab359b8..27cb94ffd9 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -302,10 +302,10 @@ public abstract class AbstractTransportTest { client = newClientTransport(server); InOrder inOrder = inOrder(mockClientTransportListener); runIfNotNull(client.start(mockClientTransportListener)); - client.shutdown(); + Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); + client.shutdown(shutdownReason); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); - inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture()); - assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue()); + inOrder.verify(mockClientTransportListener).transportShutdown(same(shutdownReason)); inOrder.verify(mockClientTransportListener).transportTerminated(); verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); } @@ -319,7 +319,7 @@ public abstract class AbstractTransportTest { verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); - client.shutdown(); + client.shutdown(Status.UNAVAILABLE); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); inOrder.verify(mockClientTransportListener).transportShutdown(any(Status.class)); inOrder.verify(mockClientTransportListener).transportTerminated(); @@ -356,7 +356,7 @@ public abstract class AbstractTransportTest { ServerStream serverStream = serverStreamCreation.stream; ServerStreamListener mockServerStreamListener = serverStreamCreation.listener; - client.shutdown(); + client.shutdown(Status.UNAVAILABLE); client = null; server.shutdown(); serverTransport.shutdown(); @@ -494,7 +494,7 @@ public abstract class AbstractTransportTest { // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); stream.start(mockClientStreamListener); - client.shutdown(); + client.shutdown(Status.UNAVAILABLE); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); try { @@ -513,7 +513,8 @@ public abstract class AbstractTransportTest { client = newClientTransport(server); runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); - client.shutdown(); + Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); + client.shutdown(shutdownReason); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); try { @@ -524,7 +525,7 @@ public abstract class AbstractTransportTest { } verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(throwableCaptor.capture()); Status status = Status.fromThrowable(throwableCaptor.getValue()); - assertCodeEquals(Status.UNAVAILABLE, status); + assertSame(shutdownReason, status); } @Test @@ -540,7 +541,7 @@ public abstract class AbstractTransportTest { any(CallOptions.class), any(Metadata.class)); } stream.start(mockClientStreamListener); - client.shutdown(); + client.shutdown(Status.UNAVAILABLE); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions); @@ -594,19 +595,19 @@ public abstract class AbstractTransportTest { client = newClientTransport(server); runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); - client.shutdown(); + Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); + client.shutdown(shutdownReason); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); Thread.sleep(100); ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); stream.start(mockClientStreamListener); verify(mockClientStreamListener, timeout(TIMEOUT_MS)) - .closed(statusCaptor.capture(), any(Metadata.class)); + .closed(same(shutdownReason), any(Metadata.class)); verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); - assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue()); if (metricsExpected()) { verify(clientStreamTracerFactory).newClientStreamTracer( any(CallOptions.class), any(Metadata.class)); - assertSame(statusCaptor.getValue(), clientStreamTracer1.getStatus()); + assertSame(shutdownReason, clientStreamTracer1.getStatus()); // Assert no interactions assertNull(serverStreamTracer1.getServerCall()); } @@ -1477,7 +1478,7 @@ public abstract class AbstractTransportTest { verify(mockClientStreamListener, timeout(TIMEOUT_MS)) .closed(any(Status.class), any(Metadata.class)); verify(mockServerStreamListener, timeout(TIMEOUT_MS)).closed(any(Status.class)); - client.shutdown(); + client.shutdown(Status.UNAVAILABLE); } /**