From 64d272ae7ce6286bda110f5d43c896e56b671d19 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 20 Jul 2018 17:47:03 -0700 Subject: [PATCH] core: make service config errors recoverable --- .../AutoConfiguredLoadBalancerFactory.java | 54 ++++++++- .../grpc/internal/ManagedChannelImplTest.java | 107 ++++++++++++++++++ 2 files changed, 155 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java index 9f5c344250..80505bce3f 100644 --- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -50,14 +50,22 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { return new AutoConfiguredLoadBalancer(helper); } - private static final class EmptySubchannelPicker extends SubchannelPicker { + private static final class NoopLoadBalancer extends LoadBalancer { @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withNoResult(); - } + public void handleResolvedAddressGroups(List s, Attributes a) {} + + @Override + public void handleNameResolutionError(Status error) {} + + @Override + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {} + + @Override + public void shutdown() {} } + @VisibleForTesting static final class AutoConfiguredLoadBalancer extends LoadBalancer { private final Helper helper; @@ -75,9 +83,22 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { public void handleResolvedAddressGroups( List servers, Attributes attributes) { Map configMap = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); - Factory newlbf = decideLoadBalancerFactory(servers, configMap); + Factory newlbf; + try { + newlbf = decideLoadBalancerFactory(servers, configMap); + } catch (RuntimeException e) { + Status s = Status.INTERNAL + .withDescription("Failed to pick a load balancer from service config") + .withCause(e); + helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s)); + delegate.shutdown(); + delegateFactory = null; + delegate = new NoopLoadBalancer(); + return; + } + if (newlbf != null && newlbf != delegateFactory) { - helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptySubchannelPicker()); + helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker()); delegate.shutdown(); delegateFactory = newlbf; delegate = delegateFactory.newLoadBalancer(helper); @@ -181,4 +202,25 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { return PickFirstBalancerFactory.getInstance(); } } + + private static final class EmptyPicker extends SubchannelPicker { + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withNoResult(); + } + } + + private static final class FailingPicker extends SubchannelPicker { + private final Status failure; + + FailingPicker(Status failure) { + this.failure = failure; + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withError(failure); + } + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index cd8001562e..9203ecea46 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -47,8 +47,11 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; @@ -85,6 +88,8 @@ import io.grpc.internal.Channelz.ChannelStats; import io.grpc.internal.Channelz.ChannelTrace; import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; import io.grpc.internal.TestUtils.MockClientTransportInfo; +import io.grpc.stub.ClientCalls; +import io.grpc.testing.TestMethodDescriptors; import java.io.IOException; import java.net.SocketAddress; import java.net.URI; @@ -97,12 +102,15 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import org.junit.After; +import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.Rule; @@ -2558,6 +2566,105 @@ public class ManagedChannelImplTest { channel.isTerminated()); } + @Test + public void badServiceConfigIsRecoverable() throws Exception { + final List addresses = + ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {})); + final class FakeNameResolver extends NameResolver { + Listener listener; + + @Override + public String getServiceAuthority() { + return "also fake"; + } + + @Override + public void start(Listener listener) { + this.listener = listener; + listener.onAddresses(addresses, + Attributes.newBuilder() + .set( + GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, + ImmutableMap.of("loadBalancingPolicy", "kaboom")) + .build()); + } + + @Override + public void shutdown() {} + } + + final class FakeNameResolverFactory extends NameResolver.Factory { + FakeNameResolver resolver; + + @Nullable + @Override + public NameResolver newNameResolver(URI targetUri, Attributes params) { + return (resolver = new FakeNameResolver()); + } + + @Override + public String getDefaultScheme() { + return "fake"; + } + } + + FakeNameResolverFactory factory = new FakeNameResolverFactory(); + final class CustomBuilder extends AbstractManagedChannelImplBuilder { + + CustomBuilder() { + super(TARGET); + this.executorPool = ManagedChannelImplTest.this.executorPool; + this.channelz = ManagedChannelImplTest.this.channelz; + } + + @Override + protected ClientTransportFactory buildTransportFactory() { + return mockTransportFactory; + } + } + + ManagedChannel mychannel = new CustomBuilder() + .nameResolverFactory(factory) + .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory()).build(); + + ClientCall call1 = + mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT); + ListenableFuture future1 = ClientCalls.futureUnaryCall(call1, null); + executor.runDueTasks(); + try { + future1.get(); + Assert.fail(); + } catch (ExecutionException e) { + assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom"); + } + + // ok the service config is bad, let's fix it. + + factory.resolver.listener.onAddresses(addresses, + Attributes.newBuilder() + .set( + GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, + ImmutableMap.of("loadBalancingPolicy", "round_robin")) + .build()); + + ClientCall call2 = mychannel.newCall( + TestMethodDescriptors.voidMethod(), + CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS)); + ListenableFuture future2 = ClientCalls.futureUnaryCall(call2, null); + + timer.forwardTime(1234, TimeUnit.SECONDS); + + executor.runDueTasks(); + try { + future2.get(); + Assert.fail(); + } catch (ExecutionException e) { + assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline"); + } + + mychannel.shutdownNow(); + } + private static final class ChannelBuilder extends AbstractManagedChannelImplBuilder {