From 1f64ac94a8f3ee827e686742f915b06311772010 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 8 Nov 2019 08:54:35 -0800 Subject: [PATCH] Revert "grpclb: shuffle pick first index" This reverts commit 1949ebd7ef96fa5bc35258da698a1429c4e61819. The shuffling should be handled by the grpclb server instead. --- .../io/grpc/grpclb/GrpclbLoadBalancer.java | 9 +- .../main/java/io/grpc/grpclb/GrpclbState.java | 16 +- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 166 ++---------------- 3 files changed, 25 insertions(+), 166 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index 99c402b898..aa827d296a 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -38,7 +38,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -52,11 +51,7 @@ import javax.annotation.Nullable; class GrpclbLoadBalancer extends LoadBalancer { private static final Mode DEFAULT_MODE = Mode.ROUND_ROBIN; private static final Logger logger = Logger.getLogger(GrpclbLoadBalancer.class.getName()); - private static final AtomicInteger pickFirstGlobalIndex = new AtomicInteger(); - /** Backend list for pick_first will be picked from this index. */ - // & 0xFFFFFF to avoid overflow - private final int pickFirstIndex = pickFirstGlobalIndex.getAndIncrement() & 0xFFFFFF; private final Helper helper; private final TimeProvider time; private final Stopwatch stopwatch; @@ -172,8 +167,8 @@ class GrpclbLoadBalancer extends LoadBalancer { private void recreateStates() { resetStates(); checkState(grpclbState == null, "Should've been cleared"); - grpclbState = new GrpclbState( - mode, helper, subchannelPool, time, stopwatch, backoffPolicyProvider, pickFirstIndex); + grpclbState = new GrpclbState(mode, helper, subchannelPool, time, stopwatch, + backoffPolicyProvider); } @Override diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 98bbaa9d41..6e44a7eed6 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -104,13 +104,11 @@ final class GrpclbState { } }; - enum Mode { + static enum Mode { ROUND_ROBIN, PICK_FIRST, } - // backend list for pick_first will be picked from this index - private final int pickFirstIndex; private final String serviceName; private final Helper helper; private final SynchronizationContext syncContext; @@ -160,8 +158,7 @@ final class GrpclbState { SubchannelPool subchannelPool, TimeProvider time, Stopwatch stopwatch, - BackoffPolicy.Provider backoffPolicyProvider, - int pickFirstIndex) { + BackoffPolicy.Provider backoffPolicyProvider) { this.mode = checkNotNull(mode, "mode"); this.helper = checkNotNull(helper, "helper"); this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); @@ -173,7 +170,6 @@ final class GrpclbState { this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority"); this.logger = checkNotNull(helper.getChannelLogger(), "logger"); - this.pickFirstIndex = pickFirstIndex; } void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { @@ -421,14 +417,12 @@ final class GrpclbState { subchannels = Collections.unmodifiableMap(newSubchannelMap); break; case PICK_FIRST: - int numOfEags = newBackendAddrList.size(); - List eagList = Arrays.asList(new EquivalentAddressGroup[numOfEags]); + List eagList = new ArrayList<>(); // Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to // attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on // headers. // // The PICK_FIRST code path doesn't cache Subchannels. - int offset = pickFirstIndex; for (BackendAddressGroup bag : newBackendAddrList) { EquivalentAddressGroup origEag = bag.getAddresses(); Attributes eagAttrs = origEag.getAttributes(); @@ -436,9 +430,7 @@ final class GrpclbState { eagAttrs = eagAttrs.toBuilder() .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, bag.getToken()).build(); } - eagList.set( - offset++ % numOfEags, - new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs)); + eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs)); } Subchannel subchannel; if (subchannels.isEmpty()) { diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 3d820c52a2..8ca9d31286 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -186,8 +186,6 @@ public class GrpclbLoadBalancerTest { private io.grpc.Server fakeLbServer; @Captor private ArgumentCaptor pickerCaptor; - @Captor - private ArgumentCaptor> eagsCaptor; @Mock private BackoffPolicy.Provider backoffPolicyProvider; @Mock @@ -1723,11 +1721,10 @@ public class GrpclbLoadBalancerTest { // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to // the new createSubchannel(). inOrder.verify(helper).createSubchannel( - eagsCaptor.capture(), + eq(Arrays.asList( + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), any(Attributes.class)); - assertThat(eagsCaptor.getValue()).containsExactly( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); // Initially IDLE inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); @@ -1780,10 +1777,11 @@ public class GrpclbLoadBalancerTest { // createSubchannel() has ever been called only once verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class)); assertThat(mockSubchannels).isEmpty(); - verify(subchannel).updateAddresses(eagsCaptor.capture()); - assertThat(eagsCaptor.getValue()).containsExactly( - new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends2.get(2).addr, eagAttrsWithToken("token0004"))); + verify(subchannel).updateAddresses( + eq(Arrays.asList( + new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends2.get(2).addr, + eagAttrsWithToken("token0004"))))); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker4.dropList).containsExactly( @@ -1842,132 +1840,6 @@ public class GrpclbLoadBalancerTest { .isEqualTo(Code.CANCELLED); } - @SuppressWarnings("deprecation") - @Test - public void grpclbWorking_pickFirstMode_expectBackendsShuffled() throws Exception { - InOrder inOrder = inOrder(helper); - String lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}"; - List grpclbResolutionList = createResolvedServerAddresses(true); - Attributes grpclbResolutionAttrs = Attributes.newBuilder().set( - LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); - - // Simulate receiving LB response - List backends0 = generateFakeServerEntries(100); - lbResponseObserver.onNext(buildInitialResponse()); - lbResponseObserver.onNext(buildLbResponse(backends0)); - - // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to - // the new createSubchannel(). - inOrder.verify(helper).createSubchannel( - eagsCaptor.capture(), - any(Attributes.class)); - List eags = eagsCaptor.getValue(); - int offset = eags.indexOf(eagOfFakeServerEntry(backends0.get(0))); - assertEagsAreShifted(eags, backends0, offset); - - // Only one subchannel is created - assertThat(mockSubchannels).hasSize(1); - Subchannel subchannel = mockSubchannels.poll(); - - List backends1 = generateFakeServerEntries(1); - lbResponseObserver.onNext(buildLbResponse(backends1)); - verify(subchannel).updateAddresses(eagsCaptor.capture()); - eags = eagsCaptor.getValue(); - assertEagsAreShifted(eags, backends1, offset); - - List backends2 = generateFakeServerEntries(2); - lbResponseObserver.onNext(buildLbResponse(backends2)); - verify(subchannel, times(2)).updateAddresses(eagsCaptor.capture()); - eags = eagsCaptor.getValue(); - assertEagsAreShifted(eags, backends2, offset); - - List backends3 = generateFakeServerEntries(3); - lbResponseObserver.onNext(buildLbResponse(backends3)); - verify(subchannel, times(3)).updateAddresses(eagsCaptor.capture()); - eags = eagsCaptor.getValue(); - assertEagsAreShifted(eags, backends3, offset); - - List allDrop = Collections.singletonList(new ServerEntry("token0004")); - lbResponseObserver.onNext(buildLbResponse(allDrop)); - verify(subchannel, times(4)).updateAddresses(eagsCaptor.capture()); - assertThat(eagsCaptor.getValue()).isEmpty(); - - balancer.shutdown(); - - // new balancer - balancer = new GrpclbLoadBalancer(helper, subchannelPool, fakeClock.getTimeProvider(), - fakeClock.getStopwatchSupplier().get(), - backoffPolicyProvider); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - - // Simulate receiving LB response - lbResponseObserver.onNext(buildInitialResponse()); - lbResponseObserver.onNext(buildLbResponse(backends0)); - - // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to - // the new createSubchannel(). - inOrder.verify(helper).createSubchannel( - eagsCaptor.capture(), - any(Attributes.class)); - eags = eagsCaptor.getValue(); - int newOffset = eags.indexOf(eagOfFakeServerEntry(backends0.get(0))); - - assertThat(newOffset).isEqualTo(offset + 1); - - assertEagsAreShifted(eags, backends0, newOffset); - - // Only one subchannel is created - assertThat(mockSubchannels).hasSize(1); - subchannel = mockSubchannels.poll(); - - lbResponseObserver.onNext(buildLbResponse(backends1)); - verify(subchannel).updateAddresses(eagsCaptor.capture()); - eags = eagsCaptor.getValue(); - assertEagsAreShifted(eags, backends1, newOffset); - - lbResponseObserver.onNext(buildLbResponse(backends2)); - verify(subchannel, times(2)).updateAddresses(eagsCaptor.capture()); - eags = eagsCaptor.getValue(); - assertEagsAreShifted(eags, backends2, newOffset); - - lbResponseObserver.onNext(buildLbResponse(backends3)); - verify(subchannel, times(3)).updateAddresses(eagsCaptor.capture()); - eags = eagsCaptor.getValue(); - assertEagsAreShifted(eags, backends3, newOffset); - - lbResponseObserver.onNext(buildLbResponse(allDrop)); - verify(subchannel, times(4)).updateAddresses(eagsCaptor.capture()); - assertThat(eagsCaptor.getValue()).isEmpty(); - } - - private static List generateFakeServerEntries(int num) { - List list = new ArrayList<>(num); - for (int i = 0; i < num; i++) { - list.add(new ServerEntry("127.0.0.1", 1000 + i, "token000" + i)); - } - return list; - } - - private static EquivalentAddressGroup eagOfFakeServerEntry(ServerEntry serverEntry) { - return new EquivalentAddressGroup(serverEntry.addr, eagAttrsWithToken(serverEntry.token)); - } - - private static void assertEagsAreShifted( - List eags, List serverEntries, int offset) { - int len = serverEntries.size(); - assertThat(eags).hasSize(len); - for (ServerEntry serverEntry : serverEntries) { - assertEquals( - eags + " is not a shift of " + serverEntries, - eagOfFakeServerEntry(serverEntry), eags.get(offset++ % len)); - } - } - @SuppressWarnings({"unchecked", "deprecation"}) @Test public void pickFirstMode_fallback() throws Exception { @@ -1994,9 +1866,9 @@ public class GrpclbLoadBalancerTest { // Entering fallback mode // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to // the new createSubchannel(). - inOrder.verify(helper).createSubchannel(eagsCaptor.capture(), any(Attributes.class)); - assertThat(eagsCaptor.getValue()).containsExactly( - grpclbResolutionList.get(0), grpclbResolutionList.get(2)); + inOrder.verify(helper).createSubchannel( + eq(Arrays.asList(grpclbResolutionList.get(0), grpclbResolutionList.get(2))), + any(Attributes.class)); assertThat(mockSubchannels).hasSize(1); Subchannel subchannel = mockSubchannels.poll(); @@ -2030,10 +1902,11 @@ public class GrpclbLoadBalancerTest { // createSubchannel() has ever been called only once verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class)); assertThat(mockSubchannels).isEmpty(); - verify(subchannel).updateAddresses(eagsCaptor.capture()); - assertThat(eagsCaptor.getValue()).containsExactly( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); + verify(subchannel).updateAddresses( + eq(Arrays.asList( + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, + eagAttrsWithToken("token0002"))))); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker2.dropList).containsExactly(null, null); @@ -2128,11 +2001,10 @@ public class GrpclbLoadBalancerTest { // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to // the new createSubchannel(). inOrder.verify(helper).createSubchannel( - eagsCaptor.capture(), + eq(Arrays.asList( + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), any(Attributes.class)); - assertThat(eagsCaptor.getValue()).containsExactly( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); }