From c528df8ae88d08906627844652e62443b93e4e6e Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Mon, 15 Oct 2018 15:39:21 -0700 Subject: [PATCH] core: add internal Subchannel.asChannel() (#4950) Returns a Channel that allows a LoadBalancer to make auxiliary RPCs on already-established application connections. We need this to implement client-side health-checking (#4932) See comments on the API for its semantics. Notable changes: - Transports are modified to use InUseStateAggregator so that they can exclude RPCs made on Subchannel.asChannel() when reporting in-use state for idle mode. - OobChannel shares the same Executor as Subchannel.asChannel(). Because the latter is not a ManagedChannel and doesn't have life-cycle, thus can't determine when to return the Executor to a pool, the Executor is now returned only when ManagedChannelImpl is terminated. --- core/src/main/java/io/grpc/LoadBalancer.java | 29 ++++ .../io/grpc/inprocess/InProcessTransport.java | 26 +++- .../grpc/internal/AbstractClientStream.java | 12 ++ .../main/java/io/grpc/internal/GrpcUtil.java | 16 +++ .../grpc/internal/InUseStateAggregator.java | 10 +- .../io/grpc/internal/InternalSubchannel.java | 19 ++- .../io/grpc/internal/ManagedChannelImpl.java | 48 ++++++- .../io/grpc/internal/SubchannelChannel.java | 119 +++++++++++++++++ .../internal/AbstractClientStreamTest.java | 3 +- .../grpc/internal/ManagedChannelImplTest.java | 126 +++++++++++++++--- .../io/grpc/cronet/CronetClientStream.java | 2 +- .../io/grpc/netty/CreateStreamCommand.java | 17 ++- .../io/grpc/netty/NettyClientHandler.java | 44 +++--- .../java/io/grpc/netty/NettyClientStream.java | 8 +- .../io/grpc/netty/NettyClientTransport.java | 3 +- .../io/grpc/netty/NettyClientHandlerTest.java | 35 ++--- .../io/grpc/netty/NettyClientStreamTest.java | 13 +- .../io/grpc/okhttp/OkHttpClientStream.java | 5 +- .../io/grpc/okhttp/OkHttpClientTransport.java | 53 +++++--- .../grpc/okhttp/OkHttpClientStreamTest.java | 10 +- .../testing/AbstractTransportTest.java | 41 ++++++ 21 files changed, 537 insertions(+), 102 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/SubchannelChannel.java diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index f1cef63fba..2c16b101dc 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -655,6 +655,35 @@ public abstract class LoadBalancer { * @since 1.2.0 */ public abstract Attributes getAttributes(); + + /** + * (Internal use only) returns a {@link Channel} that is backed by this Subchannel. This allows + * a LoadBalancer to issue its own RPCs for auxiliary purposes, such as health-checking, on + * already-established connections. This channel has certain restrictions: + *
    + *
  1. It can issue RPCs only if the Subchannel is {@code READY}. If {@link + * Channel#newCall} is called when the Subchannel is not {@code READY}, the RPC will fail + * immediately.
  2. + *
  3. It doesn't support {@link CallOptions#withWaitForReady wait-for-ready} RPCs. Such RPCs + * will fail immediately.
  4. + *
+ * + *

RPCs made on this Channel is not counted when determining ManagedChannel's {@link + * ManagedChannelBuilder#idleTimeout idle mode}. In other words, they won't prevent + * ManagedChannel from entering idle mode. + * + *

Warning: RPCs made on this channel will prevent a shut-down transport from terminating. If + * you make long-running RPCs, you need to make sure they will finish in time after the + * Subchannel has transitioned away from {@code READY} state + * (notified through {@link #handleSubchannelState}). + * + *

Warning: this is INTERNAL API, is not supposed to be used by external users, and may + * change without notice. If you think you must use it, please file an issue. + */ + @Internal + public Channel asChannel() { + throw new UnsupportedOperationException(); + } } /** diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 03e2e1a197..6f9c3b3cf5 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -42,6 +42,7 @@ import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ConnectionClientTransport; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.InUseStateAggregator; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.NoopClientStream; import io.grpc.internal.ObjectPool; @@ -93,6 +94,19 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private final Attributes attributes = Attributes.newBuilder() .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) .build(); + @GuardedBy("this") + private final InUseStateAggregator inUseState = + new InUseStateAggregator() { + @Override + protected void handleInUse() { + clientTransportListener.transportInUse(true); + } + + @Override + protected void handleNotInUse() { + clientTransportListener.transportInUse(false); + } + }; public InProcessTransport(String name, String authority, String userAgent) { this.name = name; @@ -270,6 +284,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private class InProcessStream { private final InProcessClientStream clientStream; private final InProcessServerStream serverStream; + private final CallOptions callOptions; private final Metadata headers; private final MethodDescriptor method; private volatile String authority; @@ -279,6 +294,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans String authority) { this.method = checkNotNull(method, "method"); this.headers = checkNotNull(headers, "headers"); + this.callOptions = checkNotNull(callOptions, "callOptions"); this.authority = authority; this.clientStream = new InProcessClientStream(callOptions, headers); this.serverStream = new InProcessServerStream(method, headers); @@ -288,8 +304,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private void streamClosed() { synchronized (InProcessTransport.this) { boolean justRemovedAnElement = streams.remove(this); + if (GrpcUtil.shouldBeCountedForInUse(callOptions)) { + inUseState.updateObjectInUse(this, false); + } if (streams.isEmpty() && justRemovedAnElement) { - clientTransportListener.transportInUse(false); if (shutdown) { notifyTerminated(); } @@ -498,6 +516,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private class InProcessClientStream implements ClientStream { final StatsTraceContext statsTraceCtx; + final CallOptions callOptions; @GuardedBy("this") private ServerStreamListener serverStreamListener; @GuardedBy("this") @@ -514,6 +533,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private int outboundSeqNo; InProcessClientStream(CallOptions callOptions, Metadata headers) { + this.callOptions = callOptions; statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers); } @@ -652,8 +672,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans synchronized (InProcessTransport.this) { statsTraceCtx.clientOutboundHeaders(); streams.add(InProcessTransport.InProcessStream.this); - if (streams.size() == 1) { - clientTransportListener.transportInUse(true); + if (GrpcUtil.shouldBeCountedForInUse(callOptions)) { + inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true); } serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers); } diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index c100998a13..abd28b1ae7 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -25,6 +25,7 @@ import static java.lang.Math.max; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import io.grpc.CallOptions; import io.grpc.Codec; import io.grpc.Compressor; import io.grpc.Deadline; @@ -95,6 +96,7 @@ public abstract class AbstractClientStream extends AbstractStream private final TransportTracer transportTracer; private final Framer framer; + private boolean shouldBeCountedForInUse; private boolean useGet; private Metadata headers; /** @@ -109,9 +111,11 @@ public abstract class AbstractClientStream extends AbstractStream StatsTraceContext statsTraceCtx, TransportTracer transportTracer, Metadata headers, + CallOptions callOptions, boolean useGet) { checkNotNull(headers, "headers"); this.transportTracer = checkNotNull(transportTracer, "transportTracer"); + this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions); this.useGet = useGet; if (!useGet) { framer = new MessageFramer(this, bufferAllocator, statsTraceCtx); @@ -172,6 +176,14 @@ public abstract class AbstractClientStream extends AbstractStream return framer; } + /** + * Returns true if this stream should be counted when determining the in-use state of the + * transport. + */ + public final boolean shouldBeCountedForInUse() { + return shouldBeCountedForInUse; + } + @Override public final void request(int numMessages) { abstractClientStreamSink().request(numMessages); diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index b1340565cb..da10e62356 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -248,6 +248,22 @@ public final class GrpcUtil { } }; + /** + * RPCs created on the Channel returned by {@link io.grpc.LoadBalancer.Subchannel#asChannel} + * will have this option with value {@code true}. They will be treated differently from + * the ones created by application. + */ + public static final CallOptions.Key CALL_OPTIONS_RPC_OWNED_BY_BALANCER = + CallOptions.Key.create("io.grpc.internal.CALL_OPTIONS_RPC_OWNED_BY_BALANCER"); + + /** + * Returns true if an RPC with the given properties should be counted when calculating the + * in-use state of a transport. + */ + public static boolean shouldBeCountedForInUse(CallOptions callOptions) { + return !(Boolean.TRUE.equals(callOptions.getOption(CALL_OPTIONS_RPC_OWNED_BY_BALANCER))); + } + /** * Returns a proxy detector appropriate for the current environment. */ diff --git a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java b/core/src/main/java/io/grpc/internal/InUseStateAggregator.java index 4c9bba1296..786baf77e7 100644 --- a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java +++ b/core/src/main/java/io/grpc/internal/InUseStateAggregator.java @@ -23,7 +23,7 @@ import javax.annotation.concurrent.NotThreadSafe; * Aggregates the in-use state of a set of objects. */ @NotThreadSafe -abstract class InUseStateAggregator { +public abstract class InUseStateAggregator { private final HashSet inUseObjects = new HashSet(); @@ -32,7 +32,7 @@ abstract class InUseStateAggregator { * *

This may call into {@link #handleInUse} or {@link #handleNotInUse} when appropriate. */ - final void updateObjectInUse(T object, boolean inUse) { + public final void updateObjectInUse(T object, boolean inUse) { int origSize = inUseObjects.size(); if (inUse) { inUseObjects.add(object); @@ -47,7 +47,7 @@ abstract class InUseStateAggregator { } } - final boolean isInUse() { + public final boolean isInUse() { return !inUseObjects.isEmpty(); } @@ -55,10 +55,10 @@ abstract class InUseStateAggregator { * Called when the aggregated in-use state has changed to true, which means at least one object is * in use. */ - abstract void handleInUse(); + protected abstract void handleInUse(); /** * Called when the aggregated in-use state has changed to false, which means no object is in use. */ - abstract void handleNotInUse(); + protected abstract void handleNotInUse(); } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 879b7d873b..0f67a1f6d0 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -131,12 +131,12 @@ final class InternalSubchannel implements InternalInstrumented { private final InUseStateAggregator inUseStateAggregator = new InUseStateAggregator() { @Override - void handleInUse() { + protected void handleInUse() { callback.onInUse(InternalSubchannel.this); } @Override - void handleNotInUse() { + protected void handleNotInUse() { callback.onNotInUse(InternalSubchannel.this); } }; @@ -215,6 +215,21 @@ final class InternalSubchannel implements InternalInstrumented { return null; } + /** + * Returns a READY transport if there is any, without trying to connect. + */ + @Nullable + ClientTransport getTransport() { + return activeTransport; + } + + /** + * Returns the authority string associated with this Subchannel. + */ + String getAuthority() { + return authority; + } + @GuardedBy("lock") private void startNewTransport() { Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 3765692c79..37aad45381 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -121,7 +121,8 @@ final class ManagedChannelImpl extends ManagedChannel implements private final ClientTransportFactory transportFactory; private final Executor executor; private final ObjectPool executorPool; - private final ObjectPool oobExecutorPool; + private final ObjectPool balancerRpcExecutorPool; + private final ExecutorHolder balancerRpcExecutorHolder; private final TimeProvider timeProvider; private final int maxTraceEvents; @@ -515,7 +516,7 @@ final class ManagedChannelImpl extends ManagedChannel implements AbstractManagedChannelImplBuilder builder, ClientTransportFactory clientTransportFactory, BackoffPolicy.Provider backoffPolicyProvider, - ObjectPool oobExecutorPool, + ObjectPool balancerRpcExecutorPool, Supplier stopwatchSupplier, List interceptors, final TimeProvider timeProvider) { @@ -537,7 +538,8 @@ final class ManagedChannelImpl extends ManagedChannel implements this.loadBalancerFactory = builder.loadBalancerFactory; } this.executorPool = checkNotNull(builder.executorPool, "executorPool"); - this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool"); + this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"); + this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool); this.executor = checkNotNull(executorPool.getObject(), "executor"); this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor); this.delayedTransport.start(delayedTransportListener); @@ -835,6 +837,7 @@ final class ManagedChannelImpl extends ManagedChannel implements terminated = true; terminatedLatch.countDown(); executorPool.returnObject(executor); + balancerRpcExecutorHolder.release(); // Release the transport factory so that it can deallocate any resources. transportFactory.close(); } @@ -1153,7 +1156,7 @@ final class ManagedChannelImpl extends ManagedChannel implements oobChannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "OobChannel"); } final OobChannel oobChannel = new OobChannel( - authority, oobExecutorPool, transportFactory.getScheduledExecutorService(), + authority, balancerRpcExecutorPool, transportFactory.getScheduledExecutorService(), channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider); if (channelTracer != null) { channelTracer.reportEvent(new ChannelTrace.Event.Builder() @@ -1455,6 +1458,14 @@ final class ManagedChannelImpl extends ManagedChannel implements public String toString() { return subchannel.getLogId().toString(); } + + @Override + public Channel asChannel() { + return new SubchannelChannel( + subchannel, balancerRpcExecutorHolder.getExecutor(), + transportFactory.getScheduledExecutorService(), + callTracerFactory.create()); + } } @Override @@ -1510,16 +1521,41 @@ final class ManagedChannelImpl extends ManagedChannel implements */ private final class IdleModeStateAggregator extends InUseStateAggregator { @Override - void handleInUse() { + protected void handleInUse() { exitIdleMode(); } @Override - void handleNotInUse() { + protected void handleNotInUse() { if (shutdown.get()) { return; } rescheduleIdleTimer(); } } + + /** + * Lazily request for Executor from an executor pool. + */ + private static final class ExecutorHolder { + private final ObjectPool pool; + private Executor executor; + + ExecutorHolder(ObjectPool executorPool) { + this.pool = checkNotNull(executorPool, "executorPool"); + } + + synchronized Executor getExecutor() { + if (executor == null) { + executor = checkNotNull(pool.getObject(), "%s.getObject()", executor); + } + return executor; + } + + synchronized void release() { + if (executor != null) { + executor = pool.returnObject(executor); + } + } + } } diff --git a/core/src/main/java/io/grpc/internal/SubchannelChannel.java b/core/src/main/java/io/grpc/internal/SubchannelChannel.java new file mode 100644 index 0000000000..d59f0f0527 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/SubchannelChannel.java @@ -0,0 +1,119 @@ +/* + * Copyright 2016 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.Context; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.internal.ClientCallImpl.ClientTransportProvider; +import io.grpc.internal.ClientStreamListener.RpcProgress; +import io.grpc.internal.GrpcUtil; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +final class SubchannelChannel extends Channel { + @VisibleForTesting + static final Status NOT_READY_ERROR = + Status.UNAVAILABLE.withDescription("Subchannel is NOT READY"); + @VisibleForTesting + static final Status WAIT_FOR_READY_ERROR = + Status.UNAVAILABLE.withDescription( + "wait-for-ready RPC is not supported on Subchannel.asChannel()"); + private static final FailingClientTransport notReadyTransport = + new FailingClientTransport(NOT_READY_ERROR, RpcProgress.REFUSED); + private final InternalSubchannel subchannel; + private final Executor executor; + private final ScheduledExecutorService deadlineCancellationExecutor; + private final CallTracer callsTracer; + + private final ClientTransportProvider transportProvider = new ClientTransportProvider() { + @Override + public ClientTransport get(PickSubchannelArgs args) { + ClientTransport transport = subchannel.getTransport(); + if (transport == null) { + return notReadyTransport; + } else { + return transport; + } + } + + @Override + public RetriableStream newRetriableStream(MethodDescriptor method, + CallOptions callOptions, Metadata headers, Context context) { + throw new UnsupportedOperationException("OobChannel should not create retriable streams"); + } + }; + + SubchannelChannel( + InternalSubchannel subchannel, Executor executor, + ScheduledExecutorService deadlineCancellationExecutor, CallTracer callsTracer) { + this.subchannel = checkNotNull(subchannel, "subchannel"); + this.executor = checkNotNull(executor, "executor"); + this.deadlineCancellationExecutor = + checkNotNull(deadlineCancellationExecutor, "deadlineCancellationExecutor"); + this.callsTracer = checkNotNull(callsTracer, "callsTracer"); + } + + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + final Executor effectiveExecutor = + callOptions.getExecutor() == null ? executor : callOptions.getExecutor(); + if (callOptions.isWaitForReady()) { + return new ClientCall() { + @Override + public void start(final ClientCall.Listener listener, Metadata headers) { + effectiveExecutor.execute(new Runnable() { + @Override + public void run() { + listener.onClose(WAIT_FOR_READY_ERROR, new Metadata()); + } + }); + } + + @Override + public void request(int numMessages) {} + + @Override + public void cancel(String message, Throwable cause) {} + + @Override + public void halfClose() {} + + @Override + public void sendMessage(RequestT message) {} + }; + } + return new ClientCallImpl(methodDescriptor, + effectiveExecutor, + callOptions.withOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER, Boolean.TRUE), + transportProvider, deadlineCancellationExecutor, callsTracer, false /* retryEnabled */); + } + + @Override + public String authority() { + return subchannel.getAuthority(); + } +} diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 0d540a95f7..d370007201 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.Codec; import io.grpc.Deadline; import io.grpc.Metadata; @@ -479,7 +480,7 @@ public class AbstractClientStreamTest { StatsTraceContext statsTraceCtx, TransportTracer transportTracer, boolean useGet) { - super(allocator, statsTraceCtx, transportTracer, new Metadata(), useGet); + super(allocator, statsTraceCtx, transportTracer, new Metadata(), CallOptions.DEFAULT, useGet); this.state = state; this.sink = sink; } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 18ffc131aa..96a9a543fb 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -160,7 +160,7 @@ public class ManagedChannelImplTest { private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); private final FakeClock timer = new FakeClock(); private final FakeClock executor = new FakeClock(); - private final FakeClock oobExecutor = new FakeClock(); + private final FakeClock balancerRpcExecutor = new FakeClock(); private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER = new FakeClock.TaskFilter() { @Override @@ -203,7 +203,7 @@ public class ManagedChannelImplTest { @Mock private ObjectPool executorPool; @Mock - private ObjectPool oobExecutorPool; + private ObjectPool balancerRpcExecutorPool; @Mock private CallCredentials creds; private ChannelBuilder channelBuilder; @@ -224,7 +224,7 @@ public class ManagedChannelImplTest { channel = new ManagedChannelImpl( channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(), - oobExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors), + balancerRpcExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors), fakeClockTimeProvider); if (requestConnection) { @@ -257,7 +257,8 @@ public class ManagedChannelImplTest { when(mockTransportFactory.getScheduledExecutorService()) .thenReturn(timer.getScheduledExecutorService()); when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); - when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService()); + when(balancerRpcExecutorPool.getObject()) + .thenReturn(balancerRpcExecutor.getScheduledExecutorService()); channelBuilder = new ChannelBuilder() .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build()) @@ -546,7 +547,7 @@ public class ManagedChannelImplTest { transportListener.transportTerminated(); assertTrue(channel.isTerminated()); verify(executorPool).returnObject(executor.getScheduledExecutorService()); - verifyNoMoreInteractions(oobExecutorPool); + verifyNoMoreInteractions(balancerRpcExecutorPool); verify(mockTransportFactory) .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); @@ -1171,7 +1172,7 @@ public class ManagedChannelImplTest { ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority"); ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority"); - verify(oobExecutorPool, times(2)).getObject(); + verify(balancerRpcExecutorPool, times(2)).getObject(); assertEquals("oob1authority", oob1.authority()); assertEquals("oob2authority", oob2.authority()); @@ -1187,9 +1188,9 @@ public class ManagedChannelImplTest { MockClientTransportInfo transportInfo = transports.poll(); assertNotNull(transportInfo); - assertEquals(0, oobExecutor.numPendingTasks()); + assertEquals(0, balancerRpcExecutor.numPendingTasks()); transportInfo.listener.transportReady(); - assertEquals(1, oobExecutor.runDueTasks()); + assertEquals(1, balancerRpcExecutor.runDueTasks()); verify(transportInfo.transport).newStream(same(method), same(headers), same(CallOptions.DEFAULT)); @@ -1211,9 +1212,9 @@ public class ManagedChannelImplTest { // This transport fails Status transportError = Status.UNAVAILABLE.withDescription("Connection refused"); - assertEquals(0, oobExecutor.numPendingTasks()); + assertEquals(0, balancerRpcExecutor.numPendingTasks()); transportInfo.listener.transportShutdown(transportError); - assertTrue(oobExecutor.runDueTasks() > 0); + assertTrue(balancerRpcExecutor.runDueTasks() > 0); // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending verify(mockCallListener2).onClose(same(transportError), any(Metadata.class)); @@ -1223,20 +1224,19 @@ public class ManagedChannelImplTest { assertFalse(oob1.isShutdown()); assertFalse(oob2.isShutdown()); oob1.shutdown(); - verify(oobExecutorPool, never()).returnObject(anyObject()); oob2.shutdownNow(); assertTrue(oob1.isShutdown()); assertTrue(oob2.isShutdown()); assertTrue(oob2.isTerminated()); - verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService()); + verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService()); // New RPCs will be rejected. - assertEquals(0, oobExecutor.numPendingTasks()); + assertEquals(0, balancerRpcExecutor.numPendingTasks()); ClientCall call4 = oob1.newCall(method, CallOptions.DEFAULT); ClientCall call5 = oob2.newCall(method, CallOptions.DEFAULT); call4.start(mockCallListener4, headers); call5.start(mockCallListener5, headers); - assertTrue(oobExecutor.runDueTasks() > 0); + assertTrue(balancerRpcExecutor.runDueTasks() > 0); verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class)); Status status4 = statusCaptor.getValue(); assertEquals(Status.Code.UNAVAILABLE, status4.getCode()); @@ -1248,9 +1248,9 @@ public class ManagedChannelImplTest { verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); // This will shutdownNow() the delayed transport, terminating the pending RPC - assertEquals(0, oobExecutor.numPendingTasks()); + assertEquals(0, balancerRpcExecutor.numPendingTasks()); oob1.shutdownNow(); - assertTrue(oobExecutor.runDueTasks() > 0); + assertTrue(balancerRpcExecutor.runDueTasks() > 0); verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class)); // Shut down the channel, and it will not terminated because OOB channel has not. @@ -1259,11 +1259,12 @@ public class ManagedChannelImplTest { // Delayed transport has already terminated. Terminating the transport terminates the // subchannel, which in turn terimates the OOB channel, which terminates the channel. assertFalse(oob1.isTerminated()); - verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService()); + verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService()); transportInfo.listener.transportTerminated(); assertTrue(oob1.isTerminated()); assertTrue(channel.isTerminated()); - verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService()); + verify(balancerRpcExecutorPool, times(2)) + .returnObject(balancerRpcExecutor.getScheduledExecutorService()); } @Test @@ -1328,6 +1329,93 @@ public class ManagedChannelImplTest { .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); } + @Test + public void subchannelChannel_normalUsage() { + createChannel(); + Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + verify(balancerRpcExecutorPool, never()).getObject(); + + Channel sChannel = subchannel.asChannel(); + verify(balancerRpcExecutorPool).getObject(); + + Metadata headers = new Metadata(); + CallOptions callOptions = CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS); + + // Subchannel must be READY when creating the RPC. + subchannel.requestConnection(); + verify(mockTransportFactory) + .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); + MockClientTransportInfo transportInfo = transports.poll(); + ConnectionClientTransport mockTransport = transportInfo.transport; + ManagedClientTransport.Listener transportListener = transportInfo.listener; + transportListener.transportReady(); + + ClientCall call = sChannel.newCall(method, callOptions); + call.start(mockCallListener, headers); + verify(mockTransport).newStream(same(method), same(headers), callOptionsCaptor.capture()); + + CallOptions capturedCallOption = callOptionsCaptor.getValue(); + assertThat(capturedCallOption.getDeadline()).isSameAs(callOptions.getDeadline()); + assertThat(capturedCallOption.getOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER)).isTrue(); + } + + @Test + public void subchannelChannel_failWhenNotReady() { + createChannel(); + Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + Channel sChannel = subchannel.asChannel(); + Metadata headers = new Metadata(); + + subchannel.requestConnection(); + verify(mockTransportFactory) + .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); + MockClientTransportInfo transportInfo = transports.poll(); + ConnectionClientTransport mockTransport = transportInfo.transport; + + assertEquals(0, balancerRpcExecutor.numPendingTasks()); + + // Subchannel is still CONNECTING, but not READY yet + ClientCall call = sChannel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, headers); + verify(mockTransport, never()).newStream( + any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); + + verifyZeroInteractions(mockCallListener); + assertEquals(1, balancerRpcExecutor.runDueTasks()); + verify(mockCallListener).onClose( + same(SubchannelChannel.NOT_READY_ERROR), any(Metadata.class)); + } + + @Test + public void subchannelChannel_failWaitForReady() { + createChannel(); + Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + Channel sChannel = subchannel.asChannel(); + Metadata headers = new Metadata(); + + // Subchannel must be READY when creating the RPC. + subchannel.requestConnection(); + verify(mockTransportFactory) + .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); + MockClientTransportInfo transportInfo = transports.poll(); + ConnectionClientTransport mockTransport = transportInfo.transport; + ManagedClientTransport.Listener transportListener = transportInfo.listener; + transportListener.transportReady(); + assertEquals(0, balancerRpcExecutor.numPendingTasks()); + + // Wait-for-ready RPC is not allowed + ClientCall call = + sChannel.newCall(method, CallOptions.DEFAULT.withWaitForReady()); + call.start(mockCallListener, headers); + verify(mockTransport, never()).newStream( + any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); + + verifyZeroInteractions(mockCallListener); + assertEquals(1, balancerRpcExecutor.runDueTasks()); + verify(mockCallListener).onClose( + same(SubchannelChannel.WAIT_FOR_READY_ERROR), any(Metadata.class)); + } + @Test public void refreshNameResolutionWhenSubchannelConnectionFailed() { subtestRefreshNameResolutionWhenConnectionFailed(false); @@ -1818,7 +1906,7 @@ public class ManagedChannelImplTest { // Channel is dead. No more pending task to possibly revive it. assertEquals(0, timer.numPendingTasks()); assertEquals(0, executor.numPendingTasks()); - assertEquals(0, oobExecutor.numPendingTasks()); + assertEquals(0, balancerRpcExecutor.numPendingTasks()); } private void verifyCallListenerClosed( diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java index b9a8a43b22..1217106ec9 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java @@ -95,7 +95,7 @@ class CronetClientStream extends AbstractClientStream { CallOptions callOptions, TransportTracer transportTracer) { super( - new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, + new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions, method.isSafe()); this.url = Preconditions.checkNotNull(url, "url"); this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent"); diff --git a/netty/src/main/java/io/grpc/netty/CreateStreamCommand.java b/netty/src/main/java/io/grpc/netty/CreateStreamCommand.java index 9a758e6422..f5cfcf35c2 100644 --- a/netty/src/main/java/io/grpc/netty/CreateStreamCommand.java +++ b/netty/src/main/java/io/grpc/netty/CreateStreamCommand.java @@ -26,17 +26,16 @@ import io.netty.handler.codec.http2.Http2Headers; class CreateStreamCommand extends WriteQueue.AbstractQueuedCommand { private final Http2Headers headers; private final NettyClientStream.TransportState stream; + private final boolean shouldBeCountedForInUse; private final boolean get; - CreateStreamCommand(Http2Headers headers, - NettyClientStream.TransportState stream) { - this(headers, stream, false); - } - - CreateStreamCommand(Http2Headers headers, - NettyClientStream.TransportState stream, boolean get) { + CreateStreamCommand( + Http2Headers headers, + NettyClientStream.TransportState stream, + boolean shouldBeCountedForInUse, boolean get) { this.stream = Preconditions.checkNotNull(stream, "stream"); this.headers = Preconditions.checkNotNull(headers, "headers"); + this.shouldBeCountedForInUse = shouldBeCountedForInUse; this.get = get; } @@ -48,6 +47,10 @@ class CreateStreamCommand extends WriteQueue.AbstractQueuedCommand { return headers; } + boolean shouldBeCountedForInUse() { + return shouldBeCountedForInUse; + } + boolean isGet() { return get; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index e059af96d5..ea8706ff32 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -32,6 +32,7 @@ import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; +import io.grpc.internal.InUseStateAggregator; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; @@ -104,6 +105,19 @@ class NettyClientHandler extends AbstractNettyHandler { private final TransportTracer transportTracer; private final Attributes eagAttributes; private final String authority; + private final InUseStateAggregator inUseState = + new InUseStateAggregator() { + @Override + protected void handleInUse() { + lifecycleManager.notifyInUse(true); + } + + @Override + protected void handleNotInUse() { + lifecycleManager.notifyInUse(false); + } + }; + private WriteQueue clientWriteQueue; private Http2Ping ping; private Attributes attributes = Attributes.EMPTY; @@ -256,26 +270,20 @@ class NettyClientHandler extends AbstractNettyHandler { @Override public void onStreamActive(Http2Stream stream) { - if (connection().numActiveStreams() != 1) { - return; - } - - NettyClientHandler.this.lifecycleManager.notifyInUse(true); - - if (NettyClientHandler.this.keepAliveManager != null) { + if (connection().numActiveStreams() == 1 + && NettyClientHandler.this.keepAliveManager != null) { NettyClientHandler.this.keepAliveManager.onTransportActive(); } } @Override public void onStreamClosed(Http2Stream stream) { - if (connection().numActiveStreams() != 0) { - return; - } - - NettyClientHandler.this.lifecycleManager.notifyInUse(false); - - if (NettyClientHandler.this.keepAliveManager != null) { + // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in + // the first place, we don't propagate that option here, and it's safe to reset the in-use + // state for them, which will be a cheap no-op. + inUseState.updateObjectInUse(stream, false); + if (connection().numActiveStreams() == 0 + && NettyClientHandler.this.keepAliveManager != null) { NettyClientHandler.this.keepAliveManager.onTransportIdle(); } } @@ -482,7 +490,7 @@ class NettyClientHandler extends AbstractNettyHandler { * Attempts to create a new stream from the given command. If there are too many active streams, * the creation request is queued. */ - private void createStream(CreateStreamCommand command, final ChannelPromise promise) + private void createStream(final CreateStreamCommand command, final ChannelPromise promise) throws Exception { if (lifecycleManager.getShutdownThrowable() != null) { // The connection is going away (it is really the GOAWAY case), @@ -530,6 +538,12 @@ class NettyClientHandler extends AbstractNettyHandler { stream.getStatsTraceContext().clientOutboundHeaders(); http2Stream.setProperty(streamKey, stream); + // This delays the in-use state until the I/O completes, which technically may + // be later than we would like. + if (command.shouldBeCountedForInUse()) { + inUseState.updateObjectInUse(http2Stream, true); + } + // Attach the client stream to the HTTP/2 stream object as user data. stream.setHttp2Stream(http2Stream); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 2170e1f829..de2e8107c0 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -24,6 +24,7 @@ import static io.netty.buffer.Unpooled.EMPTY_BUFFER; import com.google.common.base.Preconditions; import com.google.common.io.BaseEncoding; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.InternalKnownTransport; import io.grpc.InternalMethodDescriptor; import io.grpc.Metadata; @@ -70,12 +71,14 @@ class NettyClientStream extends AbstractClientStream { AsciiString scheme, AsciiString userAgent, StatsTraceContext statsTraceCtx, - TransportTracer transportTracer) { + TransportTracer transportTracer, + CallOptions callOptions) { super( new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx, transportTracer, headers, + callOptions, useGet(method)); this.state = checkNotNull(state, "transportState"); this.writeQueue = state.handler.getWriteQueue(); @@ -152,7 +155,8 @@ class NettyClientStream extends AbstractClientStream { }; // Write the command requesting the creation of the stream. - writeQueue.enqueue(new CreateStreamCommand(http2Headers, transportState(), get), + writeQueue.enqueue( + new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get), !method.getType().clientSendsOneMessage() || get).addListener(failureListener); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 2be71c20d5..72eb5f61d2 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -178,7 +178,8 @@ class NettyClientTransport implements ConnectionClientTransport { negotiationHandler.scheme(), userAgent, statsTraceCtx, - transportTracer); + transportTracer, + callOptions); } @SuppressWarnings("unchecked") diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 1e02d27ec5..f1fcac72ca 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -193,7 +193,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase cmdCap = ArgumentCaptor.forClass(CreateStreamCommand.class); @@ -476,7 +479,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase inUseState = + new InUseStateAggregator() { + @Override + protected void handleInUse() { + listener.transportInUse(true); + } + + @Override + protected void handleNotInUse() { + listener.transportInUse(false); + } + }; + @GuardedBy("lock") private InternalChannelz.Security securityInfo; @VisibleForTesting @@ -347,7 +361,8 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep defaultAuthority, userAgent, statsTraceCtx, - transportTracer); + transportTracer, + callOptions); } @GuardedBy("lock") @@ -357,7 +372,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep goAwayStatus, RpcProgress.REFUSED, true, new Metadata()); } else if (streams.size() >= maxConcurrentStreams) { pendingStreams.add(clientStream); - setInUse(); + setInUse(clientStream); } else { startStream(clientStream); } @@ -368,7 +383,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep Preconditions.checkState( stream.id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned"); streams.put(nextStreamId, stream); - setInUse(); + setInUse(stream); stream.transportState().start(nextStreamId); // For unary and server streaming, there will be a data frame soon, no need to flush the header. if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING) @@ -406,7 +421,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep @GuardedBy("lock") void removePendingStream(OkHttpClientStream pendingStream) { pendingStreams.remove(pendingStream); - maybeClearInUse(); + maybeClearInUse(pendingStream); } @Override @@ -689,13 +704,14 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep Map.Entry entry = it.next(); it.remove(); entry.getValue().transportState().transportReportStatus(reason, false, new Metadata()); + maybeClearInUse(entry.getValue()); } for (OkHttpClientStream stream : pendingStreams) { stream.transportState().transportReportStatus(reason, true, new Metadata()); + maybeClearInUse(stream); } pendingStreams.clear(); - maybeClearInUse(); stopIfNecessary(); } @@ -764,15 +780,16 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep it.remove(); entry.getValue().transportState().transportReportStatus( status, RpcProgress.REFUSED, false, new Metadata()); + maybeClearInUse(entry.getValue()); } } for (OkHttpClientStream stream : pendingStreams) { stream.transportState().transportReportStatus( status, RpcProgress.REFUSED, true, new Metadata()); + maybeClearInUse(stream); } pendingStreams.clear(); - maybeClearInUse(); stopIfNecessary(); } @@ -817,7 +834,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep } if (!startPendingStreams()) { stopIfNecessary(); - maybeClearInUse(); + maybeClearInUse(stream); } } } @@ -860,11 +877,10 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep } @GuardedBy("lock") - private void maybeClearInUse() { - if (inUse) { + private void maybeClearInUse(OkHttpClientStream stream) { + if (hasStream) { if (pendingStreams.isEmpty() && streams.isEmpty()) { - inUse = false; - listener.transportInUse(false); + hasStream = false; if (keepAliveManager != null) { // We don't have any active streams. No need to do keepalives any more. // Again, we have to call this inside the lock to avoid the race between onTransportIdle @@ -873,13 +889,15 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep } } } + if (stream.shouldBeCountedForInUse()) { + inUseState.updateObjectInUse(stream, false); + } } @GuardedBy("lock") - private void setInUse() { - if (!inUse) { - inUse = true; - listener.transportInUse(true); + private void setInUse(OkHttpClientStream stream) { + if (!hasStream) { + hasStream = true; if (keepAliveManager != null) { // We have a new stream. We might need to do keepalives now. // Note that we have to do this inside the lock to avoid calling @@ -888,6 +906,9 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep keepAliveManager.onTransportActive(); } } + if (stream.shouldBeCountedForInUse()) { + inUseState.updateObjectInUse(stream, true); + } } private Throwable getPingFailure() { diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java index 8d8347a0e5..e610b96068 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.io.BaseEncoding; +import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; @@ -90,7 +91,8 @@ public class OkHttpClientStreamTest { "localhost", "userAgent", StatsTraceContext.NOOP, - transportTracer); + transportTracer, + CallOptions.DEFAULT); } @Test @@ -149,7 +151,7 @@ public class OkHttpClientStreamTest { metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application"); stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport, flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", - StatsTraceContext.NOOP, transportTracer); + StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT); stream.start(new BaseClientStreamListener()); stream.transportState().start(3); @@ -164,7 +166,7 @@ public class OkHttpClientStreamTest { metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application"); stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport, flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", - StatsTraceContext.NOOP, transportTracer); + StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT); stream.start(new BaseClientStreamListener()); stream.transportState().start(3); @@ -192,7 +194,7 @@ public class OkHttpClientStreamTest { .build(); stream = new OkHttpClientStream(getMethod, new Metadata(), frameWriter, transport, flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", - StatsTraceContext.NOOP, transportTracer); + StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT); stream.start(new BaseClientStreamListener()); // GET streams send headers after halfClose is called. diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index fc516f41ba..7adb6c11f9 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -59,6 +59,7 @@ import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; import io.grpc.internal.ConnectionClientTransport; import io.grpc.internal.GrpcAttributes; +import io.grpc.internal.GrpcUtil; import io.grpc.internal.InternalServer; import io.grpc.internal.IoUtils; import io.grpc.internal.ManagedClientTransport; @@ -612,6 +613,46 @@ public abstract class AbstractTransportTest { assertNull(serverStreamTracer1.getServerCallInfo()); } + @Test + public void transportInUse_balancerRpcsNotCounted() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + startTransport(client, mockClientTransportListener); + + // stream1 is created by balancer through Subchannel.asChannel(), which is marked by + // CALL_OPTIONS_RPC_OWNED_BY_BALANCER in CallOptions. It won't be counted for in-use signal. + ClientStream stream1 = client.newStream( + methodDescriptor, new Metadata(), + callOptions.withOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER, Boolean.TRUE)); + ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase(); + stream1.start(clientStreamListener1); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + StreamCreation serverStreamCreation1 + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + // stream2 is the normal RPC, and will be counted for in-use + ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListenerBase clientStreamListener2 = new ClientStreamListenerBase(); + stream2.start(clientStreamListener2); + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); + StreamCreation serverStreamCreation2 + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + stream2.halfClose(); + verify(mockClientTransportListener, never()).transportInUse(false); + serverStreamCreation2.stream.close(Status.OK, new Metadata()); + // As soon as stream2 is closed, even though stream1 is still open, the transport will report + // in-use == false. + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false); + + stream1.halfClose(); + serverStreamCreation1.stream.close(Status.OK, new Metadata()); + // Verify that the callback has been called only once for true and false respectively + verify(mockClientTransportListener).transportInUse(true); + verify(mockClientTransportListener).transportInUse(false); + } + @Test public void transportInUse_normalClose() throws Exception { server.start(serverListener);