From fea577c8047f7d6b489359a837cd1ff902063d14 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 23 May 2024 14:45:38 -0700 Subject: [PATCH] core: Exit idle mode when delayed transport is in use 8844cf7b8 triggered a regression where a new RPC wouldn't cause the channel to exit idle mode, if an RPC was still progressing on an old transport. This was already possible previously, but was racy. 8844cf7b8 made it less racy and more obvious. The two added `exitIdleMode()` calls in this commit are companions to those in `enterIdleMode()`, which detect whether the channel should immediately exit idle mode. Noticed in cl/635819804. --- .../io/grpc/internal/ManagedChannelImpl.java | 16 ++-- .../java/io/grpc/internal/FakeClock.java | 3 +- .../ManagedChannelImplIntegrationTest.java | 80 +++++++++++++++++++ .../java/io/grpc/xds/CsdsServiceTest.java | 3 + 4 files changed, 95 insertions(+), 7 deletions(-) create mode 100644 interop-testing/src/test/java/io/grpc/testing/integration/ManagedChannelImplIntegrationTest.java diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 09ca4684bd..997968172a 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -889,12 +889,6 @@ final class ManagedChannelImpl extends ManagedChannel implements if (configSelector.get() != INITIAL_PENDING_SELECTOR) { return newClientCall(method, callOptions); } - syncContext.execute(new Runnable() { - @Override - public void run() { - exitIdleMode(); - } - }); if (configSelector.get() != INITIAL_PENDING_SELECTOR) { // This is an optimization for the case (typically with InProcessTransport) when name // resolution result is immediately available at this point. Otherwise, some users' @@ -927,6 +921,10 @@ final class ManagedChannelImpl extends ManagedChannel implements if (pendingCalls == null) { pendingCalls = new LinkedHashSet<>(); inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true); + // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of + // the subchannels is in use. But we should never be in idle mode when pendingCalls is + // in use. + exitIdleMode(); } pendingCalls.add(pendingCall); } else { @@ -2081,6 +2079,12 @@ final class ManagedChannelImpl extends ManagedChannel implements @Override public void transportInUse(final boolean inUse) { inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); + if (inUse) { + // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the + // subchannels is in use. But we should never be in idle mode when delayed transport is in + // use. + exitIdleMode(); + } } @Override diff --git a/core/src/testFixtures/java/io/grpc/internal/FakeClock.java b/core/src/testFixtures/java/io/grpc/internal/FakeClock.java index 9cc9178f1f..1a3584f4e2 100644 --- a/core/src/testFixtures/java/io/grpc/internal/FakeClock.java +++ b/core/src/testFixtures/java/io/grpc/internal/FakeClock.java @@ -188,7 +188,8 @@ public final class FakeClock { } @Override public boolean isShutdown() { - throw new UnsupportedOperationException(); + // If shutdown is not implemented, then it is never shutdown. + return false; } @Override public boolean isTerminated() { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/ManagedChannelImplIntegrationTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/ManagedChannelImplIntegrationTest.java new file mode 100644 index 0000000000..f09f196d7d --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/ManagedChannelImplIntegrationTest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.ManagedChannel; +import io.grpc.ServerInterceptors; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.FakeClock; +import io.grpc.internal.testing.StreamRecorder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.integration.EmptyProtos.Empty; +import io.grpc.testing.integration.Messages.ResponseParameters; +import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; +import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; +import java.util.concurrent.TimeUnit; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for ManagedChannelImpl that use a real transport. */ +@RunWith(JUnit4.class) +public final class ManagedChannelImplIntegrationTest { + private static final String SERVER_NAME = ManagedChannelImplIntegrationTest.class.getName(); + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + @Test + public void idleWhileRpcInTransport_exitsIdleForNewRpc() throws Exception { + FakeClock fakeClock = new FakeClock(); + grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME) + .directExecutor() + .addService( + ServerInterceptors.intercept( + new TestServiceImpl(fakeClock.getScheduledExecutorService()), + TestServiceImpl.interceptors())) + .build() + .start()); + ManagedChannel channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME) + .directExecutor() + .build()); + + TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(channel); + TestServiceGrpc.TestServiceStub asyncStub = TestServiceGrpc.newStub(channel); + StreamRecorder responseObserver = StreamRecorder.create(); + StreamObserver requestObserver = + asyncStub.fullDuplexCall(responseObserver); + requestObserver.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder() + .setIntervalUs(Integer.MAX_VALUE)) + .build()); + try { + channel.enterIdle(); + assertThat(blockingStub + .withDeadlineAfter(10, TimeUnit.SECONDS) + .emptyCall(Empty.getDefaultInstance())) + .isEqualTo(Empty.getDefaultInstance()); + } finally { + requestObserver.onError(new RuntimeException("cleanup")); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java index 0ac024d1e4..bf330b1007 100644 --- a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java +++ b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java @@ -169,6 +169,9 @@ public class CsdsServiceTest { grpcServerRule.getServiceRegistry() .addService(new CsdsService(new FakeXdsClientPoolFactory(throwingXdsClient))); + // Hack to prevent the interrupted exception from propagating through to the client stub. + grpcServerRule.getChannel().getState(true); + try { ClientStatusResponse response = csdsStub.fetchClientStatus(REQUEST); fail("Should've failed, got response: " + response);