core: make service config errors recoverable

This commit is contained in:
Carl Mastrangelo 2018-07-20 17:47:03 -07:00 committed by GitHub
parent 8be92a1e0a
commit 64d272ae7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 155 additions and 6 deletions

View File

@ -50,14 +50,22 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory {
return new AutoConfiguredLoadBalancer(helper);
}
private static final class EmptySubchannelPicker extends SubchannelPicker {
private static final class NoopLoadBalancer extends LoadBalancer {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
public void handleResolvedAddressGroups(List<EquivalentAddressGroup> s, Attributes a) {}
@Override
public void handleNameResolutionError(Status error) {}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {}
@Override
public void shutdown() {}
}
@VisibleForTesting
static final class AutoConfiguredLoadBalancer extends LoadBalancer {
private final Helper helper;
@ -75,9 +83,22 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory {
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
Map<String, Object> configMap = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
Factory newlbf = decideLoadBalancerFactory(servers, configMap);
Factory newlbf;
try {
newlbf = decideLoadBalancerFactory(servers, configMap);
} catch (RuntimeException e) {
Status s = Status.INTERNAL
.withDescription("Failed to pick a load balancer from service config")
.withCause(e);
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s));
delegate.shutdown();
delegateFactory = null;
delegate = new NoopLoadBalancer();
return;
}
if (newlbf != null && newlbf != delegateFactory) {
helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptySubchannelPicker());
helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker());
delegate.shutdown();
delegateFactory = newlbf;
delegate = delegateFactory.newLoadBalancer(helper);
@ -181,4 +202,25 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory {
return PickFirstBalancerFactory.getInstance();
}
}
private static final class EmptyPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
}
private static final class FailingPicker extends SubchannelPicker {
private final Status failure;
FailingPicker(Status failure) {
this.failure = failure;
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(failure);
}
}
}

View File

@ -47,8 +47,11 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
@ -85,6 +88,8 @@ import io.grpc.internal.Channelz.ChannelStats;
import io.grpc.internal.Channelz.ChannelTrace;
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import io.grpc.stub.ClientCalls;
import io.grpc.testing.TestMethodDescriptors;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
@ -97,12 +102,15 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
@ -2558,6 +2566,105 @@ public class ManagedChannelImplTest {
channel.isTerminated());
}
@Test
public void badServiceConfigIsRecoverable() throws Exception {
final List<EquivalentAddressGroup> addresses =
ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));
final class FakeNameResolver extends NameResolver {
Listener listener;
@Override
public String getServiceAuthority() {
return "also fake";
}
@Override
public void start(Listener listener) {
this.listener = listener;
listener.onAddresses(addresses,
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom"))
.build());
}
@Override
public void shutdown() {}
}
final class FakeNameResolverFactory extends NameResolver.Factory {
FakeNameResolver resolver;
@Nullable
@Override
public NameResolver newNameResolver(URI targetUri, Attributes params) {
return (resolver = new FakeNameResolver());
}
@Override
public String getDefaultScheme() {
return "fake";
}
}
FakeNameResolverFactory factory = new FakeNameResolverFactory();
final class CustomBuilder extends AbstractManagedChannelImplBuilder<CustomBuilder> {
CustomBuilder() {
super(TARGET);
this.executorPool = ManagedChannelImplTest.this.executorPool;
this.channelz = ManagedChannelImplTest.this.channelz;
}
@Override
protected ClientTransportFactory buildTransportFactory() {
return mockTransportFactory;
}
}
ManagedChannel mychannel = new CustomBuilder()
.nameResolverFactory(factory)
.loadBalancerFactory(new AutoConfiguredLoadBalancerFactory()).build();
ClientCall<Void, Void> call1 =
mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null);
executor.runDueTasks();
try {
future1.get();
Assert.fail();
} catch (ExecutionException e) {
assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom");
}
// ok the service config is bad, let's fix it.
factory.resolver.listener.onAddresses(addresses,
Attributes.newBuilder()
.set(
GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin"))
.build());
ClientCall<Void, Void> call2 = mychannel.newCall(
TestMethodDescriptors.voidMethod(),
CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
timer.forwardTime(1234, TimeUnit.SECONDS);
executor.runDueTasks();
try {
future2.get();
Assert.fail();
} catch (ExecutionException e) {
assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline");
}
mychannel.shutdownNow();
}
private static final class ChannelBuilder
extends AbstractManagedChannelImplBuilder<ChannelBuilder> {