core: fix pending call not drained when shutdown

There was bug that new pending calls were not drained after channel is shutdown. The bug was worked around by #7354 .

Fixing by making sure new calls fail immediately if the channel is already shutdown.
This commit is contained in:
ZHANG Dapeng 2020-09-09 17:47:10 -07:00 committed by GitHub
parent 7c7c4a7daa
commit 1411e6f61e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 108 additions and 27 deletions

View File

@ -260,7 +260,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final ChannelTracer channelTracer; private final ChannelTracer channelTracer;
private final ChannelLogger channelLogger; private final ChannelLogger channelLogger;
private final InternalChannelz channelz; private final InternalChannelz channelz;
private final RealChannel realChannel;
// Must be mutated and read from syncContext // Must be mutated and read from syncContext
// a flag for doing channel tracing when flipped // a flag for doing channel tracing when flipped
private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION; private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
@ -655,8 +655,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
this.defaultServiceConfig = null; this.defaultServiceConfig = null;
} }
this.lookUpServiceConfig = builder.lookUpServiceConfig; this.lookUpServiceConfig = builder.lookUpServiceConfig;
Channel channel = new RealChannel(nameResolver.getServiceAuthority()); realChannel = new RealChannel(nameResolver.getServiceAuthority());
channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor); Channel channel = ClientInterceptors.intercept(realChannel, serviceConfigInterceptor);
if (builder.binlog != null) { if (builder.binlog != null) {
channel = builder.binlog.wrapChannel(channel); channel = builder.binlog.wrapChannel(channel);
} }
@ -773,11 +773,6 @@ final class ManagedChannelImpl extends ManagedChannel implements
if (!shutdown.compareAndSet(false, true)) { if (!shutdown.compareAndSet(false, true)) {
return this; return this;
} }
// Put gotoState(SHUTDOWN) as early into the syncContext's queue as possible.
// delayedTransport.shutdown() may also add some tasks into the queue. But some things inside
// delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the
// syncContext's queue and should not be blocked, so we do not drain() immediately here.
final class Shutdown implements Runnable { final class Shutdown implements Runnable {
@Override @Override
public void run() { public void run() {
@ -786,9 +781,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
} }
} }
syncContext.executeLater(new Shutdown()); syncContext.execute(new Shutdown());
realChannel.shutdown();
uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
final class CancelIdleTimer implements Runnable { final class CancelIdleTimer implements Runnable {
@Override @Override
public void run() { public void run() {
@ -809,7 +803,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
public ManagedChannelImpl shutdownNow() { public ManagedChannelImpl shutdownNow() {
channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called"); channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
shutdown(); shutdown();
uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS); realChannel.shutdownNow();
final class ShutdownNow implements Runnable { final class ShutdownNow implements Runnable {
@Override @Override
public void run() { public void run() {
@ -918,9 +912,6 @@ final class ManagedChannelImpl extends ManagedChannel implements
@Override @Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall( public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) { MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
if (true) { // FIXME(zdapeng): there is a bug for using PendingCall. Temporarily disable it.
return newClientCall(method, callOptions);
}
if (configSelector.get() != INITIAL_PENDING_SELECTOR) { if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
return newClientCall(method, callOptions); return newClientCall(method, callOptions);
} }
@ -936,6 +927,23 @@ final class ManagedChannelImpl extends ManagedChannel implements
// tests might observe slight behavior difference from earlier grpc versions. // tests might observe slight behavior difference from earlier grpc versions.
return newClientCall(method, callOptions); return newClientCall(method, callOptions);
} }
if (shutdown.get()) {
// Return a failing ClientCall.
return new ClientCall<ReqT, RespT>() {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
}
@Override public void request(int numMessages) {}
@Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
@Override public void halfClose() {}
@Override public void sendMessage(ReqT message) {}
};
}
Context context = Context.current(); Context context = Context.current();
final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions); final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@ -955,6 +963,51 @@ final class ManagedChannelImpl extends ManagedChannel implements
return pendingCall; return pendingCall;
} }
// Must run in SynchronizationContext.
private void drainPendingCalls() {
if (pendingCalls == null) {
return;
}
for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
pendingCall.reprocess();
}
}
void shutdown() {
final class RealChannelShutdown implements Runnable {
@Override
public void run() {
if (pendingCalls == null) {
if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
configSelector.set(null);
}
uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
}
}
}
syncContext.execute(new RealChannelShutdown());
}
void shutdownNow() {
final class RealChannelShutdownNow implements Runnable {
@Override
public void run() {
if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
configSelector.set(null);
}
if (pendingCalls != null) {
for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
pendingCall.cancel("Channel is forcefully shutdown", null);
}
}
uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
}
}
syncContext.execute(new RealChannelShutdownNow());
}
@Override @Override
public String authority() { public String authority() {
return authority; return authority;
@ -1007,6 +1060,9 @@ final class ManagedChannelImpl extends ManagedChannel implements
if (pendingCalls.isEmpty()) { if (pendingCalls.isEmpty()) {
inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false); inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
pendingCalls = null; pendingCalls = null;
if (shutdown.get()) {
uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
}
} }
} }
} }
@ -1583,7 +1639,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
re); re);
} }
} }
drainPendingCalls(); realChannel.drainPendingCalls();
Attributes effectiveAttrs = resolutionResult.getAttributes(); Attributes effectiveAttrs = resolutionResult.getAttributes();
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
@ -1633,7 +1689,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
new Object[] {getLogId(), error}); new Object[] {getLogId(), error});
if (configSelector.get() == INITIAL_PENDING_SELECTOR) { if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
configSelector.set(null); configSelector.set(null);
drainPendingCalls(); realChannel.drainPendingCalls();
} }
if (lastResolutionState != ResolutionState.ERROR) { if (lastResolutionState != ResolutionState.ERROR) {
channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error); channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
@ -1649,16 +1705,6 @@ final class ManagedChannelImpl extends ManagedChannel implements
scheduleExponentialBackOffInSyncContext(); scheduleExponentialBackOffInSyncContext();
} }
// Must run in SynchronizationContext.
private void drainPendingCalls() {
if (pendingCalls == null) {
return;
}
for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
pendingCall.reprocess();
}
}
private void scheduleExponentialBackOffInSyncContext() { private void scheduleExponentialBackOffInSyncContext() {
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) { if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
// The name resolver may invoke onError multiple times, but we only want to // The name resolver may invoke onError multiple times, but we only want to

View File

@ -551,6 +551,41 @@ public class ManagedChannelImplTest {
verify(executorPool).returnObject(executor.getScheduledExecutorService()); verify(executorPool).returnObject(executor.getScheduledExecutorService());
} }
@Test
public void shutdownNow_pendingCallShouldFail() {
channelBuilder.nameResolverFactory(
new FakeNameResolverFactory.Builder(expectedUri)
.setResolvedAtStart(false)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build());
createChannel();
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
channel.shutdown();
executor.runDueTasks();
verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
channel.shutdownNow();
executor.runDueTasks();
verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
}
@Test
public void shutdownWithNoNameResolution_newCallShouldFail() {
channelBuilder.nameResolverFactory(
new FakeNameResolverFactory.Builder(expectedUri)
.setResolvedAtStart(false)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build());
createChannel();
channel.shutdown();
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
executor.runDueTasks();
verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
}
@Test @Test
public void channelzMembership() throws Exception { public void channelzMembership() throws Exception {
createChannel(); createChannel();