xds: allow grpclb balancer addresses for backward compatibility

During migration, the name resolver may not know when the client has been upgraded to xds, so it may still send grpclb v1 addresses with a list of policies including both grpclb v1 and xds.
This commit is contained in:
ZHANG Dapeng 2019-06-10 11:27:42 -07:00 committed by GitHub
parent 2db3abc9ad
commit 33c30db42c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 150 additions and 18 deletions

View File

@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLoadBalancerProvider.XDS_POLICY_NAME;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@ -35,6 +36,7 @@ import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.LocalityStore.LocalityStoreImpl;
@ -314,12 +316,7 @@ final class XdsLoadBalancer extends LoadBalancer {
fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName())
.newLoadBalancer(fallbackBalancerHelper);
fallbackBalancerHelper.balancer = fallbackBalancer;
// TODO(carl-mastrangelo): propagate the load balancing config policy
fallbackBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(fallbackServers)
.setAttributes(fallbackAttributes)
.build());
propagateFallbackAddresses();
}
void updateFallbackServers(
@ -334,12 +331,7 @@ final class XdsLoadBalancer extends LoadBalancer {
this.fallbackPolicy = fallbackPolicy;
if (fallbackBalancer != null) {
if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) {
// TODO(carl-mastrangelo): propagate the load balancing config policy
fallbackBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(fallbackServers)
.setAttributes(fallbackAttributes)
.build());
propagateFallbackAddresses();
} else {
fallbackBalancer.shutdown();
fallbackBalancer = null;
@ -348,6 +340,38 @@ final class XdsLoadBalancer extends LoadBalancer {
}
}
private void propagateFallbackAddresses() {
String fallbackPolicyName = fallbackPolicy.getPolicyName();
List<EquivalentAddressGroup> servers = fallbackServers;
// Some addresses in the list may be grpclb-v1 balancer addresses, so if the fallback policy
// does not support grpclb-v1 balancer addresses, then we need to exclude them from the list.
if (!fallbackPolicyName.equals("grpclb") && !fallbackPolicyName.equals(XDS_POLICY_NAME)) {
ImmutableList.Builder<EquivalentAddressGroup> backends = ImmutableList.builder();
for (EquivalentAddressGroup eag : fallbackServers) {
if (eag.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) == null) {
backends.add(eag);
}
}
servers = backends.build();
}
// TODO(zhangkun83): FIXME(#5496): this is a temporary hack.
if (servers.isEmpty()
&& !fallbackBalancer.canHandleEmptyAddressListFromNameResolution()) {
fallbackBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address."
+ " addrs=" + fallbackServers + ", attrs=" + fallbackAttributes));
} else {
// TODO(carl-mastrangelo): propagate the load balancing config policy
fallbackBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(fallbackAttributes)
.build());
}
}
void startFallbackTimer() {
if (fallbackTimer == null) {
class FallbackTask implements Runnable {

View File

@ -40,6 +40,8 @@ import javax.annotation.Nullable;
@Internal
public final class XdsLoadBalancerProvider extends LoadBalancerProvider {
static final String XDS_POLICY_NAME = "xds_experimental";
private static final LbConfig DEFAULT_FALLBACK_POLICY =
new LbConfig("round_robin", ImmutableMap.<String, Void>of());
@ -55,7 +57,7 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider {
@Override
public String getPolicyName() {
return "xds_experimental";
return XDS_POLICY_NAME;
}
@Override

View File

@ -21,6 +21,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.EquivalentAddressGroup;
@ -29,11 +30,15 @@ import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.xds.XdsLoadBalancer.FallbackManager;
import java.util.ArrayList;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -42,6 +47,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@ -70,7 +76,7 @@ public class FallbackManagerTest {
@Override
public String getPolicyName() {
return "test_policy";
return fallbackPolicy.getPolicyName();
}
@Override
@ -127,10 +133,10 @@ public class FallbackManagerTest {
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
doReturn(channelLogger).when(helper).getChannelLogger();
fallbackPolicy = new LbConfig("test_policy", new HashMap<String, Void>());
lbRegistry.register(fakeRoundRonbinLbProvider);
lbRegistry.register(fakeFallbackLbProvider);
fallbackManager = new FallbackManager(helper, lbRegistry);
fallbackPolicy = new LbConfig("test_policy", new HashMap<String, Void>());
}
@After
@ -141,7 +147,8 @@ public class FallbackManagerTest {
@Test
public void useFallbackWhenTimeout() {
fallbackManager.startFallbackTimer();
List<EquivalentAddressGroup> eags = new ArrayList<>();
List<EquivalentAddressGroup> eags = ImmutableList.of(
new EquivalentAddressGroup(ImmutableList.<SocketAddress>of(new InetSocketAddress(8080))));
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);
@ -164,10 +171,106 @@ public class FallbackManagerTest {
.build());
}
@Test
public void fallback_handleBackendsEagsOnly() {
fallbackManager.startFallbackTimer();
EquivalentAddressGroup eag0 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8080)));
Attributes attributes = Attributes
.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address")
.build();
EquivalentAddressGroup eag1 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8081)), attributes);
EquivalentAddressGroup eag2 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8082)));
List<EquivalentAddressGroup> eags = ImmutableList.of(eag0, eag1, eag2);
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);
fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertThat(fallbackManager.isInFallbackMode()).isTrue();
verify(fakeFallbackLb).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.of(eag0, eag2))
.setAttributes(
Attributes.newBuilder()
.set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG,
fallbackPolicy.getRawConfigValue())
.build())
.build());
}
@Test
public void fallback_handleGrpclbAddresses() {
lbRegistry.deregister(fakeFallbackLbProvider);
fallbackPolicy = new LbConfig("grpclb", new HashMap<String, Void>());
lbRegistry.register(fakeFallbackLbProvider);
fallbackManager.startFallbackTimer();
EquivalentAddressGroup eag0 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8080)));
Attributes attributes = Attributes
.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address")
.build();
EquivalentAddressGroup eag1 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8081)), attributes);
EquivalentAddressGroup eag2 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8082)));
List<EquivalentAddressGroup> eags = ImmutableList.of(eag0, eag1, eag2);
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);
fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertThat(fallbackManager.isInFallbackMode()).isTrue();
verify(fakeFallbackLb).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(eags)
.setAttributes(
Attributes.newBuilder()
.set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG,
fallbackPolicy.getRawConfigValue())
.build())
.build());
}
@Test
public void fallback_onlyGrpclbAddresses_NoBackendAddress() {
lbRegistry.deregister(fakeFallbackLbProvider);
fallbackPolicy = new LbConfig("not_grpclb", new HashMap<String, Void>());
lbRegistry.register(fakeFallbackLbProvider);
fallbackManager.startFallbackTimer();
Attributes attributes = Attributes
.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "this is a balancer address")
.build();
EquivalentAddressGroup eag1 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8081)), attributes);
EquivalentAddressGroup eag2 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8082)), attributes);
List<EquivalentAddressGroup> eags = ImmutableList.of(eag1, eag2);
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);
fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertThat(fallbackManager.isInFallbackMode()).isTrue();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(fakeFallbackLb).handleNameResolutionError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
}
@Test
public void cancelFallback() {
fallbackManager.startFallbackTimer();
List<EquivalentAddressGroup> eags = new ArrayList<>();
List<EquivalentAddressGroup> eags = ImmutableList.of(
new EquivalentAddressGroup(ImmutableList.<SocketAddress>of(new InetSocketAddress(8080))));
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);

View File

@ -281,6 +281,9 @@ public class XdsLoadBalancerTest {
doReturn(oobChannel1).doReturn(oobChannel2).doReturn(oobChannel3)
.when(helper).createResolvingOobChannel(anyString());
// To write less tedious code for tests, allow fallbackBalancer to handle empty address list.
doReturn(true).when(fallbackBalancer1).canHandleEmptyAddressListFromNameResolution();
}
@After