From 7cb049776751b28b77bf00be9e327253d2ac8104 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Mon, 9 Jan 2017 14:44:10 -0800 Subject: [PATCH] core: ManagedChannelImpl2. (#2530) 1. Adapt to LoadBalancer2 interface. Channel holds on to a single DelayedClientTransport2. 2. Lock-free: every channel state mutation, including Subchannel mutations, calling into LoadBalancer, idleness and shutdown, is made from channelExecutor. 3. Idleness grace period is no longer needed. --- .../java/io/grpc/internal/ClientCallImpl.java | 4 +- .../io/grpc/internal/ManagedChannelImpl.java | 3 +- .../io/grpc/internal/ManagedChannelImpl2.java | 876 ++++++++++++ .../java/io/grpc/internal/ObjectPool.java | 46 + .../java/io/grpc/internal/OobChannel.java | 249 ++++ .../grpc/internal/SingleTransportChannel.java | 3 +- .../java/io/grpc/internal/TransportSet.java | 3 +- .../io/grpc/internal/ClientCallImplTest.java | 2 +- .../internal/DelayedClientTransport2Test.java | 1 - .../ManagedChannelImpl2IdlenessTest.java | 364 +++++ .../internal/ManagedChannelImpl2Test.java | 1217 +++++++++++++++++ 11 files changed, 2761 insertions(+), 7 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/ManagedChannelImpl2.java create mode 100644 core/src/main/java/io/grpc/internal/ObjectPool.java create mode 100644 core/src/main/java/io/grpc/internal/OobChannel.java create mode 100644 core/src/test/java/io/grpc/internal/ManagedChannelImpl2IdlenessTest.java create mode 100644 core/src/test/java/io/grpc/internal/ManagedChannelImpl2Test.java diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 5c0c1877ea..c242c031c1 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -127,7 +127,7 @@ final class ClientCallImpl extends ClientCall /** * Returns a transport for a new call. */ - ClientTransport get(CallOptions callOptions); + ClientTransport get(CallOptions callOptions, Metadata headers); } ClientCallImpl setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { @@ -215,7 +215,7 @@ final class ClientCallImpl extends ClientCall if (!deadlineExceeded) { updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(), context.getDeadline(), headers); - ClientTransport transport = clientTransportProvider.get(callOptions); + ClientTransport transport = clientTransportProvider.get(callOptions, headers); Context origContext = context.attach(); try { stream = transport.newStream(method, headers, callOptions, statsTraceCtx); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 431648cb58..717f45c975 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -51,6 +51,7 @@ import io.grpc.DecompressorRegistry; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.ManagedChannel; +import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.NameResolver; import io.grpc.ResolvedServerInfoGroup; @@ -359,7 +360,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI private final ClientTransportProvider transportProvider = new ClientTransportProvider() { @Override - public ClientTransport get(CallOptions callOptions) { + public ClientTransport get(CallOptions callOptions, Metadata headers) { LoadBalancer balancer = loadBalancer; if (balancer == null) { // Current state is either idle or in grace period diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl2.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl2.java new file mode 100644 index 0000000000..401c32bd18 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl2.java @@ -0,0 +1,876 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; +import com.google.instrumentation.stats.StatsContextFactory; + +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.CompressorRegistry; +import io.grpc.ConnectivityStateInfo; +import io.grpc.DecompressorRegistry; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer2.PickResult; +import io.grpc.LoadBalancer2.SubchannelPicker; +import io.grpc.LoadBalancer2; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.NameResolver; +import io.grpc.ResolvedServerInfoGroup; +import io.grpc.Status; +import io.grpc.internal.ClientCallImpl.ClientTransportProvider; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +/** A communication channel for making outgoing RPCs. */ +@ThreadSafe +public final class ManagedChannelImpl2 extends ManagedChannel implements WithLogId { + private static final Logger log = Logger.getLogger(ManagedChannelImpl2.class.getName()); + + // Matching this pattern means the target string is a URI target or at least intended to be one. + // A URI target must be an absolute hierarchical URI. + // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." ) + @VisibleForTesting + static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*"); + + static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1; + + @VisibleForTesting + static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5; + + private static final ClientTransport SHUTDOWN_TRANSPORT = + new FailingClientTransport(Status.UNAVAILABLE.withDescription("Channel is shutdown")); + + @VisibleForTesting + static final Status SHUTDOWN_NOW_STATUS = + Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked"); + + private final String target; + private final NameResolver.Factory nameResolverFactory; + private final Attributes nameResolverParams; + private final LoadBalancer2.Factory loadBalancerFactory; + private final ClientTransportFactory transportFactory; + private final Executor executor; + private final ObjectPool executorPool; + private final ObjectPool oobExecutorPool; + private final LogId logId = LogId.allocate(getClass().getName()); + + private final ChannelExecutor channelExecutor = new ChannelExecutor(); + + private final DecompressorRegistry decompressorRegistry; + private final CompressorRegistry compressorRegistry; + + private final ObjectPool timerServicePool; + private final Supplier stopwatchSupplier; + /** The timout before entering idle mode. */ + private final long idleTimeoutMillis; + private final StatsContextFactory statsFactory; + + /** + * Executor that runs deadline timers for requests. + */ + // Must be assigned from channelExecutor + private volatile ScheduledExecutorService scheduledExecutor; + + private final BackoffPolicy.Provider backoffPolicyProvider; + + /** + * We delegate to this channel, so that we can have interceptors as necessary. If there aren't + * any interceptors this will just be {@link RealChannel}. + */ + private final Channel interceptorChannel; + @Nullable private final String userAgent; + + // Only null after channel is terminated. Must be assigned from the channelExecutor. + private NameResolver nameResolver; + + // null when channel is in idle mode. Must be assigned from channelExecutor. + @Nullable + private LoadBalancer2 loadBalancer; + + // Must be assigned from channelExecutor. null if channel is in idle mode. + @Nullable + private volatile SubchannelPicker subchannelPicker; + + // Must be mutated from channelExecutor + // If any monitoring hook to be added later needs to get a snapshot of this Set, we could + // switch to a ConcurrentHashMap. + private final Set subchannels = new HashSet(16, .75f); + + // Must be mutated from channelExecutor + private final Set oobChannels = new HashSet(1, .75f); + + // reprocess() must be run from channelExecutor + private final DelayedClientTransport2 delayedTransport; + + // Shutdown states. + // + // Channel's shutdown process: + // 1. shutdown(): stop accepting new calls from applications + // 1a shutdown <- true + // 1b subchannelPicker <- null + // 1c delayedTransport.shutdown() + // 2. delayedTransport terminated: stop stream-creation functionality + // 2a terminating <- true + // 2b loadBalancer.shutdown() + // * LoadBalancer will shutdown subchannels and OOB channels + // 2c loadBalancer <- null + // 2d nameResolver.shutdown() + // 2e nameResolver <- null + // 3. All subchannels and OOB channels terminated: Channel considered terminated + + private final AtomicBoolean shutdown = new AtomicBoolean(false); + // Must only be mutated and read from channelExecutor + private boolean shutdownNowed; + // Must be mutated from channelExecutor + private volatile boolean terminating; + // Must be mutated from channelExecutor + private volatile boolean terminated; + private final CountDownLatch terminatedLatch = new CountDownLatch(1); + + // Called from channelExecutor + private final ManagedClientTransport.Listener delayedTransportListener = + new ManagedClientTransport.Listener() { + @Override + public void transportShutdown(Status s) { + checkState(shutdown.get(), "Channel must have been shut down"); + } + + @Override + public void transportReady() { + // Don't care + } + + @Override + public void transportInUse(final boolean inUse) { + inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); + } + + @Override + public void transportTerminated() { + checkState(shutdown.get(), "Channel must have been shut down"); + terminating = true; + if (loadBalancer != null) { + loadBalancer.shutdown(); + loadBalancer = null; + } + if (nameResolver != null) { + nameResolver.shutdown(); + nameResolver = null; + } + + // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them + // here. + maybeShutdownNowSubchannels(); + maybeTerminateChannel(); + } + }; + + // Must be called from channelExecutor + private void maybeShutdownNowSubchannels() { + if (shutdownNowed) { + for (InternalSubchannel subchannel : subchannels) { + subchannel.shutdownNow(SHUTDOWN_NOW_STATUS); + } + for (InternalSubchannel oobChannel : oobChannels) { + oobChannel.shutdownNow(SHUTDOWN_NOW_STATUS); + } + } + } + + // Must be accessed from channelExecutor + @VisibleForTesting + final InUseStateAggregator2 inUseStateAggregator = + new InUseStateAggregator2() { + @Override + void handleInUse() { + exitIdleMode(); + } + + @Override + void handleNotInUse() { + if (shutdown.get()) { + return; + } + rescheduleIdleTimer(); + } + }; + + // Run from channelExecutor + private class IdleModeTimer implements Runnable { + // Only mutated from channelExecutor + boolean cancelled; + + @Override + public void run() { + if (cancelled) { + // Race detected: this task was scheduled on channelExecutor before cancelIdleTimer() + // could cancel the timer. + return; + } + log.log(Level.FINE, "[{0}] Entering idle mode", getLogId()); + // nameResolver and loadBalancer are guaranteed to be non-null. If any of them were null, + // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown() + // did not cancel idleModeTimer, both of which are bugs. + nameResolver.shutdown(); + nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); + loadBalancer.shutdown(); + loadBalancer = null; + subchannelPicker = null; + } + } + + // Must be used from channelExecutor + @Nullable + private ScheduledFuture idleModeTimerFuture; + // Must be used from channelExecutor + @Nullable + private IdleModeTimer idleModeTimer; + + /** + * Make the channel exit idle mode, if it's in it. + * + *

Must be called from channelExecutor + */ + @VisibleForTesting + void exitIdleMode() { + if (shutdown.get()) { + return; + } + if (inUseStateAggregator.isInUse()) { + // Cancel the timer now, so that a racing due timer will not put Channel on idleness + // when the caller of exitIdleMode() is about to use the returned loadBalancer. + cancelIdleTimer(); + } else { + // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while + // isInUse() == false, in which case we still need to schedule the timer. + rescheduleIdleTimer(); + } + if (loadBalancer != null) { + return; + } + log.log(Level.FINE, "[{0}] Exiting idle mode", getLogId()); + LbHelperImpl helper = new LbHelperImpl(nameResolver); + helper.lb = loadBalancerFactory.newLoadBalancer(helper); + this.loadBalancer = helper.lb; + + NameResolverListenerImpl listener = new NameResolverListenerImpl(helper); + try { + nameResolver.start(listener); + } catch (Throwable t) { + listener.onError(Status.fromThrowable(t)); + } + } + + // Must be run from channelExecutor + private void cancelIdleTimer() { + if (idleModeTimerFuture != null) { + idleModeTimerFuture.cancel(false); + idleModeTimer.cancelled = true; + idleModeTimerFuture = null; + idleModeTimer = null; + } + } + + // Always run from channelExecutor + private void rescheduleIdleTimer() { + if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { + return; + } + cancelIdleTimer(); + idleModeTimer = new IdleModeTimer(); + idleModeTimerFuture = scheduledExecutor.schedule( + new LogExceptionRunnable(new Runnable() { + @Override + public void run() { + channelExecutor.executeLater(idleModeTimer).drain(); + } + }), + idleTimeoutMillis, TimeUnit.MILLISECONDS); + } + + private final ClientTransportProvider transportProvider = new ClientTransportProvider() { + @Override + public ClientTransport get(CallOptions callOptions, Metadata headers) { + SubchannelPicker pickerCopy = subchannelPicker; + if (shutdown.get()) { + // If channel is shut down, delayedTransport is also shut down which will fail the stream + // properly. + return delayedTransport; + } + if (pickerCopy == null) { + channelExecutor.executeLater(new Runnable() { + @Override + public void run() { + exitIdleMode(); + } + }).drain(); + return delayedTransport; + } + // There is no need to reschedule the idle timer here. + // + // pickerCopy != null, which means idle timer has not expired when this method starts. + // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer + // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after + // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it. + // + // In most cases the idle timer is scheduled to fire after the transport has created the + // stream, which would have reported in-use state to the channel that would have cancelled + // the idle timer. + PickResult pickResult = pickerCopy.pickSubchannel(callOptions.getAffinity(), headers); + ClientTransport transport = GrpcUtil.getTransportFromPickResult( + pickResult, callOptions.isWaitForReady()); + if (transport != null) { + return transport; + } + return delayedTransport; + } + }; + + ManagedChannelImpl2(String target, BackoffPolicy.Provider backoffPolicyProvider, + NameResolver.Factory nameResolverFactory, Attributes nameResolverParams, + LoadBalancer2.Factory loadBalancerFactory, ClientTransportFactory transportFactory, + DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, + ObjectPool timerServicePool, + ObjectPool executorPool, ObjectPool oobExecutorPool, + Supplier stopwatchSupplier, long idleTimeoutMillis, + @Nullable String userAgent, + List interceptors, StatsContextFactory statsFactory) { + this.target = checkNotNull(target, "target"); + this.nameResolverFactory = checkNotNull(nameResolverFactory, "nameResolverFactory"); + this.nameResolverParams = checkNotNull(nameResolverParams, "nameResolverParams"); + this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); + this.loadBalancerFactory = checkNotNull(loadBalancerFactory, "loadBalancerFactory"); + this.executorPool = checkNotNull(executorPool, "executorPool"); + this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool"); + this.executor = checkNotNull(executorPool.getObject(), "executor"); + this.delayedTransport = new DelayedClientTransport2(this.executor, this.channelExecutor); + this.delayedTransport.start(delayedTransportListener); + this.backoffPolicyProvider = backoffPolicyProvider; + this.transportFactory = + new CallCredentialsApplyingTransportFactory(transportFactory, this.executor); + this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors); + this.timerServicePool = checkNotNull(timerServicePool, "timerServicePool"); + this.scheduledExecutor = checkNotNull(timerServicePool.getObject(), "timerService"); + this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); + if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { + this.idleTimeoutMillis = idleTimeoutMillis; + } else { + checkArgument( + idleTimeoutMillis >= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS, + "invalid idleTimeoutMillis %s", idleTimeoutMillis); + this.idleTimeoutMillis = idleTimeoutMillis; + } + this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry"); + this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry"); + this.userAgent = userAgent; + this.statsFactory = checkNotNull(statsFactory, "statsFactory"); + + log.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target}); + } + + @VisibleForTesting + static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory, + Attributes nameResolverParams) { + // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending + // "dns:///". + URI targetUri = null; + StringBuilder uriSyntaxErrors = new StringBuilder(); + try { + targetUri = new URI(target); + // For "localhost:8080" this would likely cause newNameResolver to return null, because + // "localhost" is parsed as the scheme. Will fall into the next branch and try + // "dns:///localhost:8080". + } catch (URISyntaxException e) { + // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234. + uriSyntaxErrors.append(e.getMessage()); + } + if (targetUri != null) { + NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams); + if (resolver != null) { + return resolver; + } + // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an + // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080" + } + + // If we reached here, the targetUri couldn't be used. + if (!URI_PATTERN.matcher(target).matches()) { + // It doesn't look like a URI target. Maybe it's an authority string. Try with the default + // scheme from the factory. + try { + targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null); + } catch (URISyntaxException e) { + // Should not be possible. + throw new IllegalArgumentException(e); + } + NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams); + if (resolver != null) { + return resolver; + } + } + throw new IllegalArgumentException(String.format( + "cannot find a NameResolver for %s%s", + target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : "")); + } + + /** + * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately + * cancelled. + */ + @Override + public ManagedChannelImpl2 shutdown() { + log.log(Level.FINE, "[{0}] shutdown() called", getLogId()); + if (!shutdown.compareAndSet(false, true)) { + return this; + } + delayedTransport.shutdown(); + channelExecutor.executeLater(new Runnable() { + @Override + public void run() { + cancelIdleTimer(); + } + }).drain(); + log.log(Level.FINE, "[{0}] Shutting down", getLogId()); + return this; + } + + /** + * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although + * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely + * return {@code false} immediately after this method returns. + */ + @Override + public ManagedChannelImpl2 shutdownNow() { + log.log(Level.FINE, "[{0}] shutdownNow() called", getLogId()); + shutdown(); + delayedTransport.shutdownNow(SHUTDOWN_NOW_STATUS); + channelExecutor.executeLater(new Runnable() { + @Override + public void run() { + if (shutdownNowed) { + return; + } + shutdownNowed = true; + maybeShutdownNowSubchannels(); + } + }).drain(); + return this; + } + + @Override + public boolean isShutdown() { + return shutdown.get(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return terminatedLatch.await(timeout, unit); + } + + @Override + public boolean isTerminated() { + return terminated; + } + + /* + * Creates a new outgoing call on the channel. + */ + @Override + public ClientCall newCall(MethodDescriptor method, + CallOptions callOptions) { + return interceptorChannel.newCall(method, callOptions); + } + + @Override + public String authority() { + return interceptorChannel.authority(); + } + + private class RealChannel extends Channel { + @Override + public ClientCall newCall(MethodDescriptor method, + CallOptions callOptions) { + Executor executor = callOptions.getExecutor(); + if (executor == null) { + executor = ManagedChannelImpl2.this.executor; + } + StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext( + method.getFullMethodName(), statsFactory, stopwatchSupplier); + return new ClientCallImpl( + method, + executor, + callOptions, + statsTraceCtx, + transportProvider, + scheduledExecutor) + .setDecompressorRegistry(decompressorRegistry) + .setCompressorRegistry(compressorRegistry); + } + + @Override + public String authority() { + String authority = nameResolver.getServiceAuthority(); + return checkNotNull(authority, "authority"); + } + } + + /** + * Terminate the channel if termination conditions are met. + */ + // Must be run from channelExecutor + private void maybeTerminateChannel() { + if (terminated) { + return; + } + if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) { + log.log(Level.FINE, "[{0}] Terminated", getLogId()); + terminated = true; + terminatedLatch.countDown(); + executorPool.returnObject(executor); + scheduledExecutor = timerServicePool.returnObject(scheduledExecutor); + // Release the transport factory so that it can deallocate any resources. + transportFactory.close(); + } + } + + private class LbHelperImpl extends LoadBalancer2.Helper { + LoadBalancer2 lb; + final NameResolver nr; + + LbHelperImpl(NameResolver nr) { + this.nr = checkNotNull(nr, "NameResolver"); + } + + @Override + public SubchannelImpl createSubchannel(EquivalentAddressGroup addressGroup, Attributes attrs) { + checkNotNull(addressGroup, "addressGroup"); + checkNotNull(attrs, "attrs"); + ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor; + checkState(scheduledExecutorCopy != null, + "scheduledExecutor is already cleared. Looks like you are calling this method after " + + "you've already shut down"); + final SubchannelImplImpl subchannel = new SubchannelImplImpl(attrs); + final InternalSubchannel internalSubchannel = new InternalSubchannel( + addressGroup, authority(), userAgent, backoffPolicyProvider, transportFactory, + scheduledExecutorCopy, stopwatchSupplier, channelExecutor, + new InternalSubchannel.Callback() { + // All callbacks are run in channelExecutor + @Override + void onTerminated(InternalSubchannel is) { + subchannels.remove(is); + maybeTerminateChannel(); + } + + @Override + void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + if ((newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE)) { + nr.refresh(); + } + lb.handleSubchannelState(subchannel, newState); + } + + @Override + public void onInUse(InternalSubchannel is) { + inUseStateAggregator.updateObjectInUse(is, true); + } + + @Override + public void onNotInUse(InternalSubchannel is) { + inUseStateAggregator.updateObjectInUse(is, false); + } + }); + subchannel.subchannel = internalSubchannel; + log.log(Level.FINE, "[{0}] {1} created for {2}", + new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroup}); + runSerialized(new Runnable() { + @Override + public void run() { + if (terminating) { + // Because runSerialized() doesn't guarantee the runnable has been executed upon when + // returning, the subchannel may still be returned to the balancer without being + // shutdown even if "terminating" is already true. The subchannel will not be used in + // this case, because delayed transport has terminated when "terminating" becomes + // true, and no more requests will be sent to balancer beyond this point. + internalSubchannel.shutdown(); + } + if (!terminated) { + // If channel has not terminated, it will track the subchannel and block termination + // for it. + subchannels.add(internalSubchannel); + } + } + }); + return subchannel; + } + + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) { + ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor; + checkState(scheduledExecutorCopy != null, + "scheduledExecutor is already cleared. Looks like you are calling this method after " + + "you've already shut down"); + final OobChannel oobChannel = new OobChannel(statsFactory, authority, + oobExecutorPool, scheduledExecutorCopy, stopwatchSupplier, channelExecutor); + final InternalSubchannel internalSubchannel = new InternalSubchannel( + addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory, + scheduledExecutorCopy, stopwatchSupplier, channelExecutor, + // All callback methods are run from channelExecutor + new InternalSubchannel.Callback() { + @Override + void onTerminated(InternalSubchannel is) { + oobChannels.remove(is); + oobChannel.handleSubchannelTerminated(); + maybeTerminateChannel(); + } + + @Override + void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + oobChannel.handleSubchannelStateChange(newState); + } + }); + oobChannel.setSubchannel(internalSubchannel); + runSerialized(new Runnable() { + @Override + public void run() { + if (terminating) { + oobChannel.shutdown(); + } + if (!terminated) { + // If channel has not terminated, it will track the subchannel and block termination + // for it. + oobChannels.add(internalSubchannel); + } + } + }); + return oobChannel; + } + + @Override + public String getAuthority() { + return ManagedChannelImpl2.this.authority(); + } + + @Override + public NameResolver.Factory getNameResolverFactory() { + return nameResolverFactory; + } + + @Override + public void runSerialized(Runnable task) { + channelExecutor.executeLater(task).drain(); + } + + @Override + public void updatePicker(final SubchannelPicker picker) { + runSerialized(new Runnable() { + @Override + public void run() { + subchannelPicker = picker; + delayedTransport.reprocess(picker); + } + }); + } + } + + @Override + public LogId getLogId() { + return logId; + } + + private class NameResolverListenerImpl implements NameResolver.Listener { + final LoadBalancer2 balancer; + final LoadBalancer2.Helper helper; + + NameResolverListenerImpl(LbHelperImpl helperImpl) { + this.balancer = helperImpl.lb; + this.helper = helperImpl; + } + + @Override + public void onUpdate(final List servers, final Attributes config) { + if (servers.isEmpty()) { + onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list")); + return; + } + log.log(Level.FINE, "[{0}] resolved address: {1}, config={2}", + new Object[] {getLogId(), servers, config}); + helper.runSerialized(new Runnable() { + @Override + public void run() { + if (terminated) { + return; + } + try { + balancer.handleResolvedAddresses(servers, config); + } catch (Throwable e) { + log.log(Level.WARNING, "[" + getLogId() + "] Caught exception from LoadBalancer", e); + // It must be a bug! Push the exception back to LoadBalancer in the hope that it may + // be propagated to the application. + balancer.handleNameResolutionError(Status.INTERNAL.withCause(e) + .withDescription("Thrown from handleResolvedAddresses(): " + e)); + } + } + }); + } + + @Override + public void onError(final Status error) { + checkArgument(!error.isOk(), "the error status must not be OK"); + log.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}", + new Object[] {getLogId(), error}); + channelExecutor.executeLater(new Runnable() { + @Override + public void run() { + if (terminated) { + return; + } + balancer.handleNameResolutionError(error); + } + }).drain(); + } + } + + private final class SubchannelImplImpl extends SubchannelImpl { + // Set right after SubchannelImplImpl is created. + InternalSubchannel subchannel; + final Object shutdownLock = new Object(); + final Attributes attrs; + + @GuardedBy("shutdownLock") + boolean shutdownRequested; + @GuardedBy("shutdownLock") + ScheduledFuture delayedShutdownTask; + + SubchannelImplImpl(Attributes attrs) { + this.attrs = checkNotNull(attrs, "attrs"); + } + + @Override + ClientTransport obtainActiveTransport() { + return subchannel.obtainActiveTransport(); + } + + @Override + public void shutdown() { + synchronized (shutdownLock) { + if (shutdownRequested) { + if (terminating && delayedShutdownTask != null) { + // shutdown() was previously called when terminating == false, thus a delayed shutdown() + // was scheduled. Now since terminating == true, We should expedite the shutdown. + delayedShutdownTask.cancel(false); + delayedShutdownTask = null; + // Will fall through to the subchannel.shutdown() at the end. + } else { + return; + } + } else { + shutdownRequested = true; + } + ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor; + // Add a delay to shutdown to deal with the race between 1) a transport being picked and + // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g., + // because of address change, or because LoadBalancer is shutdown by Channel entering idle + // mode). If (2) wins, the app will see a spurious error. We work around this by delaying + // shutdown of Subchannel for a few seconds here. + // + // TODO(zhangkun83): consider a better approach + // (https://github.com/grpc/grpc-java/issues/2562). + if (!terminating && scheduledExecutorCopy != null) { + delayedShutdownTask = scheduledExecutorCopy.schedule( + new LogExceptionRunnable( + new Runnable() { + @Override + public void run() { + subchannel.shutdown(); + } + }), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); + return; + } + } + // Two possible ways to get here: + // + // 1. terminating == true: no more real streams will be created, it's safe and also desirable + // to shutdown timely. + // + // 2. scheduledExecutor == null: possible only when Channel has already been terminated. + // Though may not be necessary, we'll do it anyway. + subchannel.shutdown(); + } + + @Override + public void requestConnection() { + subchannel.obtainActiveTransport(); + } + + @Override + public EquivalentAddressGroup getAddresses() { + return subchannel.getAddressGroup(); + } + + @Override + public Attributes getAttributes() { + return attrs; + } + } +} diff --git a/core/src/main/java/io/grpc/internal/ObjectPool.java b/core/src/main/java/io/grpc/internal/ObjectPool.java new file mode 100644 index 0000000000..20ae7191e4 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/ObjectPool.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +public interface ObjectPool { + /** + * Get an object from the pool. + */ + T getObject(); + + /** + * Return the object to the pool. The caller should not use the object beyond this point. + * + * @return always {@code null} + */ + T returnObject(Object object); +} diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java new file mode 100644 index 0000000000..921e96f4cb --- /dev/null +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -0,0 +1,249 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; + +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; +import com.google.instrumentation.stats.StatsContextFactory; + +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer2.PickResult; +import io.grpc.LoadBalancer2.SubchannelPicker; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.internal.ClientCallImpl.ClientTransportProvider; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A ManagedChannel backed by a single {@link InternalSubchannel} and used for {@link LoadBalancer2} + * to its own RPC needs. + */ +@ThreadSafe +final class OobChannel extends ManagedChannel implements WithLogId { + private static final Logger log = Logger.getLogger(OobChannel.class.getName()); + + private SubchannelImpl subchannelImpl; + private SubchannelPicker subchannelPicker; + + private final LogId logId = LogId.allocate(getClass().getName()); + private final StatsContextFactory statsFactory; + private final String authority; + private final DelayedClientTransport2 delayedTransport; + private final ObjectPool executorPool; + private final Executor executor; + private final ScheduledExecutorService deadlineCancellationExecutor; + private final Supplier stopwatchSupplier; + private final CountDownLatch terminatedLatch = new CountDownLatch(1); + private volatile boolean shutdown; + + private final ClientTransportProvider transportProvider = new ClientTransportProvider() { + @Override + public ClientTransport get(CallOptions callOptions, Metadata headers) { + // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't + // matter here because OOB communication should be sparse, and it's not on application RPC's + // critical path. + return delayedTransport; + } + }; + + OobChannel(StatsContextFactory statsFactory, String authority, + ObjectPool executorPool, + ScheduledExecutorService deadlineCancellationExecutor, Supplier stopwatchSupplier, + ChannelExecutor channelExecutor) { + this.statsFactory = checkNotNull(statsFactory, "statsFactory"); + this.authority = checkNotNull(authority, "authority"); + this.executorPool = checkNotNull(executorPool, "executorPool"); + this.executor = checkNotNull(executorPool.getObject(), "executor"); + this.deadlineCancellationExecutor = checkNotNull( + deadlineCancellationExecutor, "deadlineCancellationExecutor"); + this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); + this.delayedTransport = new DelayedClientTransport2(executor, channelExecutor); + this.delayedTransport.start(new ManagedClientTransport.Listener() { + @Override + public void transportShutdown(Status s) { + // Don't care + } + + @Override + public void transportTerminated() { + subchannelImpl.shutdown(); + } + + @Override + public void transportReady() { + // Don't care + } + + @Override + public void transportInUse(boolean inUse) { + // Don't care + } + }); + } + + // Must be called only once, right after the OobChannel is created. + void setSubchannel(final InternalSubchannel subchannel) { + log.log(Level.FINE, "[{0}] Created with [{1}]", new Object[] {this, subchannel}); + subchannelImpl = new SubchannelImpl() { + @Override + public void shutdown() { + subchannel.shutdown(); + } + + @Override + ClientTransport obtainActiveTransport() { + return subchannel.obtainActiveTransport(); + } + + @Override + public void requestConnection() { + subchannel.obtainActiveTransport(); + } + + @Override + public EquivalentAddressGroup getAddresses() { + return subchannel.getAddressGroup(); + } + + @Override + public Attributes getAttributes() { + return Attributes.EMPTY; + } + }; + + subchannelPicker = new SubchannelPicker() { + final PickResult result = PickResult.withSubchannel(subchannelImpl); + + @Override + public PickResult pickSubchannel(Attributes affinity, Metadata headers) { + return result; + } + }; + delayedTransport.reprocess(subchannelPicker); + } + + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext( + methodDescriptor.getFullMethodName(), statsFactory, stopwatchSupplier); + return new ClientCallImpl(methodDescriptor, + callOptions.getExecutor() == null ? executor : callOptions.getExecutor(), + callOptions, statsTraceCtx, transportProvider, + deadlineCancellationExecutor); + } + + @Override + public String authority() { + return authority; + } + + @Override + public LogId getLogId() { + return logId; + } + + @Override + public boolean isTerminated() { + return terminatedLatch.getCount() == 0; + } + + @Override + public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException { + return terminatedLatch.await(time, unit); + } + + @Override + public ManagedChannel shutdown() { + shutdown = true; + delayedTransport.shutdown(); + return this; + } + + @Override + public boolean isShutdown() { + return shutdown; + } + + @Override + public ManagedChannel shutdownNow() { + shutdown = true; + delayedTransport.shutdownNow( + Status.UNAVAILABLE.withDescription("OobChannel.shutdownNow() called")); + return this; + } + + void handleSubchannelStateChange(final ConnectivityStateInfo newState) { + switch (newState.getState()) { + case READY: + case IDLE: + delayedTransport.reprocess(subchannelPicker); + break; + case TRANSIENT_FAILURE: + delayedTransport.reprocess(new SubchannelPicker() { + final PickResult errorResult = PickResult.withError(newState.getStatus()); + + @Override + public PickResult pickSubchannel(Attributes affinity, Metadata headers) { + return errorResult; + } + }); + break; + default: + // Do nothing + } + } + + void handleSubchannelTerminated() { + // When delayedTransport is terminated, it shuts down subchannel. Therefore, at this point + // both delayedTransport and subchannel have terminated. + executorPool.returnObject(executor); + terminatedLatch.countDown(); + } +} diff --git a/core/src/main/java/io/grpc/internal/SingleTransportChannel.java b/core/src/main/java/io/grpc/internal/SingleTransportChannel.java index 3d524519c6..a8b5a430cd 100644 --- a/core/src/main/java/io/grpc/internal/SingleTransportChannel.java +++ b/core/src/main/java/io/grpc/internal/SingleTransportChannel.java @@ -39,6 +39,7 @@ import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; +import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; @@ -59,7 +60,7 @@ final class SingleTransportChannel extends Channel { private final ClientTransportProvider transportProvider = new ClientTransportProvider() { @Override - public ClientTransport get(CallOptions callOptions) { + public ClientTransport get(CallOptions callOptions, Metadata headers) { return transport; } }; diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index 9ab5b63663..501670e745 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -41,6 +41,7 @@ import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.ManagedChannel; +import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; @@ -364,7 +365,7 @@ final class TransportSet extends ManagedChannel implements WithLogId { new SerializingExecutor(appExecutor), callOptions, StatsTraceContext.NOOP, new ClientTransportProvider() { @Override - public ClientTransport get(CallOptions callOptions) { + public ClientTransport get(CallOptions callOptions, Metadata headers) { return obtainActiveTransport(); } }, diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 876429575f..a88300da83 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -155,7 +155,7 @@ public class ClientCallImplTest { public void setUp() { MockitoAnnotations.initMocks(this); assertNotNull(statsCtx); - when(provider.get(any(CallOptions.class))).thenReturn(transport); + when(provider.get(any(CallOptions.class), any(Metadata.class))).thenReturn(transport); when(transport.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class))).thenReturn(stream); } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransport2Test.java b/core/src/test/java/io/grpc/internal/DelayedClientTransport2Test.java index c5c41216f0..dc0b56e29f 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransport2Test.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransport2Test.java @@ -105,7 +105,6 @@ public class DelayedClientTransport2Test { private final Metadata headers = new Metadata(); private final Metadata headers2 = new Metadata(); - private final Metadata headers3 = new Metadata(); private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value"); private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2"); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImpl2IdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImpl2IdlenessTest.java new file mode 100644 index 0000000000..6af0cc6937 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImpl2IdlenessTest.java @@ -0,0 +1,364 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Lists; + +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; +import io.grpc.EquivalentAddressGroup; +import io.grpc.IntegerMarshaller; +import io.grpc.LoadBalancer2.Helper; +import io.grpc.LoadBalancer2.PickResult; +import io.grpc.LoadBalancer2.Subchannel; +import io.grpc.LoadBalancer2.SubchannelPicker; +import io.grpc.LoadBalancer2; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.NameResolver; +import io.grpc.ResolvedServerInfo; +import io.grpc.ResolvedServerInfoGroup; +import io.grpc.Status; +import io.grpc.StringMarshaller; +import io.grpc.internal.TestUtils.MockClientTransportInfo; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.net.SocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Unit tests for {@link ManagedChannelImpl}'s idle mode. + */ +@RunWith(JUnit4.class) +public class ManagedChannelImpl2IdlenessTest { + private final FakeClock timer = new FakeClock(); + private final FakeClock executor = new FakeClock(); + private final FakeClock oobExecutor = new FakeClock(); + private static final String AUTHORITY = "fakeauthority"; + private static final String USER_AGENT = "fakeagent"; + private static final long IDLE_TIMEOUT_SECONDS = 30; + private ManagedChannelImpl2 channel; + + private final MethodDescriptor method = MethodDescriptor.create( + MethodDescriptor.MethodType.UNKNOWN, "/service/method", + new StringMarshaller(), new IntegerMarshaller()); + + private final List servers = Lists.newArrayList(); + private final List addressGroupList = + new ArrayList(); + + @Mock private ObjectPool timerServicePool; + @Mock private ObjectPool executorPool; + @Mock private ObjectPool oobExecutorPool; + @Mock private ClientTransportFactory mockTransportFactory; + @Mock private LoadBalancer2 mockLoadBalancer; + @Mock private LoadBalancer2.Factory mockLoadBalancerFactory; + @Mock private NameResolver mockNameResolver; + @Mock private NameResolver.Factory mockNameResolverFactory; + @Mock private ClientCall.Listener mockCallListener; + @Mock private ClientCall.Listener mockCallListener2; + @Captor private ArgumentCaptor nameResolverListenerCaptor; + private BlockingQueue newTransports; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + when(timerServicePool.getObject()).thenReturn(timer.getScheduledExecutorService()); + when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); + when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService()); + when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer); + when(mockNameResolver.getServiceAuthority()).thenReturn(AUTHORITY); + when(mockNameResolverFactory + .newNameResolver(any(URI.class), any(Attributes.class))) + .thenReturn(mockNameResolver); + + channel = new ManagedChannelImpl2("fake://target", new FakeBackoffPolicyProvider(), + mockNameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory, + mockTransportFactory, DecompressorRegistry.getDefaultInstance(), + CompressorRegistry.getDefaultInstance(), timerServicePool, executorPool, oobExecutorPool, + timer.getStopwatchSupplier(), TimeUnit.SECONDS.toMillis(IDLE_TIMEOUT_SECONDS), USER_AGENT, + Collections.emptyList(), + NoopStatsContextFactory.INSTANCE); + newTransports = TestUtils.captureTransports(mockTransportFactory); + + for (int i = 0; i < 2; i++) { + ResolvedServerInfoGroup.Builder resolvedServerInfoGroup = ResolvedServerInfoGroup.builder(); + for (int j = 0; j < 2; j++) { + resolvedServerInfoGroup.add( + new ResolvedServerInfo(new FakeSocketAddress("servergroup" + i + "server" + j))); + } + servers.add(resolvedServerInfoGroup.build()); + addressGroupList.add(resolvedServerInfoGroup.build().toEquivalentAddressGroup()); + } + verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class)); + // Verify the initial idleness + verify(mockLoadBalancerFactory, never()).newLoadBalancer(any(Helper.class)); + verify(mockTransportFactory, never()).newClientTransport( + any(SocketAddress.class), anyString(), anyString()); + verify(mockNameResolver, never()).start(any(NameResolver.Listener.class)); + } + + @After + public void allPendingTasksAreRun() { + assertEquals(timer.getPendingTasks() + " should be empty", 0, timer.numPendingTasks()); + assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); + } + + @Test + public void newCallExitsIdleness() throws Exception { + final EquivalentAddressGroup addressGroup = addressGroupList.get(1); + + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + + ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(null); + verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); + Helper helper = helperCaptor.getValue(); + + verify(mockNameResolver).start(nameResolverListenerCaptor.capture()); + // Simulate new address resolved to make sure the LoadBalancer is correctly linked to + // the NameResolver. + nameResolverListenerCaptor.getValue().onUpdate(servers, Attributes.EMPTY); + verify(mockLoadBalancer).handleResolvedAddresses(servers, Attributes.EMPTY); + } + + @Test + public void newCallRefreshesIdlenessTimer() throws Exception { + final EquivalentAddressGroup addressGroup = addressGroupList.get(1); + + // First call to exit the initial idleness, then immediately cancel the call. + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + call.cancel("For testing", null); + + // Verify that we have exited the idle mode + ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(null); + verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); + Helper helper = helperCaptor.getValue(); + assertFalse(channel.inUseStateAggregator.isInUse()); + + // Move closer to idleness, but not yet. + timer.forwardTime(IDLE_TIMEOUT_SECONDS - 1, TimeUnit.SECONDS); + verify(mockLoadBalancer, never()).shutdown(); + assertFalse(channel.inUseStateAggregator.isInUse()); + + // A new call would refresh the timer + call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + call.cancel("For testing", null); + assertFalse(channel.inUseStateAggregator.isInUse()); + + // ... so that passing the same length of time will not trigger idle mode + timer.forwardTime(IDLE_TIMEOUT_SECONDS - 1, TimeUnit.SECONDS); + verify(mockLoadBalancer, never()).shutdown(); + assertFalse(channel.inUseStateAggregator.isInUse()); + + // ... until the time since last call has reached the timeout + timer.forwardTime(1, TimeUnit.SECONDS); + verify(mockLoadBalancer).shutdown(); + assertFalse(channel.inUseStateAggregator.isInUse()); + + // Drain the app executor, which runs the call listeners + verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); + assertEquals(2, executor.runDueTasks()); + verify(mockCallListener, times(2)).onClose(any(Status.class), any(Metadata.class)); + } + + @Test + public void delayedTransportHoldsOffIdleness() throws Exception { + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + assertTrue(channel.inUseStateAggregator.isInUse()); + + // As long as the delayed transport is in-use (by the pending RPC), the channel won't go idle. + timer.forwardTime(IDLE_TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); + assertTrue(channel.inUseStateAggregator.isInUse()); + + // Cancelling the only RPC will reset the in-use state. + assertEquals(0, executor.numPendingTasks()); + call.cancel("In test", null); + assertEquals(1, executor.runDueTasks()); + assertFalse(channel.inUseStateAggregator.isInUse()); + // And allow the channel to go idle. + timer.forwardTime(IDLE_TIMEOUT_SECONDS - 1, TimeUnit.SECONDS); + verify(mockLoadBalancer, never()).shutdown(); + timer.forwardTime(1, TimeUnit.SECONDS); + verify(mockLoadBalancer).shutdown(); + } + + @Test + public void realTransportsHoldsOffIdleness() throws Exception { + final EquivalentAddressGroup addressGroup = addressGroupList.get(1); + + // Start a call, which goes to delayed transport + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + + // Verify that we have exited the idle mode + ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(null); + verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); + Helper helper = helperCaptor.getValue(); + assertTrue(channel.inUseStateAggregator.isInUse()); + + // Assume LoadBalancer has received an address, then create a subchannel. + Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + subchannel.requestConnection(); + MockClientTransportInfo t0 = newTransports.poll(); + t0.listener.transportReady(); + + SubchannelPicker mockPicker = mock(SubchannelPicker.class); + when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class))) + .thenReturn(PickResult.withSubchannel(subchannel)); + helper.updatePicker(mockPicker); + // Delayed transport creates real streams in the app executor + executor.runDueTasks(); + + // Delayed transport exits in-use, while real transport has not entered in-use yet. + assertFalse(channel.inUseStateAggregator.isInUse()); + + // Now it's in-use + t0.listener.transportInUse(true); + assertTrue(channel.inUseStateAggregator.isInUse()); + + // As long as the transport is in-use, the channel won't go idle. + timer.forwardTime(IDLE_TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); + assertTrue(channel.inUseStateAggregator.isInUse()); + + t0.listener.transportInUse(false); + assertFalse(channel.inUseStateAggregator.isInUse()); + // And allow the channel to go idle. + timer.forwardTime(IDLE_TIMEOUT_SECONDS - 1, TimeUnit.SECONDS); + verify(mockLoadBalancer, never()).shutdown(); + timer.forwardTime(1, TimeUnit.SECONDS); + verify(mockLoadBalancer).shutdown(); + } + + @Test + public void oobTransportDoesNotAffectIdleness() { + FakeClock oobExecutor = new FakeClock(); + // Start a call, which goes to delayed transport + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + + // Verify that we have exited the idle mode + ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(null); + verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); + Helper helper = helperCaptor.getValue(); + + // Fail the RPC + SubchannelPicker failingPicker = mock(SubchannelPicker.class); + when(failingPicker.pickSubchannel(any(Attributes.class), any(Metadata.class))) + .thenReturn(PickResult.withError(Status.UNAVAILABLE)); + helper.updatePicker(failingPicker); + executor.runDueTasks(); + verify(mockCallListener).onClose(same(Status.UNAVAILABLE), any(Metadata.class)); + + // ... so that the channel resets its in-use state + assertFalse(channel.inUseStateAggregator.isInUse()); + + // Now make an RPC on an OOB channel + ManagedChannel oob = helper.createOobChannel(addressGroupList.get(0), "oobauthority"); + verify(mockTransportFactory, never()) + .newClientTransport(any(SocketAddress.class), same("oobauthority"), same(USER_AGENT)); + ClientCall oobCall = oob.newCall(method, CallOptions.DEFAULT); + oobCall.start(mockCallListener2, new Metadata()); + verify(mockTransportFactory) + .newClientTransport(any(SocketAddress.class), same("oobauthority"), same(USER_AGENT)); + MockClientTransportInfo oobTransportInfo = newTransports.poll(); + assertEquals(0, newTransports.size()); + // The OOB transport reports in-use state + oobTransportInfo.listener.transportInUse(true); + + // But it won't stop the channel from going idle + verify(mockLoadBalancer, never()).shutdown(); + timer.forwardTime(IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + verify(mockLoadBalancer).shutdown(); + } + + private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider { + @Override + public BackoffPolicy get() { + return new BackoffPolicy() { + @Override + public long nextBackoffMillis() { + return 1; + } + }; + } + } + + private static class FakeSocketAddress extends SocketAddress { + final String name; + + FakeSocketAddress(String name) { + this.name = name; + } + + @Override + public String toString() { + return "FakeSocketAddress-" + name; + } + } +} diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImpl2Test.java b/core/src/test/java/io/grpc/internal/ManagedChannelImpl2Test.java new file mode 100644 index 0000000000..0fc0178eb0 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImpl2Test.java @@ -0,0 +1,1217 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static junit.framework.TestCase.assertNotSame; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; + +import io.grpc.Attributes; +import io.grpc.CallCredentials.MetadataApplier; +import io.grpc.CallCredentials; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.CompressorRegistry; +import io.grpc.ConnectivityStateInfo; +import io.grpc.Context; +import io.grpc.DecompressorRegistry; +import io.grpc.EquivalentAddressGroup; +import io.grpc.IntegerMarshaller; +import io.grpc.LoadBalancer2; +import io.grpc.LoadBalancer2.Helper; +import io.grpc.LoadBalancer2.PickResult; +import io.grpc.LoadBalancer2.Subchannel; +import io.grpc.LoadBalancer2.SubchannelPicker; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.NameResolver; +import io.grpc.ResolvedServerInfo; +import io.grpc.ResolvedServerInfoGroup; +import io.grpc.SecurityLevel; +import io.grpc.Status; +import io.grpc.StringMarshaller; +import io.grpc.internal.TestUtils.MockClientTransportInfo; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.net.SocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** Unit tests for {@link ManagedChannelImpl2}. */ +@RunWith(JUnit4.class) +public class ManagedChannelImpl2Test { + private static final List NO_INTERCEPTOR = + Collections.emptyList(); + private static final Attributes NAME_RESOLVER_PARAMS = + Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, 447).build(); + private static final MethodDescriptor method = MethodDescriptor.create( + MethodDescriptor.MethodType.UNKNOWN, "/service/method", + new StringMarshaller(), new IntegerMarshaller()); + private static final Attributes.Key SUBCHANNEL_ATTR_KEY = + Attributes.Key.of("subchannel-attr-key"); + private final String serviceName = "fake.example.com"; + private final String authority = serviceName; + private final String userAgent = "userAgent"; + private final String target = "fake://" + serviceName; + private URI expectedUri; + private final SocketAddress socketAddress = new SocketAddress() {}; + private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); + private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY); + private final FakeClock timer = new FakeClock(); + private final FakeClock executor = new FakeClock(); + private final FakeClock oobExecutor = new FakeClock(); + private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory(); + + @Rule public final ExpectedException thrown = ExpectedException.none(); + + private ManagedChannelImpl2 channel; + private Helper helper; + @Captor + private ArgumentCaptor statusCaptor; + @Captor + private ArgumentCaptor statsTraceCtxCaptor; + @Mock + private LoadBalancer2.Factory mockLoadBalancerFactory; + @Mock + private LoadBalancer2 mockLoadBalancer; + @Captor + private ArgumentCaptor stateInfoCaptor; + @Mock + private SubchannelPicker mockPicker; + @Mock + private ClientTransportFactory mockTransportFactory; + @Mock + private ClientCall.Listener mockCallListener; + @Mock + private ClientCall.Listener mockCallListener2; + @Mock + private ClientCall.Listener mockCallListener3; + @Mock + private ClientCall.Listener mockCallListener4; + @Mock + private ClientCall.Listener mockCallListener5; + @Mock + private ObjectPool timerServicePool; + @Mock + private ObjectPool executorPool; + @Mock + private ObjectPool oobExecutorPool; + @Mock + private CallCredentials creds; + private BlockingQueue transports; + + private ArgumentCaptor streamListenerCaptor = + ArgumentCaptor.forClass(ClientStreamListener.class); + + private void createChannel( + NameResolver.Factory nameResolverFactory, List interceptors) { + channel = new ManagedChannelImpl2(target, new FakeBackoffPolicyProvider(), + nameResolverFactory, NAME_RESOLVER_PARAMS, mockLoadBalancerFactory, + mockTransportFactory, DecompressorRegistry.getDefaultInstance(), + CompressorRegistry.getDefaultInstance(), timerServicePool, executorPool, oobExecutorPool, + timer.getStopwatchSupplier(), ManagedChannelImpl2.IDLE_TIMEOUT_MILLIS_DISABLE, userAgent, + interceptors, statsCtxFactory); + // Force-exit the initial idle-mode + channel.exitIdleMode(); + assertEquals(0, timer.numPendingTasks()); + + ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(null); + verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); + helper = helperCaptor.getValue(); + } + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + expectedUri = new URI(target); + when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer); + transports = TestUtils.captureTransports(mockTransportFactory); + when(timerServicePool.getObject()).thenReturn(timer.getScheduledExecutorService()); + when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); + when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService()); + } + + @After + public void allPendingTasksAreRun() throws Exception { + // The "never" verifications in the tests only hold up if all due tasks are done. + // As for timer, although there may be scheduled tasks in a future time, since we don't test + // any time-related behavior in this test suite, we only care the tasks that are due. This + // would ignore any time-sensitive tasks, e.g., back-off and the idle timer. + assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty()); + assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); + } + + /** + * The counterpart of {@link ManagedChannelImpl2IdlenessTest#enterIdleModeAfterForceExit}. + */ + @Test + @SuppressWarnings("unchecked") + public void idleModeDisabled() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + + // In this test suite, the channel is always created with idle mode disabled. + // No task is scheduled to enter idle mode + assertEquals(0, timer.numPendingTasks()); + assertEquals(0, executor.numPendingTasks()); + } + + @Test + public void immediateDeadlineExceeded() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + ClientCall call = + channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS)); + call.start(mockCallListener, new Metadata()); + assertEquals(1, executor.runDueTasks()); + + verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); + Status status = statusCaptor.getValue(); + assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode()); + } + + @Test + public void shutdownWithNoTransportsEverCreated() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + verify(executorPool).getObject(); + verify(timerServicePool).getObject(); + verify(executorPool, never()).returnObject(anyObject()); + verify(timerServicePool, never()).returnObject(anyObject()); + verifyNoMoreInteractions(mockTransportFactory); + channel.shutdown(); + assertTrue(channel.isShutdown()); + assertTrue(channel.isTerminated()); + verify(executorPool).returnObject(executor.getScheduledExecutorService()); + verify(timerServicePool).returnObject(timer.getScheduledExecutorService()); + } + + @Test + public void callsAndShutdown() { + subtestCallsAndShutdown(false, false); + } + + @Test + public void callsAndShutdownNow() { + subtestCallsAndShutdown(true, false); + } + + /** Make sure shutdownNow() after shutdown() has an effect. */ + @Test + public void callsAndShutdownAndShutdownNow() { + subtestCallsAndShutdown(false, true); + } + + private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) { + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + verify(executorPool).getObject(); + verify(timerServicePool).getObject(); + ClientStream mockStream = mock(ClientStream.class); + ClientStream mockStream2 = mock(ClientStream.class); + Metadata headers = new Metadata(); + Metadata headers2 = new Metadata(); + + // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to + // real transport. + Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + subchannel.requestConnection(); + verify(mockTransportFactory).newClientTransport( + any(SocketAddress.class), any(String.class), any(String.class)); + MockClientTransportInfo transportInfo = transports.poll(); + ConnectionClientTransport mockTransport = transportInfo.transport; + verify(mockTransport).start(any(ManagedClientTransport.Listener.class)); + ManagedClientTransport.Listener transportListener = transportInfo.listener; + when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT), + any(StatsTraceContext.class))) + .thenReturn(mockStream); + when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT), + any(StatsTraceContext.class))) + .thenReturn(mockStream2); + transportListener.transportReady(); + when(mockPicker.pickSubchannel(any(Attributes.class), same(headers))).thenReturn( + PickResult.withNoResult()); + when(mockPicker.pickSubchannel(any(Attributes.class), same(headers2))).thenReturn( + PickResult.withSubchannel(subchannel)); + helper.updatePicker(mockPicker); + + // First RPC, will be pending + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + verifyNoMoreInteractions(mockTransportFactory); + call.start(mockCallListener, headers); + + verify(mockTransport, never()).newStream(same(method), same(headers), same(CallOptions.DEFAULT), + any(StatsTraceContext.class)); + statsCtxFactory.pollContextOrFail(); + + // Second RPC, will be assigned to the real transport + ClientCall call2 = channel.newCall(method, CallOptions.DEFAULT); + call2.start(mockCallListener2, headers2); + verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT), + statsTraceCtxCaptor.capture()); + assertEquals(statsCtxFactory.pollContextOrFail(), + statsTraceCtxCaptor.getValue().getStatsContext()); + verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT), + statsTraceCtxCaptor.capture()); + verify(mockStream2).start(any(ClientStreamListener.class)); + + // Shutdown + if (shutdownNow) { + channel.shutdownNow(); + } else { + channel.shutdown(); + if (shutdownNowAfterShutdown) { + channel.shutdownNow(); + shutdownNow = true; + } + } + assertTrue(channel.isShutdown()); + assertFalse(channel.isTerminated()); + assertEquals(1, nameResolverFactory.resolvers.size()); + verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); + + // Further calls should fail without going to the transport + ClientCall call3 = channel.newCall(method, CallOptions.DEFAULT); + call3.start(mockCallListener3, headers2); + timer.runDueTasks(); + executor.runDueTasks(); + + verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class)); + assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + + if (shutdownNow) { + // LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated. + verify(mockLoadBalancer).shutdown(); + assertTrue(nameResolverFactory.resolvers.get(0).shutdown); + // call should have been aborted by delayed transport + executor.runDueTasks(); + verify(mockCallListener).onClose(same(ManagedChannelImpl2.SHUTDOWN_NOW_STATUS), + any(Metadata.class)); + } else { + // LoadBalancer and NameResolver are still running. + verify(mockLoadBalancer, never()).shutdown(); + assertFalse(nameResolverFactory.resolvers.get(0).shutdown); + // call and call2 are still alive, and can still be assigned to a real transport + SubchannelPicker picker2 = mock(SubchannelPicker.class); + when(picker2.pickSubchannel(any(Attributes.class), same(headers))).thenReturn( + PickResult.withSubchannel(subchannel)); + helper.updatePicker(picker2); + executor.runDueTasks(); + verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT), + any(StatsTraceContext.class)); + verify(mockStream).start(any(ClientStreamListener.class)); + } + + // After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports + // will be shutdown. + verify(mockLoadBalancer).shutdown(); + assertTrue(nameResolverFactory.resolvers.get(0).shutdown); + + if (shutdownNow) { + // Channel shutdownNow() all subchannels after shutting down LoadBalancer + verify(mockTransport).shutdownNow(ManagedChannelImpl2.SHUTDOWN_NOW_STATUS); + } else { + verify(mockTransport, never()).shutdownNow(any(Status.class)); + } + // LoadBalancer should shutdown the subchannel + subchannel.shutdown(); + verify(mockTransport).shutdown(); + + // Killing the remaining real transport will terminate the channel + transportListener.transportShutdown(Status.UNAVAILABLE); + assertFalse(channel.isTerminated()); + verify(executorPool, never()).returnObject(anyObject()); + verify(timerServicePool, never()).returnObject(anyObject()); + transportListener.transportTerminated(); + assertTrue(channel.isTerminated()); + verify(executorPool).returnObject(executor.getScheduledExecutorService()); + verify(timerServicePool).returnObject(timer.getScheduledExecutorService()); + verifyNoMoreInteractions(oobExecutorPool); + + verify(mockTransportFactory).close(); + verifyNoMoreInteractions(mockTransportFactory); + verify(mockTransport, atLeast(0)).getLogId(); + verifyNoMoreInteractions(mockTransport); + } + + @Test + public void shutdownNowWithMultipleOobChannels() { + } + + @Test + public void interceptor() throws Exception { + final AtomicLong atomic = new AtomicLong(); + ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, + Channel next) { + atomic.set(1); + return next.newCall(method, callOptions); + } + }; + createChannel(new FakeNameResolverFactory(true), Arrays.asList(interceptor)); + assertNotNull(channel.newCall(method, CallOptions.DEFAULT)); + assertEquals(1, atomic.get()); + } + + @Test + public void callOptionsExecutor() { + Metadata headers = new Metadata(); + ClientStream mockStream = mock(ClientStream.class); + FakeClock callExecutor = new FakeClock(); + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + + // Start a call with a call executor + CallOptions options = + CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService()); + ClientCall call = channel.newCall(method, options); + call.start(mockCallListener, headers); + + // Make the transport available + Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + verify(mockTransportFactory, never()).newClientTransport( + any(SocketAddress.class), any(String.class), any(String.class)); + subchannel.requestConnection(); + verify(mockTransportFactory).newClientTransport( + any(SocketAddress.class), any(String.class), any(String.class)); + MockClientTransportInfo transportInfo = transports.poll(); + ConnectionClientTransport mockTransport = transportInfo.transport; + ManagedClientTransport.Listener transportListener = transportInfo.listener; + when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class), + any(StatsTraceContext.class))) + .thenReturn(mockStream); + transportListener.transportReady(); + when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class))) + .thenReturn(PickResult.withSubchannel(subchannel)); + assertEquals(0, callExecutor.numPendingTasks()); + helper.updatePicker(mockPicker); + + // Real streams are started in the call executor if they were previously buffered. + assertEquals(1, callExecutor.runDueTasks()); + verify(mockTransport).newStream(same(method), same(headers), same(options), + any(StatsTraceContext.class)); + verify(mockStream).start(streamListenerCaptor.capture()); + + // Call listener callbacks are also run in the call executor + ClientStreamListener streamListener = streamListenerCaptor.getValue(); + Metadata trailers = new Metadata(); + assertEquals(0, callExecutor.numPendingTasks()); + streamListener.closed(Status.CANCELLED, trailers); + verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers)); + assertEquals(1, callExecutor.runDueTasks()); + verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers)); + } + + @Test + public void nameResolutionFailed() { + Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); + + // Name resolution is started as soon as channel is created. + createChannel(new FailingNameResolverFactory(error), NO_INTERCEPTOR); + verify(mockLoadBalancer).handleNameResolutionError(same(error)); + } + + @Test + public void nameResolverReturnsEmptySubLists() { + String errorDescription = "NameResolver returned an empty list"; + + // Pass a FakeNameResolverFactory with an empty list + createChannel(new FakeNameResolverFactory(), NO_INTERCEPTOR); + + // LoadBalancer received the error + verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); + verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture()); + Status status = statusCaptor.getValue(); + assertSame(Status.Code.UNAVAILABLE, status.getCode()); + assertEquals(errorDescription, status.getDescription()); + } + + @Test + public void loadBalancerThrowsInHandleResolvedAddresses() { + RuntimeException ex = new RuntimeException("simulated"); + // Delay the success of name resolution until allResolved() is called + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + + verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); + doThrow(ex).when(mockLoadBalancer).handleResolvedAddresses( + Matchers.>anyObject(), any(Attributes.class)); + + // NameResolver returns addresses. + nameResolverFactory.allResolved(); + + // The LoadBalancer will receive the error that it has thrown. + verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture()); + Status status = statusCaptor.getValue(); + assertSame(Status.Code.INTERNAL, status.getCode()); + assertSame(ex, status.getCause()); + } + + @Test + public void nameResolvedAfterChannelShutdown() { + // Delay the success of name resolution until allResolved() is called. + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + + channel.shutdown(); + + assertTrue(channel.isShutdown()); + assertTrue(channel.isTerminated()); + verify(mockLoadBalancer).shutdown(); + // Name resolved after the channel is shut down, which is possible if the name resolution takes + // time and is not cancellable. The resolved address will be dropped. + nameResolverFactory.allResolved(); + verifyNoMoreInteractions(mockLoadBalancer); + } + + /** + * Verify that if the first resolved address points to a server that cannot be connected, the call + * will end up with the second address which works. + */ + @Test + public void firstResolvedServerFailedToConnect() throws Exception { + final SocketAddress goodAddress = new SocketAddress() { + @Override public String toString() { + return "goodAddress"; + } + }; + final SocketAddress badAddress = new SocketAddress() { + @Override public String toString() { + return "badAddress"; + } + }; + final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY); + final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY); + InOrder inOrder = inOrder(mockLoadBalancer); + + ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder() + .add(badServer) + .add(goodServer) + .build(); + FakeNameResolverFactory nameResolverFactory = + new FakeNameResolverFactory(serverInfoGroup.getResolvedServerInfoList()); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + + // Start the call + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + Metadata headers = new Metadata(); + call.start(mockCallListener, headers); + executor.runDueTasks(); + + // Simulate name resolution results + inOrder.verify(mockLoadBalancer).handleResolvedAddresses( + eq(Arrays.asList(serverInfoGroup)), eq(Attributes.EMPTY)); + Subchannel subchannel = helper.createSubchannel( + serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY); + when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class))) + .thenReturn(PickResult.withSubchannel(subchannel)); + subchannel.requestConnection(); + inOrder.verify(mockLoadBalancer).handleSubchannelState( + same(subchannel), stateInfoCaptor.capture()); + assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); + + // The channel will starts with the first address (badAddress) + verify(mockTransportFactory) + .newClientTransport(same(badAddress), any(String.class), any(String.class)); + verify(mockTransportFactory, times(0)) + .newClientTransport(same(goodAddress), any(String.class), any(String.class)); + + MockClientTransportInfo badTransportInfo = transports.poll(); + // Which failed to connect + badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE); + inOrder.verifyNoMoreInteractions(); + + // The channel then try the second address (goodAddress) + verify(mockTransportFactory) + .newClientTransport(same(goodAddress), any(String.class), any(String.class)); + MockClientTransportInfo goodTransportInfo = transports.poll(); + when(goodTransportInfo.transport.newStream( + any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), + any(StatsTraceContext.class))) + .thenReturn(mock(ClientStream.class)); + + goodTransportInfo.listener.transportReady(); + inOrder.verify(mockLoadBalancer).handleSubchannelState( + same(subchannel), stateInfoCaptor.capture()); + assertEquals(READY, stateInfoCaptor.getValue().getState()); + + // A typical LoadBalancer will call this once the subchannel becomes READY + helper.updatePicker(mockPicker); + // Delayed transport uses the app executor to create real streams. + executor.runDueTasks(); + + verify(goodTransportInfo.transport).newStream(same(method), same(headers), + same(CallOptions.DEFAULT), any(StatsTraceContext.class)); + // The bad transport was never used. + verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class), + any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class)); + } + + /** + * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a + * wait-for-ready call will still be buffered. + */ + @Test + public void allServersFailedToConnect() throws Exception { + final SocketAddress addr1 = new SocketAddress() { + @Override public String toString() { + return "addr1"; + } + }; + final SocketAddress addr2 = new SocketAddress() { + @Override public String toString() { + return "addr2"; + } + }; + final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY); + final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY); + InOrder inOrder = inOrder(mockLoadBalancer); + + ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder() + .add(server1) + .add(server2) + .build(); + + FakeNameResolverFactory nameResolverFactory = + new FakeNameResolverFactory(serverInfoGroup.getResolvedServerInfoList()); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + + // Start a wait-for-ready call + ClientCall call = + channel.newCall(method, CallOptions.DEFAULT.withWaitForReady()); + Metadata headers = new Metadata(); + call.start(mockCallListener, headers); + // ... and a fail-fast call + ClientCall call2 = + channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady()); + call2.start(mockCallListener2, headers); + executor.runDueTasks(); + + // Simulate name resolution results + inOrder.verify(mockLoadBalancer).handleResolvedAddresses( + eq(Arrays.asList(serverInfoGroup)), eq(Attributes.EMPTY)); + Subchannel subchannel = helper.createSubchannel( + serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY); + when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class))) + .thenReturn(PickResult.withSubchannel(subchannel)); + subchannel.requestConnection(); + inOrder.verify(mockLoadBalancer).handleSubchannelState( + same(subchannel), stateInfoCaptor.capture()); + assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); + + // Connecting to server1, which will fail + verify(mockTransportFactory) + .newClientTransport(same(addr1), any(String.class), any(String.class)); + verify(mockTransportFactory, times(0)) + .newClientTransport(same(addr2), any(String.class), any(String.class)); + MockClientTransportInfo transportInfo1 = transports.poll(); + transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); + + // Connecting to server2, which will fail too + verify(mockTransportFactory) + .newClientTransport(same(addr2), any(String.class), any(String.class)); + MockClientTransportInfo transportInfo2 = transports.poll(); + Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect"); + transportInfo2.listener.transportShutdown(server2Error); + + // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated + // to LoadBalancer. + inOrder.verify(mockLoadBalancer).handleSubchannelState( + same(subchannel), stateInfoCaptor.capture()); + assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState()); + assertSame(server2Error, stateInfoCaptor.getValue().getStatus()); + + // A typical LoadBalancer would create a picker with error + SubchannelPicker picker2 = mock(SubchannelPicker.class); + when(picker2.pickSubchannel(any(Attributes.class), any(Metadata.class))) + .thenReturn(PickResult.withError(server2Error)); + helper.updatePicker(picker2); + executor.runDueTasks(); + + // ... which fails the fail-fast call + verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class)); + // ... while the wait-for-ready call stays + verifyNoMoreInteractions(mockCallListener); + // No real stream was ever created + verify(transportInfo1.transport, times(0)) + .newStream(any(MethodDescriptor.class), any(Metadata.class)); + verify(transportInfo2.transport, times(0)) + .newStream(any(MethodDescriptor.class), any(Metadata.class)); + } + + @Test + public void subchannels() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + + // createSubchannel() always return a new Subchannel + Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build(); + Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build(); + Subchannel sub1 = helper.createSubchannel(addressGroup, attrs1); + Subchannel sub2 = helper.createSubchannel(addressGroup, attrs2); + assertNotSame(sub1, sub2); + assertNotSame(attrs1, attrs2); + assertSame(attrs1, sub1.getAttributes()); + assertSame(attrs2, sub2.getAttributes()); + assertSame(addressGroup, sub1.getAddresses()); + assertSame(addressGroup, sub2.getAddresses()); + + // requestConnection() + verify(mockTransportFactory, never()).newClientTransport( + any(SocketAddress.class), any(String.class), any(String.class)); + sub1.requestConnection(); + verify(mockTransportFactory).newClientTransport(socketAddress, authority, userAgent); + MockClientTransportInfo transportInfo1 = transports.poll(); + assertNotNull(transportInfo1); + + sub2.requestConnection(); + verify(mockTransportFactory, times(2)).newClientTransport(socketAddress, authority, userAgent); + MockClientTransportInfo transportInfo2 = transports.poll(); + assertNotNull(transportInfo2); + + sub1.requestConnection(); + sub2.requestConnection(); + verify(mockTransportFactory, times(2)).newClientTransport(socketAddress, authority, userAgent); + + // shutdown() has a delay + sub1.shutdown(); + timer.forwardTime(ManagedChannelImpl2.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS); + sub1.shutdown(); + verify(transportInfo1.transport, never()).shutdown(); + timer.forwardTime(1, TimeUnit.SECONDS); + verify(transportInfo1.transport).shutdown(); + + // ... but not after Channel is terminating + verify(mockLoadBalancer, never()).shutdown(); + channel.shutdown(); + verify(mockLoadBalancer).shutdown(); + verify(transportInfo2.transport, never()).shutdown(); + + sub2.shutdown(); + verify(transportInfo2.transport).shutdown(); + } + + @Test + public void subchannelsWhenChannelShutdownNow() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); + Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); + sub1.requestConnection(); + sub2.requestConnection(); + + assertEquals(2, transports.size()); + MockClientTransportInfo ti1 = transports.poll(); + MockClientTransportInfo ti2 = transports.poll(); + + ti1.listener.transportReady(); + ti2.listener.transportReady(); + + channel.shutdownNow(); + verify(ti1.transport).shutdownNow(any(Status.class)); + verify(ti2.transport).shutdownNow(any(Status.class)); + + ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); + ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); + ti1.listener.transportTerminated(); + + assertFalse(channel.isTerminated()); + ti2.listener.transportTerminated(); + assertTrue(channel.isTerminated()); + } + + @Test + public void subchannelsNoConnectionShutdown() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); + Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); + + channel.shutdown(); + verify(mockLoadBalancer).shutdown(); + sub1.shutdown(); + assertFalse(channel.isTerminated()); + sub2.shutdown(); + assertTrue(channel.isTerminated()); + verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(), + anyString()); + } + + @Test + public void subchannelsNoConnectionShutdownNow() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); + Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); + channel.shutdownNow(); + + verify(mockLoadBalancer).shutdown(); + // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. + // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels. + assertTrue(channel.isTerminated()); + verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(), + anyString()); + } + + @Test + public void oobchannels() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + + ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority"); + ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority"); + verify(oobExecutorPool, times(2)).getObject(); + + assertEquals("oob1authority", oob1.authority()); + assertEquals("oob2authority", oob2.authority()); + + // OOB channels create connections lazily. A new call will initiate the connection. + Metadata headers = new Metadata(); + ClientCall call = oob1.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, headers); + verify(mockTransportFactory).newClientTransport(socketAddress, "oob1authority", userAgent); + MockClientTransportInfo transportInfo = transports.poll(); + assertNotNull(transportInfo); + + assertEquals(0, oobExecutor.numPendingTasks()); + transportInfo.listener.transportReady(); + assertEquals(1, oobExecutor.runDueTasks()); + verify(transportInfo.transport).newStream(same(method), same(headers), + same(CallOptions.DEFAULT), any(StatsTraceContext.class)); + + // The transport goes away + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo.listener.transportTerminated(); + + // A new call will trigger a new transport + ClientCall call2 = oob1.newCall(method, CallOptions.DEFAULT); + call2.start(mockCallListener2, headers); + ClientCall call3 = + oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady()); + call3.start(mockCallListener3, headers); + verify(mockTransportFactory, times(2)).newClientTransport( + socketAddress, "oob1authority", userAgent); + transportInfo = transports.poll(); + assertNotNull(transportInfo); + + // This transport fails + Status transportError = Status.UNAVAILABLE.withDescription("Connection refused"); + assertEquals(0, oobExecutor.numPendingTasks()); + transportInfo.listener.transportShutdown(transportError); + assertTrue(oobExecutor.runDueTasks() > 0); + + // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending + verify(mockCallListener2).onClose(same(transportError), any(Metadata.class)); + verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); + + // Shutdown + assertFalse(oob1.isShutdown()); + assertFalse(oob2.isShutdown()); + oob1.shutdown(); + verify(oobExecutorPool, never()).returnObject(anyObject()); + oob2.shutdownNow(); + assertTrue(oob1.isShutdown()); + assertTrue(oob2.isShutdown()); + assertTrue(oob2.isTerminated()); + verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService()); + + // New RPCs will be rejected. + assertEquals(0, oobExecutor.numPendingTasks()); + ClientCall call4 = oob1.newCall(method, CallOptions.DEFAULT); + ClientCall call5 = oob2.newCall(method, CallOptions.DEFAULT); + call4.start(mockCallListener4, headers); + call5.start(mockCallListener5, headers); + assertTrue(oobExecutor.runDueTasks() > 0); + verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class)); + Status status4 = statusCaptor.getValue(); + assertEquals(Status.Code.UNAVAILABLE, status4.getCode()); + verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class)); + Status status5 = statusCaptor.getValue(); + assertEquals(Status.Code.UNAVAILABLE, status5.getCode()); + + // The pending RPC will still be pending + verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); + + // This will shutdownNow() the delayed transport, terminating the pending RPC + assertEquals(0, oobExecutor.numPendingTasks()); + oob1.shutdownNow(); + assertTrue(oobExecutor.runDueTasks() > 0); + verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class)); + + // Shut down the channel, and it will not terminated because OOB channel has not. + channel.shutdown(); + assertFalse(channel.isTerminated()); + // Delayed transport has already terminated. Terminating the transport terminates the + // subchannel, which in turn terimates the OOB channel, which terminates the channel. + assertFalse(oob1.isTerminated()); + verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService()); + transportInfo.listener.transportTerminated(); + assertTrue(oob1.isTerminated()); + assertTrue(channel.isTerminated()); + verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService()); + } + + @Test + public void oobChannelsWhenChannelShutdownNow() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority"); + ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority"); + + oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); + oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata()); + + assertEquals(2, transports.size()); + MockClientTransportInfo ti1 = transports.poll(); + MockClientTransportInfo ti2 = transports.poll(); + + ti1.listener.transportReady(); + ti2.listener.transportReady(); + + channel.shutdownNow(); + verify(ti1.transport).shutdownNow(any(Status.class)); + verify(ti2.transport).shutdownNow(any(Status.class)); + + ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); + ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); + ti1.listener.transportTerminated(); + + assertFalse(channel.isTerminated()); + ti2.listener.transportTerminated(); + assertTrue(channel.isTerminated()); + } + + @Test + public void oobChannelsNoConnectionShutdown() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority"); + ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority"); + channel.shutdown(); + + verify(mockLoadBalancer).shutdown(); + oob1.shutdown(); + assertTrue(oob1.isTerminated()); + assertFalse(channel.isTerminated()); + oob2.shutdown(); + assertTrue(oob2.isTerminated()); + assertTrue(channel.isTerminated()); + verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(), + anyString()); + } + + @Test + public void oobChannelsNoConnectionShutdownNow() { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority"); + ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority"); + channel.shutdownNow(); + + verify(mockLoadBalancer).shutdown(); + assertTrue(channel.isTerminated()); + // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. + // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels. + verify(mockTransportFactory, never()).newClientTransport(any(SocketAddress.class), anyString(), + anyString()); + } + + @Test + public void uriPattern() { + assertTrue(ManagedChannelImpl2.URI_PATTERN.matcher("a:/").matches()); + assertTrue(ManagedChannelImpl2.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches()); + assertFalse(ManagedChannelImpl2.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched + assertFalse(ManagedChannelImpl2.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched + assertFalse(ManagedChannelImpl2.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched + assertFalse(ManagedChannelImpl2.URI_PATTERN.matcher(" a:/").matches()); // space not matched + } + + /** + * Test that information such as the Call's context, MethodDescriptor, authority, executor are + * propagated to newStream() and applyRequestMetadata(). + */ + @Test + public void informationPropagatedToNewStreamAndCallCredentials() { + ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder() + .add(server).build(); + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds); + final Context.Key testKey = Context.key("testing"); + Context ctx = Context.current().withValue(testKey, "testValue"); + final LinkedList credsApplyContexts = new LinkedList(); + final LinkedList newStreamContexts = new LinkedList(); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock in) throws Throwable { + credsApplyContexts.add(Context.current()); + return null; + } + }).when(creds).applyRequestMetadata( + any(MethodDescriptor.class), any(Attributes.class), any(Executor.class), + any(MetadataApplier.class)); + + // First call will be on delayed transport. Only newCall() is run within the expected context, + // so that we can verify that the context is explicitly attached before calling newStream() and + // applyRequestMetadata(), which happens after we detach the context from the thread. + Context origCtx = ctx.attach(); + assertEquals("testValue", testKey.get()); + ClientCall call = channel.newCall(method, callOptions); + ctx.detach(origCtx); + assertNull(testKey.get()); + call.start(mockCallListener, new Metadata()); + + // Simulate name resolution results + Subchannel subchannel = helper.createSubchannel( + serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY); + subchannel.requestConnection(); + verify(mockTransportFactory).newClientTransport( + same(socketAddress), eq(authority), eq(userAgent)); + MockClientTransportInfo transportInfo = transports.poll(); + final ConnectionClientTransport transport = transportInfo.transport; + when(transport.getAttrs()).thenReturn(Attributes.EMPTY); + doAnswer(new Answer() { + @Override + public ClientStream answer(InvocationOnMock in) throws Throwable { + newStreamContexts.add(Context.current()); + return mock(ClientStream.class); + } + }).when(transport).newStream( + any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), + any(StatsTraceContext.class)); + + verify(creds, never()).applyRequestMetadata( + any(MethodDescriptor.class), any(Attributes.class), any(Executor.class), + any(MetadataApplier.class)); + + // applyRequestMetadata() is called after the transport becomes ready. + transportInfo.listener.transportReady(); + when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class))) + .thenReturn(PickResult.withSubchannel(subchannel)); + helper.updatePicker(mockPicker); + executor.runDueTasks(); + ArgumentCaptor attrsCaptor = ArgumentCaptor.forClass(Attributes.class); + ArgumentCaptor applierCaptor = ArgumentCaptor.forClass(MetadataApplier.class); + verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(), + same(executor.getScheduledExecutorService()), applierCaptor.capture()); + assertEquals("testValue", testKey.get(credsApplyContexts.poll())); + assertEquals(authority, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY)); + assertEquals(SecurityLevel.NONE, + attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL)); + verify(transport, never()).newStream( + any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), + any(StatsTraceContext.class)); + + // newStream() is called after apply() is called + applierCaptor.getValue().apply(new Metadata()); + verify(transport).newStream(same(method), any(Metadata.class), same(callOptions), + any(StatsTraceContext.class)); + assertEquals("testValue", testKey.get(newStreamContexts.poll())); + // The context should not live beyond the scope of newStream() and applyRequestMetadata() + assertNull(testKey.get()); + + + // Second call will not be on delayed transport + origCtx = ctx.attach(); + call = channel.newCall(method, callOptions); + ctx.detach(origCtx); + call.start(mockCallListener, new Metadata()); + + verify(creds, times(2)).applyRequestMetadata(same(method), attrsCaptor.capture(), + same(executor.getScheduledExecutorService()), applierCaptor.capture()); + assertEquals("testValue", testKey.get(credsApplyContexts.poll())); + assertEquals(authority, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY)); + assertEquals(SecurityLevel.NONE, + attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL)); + // This is from the first call + verify(transport).newStream( + any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), + any(StatsTraceContext.class)); + + // Still, newStream() is called after apply() is called + applierCaptor.getValue().apply(new Metadata()); + verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions), + any(StatsTraceContext.class)); + assertEquals("testValue", testKey.get(newStreamContexts.poll())); + + assertNull(testKey.get()); + } + + private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider { + @Override + public BackoffPolicy get() { + return new BackoffPolicy() { + @Override + public long nextBackoffMillis() { + return 1; + } + }; + } + } + + private class FakeNameResolverFactory extends NameResolver.Factory { + final List servers; + final boolean resolvedAtStart; + final ArrayList resolvers = new ArrayList(); + + FakeNameResolverFactory(boolean resolvedAtStart) { + this.resolvedAtStart = resolvedAtStart; + servers = Collections.singletonList(ResolvedServerInfoGroup.builder().add(server).build()); + } + + FakeNameResolverFactory(List servers) { + resolvedAtStart = true; + this.servers = Collections.singletonList( + ResolvedServerInfoGroup.builder().addAll(servers).build()); + } + + public FakeNameResolverFactory() { + resolvedAtStart = true; + this.servers = ImmutableList.of(); + } + + @Override + public NameResolver newNameResolver(final URI targetUri, Attributes params) { + if (!expectedUri.equals(targetUri)) { + return null; + } + assertSame(NAME_RESOLVER_PARAMS, params); + FakeNameResolver resolver = new FakeNameResolver(); + resolvers.add(resolver); + return resolver; + } + + @Override + public String getDefaultScheme() { + return "fake"; + } + + void allResolved() { + for (FakeNameResolver resolver : resolvers) { + resolver.resolved(); + } + } + + private class FakeNameResolver extends NameResolver { + Listener listener; + boolean shutdown; + + @Override public String getServiceAuthority() { + return expectedUri.getAuthority(); + } + + @Override public void start(final Listener listener) { + this.listener = listener; + if (resolvedAtStart) { + resolved(); + } + } + + void resolved() { + listener.onUpdate(servers, Attributes.EMPTY); + } + + @Override public void shutdown() { + shutdown = true; + } + } + } + + private static class FailingNameResolverFactory extends NameResolver.Factory { + final Status error; + + FailingNameResolverFactory(Status error) { + this.error = error; + } + + @Override + public NameResolver newNameResolver(URI notUsedUri, Attributes params) { + return new NameResolver() { + @Override public String getServiceAuthority() { + return "irrelevant-authority"; + } + + @Override public void start(final Listener listener) { + listener.onError(error); + } + + @Override public void shutdown() {} + }; + } + + @Override + public String getDefaultScheme() { + return "fake"; + } + } +}