core: make ManagedChannel honor Service config LB

This commit is contained in:
Carl Mastrangelo 2018-03-16 14:47:25 -07:00 committed by GitHub
parent 401726f310
commit 7daefd75a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 422 additions and 27 deletions

View File

@ -31,7 +31,6 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import io.grpc.PickFirstBalancerFactory;
import io.opencensus.trace.Tracing;
import java.net.SocketAddress;
import java.net.URI;
@ -85,9 +84,6 @@ public abstract class AbstractManagedChannelImplBuilder
private static final NameResolver.Factory DEFAULT_NAME_RESOLVER_FACTORY =
NameResolverProvider.asFactory();
private static final LoadBalancer.Factory DEFAULT_LOAD_BALANCER_FACTORY =
PickFirstBalancerFactory.getInstance();
private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY =
DecompressorRegistry.getDefaultInstance();
@ -117,7 +113,7 @@ public abstract class AbstractManagedChannelImplBuilder
String authorityOverride;
LoadBalancer.Factory loadBalancerFactory = DEFAULT_LOAD_BALANCER_FACTORY;
@Nullable LoadBalancer.Factory loadBalancerFactory;
boolean fullStreamDecompression;
@ -236,11 +232,7 @@ public abstract class AbstractManagedChannelImplBuilder
Preconditions.checkState(directServerAddress == null,
"directServerAddress is set (%s), which forbids the use of LoadBalancer.Factory",
directServerAddress);
if (loadBalancerFactory != null) {
this.loadBalancerFactory = loadBalancerFactory;
} else {
this.loadBalancerFactory = DEFAULT_LOAD_BALANCER_FACTORY;
}
this.loadBalancerFactory = loadBalancerFactory;
return thisT();
}

View File

@ -0,0 +1,192 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.PickFirstBalancerFactory;
import io.grpc.Status;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Logger;
import javax.annotation.Nullable;
final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory {
private static final Logger logger =
Logger.getLogger(AutoConfiguredLoadBalancerFactory.class.getName());
@VisibleForTesting
static final String ROUND_ROUND_LOAD_BALANCER_FACTORY_NAME =
"io.grpc.util.RoundRobinLoadBalancerFactory";
@VisibleForTesting
static final String GRPCLB_LOAD_BALANCER_FACTORY_NAME =
"io.grpc.grpclb.GrpclbLoadBalancerFactory";
AutoConfiguredLoadBalancerFactory() {}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new AutoConfiguredLoadBalancer(helper);
}
private static final class EmptySubchannelPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
}
@VisibleForTesting
static final class AutoConfiguredLoadBalancer extends LoadBalancer {
private final Helper helper;
private LoadBalancer delegate;
private LoadBalancer.Factory delegateFactory;
AutoConfiguredLoadBalancer(Helper helper) {
this.helper = helper;
setDelegateFactory(PickFirstBalancerFactory.getInstance());
setDelegate(getDelegateFactory().newLoadBalancer(helper));
}
// Must be run inside ChannelExecutor.
@Override
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
if (attributes.keys().contains(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) {
Factory newlbf = decideLoadBalancerFactory(
servers, attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG));
if (newlbf != null && newlbf != delegateFactory) {
helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptySubchannelPicker());
getDelegate().shutdown();
setDelegateFactory(newlbf);
setDelegate(getDelegateFactory().newLoadBalancer(helper));
}
}
getDelegate().handleResolvedAddressGroups(servers, attributes);
}
@Override
public void handleNameResolutionError(Status error) {
getDelegate().handleNameResolutionError(error);
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
getDelegate().handleSubchannelState(subchannel, stateInfo);
}
@Override
public void shutdown() {
getDelegate().shutdown();
setDelegate(null);
}
@VisibleForTesting
LoadBalancer getDelegate() {
return delegate;
}
@VisibleForTesting
void setDelegate(LoadBalancer delegate) {
this.delegate = delegate;
}
@VisibleForTesting
LoadBalancer.Factory getDelegateFactory() {
return delegateFactory;
}
@VisibleForTesting
void setDelegateFactory(LoadBalancer.Factory delegateFactory) {
this.delegateFactory = delegateFactory;
}
/**
* Picks a load balancer based on given criteria. In order of preference:
*
* <ol>
* <li>User provided lb on the channel. This is a degenerate case and not handled here.</li>
* <li>gRPCLB if on the class path and any gRPC LB balancer addresses are present</li>
* <li>RoundRobin if on the class path and picked by the service config</li>
* <li>PickFirst if the service config choice does not specify</li>
* </ol>
*
* @param servers The list of servers reported
* @param config the service config object
* @return the new load balancer factory, or null if the existing lb should be used.
*/
@Nullable
@VisibleForTesting
static LoadBalancer.Factory decideLoadBalancerFactory(
List<EquivalentAddressGroup> servers, Map<String, Object> config) {
// Check for balancer addresses
boolean haveBalancerAddress = false;
for (EquivalentAddressGroup s : servers) {
if (s.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) != null) {
haveBalancerAddress = true;
break;
}
}
if (haveBalancerAddress) {
try {
Class<?> lbFactoryClass = Class.forName(GRPCLB_LOAD_BALANCER_FACTORY_NAME);
Method getInstance = lbFactoryClass.getMethod("getInstance");
return (LoadBalancer.Factory) getInstance.invoke(null);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Can't get GRPCLB, but balancer addresses were present", e);
}
}
String serviceConfigChoiceBalancingPolicy =
ServiceConfigUtil.getLoadBalancingPolicy(config);
// Check for an explicitly present lb choice
if (serviceConfigChoiceBalancingPolicy != null) {
if (serviceConfigChoiceBalancingPolicy.toUpperCase(Locale.ROOT).equals("ROUND_ROBIN")) {
try {
Class<?> lbFactoryClass = Class.forName(ROUND_ROUND_LOAD_BALANCER_FACTORY_NAME);
Method getInstance = lbFactoryClass.getMethod("getInstance");
return (LoadBalancer.Factory) getInstance.invoke(null);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Can't get Round Robin LB", e);
}
}
throw new IllegalArgumentException(
"Unknown service config policy: " + serviceConfigChoiceBalancingPolicy);
}
return PickFirstBalancerFactory.getInstance();
}
}
}

View File

@ -211,11 +211,9 @@ final class DnsNameResolver extends NameResolver {
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Can't parse service Configs", e);
}
// TODO(carl-mastrangelo): pass this as an object.
attrs.set(
GrpcAttributes.NAME_RESOLVER_ATTR_DNS_TXT,
Collections.unmodifiableList(new ArrayList<String>(resolvedInetAddrs.txtRecords)));
if (serviceConfig != null) {
attrs.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig);
}
} else {
logger.log(Level.FINE, "No TXT records found for {0}", new Object[]{host});
}

View File

@ -17,17 +17,17 @@
package io.grpc.internal;
import io.grpc.Attributes;
import java.util.List;
import java.util.Map;
/**
* Special attributes that are only useful to gRPC.
*/
public final class GrpcAttributes {
/**
* Attribute key TXT DNS records.
* Attribute key for service config.
*/
public static final Attributes.Key<List<String>> NAME_RESOLVER_ATTR_DNS_TXT =
Attributes.Key.of("dns-txt");
public static final Attributes.Key<Map<String, Object>> NAME_RESOLVER_SERVICE_CONFIG =
Attributes.Key.of("service-config");
/**
* The naming authority of a gRPC LB server address. It is an address-group-level attribute,

View File

@ -552,8 +552,11 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
this.nameResolverFactory = builder.getNameResolverFactory();
this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
this.loadBalancerFactory =
checkNotNull(builder.loadBalancerFactory, "loadBalancerFactory");
if (builder.loadBalancerFactory == null) {
this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory();
} else {
this.loadBalancerFactory = builder.loadBalancerFactory;
}
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor");
@ -1144,11 +1147,9 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
}
private class NameResolverListenerImpl implements NameResolver.Listener {
final LoadBalancer balancer;
final LoadBalancer.Helper helper;
final LbHelperImpl helper;
NameResolverListenerImpl(LbHelperImpl helperImpl) {
this.balancer = helperImpl.lb;
this.helper = helperImpl;
}
@ -1185,7 +1186,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
re);
}
balancer.handleResolvedAddressGroups(servers, config);
helper.lb.handleResolvedAddressGroups(servers, config);
}
}
@ -1206,7 +1207,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
return;
}
balancer.handleNameResolutionError(error);
helper.lb.handleNameResolutionError(error);
if (nameResolverRefreshFuture != null) {
// The name resolver may invoke onError multiple times, but we only want to
// schedule one backoff attempt

View File

@ -50,6 +50,15 @@ final class ServiceConfigUtil {
private ServiceConfigUtil() {}
@Nullable
static String getLoadBalancingPolicy(Map<String, Object> serviceConfig) {
String key = "loadBalancingPolicy";
if (serviceConfig.containsKey(key)) {
return getString(serviceConfig, key);
}
return null;
}
/**
* Determines if a given Service Config choice applies, and if so, returns it.
*
@ -179,6 +188,20 @@ final class ServiceConfigUtil {
String.format("value %s for key %s in %s is not Double", value, key, obj));
}
/**
* Gets a string from an object for the given key.
*/
@SuppressWarnings("unchecked")
private static String getString(Map<String, Object> obj, String key) {
assert obj.containsKey(key);
Object value = checkNotNull(obj.get(key), "no such key %s", key);
if (value instanceof String) {
return (String) value;
}
throw new ClassCastException(
String.format("value %s for key %s in %s is not String", value, key, obj));
}
/**
* Gets a string from an object for the given index.
*/

View File

@ -117,7 +117,7 @@ public class AbstractManagedChannelImplBuilderTest {
@Test
public void loadBalancerFactory_default() {
assertNotNull(builder.loadBalancerFactory);
assertNull(builder.loadBalancerFactory);
}
@Test

View File

@ -0,0 +1,189 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel;
import io.grpc.NameResolver.Factory;
import io.grpc.PickFirstBalancerFactory;
import io.grpc.Status;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Unit tests for {@link AutoConfiguredLoadBalancerFactory}.
*/
// TODO(carl-mastrangelo): Add tests for selection logic.
@RunWith(JUnit4.class)
public class AutoConfiguredLoadBalancerFactoryTest {
private final AutoConfiguredLoadBalancerFactory lbf = new AutoConfiguredLoadBalancerFactory();
@Test
public void newLoadBalancer_isAuto() {
LoadBalancer lb = lbf.newLoadBalancer(new TestHelper());
assertThat(lb).isInstanceOf(AutoConfiguredLoadBalancer.class);
}
@Test
public void defaultIsPickFirst() {
AutoConfiguredLoadBalancer lb =
(AutoConfiguredLoadBalancer) lbf.newLoadBalancer(new TestHelper());
assertThat(lb.getDelegateFactory()).isInstanceOf(PickFirstBalancerFactory.class);
assertThat(lb.getDelegate().getClass().getName()).contains("PickFirst");
}
@Test
public void forwardsCalls() {
AutoConfiguredLoadBalancer lb =
(AutoConfiguredLoadBalancer) lbf.newLoadBalancer(new TestHelper());
final AtomicInteger calls = new AtomicInteger();
TestLoadBalancer testlb = new TestLoadBalancer() {
@Override
public void handleNameResolutionError(Status error) {
calls.getAndSet(1);
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
calls.getAndSet(2);
}
@Override
public void shutdown() {
calls.getAndSet(3);
}
};
lb.setDelegate(testlb);
lb.handleNameResolutionError(Status.RESOURCE_EXHAUSTED);
assertThat(calls.getAndSet(0)).isEqualTo(1);
lb.handleSubchannelState(null, null);
assertThat(calls.getAndSet(0)).isEqualTo(2);
lb.shutdown();
assertThat(calls.getAndSet(0)).isEqualTo(3);
}
public static class ForwardingLoadBalancer extends LoadBalancer {
private final LoadBalancer delegate;
public ForwardingLoadBalancer(LoadBalancer delegate) {
this.delegate = delegate;
}
protected LoadBalancer delegate() {
return delegate;
}
@Override
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
delegate().handleResolvedAddressGroups(servers, attributes);
}
@Override
public void handleNameResolutionError(Status error) {
delegate().handleNameResolutionError(error);
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
delegate().handleSubchannelState(subchannel, stateInfo);
}
@Override
public void shutdown() {
delegate().shutdown();
}
}
public static class ForwardingLoadBalancerHelper extends Helper {
private final Helper delegate;
public ForwardingLoadBalancerHelper(Helper delegate) {
this.delegate = delegate;
}
protected Helper delegate() {
return delegate;
}
@Override
public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
return delegate().createSubchannel(addrs, attrs);
}
@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
return delegate().createOobChannel(eag, authority);
}
@Override
public void updateBalancingState(
@Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) {
delegate().updateBalancingState(newState, newPicker);
}
@Override
public void runSerialized(Runnable task) {
delegate().runSerialized(task);
}
@Override
public Factory getNameResolverFactory() {
return delegate().getNameResolverFactory();
}
@Override
public String getAuthority() {
return delegate().getAuthority();
}
}
private static class TestLoadBalancer extends ForwardingLoadBalancer {
TestLoadBalancer() {
super(null);
}
}
private static class TestHelper extends ForwardingLoadBalancerHelper {
TestHelper() {
super(null);
}
}
}