From 8634632019ee7ea35e44c6c8749343ec99ecc858 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 16 Aug 2017 16:03:22 -0700 Subject: [PATCH] grpclb: use two-level drop behavior (#3343) Previously, the round-robin list that the client uses (effective round-robin list, ERRL) was the received round-robin list (RRRL) excluding non-READY backends. Drop and backend entries are in the same list. The problem with it is that when not all backends are READY, drop entries take a larger proportion in ERRL than they do in the RRRL, resulting a larger drop ratio than intended. To fix this, we employ a two-list scheme: - A "drop list" (DL) that is out of the RRRL, with the same size and the same number of drop entries. - A "backend list" (BL) that contains only the backend entries from the RRRL, excluding non-READY ones. For every pick, the client would round-robin on the DL to determine whether the pick should be dropped. Only when it's not dropped, round-robin on the BL to pick the actual backend. This way, the drop ratio is always equal to the proportion they take in the RRRL. --- .../io/grpc/grpclb/GrpclbLoadBalancer.java | 279 +++++++++++------- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 217 +++++++++----- 2 files changed, 312 insertions(+), 184 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index ecfae44d4d..b86b0b99b1 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -48,6 +48,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -75,9 +76,9 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { PickResult.withError(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer")); @VisibleForTesting - static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() { + static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { + public PickResult picked(Metadata headers) { return PickResult.withNoResult(); } }; @@ -120,8 +121,14 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { private LbStream lbStream; private Map subchannels = Collections.emptyMap(); - private List roundRobinList = Collections.emptyList(); - private SubchannelPicker currentPicker = BUFFER_PICKER; + // Has the same size as the round-robin list from the balancer. + // A drop entry from the round-robin list becomes a DropEntry here. + // A backend entry from the robin-robin list becomes a null here. + private List dropList = Collections.emptyList(); + // Contains only non-drop, i.e., backends from the round-robin list from the balancer. + private List backendList = Collections.emptyList(); + private RoundRobinPicker currentPicker = + new RoundRobinPicker(Collections.emptyList(), Arrays.asList(BUFFER_ENTRY)); GrpclbLoadBalancer(Helper helper, Factory pickFirstBalancerFactory, Factory roundRobinBalancerFactory, ObjectPool timerServicePool, @@ -299,10 +306,11 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { } private void handleGrpclbError(Status status) { - logger.log(Level.FINE, "[{0}] Had an error: {1}; roundRobinList={2}", - new Object[] {logId, status, roundRobinList}); - if (roundRobinList.isEmpty()) { - maybeUpdatePicker(TRANSIENT_FAILURE, new ErrorPicker(status)); + logger.log(Level.FINE, "[{0}] Had an error: {1}; dropList={2}; backendList={3}", + new Object[] {logId, status, dropList, backendList}); + if (backendList.isEmpty()) { + maybeUpdatePicker( + TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(status)))); } } @@ -441,14 +449,16 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { ServerList serverList = response.getServerList(); HashMap newSubchannelMap = new HashMap(); - List newRoundRobinList = new ArrayList(); + List newDropList = new ArrayList(); + List newBackendList = new ArrayList(); // TODO(zhangkun83): honor expiration_interval // Construct the new collections. Create new Subchannels when necessary. for (Server server : serverList.getServersList()) { String token = server.getLoadBalanceToken(); if (server.getDrop()) { - newRoundRobinList.add(RoundRobinEntry.newDropEntry(loadRecorder, token)); + newDropList.add(new DropEntry(loadRecorder, token)); } else { + newDropList.add(null); InetSocketAddress address; try { address = new InetSocketAddress( @@ -472,7 +482,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { } newSubchannelMap.put(eag, subchannel); } - newRoundRobinList.add(RoundRobinEntry.newEntry(subchannel, loadRecorder, token)); + newBackendList.add(new BackendEntry(subchannel, loadRecorder, token)); } } // Close Subchannels whose addresses have been delisted @@ -483,8 +493,9 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { } } - subchannels = newSubchannelMap; - roundRobinList = newRoundRobinList; + subchannels = Collections.unmodifiableMap(newSubchannelMap); + dropList = Collections.unmodifiableList(newDropList); + backendList = Collections.unmodifiableList(newBackendList); maybeUpdatePicker(); } @@ -527,63 +538,57 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { } /** - * Make and use a picker out of the current roundRobinList and the states of subchannels if they - * have changed since the last picker created. + * Make and use a picker out of the current lists and the states of subchannels if they have + * changed since the last picker created. */ private void maybeUpdatePicker() { - List resultList = new ArrayList(); + List pickList = new ArrayList(backendList.size()); Status error = null; boolean hasIdle = false; - // TODO(zhangkun83): if roundRobinList contains at least one address, but none of them are - // ready, maybe we should always return BUFFER_PICKER, no matter if there are drop entries or - // not. - for (RoundRobinEntry entry : roundRobinList) { + for (BackendEntry entry : backendList) { Subchannel subchannel = entry.result.getSubchannel(); - if (subchannel != null) { - Attributes attrs = subchannel.getAttributes(); - ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); - if (stateInfo.getState() == READY) { - resultList.add(entry); - } else if (stateInfo.getState() == TRANSIENT_FAILURE) { - error = stateInfo.getStatus(); - } else if (stateInfo.getState() == IDLE) { - hasIdle = true; - } - } else { - // This is a drop entry. - resultList.add(entry); + Attributes attrs = subchannel.getAttributes(); + ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); + if (stateInfo.getState() == READY) { + pickList.add(entry); + } else if (stateInfo.getState() == TRANSIENT_FAILURE) { + error = stateInfo.getStatus(); + } else if (stateInfo.getState() == IDLE) { + hasIdle = true; } } - if (resultList.isEmpty()) { + ConnectivityState state; + if (pickList.isEmpty()) { if (error != null && !hasIdle) { logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}", new Object[] {logId, error}); - maybeUpdatePicker(TRANSIENT_FAILURE, new ErrorPicker(error)); + pickList.add(new ErrorEntry(error)); + state = TRANSIENT_FAILURE; } else { logger.log(Level.FINE, "[{0}] No ready Subchannel and still connecting", logId); - maybeUpdatePicker(CONNECTING, BUFFER_PICKER); + pickList.add(BUFFER_ENTRY); + state = CONNECTING; } } else { - logger.log(Level.FINE, "[{0}] Using list {1}", new Object[] {logId, resultList}); - maybeUpdatePicker(READY, new RoundRobinPicker(resultList)); + logger.log( + Level.FINE, "[{0}] Using drop list {1} and pick list {2}", + new Object[] {logId, dropList, pickList}); + state = READY; } + maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList)); } /** * Update the given picker to the helper if it's different from the current one. */ - private void maybeUpdatePicker(ConnectivityState state, SubchannelPicker picker) { + private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) { // Discard the new picker if we are sure it won't make any difference, in order to save // re-processing pending streams, and avoid unnecessary resetting of the pointer in // RoundRobinPicker. - if (picker == BUFFER_PICKER && currentPicker == BUFFER_PICKER) { + if (picker.dropList.equals(currentPicker.dropList) + && picker.pickList.equals(currentPicker.pickList)) { return; } - if (picker instanceof RoundRobinPicker && currentPicker instanceof RoundRobinPicker) { - if (((RoundRobinPicker) picker).list.equals(((RoundRobinPicker) currentPicker).list)) { - return; - } - } // No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending // stream thus no time is wasted in re-process. currentPicker = picker; @@ -632,105 +637,163 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { } @VisibleForTesting - static final class ErrorPicker extends SubchannelPicker { - final PickResult result; + static final class DropEntry { + private final GrpclbClientLoadRecorder loadRecorder; + private final String token; - ErrorPicker(Status status) { - result = PickResult.withError(status); - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return result; - } - } - - @VisibleForTesting - static final class RoundRobinEntry { - final PickResult result; - final GrpclbClientLoadRecorder loadRecorder; - final String token; - - private RoundRobinEntry( - PickResult result, GrpclbClientLoadRecorder loadRecorder, String token) { - this.result = checkNotNull(result); + DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) { this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder"); this.token = checkNotNull(token, "token"); } - /** - * Create a non-drop result. - */ - static RoundRobinEntry newEntry( - Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) { - return new RoundRobinEntry( - PickResult.withSubchannel(subchannel, loadRecorder), loadRecorder, token); - } - - /** - * Create a drop result. - */ - static RoundRobinEntry newDropEntry(GrpclbClientLoadRecorder loadRecorder, String token) { - return new RoundRobinEntry(DROP_PICK_RESULT, loadRecorder, token); - } - - void updateHeaders(Metadata headers) { - if (!isDrop()) { - headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY); - headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); - } + PickResult picked() { + loadRecorder.recordDroppedRequest(token); + return DROP_PICK_RESULT; } @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("result", result) + .add("loadRecorder", loadRecorder) .add("token", token) .toString(); } @Override public int hashCode() { - return Objects.hashCode(result, token); + return Objects.hashCode(loadRecorder, token); } @Override public boolean equals(Object other) { - if (!(other instanceof RoundRobinEntry)) { + if (!(other instanceof DropEntry)) { return false; } - RoundRobinEntry that = (RoundRobinEntry) other; - return Objects.equal(result, that.result) && Objects.equal(token, that.token); + DropEntry that = (DropEntry) other; + return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token); + } + } + + private interface RoundRobinEntry { + PickResult picked(Metadata headers); + } + + @VisibleForTesting + static final class BackendEntry implements RoundRobinEntry { + @VisibleForTesting + final PickResult result; + private final GrpclbClientLoadRecorder loadRecorder; + private final String token; + + BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) { + this.result = PickResult.withSubchannel(subchannel, loadRecorder); + this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder"); + this.token = checkNotNull(token, "token"); } - boolean isDrop() { - return result == DROP_PICK_RESULT; + @Override + public PickResult picked(Metadata headers) { + headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY); + headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); + return result; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("result", result) + .add("loadRecorder", loadRecorder) + .add("token", token) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hashCode(loadRecorder, result, token); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof BackendEntry)) { + return false; + } + BackendEntry that = (BackendEntry) other; + return Objects.equal(result, that.result) && Objects.equal(token, that.token) + && Objects.equal(loadRecorder, that.loadRecorder); + } + } + + @VisibleForTesting + static final class ErrorEntry implements RoundRobinEntry { + private final PickResult result; + + ErrorEntry(Status status) { + result = PickResult.withError(status); + } + + @Override + public PickResult picked(Metadata headers) { + return result; + } + + @Override + public int hashCode() { + return Objects.hashCode(result); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ErrorEntry)) { + return false; + } + return Objects.equal(result, ((ErrorEntry) other).result); } } @VisibleForTesting static final class RoundRobinPicker extends SubchannelPicker { - final List list; - private int index; + @VisibleForTesting + final List dropList; + private int dropIndex; - RoundRobinPicker(List resultList) { - checkArgument(!resultList.isEmpty(), "resultList is empty"); - this.list = checkNotNull(resultList, "resultList"); + @VisibleForTesting + final List pickList; + private int pickIndex; + + // dropList can be empty, which means no drop. + // pickList must not be empty. + RoundRobinPicker(List dropList, List pickList) { + this.dropList = checkNotNull(dropList, "dropList"); + this.pickList = checkNotNull(pickList, "pickList"); + checkArgument(!pickList.isEmpty(), "pickList is empty"); } @Override public PickResult pickSubchannel(PickSubchannelArgs args) { - synchronized (list) { - RoundRobinEntry result = list.get(index); - index++; - if (index == list.size()) { - index = 0; + synchronized (pickList) { + // Two-level round-robin. + // First round-robin on dropList. If a drop entry is selected, request will be dropped. If + // a non-drop entry is selected, then round-robin on pickList. This makes sure requests are + // dropped at the same proportion as the drop entries appear on the round-robin list from + // the balancer, while only READY backends (that make up pickList) are selected for the + // non-drop cases. + if (!dropList.isEmpty()) { + DropEntry drop = dropList.get(dropIndex); + dropIndex++; + if (dropIndex == dropList.size()) { + dropIndex = 0; + } + if (drop != null) { + return drop.picked(); + } } - result.updateHeaders(args.getHeaders()); - if (result.isDrop()) { - result.loadRecorder.recordDroppedRequest(result.token); + + RoundRobinEntry pick = pickList.get(pickIndex); + pickIndex++; + if (pickIndex == pickList.size()) { + pickIndex = 0; } - return result.result; + return pick.picked(args.getHeaders()); } } } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 04ac666f85..1c74561fd2 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -22,6 +22,8 @@ import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.grpclb.GrpclbLoadBalancer.BUFFER_ENTRY; +import static io.grpc.grpclb.GrpclbLoadBalancer.DROP_PICK_RESULT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -61,8 +63,9 @@ import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.grpclb.GrpclbConstants.LbPolicy; -import io.grpc.grpclb.GrpclbLoadBalancer.ErrorPicker; -import io.grpc.grpclb.GrpclbLoadBalancer.RoundRobinEntry; +import io.grpc.grpclb.GrpclbLoadBalancer.BackendEntry; +import io.grpc.grpclb.GrpclbLoadBalancer.DropEntry; +import io.grpc.grpclb.GrpclbLoadBalancer.ErrorEntry; import io.grpc.grpclb.GrpclbLoadBalancer.RoundRobinPicker; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; @@ -237,64 +240,95 @@ public class GrpclbLoadBalancerTest { } @Test - public void errorPicker() { - PickSubchannelArgs mockArgs = mock(PickSubchannelArgs.class); - Status error = Status.UNAVAILABLE.withDescription("Just don't know why"); - ErrorPicker picker = new ErrorPicker(error); - assertSame(error, picker.pickSubchannel(mockArgs).getStatus()); - verifyNoMoreInteractions(mockArgs); - } - - @Test - public void roundRobinPicker() { + public void roundRobinPickerNoDrop() { GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider); Subchannel subchannel = mock(Subchannel.class); - RoundRobinEntry r1 = RoundRobinEntry.newDropEntry(loadRecorder, "LBTOKEN0001"); - RoundRobinEntry r2 = RoundRobinEntry.newEntry(subchannel, loadRecorder, "LBTOKEN0001"); - RoundRobinEntry r3 = RoundRobinEntry.newEntry(subchannel, loadRecorder, "LBTOKEN0002"); + BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001"); + BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002"); - List list = Arrays.asList(r1, r2, r3); - RoundRobinPicker picker = new RoundRobinPicker(list); + List pickList = Arrays.asList(b1, b2); + RoundRobinPicker picker = new RoundRobinPicker(Collections.emptyList(), pickList); PickSubchannelArgs args1 = mock(PickSubchannelArgs.class); Metadata headers1 = new Metadata(); + // The existing token on the headers will be replaced + headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD"); when(args1.getHeaders()).thenReturn(headers1); - assertSame(r1.result, picker.pickSubchannel(args1)); + assertSame(b1.result, picker.pickSubchannel(args1)); verify(args1).getHeaders(); - assertFalse(headers1.containsKey(GrpclbConstants.TOKEN_METADATA_KEY)); + assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); PickSubchannelArgs args2 = mock(PickSubchannelArgs.class); Metadata headers2 = new Metadata(); - // The existing token on the headers will be replaced - headers2.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD"); when(args2.getHeaders()).thenReturn(headers2); - assertSame(r2.result, picker.pickSubchannel(args2)); + assertSame(b2.result, picker.pickSubchannel(args2)); verify(args2).getHeaders(); - assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); + assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002"); PickSubchannelArgs args3 = mock(PickSubchannelArgs.class); Metadata headers3 = new Metadata(); when(args3.getHeaders()).thenReturn(headers3); - assertSame(r3.result, picker.pickSubchannel(args3)); + assertSame(b1.result, picker.pickSubchannel(args3)); verify(args3).getHeaders(); - assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002"); - - PickSubchannelArgs args4 = mock(PickSubchannelArgs.class); - Metadata headers4 = new Metadata(); - when(args4.getHeaders()).thenReturn(headers4); - assertSame(r1.result, picker.pickSubchannel(args4)); - verify(args4).getHeaders(); - assertFalse(headers4.containsKey(GrpclbConstants.TOKEN_METADATA_KEY)); + assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); verify(subchannel, never()).getAttributes(); } + @Test - public void bufferPicker() { - PickSubchannelArgs args = mock(PickSubchannelArgs.class); - assertEquals(PickResult.withNoResult(), - GrpclbLoadBalancer.BUFFER_PICKER.pickSubchannel(args)); - verifyNoMoreInteractions(args); + public void roundRobinPickerWithDrop() { + GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider); + Subchannel subchannel = mock(Subchannel.class); + // 1 out of 2 requests are to be dropped + DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003"); + List dropList = Arrays.asList(null, d); + + BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001"); + BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002"); + List pickList = Arrays.asList(b1, b2); + RoundRobinPicker picker = new RoundRobinPicker(dropList, pickList); + + // dropList[0], pickList[0] + PickSubchannelArgs args1 = mock(PickSubchannelArgs.class); + Metadata headers1 = new Metadata(); + headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD"); + when(args1.getHeaders()).thenReturn(headers1); + assertSame(b1.result, picker.pickSubchannel(args1)); + verify(args1).getHeaders(); + assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); + + // dropList[1]: drop + PickSubchannelArgs args2 = mock(PickSubchannelArgs.class); + Metadata headers2 = new Metadata(); + when(args2.getHeaders()).thenReturn(headers2); + assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args2)); + verify(args2, never()).getHeaders(); + + // dropList[0], pickList[1] + PickSubchannelArgs args3 = mock(PickSubchannelArgs.class); + Metadata headers3 = new Metadata(); + when(args3.getHeaders()).thenReturn(headers3); + assertSame(b2.result, picker.pickSubchannel(args3)); + verify(args3).getHeaders(); + assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002"); + + // dropList[1]: drop + PickSubchannelArgs args4 = mock(PickSubchannelArgs.class); + Metadata headers4 = new Metadata(); + when(args4.getHeaders()).thenReturn(headers4); + assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args4)); + verify(args4, never()).getHeaders(); + + // dropList[0], pickList[0] + PickSubchannelArgs args5 = mock(PickSubchannelArgs.class); + Metadata headers5 = new Metadata(); + when(args5.getHeaders()).thenReturn(headers5); + assertSame(b1.result, picker.pickSubchannel(args5)); + verify(args5).getHeaders(); + assertThat(headers5.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); + + verify(subchannel, never()).getAttributes(); } @Test @@ -331,9 +365,9 @@ public class GrpclbLoadBalancerTest { List backends = Arrays.asList( new ServerEntry("127.0.0.1", 2000, "token0001"), - new ServerEntry("token0001"), + new ServerEntry("token0001"), // drop new ServerEntry("127.0.0.1", 2010, "token0002"), - new ServerEntry("token0003")); + new ServerEntry("token0003")); // drop lbResponseObserver.onNext(buildLbResponse(backends)); @@ -348,11 +382,14 @@ public class GrpclbLoadBalancerTest { helperInOrder.verify(helper, atLeast(1)) .updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker.list).containsExactly( - RoundRobinEntry.newEntry(subchannel1, balancer.getLoadRecorder(), "token0001"), - RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0001"), - RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0002"), - RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0003")).inOrder(); + assertThat(picker.dropList).containsExactly( + null, + new DropEntry(balancer.getLoadRecorder(), "token0001"), + null, + new DropEntry(balancer.getLoadRecorder(), "token0003")).inOrder(); + assertThat(picker.pickList).containsExactly( + new BackendEntry(subchannel1, balancer.getLoadRecorder(), "token0001"), + new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0002")).inOrder(); // Report, no data assertNextReport( @@ -623,8 +660,9 @@ public class GrpclbLoadBalancerTest { Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); deliverNameResolutionError(error); verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue(); - assertSame(error, errorPicker.result.getStatus()); + RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).isEmpty(); + assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); // Recover with a subsequent success List resolvedServers = createResolvedServerAddresses(false); @@ -643,8 +681,9 @@ public class GrpclbLoadBalancerTest { Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); deliverNameResolutionError(error); verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue(); - assertSame(error, errorPicker.result.getStatus()); + RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).isEmpty(); + assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); // Recover with a subsequent success List resolvedServers = createResolvedServerAddresses(true); @@ -730,8 +769,9 @@ public class GrpclbLoadBalancerTest { deliverNameResolutionError(error); inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue(); - assertSame(error, errorPicker.result.getStatus()); + RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).isEmpty(); + assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); assertFalse(oobChannel.isShutdown()); // Simulate receiving LB response @@ -930,6 +970,10 @@ public class GrpclbLoadBalancerTest { deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker0.dropList).containsExactly(null, null); + assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); inOrder.verifyNoMoreInteractions(); // Let subchannels be connected @@ -937,15 +981,17 @@ public class GrpclbLoadBalancerTest { inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.list).containsExactly( - RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0002")); + assertThat(picker1.dropList).containsExactly(null, null); + assertThat(picker1.pickList).containsExactly( + new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0002")); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.list).containsExactly( - RoundRobinEntry.newEntry(subchannel1, balancer.getLoadRecorder(), "token0001"), - RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0002")) + assertThat(picker2.dropList).containsExactly(null, null); + assertThat(picker2.pickList).containsExactly( + new BackendEntry(subchannel1, balancer.getLoadRecorder(), "token0001"), + new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0002")) .inOrder(); // Disconnected subchannels @@ -954,8 +1000,9 @@ public class GrpclbLoadBalancerTest { verify(subchannel1, times(2)).requestConnection(); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker3.list).containsExactly( - RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0002")); + assertThat(picker3.dropList).containsExactly(null, null); + assertThat(picker3.pickList).containsExactly( + new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0002")); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verifyNoMoreInteractions(); @@ -969,16 +1016,19 @@ public class GrpclbLoadBalancerTest { verify(subchannel2).requestConnection(); deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE)); verify(subchannel2, times(2)).requestConnection(); - inOrder.verify(helper).updateBalancingState(CONNECTING, GrpclbLoadBalancer.BUFFER_PICKER); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker4.dropList).containsExactly(null, null); + assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY); // Update backends, with a drop entry List backends2 = Arrays.asList( new ServerEntry("127.0.0.1", 2030, "token0003"), // New address - new ServerEntry("token0003"), + new ServerEntry("token0003"), // drop new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time - new ServerEntry("token0006")); + new ServerEntry("token0006")); // drop verify(subchannel1, never()).shutdown(); lbResponseObserver.onNext(buildLbResponse(backends2)); @@ -993,11 +1043,15 @@ public class GrpclbLoadBalancerTest { Subchannel subchannel3 = mockSubchannels.poll(); verify(subchannel3).requestConnection(); assertEquals(new EquivalentAddressGroup(backends2.get(0).addr), subchannel3.getAddresses()); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker7.list).containsExactly( - RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0003"), - RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0006")).inOrder(); + assertThat(picker7.dropList).containsExactly( + null, + new DropEntry(balancer.getLoadRecorder(), "token0003"), + null, + null, + new DropEntry(balancer.getLoadRecorder(), "token0006")).inOrder(); + assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY); // State updates on obsolete subchannel1 will have no effect deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); @@ -1009,29 +1063,40 @@ public class GrpclbLoadBalancerTest { deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker8.dropList).containsExactly( + null, + new DropEntry(balancer.getLoadRecorder(), "token0003"), + null, + null, + new DropEntry(balancer.getLoadRecorder(), "token0006")).inOrder(); // subchannel2 is still IDLE, thus not in the active list - assertThat(picker8.list).containsExactly( - RoundRobinEntry.newEntry(subchannel3, balancer.getLoadRecorder(), "token0003"), - RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0003"), - RoundRobinEntry.newEntry(subchannel3, balancer.getLoadRecorder(), "token0005"), - RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0006")).inOrder(); + assertThat(picker8.pickList).containsExactly( + new BackendEntry(subchannel3, balancer.getLoadRecorder(), "token0003"), + new BackendEntry(subchannel3, balancer.getLoadRecorder(), "token0005")).inOrder(); // subchannel2 becomes READY and makes it into the list deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker9.list).containsExactly( - RoundRobinEntry.newEntry(subchannel3, balancer.getLoadRecorder(), "token0003"), - RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0003"), - RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0004"), - RoundRobinEntry.newEntry(subchannel3, balancer.getLoadRecorder(), "token0005"), - RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0006")).inOrder(); + assertThat(picker9.dropList).containsExactly( + null, + new DropEntry(balancer.getLoadRecorder(), "token0003"), + null, + null, + new DropEntry(balancer.getLoadRecorder(), "token0006")).inOrder(); + assertThat(picker9.pickList).containsExactly( + new BackendEntry(subchannel3, balancer.getLoadRecorder(), "token0003"), + new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0004"), + new BackendEntry(subchannel3, balancer.getLoadRecorder(), "token0005")).inOrder(); verify(subchannel3, never()).shutdown(); // Update backends, with no entry lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); verify(subchannel2).shutdown(); verify(subchannel3).shutdown(); - inOrder.verify(helper).updateBalancingState(CONNECTING, GrpclbLoadBalancer.BUFFER_PICKER); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker10.dropList).isEmpty(); + assertThat(picker10.pickList).containsExactly(BUFFER_ENTRY); assertFalse(oobChannel.isShutdown()); assertEquals(0, lbRequestObservers.size());