diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 9af74d046b..2a46c1e31d 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -260,7 +260,7 @@ final class ManagedChannelImpl extends ManagedChannel implements private final ChannelTracer channelTracer; private final ChannelLogger channelLogger; private final InternalChannelz channelz; - + private final RealChannel realChannel; // Must be mutated and read from syncContext // a flag for doing channel tracing when flipped private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION; @@ -655,8 +655,8 @@ final class ManagedChannelImpl extends ManagedChannel implements this.defaultServiceConfig = null; } this.lookUpServiceConfig = builder.lookUpServiceConfig; - Channel channel = new RealChannel(nameResolver.getServiceAuthority()); - channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor); + realChannel = new RealChannel(nameResolver.getServiceAuthority()); + Channel channel = ClientInterceptors.intercept(realChannel, serviceConfigInterceptor); if (builder.binlog != null) { channel = builder.binlog.wrapChannel(channel); } @@ -773,11 +773,6 @@ final class ManagedChannelImpl extends ManagedChannel implements if (!shutdown.compareAndSet(false, true)) { return this; } - - // Put gotoState(SHUTDOWN) as early into the syncContext's queue as possible. - // delayedTransport.shutdown() may also add some tasks into the queue. But some things inside - // delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the - // syncContext's queue and should not be blocked, so we do not drain() immediately here. final class Shutdown implements Runnable { @Override public void run() { @@ -786,9 +781,8 @@ final class ManagedChannelImpl extends ManagedChannel implements } } - syncContext.executeLater(new Shutdown()); - - uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS); + syncContext.execute(new Shutdown()); + realChannel.shutdown(); final class CancelIdleTimer implements Runnable { @Override public void run() { @@ -809,7 +803,7 @@ final class ManagedChannelImpl extends ManagedChannel implements public ManagedChannelImpl shutdownNow() { channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called"); shutdown(); - uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS); + realChannel.shutdownNow(); final class ShutdownNow implements Runnable { @Override public void run() { @@ -918,9 +912,6 @@ final class ManagedChannelImpl extends ManagedChannel implements @Override public ClientCall newCall( MethodDescriptor method, CallOptions callOptions) { - if (true) { // FIXME(zdapeng): there is a bug for using PendingCall. Temporarily disable it. - return newClientCall(method, callOptions); - } if (configSelector.get() != INITIAL_PENDING_SELECTOR) { return newClientCall(method, callOptions); } @@ -936,6 +927,23 @@ final class ManagedChannelImpl extends ManagedChannel implements // tests might observe slight behavior difference from earlier grpc versions. return newClientCall(method, callOptions); } + if (shutdown.get()) { + // Return a failing ClientCall. + return new ClientCall() { + @Override + public void start(Listener responseListener, Metadata headers) { + responseListener.onClose(SHUTDOWN_STATUS, new Metadata()); + } + + @Override public void request(int numMessages) {} + + @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {} + + @Override public void halfClose() {} + + @Override public void sendMessage(ReqT message) {} + }; + } Context context = Context.current(); final PendingCall pendingCall = new PendingCall<>(context, method, callOptions); syncContext.execute(new Runnable() { @@ -955,6 +963,51 @@ final class ManagedChannelImpl extends ManagedChannel implements return pendingCall; } + // Must run in SynchronizationContext. + private void drainPendingCalls() { + if (pendingCalls == null) { + return; + } + for (RealChannel.PendingCall pendingCall : pendingCalls) { + pendingCall.reprocess(); + } + } + + void shutdown() { + final class RealChannelShutdown implements Runnable { + @Override + public void run() { + if (pendingCalls == null) { + if (configSelector.get() == INITIAL_PENDING_SELECTOR) { + configSelector.set(null); + } + uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS); + } + } + } + + syncContext.execute(new RealChannelShutdown()); + } + + void shutdownNow() { + final class RealChannelShutdownNow implements Runnable { + @Override + public void run() { + if (configSelector.get() == INITIAL_PENDING_SELECTOR) { + configSelector.set(null); + } + if (pendingCalls != null) { + for (RealChannel.PendingCall pendingCall : pendingCalls) { + pendingCall.cancel("Channel is forcefully shutdown", null); + } + } + uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS); + } + } + + syncContext.execute(new RealChannelShutdownNow()); + } + @Override public String authority() { return authority; @@ -1007,6 +1060,9 @@ final class ManagedChannelImpl extends ManagedChannel implements if (pendingCalls.isEmpty()) { inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false); pendingCalls = null; + if (shutdown.get()) { + uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS); + } } } } @@ -1583,7 +1639,7 @@ final class ManagedChannelImpl extends ManagedChannel implements re); } } - drainPendingCalls(); + realChannel.drainPendingCalls(); Attributes effectiveAttrs = resolutionResult.getAttributes(); // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. @@ -1633,7 +1689,7 @@ final class ManagedChannelImpl extends ManagedChannel implements new Object[] {getLogId(), error}); if (configSelector.get() == INITIAL_PENDING_SELECTOR) { configSelector.set(null); - drainPendingCalls(); + realChannel.drainPendingCalls(); } if (lastResolutionState != ResolutionState.ERROR) { channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error); @@ -1649,16 +1705,6 @@ final class ManagedChannelImpl extends ManagedChannel implements scheduleExponentialBackOffInSyncContext(); } - // Must run in SynchronizationContext. - private void drainPendingCalls() { - if (pendingCalls == null) { - return; - } - for (RealChannel.PendingCall pendingCall : pendingCalls) { - pendingCall.reprocess(); - } - } - private void scheduleExponentialBackOffInSyncContext() { if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) { // The name resolver may invoke onError multiple times, but we only want to diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index ba17deba61..fdebd545ff 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -551,6 +551,41 @@ public class ManagedChannelImplTest { verify(executorPool).returnObject(executor.getScheduledExecutorService()); } + @Test + public void shutdownNow_pendingCallShouldFail() { + channelBuilder.nameResolverFactory( + new FakeNameResolverFactory.Builder(expectedUri) + .setResolvedAtStart(false) + .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) + .build()); + createChannel(); + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + channel.shutdown(); + executor.runDueTasks(); + verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); + channel.shutdownNow(); + executor.runDueTasks(); + verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + } + + @Test + public void shutdownWithNoNameResolution_newCallShouldFail() { + channelBuilder.nameResolverFactory( + new FakeNameResolverFactory.Builder(expectedUri) + .setResolvedAtStart(false) + .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) + .build()); + createChannel(); + channel.shutdown(); + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + executor.runDueTasks(); + verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + } + @Test public void channelzMembership() throws Exception { createChannel();