grpclb: remove unnecessary support for lb delegation

This commit is contained in:
Carl Mastrangelo 2018-07-24 13:00:49 -07:00 committed by GitHub
parent 8da06a8bc4
commit 72179e22a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 383 deletions

View File

@ -16,7 +16,6 @@
package io.grpc.grpclb;
import io.grpc.Attributes;
import io.grpc.ExperimentalApi;
import io.grpc.Metadata;
@ -25,20 +24,6 @@ import io.grpc.Metadata;
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1782")
public final class GrpclbConstants {
/**
* The load-balancing policy designated by the naming system.
*/
public enum LbPolicy {
PICK_FIRST,
ROUND_ROBIN,
GRPCLB
}
/**
* An attribute of a name resolution result, designating the LB policy.
*/
public static final Attributes.Key<LbPolicy> ATTR_LB_POLICY =
Attributes.Key.create("io.grpc.grpclb.lbPolicy");
/**
* The opaque token given by the remote balancer for each returned server address. The client

View File

@ -24,7 +24,6 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.grpclb.GrpclbConstants.LbPolicy;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.LogId;
@ -35,8 +34,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
@ -46,46 +44,32 @@ import javax.annotation.Nullable;
* or round-robin balancer.
*/
class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
private static final Logger logger = Logger.getLogger(GrpclbLoadBalancer.class.getName());
private final LogId logId = LogId.allocate(getClass().getName());
private final Helper helper;
private final SubchannelPool subchannelPool;
private final Factory pickFirstBalancerFactory;
private final Factory roundRobinBalancerFactory;
private final ObjectPool<ScheduledExecutorService> timerServicePool;
private final TimeProvider time;
private final BackoffPolicy.Provider backoffPolicyProvider;
// All mutable states in this class are mutated ONLY from Channel Executor
private ScheduledExecutorService timerService;
// If not null, all work is delegated to it.
@Nullable
private LoadBalancer delegate;
private LbPolicy lbPolicy;
// Null if lbPolicy != GRPCLB
@Nullable
private GrpclbState grpclbState;
GrpclbLoadBalancer(Helper helper, SubchannelPool subchannelPool, Factory pickFirstBalancerFactory,
Factory roundRobinBalancerFactory, ObjectPool<ScheduledExecutorService> timerServicePool,
TimeProvider time, BackoffPolicy.Provider backoffPolicyProvider) {
this.helper = checkNotNull(helper, "helper");
this.pickFirstBalancerFactory =
checkNotNull(pickFirstBalancerFactory, "pickFirstBalancerFactory");
this.roundRobinBalancerFactory =
checkNotNull(roundRobinBalancerFactory, "roundRobinBalancerFactory");
GrpclbLoadBalancer(
Helper helper,
SubchannelPool subchannelPool,
ObjectPool<ScheduledExecutorService> timerServicePool,
TimeProvider time,
BackoffPolicy.Provider backoffPolicyProvider) {
checkNotNull(helper, "helper");
this.timerServicePool = checkNotNull(timerServicePool, "timerServicePool");
this.timerService = checkNotNull(timerServicePool.getObject(), "timerService");
this.time = checkNotNull(time, "time provider");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
checkNotNull(time, "time provider");
checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.subchannelPool.init(helper, timerService);
setLbPolicy(LbPolicy.GRPCLB);
grpclbState =
new GrpclbState(helper, subchannelPool, time, timerService, backoffPolicyProvider, logId);
}
@Override
@ -95,19 +79,14 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
if (delegate != null) {
delegate.handleSubchannelState(subchannel, newState);
return;
}
if (grpclbState != null) {
grpclbState.handleSubchannelState(subchannel, newState);
}
// grpclbState should never be null here since handleSubchannelState cannot be called while the
// lb is shutdown.
grpclbState.handleSubchannelState(subchannel, newState);
}
@Override
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> updatedServers, Attributes attributes) {
LbPolicy newLbPolicy = attributes.get(GrpclbConstants.ATTR_LB_POLICY);
// LB addresses and backend addresses are treated separately
List<LbAddressGroup> newLbAddressGroups = new ArrayList<LbAddressGroup>();
List<EquivalentAddressGroup> newBackendServers = new ArrayList<EquivalentAddressGroup>();
@ -122,65 +101,10 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups);
newBackendServers = Collections.unmodifiableList(newBackendServers);
if (!newLbAddressGroups.isEmpty()) {
if (newLbPolicy != LbPolicy.GRPCLB) {
newLbPolicy = LbPolicy.GRPCLB;
logger.log(
Level.FINE, "[{0}] Switching to GRPCLB because there is at least one balancer", logId);
}
}
if (newLbPolicy == null) {
logger.log(Level.FINE, "[{0}] New config missing policy. Using PICK_FIRST", logId);
newLbPolicy = LbPolicy.PICK_FIRST;
}
// Switch LB policy if requested
setLbPolicy(newLbPolicy);
// Consume the new addresses
switch (lbPolicy) {
case PICK_FIRST:
case ROUND_ROBIN:
checkNotNull(delegate, "delegate should not be null. newLbPolicy=" + newLbPolicy);
delegate.handleResolvedAddressGroups(newBackendServers, attributes);
break;
case GRPCLB:
grpclbState.handleAddresses(newLbAddressGroups, newBackendServers);
break;
default:
// Do nothing
}
}
private void setLbPolicy(LbPolicy newLbPolicy) {
if (newLbPolicy != lbPolicy) {
resetStates();
switch (newLbPolicy) {
case PICK_FIRST:
delegate = checkNotNull(pickFirstBalancerFactory.newLoadBalancer(helper),
"pickFirstBalancerFactory.newLoadBalancer()");
break;
case ROUND_ROBIN:
delegate = checkNotNull(roundRobinBalancerFactory.newLoadBalancer(helper),
"roundRobinBalancerFactory.newLoadBalancer()");
break;
case GRPCLB:
grpclbState = new GrpclbState(
helper, subchannelPool, time, timerService, backoffPolicyProvider, logId);
break;
default:
// Do nohting
}
}
lbPolicy = newLbPolicy;
grpclbState.handleAddresses(newLbAddressGroups, newBackendServers);
}
private void resetStates() {
if (delegate != null) {
delegate.shutdown();
delegate = null;
}
if (grpclbState != null) {
grpclbState.shutdown();
grpclbState = null;
@ -195,9 +119,6 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
@Override
public void handleNameResolutionError(Status error) {
if (delegate != null) {
delegate.handleNameResolutionError(error);
}
if (grpclbState != null) {
grpclbState.propagateError(error);
}
@ -208,14 +129,4 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
GrpclbState getGrpclbState() {
return grpclbState;
}
@VisibleForTesting
LoadBalancer getDelegate() {
return delegate;
}
@VisibleForTesting
LbPolicy getLbPolicy() {
return lbPolicy;
}
}

View File

@ -18,12 +18,10 @@ package io.grpc.grpclb;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.PickFirstBalancerFactory;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TimeProvider;
import io.grpc.util.RoundRobinLoadBalancerFactory;
/**
* A factory for {@link LoadBalancer}s that uses the GRPCLB protocol.
@ -46,8 +44,7 @@ public class GrpclbLoadBalancerFactory extends LoadBalancer.Factory {
@Override
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
return new GrpclbLoadBalancer(
helper, new CachedSubchannelPool(), PickFirstBalancerFactory.getInstance(),
RoundRobinLoadBalancerFactory.getInstance(),
helper, new CachedSubchannelPool(),
// TODO(zhangkun83): balancer sends load reporting RPCs from it, which also involves
// channelExecutor thus may also run other tasks queued in the channelExecutor. If such
// load should not be on the shared scheduled executor, we should use a combination of the

View File

@ -54,7 +54,6 @@ import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
@ -63,7 +62,6 @@ import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.grpclb.GrpclbConstants.LbPolicy;
import io.grpc.grpclb.GrpclbState.BackendEntry;
import io.grpc.grpclb.GrpclbState.DropEntry;
import io.grpc.grpclb.GrpclbState.ErrorEntry;
@ -158,14 +156,6 @@ public class GrpclbLoadBalancerTest {
private final SerializingExecutor channelExecutor =
new SerializingExecutor(MoreExecutors.directExecutor());
@Mock
private LoadBalancer.Factory pickFirstBalancerFactory;
@Mock
private LoadBalancer pickFirstBalancer;
@Mock
private LoadBalancer.Factory roundRobinBalancerFactory;
@Mock
private LoadBalancer roundRobinBalancer;
@Mock
private ObjectPool<ScheduledExecutorService> timerServicePool;
@Mock
private BackoffPolicy.Provider backoffPolicyProvider;
@ -179,10 +169,6 @@ public class GrpclbLoadBalancerTest {
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(pickFirstBalancerFactory.newLoadBalancer(any(Helper.class)))
.thenReturn(pickFirstBalancer);
when(roundRobinBalancerFactory.newLoadBalancer(any(Helper.class)))
.thenReturn(roundRobinBalancer);
mockLbService = mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo(
new LoadBalancerGrpc.LoadBalancerImplBase() {
@Override
@ -258,9 +244,11 @@ public class GrpclbLoadBalancerTest {
when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
balancer = new GrpclbLoadBalancer(
helper, subchannelPool, pickFirstBalancerFactory, roundRobinBalancerFactory,
helper,
subchannelPool,
timerServicePool,
timeProvider, backoffPolicyProvider);
timeProvider,
backoffPolicyProvider);
verify(subchannelPool).init(same(helper), same(timerService));
}
@ -399,8 +387,7 @@ public class GrpclbLoadBalancerTest {
long loadReportIntervalMillis = 1983;
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
// Fallback timer is started as soon as address is resolved.
@ -625,8 +612,7 @@ public class GrpclbLoadBalancerTest {
when(args.getHeaders()).thenReturn(headers);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@ -656,8 +642,7 @@ public class GrpclbLoadBalancerTest {
when(args.getHeaders()).thenReturn(headers);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@ -734,11 +719,6 @@ public class GrpclbLoadBalancerTest {
Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build();
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
verify(pickFirstBalancerFactory).newLoadBalancer(helper);
verify(pickFirstBalancer).handleResolvedAddressGroups(eq(resolvedServers), eq(resolutionAttrs));
verifyNoMoreInteractions(roundRobinBalancerFactory);
verifyNoMoreInteractions(roundRobinBalancer);
}
@Test
@ -754,62 +734,11 @@ public class GrpclbLoadBalancerTest {
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true);
EquivalentAddressGroup eag = resolvedServers.get(0);
Attributes resolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes resolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0)));
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
verifyNoMoreInteractions(pickFirstBalancerFactory);
verifyNoMoreInteractions(pickFirstBalancer);
verifyNoMoreInteractions(roundRobinBalancerFactory);
verifyNoMoreInteractions(roundRobinBalancer);
}
@Test
public void delegatingPickFirstThenNameResolutionFails() {
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false);
Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build();
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
verify(pickFirstBalancerFactory).newLoadBalancer(helper);
verify(pickFirstBalancer).handleResolvedAddressGroups(eq(resolvedServers), eq(resolutionAttrs));
// Then let name resolution fail. The error will be passed directly to the delegate.
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(pickFirstBalancer).handleNameResolutionError(error);
verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
verifyNoMoreInteractions(roundRobinBalancerFactory);
verifyNoMoreInteractions(roundRobinBalancer);
}
@Test
public void delegatingRoundRobinThenNameResolutionFails() {
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false, false);
Attributes resolutionAttrs = Attributes.newBuilder()
.set(RESOLUTION_ATTR, "yeah")
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN)
.build();
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
verify(roundRobinBalancerFactory).newLoadBalancer(helper);
verify(roundRobinBalancer).handleResolvedAddressGroups(resolvedServers, resolutionAttrs);
// Then let name resolution fail. The error will be passed directly to the delegate.
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(roundRobinBalancer).handleNameResolutionError(error);
verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
verifyNoMoreInteractions(pickFirstBalancerFactory);
verifyNoMoreInteractions(pickFirstBalancer);
}
@Test
@ -817,12 +746,9 @@ public class GrpclbLoadBalancerTest {
InOrder inOrder = inOrder(helper, subchannelPool);
// Go to GRPCLB first
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
@ -856,165 +782,13 @@ public class GrpclbLoadBalancerTest {
any(Attributes.class));
}
@SuppressWarnings("unchecked")
@Test
public void switchPolicy() {
// Go to GRPCLB first
List<EquivalentAddressGroup> grpclbResolutionList =
createResolvedServerAddresses(true, false);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
// Switch to PICK_FIRST
List<EquivalentAddressGroup> pickFirstResolutionList =
createResolvedServerAddresses(false, false);
Attributes pickFirstResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build();
verify(pickFirstBalancerFactory, never()).newLoadBalancer(any(Helper.class));
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver, never()).onCompleted();
assertFalse(oobChannel.isShutdown());
deliverResolvedAddresses(pickFirstResolutionList, pickFirstResolutionAttrs);
verify(pickFirstBalancerFactory).newLoadBalancer(same(helper));
// Only non-LB addresses are passed to the delegate
verify(pickFirstBalancer).handleResolvedAddressGroups(
eq(pickFirstResolutionList), same(pickFirstResolutionAttrs));
assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy());
assertSame(pickFirstBalancer, balancer.getDelegate());
// GRPCLB connection is closed
verify(lbRequestObserver).onCompleted();
assertTrue(oobChannel.isShutdown());
// Switching away from GRPCLB will clear the subchannelPool
verify(subchannelPool).clear();
// Switch to ROUND_ROBIN
List<EquivalentAddressGroup> roundRobinResolutionList =
createResolvedServerAddresses(false, false, false);
Attributes roundRobinResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN).build();
verify(roundRobinBalancerFactory, never()).newLoadBalancer(any(Helper.class));
deliverResolvedAddresses(roundRobinResolutionList, roundRobinResolutionAttrs);
verify(roundRobinBalancerFactory).newLoadBalancer(same(helper));
// Only non-LB addresses are passed to the delegate
verify(roundRobinBalancer).handleResolvedAddressGroups(
eq(roundRobinResolutionList), same(roundRobinResolutionAttrs));
assertSame(LbPolicy.ROUND_ROBIN, balancer.getLbPolicy());
assertSame(roundRobinBalancer, balancer.getDelegate());
// Special case: if at least one address is loadbalancer, use GRPCLB no matter what the
// NameResolver says.
grpclbResolutionList = createResolvedServerAddresses(true, false, true, false);
grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(
Arrays.asList(
grpclbResolutionList.get(0).getAddresses().get(0),
grpclbResolutionList.get(2).getAddresses().get(0)),
lbAttributes(lbAuthority(0)));
verify(helper).createOobChannel(eq(combinedEag), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
oobChannel = fakeOobChannels.poll();
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
// Special case: PICK_FIRST is the default
pickFirstResolutionList = createResolvedServerAddresses(false, false, false);
pickFirstResolutionAttrs = Attributes.EMPTY;
verify(pickFirstBalancerFactory).newLoadBalancer(any(Helper.class));
assertFalse(oobChannel.isShutdown());
deliverResolvedAddresses(pickFirstResolutionList, pickFirstResolutionAttrs);
verify(pickFirstBalancerFactory, times(2)).newLoadBalancer(same(helper));
// Only non-LB addresses are passed to the delegate
verify(pickFirstBalancer).handleResolvedAddressGroups(
eq(pickFirstResolutionList), same(pickFirstResolutionAttrs));
assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy());
assertSame(pickFirstBalancer, balancer.getDelegate());
// GRPCLB connection is closed
assertTrue(oobChannel.isShutdown());
// Switching away from GRPCLB will clear the subchannelPool
verify(subchannelPool, times(2)).clear();
}
@Test
public void resetGrpclbWhenSwitchingAwayFromGrpclb() {
InOrder inOrder = inOrder(helper, subchannelPool);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// Simulate receiving LB response
List<ServerEntry> backends = Arrays.asList(new ServerEntry("127.0.0.1", 2000, "token0001"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends));
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
assertEquals(1, mockSubchannels.size());
Subchannel subchannel = mockSubchannels.poll();
verify(subchannel).requestConnection();
// Switch to round-robin. GRPCLB streams and connections should be closed.
List<EquivalentAddressGroup> roundRobinResolutionList =
createResolvedServerAddresses(false, false, false);
Attributes roundRobinResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN).build();
verify(lbRequestObserver, never()).onCompleted();
verify(subchannelPool, never()).returnSubchannel(same(subchannel));
assertFalse(oobChannel.isShutdown());
deliverResolvedAddresses(roundRobinResolutionList, roundRobinResolutionAttrs);
verify(lbRequestObserver).onCompleted();
verify(subchannelPool).returnSubchannel(same(subchannel));
assertTrue(oobChannel.isShutdown());
assertTrue(oobChannel.isTerminated());
assertSame(LbPolicy.ROUND_ROBIN, balancer.getLbPolicy());
assertSame(roundRobinBalancer, balancer.getDelegate());
assertNull(balancer.getGrpclbState());
}
@Test
public void grpclbUpdatedAddresses_avoidsReconnect() {
List<EquivalentAddressGroup> grpclbResolutionList =
createResolvedServerAddresses(true, false);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
ManagedChannel oobChannel = fakeOobChannels.poll();
assertEquals(1, lbRequestObservers.size());
@ -1034,11 +808,9 @@ public class GrpclbLoadBalancerTest {
public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() {
List<EquivalentAddressGroup> grpclbResolutionList =
createResolvedServerAddresses(true, false);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
ManagedChannel oobChannel = fakeOobChannels.poll();
assertEquals(1, lbRequestObservers.size());
@ -1058,15 +830,12 @@ public class GrpclbLoadBalancerTest {
public void grpclbWorking() {
InOrder inOrder = inOrder(helper, subchannelPool);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
// Fallback timer is started as soon as the addresses are resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
@ -1273,11 +1042,9 @@ public class GrpclbLoadBalancerTest {
// Create a resolution list with a mixture of balancer and backend addresses
List<EquivalentAddressGroup> resolutionList =
createResolvedServerAddresses(false, true, false);
Attributes resolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes resolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(resolutionList, resolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
// Attempted to connect to balancer
@ -1334,7 +1101,6 @@ public class GrpclbLoadBalancerTest {
fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
assertNull(balancer.getDelegate());
assertFalse(oobChannel.isShutdown());
verify(lbRequestObserver, never()).onCompleted();
}
@ -1344,7 +1110,6 @@ public class GrpclbLoadBalancerTest {
////////////////////////////////////////////////////////
resolutionList = createResolvedServerAddresses(true, true);
deliverResolvedAddresses(resolutionList, resolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
// New addresses are updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
@ -1366,7 +1131,6 @@ public class GrpclbLoadBalancerTest {
//////////////////////////////////////////////////
resolutionList = createResolvedServerAddresses(true, false, false);
deliverResolvedAddresses(resolutionList, resolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
// New LB address is updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
@ -1416,7 +1180,6 @@ public class GrpclbLoadBalancerTest {
///////////////////////////////////////////////////////////////
resolutionList = createResolvedServerAddresses(true, false);
deliverResolvedAddresses(resolutionList, resolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
// Will not affect the round robin list at all
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
@ -1449,11 +1212,9 @@ public class GrpclbLoadBalancerTest {
// Create a resolution list with a mixture of balancer and backend addresses
List<EquivalentAddressGroup> resolutionList =
createResolvedServerAddresses(false, true, false);
Attributes resolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes resolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(resolutionList, resolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
// Attempted to connect to balancer
@ -1609,12 +1370,9 @@ public class GrpclbLoadBalancerTest {
new FakeSocketAddress("fake-address-3")),
lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1");
}
@ -1628,8 +1386,7 @@ public class GrpclbLoadBalancerTest {
InOrder inOrder =
inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertEquals(1, fakeOobChannels.size());