Move the pick-first logic from SimpleLoadBalancer to TransportSet.

TransportSet (as well as TransportManager) accepts a group of equivalent
addresses (EquivalentAddressGroup) instead of a single address.
TransportSet will move down the address list when reconnecting, and
applies back-off only after the entire list has been tried.

Main benefits:
- It will stop channel from trying to reconnect addresses that have been
  failed to connect to and moved away from. (#1212)
- It will make future implementation of Happy Eyeballs possible, inside
  TransportSet.

Tested: covered by TransportSetTest and
ManagedChannelImplTransportManagerTest.
This commit is contained in:
Kun Zhang 2015-11-24 13:26:37 -08:00
parent 88491c064f
commit ad70a28d21
11 changed files with 821 additions and 118 deletions

View File

@ -0,0 +1,83 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* A group of {@link SocketAddress}es that are considered equivalent when channel makes connections.
*
* <p>Usually the addresses are addresses resolved from the same host name, and connecting to any of
* them is equally sufficient. They do have order. An address appears earlier on the list is likely
* to be tried earlier.
*/
@ExperimentalApi
public final class EquivalentAddressGroup {
private final List<SocketAddress> addrs;
public EquivalentAddressGroup(List<SocketAddress> addrs) {
this.addrs = Collections.unmodifiableList(new ArrayList<SocketAddress>(addrs));
}
public EquivalentAddressGroup(SocketAddress addr) {
this.addrs = Collections.singletonList(addr);
}
/**
* Returns an immutable list of the addresses.
*/
public List<SocketAddress> getAddresses() {
return addrs;
}
@Override
public String toString() {
return addrs.toString();
}
@Override
public int hashCode() {
return addrs.hashCode();
}
@Override
public boolean equals(Object other) {
if (!(other instanceof EquivalentAddressGroup)) {
return false;
}
return addrs.equals(((EquivalentAddressGroup) other).addrs);
}
}

View File

@ -35,7 +35,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.internal.ClientTransport;
import java.net.SocketAddress;
import java.util.List;
import javax.annotation.Nullable;
@ -83,12 +82,13 @@ public abstract class LoadBalancer {
/**
* Called when a transport is fully connected and ready to accept traffic.
*/
public void transportReady(SocketAddress addr, ClientTransport transport) { }
public void transportReady(EquivalentAddressGroup addressGroup, ClientTransport transport) { }
/**
* Called when a transport is shutting down.
*/
public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) { }
public void transportShutdown(
EquivalentAddressGroup addressGroup, ClientTransport transport, Status s) { }
public abstract static class Factory {
/**

View File

@ -68,14 +68,14 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
}
private static class SimpleLoadBalancer extends LoadBalancer {
@GuardedBy("servers")
private final List<ResolvedServerInfo> servers = new ArrayList<ResolvedServerInfo>();
@GuardedBy("servers")
private int currentServerIndex;
@GuardedBy("servers")
private final Object lock = new Object();
@GuardedBy("lock")
private EquivalentAddressGroup addresses;
@GuardedBy("lock")
private final BlankFutureProvider<ClientTransport> pendingPicks =
new BlankFutureProvider<ClientTransport>();
@GuardedBy("servers")
@GuardedBy("lock")
private StatusException nameResolutionError;
private final TransportManager tm;
@ -86,42 +86,41 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
@Override
public ListenableFuture<ClientTransport> pickTransport(@Nullable RequestKey requestKey) {
ResolvedServerInfo currentServer;
synchronized (servers) {
if (servers.isEmpty()) {
EquivalentAddressGroup addressesCopy;
synchronized (lock) {
addressesCopy = addresses;
if (addressesCopy == null) {
if (nameResolutionError != null) {
return Futures.immediateFailedFuture(nameResolutionError);
}
return pendingPicks.newBlankFuture();
}
currentServer = servers.get(currentServerIndex);
}
return tm.getTransport(currentServer.getAddress());
return tm.getTransport(addressesCopy);
}
@Override
public void handleResolvedAddresses(
List<ResolvedServerInfo> updatedServers, Attributes config) {
BlankFutureProvider.FulfillmentBatch<ClientTransport> pendingPicksFulfillmentBatch;
final ResolvedServerInfo currentServer;
synchronized (servers) {
nameResolutionError = null;
servers.clear();
for (ResolvedServerInfo addr : updatedServers) {
servers.add(addr);
final EquivalentAddressGroup newAddresses;
synchronized (lock) {
ArrayList<SocketAddress> newAddressList =
new ArrayList<SocketAddress>(updatedServers.size());
for (ResolvedServerInfo server : updatedServers) {
newAddressList.add(server.getAddress());
}
if (servers.isEmpty()) {
newAddresses = new EquivalentAddressGroup(newAddressList);
if (newAddresses.equals(addresses)) {
return;
}
addresses = newAddresses;
nameResolutionError = null;
pendingPicksFulfillmentBatch = pendingPicks.createFulfillmentBatch();
if (currentServerIndex >= servers.size()) {
currentServerIndex = 0;
}
currentServer = servers.get(currentServerIndex);
}
pendingPicksFulfillmentBatch.link(new Supplier<ListenableFuture<ClientTransport>>() {
@Override public ListenableFuture<ClientTransport> get() {
return tm.getTransport(currentServer.getAddress());
return tm.getTransport(newAddresses);
}
});
}
@ -131,27 +130,11 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
BlankFutureProvider.FulfillmentBatch<ClientTransport> pendingPicksFulfillmentBatch;
StatusException statusException =
error.augmentDescription("Name resolution failed").asException();
synchronized (servers) {
synchronized (lock) {
pendingPicksFulfillmentBatch = pendingPicks.createFulfillmentBatch();
nameResolutionError = statusException;
}
pendingPicksFulfillmentBatch.fail(statusException);
}
@Override
public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) {
if (!s.isOk()) {
// If the current transport is shut down due to error, move on to the next address in the
// list
synchronized (servers) {
if (addr.equals(servers.get(currentServerIndex).getAddress())) {
currentServerIndex++;
if (currentServerIndex >= servers.size()) {
currentServerIndex = 0;
}
}
}
}
}
}
}

View File

@ -49,12 +49,13 @@ public abstract class TransportManager {
public abstract void updateRetainedTransports(SocketAddress[] addrs);
/**
* Returns the future of a transport for the given server.
* Returns the future of a transport for any of the addresses from the given address group.
*
* <p>If the channel has been shut down, the value of the future will be {@code null}.
*/
// TODO(zhangkun83): GrpcLoadBalancer will use this to get transport to connect to LB servers,
// which would have a different authority than the primary servers. We need to figure out how to
// do it.
public abstract ListenableFuture<ClientTransport> getTransport(SocketAddress addr);
public abstract ListenableFuture<ClientTransport> getTransport(
EquivalentAddressGroup addressGroup);
}

View File

@ -47,6 +47,7 @@ import io.grpc.ClientInterceptors;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
@ -114,11 +115,11 @@ public final class ManagedChannelImpl extends ManagedChannel {
private final LoadBalancer loadBalancer;
/**
* Maps addresses to transports for that server.
* Maps EquivalentAddressGroups to transports for that server.
*/
@GuardedBy("lock")
private final Map<SocketAddress, TransportSet> transports =
new HashMap<SocketAddress, TransportSet>();
private final Map<EquivalentAddressGroup, TransportSet> transports =
new HashMap<EquivalentAddressGroup, TransportSet>();
@GuardedBy("lock")
private boolean shutdown;
@ -359,20 +360,21 @@ public final class ManagedChannelImpl extends ManagedChannel {
}
@Override
public ListenableFuture<ClientTransport> getTransport(final SocketAddress addr) {
public ListenableFuture<ClientTransport> getTransport(
final EquivalentAddressGroup addressGroup) {
TransportSet ts;
synchronized (lock) {
if (shutdown) {
return NULL_VALUE_TRANSPORT_FUTURE;
}
ts = transports.get(addr);
ts = transports.get(addressGroup);
if (ts == null) {
ts = new TransportSet(addr, authority(), loadBalancer, backoffPolicyProvider,
ts = new TransportSet(addressGroup, authority(), loadBalancer, backoffPolicyProvider,
transportFactory, scheduledExecutor, new TransportSet.Callback() {
@Override
public void onTerminated() {
synchronized (lock) {
transports.remove(addr);
transports.remove(addressGroup);
if (shutdown && transports.isEmpty()) {
if (terminated) {
log.warning("transportTerminated called after already terminated");
@ -384,7 +386,7 @@ public final class ManagedChannelImpl extends ManagedChannel {
}
}
});
transports.put(addr, ts);
transports.put(addressGroup, ts);
}
}
return ts.obtainActiveTransport();

View File

@ -36,12 +36,14 @@ import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -53,7 +55,7 @@ import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
/**
* Transports for a single server.
* Transports for a single {@link SocketAddress}.
*/
@ThreadSafe
final class TransportSet {
@ -66,7 +68,7 @@ final class TransportSet {
}
private final Object lock = new Object();
private final SocketAddress server;
private final EquivalentAddressGroup addressGroup;
private final String authority;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Callback callback;
@ -74,9 +76,18 @@ final class TransportSet {
private final ScheduledExecutorService scheduledExecutor;
@GuardedBy("lock")
@Nullable
private int nextAddressIndex;
@GuardedBy("lock")
private BackoffPolicy reconnectPolicy;
// The address index from which the current series of consecutive failing connection attempts
// started. -1 means the current series have not started.
// In the case of consecutive failures, the time between two attempts for this address is
// controlled by connectPolicy.
@GuardedBy("lock")
private int headIndex = -1;
@GuardedBy("lock")
@Nullable
private ScheduledFuture<?> reconnectTask;
@ -100,10 +111,10 @@ final class TransportSet {
*/
private volatile SettableFuture<ClientTransport> activeTransportFuture;
TransportSet(SocketAddress server, String authority, LoadBalancer loadBalancer,
TransportSet(EquivalentAddressGroup addressGroup, String authority, LoadBalancer loadBalancer,
BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory,
ScheduledExecutorService scheduledExecutor, Callback callback) {
this.server = server;
this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup");
this.authority = authority;
this.loadBalancer = loadBalancer;
this.backoffPolicyProvider = backoffPolicyProvider;
@ -150,6 +161,15 @@ final class TransportSet {
Preconditions.checkState(!shutdown, "Already shut down");
Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(),
"previous reconnectTask is not done");
final int currentAddressIndex = nextAddressIndex;
List<SocketAddress> addrs = addressGroup.getAddresses();
final SocketAddress address = addrs.get(currentAddressIndex);
nextAddressIndex++;
if (nextAddressIndex >= addrs.size()) {
nextAddressIndex = 0;
}
Runnable createTransportRunnable = new Runnable() {
@Override
public void run() {
@ -157,28 +177,33 @@ final class TransportSet {
if (shutdown) {
return;
}
ClientTransport newActiveTransport = transportFactory.newClientTransport(
server, authority);
ClientTransport newActiveTransport =
transportFactory.newClientTransport(address, authority);
log.log(Level.INFO, "Created transport {0} for {1}",
new Object[] {newActiveTransport, server});
new Object[] {newActiveTransport, address});
transports.add(newActiveTransport);
newActiveTransport.start(
new TransportListener(newActiveTransport, activeTransportFuture));
new TransportListener(newActiveTransport, activeTransportFuture, address));
Preconditions.checkState(activeTransportFuture.set(newActiveTransport),
"failed to set the new transport to the future");
}
}
};
if (reconnectPolicy == null) {
// First connect attempt
reconnectPolicy = backoffPolicyProvider.get();
createTransportRunnable.run();
reconnectTask = null;
} else {
// Reconnect attempts
if (currentAddressIndex == headIndex) {
// Back to the first attempted address. Trigger back-off.
long delayMillis = reconnectPolicy.nextBackoffMillis();
reconnectTask = scheduledExecutor.schedule(
createTransportRunnable, delayMillis, TimeUnit.MILLISECONDS);
} else {
if (headIndex == -1) {
// First connect attempt, or the first attempt since last successful connection.
headIndex = currentAddressIndex;
reconnectPolicy = backoffPolicyProvider.get();
}
reconnectTask = null;
// No back-off this time.
createTransportRunnable.run();
}
}
@ -221,13 +246,15 @@ final class TransportSet {
}
private class TransportListener implements ClientTransport.Listener {
private final SocketAddress address;
private final ClientTransport transport;
private final SettableFuture<ClientTransport> transportFuture;
public TransportListener(ClientTransport transport,
SettableFuture<ClientTransport> transportFuture) {
SettableFuture<ClientTransport> transportFuture, SocketAddress address) {
this.transport = transport;
this.transportFuture = transportFuture;
this.address = address;
}
@GuardedBy("lock")
@ -238,26 +265,26 @@ final class TransportSet {
@Override
public void transportReady() {
synchronized (lock) {
log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, server});
log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, address});
Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
if (isAttachedToActiveTransport()) {
reconnectPolicy = null;
headIndex = -1;
}
}
loadBalancer.transportReady(server, transport);
loadBalancer.transportReady(addressGroup, transport);
}
@Override
public void transportShutdown(Status s) {
synchronized (lock) {
log.log(Level.INFO, "Transport {0} for {1} is being shutdown",
new Object[] {transport, server});
new Object[] {transport, address});
Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
if (isAttachedToActiveTransport()) {
createActiveTransportFuture();
}
}
loadBalancer.transportShutdown(server, transport, s);
loadBalancer.transportShutdown(addressGroup, transport, s);
}
@Override
@ -265,7 +292,7 @@ final class TransportSet {
boolean runCallback = false;
synchronized (lock) {
log.log(Level.INFO, "Transport {0} for {1} is terminated",
new Object[] {transport, server});
new Object[] {transport, address});
Preconditions.checkState(!isAttachedToActiveTransport(),
"Listener is still attached to activeTransportFuture. "
+ "Seems transportTerminated was not called.");

View File

@ -36,7 +36,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -44,7 +44,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@ -66,6 +65,7 @@ public class SimpleLoadBalancerTest {
private LoadBalancer loadBalancer;
private ArrayList<ResolvedServerInfo> servers;
private EquivalentAddressGroup addressGroup;
@Mock
private TransportManager mockTransportManager;
@ -76,16 +76,20 @@ public class SimpleLoadBalancerTest {
loadBalancer = SimpleLoadBalancerFactory.getInstance().newLoadBalancer(
"fakeservice", mockTransportManager);
servers = new ArrayList<ResolvedServerInfo>();
ArrayList<SocketAddress> addresses = new ArrayList<SocketAddress>();
for (int i = 0; i < 3; i++) {
servers.add(new ResolvedServerInfo(new FakeSocketAddress("server" + i), Attributes.EMPTY));
SocketAddress addr = new FakeSocketAddress("server" + i);
servers.add(new ResolvedServerInfo(addr, Attributes.EMPTY));
addresses.add(addr);
}
addressGroup = new EquivalentAddressGroup(addresses);
}
@Test
public void pickBeforeResolved() throws Exception {
ClientTransport mockTransport = mock(ClientTransport.class);
SettableFuture<ClientTransport> sourceFuture = SettableFuture.create();
when(mockTransportManager.getTransport(same(servers.get(0).getAddress())))
when(mockTransportManager.getTransport(eq(addressGroup)))
.thenReturn(sourceFuture);
ListenableFuture<ClientTransport> f1 = loadBalancer.pickTransport(null);
ListenableFuture<ClientTransport> f2 = loadBalancer.pickTransport(null);
@ -94,9 +98,9 @@ public class SimpleLoadBalancerTest {
assertNotSame(f1, f2);
assertFalse(f1.isDone());
assertFalse(f2.isDone());
verify(mockTransportManager, never()).getTransport(any(SocketAddress.class));
verify(mockTransportManager, never()).getTransport(any(EquivalentAddressGroup.class));
loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
verify(mockTransportManager, times(2)).getTransport(same(servers.get(0).getAddress()));
verify(mockTransportManager, times(2)).getTransport(eq(addressGroup));
assertFalse(f1.isDone());
assertFalse(f2.isDone());
assertNotSame(sourceFuture, f1);
@ -104,28 +108,22 @@ public class SimpleLoadBalancerTest {
sourceFuture.set(mockTransport);
assertSame(mockTransport, f1.get());
assertSame(mockTransport, f2.get());
ListenableFuture<ClientTransport> f3 = loadBalancer.pickTransport(null);
assertSame(sourceFuture, f3);
verify(mockTransportManager, times(3)).getTransport(same(servers.get(0).getAddress()));
verifyNoMoreInteractions(mockTransportManager);
}
@Test
public void transportFailed() throws Exception {
ClientTransport mockTransport1 = mock(ClientTransport.class);
ClientTransport mockTransport2 = mock(ClientTransport.class);
when(mockTransportManager.getTransport(same(servers.get(0).getAddress()))).thenReturn(
Futures.immediateFuture(mockTransport1));
when(mockTransportManager.getTransport(same(servers.get(1).getAddress()))).thenReturn(
Futures.immediateFuture(mockTransport2));
public void pickAfterResolved() throws Exception {
ClientTransport mockTransport = mock(ClientTransport.class);
SettableFuture<ClientTransport> sourceFuture = SettableFuture.create();
when(mockTransportManager.getTransport(eq(addressGroup)))
.thenReturn(sourceFuture);
loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
ListenableFuture<ClientTransport> f1 = loadBalancer.pickTransport(null);
ListenableFuture<ClientTransport> f2 = loadBalancer.pickTransport(null);
assertSame(mockTransport1, f1.get());
assertSame(mockTransport1, f2.get());
loadBalancer.transportShutdown(servers.get(0).getAddress(), mockTransport1, Status.INTERNAL);
ListenableFuture<ClientTransport> f3 = loadBalancer.pickTransport(null);
assertSame(mockTransport2, f3.get());
ListenableFuture<ClientTransport> f = loadBalancer.pickTransport(null);
assertSame(sourceFuture, f);
assertFalse(f.isDone());
sourceFuture.set(mockTransport);
assertSame(mockTransport, f.get());
verify(mockTransportManager).getTransport(addressGroup);
}
private static class FakeSocketAddress extends SocketAddress {

View File

@ -83,11 +83,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/** Unit tests for {@link ManagedChannelImpl}. */
@ -353,22 +351,6 @@ public class ManagedChannelImplTest {
.thenReturn(goodTransport);
when(mockTransportFactory.newClientTransport(same(badAddress), any(String.class)))
.thenReturn(badTransport);
final CountDownLatch badTransportFailed = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
final ClientTransport.Listener listener = (ClientTransport.Listener) args[0];
executor.execute(new Runnable() {
@Override
public void run() {
listener.transportShutdown(Status.UNAVAILABLE);
}
});
badTransportFailed.countDown();
return null;
}
}).when(badTransport).start(any(ClientTransport.Listener.class));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory(Arrays.asList(badServer, goodServer));
@ -389,8 +371,11 @@ public class ManagedChannelImplTest {
// First try should fail with the bad address.
call.start(mockCallListener, headers);
assertTrue(badTransportFailed.await(1000, TimeUnit.MILLISECONDS));
ArgumentCaptor<ClientTransport.Listener> badTransportListenerCaptor =
ArgumentCaptor.forClass(ClientTransport.Listener.class);
verify(mockCallListener, timeout(1000)).onClose(same(Status.UNAVAILABLE), any(Metadata.class));
verify(badTransport, timeout(1000)).start(badTransportListenerCaptor.capture());
badTransportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
// Retry should work with the good address.
ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);

View File

@ -0,0 +1,254 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.TransportManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.net.SocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Unit tests for {@link ManagedChannelImpl}'s {@link TransportManager} implementation as well as
* {@link TransportSet}.
*/
@RunWith(JUnit4.class)
public class ManagedChannelImplTransportManagerTest {
private static final String authority = "fakeauthority";
private static final NameResolver.Factory nameResolverFactory = new NameResolver.Factory() {
@Override
public NameResolver newNameResolver(final URI targetUri, Attributes params) {
return new NameResolver() {
@Override public void start(final Listener listener) {
}
@Override public String getServiceAuthority() {
return authority;
}
@Override public void shutdown() {
}
};
}
};
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private ManagedChannelImpl channel;
@Mock private ClientTransportFactory mockTransportFactory;
@Mock private LoadBalancer.Factory mockLoadBalancerFactory;
@Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
@Mock private BackoffPolicy mockBackoffPolicy;
private TransportManager tm;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
when(mockLoadBalancerFactory.newLoadBalancer(anyString(), any(TransportManager.class)))
.thenReturn(mock(LoadBalancer.class));
channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider,
nameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
mockTransportFactory, executor, null, Collections.<ClientInterceptor>emptyList());
ArgumentCaptor<TransportManager> tmCaptor = ArgumentCaptor.forClass(TransportManager.class);
verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());
tm = tmCaptor.getValue();
}
@After
public void tearDown() {
channel.shutdown();
executor.shutdown();
}
@Test
public void createAndReuseTransport() throws Exception {
doAnswer(new Answer<ClientTransport>() {
@Override
public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
return mock(ClientTransport.class);
}
}).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class));
SocketAddress addr = mock(SocketAddress.class);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(addr);
ListenableFuture<ClientTransport> future1 = tm.getTransport(addressGroup);
verify(mockTransportFactory).newClientTransport(addr, authority);
ListenableFuture<ClientTransport> future2 = tm.getTransport(addressGroup);
assertNotNull(future1.get());
assertSame(future1.get(), future2.get());
verify(mockBackoffPolicyProvider).get();
verify(mockBackoffPolicy, times(0)).nextBackoffMillis();
verifyNoMoreInteractions(mockTransportFactory);
}
@Test
public void reconnect() throws Exception {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2));
LinkedList<ClientTransport.Listener> listeners =
TestUtils.captureListeners(mockTransportFactory);
// Invocation counters
int backoffReset = 0;
// Pick the first transport
ListenableFuture<ClientTransport> future1 = tm.getTransport(addressGroup);
assertNotNull(future1.get());
verify(mockTransportFactory).newClientTransport(addr1, authority);
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
// Fail the first transport, without setting it to ready
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Subsequent getTransport() will use the next address
ListenableFuture<ClientTransport> future2a = tm.getTransport(addressGroup);
assertNotNull(future2a.get());
// Will keep the previous back-off policy, and not consult back-off policy
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory).newClientTransport(addr2, authority);
ListenableFuture<ClientTransport> future2b = tm.getTransport(addressGroup);
assertSame(future2a.get(), future2b.get());
assertNotSame(future1.get(), future2a.get());
// Make the second transport ready
listeners.peek().transportReady();
// Disconnect the second transport
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Subsequent getTransport() will use the next address, which is the first one since we have run
// out of addresses.
ListenableFuture<ClientTransport> future3 = tm.getTransport(addressGroup);
assertNotSame(future1.get(), future3.get());
assertNotSame(future2a.get(), future3.get());
// This time back-off policy was reset, because previous transport was succesfully connected.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
// Back-off policy was never consulted.
verify(mockBackoffPolicy, times(0)).nextBackoffMillis();
verify(mockTransportFactory, times(2)).newClientTransport(addr1, authority);
verifyNoMoreInteractions(mockTransportFactory);
assertEquals(1, listeners.size());
}
@Test
public void reconnectWithBackoff() throws Exception {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2));
LinkedList<ClientTransport.Listener> listeners =
TestUtils.captureListeners(mockTransportFactory);
// Invocation counters
int transportsAddr1 = 0;
int transportsAddr2 = 0;
int backoffConsulted = 0;
int backoffReset = 0;
// First pick succeeds
ListenableFuture<ClientTransport> future1 = tm.getTransport(addressGroup);
assertNotNull(future1.get());
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Back-off policy was set initially.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
listeners.peek().transportReady();
// Then close it
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Second pick fails. This is the beginning of a series of failures.
ListenableFuture<ClientTransport> future2 = tm.getTransport(addressGroup);
assertNotNull(future2.get());
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Back-off policy was reset.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Third pick fails too
ListenableFuture<ClientTransport> future3 = tm.getTransport(addressGroup);
assertNotNull(future3.get());
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Back-off policy was not reset.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Forth pick is on addr2, back-off policy kicks in.
ListenableFuture<ClientTransport> future4 = tm.getTransport(addressGroup);
assertNotNull(future4.get());
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Back-off policy was not reset, but was consulted.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockBackoffPolicy, times(++backoffConsulted)).nextBackoffMillis();
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.net.SocketAddress;
import java.util.LinkedList;
/**
* Common utility methods for tests.
*/
final class TestUtils {
/**
* Stub the given mock {@link ClientTransportFactory} by returning mock {@link ClientTransport}s
* which saves their listeners to a list which is returned by this method.
*/
static LinkedList<ClientTransport.Listener> captureListeners(
ClientTransportFactory mockTransportFactory) {
final LinkedList<ClientTransport.Listener> listeners =
new LinkedList<ClientTransport.Listener>();
doAnswer(new Answer<ClientTransport>() {
@Override
public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
ClientTransport mockTransport = mock(ClientTransport.class);
// Save the listener
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
listeners.add((ClientTransport.Listener) invocation.getArguments()[0]);
return null;
}
}).when(mockTransport).start(any(ClientTransport.Listener.class));
return mockTransport;
}
}).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class));
return listeners;
}
}

View File

@ -0,0 +1,294 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Unit tests for {@link TransportSet}.
*
* <p>It only tests the logic that is not covered by {@link ManagedChannelImplTransportManagerTest}.
*/
@RunWith(JUnit4.class)
public class TransportSetTest {
private static final String authority = "fakeauthority";
private long currentTimeMillis;
@Mock private LoadBalancer mockLoadBalancer;
@Mock private BackoffPolicy mockBackoffPolicy1;
@Mock private BackoffPolicy mockBackoffPolicy2;
@Mock private BackoffPolicy mockBackoffPolicy3;
@Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
@Mock private ClientTransportFactory mockTransportFactory;
@Mock private TransportSet.Callback mockTransportSetCallback;
@Mock private ScheduledExecutorService mockScheduledExecutorService;
private final PriorityQueue<Task> tasks = new PriorityQueue<Task>();
private TransportSet transportSet;
private EquivalentAddressGroup addressGroup;
private LinkedList<ClientTransport.Listener> listeners;
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
when(mockBackoffPolicyProvider.get())
.thenReturn(mockBackoffPolicy1, mockBackoffPolicy2, mockBackoffPolicy3);
when(mockBackoffPolicy1.nextBackoffMillis()).thenReturn(10L, 100L);
when(mockBackoffPolicy2.nextBackoffMillis()).thenReturn(10L, 100L);
when(mockBackoffPolicy3.nextBackoffMillis()).thenReturn(10L, 100L);
doAnswer(new Answer<ScheduledFuture<?>>() {
@Override public ScheduledFuture<?> answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
Runnable task = (Runnable) args[0];
long delay = (Long) args[1];
TimeUnit unit = (TimeUnit) args[2];
tasks.add(new Task(currentTimeMillis + unit.toMillis(delay), task));
return null;
}
}).when(mockScheduledExecutorService)
.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
listeners = TestUtils.captureListeners(mockTransportFactory);
}
@Test public void singleAddressBackoff() {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
// Invocation counters
int transportsCreated = 0;
int backoff1Consulted = 0;
int backoff2Consulted = 0;
int backoffReset = 0;
// First attempt happens immediately (TransportSet aggressively maintains a transport)
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Second attempt uses the first back-off value interval.
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
// Transport creation doesn't happen until time is due
forwardTime(9);
verify(mockTransportFactory, times(transportsCreated)).newClientTransport(addr, authority);
forwardTime(1);
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Third attempt uses the second back-off interval.
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
// Transport creation doesn't happen until time is due
forwardTime(99);
verify(mockTransportFactory, times(transportsCreated)).newClientTransport(addr, authority);
forwardTime(1);
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Let this one succeed
listeners.peek().transportReady();
// And close it
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Back-off is reset, and the next attempt will happen immediately
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Final checks for consultations on back-off policies
verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis();
}
@Test public void twoAddressesBackoff() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
createTransortSet(addr1, addr2);
// Invocation counters
int transportsAddr1 = 0;
int transportsAddr2 = 0;
int backoff1Consulted = 0;
int backoff2Consulted = 0;
int backoff3Consulted = 0;
int backoffReset = 0;
// Connection happens immediately (TransportSet aggressively maintains a transport)
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Let this one through
listeners.peek().transportReady();
// Then shut it down
listeners.poll().transportShutdown(Status.UNAVAILABLE);
////// Now start a series of failing attempts, where addr2 is the head.
// First attempt after a connection closed. Reset back-off policy.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Second attempt will happen immediately. Keep back-off policy.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Third attempt is on head, thus controlled by the first back-off interval.
verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
forwardTime(9);
verify(mockTransportFactory, times(transportsAddr2)).newClientTransport(addr2, authority);
forwardTime(1);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Forth attempt will happen immediately. Keep back-off policy.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Fifth attempt is on head, thus controlled by the second back-off interval.
verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
forwardTime(99);
verify(mockTransportFactory, times(transportsAddr2)).newClientTransport(addr2, authority);
forwardTime(1);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Let it through
listeners.peek().transportReady();
// Then close it.
listeners.poll().transportShutdown(Status.UNAVAILABLE);
////// Now start a series of failing attempts, where addr1 is the head.
// First attempt after a connection closed. Reset back-off policy.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Second attempt will happen immediately. Keep back-off policy.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
// Third attempt is on head, thus controlled by the first back-off interval.
verify(mockBackoffPolicy3, times(++backoff3Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
forwardTime(9);
verify(mockTransportFactory, times(transportsAddr1)).newClientTransport(addr1, authority);
forwardTime(1);
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Final checks on invocations on back-off policies
verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis();
verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffMillis();
}
private static class Task implements Comparable<Task> {
final long dueTimeMillis;
final Runnable command;
Task(long dueTimeMillis, Runnable command) {
this.dueTimeMillis = dueTimeMillis;
this.command = command;
}
@Override public int compareTo(Task other) {
if (dueTimeMillis < other.dueTimeMillis) {
return -1;
} else if (dueTimeMillis > other.dueTimeMillis) {
return 1;
} else {
return 0;
}
}
}
private void runDueTasks() {
while (true) {
Task task = tasks.peek();
if (task == null || task.dueTimeMillis > currentTimeMillis) {
break;
}
tasks.poll();
task.command.run();
}
}
private void forwardTime(long millis) {
currentTimeMillis += millis;
runDueTasks();
}
private void createTransortSet(SocketAddress ... addrs) {
addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs));
transportSet = new TransportSet(addressGroup, authority, mockLoadBalancer,
mockBackoffPolicyProvider, mockTransportFactory, mockScheduledExecutorService,
mockTransportSetCallback);
}
}