grpclb: skip picker updates that have no effect (#2876)

Each time helper.updatePicker() is called, the Channel will re-process
all pending streams with the new picker.  If the old picker is
equivalent to the old one, it's wasteful.

This is also needed to make our internal integration test easier.
Because the load-balancer may send address list that is identical to the
previous one, just to update the TTL.  Without this change, new picker
replaces the old picker even if they carry the same list, which
effectively resets the round-robin pointer.  This causes a little
imbalance between test backends, resulting in test failure.
This commit is contained in:
Kun Zhang 2017-04-05 09:43:05 -07:00 committed by GitHub
parent 1c1864be73
commit 123bb315e9
2 changed files with 44 additions and 20 deletions

View File

@ -133,6 +133,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
private Map<EquivalentAddressGroup, Subchannel> subchannels = Collections.emptyMap(); private Map<EquivalentAddressGroup, Subchannel> subchannels = Collections.emptyMap();
private List<RoundRobinEntry> roundRobinList = Collections.emptyList(); private List<RoundRobinEntry> roundRobinList = Collections.emptyList();
private SubchannelPicker currentPicker = BUFFER_PICKER;
GrpclbLoadBalancer(Helper helper, Factory pickFirstBalancerFactory, GrpclbLoadBalancer(Helper helper, Factory pickFirstBalancerFactory,
Factory roundRobinBalancerFactory) { Factory roundRobinBalancerFactory) {
@ -162,7 +163,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
subchannel.requestConnection(); subchannel.requestConnection();
} }
subchannel.getAttributes().get(STATE_INFO).set(newState); subchannel.getAttributes().get(STATE_INFO).set(newState);
helper.updatePicker(makePicker()); maybeUpdatePicker();
} }
@Override @Override
@ -306,7 +307,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
logger.log(Level.FINE, "[{0}] Had an error: {1}; roundRobinList={2}", logger.log(Level.FINE, "[{0}] Had an error: {1}; roundRobinList={2}",
new Object[] {logId, status, roundRobinList}); new Object[] {logId, status, roundRobinList});
if (roundRobinList.isEmpty()) { if (roundRobinList.isEmpty()) {
helper.updatePicker(new ErrorPicker(status)); maybeUpdatePicker(new ErrorPicker(status));
} }
} }
@ -385,7 +386,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
subchannels = newSubchannelMap; subchannels = newSubchannelMap;
roundRobinList = newRoundRobinList; roundRobinList = newRoundRobinList;
helper.updatePicker(makePicker()); maybeUpdatePicker();
} }
@Override public void onError(final Throwable error) { @Override public void onError(final Throwable error) {
@ -421,9 +422,10 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
} }
/** /**
* Make a picker out of the current roundRobinList and the states of subchannels. * Make and use a picker out of the current roundRobinList and the states of subchannels if they
* have changed since the last picker created.
*/ */
private SubchannelPicker makePicker() { private void maybeUpdatePicker() {
List<RoundRobinEntry> resultList = new ArrayList<RoundRobinEntry>(); List<RoundRobinEntry> resultList = new ArrayList<RoundRobinEntry>();
Status error = null; Status error = null;
for (RoundRobinEntry entry : roundRobinList) { for (RoundRobinEntry entry : roundRobinList) {
@ -445,17 +447,38 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
if (error != null) { if (error != null) {
logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}", logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}",
new Object[] {logId, error}); new Object[] {logId, error});
return new ErrorPicker(error); maybeUpdatePicker(new ErrorPicker(error));
} else { } else {
logger.log(Level.FINE, "[{0}] No ready Subchannel and no error", logId); logger.log(Level.FINE, "[{0}] No ready Subchannel and no error", logId);
return BUFFER_PICKER; maybeUpdatePicker(BUFFER_PICKER);
} }
} else { } else {
logger.log(Level.FINE, "[{0}] Using list {1}", new Object[] {logId, resultList}); logger.log(Level.FINE, "[{0}] Using list {1}", new Object[] {logId, resultList});
return new RoundRobinPicker(resultList); maybeUpdatePicker(new RoundRobinPicker(resultList));
} }
} }
/**
* Update the given picker to the helper if it's different from the current one.
*/
private void maybeUpdatePicker(SubchannelPicker picker) {
// Discard the new picker if we are sure it won't make any difference, in order to save
// re-processing pending streams, and avoid unnecessary resetting of the pointer in
// RoundRobinPicker.
if (picker == BUFFER_PICKER && currentPicker == BUFFER_PICKER) {
return;
}
if (picker instanceof RoundRobinPicker && currentPicker instanceof RoundRobinPicker) {
if (((RoundRobinPicker) picker).list.equals(((RoundRobinPicker) currentPicker).list)) {
return;
}
}
// No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending
// stream thus no time is wasted in re-process.
currentPicker = picker;
helper.updatePicker(picker);
}
@VisibleForTesting @VisibleForTesting
LoadBalancer getDelegate() { LoadBalancer getDelegate() {
return delegate; return delegate;

View File

@ -91,6 +91,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.junit.After; import org.junit.After;
@ -579,12 +580,9 @@ public class GrpclbLoadBalancerTest {
assertEquals(new EquivalentAddressGroup(backends1.get(0).addr), subchannel1.getAddresses()); assertEquals(new EquivalentAddressGroup(backends1.get(0).addr), subchannel1.getAddresses());
assertEquals(new EquivalentAddressGroup(backends1.get(1).addr), subchannel2.getAddresses()); assertEquals(new EquivalentAddressGroup(backends1.get(1).addr), subchannel2.getAddresses());
// Before any subchannel is READY, a buffer picker will be provided
inOrder.verify(helper).updatePicker(same(GrpclbLoadBalancer.BUFFER_PICKER));
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(helper, times(2)).updatePicker(same(GrpclbLoadBalancer.BUFFER_PICKER)); inOrder.verifyNoMoreInteractions();
// Let subchannels be connected // Let subchannels be connected
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
@ -608,16 +606,12 @@ public class GrpclbLoadBalancerTest {
assertThat(picker3.list).containsExactly(new RoundRobinEntry(subchannel2, "token0002")); assertThat(picker3.list).containsExactly(new RoundRobinEntry(subchannel2, "token0002"));
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(helper).updatePicker(pickerCaptor.capture()); inOrder.verifyNoMoreInteractions();
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker4.list).containsExactly(new RoundRobinEntry(subchannel2, "token0002"));
// As long as there is at least one READY subchannel, round robin will work. // As long as there is at least one READY subchannel, round robin will work.
Status error1 = Status.UNAVAILABLE.withDescription("error1"); Status error1 = Status.UNAVAILABLE.withDescription("error1");
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1)); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1));
inOrder.verify(helper).updatePicker(pickerCaptor.capture()); inOrder.verifyNoMoreInteractions();
RoundRobinPicker picker5 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker5.list).containsExactly(new RoundRobinEntry(subchannel2, "token0002"));
// If no subchannel is READY, will propagate an error from an arbitrary subchannel (but here // If no subchannel is READY, will propagate an error from an arbitrary subchannel (but here
// only subchannel1 has error). // only subchannel1 has error).
@ -678,8 +672,14 @@ public class GrpclbLoadBalancerTest {
GrpclbLoadBalancer.DROP_ENTRY, GrpclbLoadBalancer.DROP_ENTRY,
new RoundRobinEntry(subchannel2, "token0004"), new RoundRobinEntry(subchannel2, "token0004"),
new RoundRobinEntry(subchannel3, "token0005")).inOrder(); new RoundRobinEntry(subchannel3, "token0005")).inOrder();
verify(subchannel3, never()).shutdown(); verify(subchannel3, never()).shutdown();
// Update backends, with no entry
lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
verify(subchannel2).shutdown();
verify(subchannel3).shutdown();
inOrder.verify(helper).updatePicker((GrpclbLoadBalancer.BUFFER_PICKER));
assertFalse(oobChannel.isShutdown()); assertFalse(oobChannel.isShutdown());
assertEquals(1, lbRequestObservers.size()); assertEquals(1, lbRequestObservers.size());
verify(lbRequestObservers.peek(), never()).onCompleted(); verify(lbRequestObservers.peek(), never()).onCompleted();
@ -754,6 +754,8 @@ public class GrpclbLoadBalancerTest {
// Finally it works. // Finally it works.
lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildInitialResponse());
verify(helper, never()).createSubchannel(
any(EquivalentAddressGroup.class), any(Attributes.class));
List<ServerEntry> backends = Arrays.asList( List<ServerEntry> backends = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token001"), new ServerEntry("127.0.0.1", 2000, "token001"),
new ServerEntry("127.0.0.1", 2010, "token002")); new ServerEntry("127.0.0.1", 2010, "token002"));
@ -762,7 +764,6 @@ public class GrpclbLoadBalancerTest {
eq(new EquivalentAddressGroup(backends.get(0).addr)), any(Attributes.class)); eq(new EquivalentAddressGroup(backends.get(0).addr)), any(Attributes.class));
inOrder.verify(helper).createSubchannel( inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends.get(1).addr)), any(Attributes.class)); eq(new EquivalentAddressGroup(backends.get(1).addr)), any(Attributes.class));
inOrder.verify(helper).updatePicker(same(GrpclbLoadBalancer.BUFFER_PICKER));
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
} }