core: Exit idle mode when delayed transport is in use

8844cf7b8 triggered a regression where a new RPC wouldn't cause the
channel to exit idle mode, if an RPC was still progressing on an old
transport. This was already possible previously, but was racy.
8844cf7b8 made it less racy and more obvious.

The two added `exitIdleMode()` calls in this commit are companions to
those in `enterIdleMode()`, which detect whether the channel should
immediately exit idle mode.

Noticed in cl/635819804.
This commit is contained in:
Eric Anderson 2024-05-23 14:45:38 -07:00 committed by GitHub
parent 0b5f38d942
commit fea577c804
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 95 additions and 7 deletions

View File

@ -889,12 +889,6 @@ final class ManagedChannelImpl extends ManagedChannel implements
if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
return newClientCall(method, callOptions);
}
syncContext.execute(new Runnable() {
@Override
public void run() {
exitIdleMode();
}
});
if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
// This is an optimization for the case (typically with InProcessTransport) when name
// resolution result is immediately available at this point. Otherwise, some users'
@ -927,6 +921,10 @@ final class ManagedChannelImpl extends ManagedChannel implements
if (pendingCalls == null) {
pendingCalls = new LinkedHashSet<>();
inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
// It's possible to be in idle mode while inUseStateAggregator is in-use, if one of
// the subchannels is in use. But we should never be in idle mode when pendingCalls is
// in use.
exitIdleMode();
}
pendingCalls.add(pendingCall);
} else {
@ -2081,6 +2079,12 @@ final class ManagedChannelImpl extends ManagedChannel implements
@Override
public void transportInUse(final boolean inUse) {
inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
if (inUse) {
// It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
// subchannels is in use. But we should never be in idle mode when delayed transport is in
// use.
exitIdleMode();
}
}
@Override

View File

@ -188,7 +188,8 @@ public final class FakeClock {
}
@Override public boolean isShutdown() {
throw new UnsupportedOperationException();
// If shutdown is not implemented, then it is never shutdown.
return false;
}
@Override public boolean isTerminated() {

View File

@ -0,0 +1,80 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.integration;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.ManagedChannel;
import io.grpc.ServerInterceptors;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.FakeClock;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.integration.EmptyProtos.Empty;
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for ManagedChannelImpl that use a real transport. */
@RunWith(JUnit4.class)
public final class ManagedChannelImplIntegrationTest {
private static final String SERVER_NAME = ManagedChannelImplIntegrationTest.class.getName();
@Rule
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
@Test
public void idleWhileRpcInTransport_exitsIdleForNewRpc() throws Exception {
FakeClock fakeClock = new FakeClock();
grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME)
.directExecutor()
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(fakeClock.getScheduledExecutorService()),
TestServiceImpl.interceptors()))
.build()
.start());
ManagedChannel channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME)
.directExecutor()
.build());
TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(channel);
TestServiceGrpc.TestServiceStub asyncStub = TestServiceGrpc.newStub(channel);
StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
StreamObserver<StreamingOutputCallRequest> requestObserver =
asyncStub.fullDuplexCall(responseObserver);
requestObserver.onNext(StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder()
.setIntervalUs(Integer.MAX_VALUE))
.build());
try {
channel.enterIdle();
assertThat(blockingStub
.withDeadlineAfter(10, TimeUnit.SECONDS)
.emptyCall(Empty.getDefaultInstance()))
.isEqualTo(Empty.getDefaultInstance());
} finally {
requestObserver.onError(new RuntimeException("cleanup"));
}
}
}

View File

@ -169,6 +169,9 @@ public class CsdsServiceTest {
grpcServerRule.getServiceRegistry()
.addService(new CsdsService(new FakeXdsClientPoolFactory(throwingXdsClient)));
// Hack to prevent the interrupted exception from propagating through to the client stub.
grpcServerRule.getChannel().getState(true);
try {
ClientStatusResponse response = csdsStub.fetchClientStatus(REQUEST);
fail("Should've failed, got response: " + response);