diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index e7c2c94c1a..aa902a851a 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -133,6 +133,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { private Map subchannels = Collections.emptyMap(); private List roundRobinList = Collections.emptyList(); + private SubchannelPicker currentPicker = BUFFER_PICKER; GrpclbLoadBalancer(Helper helper, Factory pickFirstBalancerFactory, Factory roundRobinBalancerFactory) { @@ -162,7 +163,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { subchannel.requestConnection(); } subchannel.getAttributes().get(STATE_INFO).set(newState); - helper.updatePicker(makePicker()); + maybeUpdatePicker(); } @Override @@ -306,7 +307,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { logger.log(Level.FINE, "[{0}] Had an error: {1}; roundRobinList={2}", new Object[] {logId, status, roundRobinList}); if (roundRobinList.isEmpty()) { - helper.updatePicker(new ErrorPicker(status)); + maybeUpdatePicker(new ErrorPicker(status)); } } @@ -385,7 +386,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { subchannels = newSubchannelMap; roundRobinList = newRoundRobinList; - helper.updatePicker(makePicker()); + maybeUpdatePicker(); } @Override public void onError(final Throwable error) { @@ -421,9 +422,10 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { } /** - * Make a picker out of the current roundRobinList and the states of subchannels. + * Make and use a picker out of the current roundRobinList and the states of subchannels if they + * have changed since the last picker created. */ - private SubchannelPicker makePicker() { + private void maybeUpdatePicker() { List resultList = new ArrayList(); Status error = null; for (RoundRobinEntry entry : roundRobinList) { @@ -445,17 +447,38 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { if (error != null) { logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}", new Object[] {logId, error}); - return new ErrorPicker(error); + maybeUpdatePicker(new ErrorPicker(error)); } else { logger.log(Level.FINE, "[{0}] No ready Subchannel and no error", logId); - return BUFFER_PICKER; + maybeUpdatePicker(BUFFER_PICKER); } } else { logger.log(Level.FINE, "[{0}] Using list {1}", new Object[] {logId, resultList}); - return new RoundRobinPicker(resultList); + maybeUpdatePicker(new RoundRobinPicker(resultList)); } } + /** + * Update the given picker to the helper if it's different from the current one. + */ + private void maybeUpdatePicker(SubchannelPicker picker) { + // Discard the new picker if we are sure it won't make any difference, in order to save + // re-processing pending streams, and avoid unnecessary resetting of the pointer in + // RoundRobinPicker. + if (picker == BUFFER_PICKER && currentPicker == BUFFER_PICKER) { + return; + } + if (picker instanceof RoundRobinPicker && currentPicker instanceof RoundRobinPicker) { + if (((RoundRobinPicker) picker).list.equals(((RoundRobinPicker) currentPicker).list)) { + return; + } + } + // No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending + // stream thus no time is wasted in re-process. + currentPicker = picker; + helper.updatePicker(picker); + } + @VisibleForTesting LoadBalancer getDelegate() { return delegate; diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 7b7544f9a9..c74af14968 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -91,6 +91,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import org.junit.After; @@ -579,12 +580,9 @@ public class GrpclbLoadBalancerTest { assertEquals(new EquivalentAddressGroup(backends1.get(0).addr), subchannel1.getAddresses()); assertEquals(new EquivalentAddressGroup(backends1.get(1).addr), subchannel2.getAddresses()); - // Before any subchannel is READY, a buffer picker will be provided - inOrder.verify(helper).updatePicker(same(GrpclbLoadBalancer.BUFFER_PICKER)); - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); - inOrder.verify(helper, times(2)).updatePicker(same(GrpclbLoadBalancer.BUFFER_PICKER)); + inOrder.verifyNoMoreInteractions(); // Let subchannels be connected deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); @@ -608,16 +606,12 @@ public class GrpclbLoadBalancerTest { assertThat(picker3.list).containsExactly(new RoundRobinEntry(subchannel2, "token0002")); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); - inOrder.verify(helper).updatePicker(pickerCaptor.capture()); - RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker4.list).containsExactly(new RoundRobinEntry(subchannel2, "token0002")); + inOrder.verifyNoMoreInteractions(); // As long as there is at least one READY subchannel, round robin will work. Status error1 = Status.UNAVAILABLE.withDescription("error1"); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1)); - inOrder.verify(helper).updatePicker(pickerCaptor.capture()); - RoundRobinPicker picker5 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker5.list).containsExactly(new RoundRobinEntry(subchannel2, "token0002")); + inOrder.verifyNoMoreInteractions(); // If no subchannel is READY, will propagate an error from an arbitrary subchannel (but here // only subchannel1 has error). @@ -678,8 +672,14 @@ public class GrpclbLoadBalancerTest { GrpclbLoadBalancer.DROP_ENTRY, new RoundRobinEntry(subchannel2, "token0004"), new RoundRobinEntry(subchannel3, "token0005")).inOrder(); - verify(subchannel3, never()).shutdown(); + + // Update backends, with no entry + lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); + verify(subchannel2).shutdown(); + verify(subchannel3).shutdown(); + inOrder.verify(helper).updatePicker((GrpclbLoadBalancer.BUFFER_PICKER)); + assertFalse(oobChannel.isShutdown()); assertEquals(1, lbRequestObservers.size()); verify(lbRequestObservers.peek(), never()).onCompleted(); @@ -754,6 +754,8 @@ public class GrpclbLoadBalancerTest { // Finally it works. lbResponseObserver.onNext(buildInitialResponse()); + verify(helper, never()).createSubchannel( + any(EquivalentAddressGroup.class), any(Attributes.class)); List backends = Arrays.asList( new ServerEntry("127.0.0.1", 2000, "token001"), new ServerEntry("127.0.0.1", 2010, "token002")); @@ -762,7 +764,6 @@ public class GrpclbLoadBalancerTest { eq(new EquivalentAddressGroup(backends.get(0).addr)), any(Attributes.class)); inOrder.verify(helper).createSubchannel( eq(new EquivalentAddressGroup(backends.get(1).addr)), any(Attributes.class)); - inOrder.verify(helper).updatePicker(same(GrpclbLoadBalancer.BUFFER_PICKER)); inOrder.verifyNoMoreInteractions(); }