core: add internal Subchannel.asChannel() (#4950)

Returns a Channel that allows a LoadBalancer to make auxiliary RPCs on already-established application connections. We need this to implement client-side health-checking (#4932)

See comments on the API for its semantics.

Notable changes:

- Transports are modified to use InUseStateAggregator so that they can exclude RPCs made on Subchannel.asChannel() when reporting in-use state for idle mode.
- OobChannel shares the same Executor as Subchannel.asChannel(). Because the latter is not a ManagedChannel and doesn't have life-cycle, thus can't determine when to return the Executor to a pool, the Executor is now returned only when ManagedChannelImpl is terminated.
This commit is contained in:
Kun Zhang 2018-10-15 15:39:21 -07:00 committed by GitHub
parent ef8a84421d
commit c528df8ae8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 537 additions and 102 deletions

View File

@ -655,6 +655,35 @@ public abstract class LoadBalancer {
* @since 1.2.0
*/
public abstract Attributes getAttributes();
/**
* (Internal use only) returns a {@link Channel} that is backed by this Subchannel. This allows
* a LoadBalancer to issue its own RPCs for auxiliary purposes, such as health-checking, on
* already-established connections. This channel has certain restrictions:
* <ol>
* <li>It can issue RPCs only if the Subchannel is {@code READY}. If {@link
* Channel#newCall} is called when the Subchannel is not {@code READY}, the RPC will fail
* immediately.</li>
* <li>It doesn't support {@link CallOptions#withWaitForReady wait-for-ready} RPCs. Such RPCs
* will fail immediately.</li>
* </ol>
*
* <p>RPCs made on this Channel is not counted when determining ManagedChannel's {@link
* ManagedChannelBuilder#idleTimeout idle mode}. In other words, they won't prevent
* ManagedChannel from entering idle mode.
*
* <p>Warning: RPCs made on this channel will prevent a shut-down transport from terminating. If
* you make long-running RPCs, you need to make sure they will finish in time after the
* Subchannel has transitioned away from {@code READY} state
* (notified through {@link #handleSubchannelState}).
*
* <p>Warning: this is INTERNAL API, is not supposed to be used by external users, and may
* change without notice. If you think you must use it, please file an issue.
*/
@Internal
public Channel asChannel() {
throw new UnsupportedOperationException();
}
}
/**

View File

@ -42,6 +42,7 @@ import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InUseStateAggregator;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ObjectPool;
@ -93,6 +94,19 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private final Attributes attributes = Attributes.newBuilder()
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.build();
@GuardedBy("this")
private final InUseStateAggregator<InProcessStream> inUseState =
new InUseStateAggregator<InProcessStream>() {
@Override
protected void handleInUse() {
clientTransportListener.transportInUse(true);
}
@Override
protected void handleNotInUse() {
clientTransportListener.transportInUse(false);
}
};
public InProcessTransport(String name, String authority, String userAgent) {
this.name = name;
@ -270,6 +284,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private class InProcessStream {
private final InProcessClientStream clientStream;
private final InProcessServerStream serverStream;
private final CallOptions callOptions;
private final Metadata headers;
private final MethodDescriptor<?, ?> method;
private volatile String authority;
@ -279,6 +294,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
String authority) {
this.method = checkNotNull(method, "method");
this.headers = checkNotNull(headers, "headers");
this.callOptions = checkNotNull(callOptions, "callOptions");
this.authority = authority;
this.clientStream = new InProcessClientStream(callOptions, headers);
this.serverStream = new InProcessServerStream(method, headers);
@ -288,8 +304,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private void streamClosed() {
synchronized (InProcessTransport.this) {
boolean justRemovedAnElement = streams.remove(this);
if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
inUseState.updateObjectInUse(this, false);
}
if (streams.isEmpty() && justRemovedAnElement) {
clientTransportListener.transportInUse(false);
if (shutdown) {
notifyTerminated();
}
@ -498,6 +516,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private class InProcessClientStream implements ClientStream {
final StatsTraceContext statsTraceCtx;
final CallOptions callOptions;
@GuardedBy("this")
private ServerStreamListener serverStreamListener;
@GuardedBy("this")
@ -514,6 +533,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private int outboundSeqNo;
InProcessClientStream(CallOptions callOptions, Metadata headers) {
this.callOptions = callOptions;
statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers);
}
@ -652,8 +672,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
synchronized (InProcessTransport.this) {
statsTraceCtx.clientOutboundHeaders();
streams.add(InProcessTransport.InProcessStream.this);
if (streams.size() == 1) {
clientTransportListener.transportInUse(true);
if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
}
serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
}

View File

@ -25,6 +25,7 @@ import static java.lang.Math.max;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.CallOptions;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Deadline;
@ -95,6 +96,7 @@ public abstract class AbstractClientStream extends AbstractStream
private final TransportTracer transportTracer;
private final Framer framer;
private boolean shouldBeCountedForInUse;
private boolean useGet;
private Metadata headers;
/**
@ -109,9 +111,11 @@ public abstract class AbstractClientStream extends AbstractStream
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
Metadata headers,
CallOptions callOptions,
boolean useGet) {
checkNotNull(headers, "headers");
this.transportTracer = checkNotNull(transportTracer, "transportTracer");
this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
this.useGet = useGet;
if (!useGet) {
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
@ -172,6 +176,14 @@ public abstract class AbstractClientStream extends AbstractStream
return framer;
}
/**
* Returns true if this stream should be counted when determining the in-use state of the
* transport.
*/
public final boolean shouldBeCountedForInUse() {
return shouldBeCountedForInUse;
}
@Override
public final void request(int numMessages) {
abstractClientStreamSink().request(numMessages);

View File

@ -248,6 +248,22 @@ public final class GrpcUtil {
}
};
/**
* RPCs created on the Channel returned by {@link io.grpc.LoadBalancer.Subchannel#asChannel}
* will have this option with value {@code true}. They will be treated differently from
* the ones created by application.
*/
public static final CallOptions.Key<Boolean> CALL_OPTIONS_RPC_OWNED_BY_BALANCER =
CallOptions.Key.create("io.grpc.internal.CALL_OPTIONS_RPC_OWNED_BY_BALANCER");
/**
* Returns true if an RPC with the given properties should be counted when calculating the
* in-use state of a transport.
*/
public static boolean shouldBeCountedForInUse(CallOptions callOptions) {
return !(Boolean.TRUE.equals(callOptions.getOption(CALL_OPTIONS_RPC_OWNED_BY_BALANCER)));
}
/**
* Returns a proxy detector appropriate for the current environment.
*/

View File

@ -23,7 +23,7 @@ import javax.annotation.concurrent.NotThreadSafe;
* Aggregates the in-use state of a set of objects.
*/
@NotThreadSafe
abstract class InUseStateAggregator<T> {
public abstract class InUseStateAggregator<T> {
private final HashSet<T> inUseObjects = new HashSet<T>();
@ -32,7 +32,7 @@ abstract class InUseStateAggregator<T> {
*
* <p>This may call into {@link #handleInUse} or {@link #handleNotInUse} when appropriate.
*/
final void updateObjectInUse(T object, boolean inUse) {
public final void updateObjectInUse(T object, boolean inUse) {
int origSize = inUseObjects.size();
if (inUse) {
inUseObjects.add(object);
@ -47,7 +47,7 @@ abstract class InUseStateAggregator<T> {
}
}
final boolean isInUse() {
public final boolean isInUse() {
return !inUseObjects.isEmpty();
}
@ -55,10 +55,10 @@ abstract class InUseStateAggregator<T> {
* Called when the aggregated in-use state has changed to true, which means at least one object is
* in use.
*/
abstract void handleInUse();
protected abstract void handleInUse();
/**
* Called when the aggregated in-use state has changed to false, which means no object is in use.
*/
abstract void handleNotInUse();
protected abstract void handleNotInUse();
}

View File

@ -131,12 +131,12 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator =
new InUseStateAggregator<ConnectionClientTransport>() {
@Override
void handleInUse() {
protected void handleInUse() {
callback.onInUse(InternalSubchannel.this);
}
@Override
void handleNotInUse() {
protected void handleNotInUse() {
callback.onNotInUse(InternalSubchannel.this);
}
};
@ -215,6 +215,21 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
return null;
}
/**
* Returns a READY transport if there is any, without trying to connect.
*/
@Nullable
ClientTransport getTransport() {
return activeTransport;
}
/**
* Returns the authority string associated with this Subchannel.
*/
String getAuthority() {
return authority;
}
@GuardedBy("lock")
private void startNewTransport() {
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");

View File

@ -121,7 +121,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final ClientTransportFactory transportFactory;
private final Executor executor;
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> oobExecutorPool;
private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
private final ExecutorHolder balancerRpcExecutorHolder;
private final TimeProvider timeProvider;
private final int maxTraceEvents;
@ -515,7 +516,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
AbstractManagedChannelImplBuilder<?> builder,
ClientTransportFactory clientTransportFactory,
BackoffPolicy.Provider backoffPolicyProvider,
ObjectPool<? extends Executor> oobExecutorPool,
ObjectPool<? extends Executor> balancerRpcExecutorPool,
Supplier<Stopwatch> stopwatchSupplier,
List<ClientInterceptor> interceptors,
final TimeProvider timeProvider) {
@ -537,7 +538,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
this.loadBalancerFactory = builder.loadBalancerFactory;
}
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor);
this.delayedTransport.start(delayedTransportListener);
@ -835,6 +837,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
terminated = true;
terminatedLatch.countDown();
executorPool.returnObject(executor);
balancerRpcExecutorHolder.release();
// Release the transport factory so that it can deallocate any resources.
transportFactory.close();
}
@ -1153,7 +1156,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
oobChannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "OobChannel");
}
final OobChannel oobChannel = new OobChannel(
authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
authority, balancerRpcExecutorPool, transportFactory.getScheduledExecutorService(),
channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
if (channelTracer != null) {
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
@ -1455,6 +1458,14 @@ final class ManagedChannelImpl extends ManagedChannel implements
public String toString() {
return subchannel.getLogId().toString();
}
@Override
public Channel asChannel() {
return new SubchannelChannel(
subchannel, balancerRpcExecutorHolder.getExecutor(),
transportFactory.getScheduledExecutorService(),
callTracerFactory.create());
}
}
@Override
@ -1510,16 +1521,41 @@ final class ManagedChannelImpl extends ManagedChannel implements
*/
private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
@Override
void handleInUse() {
protected void handleInUse() {
exitIdleMode();
}
@Override
void handleNotInUse() {
protected void handleNotInUse() {
if (shutdown.get()) {
return;
}
rescheduleIdleTimer();
}
}
/**
* Lazily request for Executor from an executor pool.
*/
private static final class ExecutorHolder {
private final ObjectPool<? extends Executor> pool;
private Executor executor;
ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
this.pool = checkNotNull(executorPool, "executorPool");
}
synchronized Executor getExecutor() {
if (executor == null) {
executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
}
return executor;
}
synchronized void release() {
if (executor != null) {
executor = pool.returnObject(executor);
}
}
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.GrpcUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
final class SubchannelChannel extends Channel {
@VisibleForTesting
static final Status NOT_READY_ERROR =
Status.UNAVAILABLE.withDescription("Subchannel is NOT READY");
@VisibleForTesting
static final Status WAIT_FOR_READY_ERROR =
Status.UNAVAILABLE.withDescription(
"wait-for-ready RPC is not supported on Subchannel.asChannel()");
private static final FailingClientTransport notReadyTransport =
new FailingClientTransport(NOT_READY_ERROR, RpcProgress.REFUSED);
private final InternalSubchannel subchannel;
private final Executor executor;
private final ScheduledExecutorService deadlineCancellationExecutor;
private final CallTracer callsTracer;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@Override
public ClientTransport get(PickSubchannelArgs args) {
ClientTransport transport = subchannel.getTransport();
if (transport == null) {
return notReadyTransport;
} else {
return transport;
}
}
@Override
public <ReqT> RetriableStream<ReqT> newRetriableStream(MethodDescriptor<ReqT, ?> method,
CallOptions callOptions, Metadata headers, Context context) {
throw new UnsupportedOperationException("OobChannel should not create retriable streams");
}
};
SubchannelChannel(
InternalSubchannel subchannel, Executor executor,
ScheduledExecutorService deadlineCancellationExecutor, CallTracer callsTracer) {
this.subchannel = checkNotNull(subchannel, "subchannel");
this.executor = checkNotNull(executor, "executor");
this.deadlineCancellationExecutor =
checkNotNull(deadlineCancellationExecutor, "deadlineCancellationExecutor");
this.callsTracer = checkNotNull(callsTracer, "callsTracer");
}
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
final Executor effectiveExecutor =
callOptions.getExecutor() == null ? executor : callOptions.getExecutor();
if (callOptions.isWaitForReady()) {
return new ClientCall<RequestT, ResponseT>() {
@Override
public void start(final ClientCall.Listener<ResponseT> listener, Metadata headers) {
effectiveExecutor.execute(new Runnable() {
@Override
public void run() {
listener.onClose(WAIT_FOR_READY_ERROR, new Metadata());
}
});
}
@Override
public void request(int numMessages) {}
@Override
public void cancel(String message, Throwable cause) {}
@Override
public void halfClose() {}
@Override
public void sendMessage(RequestT message) {}
};
}
return new ClientCallImpl<RequestT, ResponseT>(methodDescriptor,
effectiveExecutor,
callOptions.withOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER, Boolean.TRUE),
transportProvider, deadlineCancellationExecutor, callsTracer, false /* retryEnabled */);
}
@Override
public String authority() {
return subchannel.getAuthority();
}
}

View File

@ -34,6 +34,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Codec;
import io.grpc.Deadline;
import io.grpc.Metadata;
@ -479,7 +480,7 @@ public class AbstractClientStreamTest {
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
boolean useGet) {
super(allocator, statsTraceCtx, transportTracer, new Metadata(), useGet);
super(allocator, statsTraceCtx, transportTracer, new Metadata(), CallOptions.DEFAULT, useGet);
this.state = state;
this.sink = sink;
}

View File

@ -160,7 +160,7 @@ public class ManagedChannelImplTest {
private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
private final FakeClock timer = new FakeClock();
private final FakeClock executor = new FakeClock();
private final FakeClock oobExecutor = new FakeClock();
private final FakeClock balancerRpcExecutor = new FakeClock();
private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER =
new FakeClock.TaskFilter() {
@Override
@ -203,7 +203,7 @@ public class ManagedChannelImplTest {
@Mock
private ObjectPool<Executor> executorPool;
@Mock
private ObjectPool<Executor> oobExecutorPool;
private ObjectPool<Executor> balancerRpcExecutorPool;
@Mock
private CallCredentials creds;
private ChannelBuilder channelBuilder;
@ -224,7 +224,7 @@ public class ManagedChannelImplTest {
channel = new ManagedChannelImpl(
channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(),
oobExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors),
balancerRpcExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors),
fakeClockTimeProvider);
if (requestConnection) {
@ -257,7 +257,8 @@ public class ManagedChannelImplTest {
when(mockTransportFactory.getScheduledExecutorService())
.thenReturn(timer.getScheduledExecutorService());
when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService());
when(balancerRpcExecutorPool.getObject())
.thenReturn(balancerRpcExecutor.getScheduledExecutorService());
channelBuilder = new ChannelBuilder()
.nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build())
@ -546,7 +547,7 @@ public class ManagedChannelImplTest {
transportListener.transportTerminated();
assertTrue(channel.isTerminated());
verify(executorPool).returnObject(executor.getScheduledExecutorService());
verifyNoMoreInteractions(oobExecutorPool);
verifyNoMoreInteractions(balancerRpcExecutorPool);
verify(mockTransportFactory)
.newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
@ -1171,7 +1172,7 @@ public class ManagedChannelImplTest {
ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority");
ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority");
verify(oobExecutorPool, times(2)).getObject();
verify(balancerRpcExecutorPool, times(2)).getObject();
assertEquals("oob1authority", oob1.authority());
assertEquals("oob2authority", oob2.authority());
@ -1187,9 +1188,9 @@ public class ManagedChannelImplTest {
MockClientTransportInfo transportInfo = transports.poll();
assertNotNull(transportInfo);
assertEquals(0, oobExecutor.numPendingTasks());
assertEquals(0, balancerRpcExecutor.numPendingTasks());
transportInfo.listener.transportReady();
assertEquals(1, oobExecutor.runDueTasks());
assertEquals(1, balancerRpcExecutor.runDueTasks());
verify(transportInfo.transport).newStream(same(method), same(headers),
same(CallOptions.DEFAULT));
@ -1211,9 +1212,9 @@ public class ManagedChannelImplTest {
// This transport fails
Status transportError = Status.UNAVAILABLE.withDescription("Connection refused");
assertEquals(0, oobExecutor.numPendingTasks());
assertEquals(0, balancerRpcExecutor.numPendingTasks());
transportInfo.listener.transportShutdown(transportError);
assertTrue(oobExecutor.runDueTasks() > 0);
assertTrue(balancerRpcExecutor.runDueTasks() > 0);
// Fail-fast RPC will fail, while wait-for-ready RPC will still be pending
verify(mockCallListener2).onClose(same(transportError), any(Metadata.class));
@ -1223,20 +1224,19 @@ public class ManagedChannelImplTest {
assertFalse(oob1.isShutdown());
assertFalse(oob2.isShutdown());
oob1.shutdown();
verify(oobExecutorPool, never()).returnObject(anyObject());
oob2.shutdownNow();
assertTrue(oob1.isShutdown());
assertTrue(oob2.isShutdown());
assertTrue(oob2.isTerminated());
verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService());
// New RPCs will be rejected.
assertEquals(0, oobExecutor.numPendingTasks());
assertEquals(0, balancerRpcExecutor.numPendingTasks());
ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT);
ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT);
call4.start(mockCallListener4, headers);
call5.start(mockCallListener5, headers);
assertTrue(oobExecutor.runDueTasks() > 0);
assertTrue(balancerRpcExecutor.runDueTasks() > 0);
verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class));
Status status4 = statusCaptor.getValue();
assertEquals(Status.Code.UNAVAILABLE, status4.getCode());
@ -1248,9 +1248,9 @@ public class ManagedChannelImplTest {
verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
// This will shutdownNow() the delayed transport, terminating the pending RPC
assertEquals(0, oobExecutor.numPendingTasks());
assertEquals(0, balancerRpcExecutor.numPendingTasks());
oob1.shutdownNow();
assertTrue(oobExecutor.runDueTasks() > 0);
assertTrue(balancerRpcExecutor.runDueTasks() > 0);
verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class));
// Shut down the channel, and it will not terminated because OOB channel has not.
@ -1259,11 +1259,12 @@ public class ManagedChannelImplTest {
// Delayed transport has already terminated. Terminating the transport terminates the
// subchannel, which in turn terimates the OOB channel, which terminates the channel.
assertFalse(oob1.isTerminated());
verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService());
transportInfo.listener.transportTerminated();
assertTrue(oob1.isTerminated());
assertTrue(channel.isTerminated());
verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService());
verify(balancerRpcExecutorPool, times(2))
.returnObject(balancerRpcExecutor.getScheduledExecutorService());
}
@Test
@ -1328,6 +1329,93 @@ public class ManagedChannelImplTest {
.newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
}
@Test
public void subchannelChannel_normalUsage() {
createChannel();
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
verify(balancerRpcExecutorPool, never()).getObject();
Channel sChannel = subchannel.asChannel();
verify(balancerRpcExecutorPool).getObject();
Metadata headers = new Metadata();
CallOptions callOptions = CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS);
// Subchannel must be READY when creating the RPC.
subchannel.requestConnection();
verify(mockTransportFactory)
.newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
transportListener.transportReady();
ClientCall<String, Integer> call = sChannel.newCall(method, callOptions);
call.start(mockCallListener, headers);
verify(mockTransport).newStream(same(method), same(headers), callOptionsCaptor.capture());
CallOptions capturedCallOption = callOptionsCaptor.getValue();
assertThat(capturedCallOption.getDeadline()).isSameAs(callOptions.getDeadline());
assertThat(capturedCallOption.getOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER)).isTrue();
}
@Test
public void subchannelChannel_failWhenNotReady() {
createChannel();
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
Channel sChannel = subchannel.asChannel();
Metadata headers = new Metadata();
subchannel.requestConnection();
verify(mockTransportFactory)
.newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
assertEquals(0, balancerRpcExecutor.numPendingTasks());
// Subchannel is still CONNECTING, but not READY yet
ClientCall<String, Integer> call = sChannel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, headers);
verify(mockTransport, never()).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verifyZeroInteractions(mockCallListener);
assertEquals(1, balancerRpcExecutor.runDueTasks());
verify(mockCallListener).onClose(
same(SubchannelChannel.NOT_READY_ERROR), any(Metadata.class));
}
@Test
public void subchannelChannel_failWaitForReady() {
createChannel();
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
Channel sChannel = subchannel.asChannel();
Metadata headers = new Metadata();
// Subchannel must be READY when creating the RPC.
subchannel.requestConnection();
verify(mockTransportFactory)
.newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
transportListener.transportReady();
assertEquals(0, balancerRpcExecutor.numPendingTasks());
// Wait-for-ready RPC is not allowed
ClientCall<String, Integer> call =
sChannel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
call.start(mockCallListener, headers);
verify(mockTransport, never()).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verifyZeroInteractions(mockCallListener);
assertEquals(1, balancerRpcExecutor.runDueTasks());
verify(mockCallListener).onClose(
same(SubchannelChannel.WAIT_FOR_READY_ERROR), any(Metadata.class));
}
@Test
public void refreshNameResolutionWhenSubchannelConnectionFailed() {
subtestRefreshNameResolutionWhenConnectionFailed(false);
@ -1818,7 +1906,7 @@ public class ManagedChannelImplTest {
// Channel is dead. No more pending task to possibly revive it.
assertEquals(0, timer.numPendingTasks());
assertEquals(0, executor.numPendingTasks());
assertEquals(0, oobExecutor.numPendingTasks());
assertEquals(0, balancerRpcExecutor.numPendingTasks());
}
private void verifyCallListenerClosed(

View File

@ -95,7 +95,7 @@ class CronetClientStream extends AbstractClientStream {
CallOptions callOptions,
TransportTracer transportTracer) {
super(
new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers,
new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions,
method.isSafe());
this.url = Preconditions.checkNotNull(url, "url");
this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent");

View File

@ -26,17 +26,16 @@ import io.netty.handler.codec.http2.Http2Headers;
class CreateStreamCommand extends WriteQueue.AbstractQueuedCommand {
private final Http2Headers headers;
private final NettyClientStream.TransportState stream;
private final boolean shouldBeCountedForInUse;
private final boolean get;
CreateStreamCommand(Http2Headers headers,
NettyClientStream.TransportState stream) {
this(headers, stream, false);
}
CreateStreamCommand(Http2Headers headers,
NettyClientStream.TransportState stream, boolean get) {
CreateStreamCommand(
Http2Headers headers,
NettyClientStream.TransportState stream,
boolean shouldBeCountedForInUse, boolean get) {
this.stream = Preconditions.checkNotNull(stream, "stream");
this.headers = Preconditions.checkNotNull(headers, "headers");
this.shouldBeCountedForInUse = shouldBeCountedForInUse;
this.get = get;
}
@ -48,6 +47,10 @@ class CreateStreamCommand extends WriteQueue.AbstractQueuedCommand {
return headers;
}
boolean shouldBeCountedForInUse() {
return shouldBeCountedForInUse;
}
boolean isGet() {
return get;
}

View File

@ -32,6 +32,7 @@ import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.ClientTransport.PingCallback;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
import io.grpc.internal.InUseStateAggregator;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
@ -104,6 +105,19 @@ class NettyClientHandler extends AbstractNettyHandler {
private final TransportTracer transportTracer;
private final Attributes eagAttributes;
private final String authority;
private final InUseStateAggregator<Http2Stream> inUseState =
new InUseStateAggregator<Http2Stream>() {
@Override
protected void handleInUse() {
lifecycleManager.notifyInUse(true);
}
@Override
protected void handleNotInUse() {
lifecycleManager.notifyInUse(false);
}
};
private WriteQueue clientWriteQueue;
private Http2Ping ping;
private Attributes attributes = Attributes.EMPTY;
@ -256,26 +270,20 @@ class NettyClientHandler extends AbstractNettyHandler {
@Override
public void onStreamActive(Http2Stream stream) {
if (connection().numActiveStreams() != 1) {
return;
}
NettyClientHandler.this.lifecycleManager.notifyInUse(true);
if (NettyClientHandler.this.keepAliveManager != null) {
if (connection().numActiveStreams() == 1
&& NettyClientHandler.this.keepAliveManager != null) {
NettyClientHandler.this.keepAliveManager.onTransportActive();
}
}
@Override
public void onStreamClosed(Http2Stream stream) {
if (connection().numActiveStreams() != 0) {
return;
}
NettyClientHandler.this.lifecycleManager.notifyInUse(false);
if (NettyClientHandler.this.keepAliveManager != null) {
// Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
// the first place, we don't propagate that option here, and it's safe to reset the in-use
// state for them, which will be a cheap no-op.
inUseState.updateObjectInUse(stream, false);
if (connection().numActiveStreams() == 0
&& NettyClientHandler.this.keepAliveManager != null) {
NettyClientHandler.this.keepAliveManager.onTransportIdle();
}
}
@ -482,7 +490,7 @@ class NettyClientHandler extends AbstractNettyHandler {
* Attempts to create a new stream from the given command. If there are too many active streams,
* the creation request is queued.
*/
private void createStream(CreateStreamCommand command, final ChannelPromise promise)
private void createStream(final CreateStreamCommand command, final ChannelPromise promise)
throws Exception {
if (lifecycleManager.getShutdownThrowable() != null) {
// The connection is going away (it is really the GOAWAY case),
@ -530,6 +538,12 @@ class NettyClientHandler extends AbstractNettyHandler {
stream.getStatsTraceContext().clientOutboundHeaders();
http2Stream.setProperty(streamKey, stream);
// This delays the in-use state until the I/O completes, which technically may
// be later than we would like.
if (command.shouldBeCountedForInUse()) {
inUseState.updateObjectInUse(http2Stream, true);
}
// Attach the client stream to the HTTP/2 stream object as user data.
stream.setHttp2Stream(http2Stream);
}

View File

@ -24,6 +24,7 @@ import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.InternalKnownTransport;
import io.grpc.InternalMethodDescriptor;
import io.grpc.Metadata;
@ -70,12 +71,14 @@ class NettyClientStream extends AbstractClientStream {
AsciiString scheme,
AsciiString userAgent,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
TransportTracer transportTracer,
CallOptions callOptions) {
super(
new NettyWritableBufferAllocator(channel.alloc()),
statsTraceCtx,
transportTracer,
headers,
callOptions,
useGet(method));
this.state = checkNotNull(state, "transportState");
this.writeQueue = state.handler.getWriteQueue();
@ -152,7 +155,8 @@ class NettyClientStream extends AbstractClientStream {
};
// Write the command requesting the creation of the stream.
writeQueue.enqueue(new CreateStreamCommand(http2Headers, transportState(), get),
writeQueue.enqueue(
new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get),
!method.getType().clientSendsOneMessage() || get).addListener(failureListener);
}

View File

@ -178,7 +178,8 @@ class NettyClientTransport implements ConnectionClientTransport {
negotiationHandler.scheme(),
userAgent,
statsTraceCtx,
transportTracer);
transportTracer,
callOptions);
}
@SuppressWarnings("unchecked")

View File

@ -193,7 +193,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
receiveMaxConcurrentStreams(0);
// Create a new stream with id 3.
ChannelFuture createFuture = enqueue(
new CreateStreamCommand(grpcHeaders, streamTransportState));
newCreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(3, streamTransportState.id());
// Cancel the stream.
cancelStream(Status.CANCELLED);
@ -328,7 +328,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
public void receivedGoAwayShouldCancelBufferedStream() throws Exception {
// Force the stream to be buffered.
receiveMaxConcurrentStreams(0);
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channelRead(goAwayFrame(0));
assertTrue(future.isDone());
assertFalse(future.isSuccess());
@ -339,7 +339,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void receivedGoAwayShouldRefuseLaterStreamId() throws Exception {
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channelRead(goAwayFrame(streamId - 1));
verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class));
assertTrue(future.isDone());
@ -347,7 +347,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void receivedGoAwayShouldNotAffectEarlyStreamId() throws Exception {
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channelRead(goAwayFrame(streamId));
verify(streamListener, never())
.closed(any(Status.class), any(Metadata.class));
@ -358,7 +358,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void receivedResetWithRefuseCode() throws Exception {
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channelRead(rstStreamFrame(streamId, (int) Http2Error.REFUSED_STREAM.code() ));
verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class));
assertTrue(future.isDone());
@ -366,7 +366,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void receivedResetWithCanceCode() throws Exception {
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channelRead(rstStreamFrame(streamId, (int) Http2Error.CANCEL.code()));
verify(streamListener).closed(any(Status.class), eq(PROCESSED), any(Metadata.class));
assertTrue(future.isDone());
@ -374,7 +374,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void receivedGoAwayShouldFailUnknownStreams() throws Exception {
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
// Read a GOAWAY that indicates our stream was never processed by the server.
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
@ -389,7 +389,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception {
receiveMaxConcurrentStreams(0);
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
// Read a GOAWAY that indicates our stream was never processed by the server.
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
@ -407,7 +407,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
// Now try to create a stream.
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
assertTrue(future.isDone());
assertFalse(future.isSuccess());
Status status = Status.fromThrowable(future.cause());
@ -461,7 +461,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void cancelStreamShouldCreateAndThenFailBufferedStream() throws Exception {
receiveMaxConcurrentStreams(0);
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(3, streamTransportState.id());
cancelStream(Status.CANCELLED);
verify(streamListener).closed(eq(Status.CANCELLED), same(PROCESSED), any(Metadata.class));
@ -471,7 +471,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
public void channelShutdownShouldCancelBufferedStreams() throws Exception {
// Force a stream to get added to the pending queue.
receiveMaxConcurrentStreams(0);
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
handler().channelInactive(ctx());
assertTrue(future.isDone());
@ -507,7 +507,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void createIncrementsIdsForActualAndBufferdStreams() throws Exception {
receiveMaxConcurrentStreams(2);
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(3, streamTransportState.id());
streamTransportState = new TransportStateImpl(
@ -516,7 +516,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
streamTransportState.setListener(streamListener);
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(5, streamTransportState.id());
streamTransportState = new TransportStateImpl(
@ -525,7 +525,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
streamTransportState.setListener(streamListener);
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(7, streamTransportState.id());
verify(mockKeepAliveManager, times(1)).onTransportActive(); // onStreamActive
@ -682,7 +682,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@CanIgnoreReturnValue
private ChannelFuture createStream() throws Exception {
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
return future;
}
@ -736,6 +736,11 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
return new AsciiString(string);
}
private static CreateStreamCommand newCreateStreamCommand(
Http2Headers headers, NettyClientStream.TransportState stream) {
return new CreateStreamCommand(headers, stream, true, false);
}
private static class PingCallbackImpl implements ClientTransport.PingCallback {
int invocationCount;
long roundTripTime;

View File

@ -43,6 +43,7 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.io.BaseEncoding;
import io.grpc.CallOptions;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -417,7 +418,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
AsciiString.of("http"),
AsciiString.of("agent"),
StatsTraceContext.NOOP,
transportTracer);
transportTracer,
CallOptions.DEFAULT);
stream.start(listener);
stream().transportState().setId(STREAM_ID);
verify(listener, never()).onReady();
@ -447,7 +449,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
AsciiString.of("http"),
AsciiString.of("good agent"),
StatsTraceContext.NOOP,
transportTracer);
transportTracer,
CallOptions.DEFAULT);
stream.start(listener);
ArgumentCaptor<CreateStreamCommand> cmdCap = ArgumentCaptor.forClass(CreateStreamCommand.class);
@ -476,7 +479,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
AsciiString.of("http"),
AsciiString.of("agent"),
StatsTraceContext.NOOP,
transportTracer);
transportTracer,
CallOptions.DEFAULT);
stream.start(listener);
stream.transportState().setId(STREAM_ID);
stream.transportState().setHttp2Stream(http2Stream);
@ -508,7 +512,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
AsciiString.of("http"),
AsciiString.of("agent"),
StatsTraceContext.NOOP,
transportTracer);
transportTracer,
CallOptions.DEFAULT);
stream.start(listener);
stream.transportState().setId(STREAM_ID);
stream.transportState().setHttp2Stream(http2Stream);

View File

@ -22,6 +22,7 @@ import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
@ -73,12 +74,14 @@ class OkHttpClientStream extends AbstractClientStream {
String authority,
String userAgent,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
TransportTracer transportTracer,
CallOptions callOptions) {
super(
new OkHttpWritableBufferAllocator(),
statsTraceCtx,
transportTracer,
headers,
callOptions,
method.isSafe());
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
this.method = method;

View File

@ -48,6 +48,7 @@ import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
import io.grpc.internal.InUseStateAggregator;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
import io.grpc.internal.ProxyParameters;
@ -171,7 +172,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
@GuardedBy("lock")
private boolean stopped;
@GuardedBy("lock")
private boolean inUse;
private boolean hasStream;
private SSLSocketFactory sslSocketFactory;
private HostnameVerifier hostnameVerifier;
private Socket socket;
@ -192,6 +193,19 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
@GuardedBy("lock")
private final TransportTracer transportTracer;
@GuardedBy("lock")
private final InUseStateAggregator<OkHttpClientStream> inUseState =
new InUseStateAggregator<OkHttpClientStream>() {
@Override
protected void handleInUse() {
listener.transportInUse(true);
}
@Override
protected void handleNotInUse() {
listener.transportInUse(false);
}
};
@GuardedBy("lock")
private InternalChannelz.Security securityInfo;
@VisibleForTesting
@ -347,7 +361,8 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
defaultAuthority,
userAgent,
statsTraceCtx,
transportTracer);
transportTracer,
callOptions);
}
@GuardedBy("lock")
@ -357,7 +372,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
goAwayStatus, RpcProgress.REFUSED, true, new Metadata());
} else if (streams.size() >= maxConcurrentStreams) {
pendingStreams.add(clientStream);
setInUse();
setInUse(clientStream);
} else {
startStream(clientStream);
}
@ -368,7 +383,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
Preconditions.checkState(
stream.id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
streams.put(nextStreamId, stream);
setInUse();
setInUse(stream);
stream.transportState().start(nextStreamId);
// For unary and server streaming, there will be a data frame soon, no need to flush the header.
if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
@ -406,7 +421,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
@GuardedBy("lock")
void removePendingStream(OkHttpClientStream pendingStream) {
pendingStreams.remove(pendingStream);
maybeClearInUse();
maybeClearInUse(pendingStream);
}
@Override
@ -689,13 +704,14 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
Map.Entry<Integer, OkHttpClientStream> entry = it.next();
it.remove();
entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
maybeClearInUse(entry.getValue());
}
for (OkHttpClientStream stream : pendingStreams) {
stream.transportState().transportReportStatus(reason, true, new Metadata());
maybeClearInUse(stream);
}
pendingStreams.clear();
maybeClearInUse();
stopIfNecessary();
}
@ -764,15 +780,16 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
it.remove();
entry.getValue().transportState().transportReportStatus(
status, RpcProgress.REFUSED, false, new Metadata());
maybeClearInUse(entry.getValue());
}
}
for (OkHttpClientStream stream : pendingStreams) {
stream.transportState().transportReportStatus(
status, RpcProgress.REFUSED, true, new Metadata());
maybeClearInUse(stream);
}
pendingStreams.clear();
maybeClearInUse();
stopIfNecessary();
}
@ -817,7 +834,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
}
if (!startPendingStreams()) {
stopIfNecessary();
maybeClearInUse();
maybeClearInUse(stream);
}
}
}
@ -860,11 +877,10 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
}
@GuardedBy("lock")
private void maybeClearInUse() {
if (inUse) {
private void maybeClearInUse(OkHttpClientStream stream) {
if (hasStream) {
if (pendingStreams.isEmpty() && streams.isEmpty()) {
inUse = false;
listener.transportInUse(false);
hasStream = false;
if (keepAliveManager != null) {
// We don't have any active streams. No need to do keepalives any more.
// Again, we have to call this inside the lock to avoid the race between onTransportIdle
@ -873,13 +889,15 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
}
}
}
if (stream.shouldBeCountedForInUse()) {
inUseState.updateObjectInUse(stream, false);
}
}
@GuardedBy("lock")
private void setInUse() {
if (!inUse) {
inUse = true;
listener.transportInUse(true);
private void setInUse(OkHttpClientStream stream) {
if (!hasStream) {
hasStream = true;
if (keepAliveManager != null) {
// We have a new stream. We might need to do keepalives now.
// Note that we have to do this inside the lock to avoid calling
@ -888,6 +906,9 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
keepAliveManager.onTransportActive();
}
}
if (stream.shouldBeCountedForInUse()) {
inUseState.updateObjectInUse(stream, true);
}
}
private Throwable getPingFailure() {

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.io.BaseEncoding;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
@ -90,7 +91,8 @@ public class OkHttpClientStreamTest {
"localhost",
"userAgent",
StatsTraceContext.NOOP,
transportTracer);
transportTracer,
CallOptions.DEFAULT);
}
@Test
@ -149,7 +151,7 @@ public class OkHttpClientStreamTest {
metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
StatsTraceContext.NOOP, transportTracer);
StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT);
stream.start(new BaseClientStreamListener());
stream.transportState().start(3);
@ -164,7 +166,7 @@ public class OkHttpClientStreamTest {
metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
StatsTraceContext.NOOP, transportTracer);
StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT);
stream.start(new BaseClientStreamListener());
stream.transportState().start(3);
@ -192,7 +194,7 @@ public class OkHttpClientStreamTest {
.build();
stream = new OkHttpClientStream(getMethod, new Metadata(), frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application",
StatsTraceContext.NOOP, transportTracer);
StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT);
stream.start(new BaseClientStreamListener());
// GET streams send headers after halfClose is called.

View File

@ -59,6 +59,7 @@ import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.IoUtils;
import io.grpc.internal.ManagedClientTransport;
@ -612,6 +613,46 @@ public abstract class AbstractTransportTest {
assertNull(serverStreamTracer1.getServerCallInfo());
}
@Test
public void transportInUse_balancerRpcsNotCounted() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
// stream1 is created by balancer through Subchannel.asChannel(), which is marked by
// CALL_OPTIONS_RPC_OWNED_BY_BALANCER in CallOptions. It won't be counted for in-use signal.
ClientStream stream1 = client.newStream(
methodDescriptor, new Metadata(),
callOptions.withOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER, Boolean.TRUE));
ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase();
stream1.start(clientStreamListener1);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
StreamCreation serverStreamCreation1
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
// stream2 is the normal RPC, and will be counted for in-use
ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener2 = new ClientStreamListenerBase();
stream2.start(clientStreamListener2);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
StreamCreation serverStreamCreation2
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
stream2.halfClose();
verify(mockClientTransportListener, never()).transportInUse(false);
serverStreamCreation2.stream.close(Status.OK, new Metadata());
// As soon as stream2 is closed, even though stream1 is still open, the transport will report
// in-use == false.
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
stream1.halfClose();
serverStreamCreation1.stream.close(Status.OK, new Metadata());
// Verify that the callback has been called only once for true and false respectively
verify(mockClientTransportListener).transportInUse(true);
verify(mockClientTransportListener).transportInUse(false);
}
@Test
public void transportInUse_normalClose() throws Exception {
server.start(serverListener);