Revert "grpclb: shuffle pick first index"

This reverts commit 1949ebd7ef.

The shuffling should be handled by the grpclb server instead.
This commit is contained in:
Eric Anderson 2019-11-08 08:54:35 -08:00
parent c032e2ebf9
commit 1f64ac94a8
3 changed files with 25 additions and 166 deletions

View File

@ -38,7 +38,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -52,11 +51,7 @@ import javax.annotation.Nullable;
class GrpclbLoadBalancer extends LoadBalancer {
private static final Mode DEFAULT_MODE = Mode.ROUND_ROBIN;
private static final Logger logger = Logger.getLogger(GrpclbLoadBalancer.class.getName());
private static final AtomicInteger pickFirstGlobalIndex = new AtomicInteger();
/** Backend list for pick_first will be picked from this index. */
// & 0xFFFFFF to avoid overflow
private final int pickFirstIndex = pickFirstGlobalIndex.getAndIncrement() & 0xFFFFFF;
private final Helper helper;
private final TimeProvider time;
private final Stopwatch stopwatch;
@ -172,8 +167,8 @@ class GrpclbLoadBalancer extends LoadBalancer {
private void recreateStates() {
resetStates();
checkState(grpclbState == null, "Should've been cleared");
grpclbState = new GrpclbState(
mode, helper, subchannelPool, time, stopwatch, backoffPolicyProvider, pickFirstIndex);
grpclbState = new GrpclbState(mode, helper, subchannelPool, time, stopwatch,
backoffPolicyProvider);
}
@Override

View File

@ -104,13 +104,11 @@ final class GrpclbState {
}
};
enum Mode {
static enum Mode {
ROUND_ROBIN,
PICK_FIRST,
}
// backend list for pick_first will be picked from this index
private final int pickFirstIndex;
private final String serviceName;
private final Helper helper;
private final SynchronizationContext syncContext;
@ -160,8 +158,7 @@ final class GrpclbState {
SubchannelPool subchannelPool,
TimeProvider time,
Stopwatch stopwatch,
BackoffPolicy.Provider backoffPolicyProvider,
int pickFirstIndex) {
BackoffPolicy.Provider backoffPolicyProvider) {
this.mode = checkNotNull(mode, "mode");
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
@ -173,7 +170,6 @@ final class GrpclbState {
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority");
this.logger = checkNotNull(helper.getChannelLogger(), "logger");
this.pickFirstIndex = pickFirstIndex;
}
void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
@ -421,14 +417,12 @@ final class GrpclbState {
subchannels = Collections.unmodifiableMap(newSubchannelMap);
break;
case PICK_FIRST:
int numOfEags = newBackendAddrList.size();
List<EquivalentAddressGroup> eagList = Arrays.asList(new EquivalentAddressGroup[numOfEags]);
List<EquivalentAddressGroup> eagList = new ArrayList<>();
// Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to
// attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on
// headers.
//
// The PICK_FIRST code path doesn't cache Subchannels.
int offset = pickFirstIndex;
for (BackendAddressGroup bag : newBackendAddrList) {
EquivalentAddressGroup origEag = bag.getAddresses();
Attributes eagAttrs = origEag.getAttributes();
@ -436,9 +430,7 @@ final class GrpclbState {
eagAttrs = eagAttrs.toBuilder()
.set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, bag.getToken()).build();
}
eagList.set(
offset++ % numOfEags,
new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs));
eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs));
}
Subchannel subchannel;
if (subchannels.isEmpty()) {

View File

@ -186,8 +186,6 @@ public class GrpclbLoadBalancerTest {
private io.grpc.Server fakeLbServer;
@Captor
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
@Captor
private ArgumentCaptor<List<EquivalentAddressGroup>> eagsCaptor;
@Mock
private BackoffPolicy.Provider backoffPolicyProvider;
@Mock
@ -1723,11 +1721,10 @@ public class GrpclbLoadBalancerTest {
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
inOrder.verify(helper).createSubchannel(
eagsCaptor.capture(),
eq(Arrays.asList(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))),
any(Attributes.class));
assertThat(eagsCaptor.getValue()).containsExactly(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
// Initially IDLE
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
@ -1780,10 +1777,11 @@ public class GrpclbLoadBalancerTest {
// createSubchannel() has ever been called only once
verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class));
assertThat(mockSubchannels).isEmpty();
verify(subchannel).updateAddresses(eagsCaptor.capture());
assertThat(eagsCaptor.getValue()).containsExactly(
new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends2.get(2).addr, eagAttrsWithToken("token0004")));
verify(subchannel).updateAddresses(
eq(Arrays.asList(
new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends2.get(2).addr,
eagAttrsWithToken("token0004")))));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker4.dropList).containsExactly(
@ -1842,132 +1840,6 @@ public class GrpclbLoadBalancerTest {
.isEqualTo(Code.CANCELLED);
}
@SuppressWarnings("deprecation")
@Test
public void grpclbWorking_pickFirstMode_expectBackendsShuffled() throws Exception {
InOrder inOrder = inOrder(helper);
String lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}";
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder().set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
// Simulate receiving LB response
List<ServerEntry> backends0 = generateFakeServerEntries(100);
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends0));
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
inOrder.verify(helper).createSubchannel(
eagsCaptor.capture(),
any(Attributes.class));
List<EquivalentAddressGroup> eags = eagsCaptor.getValue();
int offset = eags.indexOf(eagOfFakeServerEntry(backends0.get(0)));
assertEagsAreShifted(eags, backends0, offset);
// Only one subchannel is created
assertThat(mockSubchannels).hasSize(1);
Subchannel subchannel = mockSubchannels.poll();
List<ServerEntry> backends1 = generateFakeServerEntries(1);
lbResponseObserver.onNext(buildLbResponse(backends1));
verify(subchannel).updateAddresses(eagsCaptor.capture());
eags = eagsCaptor.getValue();
assertEagsAreShifted(eags, backends1, offset);
List<ServerEntry> backends2 = generateFakeServerEntries(2);
lbResponseObserver.onNext(buildLbResponse(backends2));
verify(subchannel, times(2)).updateAddresses(eagsCaptor.capture());
eags = eagsCaptor.getValue();
assertEagsAreShifted(eags, backends2, offset);
List<ServerEntry> backends3 = generateFakeServerEntries(3);
lbResponseObserver.onNext(buildLbResponse(backends3));
verify(subchannel, times(3)).updateAddresses(eagsCaptor.capture());
eags = eagsCaptor.getValue();
assertEagsAreShifted(eags, backends3, offset);
List<ServerEntry> allDrop = Collections.singletonList(new ServerEntry("token0004"));
lbResponseObserver.onNext(buildLbResponse(allDrop));
verify(subchannel, times(4)).updateAddresses(eagsCaptor.capture());
assertThat(eagsCaptor.getValue()).isEmpty();
balancer.shutdown();
// new balancer
balancer = new GrpclbLoadBalancer(helper, subchannelPool, fakeClock.getTimeProvider(),
fakeClock.getStopwatchSupplier().get(),
backoffPolicyProvider);
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
// Simulate receiving LB response
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends0));
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
inOrder.verify(helper).createSubchannel(
eagsCaptor.capture(),
any(Attributes.class));
eags = eagsCaptor.getValue();
int newOffset = eags.indexOf(eagOfFakeServerEntry(backends0.get(0)));
assertThat(newOffset).isEqualTo(offset + 1);
assertEagsAreShifted(eags, backends0, newOffset);
// Only one subchannel is created
assertThat(mockSubchannels).hasSize(1);
subchannel = mockSubchannels.poll();
lbResponseObserver.onNext(buildLbResponse(backends1));
verify(subchannel).updateAddresses(eagsCaptor.capture());
eags = eagsCaptor.getValue();
assertEagsAreShifted(eags, backends1, newOffset);
lbResponseObserver.onNext(buildLbResponse(backends2));
verify(subchannel, times(2)).updateAddresses(eagsCaptor.capture());
eags = eagsCaptor.getValue();
assertEagsAreShifted(eags, backends2, newOffset);
lbResponseObserver.onNext(buildLbResponse(backends3));
verify(subchannel, times(3)).updateAddresses(eagsCaptor.capture());
eags = eagsCaptor.getValue();
assertEagsAreShifted(eags, backends3, newOffset);
lbResponseObserver.onNext(buildLbResponse(allDrop));
verify(subchannel, times(4)).updateAddresses(eagsCaptor.capture());
assertThat(eagsCaptor.getValue()).isEmpty();
}
private static List<ServerEntry> generateFakeServerEntries(int num) {
List<ServerEntry> list = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
list.add(new ServerEntry("127.0.0.1", 1000 + i, "token000" + i));
}
return list;
}
private static EquivalentAddressGroup eagOfFakeServerEntry(ServerEntry serverEntry) {
return new EquivalentAddressGroup(serverEntry.addr, eagAttrsWithToken(serverEntry.token));
}
private static void assertEagsAreShifted(
List<EquivalentAddressGroup> eags, List<ServerEntry> serverEntries, int offset) {
int len = serverEntries.size();
assertThat(eags).hasSize(len);
for (ServerEntry serverEntry : serverEntries) {
assertEquals(
eags + " is not a shift of " + serverEntries,
eagOfFakeServerEntry(serverEntry), eags.get(offset++ % len));
}
}
@SuppressWarnings({"unchecked", "deprecation"})
@Test
public void pickFirstMode_fallback() throws Exception {
@ -1994,9 +1866,9 @@ public class GrpclbLoadBalancerTest {
// Entering fallback mode
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
inOrder.verify(helper).createSubchannel(eagsCaptor.capture(), any(Attributes.class));
assertThat(eagsCaptor.getValue()).containsExactly(
grpclbResolutionList.get(0), grpclbResolutionList.get(2));
inOrder.verify(helper).createSubchannel(
eq(Arrays.asList(grpclbResolutionList.get(0), grpclbResolutionList.get(2))),
any(Attributes.class));
assertThat(mockSubchannels).hasSize(1);
Subchannel subchannel = mockSubchannels.poll();
@ -2030,10 +1902,11 @@ public class GrpclbLoadBalancerTest {
// createSubchannel() has ever been called only once
verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class));
assertThat(mockSubchannels).isEmpty();
verify(subchannel).updateAddresses(eagsCaptor.capture());
assertThat(eagsCaptor.getValue()).containsExactly(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
verify(subchannel).updateAddresses(
eq(Arrays.asList(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr,
eagAttrsWithToken("token0002")))));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker2.dropList).containsExactly(null, null);
@ -2128,11 +2001,10 @@ public class GrpclbLoadBalancerTest {
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
inOrder.verify(helper).createSubchannel(
eagsCaptor.capture(),
eq(Arrays.asList(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))),
any(Attributes.class));
assertThat(eagsCaptor.getValue()).containsExactly(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
}