mirror of https://github.com/grpc/grpc-java.git
xds: eliminate unnecessary caching mechanism for RDS (#6510)
A gRPC channel will only ever be interested in a single Listener. So each RDS request will request for at most one resource. By design, server is required to always send back client's newly requested resources, so client will always receive the RDS resource (if exists) after the request was sent. Therefore, client does not need to cache anything.
This commit is contained in:
parent
ada575dd24
commit
074cb73702
|
|
@ -91,13 +91,6 @@ final class XdsClientImpl extends XdsClient {
|
|||
// more than once.
|
||||
private final Node node;
|
||||
|
||||
// Cached data for RDS responses, keyed by RouteConfiguration names.
|
||||
// LDS responses indicate absence of RouteConfigurations and RDS responses indicate presence
|
||||
// of RouteConfigurations.
|
||||
// Optimization: only cache clusterName field in the RouteConfiguration messages of RDS
|
||||
// responses.
|
||||
private final Map<String, String> routeConfigNamesToClusterNames = new HashMap<>();
|
||||
|
||||
// Cached data for CDS responses, keyed by cluster names.
|
||||
// Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead
|
||||
// of whole Cluster messages to reduce memory usage.
|
||||
|
|
@ -379,14 +372,12 @@ final class XdsClientImpl extends XdsClient {
|
|||
return;
|
||||
}
|
||||
|
||||
// Unpack HttpConnectionManager messages
|
||||
// Unpack HttpConnectionManager messages.
|
||||
HttpConnectionManager requestedHttpConnManager = null;
|
||||
List<HttpConnectionManager> httpConnectionManagers = new ArrayList<>();
|
||||
try {
|
||||
for (Listener listener : listeners) {
|
||||
HttpConnectionManager hm =
|
||||
listener.getApiListener().getApiListener().unpack(HttpConnectionManager.class);
|
||||
httpConnectionManagers.add(hm);
|
||||
if (listener.getName().equals(ldsResourceName)) {
|
||||
requestedHttpConnManager = hm;
|
||||
}
|
||||
|
|
@ -398,33 +389,17 @@ final class XdsClientImpl extends XdsClient {
|
|||
}
|
||||
|
||||
String errorMessage = null;
|
||||
// All RouteConfigurations referenced by this LDS response, either in in-lined
|
||||
// RouteConfiguration message or in RDS config.
|
||||
Set<String> routeConfigs = new HashSet<>();
|
||||
for (HttpConnectionManager hm : httpConnectionManagers) {
|
||||
// The HttpConnectionManager message must either provide the RouteConfiguration directly
|
||||
// in-line or tell the client to use RDS to obtain it.
|
||||
if (hm.hasRouteConfig()) {
|
||||
routeConfigs.add(hm.getRouteConfig().getName());
|
||||
} else if (hm.hasRds()) {
|
||||
Rds rds = hm.getRds();
|
||||
if (!rds.getConfigSource().hasAds()) {
|
||||
errorMessage = "For using RDS, it must be set to use ADS.";
|
||||
break;
|
||||
}
|
||||
routeConfigs.add(rds.getRouteConfigName());
|
||||
} else {
|
||||
errorMessage = "HttpConnectionManager message must either provide the "
|
||||
+ "RouteConfiguration directly in-line or tell the client to use RDS to obtain it.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Field clusterName found in the in-lined RouteConfiguration, if exists.
|
||||
String clusterName = null;
|
||||
// RouteConfiguration name to be used as the resource name for RDS request.
|
||||
// RouteConfiguration name to be used as the resource name for RDS request, if exists.
|
||||
String rdsRouteConfigName = null;
|
||||
if (errorMessage == null && requestedHttpConnManager != null) {
|
||||
// Process the requested Listener if exists, either extract cluster information from in-lined
|
||||
// RouteConfiguration message or send an RDS request for dynamic resolution.
|
||||
if (requestedHttpConnManager != null) {
|
||||
// The HttpConnectionManager message must either provide the RouteConfiguration directly
|
||||
// in-line or tell the client to use RDS to obtain it.
|
||||
// TODO(chengyuanzhang): if both route_config and rds are set, it should be either invalid
|
||||
// data or one supersedes the other. TBD.
|
||||
if (requestedHttpConnManager.hasRouteConfig()) {
|
||||
RouteConfiguration rc = requestedHttpConnManager.getRouteConfig();
|
||||
clusterName = processRouteConfig(rc);
|
||||
|
|
@ -434,9 +409,15 @@ final class XdsClientImpl extends XdsClient {
|
|||
}
|
||||
} else if (requestedHttpConnManager.hasRds()) {
|
||||
Rds rds = requestedHttpConnManager.getRds();
|
||||
rdsRouteConfigName = rds.getRouteConfigName();
|
||||
if (!rds.getConfigSource().hasAds()) {
|
||||
errorMessage = "For using RDS, it must be set to use ADS.";
|
||||
} else {
|
||||
rdsRouteConfigName = rds.getRouteConfigName();
|
||||
}
|
||||
} else {
|
||||
errorMessage = "HttpConnectionManager message must either provide the "
|
||||
+ "RouteConfiguration directly in-line or tell the client to use RDS to obtain it.";
|
||||
}
|
||||
// Else impossible as we have already validated all HttpConnectionManager messages.
|
||||
}
|
||||
|
||||
if (errorMessage != null) {
|
||||
|
|
@ -446,32 +427,13 @@ final class XdsClientImpl extends XdsClient {
|
|||
adsStream.sendAckRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
|
||||
ldsResponse.getVersionInfo());
|
||||
|
||||
// Remove RDS cache entries for RouteConfigurations not referenced by this LDS response.
|
||||
// LDS responses represents the state of the world, RouteConfigurations not referenced
|
||||
// by this LDS response are those no longer exist.
|
||||
routeConfigNamesToClusterNames.keySet().retainAll(routeConfigs);
|
||||
|
||||
// Process the requested Listener if exists, either extract cluster information from in-lined
|
||||
// RouteConfiguration message or send an RDS request for dynamic resolution.
|
||||
if (clusterName != null) {
|
||||
// Found clusterName in the in-lined RouteConfiguration.
|
||||
ConfigUpdate configUpdate = ConfigUpdate.newBuilder().setClusterName(clusterName).build();
|
||||
configWatcher.onConfigChanged(configUpdate);
|
||||
} else if (rdsRouteConfigName != null) {
|
||||
// Update the RDS resource we wish to request. An RDS request may not be necessary, but
|
||||
// we need to keep what we request updated in case of notifying watcher upon receiving
|
||||
// an RDS response for updating the requested resource.
|
||||
adsStream.rdsResourceName = rdsRouteConfigName;
|
||||
// First look up the RDS cache to see if we had received an RDS response containing the
|
||||
// desired RouteConfiguration previously. Otherwise, send an RDS request for dynamic
|
||||
// resolution.
|
||||
if (routeConfigNamesToClusterNames.containsKey(rdsRouteConfigName)) {
|
||||
ConfigUpdate configUpdate =
|
||||
ConfigUpdate.newBuilder()
|
||||
.setClusterName(routeConfigNamesToClusterNames.get(rdsRouteConfigName))
|
||||
.build();
|
||||
configWatcher.onConfigChanged(configUpdate);
|
||||
} else {
|
||||
// Send an RDS request if the resource to request has changed.
|
||||
if (!rdsRouteConfigName.equals(adsStream.rdsResourceName)) {
|
||||
adsStream.sendXdsRequest(ADS_TYPE_URL_RDS, ImmutableList.of(rdsRouteConfigName));
|
||||
}
|
||||
} else {
|
||||
|
|
@ -494,10 +456,13 @@ final class XdsClientImpl extends XdsClient {
|
|||
"Never requested for RDS resources, management server is doing something wrong");
|
||||
|
||||
// Unpack RouteConfiguration messages.
|
||||
List<RouteConfiguration> routeConfigs = new ArrayList<>(rdsResponse.getResourcesCount());
|
||||
RouteConfiguration requestedRouteConfig = null;
|
||||
try {
|
||||
for (com.google.protobuf.Any res : rdsResponse.getResourcesList()) {
|
||||
routeConfigs.add(res.unpack(RouteConfiguration.class));
|
||||
RouteConfiguration rc = res.unpack(RouteConfiguration.class);
|
||||
if (rc.getName().equals(adsStream.rdsResourceName)) {
|
||||
requestedRouteConfig = rc;
|
||||
}
|
||||
}
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
|
||||
|
|
@ -505,30 +470,25 @@ final class XdsClientImpl extends XdsClient {
|
|||
return;
|
||||
}
|
||||
|
||||
// Validate and cache information from each RouteConfiguration message.
|
||||
Map<String, String> clusterNames = new HashMap<>();
|
||||
for (RouteConfiguration routeConfig : routeConfigs) {
|
||||
String clusterName = processRouteConfig(routeConfig);
|
||||
// Resolved cluster name for the requested resource, if exists.
|
||||
String clusterName = null;
|
||||
if (requestedRouteConfig != null) {
|
||||
clusterName = processRouteConfig(requestedRouteConfig);
|
||||
if (clusterName == null) {
|
||||
adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
|
||||
"Cannot find a valid cluster name in VirtualHost inside "
|
||||
+ "RouteConfiguration with domains matching: " + hostName + ".");
|
||||
return;
|
||||
}
|
||||
clusterNames.put(routeConfig.getName(), clusterName);
|
||||
}
|
||||
routeConfigNamesToClusterNames.putAll(clusterNames);
|
||||
|
||||
adsStream.sendAckRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
|
||||
rdsResponse.getVersionInfo());
|
||||
|
||||
// Notify the ConfigWatcher if this RDS response contains the most recently requested
|
||||
// RDS resource.
|
||||
if (clusterNames.containsKey(adsStream.rdsResourceName)) {
|
||||
ConfigUpdate configUpdate =
|
||||
ConfigUpdate.newBuilder()
|
||||
.setClusterName(clusterNames.get(adsStream.rdsResourceName))
|
||||
.build();
|
||||
if (clusterName != null) {
|
||||
ConfigUpdate configUpdate = ConfigUpdate.newBuilder().setClusterName(clusterName).build();
|
||||
configWatcher.onConfigChanged(configUpdate);
|
||||
}
|
||||
// Do not notify an error to the ConfigWatcher. RDS protocol is incremental, not receiving
|
||||
|
|
|
|||
|
|
@ -931,152 +931,6 @@ public class XdsClientImplTest {
|
|||
.isEqualTo("another-cluster.googleapis.com");
|
||||
}
|
||||
|
||||
/**
|
||||
* Client receives RDS responses containing RouteConfigurations for resources that were
|
||||
* not requested (management server sends them proactively). Later client receives an LDS
|
||||
* response with the requested Listener containing Rds config pointing to do RDS for one of
|
||||
* the previously received RouteConfigurations. No RDS request needs to be sent for that
|
||||
* RouteConfiguration as it can be found in local cache (management server will not send
|
||||
* RDS responses for that RouteConfiguration again). A future RDS response update for
|
||||
* that RouteConfiguration should be notified to config watcher.
|
||||
*
|
||||
* <p>Tests for caching RDS response data behavior.
|
||||
*/
|
||||
@Test
|
||||
public void receiveRdsResponsesForRouteConfigurationsToBeUsedLater() {
|
||||
xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
|
||||
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
|
||||
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
|
||||
|
||||
// Client sends an LDS request for the host name (with port) to management server.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(NODE, "", "foo.googleapis.com:8080",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
|
||||
|
||||
// Management sends back an LDS response telling client to do RDS.
|
||||
Rds rdsConfig =
|
||||
Rds.newBuilder()
|
||||
// Must set to use ADS.
|
||||
.setConfigSource(
|
||||
ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
|
||||
.setRouteConfigName("route-foo1.googleapis.com")
|
||||
.build();
|
||||
|
||||
List<Any> listeners = ImmutableList.of(
|
||||
Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
|
||||
Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
|
||||
);
|
||||
DiscoveryResponse response =
|
||||
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sends an ACK LDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(NODE, "0", "foo.googleapis.com:8080",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
|
||||
|
||||
// Client sends an (first) RDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(NODE, "", "route-foo1.googleapis.com",
|
||||
XdsClientImpl.ADS_TYPE_URL_RDS, "")));
|
||||
|
||||
// Management server sends back an RDS response containing RouteConfigurations
|
||||
// more than requested.
|
||||
List<Any> routeConfigs = ImmutableList.of(
|
||||
// Currently wanted resource.
|
||||
Any.pack(
|
||||
buildRouteConfiguration(
|
||||
"route-foo1.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
|
||||
"cluster1.googleapis.com")))),
|
||||
// Resources currently not wanted.
|
||||
Any.pack(
|
||||
buildRouteConfiguration(
|
||||
"route-foo2.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
|
||||
"cluster2.googleapis.com")))),
|
||||
Any.pack(
|
||||
buildRouteConfiguration(
|
||||
"route-foo3.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
|
||||
"cluster3.googleapis.com")))));
|
||||
response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sent an ACK RDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(NODE, "0", "route-foo1.googleapis.com",
|
||||
XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
|
||||
|
||||
// Resolved cluster name is notified to config watcher.
|
||||
ArgumentCaptor<ConfigUpdate> configUpdateCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(configWatcher).onConfigChanged(configUpdateCaptor.capture());
|
||||
assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster1.googleapis.com");
|
||||
|
||||
// Management server sends back another LDS response containing updates for the requested
|
||||
// Listener and telling client to do RDS for a RouteConfiguration which had previously
|
||||
// sent to client.
|
||||
rdsConfig =
|
||||
Rds.newBuilder()
|
||||
// Must set to use ADS.
|
||||
.setConfigSource(
|
||||
ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
|
||||
.setRouteConfigName("route-foo2.googleapis.com")
|
||||
.build();
|
||||
|
||||
listeners = ImmutableList.of(
|
||||
Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
|
||||
Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
|
||||
);
|
||||
response = buildDiscoveryResponse("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0001");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sent an ACK LDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(NODE, "1", "foo.googleapis.com:8080",
|
||||
XdsClientImpl.ADS_TYPE_URL_LDS, "0001")));
|
||||
|
||||
// Updated cluster name is notified to config watcher.
|
||||
configUpdateCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(configWatcher, times(2)).onConfigChanged(configUpdateCaptor.capture());
|
||||
assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster2.googleapis.com");
|
||||
|
||||
// At this time, no RDS request is sent as the result can be found in local cache (even if
|
||||
// a request is sent for it, management server does not necessarily reply).
|
||||
verify(requestObserver, times(0))
|
||||
.onNext(eq(buildDiscoveryRequest(NODE, "0", "route-foo.googleapis.com",
|
||||
XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
|
||||
|
||||
verifyNoMoreInteractions(requestObserver);
|
||||
|
||||
// Management server sends back another RDS response containing updates for the
|
||||
// RouteConfiguration that the client was pointed to most recently (i.e.,
|
||||
// "route-foo2.googleapis.com").
|
||||
routeConfigs = ImmutableList.of(
|
||||
Any.pack(
|
||||
buildRouteConfiguration(
|
||||
"route-foo2.googleapis.com",
|
||||
ImmutableList.of(
|
||||
buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
|
||||
"a-new-cluster.googleapis.com")))));
|
||||
response = buildDiscoveryResponse("1", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0001");
|
||||
responseObserver.onNext(response);
|
||||
|
||||
// Client sent an ACK RDS request.
|
||||
verify(requestObserver)
|
||||
.onNext(eq(buildDiscoveryRequest(NODE, "1", "route-foo2.googleapis.com",
|
||||
XdsClientImpl.ADS_TYPE_URL_RDS, "0001")));
|
||||
|
||||
// Updated cluster name is notified to config watcher.
|
||||
configUpdateCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(configWatcher, times(3)).onConfigChanged(configUpdateCaptor.capture());
|
||||
assertThat(configUpdateCaptor.getValue().getClusterName())
|
||||
.isEqualTo("a-new-cluster.googleapis.com");
|
||||
}
|
||||
|
||||
/**
|
||||
* An RouteConfiguration is removed by server by sending client an LDS response removing the
|
||||
* corresponding Listener.
|
||||
|
|
|
|||
Loading…
Reference in New Issue