mirror of https://github.com/grpc/grpc-java.git
Introduce onResult2 in NameResolver Listener2 that returns Status
Lets the Name Resolver receive the status of the acceptance of the name resolution by the load balancer.
This commit is contained in:
parent
9bc1a93f6e
commit
90d0fabb1f
|
|
@ -246,6 +246,17 @@ public abstract class NameResolver {
|
|||
*/
|
||||
@Override
|
||||
public abstract void onError(Status error);
|
||||
|
||||
/**
|
||||
* Handles updates on resolved addresses and attributes.
|
||||
*
|
||||
* @param resolutionResult the resolved server addresses, attributes, and Service Config.
|
||||
* @since 1.66
|
||||
*/
|
||||
public Status onResult2(ResolutionResult resolutionResult) {
|
||||
onResult(resolutionResult);
|
||||
return Status.OK;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -330,7 +330,9 @@ public class DnsNameResolver extends NameResolver {
|
|||
resolutionResultBuilder.setAttributes(result.attributes);
|
||||
}
|
||||
}
|
||||
savedListener.onResult(resolutionResultBuilder.build());
|
||||
syncContext.execute(() -> {
|
||||
savedListener.onResult2(resolutionResultBuilder.build());
|
||||
});
|
||||
} catch (IOException e) {
|
||||
savedListener.onError(
|
||||
Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
|
||||
|
|
|
|||
|
|
@ -1673,11 +1673,24 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
public void onResult(final ResolutionResult resolutionResult) {
|
||||
final class NamesResolved implements Runnable {
|
||||
|
||||
@SuppressWarnings("ReferenceEquality")
|
||||
@Override
|
||||
public void run() {
|
||||
Status status = onResult2(resolutionResult);
|
||||
ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
|
||||
.get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
|
||||
resolutionResultListener.resolutionAttempted(status);
|
||||
}
|
||||
}
|
||||
|
||||
syncContext.execute(new NamesResolved());
|
||||
}
|
||||
|
||||
@SuppressWarnings("ReferenceEquality")
|
||||
@Override
|
||||
public Status onResult2(final ResolutionResult resolutionResult) {
|
||||
syncContext.throwIfNotInThisSynchronizationContext();
|
||||
if (ManagedChannelImpl.this.nameResolver != resolver) {
|
||||
return;
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
|
||||
|
|
@ -1693,8 +1706,6 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
}
|
||||
|
||||
ConfigOrError configOrError = resolutionResult.getServiceConfig();
|
||||
ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
|
||||
.get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
|
||||
InternalConfigSelector resolvedConfigSelector =
|
||||
resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
|
||||
ManagedChannelServiceConfig validServiceConfig =
|
||||
|
|
@ -1751,10 +1762,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
// we later check for these error codes when investigating pick results in
|
||||
// GrpcUtil.getTransportFromPickResult().
|
||||
onError(configOrError.getError());
|
||||
if (resolutionResultListener != null) {
|
||||
resolutionResultListener.resolutionAttempted(configOrError.getError());
|
||||
}
|
||||
return;
|
||||
return configOrError.getError();
|
||||
} else {
|
||||
effectiveServiceConfig = lastServiceConfig;
|
||||
}
|
||||
|
|
@ -1798,21 +1806,14 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
}
|
||||
Attributes attributes = attrBuilder.build();
|
||||
|
||||
Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
|
||||
return helper.lb.tryAcceptResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(servers)
|
||||
.setAttributes(attributes)
|
||||
.setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
|
||||
.build());
|
||||
// If a listener is provided, let it know if the addresses were accepted.
|
||||
if (resolutionResultListener != null) {
|
||||
resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
syncContext.execute(new NamesResolved());
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -95,12 +95,24 @@ final class RetryingNameResolver extends ForwardingNameResolver {
|
|||
"RetryingNameResolver can only be used once to wrap a NameResolver");
|
||||
}
|
||||
|
||||
// To have retry behavior for name resolvers that haven't migrated to onResult2.
|
||||
delegateListener.onResult(resolutionResult.toBuilder().setAttributes(
|
||||
resolutionResult.getAttributes().toBuilder()
|
||||
.set(RESOLUTION_RESULT_LISTENER_KEY, new ResolutionResultListener()).build())
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status onResult2(ResolutionResult resolutionResult) {
|
||||
Status status = delegateListener.onResult2(resolutionResult);
|
||||
if (status.isOk()) {
|
||||
retryScheduler.reset();
|
||||
} else {
|
||||
retryScheduler.schedule(new DelayedNameResolverRefresh());
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Status error) {
|
||||
delegateListener.onError(error);
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.isA;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
|
@ -226,13 +225,7 @@ public class DnsNameResolverTest {
|
|||
System.getProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY);
|
||||
|
||||
// By default the mock listener processes the result successfully.
|
||||
doAnswer(invocation -> {
|
||||
ResolutionResult result = invocation.getArgument(0);
|
||||
syncContext.execute(
|
||||
() -> result.getAttributes().get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY)
|
||||
.resolutionAttempted(Status.OK));
|
||||
return null;
|
||||
}).when(mockListener).onResult(isA(ResolutionResult.class));
|
||||
when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.OK);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
@ -319,13 +312,13 @@ public class DnsNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
|
||||
resolver.refresh();
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener, times(2)).onResult(resultCaptor.capture());
|
||||
verify(mockListener, times(2)).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
assertEquals(0, fakeExecutor.numPendingTasks());
|
||||
|
|
@ -347,7 +340,7 @@ public class DnsNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
assertEquals(0, fakeExecutor.numPendingTasks());
|
||||
|
|
@ -389,7 +382,7 @@ public class DnsNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertEquals(0, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
assertEquals(0, fakeExecutor.numPendingTasks());
|
||||
|
|
@ -418,7 +411,7 @@ public class DnsNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
|
||||
|
|
@ -452,7 +445,7 @@ public class DnsNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
|
||||
|
|
@ -487,14 +480,14 @@ public class DnsNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
|
||||
fakeTicker.advance(ttl + 1, TimeUnit.SECONDS);
|
||||
resolver.refresh();
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener, times(2)).onResult(resultCaptor.capture());
|
||||
verify(mockListener, times(2)).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
assertEquals(0, fakeExecutor.numPendingTasks());
|
||||
|
|
@ -531,7 +524,7 @@ public class DnsNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
|
||||
|
|
@ -544,7 +537,7 @@ public class DnsNameResolverTest {
|
|||
fakeTicker.advance(1, TimeUnit.SECONDS);
|
||||
resolver.refresh();
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener, times(2)).onResult(resultCaptor.capture());
|
||||
verify(mockListener, times(2)).onResult2(resultCaptor.capture());
|
||||
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
|
||||
assertEquals(0, fakeClock.numPendingTasks());
|
||||
assertEquals(0, fakeExecutor.numPendingTasks());
|
||||
|
|
@ -575,7 +568,7 @@ public class DnsNameResolverTest {
|
|||
assertThat(fakeExecutor.runDueTasks()).isEqualTo(1);
|
||||
|
||||
ArgumentCaptor<ResolutionResult> ac = ArgumentCaptor.forClass(ResolutionResult.class);
|
||||
verify(mockListener).onResult(ac.capture());
|
||||
verify(mockListener).onResult2(ac.capture());
|
||||
verifyNoMoreInteractions(mockListener);
|
||||
assertThat(ac.getValue().getAddresses()).isEmpty();
|
||||
assertThat(ac.getValue().getServiceConfig()).isNull();
|
||||
|
|
@ -588,12 +581,7 @@ public class DnsNameResolverTest {
|
|||
// Load balancer rejects the empty addresses.
|
||||
@Test
|
||||
public void resolve_emptyResult_notAccepted() throws Exception {
|
||||
doAnswer(invocation -> {
|
||||
ResolutionResult result = invocation.getArgument(0);
|
||||
result.getAttributes().get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY)
|
||||
.resolutionAttempted(Status.UNAVAILABLE);
|
||||
return null;
|
||||
}).when(mockListener).onResult(isA(ResolutionResult.class));
|
||||
when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.UNAVAILABLE);
|
||||
|
||||
DnsNameResolver.enableTxt = true;
|
||||
RetryingNameResolver resolver = newResolver("dns:///addr.fake:1234", 443);
|
||||
|
|
@ -614,7 +602,7 @@ public class DnsNameResolverTest {
|
|||
syncContext.execute(() -> assertThat(fakeExecutor.runDueTasks()).isEqualTo(1));
|
||||
|
||||
ArgumentCaptor<ResolutionResult> ac = ArgumentCaptor.forClass(ResolutionResult.class);
|
||||
verify(mockListener).onResult(ac.capture());
|
||||
verify(mockListener).onResult2(ac.capture());
|
||||
verifyNoMoreInteractions(mockListener);
|
||||
assertThat(ac.getValue().getAddresses()).isEmpty();
|
||||
assertThat(ac.getValue().getServiceConfig()).isNull();
|
||||
|
|
@ -640,7 +628,7 @@ public class DnsNameResolverTest {
|
|||
dnsResolver.setResourceResolver(null);
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
ResolutionResult result = resultCaptor.getValue();
|
||||
InetSocketAddress resolvedBackendAddr =
|
||||
(InetSocketAddress) Iterables.getOnlyElement(
|
||||
|
|
@ -712,7 +700,7 @@ public class DnsNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
ResolutionResult result = resultCaptor.getValue();
|
||||
InetSocketAddress resolvedBackendAddr =
|
||||
(InetSocketAddress) Iterables.getOnlyElement(
|
||||
|
|
@ -770,7 +758,7 @@ public class DnsNameResolverTest {
|
|||
dnsResolver.setResourceResolver(mockResourceResolver);
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
ResolutionResult result = resultCaptor.getValue();
|
||||
InetSocketAddress resolvedBackendAddr =
|
||||
(InetSocketAddress) Iterables.getOnlyElement(
|
||||
|
|
@ -802,7 +790,7 @@ public class DnsNameResolverTest {
|
|||
dnsResolver.setResourceResolver(mockResourceResolver);
|
||||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
ResolutionResult result = resultCaptor.getValue();
|
||||
InetSocketAddress resolvedBackendAddr =
|
||||
(InetSocketAddress) Iterables.getOnlyElement(
|
||||
|
|
@ -870,7 +858,7 @@ public class DnsNameResolverTest {
|
|||
resolver.start(mockListener);
|
||||
assertEquals(1, fakeExecutor.runDueTasks());
|
||||
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
List<EquivalentAddressGroup> result = resultCaptor.getValue().getAddresses();
|
||||
assertThat(result).hasSize(1);
|
||||
EquivalentAddressGroup eag = result.get(0);
|
||||
|
|
|
|||
|
|
@ -1054,6 +1054,79 @@ public class ManagedChannelImplTest {
|
|||
verifyNoMoreInteractions(mockLoadBalancer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noMoreCallbackAfterLoadBalancerShutdown_configError() throws InterruptedException {
|
||||
FakeNameResolverFactory nameResolverFactory =
|
||||
new FakeNameResolverFactory.Builder(expectedUri)
|
||||
.setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
|
||||
.build();
|
||||
channelBuilder.nameResolverFactory(nameResolverFactory);
|
||||
Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed");
|
||||
createChannel();
|
||||
|
||||
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
|
||||
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
|
||||
verify(mockLoadBalancer).acceptResolvedAddresses(resolvedAddressCaptor.capture());
|
||||
assertThat(resolvedAddressCaptor.getValue().getAddresses()).containsExactly(addressGroup);
|
||||
|
||||
SubchannelStateListener stateListener1 = mock(SubchannelStateListener.class);
|
||||
SubchannelStateListener stateListener2 = mock(SubchannelStateListener.class);
|
||||
Subchannel subchannel1 =
|
||||
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, stateListener1);
|
||||
Subchannel subchannel2 =
|
||||
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, stateListener2);
|
||||
requestConnectionSafely(helper, subchannel1);
|
||||
requestConnectionSafely(helper, subchannel2);
|
||||
verify(mockTransportFactory, times(2))
|
||||
.newClientTransport(
|
||||
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
|
||||
MockClientTransportInfo transportInfo1 = transports.poll();
|
||||
MockClientTransportInfo transportInfo2 = transports.poll();
|
||||
|
||||
// LoadBalancer receives all sorts of callbacks
|
||||
transportInfo1.listener.transportReady();
|
||||
|
||||
verify(stateListener1, times(2)).onSubchannelState(stateInfoCaptor.capture());
|
||||
assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState());
|
||||
assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState());
|
||||
|
||||
verify(stateListener2).onSubchannelState(stateInfoCaptor.capture());
|
||||
assertSame(CONNECTING, stateInfoCaptor.getValue().getState());
|
||||
|
||||
resolver.listener.onError(resolutionError);
|
||||
verify(mockLoadBalancer).handleNameResolutionError(resolutionError);
|
||||
|
||||
verifyNoMoreInteractions(mockLoadBalancer);
|
||||
|
||||
channel.shutdown();
|
||||
verify(mockLoadBalancer).shutdown();
|
||||
verifyNoMoreInteractions(stateListener1, stateListener2);
|
||||
|
||||
// LoadBalancer will normally shutdown all subchannels
|
||||
shutdownSafely(helper, subchannel1);
|
||||
shutdownSafely(helper, subchannel2);
|
||||
|
||||
// Since subchannels are shutdown, SubchannelStateListeners will only get SHUTDOWN regardless of
|
||||
// the transport states.
|
||||
transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
|
||||
transportInfo2.listener.transportReady();
|
||||
verify(stateListener1).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN));
|
||||
verify(stateListener2).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN));
|
||||
verifyNoMoreInteractions(stateListener1, stateListener2);
|
||||
|
||||
// No more callback should be delivered to LoadBalancer after it's shut down
|
||||
resolver.listener.onResult(
|
||||
ResolutionResult.newBuilder()
|
||||
.setAddresses(new ArrayList<>())
|
||||
.setServiceConfig(
|
||||
ConfigOrError.fromError(Status.UNAVAILABLE.withDescription("Resolution failed")))
|
||||
.build());
|
||||
Thread.sleep(1100);
|
||||
assertThat(timer.getPendingTasks()).isEmpty();
|
||||
resolver.resolved();
|
||||
verifyNoMoreInteractions(mockLoadBalancer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void interceptor() throws Exception {
|
||||
final AtomicLong atomic = new AtomicLong();
|
||||
|
|
@ -3138,6 +3211,48 @@ public class ManagedChannelImplTest {
|
|||
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends_usesListener2onResult2()
|
||||
throws Exception {
|
||||
timer.forwardNanos(1234);
|
||||
channelBuilder.maxTraceEvents(10);
|
||||
List<EquivalentAddressGroup> servers = new ArrayList<>();
|
||||
servers.add(new EquivalentAddressGroup(socketAddress));
|
||||
FakeNameResolverFactory nameResolverFactory =
|
||||
new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
|
||||
channelBuilder.nameResolverFactory(nameResolverFactory);
|
||||
createChannel();
|
||||
|
||||
int prevSize = getStats(channel).channelTrace.events.size();
|
||||
ResolutionResult resolutionResult1 = ResolutionResult.newBuilder()
|
||||
.setAddresses(Collections.singletonList(
|
||||
new EquivalentAddressGroup(
|
||||
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
|
||||
.build();
|
||||
|
||||
channel.syncContext.execute(
|
||||
() -> nameResolverFactory.resolvers.get(0).listener.onResult2(resolutionResult1));
|
||||
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
|
||||
|
||||
prevSize = getStats(channel).channelTrace.events.size();
|
||||
nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
|
||||
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
|
||||
|
||||
prevSize = getStats(channel).channelTrace.events.size();
|
||||
nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
|
||||
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
|
||||
|
||||
prevSize = getStats(channel).channelTrace.events.size();
|
||||
ResolutionResult resolutionResult2 = ResolutionResult.newBuilder()
|
||||
.setAddresses(Collections.singletonList(
|
||||
new EquivalentAddressGroup(
|
||||
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
|
||||
.build();
|
||||
channel.syncContext.execute(
|
||||
() -> nameResolverFactory.resolvers.get(0).listener.onResult2(resolutionResult2));
|
||||
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void channelTracing_serviceConfigChange() throws Exception {
|
||||
timer.forwardNanos(1234);
|
||||
|
|
@ -3197,6 +3312,69 @@ public class ManagedChannelImplTest {
|
|||
.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void channelTracing_serviceConfigChange_usesListener2OnResult2() throws Exception {
|
||||
timer.forwardNanos(1234);
|
||||
channelBuilder.maxTraceEvents(10);
|
||||
List<EquivalentAddressGroup> servers = new ArrayList<>();
|
||||
servers.add(new EquivalentAddressGroup(socketAddress));
|
||||
FakeNameResolverFactory nameResolverFactory =
|
||||
new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
|
||||
channelBuilder.nameResolverFactory(nameResolverFactory);
|
||||
createChannel();
|
||||
|
||||
int prevSize = getStats(channel).channelTrace.events.size();
|
||||
ManagedChannelServiceConfig mcsc1 = createManagedChannelServiceConfig(
|
||||
ImmutableMap.<String, Object>of(),
|
||||
new PolicySelection(
|
||||
mockLoadBalancerProvider, null));
|
||||
ResolutionResult resolutionResult1 = ResolutionResult.newBuilder()
|
||||
.setAddresses(Collections.singletonList(
|
||||
new EquivalentAddressGroup(
|
||||
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
|
||||
.setServiceConfig(ConfigOrError.fromConfig(mcsc1))
|
||||
.build();
|
||||
|
||||
channel.syncContext.execute(() ->
|
||||
nameResolverFactory.resolvers.get(0).listener.onResult2(resolutionResult1));
|
||||
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
|
||||
assertThat(getStats(channel).channelTrace.events.get(prevSize))
|
||||
.isEqualTo(new ChannelTrace.Event.Builder()
|
||||
.setDescription("Service config changed")
|
||||
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
|
||||
.setTimestampNanos(timer.getTicker().read())
|
||||
.build());
|
||||
|
||||
prevSize = getStats(channel).channelTrace.events.size();
|
||||
ResolutionResult resolutionResult2 = ResolutionResult.newBuilder().setAddresses(
|
||||
Collections.singletonList(
|
||||
new EquivalentAddressGroup(
|
||||
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
|
||||
.setServiceConfig(ConfigOrError.fromConfig(mcsc1))
|
||||
.build();
|
||||
channel.syncContext.execute(() ->
|
||||
nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult2));
|
||||
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
|
||||
|
||||
prevSize = getStats(channel).channelTrace.events.size();
|
||||
timer.forwardNanos(1234);
|
||||
ResolutionResult resolutionResult3 = ResolutionResult.newBuilder()
|
||||
.setAddresses(Collections.singletonList(
|
||||
new EquivalentAddressGroup(
|
||||
Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))))
|
||||
.setServiceConfig(ConfigOrError.fromConfig(ManagedChannelServiceConfig.empty()))
|
||||
.build();
|
||||
channel.syncContext.execute(() ->
|
||||
nameResolverFactory.resolvers.get(0).listener.onResult(resolutionResult3));
|
||||
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
|
||||
assertThat(getStats(channel).channelTrace.events.get(prevSize))
|
||||
.isEqualTo(new ChannelTrace.Event.Builder()
|
||||
.setDescription("Service config changed")
|
||||
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
|
||||
.setTimestampNanos(timer.getTicker().read())
|
||||
.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void channelTracing_stateChangeEvent() throws Exception {
|
||||
channelBuilder.maxTraceEvents(10);
|
||||
|
|
@ -3857,6 +4035,120 @@ public class ManagedChannelImplTest {
|
|||
mychannel.shutdownNow();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void badServiceConfigIsRecoverable_usesListener2OnResult2() throws Exception {
|
||||
final List<EquivalentAddressGroup> addresses =
|
||||
ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));
|
||||
final class FakeNameResolver extends NameResolver {
|
||||
Listener2 listener;
|
||||
private final SynchronizationContext syncContext;
|
||||
|
||||
FakeNameResolver(Args args) {
|
||||
this.syncContext = args.getSynchronizationContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getServiceAuthority() {
|
||||
return "also fake";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(Listener2 listener) {
|
||||
this.listener = listener;
|
||||
syncContext.execute(() ->
|
||||
listener.onResult2(
|
||||
ResolutionResult.newBuilder()
|
||||
.setAddresses(addresses)
|
||||
.setServiceConfig(
|
||||
ConfigOrError.fromError(
|
||||
Status.INTERNAL.withDescription("kaboom is invalid")))
|
||||
.build()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {}
|
||||
}
|
||||
|
||||
final class FakeNameResolverFactory2 extends NameResolver.Factory {
|
||||
FakeNameResolver resolver;
|
||||
ManagedChannelImpl managedChannel;
|
||||
SynchronizationContext syncContext;
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
|
||||
syncContext = args.getSynchronizationContext();
|
||||
return (resolver = new FakeNameResolver(args));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDefaultScheme() {
|
||||
return "fake";
|
||||
}
|
||||
}
|
||||
|
||||
FakeNameResolverFactory2 factory = new FakeNameResolverFactory2();
|
||||
|
||||
ManagedChannelImplBuilder customBuilder = new ManagedChannelImplBuilder(TARGET,
|
||||
new ClientTransportFactoryBuilder() {
|
||||
@Override
|
||||
public ClientTransportFactory buildClientTransportFactory() {
|
||||
return mockTransportFactory;
|
||||
}
|
||||
},
|
||||
null);
|
||||
when(mockTransportFactory.getSupportedSocketAddressTypes()).thenReturn(Collections.singleton(
|
||||
InetSocketAddress.class));
|
||||
customBuilder.executorPool = executorPool;
|
||||
customBuilder.channelz = channelz;
|
||||
ManagedChannel mychannel = customBuilder.nameResolverFactory(factory).build();
|
||||
|
||||
ClientCall<Void, Void> call1 =
|
||||
mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
|
||||
ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null);
|
||||
executor.runDueTasks();
|
||||
try {
|
||||
future1.get(1, TimeUnit.SECONDS);
|
||||
Assert.fail();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom");
|
||||
}
|
||||
|
||||
// ok the service config is bad, let's fix it.
|
||||
Map<String, Object> rawServiceConfig =
|
||||
parseConfig("{\"loadBalancingConfig\": [{\"round_robin\": {}}]}");
|
||||
Object fakeLbConfig = new Object();
|
||||
PolicySelection lbConfigs =
|
||||
new PolicySelection(
|
||||
mockLoadBalancerProvider, fakeLbConfig);
|
||||
mockLoadBalancerProvider.parseLoadBalancingPolicyConfig(rawServiceConfig);
|
||||
ManagedChannelServiceConfig managedChannelServiceConfig =
|
||||
createManagedChannelServiceConfig(rawServiceConfig, lbConfigs);
|
||||
factory.syncContext.execute(() ->
|
||||
factory.resolver.listener.onResult2(
|
||||
ResolutionResult.newBuilder()
|
||||
.setAddresses(addresses)
|
||||
.setServiceConfig(ConfigOrError.fromConfig(managedChannelServiceConfig))
|
||||
.build()));
|
||||
|
||||
ClientCall<Void, Void> call2 = mychannel.newCall(
|
||||
TestMethodDescriptors.voidMethod(),
|
||||
CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
|
||||
ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
|
||||
|
||||
timer.forwardTime(1234, TimeUnit.SECONDS);
|
||||
|
||||
executor.runDueTasks();
|
||||
try {
|
||||
future2.get();
|
||||
Assert.fail();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline");
|
||||
}
|
||||
|
||||
mychannel.shutdownNow();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nameResolverArgsPropagation() {
|
||||
final AtomicReference<NameResolver.Args> capturedArgs = new AtomicReference<>();
|
||||
|
|
@ -4518,7 +4810,7 @@ public class ManagedChannelImplTest {
|
|||
}
|
||||
assertEquals(DEFAULT_PORT, args.getDefaultPort());
|
||||
FakeNameResolverFactory.FakeNameResolver resolver =
|
||||
new FakeNameResolverFactory.FakeNameResolver(targetUri, error);
|
||||
new FakeNameResolverFactory.FakeNameResolver(targetUri, error, args);
|
||||
resolvers.add(resolver);
|
||||
return resolver;
|
||||
}
|
||||
|
|
@ -4546,14 +4838,16 @@ public class ManagedChannelImplTest {
|
|||
|
||||
final class FakeNameResolver extends NameResolver {
|
||||
final URI targetUri;
|
||||
final SynchronizationContext syncContext;
|
||||
Listener2 listener;
|
||||
boolean shutdown;
|
||||
int refreshCalled;
|
||||
Status error;
|
||||
|
||||
FakeNameResolver(URI targetUri, Status error) {
|
||||
FakeNameResolver(URI targetUri, Status error, Args args) {
|
||||
this.targetUri = targetUri;
|
||||
this.error = error;
|
||||
syncContext = args.getSynchronizationContext();
|
||||
}
|
||||
|
||||
@Override public String getServiceAuthority() {
|
||||
|
|
@ -4585,7 +4879,7 @@ public class ManagedChannelImplTest {
|
|||
if (configOrError != null) {
|
||||
builder.setServiceConfig(configOrError);
|
||||
}
|
||||
listener.onResult(builder.build());
|
||||
syncContext.execute(() -> listener.onResult(builder.build()));
|
||||
}
|
||||
|
||||
@Override public void shutdown() {
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.fail;
|
|||
import static org.mockito.ArgumentMatchers.isA;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.NameResolver.Listener2;
|
||||
|
|
@ -79,7 +80,7 @@ public class RetryingNameResolverTest {
|
|||
// Make sure the ResolutionResultListener callback is added to the ResolutionResult attributes,
|
||||
// and the retry scheduler is reset since the name resolution was successful.
|
||||
@Test
|
||||
public void onResult_sucess() {
|
||||
public void onResult_success() {
|
||||
retryingNameResolver.start(mockListener);
|
||||
verify(mockNameResolver).start(listenerCaptor.capture());
|
||||
|
||||
|
|
@ -94,6 +95,18 @@ public class RetryingNameResolverTest {
|
|||
verify(mockRetryScheduler).reset();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void onResult2_sucesss() {
|
||||
when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.OK);
|
||||
retryingNameResolver.start(mockListener);
|
||||
verify(mockNameResolver).start(listenerCaptor.capture());
|
||||
|
||||
assertThat(listenerCaptor.getValue().onResult2(ResolutionResult.newBuilder().build()))
|
||||
.isEqualTo(Status.OK);
|
||||
|
||||
verify(mockRetryScheduler).reset();
|
||||
}
|
||||
|
||||
// Make sure the ResolutionResultListener callback is added to the ResolutionResult attributes,
|
||||
// and that a retry gets scheduled when the resolution results are rejected.
|
||||
@Test
|
||||
|
|
@ -112,6 +125,19 @@ public class RetryingNameResolverTest {
|
|||
verify(mockRetryScheduler).schedule(isA(Runnable.class));
|
||||
}
|
||||
|
||||
// Make sure that a retry gets scheduled when the resolution results are rejected.
|
||||
@Test
|
||||
public void onResult2_failure() {
|
||||
when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.UNAVAILABLE);
|
||||
retryingNameResolver.start(mockListener);
|
||||
verify(mockNameResolver).start(listenerCaptor.capture());
|
||||
|
||||
assertThat(listenerCaptor.getValue().onResult2(ResolutionResult.newBuilder().build()))
|
||||
.isEqualTo(Status.UNAVAILABLE);
|
||||
|
||||
verify(mockRetryScheduler).schedule(isA(Runnable.class));
|
||||
}
|
||||
|
||||
// Wrapping a NameResolver more than once is a misconfiguration.
|
||||
@Test
|
||||
public void onResult_failure_doubleWrapped() {
|
||||
|
|
|
|||
|
|
@ -152,7 +152,7 @@ public class GrpclbNameResolverTest {
|
|||
resolver.start(mockListener);
|
||||
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
|
||||
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
ResolutionResult result = resultCaptor.getValue();
|
||||
assertThat(result.getAddresses()).isEmpty();
|
||||
assertThat(result.getAttributes()).isEqualTo(Attributes.EMPTY);
|
||||
|
|
@ -192,7 +192,7 @@ public class GrpclbNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
ResolutionResult result = resultCaptor.getValue();
|
||||
InetSocketAddress resolvedBackendAddr =
|
||||
(InetSocketAddress) Iterables.getOnlyElement(
|
||||
|
|
@ -225,7 +225,7 @@ public class GrpclbNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
ResolutionResult result = resultCaptor.getValue();
|
||||
assertThat(result.getAddresses())
|
||||
.containsExactly(
|
||||
|
|
@ -272,7 +272,7 @@ public class GrpclbNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
ResolutionResult result = resultCaptor.getValue();
|
||||
assertThat(result.getAddresses()).isEmpty();
|
||||
EquivalentAddressGroup resolvedBalancerAddr =
|
||||
|
|
@ -306,7 +306,7 @@ public class GrpclbNameResolverTest {
|
|||
|
||||
resolver.start(mockListener);
|
||||
assertThat(fakeClock.runDueTasks()).isEqualTo(1);
|
||||
verify(mockListener).onResult(resultCaptor.capture());
|
||||
verify(mockListener).onResult2(resultCaptor.capture());
|
||||
ResolutionResult result = resultCaptor.getValue();
|
||||
|
||||
InetSocketAddress resolvedBackendAddr =
|
||||
|
|
|
|||
Loading…
Reference in New Issue