mirror of https://github.com/grpc/grpc-java.git
grpclb: use two-level drop behavior (#3343)
Previously, the round-robin list that the client uses (effective round-robin list, ERRL) was the received round-robin list (RRRL) excluding non-READY backends. Drop and backend entries are in the same list. The problem with it is that when not all backends are READY, drop entries take a larger proportion in ERRL than they do in the RRRL, resulting a larger drop ratio than intended. To fix this, we employ a two-list scheme: - A "drop list" (DL) that is out of the RRRL, with the same size and the same number of drop entries. - A "backend list" (BL) that contains only the backend entries from the RRRL, excluding non-READY ones. For every pick, the client would round-robin on the DL to determine whether the pick should be dropped. Only when it's not dropped, round-robin on the BL to pick the actual backend. This way, the drop ratio is always equal to the proportion they take in the RRRL.
This commit is contained in:
parent
34857580ff
commit
8634632019
|
|
@ -48,6 +48,7 @@ import java.net.InetSocketAddress;
|
|||
import java.net.SocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
|
@ -75,9 +76,9 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
|
|||
PickResult.withError(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer"));
|
||||
|
||||
@VisibleForTesting
|
||||
static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() {
|
||||
static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
|
||||
@Override
|
||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||
public PickResult picked(Metadata headers) {
|
||||
return PickResult.withNoResult();
|
||||
}
|
||||
};
|
||||
|
|
@ -120,8 +121,14 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
|
|||
private LbStream lbStream;
|
||||
private Map<EquivalentAddressGroup, Subchannel> subchannels = Collections.emptyMap();
|
||||
|
||||
private List<RoundRobinEntry> roundRobinList = Collections.emptyList();
|
||||
private SubchannelPicker currentPicker = BUFFER_PICKER;
|
||||
// Has the same size as the round-robin list from the balancer.
|
||||
// A drop entry from the round-robin list becomes a DropEntry here.
|
||||
// A backend entry from the robin-robin list becomes a null here.
|
||||
private List<DropEntry> dropList = Collections.emptyList();
|
||||
// Contains only non-drop, i.e., backends from the round-robin list from the balancer.
|
||||
private List<BackendEntry> backendList = Collections.emptyList();
|
||||
private RoundRobinPicker currentPicker =
|
||||
new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
|
||||
|
||||
GrpclbLoadBalancer(Helper helper, Factory pickFirstBalancerFactory,
|
||||
Factory roundRobinBalancerFactory, ObjectPool<ScheduledExecutorService> timerServicePool,
|
||||
|
|
@ -299,10 +306,11 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
|
|||
}
|
||||
|
||||
private void handleGrpclbError(Status status) {
|
||||
logger.log(Level.FINE, "[{0}] Had an error: {1}; roundRobinList={2}",
|
||||
new Object[] {logId, status, roundRobinList});
|
||||
if (roundRobinList.isEmpty()) {
|
||||
maybeUpdatePicker(TRANSIENT_FAILURE, new ErrorPicker(status));
|
||||
logger.log(Level.FINE, "[{0}] Had an error: {1}; dropList={2}; backendList={3}",
|
||||
new Object[] {logId, status, dropList, backendList});
|
||||
if (backendList.isEmpty()) {
|
||||
maybeUpdatePicker(
|
||||
TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(status))));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -441,14 +449,16 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
|
|||
ServerList serverList = response.getServerList();
|
||||
HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap =
|
||||
new HashMap<EquivalentAddressGroup, Subchannel>();
|
||||
List<RoundRobinEntry> newRoundRobinList = new ArrayList<RoundRobinEntry>();
|
||||
List<DropEntry> newDropList = new ArrayList<DropEntry>();
|
||||
List<BackendEntry> newBackendList = new ArrayList<BackendEntry>();
|
||||
// TODO(zhangkun83): honor expiration_interval
|
||||
// Construct the new collections. Create new Subchannels when necessary.
|
||||
for (Server server : serverList.getServersList()) {
|
||||
String token = server.getLoadBalanceToken();
|
||||
if (server.getDrop()) {
|
||||
newRoundRobinList.add(RoundRobinEntry.newDropEntry(loadRecorder, token));
|
||||
newDropList.add(new DropEntry(loadRecorder, token));
|
||||
} else {
|
||||
newDropList.add(null);
|
||||
InetSocketAddress address;
|
||||
try {
|
||||
address = new InetSocketAddress(
|
||||
|
|
@ -472,7 +482,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
|
|||
}
|
||||
newSubchannelMap.put(eag, subchannel);
|
||||
}
|
||||
newRoundRobinList.add(RoundRobinEntry.newEntry(subchannel, loadRecorder, token));
|
||||
newBackendList.add(new BackendEntry(subchannel, loadRecorder, token));
|
||||
}
|
||||
}
|
||||
// Close Subchannels whose addresses have been delisted
|
||||
|
|
@ -483,8 +493,9 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
|
|||
}
|
||||
}
|
||||
|
||||
subchannels = newSubchannelMap;
|
||||
roundRobinList = newRoundRobinList;
|
||||
subchannels = Collections.unmodifiableMap(newSubchannelMap);
|
||||
dropList = Collections.unmodifiableList(newDropList);
|
||||
backendList = Collections.unmodifiableList(newBackendList);
|
||||
maybeUpdatePicker();
|
||||
}
|
||||
|
||||
|
|
@ -527,63 +538,57 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
|
|||
}
|
||||
|
||||
/**
|
||||
* Make and use a picker out of the current roundRobinList and the states of subchannels if they
|
||||
* have changed since the last picker created.
|
||||
* Make and use a picker out of the current lists and the states of subchannels if they have
|
||||
* changed since the last picker created.
|
||||
*/
|
||||
private void maybeUpdatePicker() {
|
||||
List<RoundRobinEntry> resultList = new ArrayList<RoundRobinEntry>();
|
||||
List<RoundRobinEntry> pickList = new ArrayList<RoundRobinEntry>(backendList.size());
|
||||
Status error = null;
|
||||
boolean hasIdle = false;
|
||||
// TODO(zhangkun83): if roundRobinList contains at least one address, but none of them are
|
||||
// ready, maybe we should always return BUFFER_PICKER, no matter if there are drop entries or
|
||||
// not.
|
||||
for (RoundRobinEntry entry : roundRobinList) {
|
||||
for (BackendEntry entry : backendList) {
|
||||
Subchannel subchannel = entry.result.getSubchannel();
|
||||
if (subchannel != null) {
|
||||
Attributes attrs = subchannel.getAttributes();
|
||||
ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get();
|
||||
if (stateInfo.getState() == READY) {
|
||||
resultList.add(entry);
|
||||
} else if (stateInfo.getState() == TRANSIENT_FAILURE) {
|
||||
error = stateInfo.getStatus();
|
||||
} else if (stateInfo.getState() == IDLE) {
|
||||
hasIdle = true;
|
||||
}
|
||||
} else {
|
||||
// This is a drop entry.
|
||||
resultList.add(entry);
|
||||
Attributes attrs = subchannel.getAttributes();
|
||||
ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get();
|
||||
if (stateInfo.getState() == READY) {
|
||||
pickList.add(entry);
|
||||
} else if (stateInfo.getState() == TRANSIENT_FAILURE) {
|
||||
error = stateInfo.getStatus();
|
||||
} else if (stateInfo.getState() == IDLE) {
|
||||
hasIdle = true;
|
||||
}
|
||||
}
|
||||
if (resultList.isEmpty()) {
|
||||
ConnectivityState state;
|
||||
if (pickList.isEmpty()) {
|
||||
if (error != null && !hasIdle) {
|
||||
logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}",
|
||||
new Object[] {logId, error});
|
||||
maybeUpdatePicker(TRANSIENT_FAILURE, new ErrorPicker(error));
|
||||
pickList.add(new ErrorEntry(error));
|
||||
state = TRANSIENT_FAILURE;
|
||||
} else {
|
||||
logger.log(Level.FINE, "[{0}] No ready Subchannel and still connecting", logId);
|
||||
maybeUpdatePicker(CONNECTING, BUFFER_PICKER);
|
||||
pickList.add(BUFFER_ENTRY);
|
||||
state = CONNECTING;
|
||||
}
|
||||
} else {
|
||||
logger.log(Level.FINE, "[{0}] Using list {1}", new Object[] {logId, resultList});
|
||||
maybeUpdatePicker(READY, new RoundRobinPicker(resultList));
|
||||
logger.log(
|
||||
Level.FINE, "[{0}] Using drop list {1} and pick list {2}",
|
||||
new Object[] {logId, dropList, pickList});
|
||||
state = READY;
|
||||
}
|
||||
maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the given picker to the helper if it's different from the current one.
|
||||
*/
|
||||
private void maybeUpdatePicker(ConnectivityState state, SubchannelPicker picker) {
|
||||
private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) {
|
||||
// Discard the new picker if we are sure it won't make any difference, in order to save
|
||||
// re-processing pending streams, and avoid unnecessary resetting of the pointer in
|
||||
// RoundRobinPicker.
|
||||
if (picker == BUFFER_PICKER && currentPicker == BUFFER_PICKER) {
|
||||
if (picker.dropList.equals(currentPicker.dropList)
|
||||
&& picker.pickList.equals(currentPicker.pickList)) {
|
||||
return;
|
||||
}
|
||||
if (picker instanceof RoundRobinPicker && currentPicker instanceof RoundRobinPicker) {
|
||||
if (((RoundRobinPicker) picker).list.equals(((RoundRobinPicker) currentPicker).list)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
// No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending
|
||||
// stream thus no time is wasted in re-process.
|
||||
currentPicker = picker;
|
||||
|
|
@ -632,105 +637,163 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static final class ErrorPicker extends SubchannelPicker {
|
||||
final PickResult result;
|
||||
static final class DropEntry {
|
||||
private final GrpclbClientLoadRecorder loadRecorder;
|
||||
private final String token;
|
||||
|
||||
ErrorPicker(Status status) {
|
||||
result = PickResult.withError(status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static final class RoundRobinEntry {
|
||||
final PickResult result;
|
||||
final GrpclbClientLoadRecorder loadRecorder;
|
||||
final String token;
|
||||
|
||||
private RoundRobinEntry(
|
||||
PickResult result, GrpclbClientLoadRecorder loadRecorder, String token) {
|
||||
this.result = checkNotNull(result);
|
||||
DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) {
|
||||
this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
|
||||
this.token = checkNotNull(token, "token");
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a non-drop result.
|
||||
*/
|
||||
static RoundRobinEntry newEntry(
|
||||
Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) {
|
||||
return new RoundRobinEntry(
|
||||
PickResult.withSubchannel(subchannel, loadRecorder), loadRecorder, token);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a drop result.
|
||||
*/
|
||||
static RoundRobinEntry newDropEntry(GrpclbClientLoadRecorder loadRecorder, String token) {
|
||||
return new RoundRobinEntry(DROP_PICK_RESULT, loadRecorder, token);
|
||||
}
|
||||
|
||||
void updateHeaders(Metadata headers) {
|
||||
if (!isDrop()) {
|
||||
headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
|
||||
headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
|
||||
}
|
||||
PickResult picked() {
|
||||
loadRecorder.recordDroppedRequest(token);
|
||||
return DROP_PICK_RESULT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("result", result)
|
||||
.add("loadRecorder", loadRecorder)
|
||||
.add("token", token)
|
||||
.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(result, token);
|
||||
return Objects.hashCode(loadRecorder, token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof RoundRobinEntry)) {
|
||||
if (!(other instanceof DropEntry)) {
|
||||
return false;
|
||||
}
|
||||
RoundRobinEntry that = (RoundRobinEntry) other;
|
||||
return Objects.equal(result, that.result) && Objects.equal(token, that.token);
|
||||
DropEntry that = (DropEntry) other;
|
||||
return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token);
|
||||
}
|
||||
}
|
||||
|
||||
private interface RoundRobinEntry {
|
||||
PickResult picked(Metadata headers);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static final class BackendEntry implements RoundRobinEntry {
|
||||
@VisibleForTesting
|
||||
final PickResult result;
|
||||
private final GrpclbClientLoadRecorder loadRecorder;
|
||||
private final String token;
|
||||
|
||||
BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) {
|
||||
this.result = PickResult.withSubchannel(subchannel, loadRecorder);
|
||||
this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
|
||||
this.token = checkNotNull(token, "token");
|
||||
}
|
||||
|
||||
boolean isDrop() {
|
||||
return result == DROP_PICK_RESULT;
|
||||
@Override
|
||||
public PickResult picked(Metadata headers) {
|
||||
headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
|
||||
headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("result", result)
|
||||
.add("loadRecorder", loadRecorder)
|
||||
.add("token", token)
|
||||
.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(loadRecorder, result, token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof BackendEntry)) {
|
||||
return false;
|
||||
}
|
||||
BackendEntry that = (BackendEntry) other;
|
||||
return Objects.equal(result, that.result) && Objects.equal(token, that.token)
|
||||
&& Objects.equal(loadRecorder, that.loadRecorder);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static final class ErrorEntry implements RoundRobinEntry {
|
||||
private final PickResult result;
|
||||
|
||||
ErrorEntry(Status status) {
|
||||
result = PickResult.withError(status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PickResult picked(Metadata headers) {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof ErrorEntry)) {
|
||||
return false;
|
||||
}
|
||||
return Objects.equal(result, ((ErrorEntry) other).result);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static final class RoundRobinPicker extends SubchannelPicker {
|
||||
final List<RoundRobinEntry> list;
|
||||
private int index;
|
||||
@VisibleForTesting
|
||||
final List<DropEntry> dropList;
|
||||
private int dropIndex;
|
||||
|
||||
RoundRobinPicker(List<RoundRobinEntry> resultList) {
|
||||
checkArgument(!resultList.isEmpty(), "resultList is empty");
|
||||
this.list = checkNotNull(resultList, "resultList");
|
||||
@VisibleForTesting
|
||||
final List<? extends RoundRobinEntry> pickList;
|
||||
private int pickIndex;
|
||||
|
||||
// dropList can be empty, which means no drop.
|
||||
// pickList must not be empty.
|
||||
RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) {
|
||||
this.dropList = checkNotNull(dropList, "dropList");
|
||||
this.pickList = checkNotNull(pickList, "pickList");
|
||||
checkArgument(!pickList.isEmpty(), "pickList is empty");
|
||||
}
|
||||
|
||||
@Override
|
||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||
synchronized (list) {
|
||||
RoundRobinEntry result = list.get(index);
|
||||
index++;
|
||||
if (index == list.size()) {
|
||||
index = 0;
|
||||
synchronized (pickList) {
|
||||
// Two-level round-robin.
|
||||
// First round-robin on dropList. If a drop entry is selected, request will be dropped. If
|
||||
// a non-drop entry is selected, then round-robin on pickList. This makes sure requests are
|
||||
// dropped at the same proportion as the drop entries appear on the round-robin list from
|
||||
// the balancer, while only READY backends (that make up pickList) are selected for the
|
||||
// non-drop cases.
|
||||
if (!dropList.isEmpty()) {
|
||||
DropEntry drop = dropList.get(dropIndex);
|
||||
dropIndex++;
|
||||
if (dropIndex == dropList.size()) {
|
||||
dropIndex = 0;
|
||||
}
|
||||
if (drop != null) {
|
||||
return drop.picked();
|
||||
}
|
||||
}
|
||||
result.updateHeaders(args.getHeaders());
|
||||
if (result.isDrop()) {
|
||||
result.loadRecorder.recordDroppedRequest(result.token);
|
||||
|
||||
RoundRobinEntry pick = pickList.get(pickIndex);
|
||||
pickIndex++;
|
||||
if (pickIndex == pickList.size()) {
|
||||
pickIndex = 0;
|
||||
}
|
||||
return result.result;
|
||||
return pick.picked(args.getHeaders());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import static io.grpc.ConnectivityState.IDLE;
|
|||
import static io.grpc.ConnectivityState.READY;
|
||||
import static io.grpc.ConnectivityState.SHUTDOWN;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static io.grpc.grpclb.GrpclbLoadBalancer.BUFFER_ENTRY;
|
||||
import static io.grpc.grpclb.GrpclbLoadBalancer.DROP_PICK_RESULT;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
|
@ -61,8 +63,9 @@ import io.grpc.ManagedChannel;
|
|||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.grpclb.GrpclbConstants.LbPolicy;
|
||||
import io.grpc.grpclb.GrpclbLoadBalancer.ErrorPicker;
|
||||
import io.grpc.grpclb.GrpclbLoadBalancer.RoundRobinEntry;
|
||||
import io.grpc.grpclb.GrpclbLoadBalancer.BackendEntry;
|
||||
import io.grpc.grpclb.GrpclbLoadBalancer.DropEntry;
|
||||
import io.grpc.grpclb.GrpclbLoadBalancer.ErrorEntry;
|
||||
import io.grpc.grpclb.GrpclbLoadBalancer.RoundRobinPicker;
|
||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||
import io.grpc.inprocess.InProcessServerBuilder;
|
||||
|
|
@ -237,64 +240,95 @@ public class GrpclbLoadBalancerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void errorPicker() {
|
||||
PickSubchannelArgs mockArgs = mock(PickSubchannelArgs.class);
|
||||
Status error = Status.UNAVAILABLE.withDescription("Just don't know why");
|
||||
ErrorPicker picker = new ErrorPicker(error);
|
||||
assertSame(error, picker.pickSubchannel(mockArgs).getStatus());
|
||||
verifyNoMoreInteractions(mockArgs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void roundRobinPicker() {
|
||||
public void roundRobinPickerNoDrop() {
|
||||
GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider);
|
||||
Subchannel subchannel = mock(Subchannel.class);
|
||||
RoundRobinEntry r1 = RoundRobinEntry.newDropEntry(loadRecorder, "LBTOKEN0001");
|
||||
RoundRobinEntry r2 = RoundRobinEntry.newEntry(subchannel, loadRecorder, "LBTOKEN0001");
|
||||
RoundRobinEntry r3 = RoundRobinEntry.newEntry(subchannel, loadRecorder, "LBTOKEN0002");
|
||||
BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
|
||||
BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
|
||||
|
||||
List<RoundRobinEntry> list = Arrays.asList(r1, r2, r3);
|
||||
RoundRobinPicker picker = new RoundRobinPicker(list);
|
||||
List<BackendEntry> pickList = Arrays.asList(b1, b2);
|
||||
RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList);
|
||||
|
||||
PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
|
||||
Metadata headers1 = new Metadata();
|
||||
// The existing token on the headers will be replaced
|
||||
headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
|
||||
when(args1.getHeaders()).thenReturn(headers1);
|
||||
assertSame(r1.result, picker.pickSubchannel(args1));
|
||||
assertSame(b1.result, picker.pickSubchannel(args1));
|
||||
verify(args1).getHeaders();
|
||||
assertFalse(headers1.containsKey(GrpclbConstants.TOKEN_METADATA_KEY));
|
||||
assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
|
||||
|
||||
PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
|
||||
Metadata headers2 = new Metadata();
|
||||
// The existing token on the headers will be replaced
|
||||
headers2.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
|
||||
when(args2.getHeaders()).thenReturn(headers2);
|
||||
assertSame(r2.result, picker.pickSubchannel(args2));
|
||||
assertSame(b2.result, picker.pickSubchannel(args2));
|
||||
verify(args2).getHeaders();
|
||||
assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
|
||||
assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
|
||||
|
||||
PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
|
||||
Metadata headers3 = new Metadata();
|
||||
when(args3.getHeaders()).thenReturn(headers3);
|
||||
assertSame(r3.result, picker.pickSubchannel(args3));
|
||||
assertSame(b1.result, picker.pickSubchannel(args3));
|
||||
verify(args3).getHeaders();
|
||||
assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
|
||||
|
||||
PickSubchannelArgs args4 = mock(PickSubchannelArgs.class);
|
||||
Metadata headers4 = new Metadata();
|
||||
when(args4.getHeaders()).thenReturn(headers4);
|
||||
assertSame(r1.result, picker.pickSubchannel(args4));
|
||||
verify(args4).getHeaders();
|
||||
assertFalse(headers4.containsKey(GrpclbConstants.TOKEN_METADATA_KEY));
|
||||
assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
|
||||
|
||||
verify(subchannel, never()).getAttributes();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void bufferPicker() {
|
||||
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
|
||||
assertEquals(PickResult.withNoResult(),
|
||||
GrpclbLoadBalancer.BUFFER_PICKER.pickSubchannel(args));
|
||||
verifyNoMoreInteractions(args);
|
||||
public void roundRobinPickerWithDrop() {
|
||||
GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider);
|
||||
Subchannel subchannel = mock(Subchannel.class);
|
||||
// 1 out of 2 requests are to be dropped
|
||||
DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003");
|
||||
List<DropEntry> dropList = Arrays.asList(null, d);
|
||||
|
||||
BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
|
||||
BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
|
||||
List<BackendEntry> pickList = Arrays.asList(b1, b2);
|
||||
RoundRobinPicker picker = new RoundRobinPicker(dropList, pickList);
|
||||
|
||||
// dropList[0], pickList[0]
|
||||
PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
|
||||
Metadata headers1 = new Metadata();
|
||||
headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
|
||||
when(args1.getHeaders()).thenReturn(headers1);
|
||||
assertSame(b1.result, picker.pickSubchannel(args1));
|
||||
verify(args1).getHeaders();
|
||||
assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
|
||||
|
||||
// dropList[1]: drop
|
||||
PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
|
||||
Metadata headers2 = new Metadata();
|
||||
when(args2.getHeaders()).thenReturn(headers2);
|
||||
assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args2));
|
||||
verify(args2, never()).getHeaders();
|
||||
|
||||
// dropList[0], pickList[1]
|
||||
PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
|
||||
Metadata headers3 = new Metadata();
|
||||
when(args3.getHeaders()).thenReturn(headers3);
|
||||
assertSame(b2.result, picker.pickSubchannel(args3));
|
||||
verify(args3).getHeaders();
|
||||
assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
|
||||
|
||||
// dropList[1]: drop
|
||||
PickSubchannelArgs args4 = mock(PickSubchannelArgs.class);
|
||||
Metadata headers4 = new Metadata();
|
||||
when(args4.getHeaders()).thenReturn(headers4);
|
||||
assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args4));
|
||||
verify(args4, never()).getHeaders();
|
||||
|
||||
// dropList[0], pickList[0]
|
||||
PickSubchannelArgs args5 = mock(PickSubchannelArgs.class);
|
||||
Metadata headers5 = new Metadata();
|
||||
when(args5.getHeaders()).thenReturn(headers5);
|
||||
assertSame(b1.result, picker.pickSubchannel(args5));
|
||||
verify(args5).getHeaders();
|
||||
assertThat(headers5.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
|
||||
|
||||
verify(subchannel, never()).getAttributes();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -331,9 +365,9 @@ public class GrpclbLoadBalancerTest {
|
|||
|
||||
List<ServerEntry> backends = Arrays.asList(
|
||||
new ServerEntry("127.0.0.1", 2000, "token0001"),
|
||||
new ServerEntry("token0001"),
|
||||
new ServerEntry("token0001"), // drop
|
||||
new ServerEntry("127.0.0.1", 2010, "token0002"),
|
||||
new ServerEntry("token0003"));
|
||||
new ServerEntry("token0003")); // drop
|
||||
|
||||
lbResponseObserver.onNext(buildLbResponse(backends));
|
||||
|
||||
|
|
@ -348,11 +382,14 @@ public class GrpclbLoadBalancerTest {
|
|||
helperInOrder.verify(helper, atLeast(1))
|
||||
.updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker.list).containsExactly(
|
||||
RoundRobinEntry.newEntry(subchannel1, balancer.getLoadRecorder(), "token0001"),
|
||||
RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0001"),
|
||||
RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0002"),
|
||||
RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0003")).inOrder();
|
||||
assertThat(picker.dropList).containsExactly(
|
||||
null,
|
||||
new DropEntry(balancer.getLoadRecorder(), "token0001"),
|
||||
null,
|
||||
new DropEntry(balancer.getLoadRecorder(), "token0003")).inOrder();
|
||||
assertThat(picker.pickList).containsExactly(
|
||||
new BackendEntry(subchannel1, balancer.getLoadRecorder(), "token0001"),
|
||||
new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0002")).inOrder();
|
||||
|
||||
// Report, no data
|
||||
assertNextReport(
|
||||
|
|
@ -623,8 +660,9 @@ public class GrpclbLoadBalancerTest {
|
|||
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
|
||||
deliverNameResolutionError(error);
|
||||
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
|
||||
assertSame(error, errorPicker.result.getStatus());
|
||||
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker.dropList).isEmpty();
|
||||
assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
|
||||
|
||||
// Recover with a subsequent success
|
||||
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false);
|
||||
|
|
@ -643,8 +681,9 @@ public class GrpclbLoadBalancerTest {
|
|||
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
|
||||
deliverNameResolutionError(error);
|
||||
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
|
||||
assertSame(error, errorPicker.result.getStatus());
|
||||
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker.dropList).isEmpty();
|
||||
assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
|
||||
|
||||
// Recover with a subsequent success
|
||||
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true);
|
||||
|
|
@ -730,8 +769,9 @@ public class GrpclbLoadBalancerTest {
|
|||
deliverNameResolutionError(error);
|
||||
|
||||
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
|
||||
assertSame(error, errorPicker.result.getStatus());
|
||||
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker.dropList).isEmpty();
|
||||
assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
|
||||
assertFalse(oobChannel.isShutdown());
|
||||
|
||||
// Simulate receiving LB response
|
||||
|
|
@ -930,6 +970,10 @@ public class GrpclbLoadBalancerTest {
|
|||
|
||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
|
||||
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
|
||||
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
||||
RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker0.dropList).containsExactly(null, null);
|
||||
assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY);
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
|
||||
// Let subchannels be connected
|
||||
|
|
@ -937,15 +981,17 @@ public class GrpclbLoadBalancerTest {
|
|||
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
|
||||
assertThat(picker1.list).containsExactly(
|
||||
RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0002"));
|
||||
assertThat(picker1.dropList).containsExactly(null, null);
|
||||
assertThat(picker1.pickList).containsExactly(
|
||||
new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0002"));
|
||||
|
||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
|
||||
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker2.list).containsExactly(
|
||||
RoundRobinEntry.newEntry(subchannel1, balancer.getLoadRecorder(), "token0001"),
|
||||
RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0002"))
|
||||
assertThat(picker2.dropList).containsExactly(null, null);
|
||||
assertThat(picker2.pickList).containsExactly(
|
||||
new BackendEntry(subchannel1, balancer.getLoadRecorder(), "token0001"),
|
||||
new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0002"))
|
||||
.inOrder();
|
||||
|
||||
// Disconnected subchannels
|
||||
|
|
@ -954,8 +1000,9 @@ public class GrpclbLoadBalancerTest {
|
|||
verify(subchannel1, times(2)).requestConnection();
|
||||
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker3.list).containsExactly(
|
||||
RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0002"));
|
||||
assertThat(picker3.dropList).containsExactly(null, null);
|
||||
assertThat(picker3.pickList).containsExactly(
|
||||
new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0002"));
|
||||
|
||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
|
|
@ -969,16 +1016,19 @@ public class GrpclbLoadBalancerTest {
|
|||
verify(subchannel2).requestConnection();
|
||||
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE));
|
||||
verify(subchannel2, times(2)).requestConnection();
|
||||
inOrder.verify(helper).updateBalancingState(CONNECTING, GrpclbLoadBalancer.BUFFER_PICKER);
|
||||
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
||||
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker4.dropList).containsExactly(null, null);
|
||||
assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY);
|
||||
|
||||
// Update backends, with a drop entry
|
||||
List<ServerEntry> backends2 =
|
||||
Arrays.asList(
|
||||
new ServerEntry("127.0.0.1", 2030, "token0003"), // New address
|
||||
new ServerEntry("token0003"),
|
||||
new ServerEntry("token0003"), // drop
|
||||
new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed
|
||||
new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time
|
||||
new ServerEntry("token0006"));
|
||||
new ServerEntry("token0006")); // drop
|
||||
verify(subchannel1, never()).shutdown();
|
||||
|
||||
lbResponseObserver.onNext(buildLbResponse(backends2));
|
||||
|
|
@ -993,11 +1043,15 @@ public class GrpclbLoadBalancerTest {
|
|||
Subchannel subchannel3 = mockSubchannels.poll();
|
||||
verify(subchannel3).requestConnection();
|
||||
assertEquals(new EquivalentAddressGroup(backends2.get(0).addr), subchannel3.getAddresses());
|
||||
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
||||
RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker7.list).containsExactly(
|
||||
RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0003"),
|
||||
RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0006")).inOrder();
|
||||
assertThat(picker7.dropList).containsExactly(
|
||||
null,
|
||||
new DropEntry(balancer.getLoadRecorder(), "token0003"),
|
||||
null,
|
||||
null,
|
||||
new DropEntry(balancer.getLoadRecorder(), "token0006")).inOrder();
|
||||
assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY);
|
||||
|
||||
// State updates on obsolete subchannel1 will have no effect
|
||||
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
|
||||
|
|
@ -1009,29 +1063,40 @@ public class GrpclbLoadBalancerTest {
|
|||
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
|
||||
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker8.dropList).containsExactly(
|
||||
null,
|
||||
new DropEntry(balancer.getLoadRecorder(), "token0003"),
|
||||
null,
|
||||
null,
|
||||
new DropEntry(balancer.getLoadRecorder(), "token0006")).inOrder();
|
||||
// subchannel2 is still IDLE, thus not in the active list
|
||||
assertThat(picker8.list).containsExactly(
|
||||
RoundRobinEntry.newEntry(subchannel3, balancer.getLoadRecorder(), "token0003"),
|
||||
RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0003"),
|
||||
RoundRobinEntry.newEntry(subchannel3, balancer.getLoadRecorder(), "token0005"),
|
||||
RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0006")).inOrder();
|
||||
assertThat(picker8.pickList).containsExactly(
|
||||
new BackendEntry(subchannel3, balancer.getLoadRecorder(), "token0003"),
|
||||
new BackendEntry(subchannel3, balancer.getLoadRecorder(), "token0005")).inOrder();
|
||||
// subchannel2 becomes READY and makes it into the list
|
||||
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
|
||||
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker9.list).containsExactly(
|
||||
RoundRobinEntry.newEntry(subchannel3, balancer.getLoadRecorder(), "token0003"),
|
||||
RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0003"),
|
||||
RoundRobinEntry.newEntry(subchannel2, balancer.getLoadRecorder(), "token0004"),
|
||||
RoundRobinEntry.newEntry(subchannel3, balancer.getLoadRecorder(), "token0005"),
|
||||
RoundRobinEntry.newDropEntry(balancer.getLoadRecorder(), "token0006")).inOrder();
|
||||
assertThat(picker9.dropList).containsExactly(
|
||||
null,
|
||||
new DropEntry(balancer.getLoadRecorder(), "token0003"),
|
||||
null,
|
||||
null,
|
||||
new DropEntry(balancer.getLoadRecorder(), "token0006")).inOrder();
|
||||
assertThat(picker9.pickList).containsExactly(
|
||||
new BackendEntry(subchannel3, balancer.getLoadRecorder(), "token0003"),
|
||||
new BackendEntry(subchannel2, balancer.getLoadRecorder(), "token0004"),
|
||||
new BackendEntry(subchannel3, balancer.getLoadRecorder(), "token0005")).inOrder();
|
||||
verify(subchannel3, never()).shutdown();
|
||||
|
||||
// Update backends, with no entry
|
||||
lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
|
||||
verify(subchannel2).shutdown();
|
||||
verify(subchannel3).shutdown();
|
||||
inOrder.verify(helper).updateBalancingState(CONNECTING, GrpclbLoadBalancer.BUFFER_PICKER);
|
||||
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
||||
RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker10.dropList).isEmpty();
|
||||
assertThat(picker10.pickList).containsExactly(BUFFER_ENTRY);
|
||||
|
||||
assertFalse(oobChannel.isShutdown());
|
||||
assertEquals(0, lbRequestObservers.size());
|
||||
|
|
|
|||
Loading…
Reference in New Issue