xds: restructure XdsLoadBalancer part 1

- Get rid of `XdsLbState` and make config change handling into two layers: `LookasideLb` (handles balancer name) and `LookasideChannelLb` (handles child policy), under `XdsLoadBalanecer` (fallback manager layer)
- Move `XdsComms`/`AdsStream` to a layer under `LookasideChannelLb`. They don't keep the helper, but only `SyncCtx` and `ChannelLogger`
- For each layer, we pass in a `LoadBalancer.Factory` for the next/child layer. In test, 
  + we mock/fake the factory, so we don't care about the implementation details of the child layer.
  + we capture the helper of `factory.newBalancer(factory)`, so we can mimic `updateBalancingState() `from the child layer.

Part 1 contains fallback management logic. There is no change in fallback management logic.
This commit is contained in:
ZHANG Dapeng 2019-09-13 12:11:27 -07:00 committed by GitHub
parent 858a1f8fb9
commit 4334583681
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 545 additions and 0 deletions

View File

@ -0,0 +1,248 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.XdsComms.AdsStreamCallback;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
/**
* A {@link LoadBalancer} that uses the XDS protocol.
*
* <p>This class manages fallback handling. The logic for child policy handling and fallback policy
* handling is provided by LookasideLb and FallbackLb.
*/
// TODO(zdapeng): migrate name to XdsLoadBlancer
final class XdsLoadBalancer2 extends LoadBalancer {
private static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); // same as grpclb
private final Helper helper;
private final LoadBalancer lookasideLb;
private final LoadBalancer.Factory fallbackLbFactory;
private final AdsStreamCallback adsCallback = new AdsStreamCallback() {
@Override
public void onWorking() {
if (childPolicyHasBeenReady) {
// cancel Fallback-After-Startup timer if there's any
cancelFallbackTimer();
}
adsWorked = true;
}
@Override
public void onError() {
if (!adsWorked) {
// start Fallback-at-Startup immediately
useFallbackPolicy();
} else if (childPolicyHasBeenReady) {
// TODO: schedule a timer for Fallback-After-Startup
} // else: the Fallback-at-Startup timer is still pending, noop and wait
}
@Override
public void onAllDrop() {
cancelFallback();
}
};
@Nullable
private LoadBalancer fallbackLb;
@Nullable
private ResolvedAddresses resolvedAddresses;
// Scheduled only once. Never reset to null.
@CheckForNull
private ScheduledHandle fallbackTimer;
private boolean adsWorked;
private boolean childPolicyHasBeenReady;
// TODO(zdapeng): Add XdsLoadBalancer2(Helper helper) with default factories
@VisibleForTesting
XdsLoadBalancer2(
Helper helper,
LookasideLbFactory lookasideLbFactory,
LoadBalancer.Factory fallbackLbFactory) {
this.helper = helper;
this.lookasideLb = lookasideLbFactory.newLoadBalancer(new LookasideLbHelper(), adsCallback);
this.fallbackLbFactory = fallbackLbFactory;
}
@Override
public boolean canHandleEmptyAddressListFromNameResolution() {
// This does not sound correct, but it's fine as we don't support fallback at this moment.
// TODO(zdapeng): revisit it once we officially support fallback.
return true;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
this.resolvedAddresses = resolvedAddresses;
if (isInFallbackMode()) {
fallbackLb.handleResolvedAddresses(this.resolvedAddresses);
}
if (fallbackTimer == null) {
class EnterFallbackTask implements Runnable {
@Override
public void run() {
useFallbackPolicy();
}
}
fallbackTimer = helper.getSynchronizationContext().schedule(
new EnterFallbackTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS,
helper.getScheduledExecutorService());
}
lookasideLb.handleResolvedAddresses(resolvedAddresses);
}
@Override
public void handleNameResolutionError(Status error) {
lookasideLb.handleNameResolutionError(error);
if (isInFallbackMode()) {
fallbackLb.handleNameResolutionError(error);
}
}
@Override
public void requestConnection() {
lookasideLb.requestConnection();
if (isInFallbackMode()) {
fallbackLb.requestConnection();
}
}
@Override
public void shutdown() {
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Shutting down XDS balancer");
lookasideLb.shutdown();
cancelFallback();
}
@Deprecated
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
throw new UnsupportedOperationException(
"handleSubchannelState() not supported by XdsLoadBalancer");
}
private void cancelFallbackTimer() {
if (fallbackTimer != null) {
fallbackTimer.cancel();
}
}
private void cancelFallback() {
cancelFallbackTimer();
if (isInFallbackMode()) {
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Shutting down XDS fallback balancer");
fallbackLb.shutdown();
fallbackLb = null;
}
}
private void useFallbackPolicy() {
if (isInFallbackMode()) {
return;
}
cancelFallbackTimer();
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Using XDS fallback policy");
FallbackLbHelper fallbackLbHelper = new FallbackLbHelper();
fallbackLb = fallbackLbFactory.newLoadBalancer(fallbackLbHelper);
fallbackLbHelper.balancer = fallbackLb;
fallbackLb.handleResolvedAddresses(resolvedAddresses);
}
/**
* Fallback mode being on indicates that an update from child LBs will be ignored unless the
* update triggers turning off the fallback mode first.
*/
private boolean isInFallbackMode() {
return fallbackLb != null;
}
private final class LookasideLbHelper extends ForwardingLoadBalancerHelper {
@Override
protected Helper delegate() {
return helper;
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
if (newState == ConnectivityState.READY) {
checkState(
adsWorked,
"channel goes to READY before the load balancer even worked");
childPolicyHasBeenReady = true;
cancelFallback();
}
if (!isInFallbackMode()) {
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Picker updated - state: {0}, picker: {1}", newState, newPicker);
helper.updateBalancingState(newState, newPicker);
}
}
}
private final class FallbackLbHelper extends ForwardingLoadBalancerHelper {
LoadBalancer balancer;
@Override
protected Helper delegate() {
return helper;
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
checkNotNull(balancer, "balancer not set yet");
if (balancer != fallbackLb) {
// ignore updates from a misbehaving shutdown fallback balancer
return;
}
helper.getChannelLogger().log(
ChannelLogLevel.INFO,
"Picker updated - state: {0}, picker: {1}", newState, newPicker);
super.updateBalancingState(newState, newPicker);
}
}
/** Factory of a look-aside load balancer. The interface itself is for convenience in test. */
@VisibleForTesting
interface LookasideLbFactory {
LoadBalancer newLoadBalancer(Helper helper, AdsStreamCallback adsCallback);
}
}

View File

@ -0,0 +1,297 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.xds.XdsComms.AdsStreamCallback;
import io.grpc.xds.XdsLoadBalancer2.LookasideLbFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/** Unit tests for {@link XdsLoadBalancer2}. */
@RunWith(JUnit4.class)
public class XdsLoadBalancer2Test {
@Rule
public final ExpectedException thrown = ExpectedException.none();
@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
private final FakeClock fakeClock = new FakeClock();
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
@Mock
private Helper helper;
private LoadBalancer xdsLoadBalancer;
private AdsStreamCallback adsCallback;
private Helper lookasideLbHelper;
private final List<LoadBalancer> lookasideLbs = new ArrayList<>();
private Helper fallbackLbHelper;
private final List<LoadBalancer> fallbackLbs = new ArrayList<>();
private int requestConnectionTimes;
@Before
public void setUp() {
LookasideLbFactory lookasideLbFactory = new LookasideLbFactory() {
@Override
public LoadBalancer newLoadBalancer(Helper helper, AdsStreamCallback adsCallback) {
// just return a mock and record the input and output
lookasideLbHelper = helper;
XdsLoadBalancer2Test.this.adsCallback = adsCallback;
LoadBalancer lookasideLb = mock(LoadBalancer.class);
lookasideLbs.add(lookasideLb);
return lookasideLb;
}
};
LoadBalancer.Factory fallbackLbFactory = new LoadBalancer.Factory() {
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
// just return a mock and record the input and output
fallbackLbHelper = helper;
LoadBalancer fallbackLb = mock(LoadBalancer.class);
fallbackLbs.add(fallbackLb);
return fallbackLb;
}
};
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger();
xdsLoadBalancer =
new XdsLoadBalancer2(helper, lookasideLbFactory, fallbackLbFactory);
xdsLoadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of()).build());
}
@Test
public void tearDown() {
assertThat(lookasideLbs).hasSize(1);
xdsLoadBalancer.shutdown();
for (LoadBalancer lookasideLb : lookasideLbs) {
verify(lookasideLb).shutdown();
}
for (LoadBalancer fallbackLb : fallbackLbs) {
verify(fallbackLb).shutdown();
}
assertThat(fakeClock.getPendingTasks()).isEmpty();
}
@Test
public void canHandleEmptyAddressListFromNameResolution() {
assertThat(xdsLoadBalancer.canHandleEmptyAddressListFromNameResolution()).isTrue();
}
@Test
public void timeoutAtStartup_expectUseFallback_thenBackendReady_expectExitFallback() {
verifyNotInFallbackMode();
fakeClock.forwardTime(9, TimeUnit.SECONDS);
adsCallback.onWorking();
verifyNotInFallbackMode();
fakeClock.forwardTime(1, TimeUnit.SECONDS);
verifyInFallbackMode();
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
lookasideLbHelper.updateBalancingState(READY, subchannelPicker);
verify(helper).updateBalancingState(READY, subchannelPicker);
verifyNotInFallbackMode();
assertThat(fallbackLbs).hasSize(1);
}
@Test
public void backendReadyBeforeTimeoutAtStartup_expectNoFallback() {
verifyNotInFallbackMode();
fakeClock.forwardTime(9, TimeUnit.SECONDS);
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
adsCallback.onWorking();
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
lookasideLbHelper.updateBalancingState(READY, subchannelPicker);
verify(helper).updateBalancingState(READY, subchannelPicker);
assertThat(fakeClock.getPendingTasks()).isEmpty();
verifyNotInFallbackMode();
assertThat(fallbackLbs).isEmpty();
}
@Test
public void recevieAllDropBeforeTimeoutAtStartup_expectNoFallback() {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
adsCallback.onAllDrop();
assertThat(fakeClock.getPendingTasks()).isEmpty();
verifyNotInFallbackMode();
assertThat(fallbackLbs).isEmpty();
}
@Test
public void lookasideChannelFailsWithoutSeeingEdsResponseBeforeTimeoutAtStartup() {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
adsCallback.onError();
verifyInFallbackMode();
assertThat(fallbackLbs).hasSize(1);
}
@Test
public void lookasideChannelSeeingEdsResponseThenFailsBeforeTimeoutAtStartup() {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
adsCallback.onWorking();
adsCallback.onError();
verifyNotInFallbackMode();
fakeClock.forwardTime(10, TimeUnit.SECONDS);
verifyInFallbackMode();
assertThat(fallbackLbs).hasSize(1);
}
@Test
public void fallbackWillHandleLastResolvedAddresses() {
verifyNotInFallbackMode();
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(
Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build())
.setLoadBalancingPolicyConfig(new Object())
.build();
xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
adsCallback.onError();
LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs);
verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses));
}
private void verifyInFallbackMode() {
assertThat(lookasideLbs).isNotEmpty();
assertThat(fallbackLbs).isNotEmpty();
LoadBalancer lookasideLb = Iterables.getLast(lookasideLbs);
LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs);
verify(lookasideLb, never()).shutdown();
verify(fallbackLb, never()).shutdown();
xdsLoadBalancer.requestConnection();
verify(lookasideLb, times(++requestConnectionTimes)).requestConnection();
verify(fallbackLb).requestConnection();
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(
Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build())
.setLoadBalancingPolicyConfig(new Object())
.build();
xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
verify(lookasideLb).handleResolvedAddresses(same(resolvedAddresses));
verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses));
Status status = Status.DATA_LOSS.withDescription("");
xdsLoadBalancer.handleNameResolutionError(status);
verify(lookasideLb).handleNameResolutionError(same(status));
verify(fallbackLb).handleNameResolutionError(same(status));
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
lookasideLbHelper.updateBalancingState(CONNECTING, subchannelPicker);
verify(helper, never()).updateBalancingState(CONNECTING, subchannelPicker);
fallbackLbHelper.updateBalancingState(CONNECTING, subchannelPicker);
verify(helper).updateBalancingState(CONNECTING, subchannelPicker);
}
private void verifyNotInFallbackMode() {
for (LoadBalancer fallbackLb : fallbackLbs) {
verify(fallbackLb).shutdown();
}
LoadBalancer lookasideLb = Iterables.getLast(lookasideLbs);
xdsLoadBalancer.requestConnection();
verify(lookasideLb, times(++requestConnectionTimes)).requestConnection();
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(
Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build())
.setLoadBalancingPolicyConfig(new Object())
.build();
xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
verify(lookasideLb).handleResolvedAddresses(same(resolvedAddresses));
Status status = Status.DATA_LOSS.withDescription("");
xdsLoadBalancer.handleNameResolutionError(status);
verify(lookasideLb).handleNameResolutionError(same(status));
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
lookasideLbHelper.updateBalancingState(CONNECTING, subchannelPicker);
verify(helper).updateBalancingState(CONNECTING, subchannelPicker);
}
@Deprecated
@Test
public void handleSubchannelState_shouldThrow() {
Subchannel subchannel = mock(Subchannel.class);
ConnectivityStateInfo connectivityStateInfo = ConnectivityStateInfo.forNonError(READY);
thrown.expect(UnsupportedOperationException.class);
xdsLoadBalancer.handleSubchannelState(subchannel, connectivityStateInfo);
}
}