diff --git a/core/src/main/java/io/grpc/ConnectivityStateInfo.java b/core/src/main/java/io/grpc/ConnectivityStateInfo.java new file mode 100644 index 0000000000..8698cd10a9 --- /dev/null +++ b/core/src/main/java/io/grpc/ConnectivityStateInfo.java @@ -0,0 +1,111 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import com.google.common.base.Preconditions; + +import io.grpc.Status; + +/** + * A tuple of a {@link ConnectivityState} and its associated {@link Status}. + * + *

If the state is {@code TRANSIENT_FAILURE}, the status is never {@code OK}. For other states, + * the status is always {@code OK}. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771") +public final class ConnectivityStateInfo { + private final ConnectivityState state; + private final Status status; + + /** + * Returns an instance for a state that is not {@code TRANSIENT_FAILURE}. + * + * @throws IllegalArgumentException if {@code state} is {@code TRANSIENT_FAILURE}. + */ + public static ConnectivityStateInfo forNonError(ConnectivityState state) { + Preconditions.checkArgument(state != ConnectivityState.TRANSIENT_FAILURE, + "state is TRANSIENT_ERROR. Use forError() instead"); + return new ConnectivityStateInfo(state, Status.OK); + } + + /** + * Returns an instance for {@code TRANSIENT_FAILURE}, associated with an error status. + */ + public static ConnectivityStateInfo forTransientFailure(Status error) { + Preconditions.checkArgument(!error.isOk(), "The error status must not be OK"); + return new ConnectivityStateInfo(ConnectivityState.TRANSIENT_FAILURE, error); + } + + /** + * Returns the state. + */ + public ConnectivityState getState() { + return state; + } + + /** + * Returns the status associated with the state. + * + *

If the state is {@code TRANSIENT_FAILURE}, the status is never {@code OK}. For other + * states, the status is always {@code OK}. + */ + public Status getStatus() { + return status; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ConnectivityStateInfo)) { + return false; + } + ConnectivityStateInfo o = (ConnectivityStateInfo) other; + return state.equals(o.state) && status.equals(o.status); + } + + @Override + public int hashCode() { + return state.hashCode() ^ status.hashCode(); + } + + @Override + public String toString() { + if (status.isOk()) { + return state.toString(); + } + return state + "(" + status + ")"; + } + + private ConnectivityStateInfo(ConnectivityState state, Status status) { + this.state = Preconditions.checkNotNull(state, "state is null"); + this.status = Preconditions.checkNotNull(status, "status is null"); + } +} diff --git a/core/src/main/java/io/grpc/internal/InUseStateAggregator2.java b/core/src/main/java/io/grpc/internal/InUseStateAggregator2.java new file mode 100644 index 0000000000..a303b60b05 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/InUseStateAggregator2.java @@ -0,0 +1,82 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import java.util.HashSet; + +import javax.annotation.concurrent.NotThreadSafe; + +/** + * Aggregates the in-use state of a set of objects. + */ +@NotThreadSafe +abstract class InUseStateAggregator2 { + + private final HashSet inUseObjects = new HashSet(); + + /** + * Update the in-use state of an object. Initially no object is in use. + * + *

This may call into {@link #handleInUse} or {@link #handleNotInUse} when appropriate. + */ + final void updateObjectInUse(T object, boolean inUse) { + int origSize = inUseObjects.size(); + if (inUse) { + inUseObjects.add(object); + if (origSize == 0) { + handleInUse(); + } + } else { + boolean removed = inUseObjects.remove(object); + if (removed && origSize == 1) { + handleNotInUse(); + } + } + } + + final boolean isInUse() { + return !inUseObjects.isEmpty(); + } + + /** + * Called when the aggregated in-use state has changed to true, which means at least one object is + * in use. + */ + abstract void handleInUse(); + + /** + * Called when the aggregated in-use state has changed to false, which means no object is in use. + * + *

This method is called under the lock returned by {@link #getLock}. + */ + abstract void handleNotInUse(); +} diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java new file mode 100644 index 0000000000..c044bb9376 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -0,0 +1,492 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; + +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.Status; + +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Transports for a single {@link SocketAddress}. + * + *

This is the next version of {@link TransportSet} in development. + */ +@ThreadSafe +final class InternalSubchannel implements WithLogId { + private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName()); + + private final Object lock = new Object(); + private final EquivalentAddressGroup addressGroup; + private final String authority; + private final String userAgent; + private final BackoffPolicy.Provider backoffPolicyProvider; + private final Callback callback; + private final ClientTransportFactory transportFactory; + private final ScheduledExecutorService scheduledExecutor; + + // A serializing executor shared across the Channel + // + // TODO(zhangkun83): decide the type of Channel Executor. I considered a SerializingExecutor + // based on the app executor, but it seems abusive because the app executor is intended for app + // logic, not for channel bookkeeping. We don't want channel bookkeeping logic to contend for + // threads with app logic, which may increase latency or even cause starvation. Instead, we + // should consider a thread-less Executor after the refactor of ManagedChannelImpl is done. + // + // NOTE: there are cases where channelExecutor.execute() is run under "lock". This will add risk + // of deadlock if channelExecutor is based on a direct executor. Thread-less executor wouldn't + // have such problem. + private final Executor channelExecutor; + + @GuardedBy("lock") + private int nextAddressIndex; + + /** + * The policy to control back off between reconnects. Non-{@code null} when last connect failed. + */ + @GuardedBy("lock") + private BackoffPolicy reconnectPolicy; + + /** + * Timer monitoring duration since entering CONNECTING state. + */ + @GuardedBy("lock") + private final Stopwatch connectingTimer; + + @GuardedBy("lock") + @Nullable + private ScheduledFuture reconnectTask; + + /** + * All transports that are not terminated. At the very least the value of {@link #activeTransport} + * will be present, but previously used transports that still have streams or are stopping may + * also be present. + */ + @GuardedBy("lock") + private final Collection transports = + new ArrayList(); + + // Must only be used from channelExecutor + private final InUseStateAggregator2 inUseStateAggregator = + new InUseStateAggregator2() { + @Override + void handleInUse() { + callback.onInUse(InternalSubchannel.this); + } + + @Override + void handleNotInUse() { + callback.onNotInUse(InternalSubchannel.this); + } + }; + + /** + * The to-be active transport, which is not ready yet. + */ + @GuardedBy("lock") + @Nullable + private ConnectionClientTransport pendingTransport; + + /** + * The transport for new outgoing requests. 'lock' must be held when assigning to it. Non-null + * only in READY state. + */ + @Nullable + private volatile ManagedClientTransport activeTransport; + + @GuardedBy("lock") + private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE); + + InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent, + BackoffPolicy.Provider backoffPolicyProvider, + ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, + Supplier stopwatchSupplier, Executor channelExecutor, Callback callback) { + this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup"); + this.authority = authority; + this.userAgent = userAgent; + this.backoffPolicyProvider = backoffPolicyProvider; + this.transportFactory = transportFactory; + this.scheduledExecutor = scheduledExecutor; + this.connectingTimer = stopwatchSupplier.get(); + this.channelExecutor = channelExecutor; + this.callback = callback; + } + + /** + * Returns a READY transport that will be used to create new streams. + * + *

Returns {@code null} if the state is not READY. + */ + @Nullable + final ClientTransport obtainActiveTransport() { + ClientTransport savedTransport = activeTransport; + if (savedTransport != null) { + return savedTransport; + } + Runnable runnable = null; + synchronized (lock) { + savedTransport = activeTransport; + // Check again, since it could have changed before acquiring the lock + if (savedTransport != null) { + return savedTransport; + } + if (state.getState() == IDLE) { + gotoNonErrorState(CONNECTING); + runnable = startNewTransport(); + } + } + if (runnable != null) { + runnable.run(); + } + return null; + } + + @CheckReturnValue + @GuardedBy("lock") + private Runnable startNewTransport() { + Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); + + if (nextAddressIndex == 0) { + connectingTimer.reset().start(); + } + List addrs = addressGroup.getAddresses(); + final SocketAddress address = addrs.get(nextAddressIndex++); + if (nextAddressIndex >= addrs.size()) { + nextAddressIndex = 0; + } + + ConnectionClientTransport transport = + transportFactory.newClientTransport(address, authority, userAgent); + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] Created {1} for {2}", + new Object[] {getLogId(), transport.getLogId(), address}); + } + pendingTransport = transport; + transports.add(transport); + return transport.start(new TransportListener(transport, address)); + } + + /** + * Only called after all addresses attempted and failed (TRANSIENT_FAILURE). + * @param status the causal status when the channel begins transition to + * TRANSIENT_FAILURE. + */ + @GuardedBy("lock") + private void scheduleBackoff(final Status status) { + class EndOfCurrentBackoff implements Runnable { + @Override + public void run() { + try { + Runnable runnable = null; + synchronized (lock) { + reconnectTask = null; + if (state.getState() == SHUTDOWN) { + // Even though shutdown() will cancel this task, the task may have already started + // when it's being cancelled. + return; + } + gotoNonErrorState(CONNECTING); + runnable = startNewTransport(); + } + if (runnable != null) { + runnable.run(); + } + } catch (Throwable t) { + log.log(Level.WARNING, "Exception handling end of backoff", t); + } + } + } + + gotoState(ConnectivityStateInfo.forTransientFailure(status)); + if (reconnectPolicy == null) { + reconnectPolicy = backoffPolicyProvider.get(); + } + long delayMillis = + reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS); + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms", + new Object[]{getLogId(), delayMillis}); + } + Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done"); + reconnectTask = scheduledExecutor.schedule( + new LogExceptionRunnable(new EndOfCurrentBackoff()), + delayMillis, + TimeUnit.MILLISECONDS); + } + + @GuardedBy("lock") + private void gotoNonErrorState(ConnectivityState newState) { + gotoState(ConnectivityStateInfo.forNonError(newState)); + } + + @GuardedBy("lock") + private void gotoState(final ConnectivityStateInfo newState) { + if (state.getState() != newState.getState()) { + Preconditions.checkState(state.getState() != SHUTDOWN, + "Cannot transition out of SHUTDOWN to " + newState); + state = newState; + channelExecutor.execute(new Runnable() { + @Override + public void run() { + callback.onStateChange(InternalSubchannel.this, newState); + } + }); + } + } + + public void shutdown() { + ManagedClientTransport savedActiveTransport; + ConnectionClientTransport savedPendingTransport; + synchronized (lock) { + if (state.getState() == SHUTDOWN) { + return; + } + gotoNonErrorState(SHUTDOWN); + savedActiveTransport = activeTransport; + savedPendingTransport = pendingTransport; + activeTransport = null; + pendingTransport = null; + if (transports.isEmpty()) { + handleTermination(); + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] Terminated in shutdown()", getLogId()); + } + } // else: the callback will be run once all transports have been terminated + cancelReconnectTask(); + } + if (savedActiveTransport != null) { + savedActiveTransport.shutdown(); + } + if (savedPendingTransport != null) { + savedPendingTransport.shutdown(); + } + } + + // May be called under lock. + private void handleTermination() { + channelExecutor.execute(new Runnable() { + @Override + public void run() { + callback.onTerminated(InternalSubchannel.this); + } + }); + } + + private void handleTransportInUseState( + final ManagedClientTransport transport, final boolean inUse) { + channelExecutor.execute(new Runnable() { + @Override + public void run() { + inUseStateAggregator.updateObjectInUse(transport, inUse); + } + }); + } + + void shutdownNow(Status reason) { + shutdown(); + Collection transportsCopy; + synchronized (lock) { + transportsCopy = new ArrayList(transports); + } + for (ManagedClientTransport transport : transportsCopy) { + transport.shutdownNow(reason); + } + } + + @GuardedBy("lock") + private void cancelReconnectTask() { + if (reconnectTask != null) { + reconnectTask.cancel(false); + reconnectTask = null; + } + } + + @Override + public String getLogId() { + return GrpcUtil.getLogId(this); + } + + @VisibleForTesting + ConnectivityState getState() { + synchronized (lock) { + return state.getState(); + } + } + + /** Listener for real transports. */ + private class TransportListener implements ManagedClientTransport.Listener { + final ManagedClientTransport transport; + final SocketAddress address; + + TransportListener(ManagedClientTransport transport, SocketAddress address) { + this.transport = transport; + this.address = address; + } + + @Override + public void transportReady() { + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] {1} for {2} is ready", + new Object[] {getLogId(), transport.getLogId(), address}); + } + ConnectivityState savedState; + synchronized (lock) { + savedState = state.getState(); + reconnectPolicy = null; + nextAddressIndex = 0; + if (savedState == SHUTDOWN) { + // activeTransport should have already been set to null by shutdown(). We keep it null. + Preconditions.checkState(activeTransport == null, + "Unexpected non-null activeTransport"); + } else if (pendingTransport == transport) { + gotoNonErrorState(READY); + activeTransport = transport; + pendingTransport = null; + } + } + if (savedState == SHUTDOWN) { + transport.shutdown(); + } + } + + @Override + public void transportInUse(boolean inUse) { + handleTransportInUseState(transport, inUse); + } + + @Override + public void transportShutdown(Status s) { + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}", + new Object[] {getLogId(), transport.getLogId(), address, s}); + } + Runnable runnable = null; + synchronized (lock) { + if (state.getState() == SHUTDOWN) { + return; + } + if (activeTransport == transport) { + gotoNonErrorState(IDLE); + activeTransport = null; + } else if (pendingTransport == transport) { + Preconditions.checkState(state.getState() == CONNECTING, + "Expected state is CONNECTING, actual state is %s", state.getState()); + // Continue reconnect if there are still addresses to try. + if (nextAddressIndex == 0) { + // Initiate backoff + // Transition to TRANSIENT_FAILURE + scheduleBackoff(s); + } else { + runnable = startNewTransport(); + } + } + } + if (runnable != null) { + runnable.run(); + } + } + + @Override + public void transportTerminated() { + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] {1} for {2} is terminated", + new Object[] {getLogId(), transport.getLogId(), address}); + } + boolean terminated = false; + handleTransportInUseState(transport, false); + synchronized (lock) { + transports.remove(transport); + if (state.getState() == SHUTDOWN && transports.isEmpty()) { + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] Terminated in transportTerminated()", getLogId()); + } + handleTermination(); + } + } + Preconditions.checkState(activeTransport != transport, + "activeTransport still points to this transport. " + + "Seems transportShutdown() was not called."); + } + } + + // All methods are called in channelExecutor, which is a serializing executor. + abstract static class Callback { + /** + * Called when the subchannel is terminated, which means it's shut down and all transports + * have been terminated. + */ + public void onTerminated(InternalSubchannel is) { } + + /** + * Called when the subchannel's connectivity state has changed. + */ + public void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { } + + /** + * Called when the subchannel's in-use state has changed to true, which means at least one + * transport is in use. + */ + public void onInUse(InternalSubchannel is) { } + + /** + * Called when the subchannel's in-use state has changed to false, which means no transport is + * in use. + */ + public void onNotInUse(InternalSubchannel is) { } + } +} diff --git a/core/src/test/java/io/grpc/ConnectivityStateInfoTest.java b/core/src/test/java/io/grpc/ConnectivityStateInfoTest.java new file mode 100644 index 0000000000..868cf4c6f7 --- /dev/null +++ b/core/src/test/java/io/grpc/ConnectivityStateInfoTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotSame; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link ConnectivityStateInfo}. */ +@RunWith(JUnit4.class) +public class ConnectivityStateInfoTest { + @Test + public void forNonError() { + ConnectivityStateInfo info = ConnectivityStateInfo.forNonError(ConnectivityState.IDLE); + assertEquals(ConnectivityState.IDLE, info.getState()); + assertEquals(Status.OK, info.getStatus()); + } + + @Test(expected = IllegalArgumentException.class) + public void forNonErrorInvalid() { + ConnectivityStateInfo.forNonError(ConnectivityState.TRANSIENT_FAILURE); + } + + @Test + public void forTransientFailure() { + ConnectivityStateInfo info = ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, info.getState()); + assertEquals(Status.UNAVAILABLE, info.getStatus()); + } + + @Test(expected = IllegalArgumentException.class) + public void forTransientFailureInvalid() { + ConnectivityStateInfo.forTransientFailure(Status.OK); + } + + @Test + public void equality() { + ConnectivityStateInfo info1 = ConnectivityStateInfo.forNonError(ConnectivityState.IDLE); + ConnectivityStateInfo info2 = ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING); + ConnectivityStateInfo info3 = ConnectivityStateInfo.forNonError(ConnectivityState.IDLE); + ConnectivityStateInfo info4 = ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); + ConnectivityStateInfo info5 = ConnectivityStateInfo.forTransientFailure(Status.INTERNAL); + ConnectivityStateInfo info6 = ConnectivityStateInfo.forTransientFailure(Status.INTERNAL); + + assertEquals(info1, info3); + assertNotSame(info1, info3); + assertEquals(info1.hashCode(), info3.hashCode()); + assertEquals(info5, info6); + assertEquals(info5.hashCode(), info6.hashCode()); + assertNotSame(info5, info6); + + assertNotEquals(info1, info2); + assertNotEquals(info1, info4); + assertNotEquals(info4, info6); + + assertFalse(info1.equals(null)); + assertFalse(info1.equals(this)); + } +} diff --git a/core/src/test/java/io/grpc/internal/FakeClock.java b/core/src/test/java/io/grpc/internal/FakeClock.java index 97e1921626..f2ad2f95fe 100644 --- a/core/src/test/java/io/grpc/internal/FakeClock.java +++ b/core/src/test/java/io/grpc/internal/FakeClock.java @@ -76,9 +76,9 @@ public final class FakeClock { private long currentTimeNanos; - private class ScheduledTask extends AbstractFuture implements ScheduledFuture { - final Runnable command; - final long dueTimeNanos; + public class ScheduledTask extends AbstractFuture implements ScheduledFuture { + public final Runnable command; + public final long dueTimeNanos; ScheduledTask(long dueTimeNanos, Runnable command) { this.dueTimeNanos = dueTimeNanos; diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java new file mode 100644 index 0000000000..9b5eccdf4f --- /dev/null +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -0,0 +1,688 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.Status; +import io.grpc.internal.TestUtils.MockClientTransportInfo; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Unit tests for {@link InternalSubchannel}. + * + *

It only tests the logic that is not covered by {@link ManagedChannelImplTransportManagerTest}. + */ +@RunWith(JUnit4.class) +public class InternalSubchannelTest { + + private static final String AUTHORITY = "fakeauthority"; + private static final String USER_AGENT = "mosaic"; + private static final ConnectivityStateInfo UNAVAILABLE_STATE = + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); + private static final ConnectivityStateInfo RESOURCE_EXHAUSTED_STATE = + ConnectivityStateInfo.forTransientFailure(Status.RESOURCE_EXHAUSTED); + + // For scheduled executor + private final FakeClock fakeClock = new FakeClock(); + // For channelExecutor + private final FakeClock fakeExecutor = new FakeClock(); + private final SerializingExecutor channelExecutor = + new SerializingExecutor(fakeExecutor.getScheduledExecutorService()); + + @Mock private BackoffPolicy mockBackoffPolicy1; + @Mock private BackoffPolicy mockBackoffPolicy2; + @Mock private BackoffPolicy mockBackoffPolicy3; + @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider; + @Mock private ClientTransportFactory mockTransportFactory; + + private final LinkedList callbackInvokes = new LinkedList(); + private final InternalSubchannel.Callback mockInternalSubchannelCallback = + new InternalSubchannel.Callback() { + @Override + public void onTerminated(InternalSubchannel is) { + assertSame(internalSubchannel, is); + callbackInvokes.add("onTerminated"); + } + + @Override + public void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + assertSame(internalSubchannel, is); + callbackInvokes.add("onStateChange:" + newState); + } + + @Override + public void onInUse(InternalSubchannel is) { + assertSame(internalSubchannel, is); + callbackInvokes.add("onInUse"); + } + + @Override + public void onNotInUse(InternalSubchannel is) { + assertSame(internalSubchannel, is); + callbackInvokes.add("onNotInUse"); + } + }; + + private InternalSubchannel internalSubchannel; + private EquivalentAddressGroup addressGroup; + private BlockingQueue transports; + + @Before public void setUp() { + MockitoAnnotations.initMocks(this); + + when(mockBackoffPolicyProvider.get()) + .thenReturn(mockBackoffPolicy1, mockBackoffPolicy2, mockBackoffPolicy3); + when(mockBackoffPolicy1.nextBackoffMillis()).thenReturn(10L, 100L); + when(mockBackoffPolicy2.nextBackoffMillis()).thenReturn(10L, 100L); + when(mockBackoffPolicy3.nextBackoffMillis()).thenReturn(10L, 100L); + transports = TestUtils.captureTransports(mockTransportFactory); + } + + @After public void noMorePendingTasks() { + assertEquals(0, fakeClock.numPendingTasks()); + assertEquals(0, fakeExecutor.numPendingTasks()); + } + + @Test public void singleAddressReconnect() { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + assertEquals(ConnectivityState.IDLE, internalSubchannel.getState()); + + // Invocation counters + int transportsCreated = 0; + int backoff1Consulted = 0; + int backoff2Consulted = 0; + int backoffReset = 0; + + // First attempt + assertEquals(ConnectivityState.IDLE, internalSubchannel.getState()); + assertNoCallbackInvoke(); + assertNull(internalSubchannel.obtainActiveTransport()); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + verify(mockTransportFactory, times(++transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + + // Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE. + assertNoCallbackInvoke(); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); + // Backoff reset and using first back-off value interval + verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + + // Second attempt + // Transport creation doesn't happen until time is due + fakeClock.forwardMillis(9); + assertNull(internalSubchannel.obtainActiveTransport()); + verify(mockTransportFactory, times(transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + + assertNoCallbackInvoke(); + fakeClock.forwardMillis(1); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + verify(mockTransportFactory, times(++transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + // Fail this one too + assertNoCallbackInvoke(); + // Here we use a different status from the first failure, and verify that it's passed to + // the callback. + transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE); + // Second back-off interval + verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + + // Third attempt + // Transport creation doesn't happen until time is due + fakeClock.forwardMillis(99); + assertNull(internalSubchannel.obtainActiveTransport()); + verify(mockTransportFactory, times(transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + assertNoCallbackInvoke(); + fakeClock.forwardMillis(1); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertNull(internalSubchannel.obtainActiveTransport()); + verify(mockTransportFactory, times(++transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + // Let this one succeed, will enter READY state. + assertNoCallbackInvoke(); + transports.peek().listener.transportReady(); + assertExactCallbackInvokes("onStateChange:READY"); + assertEquals(ConnectivityState.READY, internalSubchannel.getState()); + assertSame(transports.peek().transport, internalSubchannel.obtainActiveTransport()); + + // Close the READY transport, will enter IDLE state. + assertNoCallbackInvoke(); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertEquals(ConnectivityState.IDLE, internalSubchannel.getState()); + assertExactCallbackInvokes("onStateChange:IDLE"); + + // Back-off is reset, and the next attempt will happen immediately + assertNull(internalSubchannel.obtainActiveTransport()); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockTransportFactory, times(++transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + + // Final checks for consultations on back-off policies + verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis(); + } + + @Test public void twoAddressesReconnect() { + SocketAddress addr1 = mock(SocketAddress.class); + SocketAddress addr2 = mock(SocketAddress.class); + createInternalSubchannel(addr1, addr2); + assertEquals(ConnectivityState.IDLE, internalSubchannel.getState()); + // Invocation counters + int transportsAddr1 = 0; + int transportsAddr2 = 0; + int backoff1Consulted = 0; + int backoff2Consulted = 0; + int backoff3Consulted = 0; + int backoffReset = 0; + + // First attempt + assertNoCallbackInvoke(); + assertNull(internalSubchannel.obtainActiveTransport()); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + verify(mockTransportFactory, times(++transportsAddr1)) + .newClientTransport(addr1, AUTHORITY, USER_AGENT); + + // Let this one fail without success + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + // Still in CONNECTING + assertNull(internalSubchannel.obtainActiveTransport()); + assertNoCallbackInvoke(); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + + // Second attempt will start immediately. Still no back-off policy. + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr2)) + .newClientTransport(addr2, AUTHORITY, USER_AGENT); + assertNull(internalSubchannel.obtainActiveTransport()); + // Fail this one too + assertNoCallbackInvoke(); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + // All addresses have failed. Delayed transport will be in back-off interval. + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); + // Backoff reset and first back-off interval begins + verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + + // No reconnect during TRANSIENT_FAILURE even when requested. + assertNull(internalSubchannel.obtainActiveTransport()); + assertNoCallbackInvoke(); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + + // Third attempt is the first address, thus controlled by the first back-off interval. + fakeClock.forwardMillis(9); + verify(mockTransportFactory, times(transportsAddr1)) + .newClientTransport(addr1, AUTHORITY, USER_AGENT); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + assertNoCallbackInvoke(); + fakeClock.forwardMillis(1); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + verify(mockTransportFactory, times(++transportsAddr1)) + .newClientTransport(addr1, AUTHORITY, USER_AGENT); + // Fail this one too + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + + // Forth attempt will start immediately. Keep back-off policy. + assertNull(internalSubchannel.obtainActiveTransport()); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr2)) + .newClientTransport(addr2, AUTHORITY, USER_AGENT); + // Fail this one too + assertNoCallbackInvoke(); + transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); + // All addresses have failed again. Delayed transport will be in back-off interval. + assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + // Second back-off interval begins + verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + + // Fifth attempt for the first address, thus controlled by the second back-off interval. + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + fakeClock.forwardMillis(99); + verify(mockTransportFactory, times(transportsAddr1)) + .newClientTransport(addr1, AUTHORITY, USER_AGENT); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + assertNoCallbackInvoke(); + fakeClock.forwardMillis(1); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + verify(mockTransportFactory, times(++transportsAddr1)) + .newClientTransport(addr1, AUTHORITY, USER_AGENT); + // Let it through + assertNoCallbackInvoke(); + transports.peek().listener.transportReady(); + assertExactCallbackInvokes("onStateChange:READY"); + assertEquals(ConnectivityState.READY, internalSubchannel.getState()); + + assertSame(transports.peek().transport, internalSubchannel.obtainActiveTransport()); + // Then close it. + assertNoCallbackInvoke(); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertExactCallbackInvokes("onStateChange:IDLE"); + assertEquals(ConnectivityState.IDLE, internalSubchannel.getState()); + + // First attempt after a successful connection. Old back-off policy should be ignored, but there + // is not yet a need for a new one. Start from the first address. + assertNull(internalSubchannel.obtainActiveTransport()); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr1)) + .newClientTransport(addr1, AUTHORITY, USER_AGENT); + // Fail the transport + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + + // Second attempt will start immediately. Still no new back-off policy. + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr2)) + .newClientTransport(addr2, AUTHORITY, USER_AGENT); + // Fail this one too + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + // All addresses have failed. Enter TRANSIENT_FAILURE. Back-off in effect. + assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + // Back-off reset and first back-off interval begins + verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + + // Third attempt is the first address, thus controlled by the first back-off interval. + fakeClock.forwardMillis(9); + verify(mockTransportFactory, times(transportsAddr1)) + .newClientTransport(addr1, AUTHORITY, USER_AGENT); + assertEquals(ConnectivityState.TRANSIENT_FAILURE, internalSubchannel.getState()); + assertNoCallbackInvoke(); + fakeClock.forwardMillis(1); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(ConnectivityState.CONNECTING, internalSubchannel.getState()); + verify(mockTransportFactory, times(++transportsAddr1)) + .newClientTransport(addr1, AUTHORITY, USER_AGENT); + + // Final checks on invocations on back-off policies + verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffMillis(); + } + + @Test + public void connectIsLazy() { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + + // Invocation counters + int transportsCreated = 0; + + // Won't connect until requested + verify(mockTransportFactory, times(transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + + // First attempt + internalSubchannel.obtainActiveTransport(); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + verify(mockTransportFactory, times(++transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + + // Fail this one + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); + + // Will always reconnect after back-off + fakeClock.forwardMillis(10); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + verify(mockTransportFactory, times(++transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + + // Make this one proceed + transports.peek().listener.transportReady(); + assertExactCallbackInvokes("onStateChange:READY"); + // Then go-away + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertExactCallbackInvokes("onStateChange:IDLE"); + + // No scheduled tasks that would ever try to reconnect ... + assertEquals(0, fakeClock.numPendingTasks()); + assertEquals(0, fakeExecutor.numPendingTasks()); + + // ... until it's requested. + internalSubchannel.obtainActiveTransport(); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + verify(mockTransportFactory, times(++transportsCreated)) + .newClientTransport(addr, AUTHORITY, USER_AGENT); + } + + @Test + public void shutdownWhenReady() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo transportInfo = transports.poll(); + transportInfo.listener.transportReady(); + assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); + + internalSubchannel.shutdown(); + verify(transportInfo.transport).shutdown(); + assertExactCallbackInvokes("onStateChange:SHUTDOWN"); + + transportInfo.listener.transportTerminated(); + assertExactCallbackInvokes("onTerminated"); + verify(transportInfo.transport, never()).shutdownNow(any(Status.class)); + } + + @Test + public void shutdownBeforeTransportCreated() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + + // First transport is created immediately + internalSubchannel.obtainActiveTransport(); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + verify(mockTransportFactory).newClientTransport(addr, AUTHORITY, USER_AGENT); + + // Fail this one + MockClientTransportInfo transportInfo = transports.poll(); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo.listener.transportTerminated(); + + // Entering TRANSIENT_FAILURE, waiting for back-off + assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); + + // Save the reconnectTask before shutting down + FakeClock.ScheduledTask reconnectTask = null; + for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) { + if (task.command.toString().contains("EndOfCurrentBackoff")) { + assertNull("There shouldn't be more than one reconnectTask", reconnectTask); + assertFalse(task.isDone()); + reconnectTask = task; + } + } + assertNotNull("There should be at least one reconnectTask", reconnectTask); + + // Shut down InternalSubchannel before the transport is created. + internalSubchannel.shutdown(); + assertTrue(reconnectTask.isCancelled()); + // InternalSubchannel terminated promptly. + assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated"); + + // Simulate a race between reconnectTask cancellation and execution -- the task runs anyway. + // This should not lead to the creation of a new transport. + reconnectTask.command.run(); + + // Futher call to obtainActiveTransport() is no-op. + assertNull(internalSubchannel.obtainActiveTransport()); + assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState()); + assertNoCallbackInvoke(); + + // No more transports will be created. + fakeClock.forwardMillis(10000); + assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState()); + verifyNoMoreInteractions(mockTransportFactory); + assertEquals(0, transports.size()); + assertNoCallbackInvoke(); + } + + @Test + public void shutdownBeforeTransportReady() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + + internalSubchannel.obtainActiveTransport(); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + MockClientTransportInfo transportInfo = transports.poll(); + + // Shutdown the InternalSubchannel before the pending transport is ready + assertNull(internalSubchannel.obtainActiveTransport()); + internalSubchannel.shutdown(); + assertExactCallbackInvokes("onStateChange:SHUTDOWN"); + + // The transport should've been shut down even though it's not the active transport yet. + verify(transportInfo.transport).shutdown(); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + assertNoCallbackInvoke(); + transportInfo.listener.transportTerminated(); + assertExactCallbackInvokes("onTerminated"); + assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState()); + } + + @Test + public void shutdownNow() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo t1 = transports.poll(); + t1.listener.transportReady(); + assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); + t1.listener.transportShutdown(Status.UNAVAILABLE); + assertExactCallbackInvokes("onStateChange:IDLE"); + + internalSubchannel.obtainActiveTransport(); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + MockClientTransportInfo t2 = transports.poll(); + + Status status = Status.UNAVAILABLE.withDescription("Requested"); + internalSubchannel.shutdownNow(status); + + verify(t1.transport).shutdownNow(same(status)); + verify(t2.transport).shutdownNow(same(status)); + assertExactCallbackInvokes("onStateChange:SHUTDOWN"); + } + + @Test + public void obtainTransportAfterShutdown() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + + internalSubchannel.shutdown(); + assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated"); + assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState()); + assertNull(internalSubchannel.obtainActiveTransport()); + verify(mockTransportFactory, times(0)).newClientTransport(addr, AUTHORITY, USER_AGENT); + assertNoCallbackInvoke(); + assertEquals(ConnectivityState.SHUTDOWN, internalSubchannel.getState()); + } + + @Test + public void logId() { + createInternalSubchannel(mock(SocketAddress.class)); + assertEquals("InternalSubchannel@" + Integer.toHexString(internalSubchannel.hashCode()), + internalSubchannel.getLogId()); + } + + @Test + public void inUseState() { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo t0 = transports.poll(); + t0.listener.transportReady(); + assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); + t0.listener.transportInUse(true); + assertExactCallbackInvokes("onInUse"); + + t0.listener.transportInUse(false); + assertExactCallbackInvokes("onNotInUse"); + + t0.listener.transportInUse(true); + assertExactCallbackInvokes("onInUse"); + t0.listener.transportShutdown(Status.UNAVAILABLE); + assertExactCallbackInvokes("onStateChange:IDLE"); + + assertNull(internalSubchannel.obtainActiveTransport()); + MockClientTransportInfo t1 = transports.poll(); + t1.listener.transportReady(); + assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); + t1.listener.transportInUse(true); + // InternalSubchannel is already in-use, thus doesn't call the callback + assertNoCallbackInvoke(); + + t1.listener.transportInUse(false); + // t0 is still in-use + assertNoCallbackInvoke(); + + t0.listener.transportInUse(false); + assertExactCallbackInvokes("onNotInUse"); + } + + @Test + public void transportTerminateWithoutExitingInUse() { + // An imperfect transport that terminates without going out of in-use. InternalSubchannel will + // clear the in-use bit for it. + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo t0 = transports.poll(); + t0.listener.transportReady(); + assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); + t0.listener.transportInUse(true); + assertExactCallbackInvokes("onInUse"); + + t0.listener.transportShutdown(Status.UNAVAILABLE); + assertExactCallbackInvokes("onStateChange:IDLE"); + t0.listener.transportTerminated(); + assertExactCallbackInvokes("onNotInUse"); + } + + @Test + public void transportStartReturnsRunnable() { + SocketAddress addr1 = mock(SocketAddress.class); + SocketAddress addr2 = mock(SocketAddress.class); + createInternalSubchannel(addr1, addr2); + final AtomicInteger runnableInvokes = new AtomicInteger(0); + Runnable startRunnable = new Runnable() { + @Override + public void run() { + runnableInvokes.incrementAndGet(); + } + }; + transports = TestUtils.captureTransports(mockTransportFactory, startRunnable); + + assertEquals(0, runnableInvokes.get()); + internalSubchannel.obtainActiveTransport(); + assertEquals(1, runnableInvokes.get()); + internalSubchannel.obtainActiveTransport(); + assertEquals(1, runnableInvokes.get()); + + MockClientTransportInfo t0 = transports.poll(); + t0.listener.transportShutdown(Status.UNAVAILABLE); + assertEquals(2, runnableInvokes.get()); + + // 2nd address: reconnect immediatly + MockClientTransportInfo t1 = transports.poll(); + t1.listener.transportShutdown(Status.UNAVAILABLE); + + // Addresses exhausted, waiting for back-off. + assertEquals(2, runnableInvokes.get()); + // Run out the back-off period + fakeClock.forwardMillis(10); + assertEquals(3, runnableInvokes.get()); + + // This test doesn't care about scheduled InternalSubchannel callbacks. Clear it up so that + // noMorePendingTasks() won't fail. + fakeExecutor.runDueTasks(); + assertEquals(3, runnableInvokes.get()); + } + + private void createInternalSubchannel(SocketAddress ... addrs) { + addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs)); + internalSubchannel = new InternalSubchannel(addressGroup, AUTHORITY, USER_AGENT, + mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(), + fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback); + } + + private void assertNoCallbackInvoke() { + while (fakeExecutor.runDueTasks() > 0) {} + assertEquals(0, callbackInvokes.size()); + } + + private void assertExactCallbackInvokes(String ... expectedInvokes) { + // Make sure all callbacks are to run from channelExecutor only. + assertEquals(0, callbackInvokes.size()); + + while (fakeExecutor.runDueTasks() > 0) {} + assertEquals(Arrays.asList(expectedInvokes), callbackInvokes); + callbackInvokes.clear(); + } +} diff --git a/core/src/test/java/io/grpc/internal/TestUtils.java b/core/src/test/java/io/grpc/internal/TestUtils.java index b8c643e594..7e0f57aed8 100644 --- a/core/src/test/java/io/grpc/internal/TestUtils.java +++ b/core/src/test/java/io/grpc/internal/TestUtils.java @@ -46,6 +46,7 @@ import org.mockito.stubbing.Answer; import java.net.SocketAddress; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import javax.annotation.Nullable; /** * Common utility methods for tests. @@ -78,6 +79,11 @@ final class TestUtils { */ static BlockingQueue captureTransports( ClientTransportFactory mockTransportFactory) { + return captureTransports(mockTransportFactory, null); + } + + static BlockingQueue captureTransports( + ClientTransportFactory mockTransportFactory, @Nullable final Runnable startRunnable) { final BlockingQueue captor = new LinkedBlockingQueue(); @@ -89,12 +95,12 @@ final class TestUtils { any(CallOptions.class), any(StatsTraceContext.class))) .thenReturn(mock(ClientStream.class)); // Save the listener - doAnswer(new Answer() { + doAnswer(new Answer() { @Override - public Void answer(InvocationOnMock invocation) throws Throwable { + public Runnable answer(InvocationOnMock invocation) throws Throwable { captor.add(new MockClientTransportInfo( mockTransport, (ManagedClientTransport.Listener) invocation.getArguments()[0])); - return null; + return startRunnable; } }).when(mockTransport).start(any(ManagedClientTransport.Listener.class)); return mockTransport;