grpclb: re-implement GrpclbLoadBalancer in v2 API. (#2557)

Besides API changes, this implementation is also up-to-date with the
latest design:

1. Delegate to round-robin and pick-first policies if requested by
the naming system.

2. OOB channels to LoadBalancer always use the LB authority provided by
the naming system.

3. Never send application RPCs to balancer addresses, even if the
address returns UNIMPLEMENTED error.
This commit is contained in:
Kun Zhang 2016-12-29 15:00:04 -08:00 committed by GitHub
parent 3d210ae875
commit 322eb8c5c5
5 changed files with 1482 additions and 1 deletions

View File

@ -31,6 +31,8 @@
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import java.util.LinkedList;
@ -97,7 +99,7 @@ final class ChannelExecutor {
*/
ChannelExecutor executeLater(Runnable runnable) {
synchronized (lock) {
queue.add(runnable);
queue.add(checkNotNull(runnable, "runnable is null"));
}
return this;
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.grpclb;
import io.grpc.Attributes;
import io.grpc.ExperimentalApi;
/**
* Constants for the GRPCLB load-balancer.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1782")
public final class GrpclbConstants {
/**
* The load-balancing policy designated by the naming system.
*/
public enum LbPolicy {
PICK_FIRST,
ROUND_ROBIN,
GRPCLB
}
/**
* An attribute of a name resolution result, designating the LB policy.
*/
public static final Attributes.Key<LbPolicy> ATTR_LB_POLICY =
Attributes.Key.of("io.grpc.grpclb.lbPolicy");
/**
* The naming authority of an LB server address. It is an address-group-level attribute, present
* when the address group is a LoadBalancer.
*/
public static final Attributes.Key<String> ATTR_LB_ADDR_AUTHORITY =
Attributes.Key.of("io.grpc.grpclb.lbAddrAuthority");
private GrpclbConstants() { }
}

View File

@ -0,0 +1,500 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.grpclb;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer2;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.grpclb.GrpclbConstants.LbPolicy;
import io.grpc.internal.LogId;
import io.grpc.internal.WithLogId;
import io.grpc.stub.StreamObserver;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* A {@link LoadBalancer2} that uses the GRPCLB protocol.
*
* <p>Optionally, when requested by the naming system, will delegate the work to a local pick-first
* or round-robin balancer.
*/
class GrpclbLoadBalancer2 extends LoadBalancer2 implements WithLogId {
private static final Logger logger = Logger.getLogger(GrpclbLoadBalancer2.class.getName());
@VisibleForTesting
static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(Attributes affinity, Metadata headers) {
return PickResult.withNoResult();
}
};
private final LogId logId = LogId.allocate(getClass().getName());
private final String serviceName;
private final Helper helper;
private final Factory pickFirstBalancerFactory;
private final Factory roundRobinBalancerFactory;
private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.of("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
@VisibleForTesting
static final PickResult THROTTLED_RESULT =
PickResult.withError(Status.UNAVAILABLE.withDescription("Throttled by LB"));
// All mutable states in this class are mutated ONLY from Channel Executor
///////////////////////////////////////////////////////////////////////////////
// General states.
///////////////////////////////////////////////////////////////////////////////
// If not null, all work is delegated to it.
@Nullable
private LoadBalancer2 delegate;
private LbPolicy lbPolicy;
///////////////////////////////////////////////////////////////////////////////
// GRPCLB states, valid only if lbPolicy == GRPCLB
///////////////////////////////////////////////////////////////////////////////
// null if there isn't any available LB addresses.
// If non-null, never empty.
@Nullable
private List<LbAddressGroup> lbAddressGroups;
@Nullable
private ManagedChannel lbCommChannel;
// Points to the position of the LB address that lbCommChannel is bound to, if
// lbCommChannel != null.
private int currentLbIndex;
@Nullable
private LbResponseObserver lbResponseObserver;
@Nullable
private StreamObserver<LoadBalanceRequest> lbRequestWriter;
private Map<EquivalentAddressGroup, Subchannel> subchannels = Collections.emptyMap();
// A null element indicate a simulated error for throttling purpose
private List<EquivalentAddressGroup> roundRobinList = Collections.emptyList();
GrpclbLoadBalancer2(Helper helper, Factory pickFirstBalancerFactory,
Factory roundRobinBalancerFactory) {
this.helper = checkNotNull(helper, "helper");
this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority");
this.pickFirstBalancerFactory =
checkNotNull(pickFirstBalancerFactory, "pickFirstBalancerFactory");
this.roundRobinBalancerFactory =
checkNotNull(roundRobinBalancerFactory, "roundRobinBalancerFactory");
}
@Override
public LogId getLogId() {
return logId;
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
if (delegate != null) {
delegate.handleSubchannelState(subchannel, newState);
return;
}
if (newState.getState() == SHUTDOWN || !(subchannels.values().contains(subchannel))) {
return;
}
if (newState.getState() == IDLE) {
subchannel.requestConnection();
}
subchannel.getAttributes().get(STATE_INFO).set(newState);
helper.updatePicker(makePicker());
}
@Override
public void handleResolvedAddresses(List<ResolvedServerInfoGroup> updatedServers,
Attributes attributes) {
LbPolicy newLbPolicy = attributes.get(GrpclbConstants.ATTR_LB_POLICY);
// LB addresses and backend addresses are treated separately
List<LbAddressGroup> newLbAddressGroups = new ArrayList<LbAddressGroup>();
List<ResolvedServerInfoGroup> newBackendServerInfoGroups =
new ArrayList<ResolvedServerInfoGroup>();
for (ResolvedServerInfoGroup serverInfoGroup : updatedServers) {
String lbAddrAuthority = serverInfoGroup.getAttributes().get(
GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
EquivalentAddressGroup eag = serverInfoGroup.toEquivalentAddressGroup();
if (lbAddrAuthority != null) {
newLbAddressGroups.add(new LbAddressGroup(eag, lbAddrAuthority));
} else {
newBackendServerInfoGroups.add(serverInfoGroup);
}
}
if (newBackendServerInfoGroups.isEmpty()) {
// handleResolvedAddresses()'s javadoc has guaranteed updatedServers is never empty.
checkState(!newLbAddressGroups.isEmpty(),
"No backend address nor LB address. updatedServers=%s", updatedServers);
if (newLbPolicy != LbPolicy.GRPCLB) {
newLbPolicy = LbPolicy.GRPCLB;
logger.log(Level.FINE, "[{0}] Switching to GRPCLB because all addresses are balancers",
logId);
}
}
if (newLbPolicy == null) {
logger.log(Level.FINE, "[{0}] New config missing policy. Using PICK_FIRST", logId);
newLbPolicy = LbPolicy.PICK_FIRST;
}
// Switch LB policy if requested
if (newLbPolicy != lbPolicy) {
shutdownDelegate();
shutdownLbComm();
lbAddressGroups = null;
currentLbIndex = 0;
switch (newLbPolicy) {
case PICK_FIRST:
delegate = checkNotNull(pickFirstBalancerFactory.newLoadBalancer(helper),
"pickFirstBalancerFactory.newLoadBalancer()");
break;
case ROUND_ROBIN:
delegate = checkNotNull(roundRobinBalancerFactory.newLoadBalancer(helper),
"roundRobinBalancerFactory.newLoadBalancer()");
break;
default:
// Do nohting
}
}
lbPolicy = newLbPolicy;
// Consume the new addresses
switch (lbPolicy) {
case PICK_FIRST:
case ROUND_ROBIN:
checkNotNull(delegate, "delegate should not be null. newLbPolicy=" + newLbPolicy);
delegate.handleResolvedAddresses(newBackendServerInfoGroups, attributes);
break;
case GRPCLB:
if (newLbAddressGroups.isEmpty()) {
shutdownLbComm();
lbAddressGroups = null;
handleGrpclbError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no LB address while asking for GRPCLB"));
} else {
// See if the currently used LB server is in the new list.
int newIndexOfCurrentLb = -1;
if (lbAddressGroups != null) {
LbAddressGroup currentLb = lbAddressGroups.get(currentLbIndex);
newIndexOfCurrentLb = newLbAddressGroups.indexOf(currentLb);
}
lbAddressGroups = newLbAddressGroups;
if (newIndexOfCurrentLb == -1) {
shutdownLbComm();
currentLbIndex = 0;
startLbComm();
} else {
// Current LB is still in the list, calibrate index.
currentLbIndex = newIndexOfCurrentLb;
}
}
break;
default:
// Do nothing
}
}
private void shutdownLbComm() {
if (lbCommChannel != null) {
lbCommChannel.shutdown();
lbCommChannel = null;
}
if (lbRequestWriter != null) {
lbRequestWriter.onCompleted();
lbRequestWriter = null;
}
if (lbResponseObserver != null) {
lbResponseObserver.dismissed = true;
lbResponseObserver = null;
}
}
private void startLbComm() {
checkState(lbCommChannel == null, "previous lbCommChannel has not been closed yet");
checkState(lbRequestWriter == null, "previous lbRequestWriter has not been cleared yet");
checkState(lbResponseObserver == null, "previous lbResponseObserver has not been cleared yet");
LbAddressGroup currentLb = lbAddressGroups.get(currentLbIndex);
lbCommChannel = helper.createOobChannel(currentLb.getAddresses(), currentLb.getAuthority());
LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
lbResponseObserver = new LbResponseObserver();
lbRequestWriter = stub.balanceLoad(lbResponseObserver);
LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
.setInitialRequest(InitialLoadBalanceRequest.newBuilder()
.setName(helper.getAuthority()).build())
.build();
lbRequestWriter.onNext(initRequest);
}
private void shutdownDelegate() {
if (delegate != null) {
delegate.shutdown();
delegate = null;
}
}
@Override
public void shutdown() {
shutdownDelegate();
shutdownLbComm();
for (Subchannel subchannel : subchannels.values()) {
subchannel.shutdown();
}
subchannels = Collections.emptyMap();
}
private void handleGrpclbError(Status status) {
logger.log(Level.FINE, "[{0}] Had an error: {1}; roundRobinList={2}",
new Object[] {logId, status, roundRobinList});
if (roundRobinList.isEmpty()) {
helper.updatePicker(new ErrorPicker(status));
}
}
@Override
public void handleNameResolutionError(Status error) {
if (delegate != null) {
delegate.handleNameResolutionError(error);
} else {
handleGrpclbError(error);
}
}
private class LbResponseObserver implements StreamObserver<LoadBalanceResponse> {
boolean dismissed;
@Override public void onNext(final LoadBalanceResponse response) {
helper.runSerialized(new Runnable() {
@Override
public void run() {
handleResponse(response);
}
});
}
private void handleResponse(LoadBalanceResponse response) {
if (dismissed) {
return;
}
logger.log(Level.FINE, "[{0}] Got an LB response: {1}", new Object[] {logId, response});
// TODO(zhangkun83): make use of initialResponse
// InitialLoadBalanceResponse initialResponse = response.getInitialResponse();
ServerList serverList = response.getServerList();
HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap =
new HashMap<EquivalentAddressGroup, Subchannel>();
List<EquivalentAddressGroup> newRoundRobinList = new ArrayList<EquivalentAddressGroup>();
// TODO(zhangkun83): honor expiration_interval
// Construct the new collections. Create new Subchannels when necessary.
for (Server server : serverList.getServersList()) {
if (server.getDropRequest()) {
newRoundRobinList.add(null);
} else {
InetSocketAddress address;
try {
address = new InetSocketAddress(
InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort());
} catch (UnknownHostException e) {
handleGrpclbError(Status.UNAVAILABLE.withCause(e));
continue;
}
EquivalentAddressGroup eag = new EquivalentAddressGroup(address);
// TODO(zhangkun83): save the LB token and insert it to the application RPCs' headers.
if (!newSubchannelMap.containsKey(eag)) {
Attributes subchannelAttrs = Attributes.newBuilder()
.set(STATE_INFO,
new AtomicReference<ConnectivityStateInfo>(
ConnectivityStateInfo.forNonError(IDLE)))
.build();
Subchannel subchannel = helper.createSubchannel(eag, subchannelAttrs);
subchannel.requestConnection();
newSubchannelMap.put(eag, subchannel);
}
newRoundRobinList.add(eag);
}
}
// Close Subchannels whose addresses have been delisted
for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) {
EquivalentAddressGroup eag = entry.getKey();
if (!newSubchannelMap.containsKey(eag)) {
entry.getValue().shutdown();
}
}
subchannels = newSubchannelMap;
roundRobinList = newRoundRobinList;
helper.updatePicker(makePicker());
}
@Override public void onError(final Throwable error) {
helper.runSerialized(new Runnable() {
@Override
public void run() {
handleStreamClosed(Status.fromThrowable(error)
.augmentDescription("Stream to GRPCLB LoadBalancer had an error"));
}
});
}
@Override public void onCompleted() {
helper.runSerialized(new Runnable() {
@Override
public void run() {
handleStreamClosed(Status.UNAVAILABLE.augmentDescription(
"Stream to GRPCLB LoadBalancer was closed"));
}
});
}
private void handleStreamClosed(Status status) {
if (dismissed) {
return;
}
lbRequestWriter = null;
handleGrpclbError(status);
shutdownLbComm();
currentLbIndex = (currentLbIndex + 1) % lbAddressGroups.size();
startLbComm();
}
}
/**
* Make a picker out of the current roundRobinList and the states of subchannels.
*/
private SubchannelPicker makePicker() {
List<PickResult> resultList = new ArrayList<PickResult>();
Status error = null;
for (EquivalentAddressGroup eag : roundRobinList) {
if (eag == null) {
resultList.add(THROTTLED_RESULT);
} else {
Subchannel subchannel = subchannels.get(eag);
checkNotNull(subchannel, "Subchannel for %s not found", eag);
Attributes attrs = subchannel.getAttributes();
ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
resultList.add(PickResult.withSubchannel(subchannel));
} else if (stateInfo.getState() == TRANSIENT_FAILURE) {
error = stateInfo.getStatus();
}
}
}
if (resultList.isEmpty()) {
if (error != null) {
logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}",
new Object[] {logId, error});
return new ErrorPicker(error);
} else {
logger.log(Level.FINE, "[{0}] No ready Subchannel and no error", logId);
return BUFFER_PICKER;
}
} else {
logger.log(Level.FINE, "[{0}] Using list {1}", new Object[] {logId, resultList});
return new RoundRobinPicker(resultList);
}
}
@VisibleForTesting
LoadBalancer2 getDelegate() {
return delegate;
}
@VisibleForTesting
LbPolicy getLbPolicy() {
return lbPolicy;
}
@VisibleForTesting
static final class ErrorPicker extends SubchannelPicker {
final PickResult result;
ErrorPicker(Status status) {
result = PickResult.withError(status);
}
@Override
public PickResult pickSubchannel(Attributes affinity, Metadata headers) {
return result;
}
}
@VisibleForTesting
static final class RoundRobinPicker extends SubchannelPicker {
final List<PickResult> list;
int index;
RoundRobinPicker(List<PickResult> resultList) {
checkArgument(!resultList.isEmpty(), "resultList is empty");
list = resultList;
}
@Override
public PickResult pickSubchannel(Attributes affinity, Metadata headers) {
synchronized (list) {
PickResult result = list.get(index);
index++;
if (index == list.size()) {
index = 0;
}
return result;
}
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.grpclb;
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.EquivalentAddressGroup;
/**
* Represents a balancer address entry.
*/
class LbAddressGroup {
private final EquivalentAddressGroup addresses;
private final String authority;
LbAddressGroup(EquivalentAddressGroup addresses, String authority) {
this.addresses = checkNotNull(addresses, "addresses");
this.authority = checkNotNull(authority, "authority");
}
@Override
public boolean equals(Object other) {
if (!(other instanceof LbAddressGroup)) {
return false;
}
LbAddressGroup otherGroup = (LbAddressGroup) other;
return addresses.equals(otherGroup.addresses) && authority.equals(otherGroup.authority);
}
@Override
public int hashCode() {
return addresses.hashCode();
}
EquivalentAddressGroup getAddresses() {
return addresses;
}
String getAuthority() {
return authority;
}
}

View File

@ -0,0 +1,843 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.grpclb;
import static com.google.common.base.Charsets.UTF_8;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer2.Helper;
import io.grpc.LoadBalancer2.PickResult;
import io.grpc.LoadBalancer2.Subchannel;
import io.grpc.LoadBalancer2.SubchannelPicker;
import io.grpc.LoadBalancer2;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor;
import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.grpclb.GrpclbConstants.LbPolicy;
import io.grpc.grpclb.GrpclbLoadBalancer2.ErrorPicker;
import io.grpc.grpclb.GrpclbLoadBalancer2.RoundRobinPicker;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.SerializingExecutor;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
/** Unit tests for {@link GrpclbLoadBalancer2}. */
@RunWith(JUnit4.class)
public class GrpclbLoadBalancer2Test {
private static final Attributes.Key<String> RESOLUTION_ATTR =
Attributes.Key.of("resolution-attr");
private static final String SERVICE_AUTHORITY = "api.google.com";
private static final MethodDescriptor<String, String> TRASH_METHOD = MethodDescriptor.create(
MethodDescriptor.MethodType.UNARY, "/service/trashmethod",
new StringMarshaller(), new StringMarshaller());
private static class StringMarshaller implements Marshaller<String> {
static final StringMarshaller INSTANCE = new StringMarshaller();
@Override
public InputStream stream(String value) {
return new ByteArrayInputStream(value.getBytes(UTF_8));
}
@Override
public String parse(InputStream stream) {
try {
return new String(ByteStreams.toByteArray(stream), UTF_8);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
@Mock
private Helper helper;
@Mock
private Subchannel mockSubchannel;
private LoadBalancerGrpc.LoadBalancerImplBase mockLbService;
@Captor
private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor;
private final LinkedList<StreamObserver<LoadBalanceRequest>> lbRequestObservers =
new LinkedList<StreamObserver<LoadBalanceRequest>>();
private final LinkedList<Subchannel> mockSubchannels = new LinkedList<Subchannel>();
private final LinkedList<ManagedChannel> fakeOobChannels = new LinkedList<ManagedChannel>();
private final ArrayList<Subchannel> subchannelTracker = new ArrayList<Subchannel>();
private final ArrayList<ManagedChannel> oobChannelTracker = new ArrayList<ManagedChannel>();
private final ArrayList<String> failingLbAuthorities = new ArrayList<String>();
private io.grpc.Server fakeLbServer;
@Captor
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
private final SerializingExecutor channelExecutor =
new SerializingExecutor(MoreExecutors.directExecutor());
private final Metadata headers = new Metadata();
@Mock
private LoadBalancer2.Factory pickFirstBalancerFactory;
@Mock
private LoadBalancer2 pickFirstBalancer;
@Mock
private LoadBalancer2.Factory roundRobinBalancerFactory;
@Mock
private LoadBalancer2 roundRobinBalancer;
private GrpclbLoadBalancer2 balancer;
@SuppressWarnings("unchecked")
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(pickFirstBalancerFactory.newLoadBalancer(any(Helper.class)))
.thenReturn(pickFirstBalancer);
when(roundRobinBalancerFactory.newLoadBalancer(any(Helper.class)))
.thenReturn(roundRobinBalancer);
mockLbService = spy(new LoadBalancerGrpc.LoadBalancerImplBase() {
@Override
public StreamObserver<LoadBalanceRequest> balanceLoad(
final StreamObserver<LoadBalanceResponse> responseObserver) {
StreamObserver<LoadBalanceRequest> requestObserver =
(StreamObserver<LoadBalanceRequest>) mock(StreamObserver.class);
Answer<Void> closeRpc = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
responseObserver.onCompleted();
return null;
}
};
doAnswer(closeRpc).when(requestObserver).onCompleted();
lbRequestObservers.add(requestObserver);
return requestObserver;
}
});
fakeLbServer = InProcessServerBuilder.forName("fakeLb")
.directExecutor().addService(mockLbService).build().start();
doAnswer(new Answer<ManagedChannel>() {
@Override
public ManagedChannel answer(InvocationOnMock invocation) throws Throwable {
String authority = (String) invocation.getArguments()[1];
ManagedChannel channel;
if (failingLbAuthorities.contains(authority)) {
channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor().build();
} else {
channel = InProcessChannelBuilder.forName("fakeLb").directExecutor().build();
}
// TODO(zhangkun83): #2444: non-determinism of Channel due to starting NameResolver on the
// timer "Prime" it before use. Remove it after #2444 is resolved.
try {
ClientCalls.blockingUnaryCall(channel, TRASH_METHOD, CallOptions.DEFAULT, "trash");
} catch (StatusRuntimeException ignored) {
// Ignored
}
fakeOobChannels.add(channel);
oobChannelTracker.add(channel);
return channel;
}
}).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class));
doAnswer(new Answer<Subchannel>() {
@Override
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
Subchannel subchannel = mock(Subchannel.class);
EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0];
Attributes attrs = (Attributes) invocation.getArguments()[1];
when(subchannel.getAddresses()).thenReturn(eag);
when(subchannel.getAttributes()).thenReturn(attrs);
mockSubchannels.add(subchannel);
subchannelTracker.add(subchannel);
return subchannel;
}
}).when(helper).createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Runnable task = (Runnable) invocation.getArguments()[0];
channelExecutor.execute(task);
return null;
}
}).when(helper).runSerialized(any(Runnable.class));
when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY);
balancer = new GrpclbLoadBalancer2(helper, pickFirstBalancerFactory, roundRobinBalancerFactory);
}
@After
public void tearDown() {
try {
if (balancer != null) {
channelExecutor.execute(new Runnable() {
@Override
public void run() {
balancer.shutdown();
}
});
}
for (ManagedChannel channel : oobChannelTracker) {
assertTrue(channel + " is shutdown", channel.isShutdown());
// balancer should have closed the LB stream, terminating the OOB channel.
assertTrue(channel + " is terminated", channel.isTerminated());
}
for (Subchannel subchannel: subchannelTracker) {
verify(subchannel).shutdown();
}
} finally {
if (fakeLbServer != null) {
fakeLbServer.shutdownNow();
}
}
}
@Test
public void errorPicker() {
Status error = Status.UNAVAILABLE.withDescription("Just don't know why");
ErrorPicker picker = new ErrorPicker(error);
assertSame(error, picker.pickSubchannel(Attributes.EMPTY, headers).getStatus());
}
@Test
public void roundRobinPicker() {
PickResult pr1 = PickResult.withError(Status.UNAVAILABLE.withDescription("Just error"));
PickResult pr2 = PickResult.withSubchannel(mockSubchannel);
List<PickResult> list = Arrays.asList(pr1, pr2);
RoundRobinPicker picker = new RoundRobinPicker(list);
assertSame(pr1, picker.pickSubchannel(Attributes.EMPTY, headers));
assertSame(pr2, picker.pickSubchannel(Attributes.EMPTY, headers));
assertSame(pr1, picker.pickSubchannel(Attributes.EMPTY, headers));
}
@Test
public void bufferPicker() {
assertEquals(PickResult.withNoResult(),
GrpclbLoadBalancer2.BUFFER_PICKER.pickSubchannel(Attributes.EMPTY, headers));
}
@Test
public void nameResolutionFailsThenRecoverToDelegate() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(helper).updatePicker(pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertSame(error, errorPicker.result.getStatus());
// Recover with a subsequent success
List<ResolvedServerInfoGroup> resolvedServers = createResolvedServerInfoGroupList(false);
EquivalentAddressGroup eag = resolvedServers.get(0).toEquivalentAddressGroup();
Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build();
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
verify(pickFirstBalancerFactory).newLoadBalancer(helper);
verify(pickFirstBalancer).handleResolvedAddresses(eq(resolvedServers), eq(resolutionAttrs));
verifyNoMoreInteractions(roundRobinBalancerFactory);
verifyNoMoreInteractions(roundRobinBalancer);
}
@Test
public void nameResolutionFailsThenRecoverToGrpclb() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(helper).updatePicker(pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertSame(error, errorPicker.result.getStatus());
// Recover with a subsequent success
List<ResolvedServerInfoGroup> resolvedServers = createResolvedServerInfoGroupList(true);
EquivalentAddressGroup eag = resolvedServers.get(0).toEquivalentAddressGroup();
Attributes resolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0)));
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
verifyNoMoreInteractions(pickFirstBalancerFactory);
verifyNoMoreInteractions(pickFirstBalancer);
verifyNoMoreInteractions(roundRobinBalancerFactory);
verifyNoMoreInteractions(roundRobinBalancer);
}
@Test
public void delegatingPickFirstThenNameResolutionFails() {
List<ResolvedServerInfoGroup> resolvedServers = createResolvedServerInfoGroupList(false);
Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build();
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
verify(pickFirstBalancerFactory).newLoadBalancer(helper);
verify(pickFirstBalancer).handleResolvedAddresses(eq(resolvedServers), eq(resolutionAttrs));
// Then let name resolution fail. The error will be passed directly to the delegate.
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(pickFirstBalancer).handleNameResolutionError(error);
verify(helper, never()).updatePicker(any(SubchannelPicker.class));
verifyNoMoreInteractions(roundRobinBalancerFactory);
verifyNoMoreInteractions(roundRobinBalancer);
}
@Test
public void delegatingRoundRobinThenNameResolutionFails() {
List<ResolvedServerInfoGroup> resolvedServers = createResolvedServerInfoGroupList(false, false);
Attributes resolutionAttrs = Attributes.newBuilder()
.set(RESOLUTION_ATTR, "yeah")
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN)
.build();
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
verify(roundRobinBalancerFactory).newLoadBalancer(helper);
verify(roundRobinBalancer).handleResolvedAddresses(resolvedServers, resolutionAttrs);
// Then let name resolution fail. The error will be passed directly to the delegate.
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(roundRobinBalancer).handleNameResolutionError(error);
verify(helper, never()).updatePicker(any(SubchannelPicker.class));
verifyNoMoreInteractions(pickFirstBalancerFactory);
verifyNoMoreInteractions(pickFirstBalancer);
}
@Test
public void grpclbThenNameResolutionFails() {
InOrder inOrder = inOrder(helper);
// Go to GRPCLB first
List<ResolvedServerInfoGroup> grpclbResolutionList =
createResolvedServerInfoGroupList(true, true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
// Let name resolution fail before round-robin list is ready
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertSame(error, errorPicker.result.getStatus());
// Simulate receiving LB response
List<InetSocketAddress> backends = Arrays.asList(
new InetSocketAddress("127.0.0.1", 2000),
new InetSocketAddress("127.0.0.1", 2010));
verify(helper, never()).runSerialized(any(Runnable.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends));
verify(helper, times(2)).runSerialized(any(Runnable.class));
inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends.get(0))), any(Attributes.class));
inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends.get(1))), any(Attributes.class));
}
@SuppressWarnings("unchecked")
@Test
public void switchPolicy() {
// Go to GRPCLB first
List<ResolvedServerInfoGroup> grpclbResolutionList =
createResolvedServerInfoGroupList(true, false, true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
// Switch to PICK_FIRST
List<ResolvedServerInfoGroup> pickFirstResolutionList =
createResolvedServerInfoGroupList(true, false, true);
Attributes pickFirstResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build();
verify(pickFirstBalancerFactory, never()).newLoadBalancer(any(Helper.class));
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver, never()).onCompleted();
assertFalse(oobChannel.isShutdown());
deliverResolvedAddresses(pickFirstResolutionList, pickFirstResolutionAttrs);
verify(pickFirstBalancerFactory).newLoadBalancer(same(helper));
// Only non-LB addresses are passed to the delegate
verify(pickFirstBalancer).handleResolvedAddresses(
eq(Arrays.asList(pickFirstResolutionList.get(1))), same(pickFirstResolutionAttrs));
assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy());
assertSame(pickFirstBalancer, balancer.getDelegate());
// GRPCLB connection is closed
verify(lbRequestObserver).onCompleted();
assertTrue(oobChannel.isShutdown());
// Switch to ROUND_ROBIN
List<ResolvedServerInfoGroup> roundRobinResolutionList =
createResolvedServerInfoGroupList(true, false, false);
Attributes roundRobinResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN).build();
verify(roundRobinBalancerFactory, never()).newLoadBalancer(any(Helper.class));
deliverResolvedAddresses(roundRobinResolutionList, roundRobinResolutionAttrs);
verify(roundRobinBalancerFactory).newLoadBalancer(same(helper));
// Only non-LB addresses are passed to the delegate
verify(roundRobinBalancer).handleResolvedAddresses(
eq(roundRobinResolutionList.subList(1, 3)), same(roundRobinResolutionAttrs));
assertSame(LbPolicy.ROUND_ROBIN, balancer.getLbPolicy());
assertSame(roundRobinBalancer, balancer.getDelegate());
// Special case: if all addresses are loadbalancers, use GRPCLB no matter what the NameResolver
// says.
grpclbResolutionList = createResolvedServerInfoGroupList(true, true, true);
grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper, times(2)).createOobChannel(
eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
verify(helper, times(2)).createOobChannel(any(EquivalentAddressGroup.class), any(String.class));
assertEquals(1, fakeOobChannels.size());
oobChannel = fakeOobChannels.poll();
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
// Special case: PICK_FIRST is the default
pickFirstResolutionList = createResolvedServerInfoGroupList(true, false, false);
pickFirstResolutionAttrs = Attributes.EMPTY;
verify(pickFirstBalancerFactory).newLoadBalancer(any(Helper.class));
assertFalse(oobChannel.isShutdown());
deliverResolvedAddresses(pickFirstResolutionList, pickFirstResolutionAttrs);
verify(pickFirstBalancerFactory, times(2)).newLoadBalancer(same(helper));
// Only non-LB addresses are passed to the delegate
verify(pickFirstBalancer).handleResolvedAddresses(
eq(pickFirstResolutionList.subList(1, 3)), same(pickFirstResolutionAttrs));
assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy());
assertSame(pickFirstBalancer, balancer.getDelegate());
// GRPCLB connection is closed
assertTrue(oobChannel.isShutdown());
}
@Test
public void grpclbWorking() {
InOrder inOrder = inOrder(helper);
List<ResolvedServerInfoGroup> grpclbResolutionList =
createResolvedServerInfoGroupList(true, true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
// Simulate receiving LB response
List<InetSocketAddress> backends1 = Arrays.asList(
new InetSocketAddress("127.0.0.1", 2000),
new InetSocketAddress("127.0.0.1", 2010));
inOrder.verify(helper, never()).updatePicker(any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));
inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends1.get(0))), any(Attributes.class));
inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends1.get(1))), any(Attributes.class));
assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll();
verify(subchannel1).requestConnection();
verify(subchannel2).requestConnection();
assertEquals(new EquivalentAddressGroup(backends1.get(0)), subchannel1.getAddresses());
assertEquals(new EquivalentAddressGroup(backends1.get(1)), subchannel2.getAddresses());
// Before any subchannel is READY, a buffer picker will be provided
inOrder.verify(helper).updatePicker(same(GrpclbLoadBalancer2.BUFFER_PICKER));
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(helper, times(2)).updatePicker(same(GrpclbLoadBalancer2.BUFFER_PICKER));
// Let subchannels be connected
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
assertRoundRobinList(picker1, subchannel2);
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertRoundRobinList(picker2, subchannel1, subchannel2);
// Disconnected subchannels
verify(subchannel1).requestConnection();
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
verify(subchannel1, times(2)).requestConnection();
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
assertRoundRobinList(picker3, subchannel2);
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
assertRoundRobinList(picker4, subchannel2);
// As long as there is at least one READY subchannel, round robin will work.
Status error1 = Status.UNAVAILABLE.withDescription("error1");
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
RoundRobinPicker picker5 = (RoundRobinPicker) pickerCaptor.getValue();
assertRoundRobinList(picker5, subchannel2);
// If no subchannel is READY, will propagate an error from an arbitrary subchannel (but here
// only subchannel1 has error).
verify(subchannel2).requestConnection();
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE));
verify(subchannel2, times(2)).requestConnection();
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
ErrorPicker picker6 = (ErrorPicker) pickerCaptor.getValue();
assertNull(picker6.result.getSubchannel());
assertSame(error1, picker6.result.getStatus());
// Update backends, with a drop entry
List<InetSocketAddress> backends2 = Arrays.asList(
new InetSocketAddress("127.0.0.1", 2030), null);
verify(subchannel1, never()).shutdown();
verify(subchannel2, never()).shutdown();
lbResponseObserver.onNext(buildLbResponse(backends2));
verify(subchannel1).shutdown();
verify(subchannel2).shutdown();
inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends2.get(0))), any(Attributes.class));
assertEquals(1, mockSubchannels.size());
Subchannel subchannel3 = mockSubchannels.poll();
verify(subchannel3).requestConnection();
assertEquals(new EquivalentAddressGroup(backends2.get(0)), subchannel3.getAddresses());
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
assertRoundRobinList(picker7, (Subchannel) null);
// State updates on obsolete subchannels will have no effect
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN));
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(SHUTDOWN));
inOrder.verifyNoMoreInteractions();
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
assertRoundRobinList(picker8, subchannel3, null);
verify(subchannel3, never()).shutdown();
assertFalse(oobChannel.isShutdown());
assertEquals(1, lbRequestObservers.size());
verify(lbRequestObservers.peek(), never()).onCompleted();
verify(lbRequestObservers.peek(), never()).onError(any(Throwable.class));
}
@Test
public void grpclbBalanerCommErrors() {
InOrder inOrder = inOrder(helper, mockLbService);
// Make the first LB address fail to connect
failingLbAuthorities.add(lbAuthority(0));
List<ResolvedServerInfoGroup> grpclbResolutionList =
createResolvedServerInfoGroupList(true, true, true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
// First LB addr fails to connect
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
inOrder.verify(helper).updatePicker(isA(ErrorPicker.class));
assertEquals(2, fakeOobChannels.size());
assertTrue(fakeOobChannels.poll().isShutdown());
// Will move on to second LB addr
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(1).toEquivalentAddressGroup()),
eq(lbAuthority(1)));
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObservers.poll();
assertEquals(1, fakeOobChannels.size());
assertFalse(fakeOobChannels.peek().isShutdown());
Status error1 = Status.UNAVAILABLE.withDescription("error1");
// Simulate that the stream on the second LB failed
lbResponseObserver.onError(error1.asException());
assertTrue(fakeOobChannels.poll().isShutdown());
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertEquals(error1.getCode(), errorPicker.result.getStatus().getCode());
assertTrue(errorPicker.result.getStatus().getDescription().contains(error1.getDescription()));
// Move on to the third LB.
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(2).toEquivalentAddressGroup()),
eq(lbAuthority(2)));
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObservers.poll();
assertEquals(1, fakeOobChannels.size());
assertFalse(fakeOobChannels.peek().isShutdown());
// Simulate that the stream on the third LB closed without error. It is treated
// as an error.
lbResponseObserver.onCompleted();
assertTrue(fakeOobChannels.poll().isShutdown());
// Loop back to the first LB addr, which still fails.
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
inOrder.verify(helper).updatePicker(isA(ErrorPicker.class));
assertEquals(2, fakeOobChannels.size());
assertTrue(fakeOobChannels.poll().isShutdown());
// Will move on to second LB addr
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(1).toEquivalentAddressGroup()),
eq(lbAuthority(1)));
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
assertEquals(1, fakeOobChannels.size());
assertFalse(fakeOobChannels.peek().isShutdown());
// Finally it works.
lbResponseObserver.onNext(buildInitialResponse());
List<InetSocketAddress> backends = Arrays.asList(
new InetSocketAddress("127.0.0.1", 2000),
new InetSocketAddress("127.0.0.1", 2010));
lbResponseObserver.onNext(buildLbResponse(backends));
inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends.get(0))), any(Attributes.class));
inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends.get(1))), any(Attributes.class));
inOrder.verify(helper).updatePicker(same(GrpclbLoadBalancer2.BUFFER_PICKER));
inOrder.verifyNoMoreInteractions();
}
private void deliverSubchannelState(
final Subchannel subchannel, final ConnectivityStateInfo newState) {
channelExecutor.execute(new Runnable() {
@Override
public void run() {
balancer.handleSubchannelState(subchannel, newState);
}
});
}
private void deliverNameResolutionError(final Status error) {
channelExecutor.execute(new Runnable() {
@Override
public void run() {
balancer.handleNameResolutionError(error);
}
});
}
private void deliverResolvedAddresses(
final List<ResolvedServerInfoGroup> addrs, final Attributes attrs) {
channelExecutor.execute(new Runnable() {
@Override
public void run() {
balancer.handleResolvedAddresses(addrs, attrs);
}
});
}
private static List<ResolvedServerInfoGroup> createResolvedServerInfoGroupList(boolean ... isLb) {
ArrayList<ResolvedServerInfoGroup> list = new ArrayList<ResolvedServerInfoGroup>();
for (int i = 0; i < isLb.length; i++) {
SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup
.builder(isLb[i] ? Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, lbAuthority(i))
.build()
: Attributes.EMPTY)
.add(new ResolvedServerInfo(addr))
.build();
list.add(serverInfoGroup);
}
return list;
}
private static String lbAuthority(int i) {
return "lb" + i + ".google.com";
}
private static LoadBalanceResponse buildInitialResponse() {
return LoadBalanceResponse.newBuilder().setInitialResponse(
InitialLoadBalanceResponse.getDefaultInstance())
.build();
}
private static LoadBalanceResponse buildLbResponse(List<InetSocketAddress> addrs) {
ServerList.Builder serverListBuilder = ServerList.newBuilder();
for (InetSocketAddress addr : addrs) {
if (addr != null) {
serverListBuilder.addServers(Server.newBuilder()
.setIpAddress(ByteString.copyFrom(addr.getAddress().getAddress()))
.setPort(addr.getPort())
.build());
} else {
serverListBuilder.addServers(Server.newBuilder().setDropRequest(true).build());
}
}
return LoadBalanceResponse.newBuilder()
.setServerList(serverListBuilder.build())
.build();
}
private static void assertRoundRobinList(RoundRobinPicker picker, Subchannel ... subchannels) {
assertEquals(subchannels.length, picker.list.size());
for (int i = 0; i < subchannels.length; i++) {
Subchannel subchannel = subchannels[i];
if (subchannel == null) {
assertSame("list[" + i + "] should be drop",
GrpclbLoadBalancer2.THROTTLED_RESULT, picker.list.get(i));
} else {
assertEquals("list[" + i + "] should be Subchannel",
subchannel, picker.list.get(i).getSubchannel());
}
}
}
private static class FakeSocketAddress extends SocketAddress {
final String name;
FakeSocketAddress(String name) {
this.name = name;
}
@Override
public String toString() {
return "FakeSocketAddress-" + name;
}
@Override
public boolean equals(Object other) {
if (other instanceof FakeSocketAddress) {
FakeSocketAddress otherAddr = (FakeSocketAddress) other;
return name.equals(otherAddr.name);
}
return false;
}
@Override
public int hashCode() {
return name.hashCode();
}
}
}