diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 09ca4684bd..997968172a 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -889,12 +889,6 @@ final class ManagedChannelImpl extends ManagedChannel implements if (configSelector.get() != INITIAL_PENDING_SELECTOR) { return newClientCall(method, callOptions); } - syncContext.execute(new Runnable() { - @Override - public void run() { - exitIdleMode(); - } - }); if (configSelector.get() != INITIAL_PENDING_SELECTOR) { // This is an optimization for the case (typically with InProcessTransport) when name // resolution result is immediately available at this point. Otherwise, some users' @@ -927,6 +921,10 @@ final class ManagedChannelImpl extends ManagedChannel implements if (pendingCalls == null) { pendingCalls = new LinkedHashSet<>(); inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true); + // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of + // the subchannels is in use. But we should never be in idle mode when pendingCalls is + // in use. + exitIdleMode(); } pendingCalls.add(pendingCall); } else { @@ -2081,6 +2079,12 @@ final class ManagedChannelImpl extends ManagedChannel implements @Override public void transportInUse(final boolean inUse) { inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); + if (inUse) { + // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the + // subchannels is in use. But we should never be in idle mode when delayed transport is in + // use. + exitIdleMode(); + } } @Override diff --git a/core/src/testFixtures/java/io/grpc/internal/FakeClock.java b/core/src/testFixtures/java/io/grpc/internal/FakeClock.java index 9cc9178f1f..1a3584f4e2 100644 --- a/core/src/testFixtures/java/io/grpc/internal/FakeClock.java +++ b/core/src/testFixtures/java/io/grpc/internal/FakeClock.java @@ -188,7 +188,8 @@ public final class FakeClock { } @Override public boolean isShutdown() { - throw new UnsupportedOperationException(); + // If shutdown is not implemented, then it is never shutdown. + return false; } @Override public boolean isTerminated() { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/ManagedChannelImplIntegrationTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/ManagedChannelImplIntegrationTest.java new file mode 100644 index 0000000000..f09f196d7d --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/ManagedChannelImplIntegrationTest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.ManagedChannel; +import io.grpc.ServerInterceptors; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.FakeClock; +import io.grpc.internal.testing.StreamRecorder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.integration.EmptyProtos.Empty; +import io.grpc.testing.integration.Messages.ResponseParameters; +import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; +import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; +import java.util.concurrent.TimeUnit; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for ManagedChannelImpl that use a real transport. */ +@RunWith(JUnit4.class) +public final class ManagedChannelImplIntegrationTest { + private static final String SERVER_NAME = ManagedChannelImplIntegrationTest.class.getName(); + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + @Test + public void idleWhileRpcInTransport_exitsIdleForNewRpc() throws Exception { + FakeClock fakeClock = new FakeClock(); + grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME) + .directExecutor() + .addService( + ServerInterceptors.intercept( + new TestServiceImpl(fakeClock.getScheduledExecutorService()), + TestServiceImpl.interceptors())) + .build() + .start()); + ManagedChannel channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME) + .directExecutor() + .build()); + + TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(channel); + TestServiceGrpc.TestServiceStub asyncStub = TestServiceGrpc.newStub(channel); + StreamRecorder responseObserver = StreamRecorder.create(); + StreamObserver requestObserver = + asyncStub.fullDuplexCall(responseObserver); + requestObserver.onNext(StreamingOutputCallRequest.newBuilder() + .addResponseParameters(ResponseParameters.newBuilder() + .setIntervalUs(Integer.MAX_VALUE)) + .build()); + try { + channel.enterIdle(); + assertThat(blockingStub + .withDeadlineAfter(10, TimeUnit.SECONDS) + .emptyCall(Empty.getDefaultInstance())) + .isEqualTo(Empty.getDefaultInstance()); + } finally { + requestObserver.onError(new RuntimeException("cleanup")); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java index 0ac024d1e4..bf330b1007 100644 --- a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java +++ b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java @@ -169,6 +169,9 @@ public class CsdsServiceTest { grpcServerRule.getServiceRegistry() .addService(new CsdsService(new FakeXdsClientPoolFactory(throwingXdsClient))); + // Hack to prevent the interrupted exception from propagating through to the client stub. + grpcServerRule.getChannel().getState(true); + try { ClientStatusResponse response = csdsStub.fetchClientStatus(REQUEST); fail("Should've failed, got response: " + response);