mirror of https://github.com/grpc/grpc-java.git
xds: Fix cluster selection races when updating config selector
Listener2.onResult() doesn't require running in the sync context, so when called from the sync context it is guaranteed not to do its processing immediately (instead, it schedules work into the sync context). The code was doing an update dance: 1) update service config to add new cluster, 2) update config selector to use new cluster, 3) update service config to remove old clusters. But the onResult() wasn't being processed immediately, so the actual execution order was 2, 1, 3 which has a small window where RPCs will fail. But onResult2() does run immediately. And sinceca4819ac6, updateBalancingState() updates the picker immediately. cleanUpRoutes() was also racy because it updated the routingConfig before swapping to the new config selector, so RPCs could fail saying there was no route instead of the useful error message. Even with the opposite order, some RPCs may be executing the while loop of selectConfig(), trying to acquire a cluster. The code unreffed the clusters before updating the routingConfig, so those RPCs could go into a tight loop until the routingConfig was updated. Also, once the routingConfig was updated to EMPTY those RPCs would similarly see the wrong error message. To give the correct error message, selectConfig() must fail such RPCs directly, and once it can do that there's no need to stop using the config selector in error cases. This has the benefit of fewer moving parts and more consistent threading among cases. The added test was able to detect the race 2% of the time. The slower the code/machine, the more reliable the test failed.ca4819ac6along with this commit reduced it to 0 failures in 1000 runs. Discovered when investigating b/394850611
This commit is contained in:
parent
ca4819ac6d
commit
d82613a74c
|
|
@ -132,7 +132,7 @@ final class XdsNameResolver extends NameResolver {
|
|||
// NamedFilterConfig.filterStateKey -> filter_instance.
|
||||
private final HashMap<String, Filter> activeFilters = new HashMap<>();
|
||||
|
||||
private volatile RoutingConfig routingConfig = RoutingConfig.EMPTY;
|
||||
private volatile RoutingConfig routingConfig;
|
||||
private Listener2 listener;
|
||||
private ObjectPool<XdsClient> xdsClientPool;
|
||||
private XdsClient xdsClient;
|
||||
|
|
@ -306,7 +306,7 @@ final class XdsNameResolver extends NameResolver {
|
|||
|
||||
if (logger.isLoggable(XdsLogLevel.INFO)) {
|
||||
logger.log(
|
||||
XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig));
|
||||
XdsLogLevel.INFO, "Generated service config: {0}", new Gson().toJson(rawServiceConfig));
|
||||
}
|
||||
ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
|
||||
Attributes attrs =
|
||||
|
|
@ -320,7 +320,7 @@ final class XdsNameResolver extends NameResolver {
|
|||
.setAttributes(attrs)
|
||||
.setServiceConfig(parsedServiceConfig)
|
||||
.build();
|
||||
listener.onResult(result);
|
||||
listener.onResult2(result);
|
||||
receivedConfig = true;
|
||||
}
|
||||
|
||||
|
|
@ -395,6 +395,9 @@ final class XdsNameResolver extends NameResolver {
|
|||
String path = "/" + args.getMethodDescriptor().getFullMethodName();
|
||||
do {
|
||||
routingCfg = routingConfig;
|
||||
if (routingCfg.errorStatus != null) {
|
||||
return Result.forError(routingCfg.errorStatus);
|
||||
}
|
||||
selectedRoute = null;
|
||||
for (RouteData route : routingCfg.routes) {
|
||||
if (RoutingUtils.matchRoute(route.routeMatch, path, headers, random)) {
|
||||
|
|
@ -626,19 +629,6 @@ final class XdsNameResolver extends NameResolver {
|
|||
return "cluster_specifier_plugin:" + pluginName;
|
||||
}
|
||||
|
||||
private static final class FailingConfigSelector extends InternalConfigSelector {
|
||||
private final Result result;
|
||||
|
||||
public FailingConfigSelector(Status error) {
|
||||
this.result = Result.forError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result selectConfig(PickSubchannelArgs args) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
private class ResolveState implements ResourceWatcher<XdsListenerResource.LdsUpdate> {
|
||||
private final ConfigOrError emptyServiceConfig =
|
||||
serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
|
||||
|
|
@ -835,13 +825,13 @@ final class XdsNameResolver extends NameResolver {
|
|||
}
|
||||
}
|
||||
// Update service config to include newly added clusters.
|
||||
if (shouldUpdateResult) {
|
||||
if (shouldUpdateResult && routingConfig != null) {
|
||||
updateResolutionResult();
|
||||
shouldUpdateResult = false;
|
||||
}
|
||||
// Make newly added clusters selectable by config selector and deleted clusters no longer
|
||||
// selectable.
|
||||
routingConfig = new RoutingConfig(httpMaxStreamDurationNano, routesData.build());
|
||||
shouldUpdateResult = false;
|
||||
for (String cluster : deletedClusters) {
|
||||
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
|
||||
if (count == 0) {
|
||||
|
|
@ -893,6 +883,9 @@ final class XdsNameResolver extends NameResolver {
|
|||
}
|
||||
|
||||
private void cleanUpRoutes(String error) {
|
||||
String errorWithNodeId =
|
||||
error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
|
||||
routingConfig = new RoutingConfig(Status.UNAVAILABLE.withDescription(errorWithNodeId));
|
||||
if (existingClusters != null) {
|
||||
for (String cluster : existingClusters) {
|
||||
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
|
||||
|
|
@ -902,17 +895,12 @@ final class XdsNameResolver extends NameResolver {
|
|||
}
|
||||
existingClusters = null;
|
||||
}
|
||||
routingConfig = RoutingConfig.EMPTY;
|
||||
|
||||
// Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
|
||||
// the config selector handles the error message itself. Once the LB API allows providing
|
||||
// failure information for addresses yet still providing a service config, the config seector
|
||||
// could be avoided.
|
||||
String errorWithNodeId =
|
||||
error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
|
||||
listener.onResult(ResolutionResult.newBuilder()
|
||||
// the config selector handles the error message itself.
|
||||
listener.onResult2(ResolutionResult.newBuilder()
|
||||
.setAttributes(Attributes.newBuilder()
|
||||
.set(InternalConfigSelector.KEY,
|
||||
new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId)))
|
||||
.set(InternalConfigSelector.KEY, configSelector)
|
||||
.build())
|
||||
.setServiceConfig(emptyServiceConfig)
|
||||
.build());
|
||||
|
|
@ -983,12 +971,19 @@ final class XdsNameResolver extends NameResolver {
|
|||
private static class RoutingConfig {
|
||||
private final long fallbackTimeoutNano;
|
||||
final ImmutableList<RouteData> routes;
|
||||
|
||||
private static final RoutingConfig EMPTY = new RoutingConfig(0, ImmutableList.of());
|
||||
final Status errorStatus;
|
||||
|
||||
private RoutingConfig(long fallbackTimeoutNano, ImmutableList<RouteData> routes) {
|
||||
this.fallbackTimeoutNano = fallbackTimeoutNano;
|
||||
this.routes = checkNotNull(routes, "routes");
|
||||
this.errorStatus = null;
|
||||
}
|
||||
|
||||
private RoutingConfig(Status errorStatus) {
|
||||
this.fallbackTimeoutNano = 0;
|
||||
this.routes = null;
|
||||
this.errorStatus = checkNotNull(errorStatus, "errorStatus");
|
||||
checkArgument(!errorStatus.isOk(), "errorStatus should not be okay");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,9 +19,12 @@ package io.grpc.xds;
|
|||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.grpc.xds.DataPlaneRule.ENDPOINT_HOST_NAME;
|
||||
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS;
|
||||
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import com.github.xds.type.v3.TypedStruct;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.Struct;
|
||||
import com.google.protobuf.Value;
|
||||
|
|
@ -36,11 +39,17 @@ import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
|
|||
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
|
||||
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
|
||||
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints;
|
||||
import io.envoyproxy.envoy.config.route.v3.Route;
|
||||
import io.envoyproxy.envoy.config.route.v3.RouteAction;
|
||||
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
|
||||
import io.envoyproxy.envoy.config.route.v3.RouteMatch;
|
||||
import io.envoyproxy.envoy.config.route.v3.VirtualHost;
|
||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.ClientStreamTracer;
|
||||
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
|
||||
import io.grpc.ForwardingClientCallListener;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
|
|
@ -89,8 +98,7 @@ public class FakeControlPlaneXdsIntegrationTest {
|
|||
ManagedChannel channel = dataPlane.getManagedChannel();
|
||||
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
|
||||
channel);
|
||||
SimpleRequest request = SimpleRequest.newBuilder()
|
||||
.build();
|
||||
SimpleRequest request = SimpleRequest.getDefaultInstance();
|
||||
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||
.setResponseMessage("Hi, xDS! Authority= test-server")
|
||||
.build();
|
||||
|
|
@ -104,8 +112,7 @@ public class FakeControlPlaneXdsIntegrationTest {
|
|||
ManagedChannel channel = dataPlane.getManagedChannel();
|
||||
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
|
||||
channel);
|
||||
SimpleRequest request = SimpleRequest.newBuilder()
|
||||
.build();
|
||||
SimpleRequest request = SimpleRequest.getDefaultInstance();
|
||||
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||
.setResponseMessage("Hi, xDS! Authority= " + ENDPOINT_HOST_NAME)
|
||||
.build();
|
||||
|
|
@ -145,8 +152,7 @@ public class FakeControlPlaneXdsIntegrationTest {
|
|||
// We add an interceptor to catch the response headers from the server.
|
||||
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
|
||||
dataPlane.getManagedChannel()).withInterceptors(responseHeaderInterceptor);
|
||||
SimpleRequest request = SimpleRequest.newBuilder()
|
||||
.build();
|
||||
SimpleRequest request = SimpleRequest.getDefaultInstance();
|
||||
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||
.setResponseMessage("Hi, xDS! Authority= test-server")
|
||||
.build();
|
||||
|
|
@ -160,6 +166,100 @@ public class FakeControlPlaneXdsIntegrationTest {
|
|||
}
|
||||
}
|
||||
|
||||
// Try to trigger "UNAVAILABLE: CDS encountered error: unable to find available subchannel for
|
||||
// cluster cluster:cluster1" race, if XdsNameResolver updates its ConfigSelector before
|
||||
// cluster_manager config.
|
||||
@Test
|
||||
public void changeClusterForRoute() throws Exception {
|
||||
// Start with route to cluster0
|
||||
InetSocketAddress edsInetSocketAddress
|
||||
= (InetSocketAddress) dataPlane.getServer().getListenSockets().get(0);
|
||||
controlPlane.getService().setXdsConfig(
|
||||
ADS_TYPE_URL_EDS,
|
||||
ImmutableMap.of(
|
||||
"eds-service-0",
|
||||
ControlPlaneRule.buildClusterLoadAssignment(
|
||||
edsInetSocketAddress.getHostName(), "", edsInetSocketAddress.getPort(),
|
||||
"eds-service-0"),
|
||||
"eds-service-1",
|
||||
ControlPlaneRule.buildClusterLoadAssignment(
|
||||
edsInetSocketAddress.getHostName(), "", edsInetSocketAddress.getPort(),
|
||||
"eds-service-1")));
|
||||
controlPlane.getService().setXdsConfig(
|
||||
ADS_TYPE_URL_CDS,
|
||||
ImmutableMap.of(
|
||||
"cluster0",
|
||||
ControlPlaneRule.buildCluster("cluster0", "eds-service-0"),
|
||||
"cluster1",
|
||||
ControlPlaneRule.buildCluster("cluster1", "eds-service-1")));
|
||||
controlPlane.setRdsConfig(RouteConfiguration.newBuilder()
|
||||
.setName("route-config.googleapis.com")
|
||||
.addVirtualHosts(VirtualHost.newBuilder()
|
||||
.addDomains("test-server")
|
||||
.addRoutes(Route.newBuilder()
|
||||
.setMatch(RouteMatch.newBuilder().setPrefix("/").build())
|
||||
.setRoute(RouteAction.newBuilder().setCluster("cluster0").build())
|
||||
.build())
|
||||
.build())
|
||||
.build());
|
||||
|
||||
class ClusterClientStreamTracer extends ClientStreamTracer {
|
||||
boolean usedCluster1;
|
||||
|
||||
@Override
|
||||
public void addOptionalLabel(String key, String value) {
|
||||
if ("grpc.lb.backend_service".equals(key)) {
|
||||
usedCluster1 = "cluster1".equals(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ClusterClientStreamTracer tracer = new ClusterClientStreamTracer();
|
||||
ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
|
||||
@Override
|
||||
public ClientStreamTracer newClientStreamTracer(
|
||||
ClientStreamTracer.StreamInfo info, Metadata headers) {
|
||||
return tracer;
|
||||
}
|
||||
};
|
||||
ClientInterceptor tracerInterceptor = new ClientInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
|
||||
return next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
|
||||
}
|
||||
};
|
||||
SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc
|
||||
.newBlockingStub(dataPlane.getManagedChannel())
|
||||
.withInterceptors(tracerInterceptor);
|
||||
SimpleRequest request = SimpleRequest.getDefaultInstance();
|
||||
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||
.setResponseMessage("Hi, xDS! Authority= test-server")
|
||||
.build();
|
||||
assertThat(stub.unaryRpc(request)).isEqualTo(goldenResponse);
|
||||
assertThat(tracer.usedCluster1).isFalse();
|
||||
|
||||
// Check for errors when swapping route to cluster1
|
||||
controlPlane.setRdsConfig(RouteConfiguration.newBuilder()
|
||||
.setName("route-config.googleapis.com")
|
||||
.addVirtualHosts(VirtualHost.newBuilder()
|
||||
.addDomains("test-server")
|
||||
.addRoutes(Route.newBuilder()
|
||||
.setMatch(RouteMatch.newBuilder().setPrefix("/").build())
|
||||
.setRoute(RouteAction.newBuilder().setCluster("cluster1").build())
|
||||
.build())
|
||||
.build())
|
||||
.build());
|
||||
|
||||
for (int j = 0; j < 10; j++) {
|
||||
stub.unaryRpc(request);
|
||||
if (tracer.usedCluster1) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertThat(tracer.usedCluster1).isTrue();
|
||||
}
|
||||
|
||||
// Captures response headers from the server.
|
||||
private static class ResponseHeaderClientInterceptor implements ClientInterceptor {
|
||||
Metadata reponseHeaders;
|
||||
|
|
@ -199,8 +299,7 @@ public class FakeControlPlaneXdsIntegrationTest {
|
|||
ManagedChannel channel = dataPlane.getManagedChannel();
|
||||
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
|
||||
channel);
|
||||
SimpleRequest request = SimpleRequest.newBuilder()
|
||||
.build();
|
||||
SimpleRequest request = SimpleRequest.getDefaultInstance();
|
||||
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||
.setResponseMessage("Hi, xDS! Authority= test-server")
|
||||
.build();
|
||||
|
|
@ -231,8 +330,7 @@ public class FakeControlPlaneXdsIntegrationTest {
|
|||
ManagedChannel channel = dataPlane.getManagedChannel();
|
||||
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
|
||||
channel);
|
||||
SimpleRequest request = SimpleRequest.newBuilder()
|
||||
.build();
|
||||
SimpleRequest request = SimpleRequest.getDefaultInstance();
|
||||
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||
.setResponseMessage("Hi, xDS! Authority= localhost:" + serverAddress.getPort())
|
||||
.build();
|
||||
|
|
|
|||
|
|
@ -413,7 +413,7 @@ public class XdsNameResolverTest {
|
|||
Collections.singletonList(route1),
|
||||
ImmutableMap.of());
|
||||
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Collections.singletonList(cluster1),
|
||||
(Map<String, ?>) resolutionResultCaptor.getValue().getServiceConfig().getConfig());
|
||||
|
|
@ -432,7 +432,7 @@ public class XdsNameResolverTest {
|
|||
// Two new service config updates triggered:
|
||||
// - with load balancing config being able to select cluster1 and cluster2
|
||||
// - with load balancing config being able to select cluster2 only
|
||||
verify(mockListener, times(2)).onResult(resultCaptor.capture());
|
||||
verify(mockListener, times(2)).onResult2(resultCaptor.capture());
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Arrays.asList(cluster1, cluster2),
|
||||
(Map<String, ?>) resultCaptor.getAllValues().get(0).getServiceConfig().getConfig());
|
||||
|
|
@ -467,7 +467,7 @@ public class XdsNameResolverTest {
|
|||
Collections.singletonList(route),
|
||||
ImmutableMap.of());
|
||||
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Collections.singletonList(cluster1),
|
||||
(Map<String, ?>) resolutionResultCaptor.getValue().getServiceConfig().getConfig());
|
||||
|
|
@ -483,7 +483,7 @@ public class XdsNameResolverTest {
|
|||
verifyNoInteractions(mockListener);
|
||||
assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME);
|
||||
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Collections.singletonList(cluster1),
|
||||
(Map<String, ?>) resolutionResultCaptor.getValue().getServiceConfig().getConfig());
|
||||
|
|
@ -506,7 +506,7 @@ public class XdsNameResolverTest {
|
|||
Collections.singletonList(route),
|
||||
ImmutableMap.of());
|
||||
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Collections.singletonList(cluster1),
|
||||
(Map<String, ?>) resolutionResultCaptor.getValue().getServiceConfig().getConfig());
|
||||
|
|
@ -518,7 +518,7 @@ public class XdsNameResolverTest {
|
|||
// Simulate management server adds back the previously used RDS resource.
|
||||
reset(mockListener);
|
||||
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Collections.singletonList(cluster1),
|
||||
(Map<String, ?>) resolutionResultCaptor.getValue().getServiceConfig().getConfig());
|
||||
|
|
@ -585,7 +585,7 @@ public class XdsNameResolverTest {
|
|||
resolver.start(mockListener);
|
||||
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
|
||||
xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Collections.singletonList(cluster1),
|
||||
(Map<String, ?>) resolutionResultCaptor.getValue().getServiceConfig().getConfig());
|
||||
|
|
@ -671,7 +671,7 @@ public class XdsNameResolverTest {
|
|||
Collections.singletonList(AUTHORITY), Collections.singletonList(route),
|
||||
ImmutableMap.of());
|
||||
xdsClient.deliverLdsUpdate(0L, Collections.singletonList(virtualHost));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
assertCallSelectClusterResult(call1, configSelector, cluster1, null);
|
||||
|
|
@ -690,7 +690,7 @@ public class XdsNameResolverTest {
|
|||
ImmutableMap.of());
|
||||
xdsClient.deliverLdsUpdate(TimeUnit.SECONDS.toNanos(5L),
|
||||
Collections.singletonList(virtualHost));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
assertCallSelectClusterResult(call1, configSelector, cluster1, 5.0);
|
||||
|
|
@ -719,7 +719,7 @@ public class XdsNameResolverTest {
|
|||
retryPolicy,
|
||||
false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
Result selectResult = configSelector.selectConfig(
|
||||
|
|
@ -777,7 +777,7 @@ public class XdsNameResolverTest {
|
|||
RouteAction.forCluster(cluster2, Collections.emptyList(),
|
||||
TimeUnit.SECONDS.toNanos(15L), null, false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
assertThat(result.getAddressesOrError().getValue()).isEmpty();
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
|
|
@ -814,7 +814,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
InternalConfigSelector configSelector =
|
||||
resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY);
|
||||
|
||||
|
|
@ -850,7 +850,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
InternalConfigSelector configSelector =
|
||||
resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY);
|
||||
|
||||
|
|
@ -890,7 +890,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
InternalConfigSelector configSelector =
|
||||
resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY);
|
||||
|
||||
|
|
@ -928,7 +928,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
configSelector = resolutionResultCaptor.getValue().getAttributes().get(
|
||||
InternalConfigSelector.KEY);
|
||||
|
||||
|
|
@ -962,7 +962,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
true),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
InternalConfigSelector configSelector =
|
||||
resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY);
|
||||
|
||||
|
|
@ -993,7 +993,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
InternalConfigSelector configSelector =
|
||||
resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY);
|
||||
|
||||
|
|
@ -1027,7 +1027,7 @@ public class XdsNameResolverTest {
|
|||
cluster2, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L),
|
||||
null, false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
// Updated service config still contains cluster1 while it is removed resource. New calls no
|
||||
// longer routed to cluster1.
|
||||
|
|
@ -1039,7 +1039,7 @@ public class XdsNameResolverTest {
|
|||
assertCallSelectClusterResult(call1, configSelector, "another-cluster", 20.0);
|
||||
|
||||
firstCall.deliverErrorStatus(); // completes previous call
|
||||
verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Arrays.asList(cluster2, "another-cluster"),
|
||||
|
|
@ -1069,7 +1069,7 @@ public class XdsNameResolverTest {
|
|||
ImmutableMap.of())));
|
||||
// Two consecutive service config updates: one for removing clcuster1,
|
||||
// one for adding "another=cluster".
|
||||
verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Arrays.asList(cluster2, "another-cluster"),
|
||||
|
|
@ -1104,7 +1104,7 @@ public class XdsNameResolverTest {
|
|||
TimeUnit.SECONDS.toNanos(15L), null, false),
|
||||
ImmutableMap.of())));
|
||||
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
Arrays.asList(cluster1, cluster2, "another-cluster"),
|
||||
|
|
@ -1179,7 +1179,7 @@ public class XdsNameResolverTest {
|
|||
TimeUnit.SECONDS.toNanos(20L),
|
||||
null, false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
assertThat(result.getAddressesOrError().getValue()).isEmpty();
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
|
|
@ -1208,7 +1208,7 @@ public class XdsNameResolverTest {
|
|||
TimeUnit.SECONDS.toNanos(20L),
|
||||
null, false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
assertThat(result.getAddressesOrError().getValue()).isEmpty();
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
@ -1256,7 +1256,7 @@ public class XdsNameResolverTest {
|
|||
TimeUnit.SECONDS.toNanos(30L),
|
||||
null, false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener, times(2)).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result2 = resolutionResultCaptor.getValue();
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, ?> resultServiceConfig2 = (Map<String, ?>) result2.getServiceConfig().getConfig();
|
||||
|
|
@ -1599,7 +1599,7 @@ public class XdsNameResolverTest {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void assertEmptyResolutionResult(String resource) {
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
assertThat(result.getAddressesOrError().getValue()).isEmpty();
|
||||
assertThat((Map<String, ?>) result.getServiceConfig().getConfig()).isEmpty();
|
||||
|
|
@ -1611,7 +1611,7 @@ public class XdsNameResolverTest {
|
|||
}
|
||||
|
||||
private void assertClusterResolutionResult(CallInfo call, String expectedCluster) {
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
assertCallSelectClusterResult(call, configSelector, expectedCluster, null);
|
||||
|
|
@ -1685,7 +1685,7 @@ public class XdsNameResolverTest {
|
|||
cluster2, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L),
|
||||
null, false),
|
||||
ImmutableMap.of())));
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
assertThat(result.getAddressesOrError().getValue()).isEmpty();
|
||||
assertServiceConfigForLoadBalancingConfig(
|
||||
|
|
@ -1763,7 +1763,7 @@ public class XdsNameResolverTest {
|
|||
ImmutableMap.of());
|
||||
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
|
||||
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
String expectedServiceConfigJson =
|
||||
"{\n"
|
||||
+ " \"loadBalancingConfig\": [{\n"
|
||||
|
|
@ -1946,7 +1946,7 @@ public class XdsNameResolverTest {
|
|||
FaultAbort.forHeader(FaultConfig.FractionalPercent.perHundred(70)),
|
||||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
// no header abort key provided in metadata, rpc should succeed
|
||||
|
|
@ -1985,7 +1985,7 @@ public class XdsNameResolverTest {
|
|||
FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)),
|
||||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2001,7 +2001,7 @@ public class XdsNameResolverTest {
|
|||
FaultAbort.forHeader(FaultConfig.FractionalPercent.perMillion(0)),
|
||||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2016,7 +2016,7 @@ public class XdsNameResolverTest {
|
|||
FaultConfig.FractionalPercent.perMillion(600_000)),
|
||||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2034,7 +2034,7 @@ public class XdsNameResolverTest {
|
|||
FaultConfig.FractionalPercent.perMillion(400_000)),
|
||||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2052,7 +2052,7 @@ public class XdsNameResolverTest {
|
|||
FaultConfig httpFilterFaultConfig = FaultConfig.create(
|
||||
FaultDelay.forHeader(FaultConfig.FractionalPercent.perHundred(70)), null, null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
// no header delay key provided in metadata, rpc should succeed immediately
|
||||
|
|
@ -2069,7 +2069,7 @@ public class XdsNameResolverTest {
|
|||
httpFilterFaultConfig = FaultConfig.create(
|
||||
FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(600_000)), null, null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2080,7 +2080,7 @@ public class XdsNameResolverTest {
|
|||
httpFilterFaultConfig = FaultConfig.create(
|
||||
FaultDelay.forHeader(FaultConfig.FractionalPercent.perMillion(0)), null, null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2093,7 +2093,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2106,7 +2106,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2125,7 +2125,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
/* maxActiveFaults= */ 1);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
|
||||
|
|
@ -2155,7 +2155,7 @@ public class XdsNameResolverTest {
|
|||
null,
|
||||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
|
||||
|
|
@ -2187,7 +2187,7 @@ public class XdsNameResolverTest {
|
|||
FaultConfig.FractionalPercent.perMillion(1000_000)),
|
||||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(cluster1, httpFilterFaultConfig, null, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
|
||||
|
|
@ -2216,7 +2216,7 @@ public class XdsNameResolverTest {
|
|||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(
|
||||
cluster1, httpFilterFaultConfig, virtualHostFaultConfig, null, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
|
||||
|
|
@ -2231,7 +2231,7 @@ public class XdsNameResolverTest {
|
|||
null);
|
||||
xdsClient.deliverLdsUpdateWithFaultInjection(
|
||||
cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2248,7 +2248,7 @@ public class XdsNameResolverTest {
|
|||
xdsClient.deliverLdsUpdateWithFaultInjection(
|
||||
cluster1, httpFilterFaultConfig, virtualHostFaultConfig, routeFaultConfig,
|
||||
weightedClusterFaultConfig);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
result = resolutionResultCaptor.getValue();
|
||||
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
|
||||
|
|
@ -2277,7 +2277,7 @@ public class XdsNameResolverTest {
|
|||
FaultAbort.forStatus(Status.UNKNOWN, FaultConfig.FractionalPercent.perMillion(1000_000)),
|
||||
null);
|
||||
xdsClient.deliverRdsUpdateWithFaultInjection(RDS_RESOURCE_NAME, null, routeFaultConfig, null);
|
||||
verify(mockListener).onResult(resolutionResultCaptor.capture());
|
||||
verify(mockListener).onResult2(resolutionResultCaptor.capture());
|
||||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
|
||||
|
|
|
|||
Loading…
Reference in New Issue