From 33c30db42c9a09226cac2e0645fd1dc82f063750 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Mon, 10 Jun 2019 11:27:42 -0700 Subject: [PATCH] xds: allow grpclb balancer addresses for backward compatibility During migration, the name resolver may not know when the client has been upgraded to xds, so it may still send grpclb v1 addresses with a list of policies including both grpclb v1 and xds. --- .../java/io/grpc/xds/XdsLoadBalancer.java | 48 ++++++-- .../io/grpc/xds/XdsLoadBalancerProvider.java | 4 +- .../java/io/grpc/xds/FallbackManagerTest.java | 113 +++++++++++++++++- .../java/io/grpc/xds/XdsLoadBalancerTest.java | 3 + 4 files changed, 150 insertions(+), 18 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java index 634fadb70a..bf82bac161 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.xds.XdsLoadBalancerProvider.XDS_POLICY_NAME; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -35,6 +36,7 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.GrpcAttributes; import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.xds.LocalityStore.LocalityStoreImpl; @@ -314,12 +316,7 @@ final class XdsLoadBalancer extends LoadBalancer { fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName()) .newLoadBalancer(fallbackBalancerHelper); fallbackBalancerHelper.balancer = fallbackBalancer; - // TODO(carl-mastrangelo): propagate the load balancing config policy - fallbackBalancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(fallbackServers) - .setAttributes(fallbackAttributes) - .build()); + propagateFallbackAddresses(); } void updateFallbackServers( @@ -334,12 +331,7 @@ final class XdsLoadBalancer extends LoadBalancer { this.fallbackPolicy = fallbackPolicy; if (fallbackBalancer != null) { if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) { - // TODO(carl-mastrangelo): propagate the load balancing config policy - fallbackBalancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(fallbackServers) - .setAttributes(fallbackAttributes) - .build()); + propagateFallbackAddresses(); } else { fallbackBalancer.shutdown(); fallbackBalancer = null; @@ -348,6 +340,38 @@ final class XdsLoadBalancer extends LoadBalancer { } } + private void propagateFallbackAddresses() { + String fallbackPolicyName = fallbackPolicy.getPolicyName(); + List servers = fallbackServers; + + // Some addresses in the list may be grpclb-v1 balancer addresses, so if the fallback policy + // does not support grpclb-v1 balancer addresses, then we need to exclude them from the list. + if (!fallbackPolicyName.equals("grpclb") && !fallbackPolicyName.equals(XDS_POLICY_NAME)) { + ImmutableList.Builder backends = ImmutableList.builder(); + for (EquivalentAddressGroup eag : fallbackServers) { + if (eag.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) == null) { + backends.add(eag); + } + } + servers = backends.build(); + } + + // TODO(zhangkun83): FIXME(#5496): this is a temporary hack. + if (servers.isEmpty() + && !fallbackBalancer.canHandleEmptyAddressListFromNameResolution()) { + fallbackBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription( + "NameResolver returned no usable address." + + " addrs=" + fallbackServers + ", attrs=" + fallbackAttributes)); + } else { + // TODO(carl-mastrangelo): propagate the load balancing config policy + fallbackBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers) + .setAttributes(fallbackAttributes) + .build()); + } + } + void startFallbackTimer() { if (fallbackTimer == null) { class FallbackTask implements Runnable { diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java index a2b727decc..d334ae2875 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java @@ -40,6 +40,8 @@ import javax.annotation.Nullable; @Internal public final class XdsLoadBalancerProvider extends LoadBalancerProvider { + static final String XDS_POLICY_NAME = "xds_experimental"; + private static final LbConfig DEFAULT_FALLBACK_POLICY = new LbConfig("round_robin", ImmutableMap.of()); @@ -55,7 +57,7 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider { @Override public String getPolicyName() { - return "xds_experimental"; + return XDS_POLICY_NAME; } @Override diff --git a/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java b/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java index 92b698a5a9..a79a5dd908 100644 --- a/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import com.google.common.collect.ImmutableList; import io.grpc.Attributes; import io.grpc.ChannelLogger; import io.grpc.EquivalentAddressGroup; @@ -29,11 +30,15 @@ import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; +import io.grpc.internal.GrpcAttributes; import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.xds.XdsLoadBalancer.FallbackManager; -import java.util.ArrayList; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; @@ -42,6 +47,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -70,7 +76,7 @@ public class FallbackManagerTest { @Override public String getPolicyName() { - return "test_policy"; + return fallbackPolicy.getPolicyName(); } @Override @@ -127,10 +133,10 @@ public class FallbackManagerTest { doReturn(syncContext).when(helper).getSynchronizationContext(); doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); doReturn(channelLogger).when(helper).getChannelLogger(); + fallbackPolicy = new LbConfig("test_policy", new HashMap()); lbRegistry.register(fakeRoundRonbinLbProvider); lbRegistry.register(fakeFallbackLbProvider); fallbackManager = new FallbackManager(helper, lbRegistry); - fallbackPolicy = new LbConfig("test_policy", new HashMap()); } @After @@ -141,7 +147,8 @@ public class FallbackManagerTest { @Test public void useFallbackWhenTimeout() { fallbackManager.startFallbackTimer(); - List eags = new ArrayList<>(); + List eags = ImmutableList.of( + new EquivalentAddressGroup(ImmutableList.of(new InetSocketAddress(8080)))); fallbackManager.updateFallbackServers( eags, Attributes.EMPTY, fallbackPolicy); @@ -164,10 +171,106 @@ public class FallbackManagerTest { .build()); } + @Test + public void fallback_handleBackendsEagsOnly() { + fallbackManager.startFallbackTimer(); + EquivalentAddressGroup eag0 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8080))); + Attributes attributes = Attributes + .newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address") + .build(); + EquivalentAddressGroup eag1 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8081)), attributes); + EquivalentAddressGroup eag2 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8082))); + List eags = ImmutableList.of(eag0, eag1, eag2); + fallbackManager.updateFallbackServers( + eags, Attributes.EMPTY, fallbackPolicy); + + fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + assertThat(fallbackManager.isInFallbackMode()).isTrue(); + verify(fakeFallbackLb).handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of(eag0, eag2)) + .setAttributes( + Attributes.newBuilder() + .set( + LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, + fallbackPolicy.getRawConfigValue()) + .build()) + .build()); + } + + @Test + public void fallback_handleGrpclbAddresses() { + lbRegistry.deregister(fakeFallbackLbProvider); + fallbackPolicy = new LbConfig("grpclb", new HashMap()); + lbRegistry.register(fakeFallbackLbProvider); + + fallbackManager.startFallbackTimer(); + EquivalentAddressGroup eag0 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8080))); + Attributes attributes = Attributes + .newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address") + .build(); + EquivalentAddressGroup eag1 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8081)), attributes); + EquivalentAddressGroup eag2 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8082))); + List eags = ImmutableList.of(eag0, eag1, eag2); + fallbackManager.updateFallbackServers( + eags, Attributes.EMPTY, fallbackPolicy); + + fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + assertThat(fallbackManager.isInFallbackMode()).isTrue(); + verify(fakeFallbackLb).handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(eags) + .setAttributes( + Attributes.newBuilder() + .set( + LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, + fallbackPolicy.getRawConfigValue()) + .build()) + .build()); + } + + @Test + public void fallback_onlyGrpclbAddresses_NoBackendAddress() { + lbRegistry.deregister(fakeFallbackLbProvider); + fallbackPolicy = new LbConfig("not_grpclb", new HashMap()); + lbRegistry.register(fakeFallbackLbProvider); + + fallbackManager.startFallbackTimer(); + Attributes attributes = Attributes + .newBuilder() + .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address") + .build(); + EquivalentAddressGroup eag1 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8081)), attributes); + EquivalentAddressGroup eag2 = new EquivalentAddressGroup( + ImmutableList.of(new InetSocketAddress(8082)), attributes); + List eags = ImmutableList.of(eag1, eag2); + fallbackManager.updateFallbackServers( + eags, Attributes.EMPTY, fallbackPolicy); + + fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + assertThat(fallbackManager.isInFallbackMode()).isTrue(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(fakeFallbackLb).handleNameResolutionError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + } + @Test public void cancelFallback() { fallbackManager.startFallbackTimer(); - List eags = new ArrayList<>(); + List eags = ImmutableList.of( + new EquivalentAddressGroup(ImmutableList.of(new InetSocketAddress(8080)))); fallbackManager.updateFallbackServers( eags, Attributes.EMPTY, fallbackPolicy); diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java index c26af06f29..416f8ad4ca 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java @@ -281,6 +281,9 @@ public class XdsLoadBalancerTest { doReturn(oobChannel1).doReturn(oobChannel2).doReturn(oobChannel3) .when(helper).createResolvingOobChannel(anyString()); + + // To write less tedious code for tests, allow fallbackBalancer to handle empty address list. + doReturn(true).when(fallbackBalancer1).canHandleEmptyAddressListFromNameResolution(); } @After