diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java
index f1cef63fba..2c16b101dc 100644
--- a/core/src/main/java/io/grpc/LoadBalancer.java
+++ b/core/src/main/java/io/grpc/LoadBalancer.java
@@ -655,6 +655,35 @@ public abstract class LoadBalancer {
* @since 1.2.0
*/
public abstract Attributes getAttributes();
+
+ /**
+ * (Internal use only) returns a {@link Channel} that is backed by this Subchannel. This allows
+ * a LoadBalancer to issue its own RPCs for auxiliary purposes, such as health-checking, on
+ * already-established connections. This channel has certain restrictions:
+ *
+ *
It can issue RPCs only if the Subchannel is {@code READY}. If {@link
+ * Channel#newCall} is called when the Subchannel is not {@code READY}, the RPC will fail
+ * immediately.
+ *
It doesn't support {@link CallOptions#withWaitForReady wait-for-ready} RPCs. Such RPCs
+ * will fail immediately.
+ *
+ *
+ *
RPCs made on this Channel is not counted when determining ManagedChannel's {@link
+ * ManagedChannelBuilder#idleTimeout idle mode}. In other words, they won't prevent
+ * ManagedChannel from entering idle mode.
+ *
+ *
Warning: RPCs made on this channel will prevent a shut-down transport from terminating. If
+ * you make long-running RPCs, you need to make sure they will finish in time after the
+ * Subchannel has transitioned away from {@code READY} state
+ * (notified through {@link #handleSubchannelState}).
+ *
+ *
Warning: this is INTERNAL API, is not supposed to be used by external users, and may
+ * change without notice. If you think you must use it, please file an issue.
+ */
+ @Internal
+ public Channel asChannel() {
+ throw new UnsupportedOperationException();
+ }
}
/**
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
index 03e2e1a197..6f9c3b3cf5 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
@@ -42,6 +42,7 @@ import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.InUseStateAggregator;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ObjectPool;
@@ -93,6 +94,19 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private final Attributes attributes = Attributes.newBuilder()
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.build();
+ @GuardedBy("this")
+ private final InUseStateAggregator inUseState =
+ new InUseStateAggregator() {
+ @Override
+ protected void handleInUse() {
+ clientTransportListener.transportInUse(true);
+ }
+
+ @Override
+ protected void handleNotInUse() {
+ clientTransportListener.transportInUse(false);
+ }
+ };
public InProcessTransport(String name, String authority, String userAgent) {
this.name = name;
@@ -270,6 +284,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private class InProcessStream {
private final InProcessClientStream clientStream;
private final InProcessServerStream serverStream;
+ private final CallOptions callOptions;
private final Metadata headers;
private final MethodDescriptor, ?> method;
private volatile String authority;
@@ -279,6 +294,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
String authority) {
this.method = checkNotNull(method, "method");
this.headers = checkNotNull(headers, "headers");
+ this.callOptions = checkNotNull(callOptions, "callOptions");
this.authority = authority;
this.clientStream = new InProcessClientStream(callOptions, headers);
this.serverStream = new InProcessServerStream(method, headers);
@@ -288,8 +304,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private void streamClosed() {
synchronized (InProcessTransport.this) {
boolean justRemovedAnElement = streams.remove(this);
+ if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
+ inUseState.updateObjectInUse(this, false);
+ }
if (streams.isEmpty() && justRemovedAnElement) {
- clientTransportListener.transportInUse(false);
if (shutdown) {
notifyTerminated();
}
@@ -498,6 +516,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private class InProcessClientStream implements ClientStream {
final StatsTraceContext statsTraceCtx;
+ final CallOptions callOptions;
@GuardedBy("this")
private ServerStreamListener serverStreamListener;
@GuardedBy("this")
@@ -514,6 +533,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private int outboundSeqNo;
InProcessClientStream(CallOptions callOptions, Metadata headers) {
+ this.callOptions = callOptions;
statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers);
}
@@ -652,8 +672,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
synchronized (InProcessTransport.this) {
statsTraceCtx.clientOutboundHeaders();
streams.add(InProcessTransport.InProcessStream.this);
- if (streams.size() == 1) {
- clientTransportListener.transportInUse(true);
+ if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
+ inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
}
serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
}
diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
index c100998a13..abd28b1ae7 100644
--- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
@@ -25,6 +25,7 @@ import static java.lang.Math.max;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import io.grpc.CallOptions;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Deadline;
@@ -95,6 +96,7 @@ public abstract class AbstractClientStream extends AbstractStream
private final TransportTracer transportTracer;
private final Framer framer;
+ private boolean shouldBeCountedForInUse;
private boolean useGet;
private Metadata headers;
/**
@@ -109,9 +111,11 @@ public abstract class AbstractClientStream extends AbstractStream
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
Metadata headers,
+ CallOptions callOptions,
boolean useGet) {
checkNotNull(headers, "headers");
this.transportTracer = checkNotNull(transportTracer, "transportTracer");
+ this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
this.useGet = useGet;
if (!useGet) {
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
@@ -172,6 +176,14 @@ public abstract class AbstractClientStream extends AbstractStream
return framer;
}
+ /**
+ * Returns true if this stream should be counted when determining the in-use state of the
+ * transport.
+ */
+ public final boolean shouldBeCountedForInUse() {
+ return shouldBeCountedForInUse;
+ }
+
@Override
public final void request(int numMessages) {
abstractClientStreamSink().request(numMessages);
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index b1340565cb..da10e62356 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -248,6 +248,22 @@ public final class GrpcUtil {
}
};
+ /**
+ * RPCs created on the Channel returned by {@link io.grpc.LoadBalancer.Subchannel#asChannel}
+ * will have this option with value {@code true}. They will be treated differently from
+ * the ones created by application.
+ */
+ public static final CallOptions.Key CALL_OPTIONS_RPC_OWNED_BY_BALANCER =
+ CallOptions.Key.create("io.grpc.internal.CALL_OPTIONS_RPC_OWNED_BY_BALANCER");
+
+ /**
+ * Returns true if an RPC with the given properties should be counted when calculating the
+ * in-use state of a transport.
+ */
+ public static boolean shouldBeCountedForInUse(CallOptions callOptions) {
+ return !(Boolean.TRUE.equals(callOptions.getOption(CALL_OPTIONS_RPC_OWNED_BY_BALANCER)));
+ }
+
/**
* Returns a proxy detector appropriate for the current environment.
*/
diff --git a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java b/core/src/main/java/io/grpc/internal/InUseStateAggregator.java
index 4c9bba1296..786baf77e7 100644
--- a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java
+++ b/core/src/main/java/io/grpc/internal/InUseStateAggregator.java
@@ -23,7 +23,7 @@ import javax.annotation.concurrent.NotThreadSafe;
* Aggregates the in-use state of a set of objects.
*/
@NotThreadSafe
-abstract class InUseStateAggregator {
+public abstract class InUseStateAggregator {
private final HashSet inUseObjects = new HashSet();
@@ -32,7 +32,7 @@ abstract class InUseStateAggregator {
*
*
This may call into {@link #handleInUse} or {@link #handleNotInUse} when appropriate.
*/
- final void updateObjectInUse(T object, boolean inUse) {
+ public final void updateObjectInUse(T object, boolean inUse) {
int origSize = inUseObjects.size();
if (inUse) {
inUseObjects.add(object);
@@ -47,7 +47,7 @@ abstract class InUseStateAggregator {
}
}
- final boolean isInUse() {
+ public final boolean isInUse() {
return !inUseObjects.isEmpty();
}
@@ -55,10 +55,10 @@ abstract class InUseStateAggregator {
* Called when the aggregated in-use state has changed to true, which means at least one object is
* in use.
*/
- abstract void handleInUse();
+ protected abstract void handleInUse();
/**
* Called when the aggregated in-use state has changed to false, which means no object is in use.
*/
- abstract void handleNotInUse();
+ protected abstract void handleNotInUse();
}
diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
index 879b7d873b..0f67a1f6d0 100644
--- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java
+++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
@@ -131,12 +131,12 @@ final class InternalSubchannel implements InternalInstrumented {
private final InUseStateAggregator inUseStateAggregator =
new InUseStateAggregator() {
@Override
- void handleInUse() {
+ protected void handleInUse() {
callback.onInUse(InternalSubchannel.this);
}
@Override
- void handleNotInUse() {
+ protected void handleNotInUse() {
callback.onNotInUse(InternalSubchannel.this);
}
};
@@ -215,6 +215,21 @@ final class InternalSubchannel implements InternalInstrumented {
return null;
}
+ /**
+ * Returns a READY transport if there is any, without trying to connect.
+ */
+ @Nullable
+ ClientTransport getTransport() {
+ return activeTransport;
+ }
+
+ /**
+ * Returns the authority string associated with this Subchannel.
+ */
+ String getAuthority() {
+ return authority;
+ }
+
@GuardedBy("lock")
private void startNewTransport() {
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 3765692c79..37aad45381 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -121,7 +121,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final ClientTransportFactory transportFactory;
private final Executor executor;
private final ObjectPool extends Executor> executorPool;
- private final ObjectPool extends Executor> oobExecutorPool;
+ private final ObjectPool extends Executor> balancerRpcExecutorPool;
+ private final ExecutorHolder balancerRpcExecutorHolder;
private final TimeProvider timeProvider;
private final int maxTraceEvents;
@@ -515,7 +516,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
AbstractManagedChannelImplBuilder> builder,
ClientTransportFactory clientTransportFactory,
BackoffPolicy.Provider backoffPolicyProvider,
- ObjectPool extends Executor> oobExecutorPool,
+ ObjectPool extends Executor> balancerRpcExecutorPool,
Supplier stopwatchSupplier,
List interceptors,
final TimeProvider timeProvider) {
@@ -537,7 +538,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
this.loadBalancerFactory = builder.loadBalancerFactory;
}
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
- this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
+ this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
+ this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor);
this.delayedTransport.start(delayedTransportListener);
@@ -835,6 +837,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
terminated = true;
terminatedLatch.countDown();
executorPool.returnObject(executor);
+ balancerRpcExecutorHolder.release();
// Release the transport factory so that it can deallocate any resources.
transportFactory.close();
}
@@ -1153,7 +1156,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
oobChannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "OobChannel");
}
final OobChannel oobChannel = new OobChannel(
- authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
+ authority, balancerRpcExecutorPool, transportFactory.getScheduledExecutorService(),
channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
if (channelTracer != null) {
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
@@ -1455,6 +1458,14 @@ final class ManagedChannelImpl extends ManagedChannel implements
public String toString() {
return subchannel.getLogId().toString();
}
+
+ @Override
+ public Channel asChannel() {
+ return new SubchannelChannel(
+ subchannel, balancerRpcExecutorHolder.getExecutor(),
+ transportFactory.getScheduledExecutorService(),
+ callTracerFactory.create());
+ }
}
@Override
@@ -1510,16 +1521,41 @@ final class ManagedChannelImpl extends ManagedChannel implements
*/
private final class IdleModeStateAggregator extends InUseStateAggregator