xds: Fix bug of locality store not update weights for existing localities (#5976)

* Fixed LocalityStore test relying on WeightChildPicker list order for mapping to its corresponding locality.

* Fixed bug for not updating new weights for existing localities.
This commit is contained in:
Chengyuan Zhang 2019-07-11 09:44:40 -07:00 committed by GitHub
parent 77ee79cc15
commit d30c304d8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 19 deletions

View File

@ -217,7 +217,8 @@ interface LocalityStore {
LocalityLbInfo oldLocalityLbInfo = localityMap.get(newLocality); LocalityLbInfo oldLocalityLbInfo = localityMap.get(newLocality);
childHelper = oldLocalityLbInfo.childHelper; childHelper = oldLocalityLbInfo.childHelper;
localityLbInfo = localityLbInfo =
new LocalityLbInfo(oldLocalityLbInfo.localityWeight, new LocalityLbInfo(
localityInfoMap.get(newLocality).localityWeight,
oldLocalityLbInfo.childBalancer, oldLocalityLbInfo.childBalancer,
childHelper); childHelper);
} else { } else {

View File

@ -67,8 +67,10 @@ import io.grpc.xds.XdsComms.LocalityInfo;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -95,10 +97,12 @@ public class LocalityStoreTest {
private static final class FakePickerFactory implements PickerFactory { private static final class FakePickerFactory implements PickerFactory {
int totalReadyLocalities; int totalReadyLocalities;
int nextIndex; int nextIndex;
List<WeightedChildPicker> perLocalitiesPickers;
@Override @Override
public SubchannelPicker picker(final List<WeightedChildPicker> childPickers) { public SubchannelPicker picker(final List<WeightedChildPicker> childPickers) {
totalReadyLocalities = childPickers.size(); totalReadyLocalities = childPickers.size();
perLocalitiesPickers = Collections.unmodifiableList(childPickers);
return new SubchannelPicker() { return new SubchannelPicker() {
@Override @Override
@ -181,6 +185,9 @@ public class LocalityStoreTest {
private final LbEndpoint lbEndpoint41 = new LbEndpoint(eag41, 41); private final LbEndpoint lbEndpoint41 = new LbEndpoint(eag41, 41);
private final LbEndpoint lbEndpoint42 = new LbEndpoint(eag42, 42); private final LbEndpoint lbEndpoint42 = new LbEndpoint(eag42, 42);
private final Map<String, XdsLocality> namedLocalities
= ImmutableMap.of("sz1", locality1, "sz2", locality2, "sz3", locality3, "sz4", locality4);
@Mock @Mock
private Helper helper; private Helper helper;
@Mock @Mock
@ -268,9 +275,11 @@ public class LocalityStoreTest {
any(OrcaPerRequestReportListener.class))) any(OrcaPerRequestReportListener.class)))
.thenReturn(metricsTracingFactory1, metricsTracingFactory2); .thenReturn(metricsTracingFactory1, metricsTracingFactory2);
final PickResult result1 = PickResult.withSubchannel(mock(Subchannel.class)); Subchannel subchannel1 = mock(Subchannel.class);
Subchannel subchannel2 = mock(Subchannel.class);
final PickResult result1 = PickResult.withSubchannel(subchannel1);
final PickResult result2 = final PickResult result2 =
PickResult.withSubchannel(mock(Subchannel.class), mock(ClientStreamTracer.Factory.class)); PickResult.withSubchannel(subchannel2, mock(ClientStreamTracer.Factory.class));
SubchannelPicker subchannelPicker1 = mock(SubchannelPicker.class); SubchannelPicker subchannelPicker1 = mock(SubchannelPicker.class);
SubchannelPicker subchannelPicker2 = mock(SubchannelPicker.class); SubchannelPicker subchannelPicker2 = mock(SubchannelPicker.class);
when(subchannelPicker1.pickSubchannel(any(PickSubchannelArgs.class))) when(subchannelPicker1.pickSubchannel(any(PickSubchannelArgs.class)))
@ -289,12 +298,15 @@ public class LocalityStoreTest {
// Verify each PickResult picked is intercepted with client stream tracer factory for // Verify each PickResult picked is intercepted with client stream tracer factory for
// recording load and backend metrics. // recording load and backend metrics.
List<XdsLocality> localities = ImmutableList.of(locality1, locality2); Map<Subchannel, XdsLocality> localitiesBySubchannel
List<ClientStreamTracer.Factory> metricsTracingFactories = = ImmutableMap.of(subchannel1, locality1, subchannel2, locality2);
ImmutableList.of(metricsTracingFactory1, metricsTracingFactory2); Map<Subchannel, ClientStreamTracer.Factory> metricsTracingFactoriesBySubchannel
= ImmutableMap.of(subchannel1, metricsTracingFactory1, subchannel2, metricsTracingFactory2);
for (int i = 0; i < pickerFactory.totalReadyLocalities; i++) { for (int i = 0; i < pickerFactory.totalReadyLocalities; i++) {
pickerFactory.nextIndex = i; pickerFactory.nextIndex = i;
PickResult pickResult = interLocalityPicker.pickSubchannel(pickSubchannelArgs); PickResult pickResult = interLocalityPicker.pickSubchannel(pickSubchannelArgs);
Subchannel expectedSubchannel = pickResult.getSubchannel();
XdsLocality expectedLocality = localitiesBySubchannel.get(expectedSubchannel);
ArgumentCaptor<OrcaPerRequestReportListener> listenerCaptor = ArgumentCaptor.forClass(null); ArgumentCaptor<OrcaPerRequestReportListener> listenerCaptor = ArgumentCaptor.forClass(null);
verify(orcaPerRequestUtil, times(i + 1)) verify(orcaPerRequestUtil, times(i + 1))
.newOrcaClientStreamTracerFactory(any(ClientStreamTracer.Factory.class), .newOrcaClientStreamTracerFactory(any(ClientStreamTracer.Factory.class),
@ -302,14 +314,15 @@ public class LocalityStoreTest {
assertThat(listenerCaptor.getValue()).isInstanceOf(MetricsRecordingListener.class); assertThat(listenerCaptor.getValue()).isInstanceOf(MetricsRecordingListener.class);
MetricsRecordingListener listener = (MetricsRecordingListener) listenerCaptor.getValue(); MetricsRecordingListener listener = (MetricsRecordingListener) listenerCaptor.getValue();
assertThat(listener.getCounter()) assertThat(listener.getCounter())
.isSameInstanceAs(fakeLoadStatsStore.localityCounters.get(localities.get(i))); .isSameInstanceAs(fakeLoadStatsStore.localityCounters.get(expectedLocality));
assertThat(pickResult.getStreamTracerFactory()) assertThat(pickResult.getStreamTracerFactory())
.isInstanceOf(LoadRecordingStreamTracerFactory.class); .isInstanceOf(LoadRecordingStreamTracerFactory.class);
LoadRecordingStreamTracerFactory loadRecordingFactory = LoadRecordingStreamTracerFactory loadRecordingFactory =
(LoadRecordingStreamTracerFactory) pickResult.getStreamTracerFactory(); (LoadRecordingStreamTracerFactory) pickResult.getStreamTracerFactory();
assertThat(loadRecordingFactory.getCounter()) assertThat(loadRecordingFactory.getCounter())
.isSameInstanceAs(fakeLoadStatsStore.localityCounters.get(localities.get(i))); .isSameInstanceAs(fakeLoadStatsStore.localityCounters.get(expectedLocality));
assertThat(loadRecordingFactory.delegate()).isSameInstanceAs(metricsTracingFactories.get(i)); assertThat(loadRecordingFactory.delegate())
.isSameInstanceAs(metricsTracingFactoriesBySubchannel.get(expectedSubchannel));
} }
} }
@ -446,22 +459,27 @@ public class LocalityStoreTest {
} }
}; };
childHelpers.get("sz3").updateBalancingState(READY, subchannelPicker31); childHelpers.get("sz3").updateBalancingState(READY, subchannelPicker31);
ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor31 = ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor =
ArgumentCaptor.forClass(SubchannelPicker.class); ArgumentCaptor.forClass(null);
verify(helper).updateBalancingState(same(READY), subchannelPickerCaptor31.capture()); verify(helper).updateBalancingState(same(READY), subchannelPickerCaptor.capture());
assertThat(pickerFactory.totalReadyLocalities).isEqualTo(1); assertThat(pickerFactory.totalReadyLocalities).isEqualTo(1);
assertThat( pickerFactory.nextIndex = 0;
subchannelPickerCaptor31.getValue().pickSubchannel(pickSubchannelArgs).getSubchannel()) assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs).getSubchannel())
.isEqualTo(subchannel31); .isEqualTo(subchannel31);
// subchannel12 goes to READY // subchannel12 goes to READY
childHelpers.get("sz1").updateBalancingState(READY, subchannelPicker12); childHelpers.get("sz1").updateBalancingState(READY, subchannelPicker12);
verify(helper, times(2)).updateBalancingState(same(READY), subchannelPickerCaptor12.capture()); verify(helper, times(2)).updateBalancingState(same(READY), subchannelPickerCaptor.capture());
assertThat(pickerFactory.totalReadyLocalities).isEqualTo(2); assertThat(pickerFactory.totalReadyLocalities).isEqualTo(2);
pickerFactory.nextIndex = 0;
assertThat( SubchannelPicker interLocalityPicker = subchannelPickerCaptor.getValue();
subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).getSubchannel()) Set<Subchannel> pickedReadySubchannels = new HashSet<>();
.isEqualTo(subchannel12); for (int i = 0; i < pickerFactory.totalReadyLocalities; i++) {
pickerFactory.nextIndex = i;
PickResult result = interLocalityPicker.pickSubchannel(pickSubchannelArgs);
pickedReadySubchannels.add(result.getSubchannel());
}
assertThat(pickedReadySubchannels).containsExactly(subchannel31, subchannel12);
// update with new addressed // update with new addressed
localityInfo1 = localityInfo1 =
@ -611,6 +629,55 @@ public class LocalityStoreTest {
verify(random, times(2)).nextInt(1000_000); verify(random, times(2)).nextInt(1000_000);
} }
@Test
public void updateLocalityStore_OnlyUpdatingWeightsStillUpdatesPicker() {
LocalityInfo localityInfo1 =
new LocalityInfo(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1);
LocalityInfo localityInfo2 =
new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2);
LocalityInfo localityInfo3 =
new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3);
Map<XdsLocality, LocalityInfo> localityInfoMap = ImmutableMap.of(
locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3);
localityStore.updateLocalityStore(localityInfoMap);
assertThat(loadBalancers).hasSize(3);
assertThat(loadBalancers.keySet()).containsExactly("sz1", "sz2", "sz3");
assertThat(pickerFactory.totalReadyLocalities).isEqualTo(0);
// Update locality weights before any subchannel becomes READY.
localityInfo1 = new LocalityInfo(ImmutableList.of(lbEndpoint11, lbEndpoint12), 4);
localityInfo2 = new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 5);
localityInfo3 = new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 6);
localityInfoMap = ImmutableMap.of(
locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3);
localityStore.updateLocalityStore(localityInfoMap);
assertThat(pickerFactory.totalReadyLocalities).isEqualTo(0);
final Map<Subchannel, XdsLocality> localitiesBySubchannel = new HashMap<>();
for (final Helper h : childHelpers.values()) {
h.updateBalancingState(READY, new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
Subchannel subchannel = mock(Subchannel.class);
localitiesBySubchannel.put(subchannel, namedLocalities.get(h.getAuthority()));
return PickResult.withSubchannel(subchannel);
}
});
}
assertThat(pickerFactory.totalReadyLocalities).isEqualTo(3);
for (int i = 0; i < pickerFactory.totalReadyLocalities; i++) {
WeightedChildPicker weightedChildPicker
= pickerFactory.perLocalitiesPickers.get(i);
Subchannel subchannel
= weightedChildPicker.getPicker().pickSubchannel(pickSubchannelArgs).getSubchannel();
assertThat(weightedChildPicker.getWeight())
.isEqualTo(localityInfoMap.get(localitiesBySubchannel.get(subchannel)).localityWeight);
}
}
@Test @Test
public void reset() { public void reset() {
LocalityInfo localityInfo1 = LocalityInfo localityInfo1 =