mirror of https://github.com/grpc/grpc-java.git
grpclb: support explicit fallback from LB (#6549)
This commit is contained in:
parent
e6d15a6d04
commit
641d74f34f
|
|
@ -581,7 +581,12 @@ final class GrpclbState {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) {
|
if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) {
|
||||||
|
cancelFallbackTimer();
|
||||||
|
useFallbackBackends();
|
||||||
|
maybeUpdatePicker();
|
||||||
|
return;
|
||||||
|
} else if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) {
|
||||||
logger.log(ChannelLogLevel.WARNING, "Ignoring unexpected response type: {0}", typeCase);
|
logger.log(ChannelLogLevel.WARNING, "Ignoring unexpected response type: {0}", typeCase);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -94,6 +94,11 @@ message LoadBalanceResponse {
|
||||||
// Contains the list of servers selected by the load balancer. The client
|
// Contains the list of servers selected by the load balancer. The client
|
||||||
// should send requests to these servers in the specified order.
|
// should send requests to these servers in the specified order.
|
||||||
ServerList server_list = 2;
|
ServerList server_list = 2;
|
||||||
|
|
||||||
|
// If this field is set, then the client should eagerly enter fallback
|
||||||
|
// mode (even if there are existing, healthy connections to backends).
|
||||||
|
// See go/grpclb-explicit-fallback for more details.
|
||||||
|
FallbackResponse fallback_response = 3;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -111,6 +116,8 @@ message InitialLoadBalanceResponse {
|
||||||
google.protobuf.Duration client_stats_report_interval = 2;
|
google.protobuf.Duration client_stats_report_interval = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message FallbackResponse {}
|
||||||
|
|
||||||
message ServerList {
|
message ServerList {
|
||||||
// Contains a list of servers selected by the load balancer. The list will
|
// Contains a list of servers selected by the load balancer. The list will
|
||||||
// be updated when server resolutions change or as needed to balance load
|
// be updated when server resolutions change or as needed to balance load
|
||||||
|
|
|
||||||
|
|
@ -81,6 +81,7 @@ import io.grpc.internal.GrpcAttributes;
|
||||||
import io.grpc.internal.JsonParser;
|
import io.grpc.internal.JsonParser;
|
||||||
import io.grpc.lb.v1.ClientStats;
|
import io.grpc.lb.v1.ClientStats;
|
||||||
import io.grpc.lb.v1.ClientStatsPerToken;
|
import io.grpc.lb.v1.ClientStatsPerToken;
|
||||||
|
import io.grpc.lb.v1.FallbackResponse;
|
||||||
import io.grpc.lb.v1.InitialLoadBalanceRequest;
|
import io.grpc.lb.v1.InitialLoadBalanceRequest;
|
||||||
import io.grpc.lb.v1.InitialLoadBalanceResponse;
|
import io.grpc.lb.v1.InitialLoadBalanceResponse;
|
||||||
import io.grpc.lb.v1.LoadBalanceRequest;
|
import io.grpc.lb.v1.LoadBalanceRequest;
|
||||||
|
|
@ -288,7 +289,6 @@ public class GrpclbLoadBalancerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public void tearDown() {
|
public void tearDown() {
|
||||||
try {
|
try {
|
||||||
if (balancer != null) {
|
if (balancer != null) {
|
||||||
|
|
@ -2085,6 +2085,210 @@ public class GrpclbLoadBalancerTest {
|
||||||
assertThat(mode).isEqualTo(Mode.ROUND_ROBIN);
|
assertThat(mode).isEqualTo(Mode.ROUND_ROBIN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void grpclbWorking_lbSendsFallbackMessage() {
|
||||||
|
InOrder inOrder = inOrder(helper, subchannelPool);
|
||||||
|
List<EquivalentAddressGroup> grpclbResolutionList =
|
||||||
|
createResolvedServerAddresses(true, true, false, false);
|
||||||
|
List<EquivalentAddressGroup> fallbackEags = grpclbResolutionList.subList(2, 4);
|
||||||
|
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
|
||||||
|
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
|
||||||
|
|
||||||
|
// Fallback timer is started as soon as the addresses are resolved.
|
||||||
|
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
|
||||||
|
|
||||||
|
List<SocketAddress> addrs = new ArrayList<>();
|
||||||
|
addrs.addAll(grpclbResolutionList.get(0).getAddresses());
|
||||||
|
addrs.addAll(grpclbResolutionList.get(1).getAddresses());
|
||||||
|
Attributes attr = grpclbResolutionList.get(0).getAttributes();
|
||||||
|
EquivalentAddressGroup oobChannelEag = new EquivalentAddressGroup(addrs, attr);
|
||||||
|
verify(helper).createOobChannel(eq(oobChannelEag), eq(lbAuthority(0)));
|
||||||
|
assertEquals(1, fakeOobChannels.size());
|
||||||
|
ManagedChannel oobChannel = fakeOobChannels.poll();
|
||||||
|
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
|
||||||
|
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
|
||||||
|
assertEquals(1, lbRequestObservers.size());
|
||||||
|
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
|
||||||
|
verify(lbRequestObserver).onNext(
|
||||||
|
eq(LoadBalanceRequest.newBuilder()
|
||||||
|
.setInitialRequest(
|
||||||
|
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
// Simulate receiving LB response
|
||||||
|
ServerEntry backend1a = new ServerEntry("127.0.0.1", 2000, "token0001");
|
||||||
|
ServerEntry backend1b = new ServerEntry("127.0.0.1", 2010, "token0002");
|
||||||
|
List<ServerEntry> backends1 = Arrays.asList(backend1a, backend1b);
|
||||||
|
inOrder.verify(helper, never())
|
||||||
|
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
|
||||||
|
logs.clear();
|
||||||
|
lbResponseObserver.onNext(buildInitialResponse());
|
||||||
|
assertThat(logs).containsExactly("DEBUG: Got an LB response: " + buildInitialResponse());
|
||||||
|
logs.clear();
|
||||||
|
lbResponseObserver.onNext(buildLbResponse(backends1));
|
||||||
|
|
||||||
|
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
|
||||||
|
eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)),
|
||||||
|
any(Attributes.class));
|
||||||
|
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
|
||||||
|
eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)),
|
||||||
|
any(Attributes.class));
|
||||||
|
|
||||||
|
assertEquals(2, mockSubchannels.size());
|
||||||
|
Subchannel subchannel1 = mockSubchannels.poll();
|
||||||
|
Subchannel subchannel2 = mockSubchannels.poll();
|
||||||
|
|
||||||
|
verify(subchannel1).requestConnection();
|
||||||
|
verify(subchannel2).requestConnection();
|
||||||
|
assertEquals(
|
||||||
|
new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS),
|
||||||
|
subchannel1.getAddresses());
|
||||||
|
assertEquals(
|
||||||
|
new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS),
|
||||||
|
subchannel2.getAddresses());
|
||||||
|
|
||||||
|
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
|
||||||
|
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
|
||||||
|
|
||||||
|
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
||||||
|
RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||||
|
assertThat(picker0.dropList).containsExactly(null, null);
|
||||||
|
assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY);
|
||||||
|
inOrder.verifyNoMoreInteractions();
|
||||||
|
|
||||||
|
assertThat(logs).containsExactly(
|
||||||
|
"DEBUG: Got an LB response: " + buildLbResponse(backends1),
|
||||||
|
"INFO: Using RR list="
|
||||||
|
+ "[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}](token0001),"
|
||||||
|
+ " [[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}](token0002)],"
|
||||||
|
+ " drop=[null, null]",
|
||||||
|
"INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder();
|
||||||
|
logs.clear();
|
||||||
|
|
||||||
|
// Let subchannels be connected
|
||||||
|
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
|
||||||
|
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||||
|
assertThat(logs).containsExactly(
|
||||||
|
"INFO: READY: picks="
|
||||||
|
+ "[[[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]],"
|
||||||
|
+ " drops=[null, null]");
|
||||||
|
logs.clear();
|
||||||
|
|
||||||
|
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||||
|
|
||||||
|
assertThat(picker1.dropList).containsExactly(null, null);
|
||||||
|
assertThat(picker1.pickList).containsExactly(
|
||||||
|
new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
|
||||||
|
|
||||||
|
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
|
||||||
|
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||||
|
assertThat(logs).containsExactly(
|
||||||
|
"INFO: READY: picks="
|
||||||
|
+ "[[[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0001)],"
|
||||||
|
+ " [[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]],"
|
||||||
|
+ " drops=[null, null]");
|
||||||
|
logs.clear();
|
||||||
|
|
||||||
|
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||||
|
assertThat(picker2.dropList).containsExactly(null, null);
|
||||||
|
assertThat(picker2.pickList).containsExactly(
|
||||||
|
new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
|
||||||
|
new BackendEntry(subchannel2, getLoadRecorder(), "token0002"))
|
||||||
|
.inOrder();
|
||||||
|
|
||||||
|
// enter fallback mode
|
||||||
|
lbResponseObserver.onNext(buildLbFallbackResponse());
|
||||||
|
|
||||||
|
// existing subchannels must be returned immediately to gracefully shutdown.
|
||||||
|
verify(subchannelPool)
|
||||||
|
.returnSubchannel(eq(subchannel1), eq(ConnectivityStateInfo.forNonError(READY)));
|
||||||
|
verify(subchannelPool)
|
||||||
|
.returnSubchannel(eq(subchannel2), eq(ConnectivityStateInfo.forNonError(READY)));
|
||||||
|
|
||||||
|
// verify fallback
|
||||||
|
fallbackTestVerifyUseOfFallbackBackendLists(inOrder, fallbackEags);
|
||||||
|
|
||||||
|
assertFalse(oobChannel.isShutdown());
|
||||||
|
verify(lbRequestObserver, never()).onCompleted();
|
||||||
|
|
||||||
|
// exit fall back by providing two new backends
|
||||||
|
ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001");
|
||||||
|
ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002");
|
||||||
|
List<ServerEntry> backends2 = Arrays.asList(backend2a, backend2b);
|
||||||
|
|
||||||
|
inOrder.verify(helper, never())
|
||||||
|
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
|
||||||
|
logs.clear();
|
||||||
|
lbResponseObserver.onNext(buildLbResponse(backends2));
|
||||||
|
|
||||||
|
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
|
||||||
|
eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)),
|
||||||
|
any(Attributes.class));
|
||||||
|
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
|
||||||
|
eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)),
|
||||||
|
any(Attributes.class));
|
||||||
|
|
||||||
|
assertEquals(2, mockSubchannels.size());
|
||||||
|
Subchannel subchannel3 = mockSubchannels.poll();
|
||||||
|
Subchannel subchannel4 = mockSubchannels.poll();
|
||||||
|
verify(subchannel3).requestConnection();
|
||||||
|
verify(subchannel4).requestConnection();
|
||||||
|
assertEquals(
|
||||||
|
new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS),
|
||||||
|
subchannel3.getAddresses());
|
||||||
|
assertEquals(
|
||||||
|
new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS),
|
||||||
|
subchannel4.getAddresses());
|
||||||
|
|
||||||
|
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(CONNECTING));
|
||||||
|
deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING));
|
||||||
|
|
||||||
|
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
|
||||||
|
RoundRobinPicker picker6 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||||
|
assertThat(picker6.dropList).containsExactly(null, null);
|
||||||
|
assertThat(picker6.pickList).containsExactly(BUFFER_ENTRY);
|
||||||
|
inOrder.verifyNoMoreInteractions();
|
||||||
|
|
||||||
|
assertThat(logs).containsExactly(
|
||||||
|
"DEBUG: Got an LB response: " + buildLbResponse(backends2),
|
||||||
|
"INFO: Using RR list="
|
||||||
|
+ "[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}](token1001),"
|
||||||
|
+ " [[/127.0.0.1:8010]/{io.grpc.grpclb.lbProvidedBackend=true}](token1002)],"
|
||||||
|
+ " drop=[null, null]",
|
||||||
|
"INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder();
|
||||||
|
logs.clear();
|
||||||
|
|
||||||
|
// Let new subchannels be connected
|
||||||
|
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
|
||||||
|
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||||
|
assertThat(logs).containsExactly(
|
||||||
|
"INFO: READY: picks="
|
||||||
|
+ "[[[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1001)]],"
|
||||||
|
+ " drops=[null, null]");
|
||||||
|
logs.clear();
|
||||||
|
|
||||||
|
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||||
|
assertThat(picker3.dropList).containsExactly(null, null);
|
||||||
|
assertThat(picker3.pickList).containsExactly(
|
||||||
|
new BackendEntry(subchannel3, getLoadRecorder(), "token1001"));
|
||||||
|
|
||||||
|
deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(READY));
|
||||||
|
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||||
|
assertThat(logs).containsExactly(
|
||||||
|
"INFO: READY: picks="
|
||||||
|
+ "[[[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1001)],"
|
||||||
|
+ " [[[[/127.0.0.1:8010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1002)]],"
|
||||||
|
+ " drops=[null, null]");
|
||||||
|
logs.clear();
|
||||||
|
|
||||||
|
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
|
||||||
|
assertThat(picker4.dropList).containsExactly(null, null);
|
||||||
|
assertThat(picker4.pickList).containsExactly(
|
||||||
|
new BackendEntry(subchannel3, getLoadRecorder(), "token1001"),
|
||||||
|
new BackendEntry(subchannel4, getLoadRecorder(), "token1002"))
|
||||||
|
.inOrder();
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private void deliverSubchannelState(
|
private void deliverSubchannelState(
|
||||||
final Subchannel subchannel, final ConnectivityStateInfo newState) {
|
final Subchannel subchannel, final ConnectivityStateInfo newState) {
|
||||||
|
|
@ -2154,7 +2358,13 @@ public class GrpclbLoadBalancerTest {
|
||||||
return LoadBalanceResponse.newBuilder()
|
return LoadBalanceResponse.newBuilder()
|
||||||
.setInitialResponse(
|
.setInitialResponse(
|
||||||
InitialLoadBalanceResponse.newBuilder()
|
InitialLoadBalanceResponse.newBuilder()
|
||||||
.setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis)))
|
.setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis)))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static LoadBalanceResponse buildLbFallbackResponse() {
|
||||||
|
return LoadBalanceResponse.newBuilder()
|
||||||
|
.setFallbackResponse(FallbackResponse.newBuilder().build())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue