xds: handle 100% drop for fallback mode

- Cancel fallback timer and/or exit fallback mode once receiving an EDS response indicating 100% drop.
- Also update balancing state once receiving the first EDS response with drop information when the channel is at the initial IDLE state.
This commit is contained in:
ZHANG Dapeng 2019-05-24 20:54:37 -07:00 committed by GitHub
parent 7fd5f261b4
commit 77a512551f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 249 additions and 34 deletions

View File

@ -53,6 +53,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
/**
* Manages EAG and locality info for a collection of subchannels, not including subchannels
@ -222,8 +223,9 @@ interface LocalityStore {
this.dropOverloads = checkNotNull(dropOverloads, "dropOverloads");
}
@Nullable
private static ConnectivityState aggregateState(
ConnectivityState overallState, ConnectivityState childState) {
@Nullable ConnectivityState overallState, @Nullable ConnectivityState childState) {
if (overallState == null) {
return childState;
}
@ -270,7 +272,8 @@ interface LocalityStore {
updatePicker(overallState, childPickers);
}
private void updatePicker(ConnectivityState state, List<WeightedChildPicker> childPickers) {
private void updatePicker(
@Nullable ConnectivityState state, List<WeightedChildPicker> childPickers) {
childPickers = Collections.unmodifiableList(childPickers);
SubchannelPicker picker;
if (childPickers.isEmpty()) {
@ -285,6 +288,9 @@ interface LocalityStore {
if (!dropOverloads.isEmpty()) {
picker = new DroppablePicker(dropOverloads, picker, random);
if (state == null) {
state = IDLE;
}
}
if (state != null) {

View File

@ -254,9 +254,13 @@ final class XdsComms {
= ImmutableList.builder();
for (ClusterLoadAssignment.Policy.DropOverload dropOverload
: dropOverloadsProto) {
int rateInMillion = rateInMillion(dropOverload.getDropPercentage());
dropOverloadsBuilder.add(new DropOverload(
dropOverload.getCategory(),
rateInMillion(dropOverload.getDropPercentage())));
dropOverload.getCategory(), rateInMillion));
if (rateInMillion == 1000_000) {
adsStreamCallback.onAllDrop();
break;
}
}
ImmutableList<DropOverload> dropOverloads = dropOverloadsBuilder.build();
localityStore.updateDropPercentage(dropOverloads);
@ -419,5 +423,10 @@ final class XdsComms {
* Once an error occurs in ADS stream.
*/
void onError();
/**
* Once receives a response indicating that 100% of calls should be dropped.
*/
void onAllDrop();
}
}

View File

@ -82,6 +82,11 @@ final class XdsLoadBalancer extends LoadBalancer {
// TODO: schedule a timer for Fallback-After-Startup
} // else: the Fallback-at-Startup timer is still pending, noop and wait
}
@Override
public void onAllDrop() {
fallbackManager.cancelFallback();
}
};
@Nullable

View File

@ -18,6 +18,7 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.same;
@ -31,6 +32,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
@ -176,7 +178,8 @@ public class LocalityStoreTest {
}
@Test
public void updateLoaclityStore() {
public void updateLoaclityStore_withEmptyDropList() {
localityStore.updateDropPercentage(ImmutableList.<DropOverload>of());
LocalityInfo localityInfo1 =
new LocalityInfo(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1);
LocalityInfo localityInfo2 =
@ -201,6 +204,8 @@ public class LocalityStoreTest {
verify(loadBalancers.get(2)).handleResolvedAddresses(resolvedAddressesCaptor3.capture());
assertThat(resolvedAddressesCaptor3.getValue().getAddresses()).containsExactly(eag31, eag32);
assertThat(pickerFactory.totalReadyLocalities).isEqualTo(0);
verify(helper, never()).updateBalancingState(
any(ConnectivityState.class), any(SubchannelPicker.class));
// subchannel12 goes to CONNECTING
final Subchannel subchannel12 =
@ -276,6 +281,9 @@ public class LocalityStoreTest {
@Test
public void updateLoaclityStore_withDrop() {
localityStore.updateDropPercentage(ImmutableList.of(
new DropOverload("throttle", 365),
new DropOverload("lb", 1234)));
LocalityInfo localityInfo1 =
new LocalityInfo(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1);
LocalityInfo localityInfo2 =
@ -285,9 +293,6 @@ public class LocalityStoreTest {
Map<Locality, LocalityInfo> localityInfoMap = ImmutableMap.of(
locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3);
localityStore.updateLocalityStore(localityInfoMap);
localityStore.updateDropPercentage(ImmutableList.of(
new DropOverload("throttle", 365),
new DropOverload("lb", 1234)));
assertThat(loadBalancers).hasSize(3);
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor1 =
@ -303,8 +308,32 @@ public class LocalityStoreTest {
verify(loadBalancers.get(2)).handleResolvedAddresses(resolvedAddressesCaptor3.capture());
assertThat(resolvedAddressesCaptor3.getValue().getAddresses()).containsExactly(eag31, eag32);
assertThat(pickerFactory.totalReadyLocalities).isEqualTo(0);
ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor =
ArgumentCaptor.forClass(SubchannelPicker.class);
verify(helper).updateBalancingState(same(IDLE), subchannelPickerCaptor.capture());
// subchannel12 goes to CONNECTING
int times = 0;
doReturn(365, 1234).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs))
.isEqualTo(PickResult.withNoResult());
verify(random, times(times += 2)).nextInt(1000_000);
doReturn(366, 1235).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs))
.isEqualTo(PickResult.withNoResult());
verify(random, times(times += 2)).nextInt(1000_000);
doReturn(364, 1234).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
.isTrue();
verify(random, times(times += 1)).nextInt(1000_000);
doReturn(365, 1233).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
.isTrue();
verify(random, times(times += 2)).nextInt(1000_000);
// subchannel12 goes to READY
final Subchannel subchannel12 =
helpers.get(0).createSubchannel(ImmutableList.of(eag12), Attributes.EMPTY);
verify(helper).createSubchannel(ImmutableList.of(eag12), Attributes.EMPTY);
@ -314,30 +343,54 @@ public class LocalityStoreTest {
return PickResult.withSubchannel(subchannel12);
}
};
helpers.get(0).updateBalancingState(CONNECTING, subchannelPicker12);
helpers.get(0).updateBalancingState(READY, subchannelPicker12);
ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor12 =
ArgumentCaptor.forClass(SubchannelPicker.class);
verify(helper).updateBalancingState(same(CONNECTING), subchannelPickerCaptor12.capture());
verify(helper).updateBalancingState(same(READY), subchannelPickerCaptor12.capture());
doReturn(365, 1234).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs))
.isEqualTo(PickResult.withNoResult());
verify(random, times(2)).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs)
.getSubchannel()).isEqualTo(subchannel12);
verify(random, times(times += 2)).nextInt(1000_000);
doReturn(366, 1235).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs))
.isEqualTo(PickResult.withNoResult());
verify(random, times(4)).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs)
.getSubchannel()).isEqualTo(subchannel12);
verify(random, times(times += 2)).nextInt(1000_000);
doReturn(364, 1234).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
.isTrue();
verify(random, times(5)).nextInt(1000_000);
verify(random, times(times += 1)).nextInt(1000_000);
doReturn(365, 1233).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
.isTrue();
verify(random, times(7)).nextInt(1000_000);
verify(random, times(times += 2)).nextInt(1000_000);
}
@Test
public void updateLoaclityStore_withAllDropBeforeLocalityUpdateConnectivityState() {
localityStore.updateDropPercentage(ImmutableList.of(
new DropOverload("throttle", 365),
new DropOverload("lb", 1000_000)));
LocalityInfo localityInfo1 =
new LocalityInfo(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1);
LocalityInfo localityInfo2 =
new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2);
LocalityInfo localityInfo3 =
new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3);
Map<Locality, LocalityInfo> localityInfoMap = ImmutableMap.of(
locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3);
localityStore.updateLocalityStore(localityInfoMap);
ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor =
ArgumentCaptor.forClass(SubchannelPicker.class);
verify(helper).updateBalancingState(same(IDLE), subchannelPickerCaptor.capture());
doReturn(999_999).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
.isTrue();
verify(random, times(2)).nextInt(1000_000);
}
@Test

View File

@ -21,8 +21,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Any;
@ -64,6 +67,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@ -248,6 +252,8 @@ public class XdsCommsTest {
.build();
responseWriter.onNext(edsResponse);
verify(adsStreamCallback).onWorking();
XdsComms.Locality locality1 = new XdsComms.Locality(localityProto1);
LocalityInfo localityInfo1 = new LocalityInfo(
ImmutableList.of(
@ -261,8 +267,9 @@ public class XdsCommsTest {
2);
XdsComms.Locality locality2 = new XdsComms.Locality(localityProto2);
verify(localityStore).updateDropPercentage(ImmutableList.<DropOverload>of());
verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
InOrder inOrder = inOrder(localityStore);
inOrder.verify(localityStore).updateDropPercentage(ImmutableList.<DropOverload>of());
inOrder.verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
locality1, localityInfo1, locality2, localityInfo2).inOrder();
@ -284,8 +291,10 @@ public class XdsCommsTest {
.build();
responseWriter.onNext(edsResponse);
verify(localityStore, times(2)).updateDropPercentage(ImmutableList.<DropOverload>of());
verify(localityStore, times(2)).updateLocalityStore(localityEndpointsMappingCaptor.capture());
verify(adsStreamCallback, times(1)).onWorking();
verifyNoMoreInteractions(adsStreamCallback);
inOrder.verify(localityStore).updateDropPercentage(ImmutableList.<DropOverload>of());
inOrder.verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
locality2, localityInfo2, locality1, localityInfo1).inOrder();
@ -345,25 +354,20 @@ public class XdsCommsTest {
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(78).setDenominator(DenominatorType.HUNDRED).build())
.build())
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("fake_category_2")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(789).setDenominator(DenominatorType.HUNDRED).build())
.build())
.build())
.build()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
responseWriter.onNext(edsResponseWithDrops);
verify(localityStore).updateDropPercentage(ImmutableList.of(
verify(adsStreamCallback).onWorking();
verifyNoMoreInteractions(adsStreamCallback);
InOrder inOrder = inOrder(localityStore);
inOrder.verify(localityStore).updateDropPercentage(ImmutableList.of(
new DropOverload("throttle", 123),
new DropOverload("lb", 456_00),
new DropOverload("fake_category", 78_00_00),
new DropOverload("fake_category_2", 1000_000)));
verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
new DropOverload("fake_category", 78_00_00)));
inOrder.verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
XdsComms.Locality locality1 = new XdsComms.Locality(localityProto1);
LocalityInfo localityInfo1 = new LocalityInfo(
@ -373,6 +377,63 @@ public class XdsCommsTest {
XdsComms.Locality locality2 = new XdsComms.Locality(localityProto2);
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
locality2, localityInfo2, locality1, localityInfo1).inOrder();
DiscoveryResponse edsResponseWithAllDrops = DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.newBuilder()
.addEndpoints(LocalityLbEndpoints.newBuilder()
.setLocality(localityProto2)
.addLbEndpoints(endpoint21)
.setLoadBalancingWeight(UInt32Value.of(2)))
.addEndpoints(LocalityLbEndpoints.newBuilder()
.setLocality(localityProto1)
.addLbEndpoints(endpoint11)
.setLoadBalancingWeight(UInt32Value.of(1)))
.setPolicy(Policy.newBuilder()
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("throttle")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(123).setDenominator(DenominatorType.MILLION).build())
.build())
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("lb")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(456).setDenominator(DenominatorType.TEN_THOUSAND).build())
.build())
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("fake_category")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(789).setDenominator(DenominatorType.HUNDRED).build())
.build())
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("fake_category_2")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(78).setDenominator(DenominatorType.HUNDRED).build())
.build())
.build())
.build()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
responseWriter.onNext(edsResponseWithAllDrops);
verify(adsStreamCallback, times(1)).onWorking();
verify(adsStreamCallback).onAllDrop();
verify(adsStreamCallback, never()).onError();
inOrder.verify(localityStore).updateDropPercentage(ImmutableList.of(
new DropOverload("throttle", 123),
new DropOverload("lb", 456_00),
new DropOverload("fake_category", 1000_000)));
inOrder.verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
locality2, localityInfo2, locality1, localityInfo1).inOrder();
xdsComms.shutdownChannel();
}
@ -381,6 +442,7 @@ public class XdsCommsTest {
responseWriter.onCompleted();
verify(adsStreamCallback).onError();
verifyNoMoreInteractions(adsStreamCallback);
xdsComms.shutdownChannel();
}

View File

@ -18,6 +18,7 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG;
@ -40,6 +41,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.protobuf.Any;
import com.google.protobuf.UInt32Value;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.core.Address;
@ -49,6 +51,8 @@ import io.envoyproxy.envoy.api.v2.endpoint.Endpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.type.FractionalPercent;
import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ChannelLogger;
@ -56,6 +60,7 @@ import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
@ -582,6 +587,81 @@ public class XdsLoadBalancerTest {
verify(helper).updateBalancingState(CONNECTING, picker);
}
@Test
public void allDropCancelsFallbackTimer() throws Exception {
lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
DiscoveryResponse edsResponse = DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.newBuilder()
.addEndpoints(LocalityLbEndpoints.newBuilder()
.setLocality(localityProto1)
.addLbEndpoints(endpoint11)
.setLoadBalancingWeight(UInt32Value.of(1)))
.setPolicy(Policy.newBuilder()
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("throttle")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(100).setDenominator(DenominatorType.HUNDRED).build())
.build()))
.build()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
serverResponseWriter.onNext(edsResponse);
assertThat(fakeClock.getPendingTasks()).isEmpty();
assertNotNull(childHelper);
assertNull(fallbackHelper1);
verify(fallbackBalancer1, never()).handleResolvedAddresses(any(ResolvedAddresses.class));
}
@Test
public void allDropExitFallbackMode() throws Exception {
lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
// let the fallback timer expire
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
assertThat(fakeClock.getPendingTasks()).isEmpty();
assertNull(childHelper);
assertNotNull(fallbackHelper1);
// receives EDS response with 100% drop
DiscoveryResponse edsResponse = DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.newBuilder()
.addEndpoints(LocalityLbEndpoints.newBuilder()
.setLocality(localityProto1)
.addLbEndpoints(endpoint11)
.setLoadBalancingWeight(UInt32Value.of(1)))
.setPolicy(Policy.newBuilder()
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("throttle")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(100).setDenominator(DenominatorType.HUNDRED).build())
.build()))
.build()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
serverResponseWriter.onNext(edsResponse);
verify(fallbackBalancer1).shutdown();
assertNotNull(childHelper);
ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor =
ArgumentCaptor.forClass(SubchannelPicker.class);
verify(helper).updateBalancingState(same(IDLE), subchannelPickerCaptor.capture());
assertThat(subchannelPickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class))
.isDrop()).isTrue();
}
@Test
public void fallback_ErrorWithoutReceivingEdsResponse() throws Exception {
lb.handleResolvedAddresses(