diff --git a/core/src/main/java/io/grpc/internal/ChannelExecutor.java b/core/src/main/java/io/grpc/internal/ChannelExecutor.java index 5de3b56f8c..1c92f56461 100644 --- a/core/src/main/java/io/grpc/internal/ChannelExecutor.java +++ b/core/src/main/java/io/grpc/internal/ChannelExecutor.java @@ -31,6 +31,8 @@ package io.grpc.internal; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.annotations.VisibleForTesting; import java.util.LinkedList; @@ -97,7 +99,7 @@ final class ChannelExecutor { */ ChannelExecutor executeLater(Runnable runnable) { synchronized (lock) { - queue.add(runnable); + queue.add(checkNotNull(runnable, "runnable is null")); } return this; } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java new file mode 100644 index 0000000000..535f4f0ab4 --- /dev/null +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java @@ -0,0 +1,65 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.grpclb; + +import io.grpc.Attributes; +import io.grpc.ExperimentalApi; + +/** + * Constants for the GRPCLB load-balancer. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1782") +public final class GrpclbConstants { + /** + * The load-balancing policy designated by the naming system. + */ + public enum LbPolicy { + PICK_FIRST, + ROUND_ROBIN, + GRPCLB + } + + /** + * An attribute of a name resolution result, designating the LB policy. + */ + public static final Attributes.Key ATTR_LB_POLICY = + Attributes.Key.of("io.grpc.grpclb.lbPolicy"); + + /** + * The naming authority of an LB server address. It is an address-group-level attribute, present + * when the address group is a LoadBalancer. + */ + public static final Attributes.Key ATTR_LB_ADDR_AUTHORITY = + Attributes.Key.of("io.grpc.grpclb.lbAddrAuthority"); + + private GrpclbConstants() { } +} diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer2.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer2.java new file mode 100644 index 0000000000..f13910c486 --- /dev/null +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer2.java @@ -0,0 +1,500 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.grpclb; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; + +import com.google.common.annotations.VisibleForTesting; + +import io.grpc.Attributes; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer2; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.ResolvedServerInfoGroup; +import io.grpc.Status; +import io.grpc.grpclb.GrpclbConstants.LbPolicy; +import io.grpc.internal.LogId; +import io.grpc.internal.WithLogId; +import io.grpc.stub.StreamObserver; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * A {@link LoadBalancer2} that uses the GRPCLB protocol. + * + *

Optionally, when requested by the naming system, will delegate the work to a local pick-first + * or round-robin balancer. + */ +class GrpclbLoadBalancer2 extends LoadBalancer2 implements WithLogId { + private static final Logger logger = Logger.getLogger(GrpclbLoadBalancer2.class.getName()); + + @VisibleForTesting + static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(Attributes affinity, Metadata headers) { + return PickResult.withNoResult(); + } + }; + + private final LogId logId = LogId.allocate(getClass().getName()); + + private final String serviceName; + private final Helper helper; + private final Factory pickFirstBalancerFactory; + private final Factory roundRobinBalancerFactory; + + private static final Attributes.Key> STATE_INFO = + Attributes.Key.of("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo"); + @VisibleForTesting + static final PickResult THROTTLED_RESULT = + PickResult.withError(Status.UNAVAILABLE.withDescription("Throttled by LB")); + + // All mutable states in this class are mutated ONLY from Channel Executor + + /////////////////////////////////////////////////////////////////////////////// + // General states. + /////////////////////////////////////////////////////////////////////////////// + + // If not null, all work is delegated to it. + @Nullable + private LoadBalancer2 delegate; + private LbPolicy lbPolicy; + + /////////////////////////////////////////////////////////////////////////////// + // GRPCLB states, valid only if lbPolicy == GRPCLB + /////////////////////////////////////////////////////////////////////////////// + + // null if there isn't any available LB addresses. + // If non-null, never empty. + @Nullable + private List lbAddressGroups; + @Nullable + private ManagedChannel lbCommChannel; + // Points to the position of the LB address that lbCommChannel is bound to, if + // lbCommChannel != null. + private int currentLbIndex; + @Nullable + private LbResponseObserver lbResponseObserver; + @Nullable + private StreamObserver lbRequestWriter; + private Map subchannels = Collections.emptyMap(); + // A null element indicate a simulated error for throttling purpose + private List roundRobinList = Collections.emptyList(); + + GrpclbLoadBalancer2(Helper helper, Factory pickFirstBalancerFactory, + Factory roundRobinBalancerFactory) { + this.helper = checkNotNull(helper, "helper"); + this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority"); + this.pickFirstBalancerFactory = + checkNotNull(pickFirstBalancerFactory, "pickFirstBalancerFactory"); + this.roundRobinBalancerFactory = + checkNotNull(roundRobinBalancerFactory, "roundRobinBalancerFactory"); + } + + @Override + public LogId getLogId() { + return logId; + } + + @Override + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { + if (delegate != null) { + delegate.handleSubchannelState(subchannel, newState); + return; + } + if (newState.getState() == SHUTDOWN || !(subchannels.values().contains(subchannel))) { + return; + } + if (newState.getState() == IDLE) { + subchannel.requestConnection(); + } + subchannel.getAttributes().get(STATE_INFO).set(newState); + helper.updatePicker(makePicker()); + } + + @Override + public void handleResolvedAddresses(List updatedServers, + Attributes attributes) { + LbPolicy newLbPolicy = attributes.get(GrpclbConstants.ATTR_LB_POLICY); + // LB addresses and backend addresses are treated separately + List newLbAddressGroups = new ArrayList(); + List newBackendServerInfoGroups = + new ArrayList(); + for (ResolvedServerInfoGroup serverInfoGroup : updatedServers) { + String lbAddrAuthority = serverInfoGroup.getAttributes().get( + GrpclbConstants.ATTR_LB_ADDR_AUTHORITY); + EquivalentAddressGroup eag = serverInfoGroup.toEquivalentAddressGroup(); + if (lbAddrAuthority != null) { + newLbAddressGroups.add(new LbAddressGroup(eag, lbAddrAuthority)); + } else { + newBackendServerInfoGroups.add(serverInfoGroup); + } + } + + if (newBackendServerInfoGroups.isEmpty()) { + // handleResolvedAddresses()'s javadoc has guaranteed updatedServers is never empty. + checkState(!newLbAddressGroups.isEmpty(), + "No backend address nor LB address. updatedServers=%s", updatedServers); + if (newLbPolicy != LbPolicy.GRPCLB) { + newLbPolicy = LbPolicy.GRPCLB; + logger.log(Level.FINE, "[{0}] Switching to GRPCLB because all addresses are balancers", + logId); + } + } + if (newLbPolicy == null) { + logger.log(Level.FINE, "[{0}] New config missing policy. Using PICK_FIRST", logId); + newLbPolicy = LbPolicy.PICK_FIRST; + } + + // Switch LB policy if requested + if (newLbPolicy != lbPolicy) { + shutdownDelegate(); + shutdownLbComm(); + lbAddressGroups = null; + currentLbIndex = 0; + switch (newLbPolicy) { + case PICK_FIRST: + delegate = checkNotNull(pickFirstBalancerFactory.newLoadBalancer(helper), + "pickFirstBalancerFactory.newLoadBalancer()"); + break; + case ROUND_ROBIN: + delegate = checkNotNull(roundRobinBalancerFactory.newLoadBalancer(helper), + "roundRobinBalancerFactory.newLoadBalancer()"); + break; + default: + // Do nohting + } + } + lbPolicy = newLbPolicy; + + // Consume the new addresses + switch (lbPolicy) { + case PICK_FIRST: + case ROUND_ROBIN: + checkNotNull(delegate, "delegate should not be null. newLbPolicy=" + newLbPolicy); + delegate.handleResolvedAddresses(newBackendServerInfoGroups, attributes); + break; + case GRPCLB: + if (newLbAddressGroups.isEmpty()) { + shutdownLbComm(); + lbAddressGroups = null; + handleGrpclbError(Status.UNAVAILABLE.withDescription( + "NameResolver returned no LB address while asking for GRPCLB")); + } else { + // See if the currently used LB server is in the new list. + int newIndexOfCurrentLb = -1; + if (lbAddressGroups != null) { + LbAddressGroup currentLb = lbAddressGroups.get(currentLbIndex); + newIndexOfCurrentLb = newLbAddressGroups.indexOf(currentLb); + } + lbAddressGroups = newLbAddressGroups; + if (newIndexOfCurrentLb == -1) { + shutdownLbComm(); + currentLbIndex = 0; + startLbComm(); + } else { + // Current LB is still in the list, calibrate index. + currentLbIndex = newIndexOfCurrentLb; + } + } + break; + default: + // Do nothing + } + } + + private void shutdownLbComm() { + if (lbCommChannel != null) { + lbCommChannel.shutdown(); + lbCommChannel = null; + } + if (lbRequestWriter != null) { + lbRequestWriter.onCompleted(); + lbRequestWriter = null; + } + if (lbResponseObserver != null) { + lbResponseObserver.dismissed = true; + lbResponseObserver = null; + } + } + + private void startLbComm() { + checkState(lbCommChannel == null, "previous lbCommChannel has not been closed yet"); + checkState(lbRequestWriter == null, "previous lbRequestWriter has not been cleared yet"); + checkState(lbResponseObserver == null, "previous lbResponseObserver has not been cleared yet"); + LbAddressGroup currentLb = lbAddressGroups.get(currentLbIndex); + lbCommChannel = helper.createOobChannel(currentLb.getAddresses(), currentLb.getAuthority()); + LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel); + lbResponseObserver = new LbResponseObserver(); + lbRequestWriter = stub.balanceLoad(lbResponseObserver); + + LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder() + .setInitialRequest(InitialLoadBalanceRequest.newBuilder() + .setName(helper.getAuthority()).build()) + .build(); + lbRequestWriter.onNext(initRequest); + } + + private void shutdownDelegate() { + if (delegate != null) { + delegate.shutdown(); + delegate = null; + } + } + + @Override + public void shutdown() { + shutdownDelegate(); + shutdownLbComm(); + for (Subchannel subchannel : subchannels.values()) { + subchannel.shutdown(); + } + subchannels = Collections.emptyMap(); + } + + private void handleGrpclbError(Status status) { + logger.log(Level.FINE, "[{0}] Had an error: {1}; roundRobinList={2}", + new Object[] {logId, status, roundRobinList}); + if (roundRobinList.isEmpty()) { + helper.updatePicker(new ErrorPicker(status)); + } + } + + @Override + public void handleNameResolutionError(Status error) { + if (delegate != null) { + delegate.handleNameResolutionError(error); + } else { + handleGrpclbError(error); + } + } + + private class LbResponseObserver implements StreamObserver { + boolean dismissed; + + @Override public void onNext(final LoadBalanceResponse response) { + helper.runSerialized(new Runnable() { + @Override + public void run() { + handleResponse(response); + } + }); + } + + private void handleResponse(LoadBalanceResponse response) { + if (dismissed) { + return; + } + logger.log(Level.FINE, "[{0}] Got an LB response: {1}", new Object[] {logId, response}); + // TODO(zhangkun83): make use of initialResponse + // InitialLoadBalanceResponse initialResponse = response.getInitialResponse(); + ServerList serverList = response.getServerList(); + HashMap newSubchannelMap = + new HashMap(); + List newRoundRobinList = new ArrayList(); + // TODO(zhangkun83): honor expiration_interval + // Construct the new collections. Create new Subchannels when necessary. + for (Server server : serverList.getServersList()) { + if (server.getDropRequest()) { + newRoundRobinList.add(null); + } else { + InetSocketAddress address; + try { + address = new InetSocketAddress( + InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort()); + } catch (UnknownHostException e) { + handleGrpclbError(Status.UNAVAILABLE.withCause(e)); + continue; + } + EquivalentAddressGroup eag = new EquivalentAddressGroup(address); + // TODO(zhangkun83): save the LB token and insert it to the application RPCs' headers. + if (!newSubchannelMap.containsKey(eag)) { + Attributes subchannelAttrs = Attributes.newBuilder() + .set(STATE_INFO, + new AtomicReference( + ConnectivityStateInfo.forNonError(IDLE))) + .build(); + Subchannel subchannel = helper.createSubchannel(eag, subchannelAttrs); + subchannel.requestConnection(); + newSubchannelMap.put(eag, subchannel); + } + newRoundRobinList.add(eag); + } + } + // Close Subchannels whose addresses have been delisted + for (Entry entry : subchannels.entrySet()) { + EquivalentAddressGroup eag = entry.getKey(); + if (!newSubchannelMap.containsKey(eag)) { + entry.getValue().shutdown(); + } + } + + subchannels = newSubchannelMap; + roundRobinList = newRoundRobinList; + helper.updatePicker(makePicker()); + } + + @Override public void onError(final Throwable error) { + helper.runSerialized(new Runnable() { + @Override + public void run() { + handleStreamClosed(Status.fromThrowable(error) + .augmentDescription("Stream to GRPCLB LoadBalancer had an error")); + } + }); + } + + @Override public void onCompleted() { + helper.runSerialized(new Runnable() { + @Override + public void run() { + handleStreamClosed(Status.UNAVAILABLE.augmentDescription( + "Stream to GRPCLB LoadBalancer was closed")); + } + }); + } + + private void handleStreamClosed(Status status) { + if (dismissed) { + return; + } + lbRequestWriter = null; + handleGrpclbError(status); + shutdownLbComm(); + currentLbIndex = (currentLbIndex + 1) % lbAddressGroups.size(); + startLbComm(); + } + } + + /** + * Make a picker out of the current roundRobinList and the states of subchannels. + */ + private SubchannelPicker makePicker() { + List resultList = new ArrayList(); + Status error = null; + for (EquivalentAddressGroup eag : roundRobinList) { + if (eag == null) { + resultList.add(THROTTLED_RESULT); + } else { + Subchannel subchannel = subchannels.get(eag); + checkNotNull(subchannel, "Subchannel for %s not found", eag); + Attributes attrs = subchannel.getAttributes(); + ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); + if (stateInfo.getState() == READY) { + resultList.add(PickResult.withSubchannel(subchannel)); + } else if (stateInfo.getState() == TRANSIENT_FAILURE) { + error = stateInfo.getStatus(); + } + } + } + if (resultList.isEmpty()) { + if (error != null) { + logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}", + new Object[] {logId, error}); + return new ErrorPicker(error); + } else { + logger.log(Level.FINE, "[{0}] No ready Subchannel and no error", logId); + return BUFFER_PICKER; + } + } else { + logger.log(Level.FINE, "[{0}] Using list {1}", new Object[] {logId, resultList}); + return new RoundRobinPicker(resultList); + } + } + + @VisibleForTesting + LoadBalancer2 getDelegate() { + return delegate; + } + + @VisibleForTesting + LbPolicy getLbPolicy() { + return lbPolicy; + } + + @VisibleForTesting + static final class ErrorPicker extends SubchannelPicker { + final PickResult result; + + ErrorPicker(Status status) { + result = PickResult.withError(status); + } + + @Override + public PickResult pickSubchannel(Attributes affinity, Metadata headers) { + return result; + } + } + + @VisibleForTesting + static final class RoundRobinPicker extends SubchannelPicker { + final List list; + int index; + + RoundRobinPicker(List resultList) { + checkArgument(!resultList.isEmpty(), "resultList is empty"); + list = resultList; + } + + @Override + public PickResult pickSubchannel(Attributes affinity, Metadata headers) { + synchronized (list) { + PickResult result = list.get(index); + index++; + if (index == list.size()) { + index = 0; + } + return result; + } + } + } +} diff --git a/grpclb/src/main/java/io/grpc/grpclb/LbAddressGroup.java b/grpclb/src/main/java/io/grpc/grpclb/LbAddressGroup.java new file mode 100644 index 0000000000..1fb75c445f --- /dev/null +++ b/grpclb/src/main/java/io/grpc/grpclb/LbAddressGroup.java @@ -0,0 +1,71 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.grpclb; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.grpc.EquivalentAddressGroup; + +/** + * Represents a balancer address entry. + */ +class LbAddressGroup { + private final EquivalentAddressGroup addresses; + private final String authority; + + LbAddressGroup(EquivalentAddressGroup addresses, String authority) { + this.addresses = checkNotNull(addresses, "addresses"); + this.authority = checkNotNull(authority, "authority"); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof LbAddressGroup)) { + return false; + } + LbAddressGroup otherGroup = (LbAddressGroup) other; + return addresses.equals(otherGroup.addresses) && authority.equals(otherGroup.authority); + } + + @Override + public int hashCode() { + return addresses.hashCode(); + } + + EquivalentAddressGroup getAddresses() { + return addresses; + } + + String getAuthority() { + return authority; + } +} diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancer2Test.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancer2Test.java new file mode 100644 index 0000000000..996e6fdbfe --- /dev/null +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancer2Test.java @@ -0,0 +1,843 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.grpclb; + +import static com.google.common.base.Charsets.UTF_8; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; + +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer2.Helper; +import io.grpc.LoadBalancer2.PickResult; +import io.grpc.LoadBalancer2.Subchannel; +import io.grpc.LoadBalancer2.SubchannelPicker; +import io.grpc.LoadBalancer2; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor.Marshaller; +import io.grpc.MethodDescriptor; +import io.grpc.ResolvedServerInfo; +import io.grpc.ResolvedServerInfoGroup; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.grpclb.GrpclbConstants.LbPolicy; +import io.grpc.grpclb.GrpclbLoadBalancer2.ErrorPicker; +import io.grpc.grpclb.GrpclbLoadBalancer2.RoundRobinPicker; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.SerializingExecutor; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.StreamObserver; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +/** Unit tests for {@link GrpclbLoadBalancer2}. */ +@RunWith(JUnit4.class) +public class GrpclbLoadBalancer2Test { + private static final Attributes.Key RESOLUTION_ATTR = + Attributes.Key.of("resolution-attr"); + private static final String SERVICE_AUTHORITY = "api.google.com"; + + private static final MethodDescriptor TRASH_METHOD = MethodDescriptor.create( + MethodDescriptor.MethodType.UNARY, "/service/trashmethod", + new StringMarshaller(), new StringMarshaller()); + + private static class StringMarshaller implements Marshaller { + static final StringMarshaller INSTANCE = new StringMarshaller(); + + @Override + public InputStream stream(String value) { + return new ByteArrayInputStream(value.getBytes(UTF_8)); + } + + @Override + public String parse(InputStream stream) { + try { + return new String(ByteStreams.toByteArray(stream), UTF_8); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + + @Mock + private Helper helper; + @Mock + private Subchannel mockSubchannel; + private LoadBalancerGrpc.LoadBalancerImplBase mockLbService; + @Captor + private ArgumentCaptor> lbResponseObserverCaptor; + private final LinkedList> lbRequestObservers = + new LinkedList>(); + private final LinkedList mockSubchannels = new LinkedList(); + private final LinkedList fakeOobChannels = new LinkedList(); + private final ArrayList subchannelTracker = new ArrayList(); + private final ArrayList oobChannelTracker = new ArrayList(); + private final ArrayList failingLbAuthorities = new ArrayList(); + private io.grpc.Server fakeLbServer; + @Captor + private ArgumentCaptor pickerCaptor; + private final SerializingExecutor channelExecutor = + new SerializingExecutor(MoreExecutors.directExecutor()); + private final Metadata headers = new Metadata(); + @Mock + private LoadBalancer2.Factory pickFirstBalancerFactory; + @Mock + private LoadBalancer2 pickFirstBalancer; + @Mock + private LoadBalancer2.Factory roundRobinBalancerFactory; + @Mock + private LoadBalancer2 roundRobinBalancer; + private GrpclbLoadBalancer2 balancer; + + @SuppressWarnings("unchecked") + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + when(pickFirstBalancerFactory.newLoadBalancer(any(Helper.class))) + .thenReturn(pickFirstBalancer); + when(roundRobinBalancerFactory.newLoadBalancer(any(Helper.class))) + .thenReturn(roundRobinBalancer); + mockLbService = spy(new LoadBalancerGrpc.LoadBalancerImplBase() { + @Override + public StreamObserver balanceLoad( + final StreamObserver responseObserver) { + StreamObserver requestObserver = + (StreamObserver) mock(StreamObserver.class); + Answer closeRpc = new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + responseObserver.onCompleted(); + return null; + } + }; + doAnswer(closeRpc).when(requestObserver).onCompleted(); + lbRequestObservers.add(requestObserver); + return requestObserver; + } + }); + fakeLbServer = InProcessServerBuilder.forName("fakeLb") + .directExecutor().addService(mockLbService).build().start(); + doAnswer(new Answer() { + @Override + public ManagedChannel answer(InvocationOnMock invocation) throws Throwable { + String authority = (String) invocation.getArguments()[1]; + ManagedChannel channel; + if (failingLbAuthorities.contains(authority)) { + channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor().build(); + } else { + channel = InProcessChannelBuilder.forName("fakeLb").directExecutor().build(); + } + // TODO(zhangkun83): #2444: non-determinism of Channel due to starting NameResolver on the + // timer "Prime" it before use. Remove it after #2444 is resolved. + try { + ClientCalls.blockingUnaryCall(channel, TRASH_METHOD, CallOptions.DEFAULT, "trash"); + } catch (StatusRuntimeException ignored) { + // Ignored + } + fakeOobChannels.add(channel); + oobChannelTracker.add(channel); + return channel; + } + }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class)); + doAnswer(new Answer() { + @Override + public Subchannel answer(InvocationOnMock invocation) throws Throwable { + Subchannel subchannel = mock(Subchannel.class); + EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0]; + Attributes attrs = (Attributes) invocation.getArguments()[1]; + when(subchannel.getAddresses()).thenReturn(eag); + when(subchannel.getAttributes()).thenReturn(attrs); + mockSubchannels.add(subchannel); + subchannelTracker.add(subchannel); + return subchannel; + } + }).when(helper).createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Runnable task = (Runnable) invocation.getArguments()[0]; + channelExecutor.execute(task); + return null; + } + }).when(helper).runSerialized(any(Runnable.class)); + when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY); + balancer = new GrpclbLoadBalancer2(helper, pickFirstBalancerFactory, roundRobinBalancerFactory); + } + + @After + public void tearDown() { + try { + if (balancer != null) { + channelExecutor.execute(new Runnable() { + @Override + public void run() { + balancer.shutdown(); + } + }); + } + for (ManagedChannel channel : oobChannelTracker) { + assertTrue(channel + " is shutdown", channel.isShutdown()); + // balancer should have closed the LB stream, terminating the OOB channel. + assertTrue(channel + " is terminated", channel.isTerminated()); + } + for (Subchannel subchannel: subchannelTracker) { + verify(subchannel).shutdown(); + } + } finally { + if (fakeLbServer != null) { + fakeLbServer.shutdownNow(); + } + } + } + + @Test + public void errorPicker() { + Status error = Status.UNAVAILABLE.withDescription("Just don't know why"); + ErrorPicker picker = new ErrorPicker(error); + assertSame(error, picker.pickSubchannel(Attributes.EMPTY, headers).getStatus()); + } + + @Test + public void roundRobinPicker() { + PickResult pr1 = PickResult.withError(Status.UNAVAILABLE.withDescription("Just error")); + PickResult pr2 = PickResult.withSubchannel(mockSubchannel); + List list = Arrays.asList(pr1, pr2); + RoundRobinPicker picker = new RoundRobinPicker(list); + assertSame(pr1, picker.pickSubchannel(Attributes.EMPTY, headers)); + assertSame(pr2, picker.pickSubchannel(Attributes.EMPTY, headers)); + assertSame(pr1, picker.pickSubchannel(Attributes.EMPTY, headers)); + } + + @Test + public void bufferPicker() { + assertEquals(PickResult.withNoResult(), + GrpclbLoadBalancer2.BUFFER_PICKER.pickSubchannel(Attributes.EMPTY, headers)); + } + + @Test + public void nameResolutionFailsThenRecoverToDelegate() { + Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); + deliverNameResolutionError(error); + verify(helper).updatePicker(pickerCaptor.capture()); + ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue(); + assertSame(error, errorPicker.result.getStatus()); + + // Recover with a subsequent success + List resolvedServers = createResolvedServerInfoGroupList(false); + EquivalentAddressGroup eag = resolvedServers.get(0).toEquivalentAddressGroup(); + + Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build(); + deliverResolvedAddresses(resolvedServers, resolutionAttrs); + + verify(pickFirstBalancerFactory).newLoadBalancer(helper); + verify(pickFirstBalancer).handleResolvedAddresses(eq(resolvedServers), eq(resolutionAttrs)); + verifyNoMoreInteractions(roundRobinBalancerFactory); + verifyNoMoreInteractions(roundRobinBalancer); + } + + @Test + public void nameResolutionFailsThenRecoverToGrpclb() { + Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); + deliverNameResolutionError(error); + verify(helper).updatePicker(pickerCaptor.capture()); + ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue(); + assertSame(error, errorPicker.result.getStatus()); + + // Recover with a subsequent success + List resolvedServers = createResolvedServerInfoGroupList(true); + EquivalentAddressGroup eag = resolvedServers.get(0).toEquivalentAddressGroup(); + + Attributes resolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + deliverResolvedAddresses(resolvedServers, resolutionAttrs); + + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + assertNull(balancer.getDelegate()); + verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0))); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + + verifyNoMoreInteractions(pickFirstBalancerFactory); + verifyNoMoreInteractions(pickFirstBalancer); + verifyNoMoreInteractions(roundRobinBalancerFactory); + verifyNoMoreInteractions(roundRobinBalancer); + } + + @Test + public void delegatingPickFirstThenNameResolutionFails() { + List resolvedServers = createResolvedServerInfoGroupList(false); + + Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build(); + deliverResolvedAddresses(resolvedServers, resolutionAttrs); + + verify(pickFirstBalancerFactory).newLoadBalancer(helper); + verify(pickFirstBalancer).handleResolvedAddresses(eq(resolvedServers), eq(resolutionAttrs)); + + // Then let name resolution fail. The error will be passed directly to the delegate. + Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); + deliverNameResolutionError(error); + verify(pickFirstBalancer).handleNameResolutionError(error); + verify(helper, never()).updatePicker(any(SubchannelPicker.class)); + verifyNoMoreInteractions(roundRobinBalancerFactory); + verifyNoMoreInteractions(roundRobinBalancer); + } + + @Test + public void delegatingRoundRobinThenNameResolutionFails() { + List resolvedServers = createResolvedServerInfoGroupList(false, false); + + Attributes resolutionAttrs = Attributes.newBuilder() + .set(RESOLUTION_ATTR, "yeah") + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN) + .build(); + deliverResolvedAddresses(resolvedServers, resolutionAttrs); + + verify(roundRobinBalancerFactory).newLoadBalancer(helper); + verify(roundRobinBalancer).handleResolvedAddresses(resolvedServers, resolutionAttrs); + + // Then let name resolution fail. The error will be passed directly to the delegate. + Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); + deliverNameResolutionError(error); + verify(roundRobinBalancer).handleNameResolutionError(error); + verify(helper, never()).updatePicker(any(SubchannelPicker.class)); + verifyNoMoreInteractions(pickFirstBalancerFactory); + verifyNoMoreInteractions(pickFirstBalancer); + } + + @Test + public void grpclbThenNameResolutionFails() { + InOrder inOrder = inOrder(helper); + // Go to GRPCLB first + List grpclbResolutionList = + createResolvedServerInfoGroupList(true, true); + Attributes grpclbResolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + assertNull(balancer.getDelegate()); + verify(helper).createOobChannel(eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()), + eq(lbAuthority(0))); + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + + // Let name resolution fail before round-robin list is ready + Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); + deliverNameResolutionError(error); + + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue(); + assertSame(error, errorPicker.result.getStatus()); + + // Simulate receiving LB response + List backends = Arrays.asList( + new InetSocketAddress("127.0.0.1", 2000), + new InetSocketAddress("127.0.0.1", 2010)); + verify(helper, never()).runSerialized(any(Runnable.class)); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(backends)); + + verify(helper, times(2)).runSerialized(any(Runnable.class)); + inOrder.verify(helper).createSubchannel( + eq(new EquivalentAddressGroup(backends.get(0))), any(Attributes.class)); + inOrder.verify(helper).createSubchannel( + eq(new EquivalentAddressGroup(backends.get(1))), any(Attributes.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void switchPolicy() { + // Go to GRPCLB first + List grpclbResolutionList = + createResolvedServerInfoGroupList(true, false, true); + Attributes grpclbResolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + assertNull(balancer.getDelegate()); + verify(helper).createOobChannel(eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()), + eq(lbAuthority(0))); + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + + // Switch to PICK_FIRST + List pickFirstResolutionList = + createResolvedServerInfoGroupList(true, false, true); + Attributes pickFirstResolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build(); + verify(pickFirstBalancerFactory, never()).newLoadBalancer(any(Helper.class)); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver, never()).onCompleted(); + assertFalse(oobChannel.isShutdown()); + deliverResolvedAddresses(pickFirstResolutionList, pickFirstResolutionAttrs); + + verify(pickFirstBalancerFactory).newLoadBalancer(same(helper)); + // Only non-LB addresses are passed to the delegate + verify(pickFirstBalancer).handleResolvedAddresses( + eq(Arrays.asList(pickFirstResolutionList.get(1))), same(pickFirstResolutionAttrs)); + assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy()); + assertSame(pickFirstBalancer, balancer.getDelegate()); + // GRPCLB connection is closed + verify(lbRequestObserver).onCompleted(); + assertTrue(oobChannel.isShutdown()); + + // Switch to ROUND_ROBIN + List roundRobinResolutionList = + createResolvedServerInfoGroupList(true, false, false); + Attributes roundRobinResolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN).build(); + verify(roundRobinBalancerFactory, never()).newLoadBalancer(any(Helper.class)); + deliverResolvedAddresses(roundRobinResolutionList, roundRobinResolutionAttrs); + + verify(roundRobinBalancerFactory).newLoadBalancer(same(helper)); + // Only non-LB addresses are passed to the delegate + verify(roundRobinBalancer).handleResolvedAddresses( + eq(roundRobinResolutionList.subList(1, 3)), same(roundRobinResolutionAttrs)); + assertSame(LbPolicy.ROUND_ROBIN, balancer.getLbPolicy()); + assertSame(roundRobinBalancer, balancer.getDelegate()); + + // Special case: if all addresses are loadbalancers, use GRPCLB no matter what the NameResolver + // says. + grpclbResolutionList = createResolvedServerInfoGroupList(true, true, true); + grpclbResolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build(); + deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + assertNull(balancer.getDelegate()); + verify(helper, times(2)).createOobChannel( + eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()), + eq(lbAuthority(0))); + verify(helper, times(2)).createOobChannel(any(EquivalentAddressGroup.class), any(String.class)); + assertEquals(1, fakeOobChannels.size()); + oobChannel = fakeOobChannels.poll(); + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + + // Special case: PICK_FIRST is the default + pickFirstResolutionList = createResolvedServerInfoGroupList(true, false, false); + pickFirstResolutionAttrs = Attributes.EMPTY; + verify(pickFirstBalancerFactory).newLoadBalancer(any(Helper.class)); + assertFalse(oobChannel.isShutdown()); + deliverResolvedAddresses(pickFirstResolutionList, pickFirstResolutionAttrs); + + verify(pickFirstBalancerFactory, times(2)).newLoadBalancer(same(helper)); + // Only non-LB addresses are passed to the delegate + verify(pickFirstBalancer).handleResolvedAddresses( + eq(pickFirstResolutionList.subList(1, 3)), same(pickFirstResolutionAttrs)); + assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy()); + assertSame(pickFirstBalancer, balancer.getDelegate()); + // GRPCLB connection is closed + assertTrue(oobChannel.isShutdown()); + } + + @Test + public void grpclbWorking() { + InOrder inOrder = inOrder(helper); + List grpclbResolutionList = + createResolvedServerInfoGroupList(true, true); + Attributes grpclbResolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + assertNull(balancer.getDelegate()); + verify(helper).createOobChannel(eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()), + eq(lbAuthority(0))); + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + + // Simulate receiving LB response + List backends1 = Arrays.asList( + new InetSocketAddress("127.0.0.1", 2000), + new InetSocketAddress("127.0.0.1", 2010)); + inOrder.verify(helper, never()).updatePicker(any(SubchannelPicker.class)); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + inOrder.verify(helper).createSubchannel( + eq(new EquivalentAddressGroup(backends1.get(0))), any(Attributes.class)); + inOrder.verify(helper).createSubchannel( + eq(new EquivalentAddressGroup(backends1.get(1))), any(Attributes.class)); + assertEquals(2, mockSubchannels.size()); + Subchannel subchannel1 = mockSubchannels.poll(); + Subchannel subchannel2 = mockSubchannels.poll(); + verify(subchannel1).requestConnection(); + verify(subchannel2).requestConnection(); + assertEquals(new EquivalentAddressGroup(backends1.get(0)), subchannel1.getAddresses()); + assertEquals(new EquivalentAddressGroup(backends1.get(1)), subchannel2.getAddresses()); + + // Before any subchannel is READY, a buffer picker will be provided + inOrder.verify(helper).updatePicker(same(GrpclbLoadBalancer2.BUFFER_PICKER)); + + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(helper, times(2)).updatePicker(same(GrpclbLoadBalancer2.BUFFER_PICKER)); + + // Let subchannels be connected + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); + assertRoundRobinList(picker1, subchannel2); + + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); + assertRoundRobinList(picker2, subchannel1, subchannel2); + + // Disconnected subchannels + verify(subchannel1).requestConnection(); + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE)); + verify(subchannel1, times(2)).requestConnection(); + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); + assertRoundRobinList(picker3, subchannel2); + + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); + assertRoundRobinList(picker4, subchannel2); + + // As long as there is at least one READY subchannel, round robin will work. + Status error1 = Status.UNAVAILABLE.withDescription("error1"); + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1)); + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + RoundRobinPicker picker5 = (RoundRobinPicker) pickerCaptor.getValue(); + assertRoundRobinList(picker5, subchannel2); + + // If no subchannel is READY, will propagate an error from an arbitrary subchannel (but here + // only subchannel1 has error). + verify(subchannel2).requestConnection(); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE)); + verify(subchannel2, times(2)).requestConnection(); + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + ErrorPicker picker6 = (ErrorPicker) pickerCaptor.getValue(); + assertNull(picker6.result.getSubchannel()); + assertSame(error1, picker6.result.getStatus()); + + // Update backends, with a drop entry + List backends2 = Arrays.asList( + new InetSocketAddress("127.0.0.1", 2030), null); + verify(subchannel1, never()).shutdown(); + verify(subchannel2, never()).shutdown(); + + lbResponseObserver.onNext(buildLbResponse(backends2)); + verify(subchannel1).shutdown(); + verify(subchannel2).shutdown(); + + inOrder.verify(helper).createSubchannel( + eq(new EquivalentAddressGroup(backends2.get(0))), any(Attributes.class)); + assertEquals(1, mockSubchannels.size()); + Subchannel subchannel3 = mockSubchannels.poll(); + verify(subchannel3).requestConnection(); + assertEquals(new EquivalentAddressGroup(backends2.get(0)), subchannel3.getAddresses()); + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue(); + assertRoundRobinList(picker7, (Subchannel) null); + + // State updates on obsolete subchannels will have no effect + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN)); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(SHUTDOWN)); + inOrder.verifyNoMoreInteractions(); + + deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue(); + assertRoundRobinList(picker8, subchannel3, null); + + verify(subchannel3, never()).shutdown(); + assertFalse(oobChannel.isShutdown()); + assertEquals(1, lbRequestObservers.size()); + verify(lbRequestObservers.peek(), never()).onCompleted(); + verify(lbRequestObservers.peek(), never()).onError(any(Throwable.class)); + } + + @Test + public void grpclbBalanerCommErrors() { + InOrder inOrder = inOrder(helper, mockLbService); + // Make the first LB address fail to connect + failingLbAuthorities.add(lbAuthority(0)); + List grpclbResolutionList = + createResolvedServerInfoGroupList(true, true, true); + + Attributes grpclbResolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + + // First LB addr fails to connect + inOrder.verify(helper).createOobChannel( + eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()), + eq(lbAuthority(0))); + inOrder.verify(helper).updatePicker(isA(ErrorPicker.class)); + + assertEquals(2, fakeOobChannels.size()); + assertTrue(fakeOobChannels.poll().isShutdown()); + // Will move on to second LB addr + inOrder.verify(helper).createOobChannel( + eq(grpclbResolutionList.get(1).toEquivalentAddressGroup()), + eq(lbAuthority(1))); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObservers.poll(); + assertEquals(1, fakeOobChannels.size()); + assertFalse(fakeOobChannels.peek().isShutdown()); + + Status error1 = Status.UNAVAILABLE.withDescription("error1"); + // Simulate that the stream on the second LB failed + lbResponseObserver.onError(error1.asException()); + assertTrue(fakeOobChannels.poll().isShutdown()); + + inOrder.verify(helper).updatePicker(pickerCaptor.capture()); + ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue(); + assertEquals(error1.getCode(), errorPicker.result.getStatus().getCode()); + assertTrue(errorPicker.result.getStatus().getDescription().contains(error1.getDescription())); + // Move on to the third LB. + inOrder.verify(helper).createOobChannel( + eq(grpclbResolutionList.get(2).toEquivalentAddressGroup()), + eq(lbAuthority(2))); + + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObservers.poll(); + assertEquals(1, fakeOobChannels.size()); + assertFalse(fakeOobChannels.peek().isShutdown()); + + // Simulate that the stream on the third LB closed without error. It is treated + // as an error. + lbResponseObserver.onCompleted(); + assertTrue(fakeOobChannels.poll().isShutdown()); + + // Loop back to the first LB addr, which still fails. + inOrder.verify(helper).createOobChannel( + eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()), + eq(lbAuthority(0))); + inOrder.verify(helper).updatePicker(isA(ErrorPicker.class)); + + assertEquals(2, fakeOobChannels.size()); + assertTrue(fakeOobChannels.poll().isShutdown()); + // Will move on to second LB addr + inOrder.verify(helper).createOobChannel( + eq(grpclbResolutionList.get(1).toEquivalentAddressGroup()), + eq(lbAuthority(1))); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + + assertEquals(1, fakeOobChannels.size()); + assertFalse(fakeOobChannels.peek().isShutdown()); + + // Finally it works. + lbResponseObserver.onNext(buildInitialResponse()); + List backends = Arrays.asList( + new InetSocketAddress("127.0.0.1", 2000), + new InetSocketAddress("127.0.0.1", 2010)); + lbResponseObserver.onNext(buildLbResponse(backends)); + inOrder.verify(helper).createSubchannel( + eq(new EquivalentAddressGroup(backends.get(0))), any(Attributes.class)); + inOrder.verify(helper).createSubchannel( + eq(new EquivalentAddressGroup(backends.get(1))), any(Attributes.class)); + inOrder.verify(helper).updatePicker(same(GrpclbLoadBalancer2.BUFFER_PICKER)); + inOrder.verifyNoMoreInteractions(); + } + + private void deliverSubchannelState( + final Subchannel subchannel, final ConnectivityStateInfo newState) { + channelExecutor.execute(new Runnable() { + @Override + public void run() { + balancer.handleSubchannelState(subchannel, newState); + } + }); + } + + private void deliverNameResolutionError(final Status error) { + channelExecutor.execute(new Runnable() { + @Override + public void run() { + balancer.handleNameResolutionError(error); + } + }); + } + + private void deliverResolvedAddresses( + final List addrs, final Attributes attrs) { + channelExecutor.execute(new Runnable() { + @Override + public void run() { + balancer.handleResolvedAddresses(addrs, attrs); + } + }); + } + + private static List createResolvedServerInfoGroupList(boolean ... isLb) { + ArrayList list = new ArrayList(); + for (int i = 0; i < isLb.length; i++) { + SocketAddress addr = new FakeSocketAddress("fake-address-" + i); + ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup + .builder(isLb[i] ? Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, lbAuthority(i)) + .build() + : Attributes.EMPTY) + .add(new ResolvedServerInfo(addr)) + .build(); + list.add(serverInfoGroup); + } + return list; + } + + private static String lbAuthority(int i) { + return "lb" + i + ".google.com"; + } + + private static LoadBalanceResponse buildInitialResponse() { + return LoadBalanceResponse.newBuilder().setInitialResponse( + InitialLoadBalanceResponse.getDefaultInstance()) + .build(); + } + + private static LoadBalanceResponse buildLbResponse(List addrs) { + ServerList.Builder serverListBuilder = ServerList.newBuilder(); + for (InetSocketAddress addr : addrs) { + if (addr != null) { + serverListBuilder.addServers(Server.newBuilder() + .setIpAddress(ByteString.copyFrom(addr.getAddress().getAddress())) + .setPort(addr.getPort()) + .build()); + } else { + serverListBuilder.addServers(Server.newBuilder().setDropRequest(true).build()); + } + } + return LoadBalanceResponse.newBuilder() + .setServerList(serverListBuilder.build()) + .build(); + } + + private static void assertRoundRobinList(RoundRobinPicker picker, Subchannel ... subchannels) { + assertEquals(subchannels.length, picker.list.size()); + for (int i = 0; i < subchannels.length; i++) { + Subchannel subchannel = subchannels[i]; + if (subchannel == null) { + assertSame("list[" + i + "] should be drop", + GrpclbLoadBalancer2.THROTTLED_RESULT, picker.list.get(i)); + } else { + assertEquals("list[" + i + "] should be Subchannel", + subchannel, picker.list.get(i).getSubchannel()); + } + } + } + + private static class FakeSocketAddress extends SocketAddress { + final String name; + + FakeSocketAddress(String name) { + this.name = name; + } + + @Override + public String toString() { + return "FakeSocketAddress-" + name; + } + + @Override + public boolean equals(Object other) { + if (other instanceof FakeSocketAddress) { + FakeSocketAddress otherAddr = (FakeSocketAddress) other; + return name.equals(otherAddr.name); + } + return false; + } + + @Override + public int hashCode() { + return name.hashCode(); + } + } +}