diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java new file mode 100644 index 0000000000..11df68f39f --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java @@ -0,0 +1,248 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ChannelLogger.ChannelLogLevel; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.LoadBalancer; +import io.grpc.Status; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.util.ForwardingLoadBalancerHelper; +import io.grpc.xds.XdsComms.AdsStreamCallback; +import java.util.concurrent.TimeUnit; +import javax.annotation.CheckForNull; +import javax.annotation.Nullable; + +/** + * A {@link LoadBalancer} that uses the XDS protocol. + * + *

This class manages fallback handling. The logic for child policy handling and fallback policy + * handling is provided by LookasideLb and FallbackLb. + */ +// TODO(zdapeng): migrate name to XdsLoadBlancer +final class XdsLoadBalancer2 extends LoadBalancer { + + private static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); // same as grpclb + + private final Helper helper; + private final LoadBalancer lookasideLb; + private final LoadBalancer.Factory fallbackLbFactory; + private final AdsStreamCallback adsCallback = new AdsStreamCallback() { + @Override + public void onWorking() { + if (childPolicyHasBeenReady) { + // cancel Fallback-After-Startup timer if there's any + cancelFallbackTimer(); + } + + adsWorked = true; + } + + @Override + public void onError() { + if (!adsWorked) { + // start Fallback-at-Startup immediately + useFallbackPolicy(); + } else if (childPolicyHasBeenReady) { + // TODO: schedule a timer for Fallback-After-Startup + } // else: the Fallback-at-Startup timer is still pending, noop and wait + } + + @Override + public void onAllDrop() { + cancelFallback(); + } + }; + + @Nullable + private LoadBalancer fallbackLb; + @Nullable + private ResolvedAddresses resolvedAddresses; + // Scheduled only once. Never reset to null. + @CheckForNull + private ScheduledHandle fallbackTimer; + private boolean adsWorked; + private boolean childPolicyHasBeenReady; + + // TODO(zdapeng): Add XdsLoadBalancer2(Helper helper) with default factories + @VisibleForTesting + XdsLoadBalancer2( + Helper helper, + LookasideLbFactory lookasideLbFactory, + LoadBalancer.Factory fallbackLbFactory) { + this.helper = helper; + this.lookasideLb = lookasideLbFactory.newLoadBalancer(new LookasideLbHelper(), adsCallback); + this.fallbackLbFactory = fallbackLbFactory; + } + + @Override + public boolean canHandleEmptyAddressListFromNameResolution() { + // This does not sound correct, but it's fine as we don't support fallback at this moment. + // TODO(zdapeng): revisit it once we officially support fallback. + return true; + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + this.resolvedAddresses = resolvedAddresses; + + if (isInFallbackMode()) { + fallbackLb.handleResolvedAddresses(this.resolvedAddresses); + } + + if (fallbackTimer == null) { + class EnterFallbackTask implements Runnable { + + @Override + public void run() { + useFallbackPolicy(); + } + } + + fallbackTimer = helper.getSynchronizationContext().schedule( + new EnterFallbackTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, + helper.getScheduledExecutorService()); + } + + lookasideLb.handleResolvedAddresses(resolvedAddresses); + } + + @Override + public void handleNameResolutionError(Status error) { + lookasideLb.handleNameResolutionError(error); + if (isInFallbackMode()) { + fallbackLb.handleNameResolutionError(error); + } + } + + @Override + public void requestConnection() { + lookasideLb.requestConnection(); + if (isInFallbackMode()) { + fallbackLb.requestConnection(); + } + } + + @Override + public void shutdown() { + helper.getChannelLogger().log( + ChannelLogLevel.INFO, "Shutting down XDS balancer"); + lookasideLb.shutdown(); + cancelFallback(); + } + + @Deprecated + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + throw new UnsupportedOperationException( + "handleSubchannelState() not supported by XdsLoadBalancer"); + } + + private void cancelFallbackTimer() { + if (fallbackTimer != null) { + fallbackTimer.cancel(); + } + } + + private void cancelFallback() { + cancelFallbackTimer(); + if (isInFallbackMode()) { + helper.getChannelLogger().log( + ChannelLogLevel.INFO, "Shutting down XDS fallback balancer"); + fallbackLb.shutdown(); + fallbackLb = null; + } + } + + private void useFallbackPolicy() { + if (isInFallbackMode()) { + return; + } + cancelFallbackTimer(); + helper.getChannelLogger().log( + ChannelLogLevel.INFO, "Using XDS fallback policy"); + + FallbackLbHelper fallbackLbHelper = new FallbackLbHelper(); + fallbackLb = fallbackLbFactory.newLoadBalancer(fallbackLbHelper); + fallbackLbHelper.balancer = fallbackLb; + fallbackLb.handleResolvedAddresses(resolvedAddresses); + } + + /** + * Fallback mode being on indicates that an update from child LBs will be ignored unless the + * update triggers turning off the fallback mode first. + */ + private boolean isInFallbackMode() { + return fallbackLb != null; + } + + private final class LookasideLbHelper extends ForwardingLoadBalancerHelper { + + @Override + protected Helper delegate() { + return helper; + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + if (newState == ConnectivityState.READY) { + checkState( + adsWorked, + "channel goes to READY before the load balancer even worked"); + childPolicyHasBeenReady = true; + cancelFallback(); + } + if (!isInFallbackMode()) { + helper.getChannelLogger().log( + ChannelLogLevel.INFO, "Picker updated - state: {0}, picker: {1}", newState, newPicker); + helper.updateBalancingState(newState, newPicker); + } + } + } + + private final class FallbackLbHelper extends ForwardingLoadBalancerHelper { + LoadBalancer balancer; + + @Override + protected Helper delegate() { + return helper; + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + checkNotNull(balancer, "balancer not set yet"); + if (balancer != fallbackLb) { + // ignore updates from a misbehaving shutdown fallback balancer + return; + } + helper.getChannelLogger().log( + ChannelLogLevel.INFO, + "Picker updated - state: {0}, picker: {1}", newState, newPicker); + super.updateBalancingState(newState, newPicker); + } + } + + /** Factory of a look-aside load balancer. The interface itself is for convenience in test. */ + @VisibleForTesting + interface LookasideLbFactory { + LoadBalancer newLoadBalancer(Helper helper, AdsStreamCallback adsCallback); + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java new file mode 100644 index 0000000000..f52d17844f --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java @@ -0,0 +1,297 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.READY; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import io.grpc.Attributes; +import io.grpc.ChannelLogger; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.internal.FakeClock; +import io.grpc.xds.XdsComms.AdsStreamCallback; +import io.grpc.xds.XdsLoadBalancer2.LookasideLbFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +/** Unit tests for {@link XdsLoadBalancer2}. */ +@RunWith(JUnit4.class) +public class XdsLoadBalancer2Test { + @Rule + public final ExpectedException thrown = ExpectedException.none(); + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + private final FakeClock fakeClock = new FakeClock(); + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + + @Mock + private Helper helper; + private LoadBalancer xdsLoadBalancer; + private AdsStreamCallback adsCallback; + + private Helper lookasideLbHelper; + private final List lookasideLbs = new ArrayList<>(); + + private Helper fallbackLbHelper; + private final List fallbackLbs = new ArrayList<>(); + + private int requestConnectionTimes; + + @Before + public void setUp() { + LookasideLbFactory lookasideLbFactory = new LookasideLbFactory() { + @Override + public LoadBalancer newLoadBalancer(Helper helper, AdsStreamCallback adsCallback) { + // just return a mock and record the input and output + lookasideLbHelper = helper; + XdsLoadBalancer2Test.this.adsCallback = adsCallback; + LoadBalancer lookasideLb = mock(LoadBalancer.class); + lookasideLbs.add(lookasideLb); + return lookasideLb; + } + }; + LoadBalancer.Factory fallbackLbFactory = new LoadBalancer.Factory() { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + // just return a mock and record the input and output + fallbackLbHelper = helper; + LoadBalancer fallbackLb = mock(LoadBalancer.class); + fallbackLbs.add(fallbackLb); + return fallbackLb; + } + }; + doReturn(syncContext).when(helper).getSynchronizationContext(); + doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); + doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger(); + + xdsLoadBalancer = + new XdsLoadBalancer2(helper, lookasideLbFactory, fallbackLbFactory); + xdsLoadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()).build()); + } + + @Test + public void tearDown() { + assertThat(lookasideLbs).hasSize(1); + xdsLoadBalancer.shutdown(); + for (LoadBalancer lookasideLb : lookasideLbs) { + verify(lookasideLb).shutdown(); + } + for (LoadBalancer fallbackLb : fallbackLbs) { + verify(fallbackLb).shutdown(); + } + assertThat(fakeClock.getPendingTasks()).isEmpty(); + } + + @Test + public void canHandleEmptyAddressListFromNameResolution() { + assertThat(xdsLoadBalancer.canHandleEmptyAddressListFromNameResolution()).isTrue(); + } + + @Test + public void timeoutAtStartup_expectUseFallback_thenBackendReady_expectExitFallback() { + verifyNotInFallbackMode(); + fakeClock.forwardTime(9, TimeUnit.SECONDS); + adsCallback.onWorking(); + verifyNotInFallbackMode(); + fakeClock.forwardTime(1, TimeUnit.SECONDS); + verifyInFallbackMode(); + + SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); + lookasideLbHelper.updateBalancingState(READY, subchannelPicker); + verify(helper).updateBalancingState(READY, subchannelPicker); + verifyNotInFallbackMode(); + + assertThat(fallbackLbs).hasSize(1); + } + + @Test + public void backendReadyBeforeTimeoutAtStartup_expectNoFallback() { + verifyNotInFallbackMode(); + fakeClock.forwardTime(9, TimeUnit.SECONDS); + verifyNotInFallbackMode(); + assertThat(fakeClock.getPendingTasks()).hasSize(1); + + adsCallback.onWorking(); + SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); + lookasideLbHelper.updateBalancingState(READY, subchannelPicker); + verify(helper).updateBalancingState(READY, subchannelPicker); + assertThat(fakeClock.getPendingTasks()).isEmpty(); + verifyNotInFallbackMode(); + + assertThat(fallbackLbs).isEmpty(); + } + + @Test + public void recevieAllDropBeforeTimeoutAtStartup_expectNoFallback() { + verifyNotInFallbackMode(); + assertThat(fakeClock.getPendingTasks()).hasSize(1); + + adsCallback.onAllDrop(); + assertThat(fakeClock.getPendingTasks()).isEmpty(); + verifyNotInFallbackMode(); + + assertThat(fallbackLbs).isEmpty(); + } + + @Test + public void lookasideChannelFailsWithoutSeeingEdsResponseBeforeTimeoutAtStartup() { + verifyNotInFallbackMode(); + assertThat(fakeClock.getPendingTasks()).hasSize(1); + + adsCallback.onError(); + verifyInFallbackMode(); + + assertThat(fallbackLbs).hasSize(1); + } + + @Test + public void lookasideChannelSeeingEdsResponseThenFailsBeforeTimeoutAtStartup() { + verifyNotInFallbackMode(); + assertThat(fakeClock.getPendingTasks()).hasSize(1); + adsCallback.onWorking(); + adsCallback.onError(); + verifyNotInFallbackMode(); + + fakeClock.forwardTime(10, TimeUnit.SECONDS); + verifyInFallbackMode(); + + assertThat(fallbackLbs).hasSize(1); + } + + @Test + public void fallbackWillHandleLastResolvedAddresses() { + verifyNotInFallbackMode(); + + ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setAttributes( + Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build()) + .setLoadBalancingPolicyConfig(new Object()) + .build(); + xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses); + + adsCallback.onError(); + LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs); + verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses)); + } + + private void verifyInFallbackMode() { + assertThat(lookasideLbs).isNotEmpty(); + assertThat(fallbackLbs).isNotEmpty(); + LoadBalancer lookasideLb = Iterables.getLast(lookasideLbs); + LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs); + verify(lookasideLb, never()).shutdown(); + verify(fallbackLb, never()).shutdown(); + + xdsLoadBalancer.requestConnection(); + verify(lookasideLb, times(++requestConnectionTimes)).requestConnection(); + verify(fallbackLb).requestConnection(); + + ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setAttributes( + Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build()) + .setLoadBalancingPolicyConfig(new Object()) + .build(); + xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses); + verify(lookasideLb).handleResolvedAddresses(same(resolvedAddresses)); + verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses)); + + Status status = Status.DATA_LOSS.withDescription(""); + xdsLoadBalancer.handleNameResolutionError(status); + verify(lookasideLb).handleNameResolutionError(same(status)); + verify(fallbackLb).handleNameResolutionError(same(status)); + + SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); + lookasideLbHelper.updateBalancingState(CONNECTING, subchannelPicker); + verify(helper, never()).updateBalancingState(CONNECTING, subchannelPicker); + fallbackLbHelper.updateBalancingState(CONNECTING, subchannelPicker); + verify(helper).updateBalancingState(CONNECTING, subchannelPicker); + } + + private void verifyNotInFallbackMode() { + for (LoadBalancer fallbackLb : fallbackLbs) { + verify(fallbackLb).shutdown(); + } + LoadBalancer lookasideLb = Iterables.getLast(lookasideLbs); + + xdsLoadBalancer.requestConnection(); + verify(lookasideLb, times(++requestConnectionTimes)).requestConnection(); + + ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setAttributes( + Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build()) + .setLoadBalancingPolicyConfig(new Object()) + .build(); + xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses); + verify(lookasideLb).handleResolvedAddresses(same(resolvedAddresses)); + + Status status = Status.DATA_LOSS.withDescription(""); + xdsLoadBalancer.handleNameResolutionError(status); + verify(lookasideLb).handleNameResolutionError(same(status)); + + SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); + lookasideLbHelper.updateBalancingState(CONNECTING, subchannelPicker); + verify(helper).updateBalancingState(CONNECTING, subchannelPicker); + } + + @Deprecated + @Test + public void handleSubchannelState_shouldThrow() { + Subchannel subchannel = mock(Subchannel.class); + ConnectivityStateInfo connectivityStateInfo = ConnectivityStateInfo.forNonError(READY); + thrown.expect(UnsupportedOperationException.class); + xdsLoadBalancer.handleSubchannelState(subchannel, connectivityStateInfo); + } +}