From 98aa69af7295df22fde75dbfbf05728f0562200e Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 12 Jan 2018 16:56:54 -0800 Subject: [PATCH] core: make ManagedChannel honor Service config LB I think the idle state transitions are correct. I have looked at them and tried tracing a few paths through. This doesn't go full idle because the name resolver doesn't need to be restarted. Also, the LB transition happens inside of a NR callback, so it would be odd to have the NR terminate itself upon successful resolution. (Might this cause recursion? I think it may). --- core/BUILD.bazel | 1 + .../AbstractManagedChannelImplBuilder.java | 13 +- .../AutoConfiguredLoadBalancerFactory.java | 197 ++++++++++++++++++ .../io/grpc/internal/DnsNameResolver.java | 6 +- .../java/io/grpc/internal/GrpcAttributes.java | 7 +- .../io/grpc/internal/ManagedChannelImpl.java | 19 +- ...AbstractManagedChannelImplBuilderTest.java | 2 +- ...AutoConfiguredLoadBalancerFactoryTest.java | 189 +++++++++++++++++ 8 files changed, 409 insertions(+), 25 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java create mode 100644 core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java diff --git a/core/BUILD.bazel b/core/BUILD.bazel index 9a99c44640..37c08c3131 100644 --- a/core/BUILD.bazel +++ b/core/BUILD.bazel @@ -42,6 +42,7 @@ java_library( "//context", "@com_google_code_findbugs_jsr305//jar", "@com_google_errorprone_error_prone_annotations//jar", + "@com_google_code_gson_gson//jar", "@com_google_guava_guava//jar", "@com_google_instrumentation_instrumentation_api//jar", "@io_opencensus_opencensus_api//jar", diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 765170b8e4..675212ec30 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -31,7 +31,6 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.NameResolver; import io.grpc.NameResolverProvider; -import io.grpc.PickFirstBalancerFactory; import io.opencensus.trace.Tracing; import java.net.SocketAddress; import java.net.URI; @@ -85,9 +84,6 @@ public abstract class AbstractManagedChannelImplBuilder private static final NameResolver.Factory DEFAULT_NAME_RESOLVER_FACTORY = NameResolverProvider.asFactory(); - private static final LoadBalancer.Factory DEFAULT_LOAD_BALANCER_FACTORY = - PickFirstBalancerFactory.getInstance(); - private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY = DecompressorRegistry.getDefaultInstance(); @@ -113,8 +109,7 @@ public abstract class AbstractManagedChannelImplBuilder @Nullable String authorityOverride; - - LoadBalancer.Factory loadBalancerFactory = DEFAULT_LOAD_BALANCER_FACTORY; + @Nullable LoadBalancer.Factory loadBalancerFactory; boolean fullStreamDecompression; @@ -223,11 +218,7 @@ public abstract class AbstractManagedChannelImplBuilder Preconditions.checkState(directServerAddress == null, "directServerAddress is set (%s), which forbids the use of LoadBalancer.Factory", directServerAddress); - if (loadBalancerFactory != null) { - this.loadBalancerFactory = loadBalancerFactory; - } else { - this.loadBalancerFactory = DEFAULT_LOAD_BALANCER_FACTORY; - } + this.loadBalancerFactory = loadBalancerFactory; return thisT(); } diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java new file mode 100644 index 0000000000..277a558501 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -0,0 +1,197 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.JsonObject; +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.PickFirstBalancerFactory; +import io.grpc.Status; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Locale; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { + private static final Logger logger = + Logger.getLogger(AutoConfiguredLoadBalancerFactory.class.getName()); + + @VisibleForTesting + static final String ROUND_ROUND_LOAD_BALANCER_FACTORY_NAME = + "io.grpc.util.RoundRobinLoadBalancerFactory"; + @VisibleForTesting + static final String GRPCLB_LOAD_BALANCER_FACTORY_NAME = + "io.grpc.grpclb.GrpclbLoadBalancerFactory"; + + AutoConfiguredLoadBalancerFactory() {} + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new AutoConfiguredLoadBalancer(helper); + } + + private static final class EmptySubchannelPicker extends SubchannelPicker { + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withNoResult(); + } + } + + @VisibleForTesting + static final class AutoConfiguredLoadBalancer extends LoadBalancer { + private final Helper helper; + private LoadBalancer delegate; + private LoadBalancer.Factory delegateFactory; + + AutoConfiguredLoadBalancer(Helper helper) { + this.helper = helper; + setDelegateFactory(PickFirstBalancerFactory.getInstance()); + setDelegate(getDelegateFactory().newLoadBalancer(helper)); + } + + // Must be run inside ChannelExecutor. + @Override + public void handleResolvedAddressGroups( + List servers, Attributes attributes) { + if (attributes.keys().contains(GrpcAttributes.NAME_RESOLVER_ATTR_SERVICE_CONFIG)) { + Factory newlbf = decideLoadBalancerFactory( + servers, attributes.get(GrpcAttributes.NAME_RESOLVER_ATTR_SERVICE_CONFIG)); + if (newlbf != null && newlbf != delegateFactory) { + helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptySubchannelPicker()); + getDelegate().shutdown(); + setDelegateFactory(newlbf); + setDelegate(getDelegateFactory().newLoadBalancer(helper)); + } + } + getDelegate().handleResolvedAddressGroups(servers, attributes); + } + + @Override + public void handleNameResolutionError(Status error) { + getDelegate().handleNameResolutionError(error); + } + + @Override + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + getDelegate().handleSubchannelState(subchannel, stateInfo); + } + + @Override + public void shutdown() { + getDelegate().shutdown(); + setDelegate(null); + } + + @VisibleForTesting + LoadBalancer getDelegate() { + return delegate; + } + + @VisibleForTesting + void setDelegate(LoadBalancer delegate) { + this.delegate = delegate; + } + + @VisibleForTesting + LoadBalancer.Factory getDelegateFactory() { + return delegateFactory; + } + + @VisibleForTesting + void setDelegateFactory(LoadBalancer.Factory delegateFactory) { + this.delegateFactory = delegateFactory; + } + + /** + * Picks a load balancer based on given criteria. In order of preference: + * + *
    + *
  1. User provided lb on the channel. This is a degenerate case and not handled here.
  2. + *
  3. gRPCLB if on the class path and any gRPC LB balancer addresses are present
  4. + *
  5. RoundRobin if on the class path and picked by the service config
  6. + *
  7. PickFirst if the service config choice does not specify
  8. + *
+ * + * @param servers The list of servers reported + * @param choice the service config object + * @return the new load balancer factory, or null if the existing lb should be used. + */ + @Nullable + @VisibleForTesting + static LoadBalancer.Factory decideLoadBalancerFactory( + List servers, JsonObject choice) { + boolean loadBalancingPolicyPresent = choice.has("loadBalancingPolicy"); + + // Check for balancer addresses + boolean haveBalancerAddress = false; + for (EquivalentAddressGroup s : servers) { + if (s.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) != null) { + haveBalancerAddress = true; + break; + } + } + + if (haveBalancerAddress) { + try { + Class lbFactoryClass; + lbFactoryClass = Class.forName(GRPCLB_LOAD_BALANCER_FACTORY_NAME); + Method getInstance = lbFactoryClass.getMethod("getInstance"); + return (LoadBalancer.Factory) getInstance.invoke(null); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Can't get GRPCLB, but balancer addresses were present", e); + } + } + + // Check for an explicitly present lb choice + if (loadBalancingPolicyPresent) { + String serviceConfigChoiceBalancingPolicy = null; + serviceConfigChoiceBalancingPolicy = choice.get("loadBalancingPolicy").getAsString(); + String policy = checkNotNull(serviceConfigChoiceBalancingPolicy, "policy"); + if (policy.toUpperCase(Locale.ROOT).equals("ROUND_ROBIN")) { + ClassNotFoundException caught = null; + try { + Class lbFactoryClass; + lbFactoryClass = Class.forName(ROUND_ROUND_LOAD_BALANCER_FACTORY_NAME); + Method getInstance = lbFactoryClass.getMethod("getInstance"); + return (LoadBalancer.Factory) getInstance.invoke(null); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Can't get Round Robin LB", e); + } + } + throw new IllegalArgumentException("Unknown service config policy: " + policy); + } + + return PickFirstBalancerFactory.getInstance(); + } + } +} diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java index 63ec83325d..4d028c3b2b 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java @@ -194,9 +194,13 @@ final class DnsNameResolver extends NameResolver { Attributes.Builder attrs = Attributes.newBuilder(); if (!resolvedInetAddrs.txtRecords.isEmpty()) { + // TODO(carl-mastrangelo): re enable this + /* attrs.set( - GrpcAttributes.NAME_RESOLVER_ATTR_DNS_TXT, + GrpcAttributes.NAME_RESOLVER_ATTR_SERVICE_CONFIG, Collections.unmodifiableList(new ArrayList(resolvedInetAddrs.txtRecords))); + */ + } savedListener.onAddresses(servers, attrs.build()); } finally { diff --git a/core/src/main/java/io/grpc/internal/GrpcAttributes.java b/core/src/main/java/io/grpc/internal/GrpcAttributes.java index b6d5e8a86a..8178d803b5 100644 --- a/core/src/main/java/io/grpc/internal/GrpcAttributes.java +++ b/core/src/main/java/io/grpc/internal/GrpcAttributes.java @@ -16,8 +16,8 @@ package io.grpc.internal; +import com.google.gson.JsonObject; import io.grpc.Attributes; -import java.util.List; /** * Special attributes that are only useful to gRPC. @@ -26,8 +26,8 @@ public final class GrpcAttributes { /** * Attribute key TXT DNS records. */ - public static final Attributes.Key> NAME_RESOLVER_ATTR_DNS_TXT = - Attributes.Key.of("dns-txt"); + public static final Attributes.Key NAME_RESOLVER_ATTR_SERVICE_CONFIG = + Attributes.Key.of("service-config"); /** * The naming authority of a gRPC LB server address. It is an address-group-level attribute, @@ -36,6 +36,5 @@ public final class GrpcAttributes { public static final Attributes.Key ATTR_LB_ADDR_AUTHORITY = Attributes.Key.of("io.grpc.grpclb.lbAddrAuthority"); - private GrpcAttributes() {} } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 126d0816c8..e76d870c5e 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -113,6 +113,7 @@ public final class ManagedChannelImpl private final NameResolver.Factory nameResolverFactory; private final Attributes nameResolverParams; private final LoadBalancer.Factory loadBalancerFactory; + private final ClientTransportFactory transportFactory; private final Executor executor; private final ObjectPool executorPool; @@ -459,8 +460,11 @@ public final class ManagedChannelImpl this.nameResolverFactory = builder.getNameResolverFactory(); this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams"); this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); - this.loadBalancerFactory = - checkNotNull(builder.loadBalancerFactory, "loadBalancerFactory"); + if (builder.loadBalancerFactory == null) { + this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(); + } else { + this.loadBalancerFactory = builder.loadBalancerFactory; + } this.executorPool = checkNotNull(builder.executorPool, "executorPool"); this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool"); this.executor = checkNotNull(executorPool.getObject(), "executor"); @@ -983,11 +987,9 @@ public final class ManagedChannelImpl } private class NameResolverListenerImpl implements NameResolver.Listener { - final LoadBalancer balancer; - final LoadBalancer.Helper helper; + final LbHelperImpl helper; NameResolverListenerImpl(LbHelperImpl helperImpl) { - this.balancer = helperImpl.lb; this.helper = helperImpl; } @@ -1002,6 +1004,7 @@ public final class ManagedChannelImpl new Object[]{getLogId(), servers, config}); } + final class NamesResolved implements Runnable { @Override public void run() { @@ -1010,13 +1013,13 @@ public final class ManagedChannelImpl return; } try { - balancer.handleResolvedAddressGroups(servers, config); + helper.lb.handleResolvedAddressGroups(servers, config); } catch (Throwable e) { logger.log( Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e); // It must be a bug! Push the exception back to LoadBalancer in the hope that it may // be propagated to the application. - balancer.handleNameResolutionError(Status.INTERNAL.withCause(e) + helper.lb.handleNameResolutionError(Status.INTERNAL.withCause(e) .withDescription("Thrown from handleResolvedAddresses(): " + e)); } } @@ -1037,7 +1040,7 @@ public final class ManagedChannelImpl if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { return; } - balancer.handleNameResolutionError(error); + lbHelper.lb.handleNameResolutionError(error); } }).drain(); } diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index 3ca5db366a..7d53ae348f 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -117,7 +117,7 @@ public class AbstractManagedChannelImplBuilderTest { @Test public void loadBalancerFactory_default() { - assertNotNull(builder.loadBalancerFactory); + assertNull(builder.loadBalancerFactory); } @Test diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java new file mode 100644 index 0000000000..124d0f9338 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -0,0 +1,189 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.ManagedChannel; +import io.grpc.NameResolver.Factory; +import io.grpc.PickFirstBalancerFactory; +import io.grpc.Status; +import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nonnull; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link AutoConfiguredLoadBalancerFactory}. + */ +// TODO(carl-mastrangelo): Add tests for selection logic. +@RunWith(JUnit4.class) +public class AutoConfiguredLoadBalancerFactoryTest { + private final AutoConfiguredLoadBalancerFactory lbf = new AutoConfiguredLoadBalancerFactory(); + + @Test + public void newLoadBalancer_isAuto() { + LoadBalancer lb = lbf.newLoadBalancer(new TestHelper()); + + assertThat(lb).isInstanceOf(AutoConfiguredLoadBalancer.class); + } + + @Test + public void defaultIsPickFirst() { + AutoConfiguredLoadBalancer lb = + (AutoConfiguredLoadBalancer) lbf.newLoadBalancer(new TestHelper()); + + assertThat(lb.getDelegateFactory()).isInstanceOf(PickFirstBalancerFactory.class); + assertThat(lb.getDelegate().getClass().getName()).contains("PickFirst"); + } + + @Test + public void forwardsCalls() { + AutoConfiguredLoadBalancer lb = + (AutoConfiguredLoadBalancer) lbf.newLoadBalancer(new TestHelper()); + + final AtomicInteger calls = new AtomicInteger(); + TestLoadBalancer testlb = new TestLoadBalancer() { + + @Override + public void handleNameResolutionError(Status error) { + calls.getAndSet(1); + } + + @Override + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + calls.getAndSet(2); + } + + @Override + public void shutdown() { + calls.getAndSet(3); + } + }; + + lb.setDelegate(testlb); + + lb.handleNameResolutionError(Status.RESOURCE_EXHAUSTED); + assertThat(calls.getAndSet(0)).isEqualTo(1); + + lb.handleSubchannelState(null, null); + assertThat(calls.getAndSet(0)).isEqualTo(2); + + lb.shutdown(); + assertThat(calls.getAndSet(0)).isEqualTo(3); + } + + public static class ForwardingLoadBalancer extends LoadBalancer { + private final LoadBalancer delegate; + + public ForwardingLoadBalancer(LoadBalancer delegate) { + this.delegate = delegate; + } + + protected LoadBalancer delegate() { + return delegate; + } + + @Override + public void handleResolvedAddressGroups( + List servers, Attributes attributes) { + delegate().handleResolvedAddressGroups(servers, attributes); + } + + @Override + public void handleNameResolutionError(Status error) { + delegate().handleNameResolutionError(error); + } + + @Override + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + delegate().handleSubchannelState(subchannel, stateInfo); + } + + @Override + public void shutdown() { + delegate().shutdown(); + } + } + + public static class ForwardingLoadBalancerHelper extends Helper { + + private final Helper delegate; + + public ForwardingLoadBalancerHelper(Helper delegate) { + this.delegate = delegate; + } + + protected Helper delegate() { + return delegate; + } + + @Override + public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + return delegate().createSubchannel(addrs, attrs); + } + + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + return delegate().createOobChannel(eag, authority); + } + + @Override + public void updateBalancingState( + @Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) { + delegate().updateBalancingState(newState, newPicker); + } + + @Override + public void runSerialized(Runnable task) { + delegate().runSerialized(task); + } + + @Override + public Factory getNameResolverFactory() { + return delegate().getNameResolverFactory(); + } + + @Override + public String getAuthority() { + return delegate().getAuthority(); + } + } + + private static class TestLoadBalancer extends ForwardingLoadBalancer { + TestLoadBalancer() { + super(null); + } + } + + private static class TestHelper extends ForwardingLoadBalancerHelper { + TestHelper() { + super(null); + } + } +}