core: use exponential backoff for name resolution (#4105)

This commit is contained in:
Eric Gribkoff 2018-03-05 21:46:02 -08:00 committed by GitHub
parent d45e1abd8d
commit ae1fb9467c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 393 additions and 291 deletions

View File

@ -30,6 +30,10 @@ import javax.annotation.concurrent.ThreadSafe;
* <p>The addresses and attributes of a target may be changed over time, thus the caller registers a
* {@link Listener} to receive continuous updates.
*
* <p>A {@code NameResolver} does not need to automatically re-resolve on failure. Instead, the
* {@link Listener} is responsible for eventually (after an appropriate backoff period) invoking
* {@link #refresh()}.
*
* @since 1.0.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
@ -136,7 +140,8 @@ public abstract class NameResolver {
void onAddresses(List<EquivalentAddressGroup> servers, Attributes attributes);
/**
* Handles an error from the resolver.
* Handles an error from the resolver. The listener is responsible for eventually invoking
* {@link #refresh()} to re-attempt resolution.
*
* @param error a non-OK status
* @since 1.0.0

View File

@ -36,9 +36,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
@ -79,29 +76,22 @@ final class DnsNameResolver extends NameResolver {
private final String authority;
private final String host;
private final int port;
private final Resource<ScheduledExecutorService> timerServiceResource;
private final Resource<ExecutorService> executorResource;
private final ProxyDetector proxyDetector;
@GuardedBy("this")
private boolean shutdown;
@GuardedBy("this")
private ScheduledExecutorService timerService;
@GuardedBy("this")
private ExecutorService executor;
@GuardedBy("this")
private ScheduledFuture<?> resolutionTask;
@GuardedBy("this")
private boolean resolving;
@GuardedBy("this")
private Listener listener;
DnsNameResolver(@Nullable String nsAuthority, String name, Attributes params,
Resource<ScheduledExecutorService> timerServiceResource,
Resource<ExecutorService> executorResource,
ProxyDetector proxyDetector) {
// TODO: if a DNS server is provided as nsAuthority, use it.
// https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java
this.timerServiceResource = timerServiceResource;
this.executorResource = executorResource;
// Must prepend a "//" to the name when constructing a URI, otherwise it will be treated as an
// opaque URI, thus the authority and host of the resulted URI would be null.
@ -131,7 +121,6 @@ final class DnsNameResolver extends NameResolver {
@Override
public final synchronized void start(Listener listener) {
Preconditions.checkState(this.listener == null, "already started");
timerService = SharedResourceHolder.get(timerServiceResource);
executor = SharedResourceHolder.get(executorResource);
this.listener = Preconditions.checkNotNull(listener, "listener");
resolve();
@ -148,11 +137,6 @@ final class DnsNameResolver extends NameResolver {
public void run() {
Listener savedListener;
synchronized (DnsNameResolver.this) {
// If this task is started by refresh(), there might already be a scheduled task.
if (resolutionTask != null) {
resolutionTask.cancel(false);
resolutionTask = null;
}
if (shutdown) {
return;
}
@ -171,16 +155,6 @@ final class DnsNameResolver extends NameResolver {
try {
resolvedInetAddrs = delegateResolver.resolve(host);
} catch (Exception e) {
synchronized (DnsNameResolver.this) {
if (shutdown) {
return;
}
// Because timerService is the single-threaded GrpcUtil.TIMER_SERVICE in production,
// we need to delegate the blocking work to the executor
resolutionTask =
timerService.schedule(new LogExceptionRunnable(resolutionRunnableOnExecutor),
1, TimeUnit.MINUTES);
}
savedListener.onError(
Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
return;
@ -207,17 +181,6 @@ final class DnsNameResolver extends NameResolver {
}
};
private final Runnable resolutionRunnableOnExecutor = new Runnable() {
@Override
public void run() {
synchronized (DnsNameResolver.this) {
if (!shutdown) {
executor.execute(resolutionRunnable);
}
}
}
};
@GuardedBy("this")
private void resolve() {
if (resolving || shutdown) {
@ -232,12 +195,6 @@ final class DnsNameResolver extends NameResolver {
return;
}
shutdown = true;
if (resolutionTask != null) {
resolutionTask.cancel(false);
}
if (timerService != null) {
timerService = SharedResourceHolder.release(timerServiceResource, timerService);
}
if (executor != null) {
executor = SharedResourceHolder.release(executorResource, executor);
}

View File

@ -47,8 +47,12 @@ public final class DnsNameResolverProvider extends NameResolverProvider {
Preconditions.checkArgument(targetPath.startsWith("/"),
"the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
String name = targetPath.substring(1);
return new DnsNameResolver(targetUri.getAuthority(), name, params, GrpcUtil.TIMER_SERVICE,
GrpcUtil.SHARED_CHANNEL_EXECUTOR, GrpcUtil.getProxyDetector());
return new DnsNameResolver(
targetUri.getAuthority(),
name,
params,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
GrpcUtil.getProxyDetector());
} else {
return null;
}

View File

@ -344,6 +344,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
checkState(lbHelper != null, "lbHelper is null");
}
if (nameResolver != null) {
cancelNameResolverBackoff();
nameResolver.shutdown();
nameResolver = null;
nameResolverStarted = false;
@ -429,6 +430,46 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
idleTimeoutMillis, TimeUnit.MILLISECONDS);
}
// Run from channelExecutor
@VisibleForTesting
class NameResolverRefresh implements Runnable {
// Only mutated from channelExecutor
boolean cancelled;
@Override
public void run() {
if (cancelled) {
// Race detected: this task was scheduled on channelExecutor before
// cancelNameResolverBackoff() could cancel the timer.
return;
}
nameResolverRefreshFuture = null;
nameResolverRefresh = null;
if (nameResolver != null) {
nameResolver.refresh();
}
}
}
// Must be used from channelExecutor
@Nullable private ScheduledFuture<?> nameResolverRefreshFuture;
// Must be used from channelExecutor
@Nullable private NameResolverRefresh nameResolverRefresh;
// The policy to control backoff between name resolution attempts. Non-null when an attempt is
// scheduled. Must be used from channelExecutor
@Nullable private BackoffPolicy nameResolverBackoffPolicy;
// Must be run from channelExecutor
private void cancelNameResolverBackoff() {
if (nameResolverRefreshFuture != null) {
nameResolverRefreshFuture.cancel(false);
nameResolverRefresh.cancelled = true;
nameResolverRefreshFuture = null;
nameResolverRefresh = null;
nameResolverBackoffPolicy = null;
}
}
private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@Override
public ClientTransport get(PickSubchannelArgs args) {
@ -799,14 +840,17 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
@Override
public void resetConnectBackoff() {
channelExecutor.executeLater(
channelExecutor
.executeLater(
new Runnable() {
@Override
public void run() {
if (shutdown.get()) {
return;
}
if (nameResolverStarted) {
if (nameResolverRefreshFuture != null) {
checkState(nameResolverStarted, "name resolver must be started");
cancelNameResolverBackoff();
nameResolver.refresh();
}
for (InternalSubchannel subchannel : subchannels) {
@ -816,7 +860,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
oobChannel.resetConnectBackoff();
}
}
}).drain();
})
.drain();
}
@Override
@ -1132,6 +1177,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
return;
}
nameResolverBackoffPolicy = null;
try {
if (retryEnabled) {
retryPolicies = getRetryPolicies(config);
@ -1156,7 +1203,9 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
checkArgument(!error.isOk(), "the error status must not be OK");
logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
new Object[] {getLogId(), error});
channelExecutor.executeLater(new Runnable() {
channelExecutor
.executeLater(
new Runnable() {
@Override
public void run() {
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
@ -1164,8 +1213,31 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
return;
}
balancer.handleNameResolutionError(error);
if (nameResolverRefreshFuture != null) {
// The name resolver may invoke onError multiple times, but we only want to
// schedule one backoff attempt
// TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
// want to reset the backoff interval upon repeated onError() calls
return;
}
}).drain();
if (nameResolverBackoffPolicy == null) {
nameResolverBackoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
if (logger.isLoggable(Level.FINE)) {
logger.log(
Level.FINE,
"[{0}] Scheduling DNS resolution backoff for {1} ns",
new Object[] {logId, delayNanos});
}
nameResolverRefresh = new NameResolverRefresh();
nameResolverRefreshFuture =
transportFactory
.getScheduledExecutorService()
.schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS);
}
})
.drain();
}
}

View File

@ -19,7 +19,6 @@ package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@ -27,7 +26,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.MoreObjects;
@ -35,7 +33,6 @@ import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.DnsNameResolver.DelegateResolver;
import io.grpc.internal.DnsNameResolver.ResolutionResults;
import io.grpc.internal.SharedResourceHolder.Resource;
@ -51,8 +48,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
@ -80,17 +75,6 @@ public class DnsNameResolverTest {
private final FakeClock fakeClock = new FakeClock();
private final FakeClock fakeExecutor = new FakeClock();
private MockResolver mockResolver = new MockResolver();
private final Resource<ScheduledExecutorService> fakeTimerServiceResource =
new Resource<ScheduledExecutorService>() {
@Override
public ScheduledExecutorService create() {
return fakeClock.getScheduledExecutorService();
}
@Override
public void close(ScheduledExecutorService instance) {
}
};
private final Resource<ExecutorService> fakeExecutorResource =
new Resource<ExecutorService>() {
@ -108,8 +92,6 @@ public class DnsNameResolverTest {
private NameResolver.Listener mockListener;
@Captor
private ArgumentCaptor<List<EquivalentAddressGroup>> resultCaptor;
@Captor
private ArgumentCaptor<Status> statusCaptor;
private DnsNameResolver newResolver(String name, int port) {
return newResolver(name, port, mockResolver, GrpcUtil.NOOP_PROXY_DETECTOR);
@ -124,7 +106,6 @@ public class DnsNameResolverTest {
null,
name,
Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, port).build(),
fakeTimerServiceResource,
fakeExecutorResource,
proxyDetector);
dnsResolver.setDelegateResolver(delegateResolver);
@ -190,105 +171,6 @@ public class DnsNameResolverTest {
resolver.shutdown();
}
@Test
public void retry() throws Exception {
String name = "foo.googleapis.com";
UnknownHostException error = new UnknownHostException(name);
List<InetAddress> answer = createAddressList(2);
DnsNameResolver resolver = newResolver(name, 81);
mockResolver.addAnswer(error).addAnswer(error).addAnswer(answer);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onError(statusCaptor.capture());
assertEquals(name, mockResolver.invocations.poll());
Status status = statusCaptor.getValue();
assertEquals(Status.Code.UNAVAILABLE, status.getCode());
assertSame(error, status.getCause());
// First retry scheduled
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardNanos(TimeUnit.MINUTES.toNanos(1) - 1);
assertEquals(1, fakeClock.numPendingTasks());
// First retry
fakeClock.forwardNanos(1);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onError(statusCaptor.capture());
assertEquals(name, mockResolver.invocations.poll());
status = statusCaptor.getValue();
assertEquals(Status.Code.UNAVAILABLE, status.getCode());
assertSame(error, status.getCause());
// Second retry scheduled
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardNanos(TimeUnit.MINUTES.toNanos(1) - 1);
assertEquals(1, fakeClock.numPendingTasks());
// Second retry
fakeClock.forwardNanos(1);
assertEquals(0, fakeClock.numPendingTasks());
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, mockResolver.invocations.poll());
assertAnswerMatches(answer, 81, resultCaptor.getValue());
verifyNoMoreInteractions(mockListener);
}
@Test
public void refreshCancelsScheduledRetry() throws Exception {
String name = "foo.googleapis.com";
UnknownHostException error = new UnknownHostException(name);
List<InetAddress> answer = createAddressList(2);
DnsNameResolver resolver = newResolver(name, 81);
mockResolver.addAnswer(error).addAnswer(answer);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onError(statusCaptor.capture());
assertEquals(name, mockResolver.invocations.poll());
Status status = statusCaptor.getValue();
assertEquals(Status.Code.UNAVAILABLE, status.getCode());
assertSame(error, status.getCause());
// First retry scheduled
assertEquals(1, fakeClock.numPendingTasks());
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
// Refresh cancelled the retry
assertEquals(0, fakeClock.numPendingTasks());
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, mockResolver.invocations.poll());
assertAnswerMatches(answer, 81, resultCaptor.getValue());
verifyNoMoreInteractions(mockListener);
}
@Test
public void shutdownCancelsScheduledRetry() throws Exception {
String name = "foo.googleapis.com";
UnknownHostException error = new UnknownHostException(name);
DnsNameResolver resolver = newResolver(name, 81);
mockResolver.addAnswer(error);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onError(statusCaptor.capture());
assertEquals(name, mockResolver.invocations.poll());
Status status = statusCaptor.getValue();
assertEquals(Status.Code.UNAVAILABLE, status.getCode());
assertSame(error, status.getCause());
// Retry scheduled
assertEquals(1, fakeClock.numPendingTasks());
// Shutdown cancelled the retry
resolver.shutdown();
assertEquals(0, fakeClock.numPendingTasks());
verifyNoMoreInteractions(mockListener);
}
@Test
public void jdkResolverWorks() throws Exception {
DnsNameResolver.DelegateResolver resolver = new DnsNameResolver.JdkResolver();

View File

@ -49,6 +49,7 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.CallCredentials;
import io.grpc.CallCredentials.MetadataApplier;
@ -129,7 +130,7 @@ public class ManagedChannelImplTest {
.build();
private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
Attributes.Key.of("subchannel-attr-key");
private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 1;
private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 10;
private final String serviceName = "fake.example.com";
private final String authority = serviceName;
private final String userAgent = "userAgent";
@ -141,6 +142,13 @@ public class ManagedChannelImplTest {
private final FakeClock timer = new FakeClock();
private final FakeClock executor = new FakeClock();
private final FakeClock oobExecutor = new FakeClock();
private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER =
new FakeClock.TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command instanceof ManagedChannelImpl.NameResolverRefresh;
}
};
private final CallTracer.Factory channelStatsFactory = new CallTracer.Factory() {
@Override
public CallTracer create() {
@ -238,11 +246,19 @@ public class ManagedChannelImplTest {
channelStatsFactory);
if (requestConnection) {
int numExpectedTasks = 0;
// Force-exit the initial idle-mode
channel.exitIdleMode();
assertEquals(
idleTimeoutMillis == ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE ? 0 : 1,
timer.numPendingTasks());
if (idleTimeoutMillis != ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE) {
numExpectedTasks += 1;
}
if (getNameResolverRefresh() != null) {
numExpectedTasks += 1;
}
assertEquals(numExpectedTasks, timer.numPendingTasks());
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
@ -279,7 +295,11 @@ public class ManagedChannelImplTest {
@Test
@SuppressWarnings("unchecked")
public void idleModeDisabled() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build(),
NO_INTERCEPTOR);
// In this test suite, the channel is always created with idle mode disabled.
// No task is scheduled to enter idle mode
@ -289,7 +309,7 @@ public class ManagedChannelImplTest {
@Test
public void immediateDeadlineExceeded() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
ClientCall<String, Integer> call =
channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS));
call.start(mockCallListener, new Metadata());
@ -302,7 +322,11 @@ public class ManagedChannelImplTest {
@Test
public void shutdownWithNoTransportsEverCreated() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build(),
NO_INTERCEPTOR);
verify(executorPool).getObject();
verify(executorPool, never()).returnObject(anyObject());
verifyNoMoreInteractions(mockTransportFactory);
@ -314,7 +338,7 @@ public class ManagedChannelImplTest {
@Test
public void channelzMembership() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
assertTrue(channelz.containsRootChannel(channel.getLogId()));
assertTrue(channelz.containsChannel(channel.getLogId()));
channel.shutdownNow();
@ -325,7 +349,7 @@ public class ManagedChannelImplTest {
@Test
public void channelzMembership_subchannel() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
assertTrue(channelz.containsRootChannel(channel.getLogId()));
assertTrue(channelz.containsChannel(channel.getLogId()));
@ -361,7 +385,7 @@ public class ManagedChannelImplTest {
@Test
public void channelzMembership_oob() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
OobChannel oob = (OobChannel) helper.createOobChannel(addressGroup, authority);
// oob channels are not root channels
assertFalse(channelz.containsRootChannel(oob.getLogId()));
@ -412,7 +436,8 @@ public class ManagedChannelImplTest {
}
private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) {
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).build();
createChannel(nameResolverFactory, NO_INTERCEPTOR);
verify(executorPool).getObject();
ClientStream mockStream = mock(ClientStream.class);
@ -542,7 +567,10 @@ public class ManagedChannelImplTest {
@Test
public void noMoreCallbackAfterLoadBalancerShutdown() {
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed");
createChannel(nameResolverFactory, NO_INTERCEPTOR);
@ -598,7 +626,8 @@ public class ManagedChannelImplTest {
return next.newCall(method, callOptions);
}
};
createChannel(new FakeNameResolverFactory(true), Arrays.asList(interceptor));
createChannel(
new FakeNameResolverFactory.Builder(expectedUri).build(), Arrays.asList(interceptor));
assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
assertEquals(1, atomic.get());
}
@ -608,7 +637,7 @@ public class ManagedChannelImplTest {
Metadata headers = new Metadata();
ClientStream mockStream = mock(ClientStream.class);
FakeClock callExecutor = new FakeClock();
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
// Start a call with a call executor
CallOptions options =
@ -661,10 +690,81 @@ public class ManagedChannelImplTest {
@Test
public void nameResolutionFailed() {
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.setError(error)
.build();
// Name resolution is started as soon as channel is created.
createChannel(new FailingNameResolverFactory(error), NO_INTERCEPTOR);
createChannel(nameResolverFactory, NO_INTERCEPTOR);
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
verify(mockLoadBalancer).handleNameResolutionError(same(error));
assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
assertEquals(0, resolver.refreshCalled);
timer.forwardNanos(1);
assertEquals(1, resolver.refreshCalled);
verify(mockLoadBalancer, times(2)).handleNameResolutionError(same(error));
// Verify an additional name resolution failure does not schedule another timer
resolver.refresh();
verify(mockLoadBalancer, times(3)).handleNameResolutionError(same(error));
assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
// Allow the next refresh attempt to succeed
resolver.error = null;
// For the second attempt, the backoff should occur at RECONNECT_BACKOFF_INTERVAL_NANOS * 2
timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS * 2 - 1);
assertEquals(2, resolver.refreshCalled);
timer.forwardNanos(1);
assertEquals(3, resolver.refreshCalled);
assertEquals(0, timer.numPendingTasks());
// Verify that the successful resolution reset the backoff policy
resolver.listener.onError(error);
timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
assertEquals(3, resolver.refreshCalled);
timer.forwardNanos(1);
assertEquals(4, resolver.refreshCalled);
assertEquals(0, timer.numPendingTasks());
}
@Test
public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
// Name resolution is started as soon as channel is created.
createChannel(nameResolverFactory, NO_INTERCEPTOR);
verify(mockLoadBalancer).handleNameResolutionError(same(error));
FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
assertNotNull(nameResolverBackoff);
assertFalse(nameResolverBackoff.isCancelled());
// Add a pending call to the delayed transport
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
call.start(mockCallListener, headers);
// The pending call on the delayed transport stops the name resolver backoff from cancelling
channel.shutdown();
assertFalse(nameResolverBackoff.isCancelled());
// Notify that a subchannel is ready, which drains the delayed transport
SubchannelPicker picker = mock(SubchannelPicker.class);
Status status = Status.UNAVAILABLE.withDescription("for test");
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withDrop(status));
helper.updateBalancingState(READY, picker);
executor.runDueTasks();
verify(mockCallListener).onClose(same(status), any(Metadata.class));
assertTrue(nameResolverBackoff.isCancelled());
}
@Test
@ -672,7 +772,7 @@ public class ManagedChannelImplTest {
String errorDescription = "NameResolver returned an empty list";
// Pass a FakeNameResolverFactory with an empty list
createChannel(new FakeNameResolverFactory(), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
// LoadBalancer received the error
verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
@ -686,7 +786,11 @@ public class ManagedChannelImplTest {
public void loadBalancerThrowsInHandleResolvedAddresses() {
RuntimeException ex = new RuntimeException("simulated");
// Delay the success of name resolution until allResolved() is called
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setResolvedAtStart(false)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
createChannel(nameResolverFactory, NO_INTERCEPTOR);
verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
@ -703,7 +807,8 @@ public class ManagedChannelImplTest {
@Test
public void nameResolvedAfterChannelShutdown() {
// Delay the success of name resolution until allResolved() is called.
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(false);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
createChannel(nameResolverFactory, NO_INTERCEPTOR);
channel.shutdown();
@ -736,7 +841,10 @@ public class ManagedChannelImplTest {
InOrder inOrder = inOrder(mockLoadBalancer);
List<SocketAddress> resolvedAddrs = Arrays.asList(badAddress, goodAddress);
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(resolvedAddrs);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
.build();
createChannel(nameResolverFactory, NO_INTERCEPTOR);
// Start the call
@ -817,7 +925,7 @@ public class ManagedChannelImplTest {
}
private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
// This call will be buffered by the channel, thus involve delayed transport
CallOptions callOptions = CallOptions.DEFAULT;
@ -875,7 +983,10 @@ public class ManagedChannelImplTest {
List<SocketAddress> resolvedAddrs = Arrays.asList(addr1, addr2);
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(resolvedAddrs);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
.build();
createChannel(nameResolverFactory, NO_INTERCEPTOR);
// Start a wait-for-ready call
@ -947,7 +1058,7 @@ public class ManagedChannelImplTest {
@Test
public void subchannels() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
// createSubchannel() always return a new Subchannel
Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build();
@ -1007,7 +1118,7 @@ public class ManagedChannelImplTest {
@Test
public void subchannelsWhenChannelShutdownNow() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
sub1.requestConnection();
@ -1035,7 +1146,7 @@ public class ManagedChannelImplTest {
@Test
public void subchannelsNoConnectionShutdown() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
@ -1051,7 +1162,7 @@ public class ManagedChannelImplTest {
@Test
public void subchannelsNoConnectionShutdownNow() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
helper.createSubchannel(addressGroup, Attributes.EMPTY);
helper.createSubchannel(addressGroup, Attributes.EMPTY);
channel.shutdownNow();
@ -1066,7 +1177,7 @@ public class ManagedChannelImplTest {
@Test
public void oobchannels() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority");
ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority");
@ -1164,7 +1275,7 @@ public class ManagedChannelImplTest {
@Test
public void oobChannelsWhenChannelShutdownNow() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
@ -1193,7 +1304,7 @@ public class ManagedChannelImplTest {
@Test
public void oobChannelsNoConnectionShutdown() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
channel.shutdown();
@ -1211,7 +1322,7 @@ public class ManagedChannelImplTest {
@Test
public void oobChannelsNoConnectionShutdownNow() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
helper.createOobChannel(addressGroup, "oob1Authority");
helper.createOobChannel(addressGroup, "oob2Authority");
channel.shutdownNow();
@ -1235,7 +1346,10 @@ public class ManagedChannelImplTest {
}
private void subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel) {
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
createChannel(nameResolverFactory, NO_INTERCEPTOR);
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
@ -1283,7 +1397,7 @@ public class ManagedChannelImplTest {
*/
@Test
public void informationPropagatedToNewStreamAndCallCredentials() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
final Context.Key<String> testKey = Context.key("testing");
Context ctx = Context.current().withValue(testKey, "testValue");
@ -1385,7 +1499,7 @@ public class ManagedChannelImplTest {
ClientStream mockStream = mock(ClientStream.class);
ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
@ -1418,7 +1532,7 @@ public class ManagedChannelImplTest {
ClientStream mockStream = mock(ClientStream.class);
ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
@ -1450,7 +1564,9 @@ public class ManagedChannelImplTest {
@Test
public void getState_loadBalancerSupportsChannelState() {
createChannel(new FakeNameResolverFactory(false), NO_INTERCEPTOR);
createChannel(
new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build(),
NO_INTERCEPTOR);
assertEquals(IDLE, channel.getState(false));
helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
@ -1460,7 +1576,9 @@ public class ManagedChannelImplTest {
@Test
public void getState_withRequestConnect() {
createChannel(
new FakeNameResolverFactory(false), NO_INTERCEPTOR, false /* requestConnection */,
new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build(),
NO_INTERCEPTOR,
false /* requestConnection */,
ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE);
assertEquals(IDLE, channel.getState(false));
@ -1488,7 +1606,9 @@ public class ManagedChannelImplTest {
}
};
createChannel(new FakeNameResolverFactory(false), NO_INTERCEPTOR);
createChannel(
new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build(),
NO_INTERCEPTOR);
assertEquals(IDLE, channel.getState(false));
channel.notifyWhenStateChanged(IDLE, onStateChanged);
@ -1519,7 +1639,9 @@ public class ManagedChannelImplTest {
}
};
createChannel(new FakeNameResolverFactory(false), NO_INTERCEPTOR);
createChannel(
new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build(),
NO_INTERCEPTOR);
assertEquals(IDLE, channel.getState(false));
channel.notifyWhenStateChanged(IDLE, onStateChanged);
executor.runDueTasks();
@ -1543,7 +1665,9 @@ public class ManagedChannelImplTest {
public void stateIsIdleOnIdleTimeout() {
long idleTimeoutMillis = 2000L;
createChannel(
new FakeNameResolverFactory(true), NO_INTERCEPTOR, true /* request connection*/,
new FakeNameResolverFactory.Builder(expectedUri).build(),
NO_INTERCEPTOR,
true /* request connection*/,
idleTimeoutMillis);
assertEquals(IDLE, channel.getState(false));
@ -1577,7 +1701,8 @@ public class ManagedChannelImplTest {
private void subtestPanic(ConnectivityState initialState) {
assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState);
long idleTimeoutMillis = 2000L;
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).build();
createChannel(nameResolverFactory, NO_INTERCEPTOR, true, idleTimeoutMillis);
verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
@ -1644,7 +1769,8 @@ public class ManagedChannelImplTest {
@Test
public void panic_bufferedCallsWillFail() {
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).build();
createChannel(nameResolverFactory, NO_INTERCEPTOR);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
@ -1705,7 +1831,9 @@ public class ManagedChannelImplTest {
public void idleTimeoutAndReconnect() {
long idleTimeoutMillis = 2000L;
createChannel(
new FakeNameResolverFactory(true), NO_INTERCEPTOR, true /* request connection*/,
new FakeNameResolverFactory.Builder(expectedUri).build(),
NO_INTERCEPTOR,
true /* request connection*/,
idleTimeoutMillis);
timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
@ -1727,7 +1855,7 @@ public class ManagedChannelImplTest {
@Test
public void prepareToLoseNetworkEntersIdle() {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
helper.updateBalancingState(READY, mockPicker);
assertEquals(READY, channel.getState(false));
@ -1740,7 +1868,9 @@ public class ManagedChannelImplTest {
public void prepareToLoseNetworkAfterIdleTimerIsNoOp() {
long idleTimeoutMillis = 2000L;
createChannel(
new FakeNameResolverFactory(true), NO_INTERCEPTOR, true /* request connection*/,
new FakeNameResolverFactory.Builder(expectedUri).build(),
NO_INTERCEPTOR,
true /* request connection*/,
idleTimeoutMillis);
timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
assertEquals(IDLE, channel.getState(false));
@ -1753,7 +1883,7 @@ public class ManagedChannelImplTest {
@Test
public void updateBalancingStateDoesUpdatePicker() {
ClientStream mockStream = mock(ClientStream.class);
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
@ -1791,7 +1921,9 @@ public class ManagedChannelImplTest {
@Test
public void updateBalancingStateWithShutdownShouldBeIgnored() {
createChannel(new FakeNameResolverFactory(false), NO_INTERCEPTOR);
createChannel(
new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build(),
NO_INTERCEPTOR);
assertEquals(IDLE, channel.getState(false));
Runnable onStateChanged = mock(Runnable.class);
@ -1805,20 +1937,53 @@ public class ManagedChannelImplTest {
}
@Test
public void resetConnectBackoff_refreshesNameResolver() {
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
public void resetConnectBackoff() {
// Start with a name resolution failure to trigger backoff attempts
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
// Name resolution is started as soon as channel is created.
createChannel(nameResolverFactory, NO_INTERCEPTOR);
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
verify(mockLoadBalancer).handleNameResolutionError(same(error));
FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
assertNotNull("There should be a name resolver backoff task", nameResolverBackoff);
assertEquals(0, resolver.refreshCalled);
// Verify resetConnectBackoff() calls refresh and cancels the scheduled backoff
channel.resetConnectBackoff();
assertEquals(1, resolver.refreshCalled);
assertTrue(nameResolverBackoff.isCancelled());
// Simulate a race between cancel and the task scheduler. Should be a no-op.
nameResolverBackoff.command.run();
assertEquals(1, resolver.refreshCalled);
// Verify that the reconnect policy was recreated and the backoff multiplier reset to 1
timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
assertEquals(2, resolver.refreshCalled);
}
@Test
public void resetConnectBackoff_noOpWithoutPendingResolverBackoff() {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
.build();
createChannel(nameResolverFactory, NO_INTERCEPTOR);
FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
assertEquals(0, nameResolver.refreshCalled);
channel.resetConnectBackoff();
assertEquals(1, nameResolver.refreshCalled);
assertEquals(0, nameResolver.refreshCalled);
}
@Test
public void resetConnectBackoff_noOpWhenChannelShutdown() {
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).build();
createChannel(nameResolverFactory, NO_INTERCEPTOR);
channel.shutdown();
@ -1831,7 +1996,8 @@ public class ManagedChannelImplTest {
@Test
public void resetConnectBackoff_noOpWhenNameResolverNotStarted() {
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).build();
createChannel(nameResolverFactory, NO_INTERCEPTOR, false /* requestConnection */,
ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE);
@ -1843,7 +2009,7 @@ public class ManagedChannelImplTest {
@Test
public void channelsAndSubchannels_instrumented_name() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
assertEquals(target, getStats(channel).target);
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
@ -1852,7 +2018,7 @@ public class ManagedChannelImplTest {
@Test
public void channelsAndSubchannels_instrumented_state() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
@ -1886,7 +2052,7 @@ public class ManagedChannelImplTest {
@Test
public void channelStat_callStarted() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
assertEquals(0, getStats(channel).callsStarted);
call.start(mockCallListener, new Metadata());
@ -1905,7 +2071,7 @@ public class ManagedChannelImplTest {
}
private void channelsAndSubchannels_instrumented0(boolean success) throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
@ -1976,7 +2142,7 @@ public class ManagedChannelImplTest {
private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception {
// set up
ClientStream mockStream = mock(ClientStream.class);
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel();
@ -2037,7 +2203,7 @@ public class ManagedChannelImplTest {
@Test
public void channelsAndSubchannels_oob_instrumented_name() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
String authority = "oobauthority";
OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, authority);
@ -2046,7 +2212,7 @@ public class ManagedChannelImplTest {
@Test
public void channelsAndSubchannels_oob_instrumented_state() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
createChannel(new FakeNameResolverFactory.Builder(expectedUri).build(), NO_INTERCEPTOR);
OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
assertEquals(IDLE, getStats(oobChannel).state);
@ -2146,7 +2312,7 @@ public class ManagedChannelImplTest {
Metadata headers = new Metadata();
ClientStream mockStream = mock(ClientStream.class);
createChannel(
new FakeNameResolverFactory(true),
new FakeNameResolverFactory.Builder(expectedUri).build(),
ImmutableList.<ClientInterceptor>of(userInterceptor));
CallOptions options =
CallOptions.DEFAULT.withExecutor(executor.getScheduledExecutorService());
@ -2229,7 +2395,7 @@ public class ManagedChannelImplTest {
}
};
createChannel(
new FakeNameResolverFactory(true),
new FakeNameResolverFactory.Builder(expectedUri).build(),
Collections.<ClientInterceptor>singletonList(userInterceptor));
ClientCall<String, Integer> call =
channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS));
@ -2254,32 +2420,32 @@ public class ManagedChannelImplTest {
@Override
public BackoffPolicy get() {
return new BackoffPolicy() {
private int multiplier = 1;
@Override
public long nextBackoffNanos() {
return RECONNECT_BACKOFF_INTERVAL_NANOS;
return RECONNECT_BACKOFF_INTERVAL_NANOS * multiplier++;
}
};
}
}
private class FakeNameResolverFactory extends NameResolver.Factory {
final List<EquivalentAddressGroup> servers;
final boolean resolvedAtStart;
final ArrayList<FakeNameResolver> resolvers = new ArrayList<FakeNameResolver>();
private static class FakeNameResolverFactory extends NameResolver.Factory {
private final URI expectedUri;
private final List<EquivalentAddressGroup> servers;
private final boolean resolvedAtStart;
private final Status error;
private final ArrayList<FakeNameResolver> resolvers = new ArrayList<FakeNameResolver>();
FakeNameResolverFactory(boolean resolvedAtStart) {
private FakeNameResolverFactory(
URI expectedUri,
List<EquivalentAddressGroup> servers,
boolean resolvedAtStart,
Status error) {
this.expectedUri = expectedUri;
this.servers = servers;
this.resolvedAtStart = resolvedAtStart;
servers = Collections.singletonList(new EquivalentAddressGroup(socketAddress));
}
FakeNameResolverFactory(List<SocketAddress> servers) {
resolvedAtStart = true;
this.servers = Collections.singletonList(new EquivalentAddressGroup(servers));
}
public FakeNameResolverFactory() {
resolvedAtStart = true;
this.servers = ImmutableList.of();
this.error = error;
}
@Override
@ -2288,7 +2454,7 @@ public class ManagedChannelImplTest {
return null;
}
assertSame(NAME_RESOLVER_PARAMS, params);
FakeNameResolver resolver = new FakeNameResolver();
FakeNameResolver resolver = new FakeNameResolver(error);
resolvers.add(resolver);
return resolver;
}
@ -2308,6 +2474,11 @@ public class ManagedChannelImplTest {
Listener listener;
boolean shutdown;
int refreshCalled;
Status error;
FakeNameResolver(Status error) {
this.error = error;
}
@Override public String getServiceAuthority() {
return expectedUri.getAuthority();
@ -2323,9 +2494,14 @@ public class ManagedChannelImplTest {
@Override public void refresh() {
assertNotNull(listener);
refreshCalled++;
resolved();
}
void resolved() {
if (error != null) {
listener.onError(error);
return;
}
listener.onAddresses(servers, Attributes.EMPTY);
}
@ -2333,33 +2509,35 @@ public class ManagedChannelImplTest {
shutdown = true;
}
}
private static class Builder {
private final URI expectedUri;
List<EquivalentAddressGroup> servers = ImmutableList.<EquivalentAddressGroup>of();
boolean resolvedAtStart = true;
Status error = null;
private Builder(URI expectedUri) {
this.expectedUri = expectedUri;
}
private static class FailingNameResolverFactory extends NameResolver.Factory {
final Status error;
private Builder setServers(List<EquivalentAddressGroup> servers) {
this.servers = servers;
return this;
}
FailingNameResolverFactory(Status error) {
private Builder setResolvedAtStart(boolean resolvedAtStart) {
this.resolvedAtStart = resolvedAtStart;
return this;
}
private Builder setError(Status error) {
this.error = error;
return this;
}
@Override
public NameResolver newNameResolver(URI notUsedUri, Attributes params) {
return new NameResolver() {
@Override public String getServiceAuthority() {
return "irrelevant-authority";
private FakeNameResolverFactory build() {
return new FakeNameResolverFactory(expectedUri, servers, resolvedAtStart, error);
}
@Override public void start(final Listener listener) {
listener.onError(error);
}
@Override public void shutdown() {}
};
}
@Override
public String getDefaultScheme() {
return "fake";
}
}
@ -2371,4 +2549,8 @@ public class ManagedChannelImplTest {
Instrumented<ChannelStats> instrumented) throws Exception {
return instrumented.getStats().get();
}
private FakeClock.ScheduledTask getNameResolverRefresh() {
return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null);
}
}