From 7daefd75a144b64874e85b55f98744e56ab94d89 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 16 Mar 2018 14:47:25 -0700 Subject: [PATCH] core: make ManagedChannel honor Service config LB --- .../AbstractManagedChannelImplBuilder.java | 12 +- .../AutoConfiguredLoadBalancerFactory.java | 192 ++++++++++++++++++ .../io/grpc/internal/DnsNameResolver.java | 8 +- .../java/io/grpc/internal/GrpcAttributes.java | 8 +- .../io/grpc/internal/ManagedChannelImpl.java | 15 +- .../io/grpc/internal/ServiceConfigUtil.java | 23 +++ ...AbstractManagedChannelImplBuilderTest.java | 2 +- ...AutoConfiguredLoadBalancerFactoryTest.java | 189 +++++++++++++++++ 8 files changed, 422 insertions(+), 27 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java create mode 100644 core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index b960dbd0b3..140ab9414e 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -31,7 +31,6 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.NameResolver; import io.grpc.NameResolverProvider; -import io.grpc.PickFirstBalancerFactory; import io.opencensus.trace.Tracing; import java.net.SocketAddress; import java.net.URI; @@ -85,9 +84,6 @@ public abstract class AbstractManagedChannelImplBuilder private static final NameResolver.Factory DEFAULT_NAME_RESOLVER_FACTORY = NameResolverProvider.asFactory(); - private static final LoadBalancer.Factory DEFAULT_LOAD_BALANCER_FACTORY = - PickFirstBalancerFactory.getInstance(); - private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY = DecompressorRegistry.getDefaultInstance(); @@ -117,7 +113,7 @@ public abstract class AbstractManagedChannelImplBuilder String authorityOverride; - LoadBalancer.Factory loadBalancerFactory = DEFAULT_LOAD_BALANCER_FACTORY; + @Nullable LoadBalancer.Factory loadBalancerFactory; boolean fullStreamDecompression; @@ -236,11 +232,7 @@ public abstract class AbstractManagedChannelImplBuilder Preconditions.checkState(directServerAddress == null, "directServerAddress is set (%s), which forbids the use of LoadBalancer.Factory", directServerAddress); - if (loadBalancerFactory != null) { - this.loadBalancerFactory = loadBalancerFactory; - } else { - this.loadBalancerFactory = DEFAULT_LOAD_BALANCER_FACTORY; - } + this.loadBalancerFactory = loadBalancerFactory; return thisT(); } diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java new file mode 100644 index 0000000000..9d84bc3dac --- /dev/null +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -0,0 +1,192 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.PickFirstBalancerFactory; +import io.grpc.Status; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { + private static final Logger logger = + Logger.getLogger(AutoConfiguredLoadBalancerFactory.class.getName()); + + @VisibleForTesting + static final String ROUND_ROUND_LOAD_BALANCER_FACTORY_NAME = + "io.grpc.util.RoundRobinLoadBalancerFactory"; + @VisibleForTesting + static final String GRPCLB_LOAD_BALANCER_FACTORY_NAME = + "io.grpc.grpclb.GrpclbLoadBalancerFactory"; + + AutoConfiguredLoadBalancerFactory() {} + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new AutoConfiguredLoadBalancer(helper); + } + + private static final class EmptySubchannelPicker extends SubchannelPicker { + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withNoResult(); + } + } + + @VisibleForTesting + static final class AutoConfiguredLoadBalancer extends LoadBalancer { + private final Helper helper; + private LoadBalancer delegate; + private LoadBalancer.Factory delegateFactory; + + AutoConfiguredLoadBalancer(Helper helper) { + this.helper = helper; + setDelegateFactory(PickFirstBalancerFactory.getInstance()); + setDelegate(getDelegateFactory().newLoadBalancer(helper)); + } + + // Must be run inside ChannelExecutor. + @Override + public void handleResolvedAddressGroups( + List servers, Attributes attributes) { + if (attributes.keys().contains(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) { + Factory newlbf = decideLoadBalancerFactory( + servers, attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)); + if (newlbf != null && newlbf != delegateFactory) { + helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptySubchannelPicker()); + getDelegate().shutdown(); + setDelegateFactory(newlbf); + setDelegate(getDelegateFactory().newLoadBalancer(helper)); + } + } + getDelegate().handleResolvedAddressGroups(servers, attributes); + } + + @Override + public void handleNameResolutionError(Status error) { + getDelegate().handleNameResolutionError(error); + } + + @Override + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + getDelegate().handleSubchannelState(subchannel, stateInfo); + } + + @Override + public void shutdown() { + getDelegate().shutdown(); + setDelegate(null); + } + + @VisibleForTesting + LoadBalancer getDelegate() { + return delegate; + } + + @VisibleForTesting + void setDelegate(LoadBalancer delegate) { + this.delegate = delegate; + } + + @VisibleForTesting + LoadBalancer.Factory getDelegateFactory() { + return delegateFactory; + } + + @VisibleForTesting + void setDelegateFactory(LoadBalancer.Factory delegateFactory) { + this.delegateFactory = delegateFactory; + } + + /** + * Picks a load balancer based on given criteria. In order of preference: + * + *
    + *
  1. User provided lb on the channel. This is a degenerate case and not handled here.
  2. + *
  3. gRPCLB if on the class path and any gRPC LB balancer addresses are present
  4. + *
  5. RoundRobin if on the class path and picked by the service config
  6. + *
  7. PickFirst if the service config choice does not specify
  8. + *
+ * + * @param servers The list of servers reported + * @param config the service config object + * @return the new load balancer factory, or null if the existing lb should be used. + */ + @Nullable + @VisibleForTesting + static LoadBalancer.Factory decideLoadBalancerFactory( + List servers, Map config) { + + // Check for balancer addresses + boolean haveBalancerAddress = false; + for (EquivalentAddressGroup s : servers) { + if (s.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) != null) { + haveBalancerAddress = true; + break; + } + } + + if (haveBalancerAddress) { + try { + Class lbFactoryClass = Class.forName(GRPCLB_LOAD_BALANCER_FACTORY_NAME); + Method getInstance = lbFactoryClass.getMethod("getInstance"); + return (LoadBalancer.Factory) getInstance.invoke(null); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Can't get GRPCLB, but balancer addresses were present", e); + } + } + + String serviceConfigChoiceBalancingPolicy = + ServiceConfigUtil.getLoadBalancingPolicy(config); + + // Check for an explicitly present lb choice + if (serviceConfigChoiceBalancingPolicy != null) { + if (serviceConfigChoiceBalancingPolicy.toUpperCase(Locale.ROOT).equals("ROUND_ROBIN")) { + try { + Class lbFactoryClass = Class.forName(ROUND_ROUND_LOAD_BALANCER_FACTORY_NAME); + Method getInstance = lbFactoryClass.getMethod("getInstance"); + return (LoadBalancer.Factory) getInstance.invoke(null); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Can't get Round Robin LB", e); + } + } + throw new IllegalArgumentException( + "Unknown service config policy: " + serviceConfigChoiceBalancingPolicy); + } + + return PickFirstBalancerFactory.getInstance(); + } + } +} diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java index ae8faa8f62..e1c1f9817f 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java @@ -211,11 +211,9 @@ final class DnsNameResolver extends NameResolver { } catch (RuntimeException e) { logger.log(Level.WARNING, "Can't parse service Configs", e); } - - // TODO(carl-mastrangelo): pass this as an object. - attrs.set( - GrpcAttributes.NAME_RESOLVER_ATTR_DNS_TXT, - Collections.unmodifiableList(new ArrayList(resolvedInetAddrs.txtRecords))); + if (serviceConfig != null) { + attrs.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig); + } } else { logger.log(Level.FINE, "No TXT records found for {0}", new Object[]{host}); } diff --git a/core/src/main/java/io/grpc/internal/GrpcAttributes.java b/core/src/main/java/io/grpc/internal/GrpcAttributes.java index b6d5e8a86a..157806d92e 100644 --- a/core/src/main/java/io/grpc/internal/GrpcAttributes.java +++ b/core/src/main/java/io/grpc/internal/GrpcAttributes.java @@ -17,17 +17,17 @@ package io.grpc.internal; import io.grpc.Attributes; -import java.util.List; +import java.util.Map; /** * Special attributes that are only useful to gRPC. */ public final class GrpcAttributes { /** - * Attribute key TXT DNS records. + * Attribute key for service config. */ - public static final Attributes.Key> NAME_RESOLVER_ATTR_DNS_TXT = - Attributes.Key.of("dns-txt"); + public static final Attributes.Key> NAME_RESOLVER_SERVICE_CONFIG = + Attributes.Key.of("service-config"); /** * The naming authority of a gRPC LB server address. It is an address-group-level attribute, diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index c9453cf9b2..cb218b1276 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -552,8 +552,11 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented serviceConfig) { + String key = "loadBalancingPolicy"; + if (serviceConfig.containsKey(key)) { + return getString(serviceConfig, key); + } + return null; + } + /** * Determines if a given Service Config choice applies, and if so, returns it. * @@ -179,6 +188,20 @@ final class ServiceConfigUtil { String.format("value %s for key %s in %s is not Double", value, key, obj)); } + /** + * Gets a string from an object for the given key. + */ + @SuppressWarnings("unchecked") + private static String getString(Map obj, String key) { + assert obj.containsKey(key); + Object value = checkNotNull(obj.get(key), "no such key %s", key); + if (value instanceof String) { + return (String) value; + } + throw new ClassCastException( + String.format("value %s for key %s in %s is not String", value, key, obj)); + } + /** * Gets a string from an object for the given index. */ diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index 79f25da1ee..cacd07dc37 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -117,7 +117,7 @@ public class AbstractManagedChannelImplBuilderTest { @Test public void loadBalancerFactory_default() { - assertNotNull(builder.loadBalancerFactory); + assertNull(builder.loadBalancerFactory); } @Test diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java new file mode 100644 index 0000000000..124d0f9338 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -0,0 +1,189 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.ManagedChannel; +import io.grpc.NameResolver.Factory; +import io.grpc.PickFirstBalancerFactory; +import io.grpc.Status; +import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nonnull; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link AutoConfiguredLoadBalancerFactory}. + */ +// TODO(carl-mastrangelo): Add tests for selection logic. +@RunWith(JUnit4.class) +public class AutoConfiguredLoadBalancerFactoryTest { + private final AutoConfiguredLoadBalancerFactory lbf = new AutoConfiguredLoadBalancerFactory(); + + @Test + public void newLoadBalancer_isAuto() { + LoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); + + assertThat(lb).isInstanceOf(AutoConfiguredLoadBalancer.class); + } + + @Test + public void defaultIsPickFirst() { + AutoConfiguredLoadBalancer lb = + (AutoConfiguredLoadBalancer) lbf.newLoadBalancer(new TestHelper()); + + assertThat(lb.getDelegateFactory()).isInstanceOf(PickFirstBalancerFactory.class); + assertThat(lb.getDelegate().getClass().getName()).contains("PickFirst"); + } + + @Test + public void forwardsCalls() { + AutoConfiguredLoadBalancer lb = + (AutoConfiguredLoadBalancer) lbf.newLoadBalancer(new TestHelper()); + + final AtomicInteger calls = new AtomicInteger(); + TestLoadBalancer testlb = new TestLoadBalancer() { + + @Override + public void handleNameResolutionError(Status error) { + calls.getAndSet(1); + } + + @Override + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + calls.getAndSet(2); + } + + @Override + public void shutdown() { + calls.getAndSet(3); + } + }; + + lb.setDelegate(testlb); + + lb.handleNameResolutionError(Status.RESOURCE_EXHAUSTED); + assertThat(calls.getAndSet(0)).isEqualTo(1); + + lb.handleSubchannelState(null, null); + assertThat(calls.getAndSet(0)).isEqualTo(2); + + lb.shutdown(); + assertThat(calls.getAndSet(0)).isEqualTo(3); + } + + public static class ForwardingLoadBalancer extends LoadBalancer { + private final LoadBalancer delegate; + + public ForwardingLoadBalancer(LoadBalancer delegate) { + this.delegate = delegate; + } + + protected LoadBalancer delegate() { + return delegate; + } + + @Override + public void handleResolvedAddressGroups( + List servers, Attributes attributes) { + delegate().handleResolvedAddressGroups(servers, attributes); + } + + @Override + public void handleNameResolutionError(Status error) { + delegate().handleNameResolutionError(error); + } + + @Override + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + delegate().handleSubchannelState(subchannel, stateInfo); + } + + @Override + public void shutdown() { + delegate().shutdown(); + } + } + + public static class ForwardingLoadBalancerHelper extends Helper { + + private final Helper delegate; + + public ForwardingLoadBalancerHelper(Helper delegate) { + this.delegate = delegate; + } + + protected Helper delegate() { + return delegate; + } + + @Override + public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + return delegate().createSubchannel(addrs, attrs); + } + + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + return delegate().createOobChannel(eag, authority); + } + + @Override + public void updateBalancingState( + @Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) { + delegate().updateBalancingState(newState, newPicker); + } + + @Override + public void runSerialized(Runnable task) { + delegate().runSerialized(task); + } + + @Override + public Factory getNameResolverFactory() { + return delegate().getNameResolverFactory(); + } + + @Override + public String getAuthority() { + return delegate().getAuthority(); + } + } + + private static class TestLoadBalancer extends ForwardingLoadBalancer { + TestLoadBalancer() { + super(null); + } + } + + private static class TestHelper extends ForwardingLoadBalancerHelper { + TestHelper() { + super(null); + } + } +}