mirror of https://github.com/grpc/grpc-java.git
xds: avoid pushing duplicate (CDS) resource data to watchers (#7143)
De-duplicate cluster update information pushed to cluster watchers. This only applies to CDS as the management server sends a response with all requested clusters while only some of. them have changed (or newly been subscribed). This does not apply to EDS as the protocol is incremental and each EDS response will only contain ClusterLoadAssignments for clusters whose endpoints have changed. This does not apply to LDS and RDS as at any time we will subscribe to a single resource and our TD implementation will not send extra (unrequested) resources. So each time, the received responses always contain updated resource information.
This commit is contained in:
parent
0f1631c7a3
commit
3facda0130
|
|
@ -182,6 +182,28 @@ abstract class XdsClient {
|
|||
.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(
|
||||
clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
ClusterUpdate that = (ClusterUpdate) o;
|
||||
return Objects.equals(clusterName, that.clusterName)
|
||||
&& Objects.equals(edsServiceName, that.edsServiceName)
|
||||
&& Objects.equals(lbPolicy, that.lbPolicy)
|
||||
&& Objects.equals(lrsServerName, that.lrsServerName)
|
||||
&& Objects.equals(upstreamTlsContext, that.upstreamTlsContext);
|
||||
}
|
||||
|
||||
static Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
|
@ -287,9 +309,9 @@ abstract class XdsClient {
|
|||
return false;
|
||||
}
|
||||
EndpointUpdate that = (EndpointUpdate) o;
|
||||
return clusterName.equals(that.clusterName)
|
||||
&& localityLbEndpointsMap.equals(that.localityLbEndpointsMap)
|
||||
&& dropPolicies.equals(that.dropPolicies);
|
||||
return Objects.equals(clusterName, that.clusterName)
|
||||
&& Objects.equals(localityLbEndpointsMap, that.localityLbEndpointsMap)
|
||||
&& Objects.equals(dropPolicies, that.dropPolicies);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -1022,13 +1022,15 @@ final class XdsClientImpl extends XdsClient {
|
|||
|
||||
// Update local CDS cache with data in this response.
|
||||
absentCdsResources.removeAll(clusterUpdates.keySet());
|
||||
for (String clusterName : clusterNamesToClusterUpdates.keySet()) {
|
||||
if (!clusterUpdates.containsKey(clusterName)) {
|
||||
for (Map.Entry<String, ClusterUpdate> entry : clusterNamesToClusterUpdates.entrySet()) {
|
||||
if (!clusterUpdates.containsKey(entry.getKey())) {
|
||||
// Some previously existing resource no longer exists.
|
||||
absentCdsResources.add(clusterName);
|
||||
absentCdsResources.add(entry.getKey());
|
||||
} else if (clusterUpdates.get(entry.getKey()).equals(entry.getValue())) {
|
||||
clusterUpdates.remove(entry.getKey());
|
||||
}
|
||||
}
|
||||
clusterNamesToClusterUpdates.clear();
|
||||
clusterNamesToClusterUpdates.keySet().removeAll(absentCdsResources);
|
||||
clusterNamesToClusterUpdates.putAll(clusterUpdates);
|
||||
|
||||
// Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response.
|
||||
|
|
@ -1056,12 +1058,13 @@ final class XdsClientImpl extends XdsClient {
|
|||
// Notify watchers if clusters interested in present in this CDS response.
|
||||
for (Map.Entry<String, Set<ClusterWatcher>> entry : clusterWatchers.entrySet()) {
|
||||
String clusterName = entry.getKey();
|
||||
if (clusterUpdates.containsKey(clusterName)) {
|
||||
if (clusterUpdates.containsKey(entry.getKey())) {
|
||||
ClusterUpdate clusterUpdate = clusterUpdates.get(clusterName);
|
||||
for (ClusterWatcher watcher : entry.getValue()) {
|
||||
watcher.onClusterChanged(clusterUpdate);
|
||||
}
|
||||
} else if (!cdsRespTimers.containsKey(clusterName)) {
|
||||
} else if (!clusterNamesToClusterUpdates.containsKey(entry.getKey())
|
||||
&& !cdsRespTimers.containsKey(clusterName)) {
|
||||
// Update for previously present resource being removed.
|
||||
for (ClusterWatcher watcher : entry.getValue()) {
|
||||
watcher.onResourceDoesNotExist(entry.getKey());
|
||||
|
|
@ -1192,6 +1195,8 @@ final class XdsClientImpl extends XdsClient {
|
|||
absentEdsResources.removeAll(endpointUpdates.keySet());
|
||||
|
||||
// Notify watchers waiting for updates of endpoint information received in this EDS response.
|
||||
// Based on xDS protocol, the management server should not send endpoint data again if
|
||||
// nothing has changed.
|
||||
for (Map.Entry<String, EndpointUpdate> entry : endpointUpdates.entrySet()) {
|
||||
String clusterName = entry.getKey();
|
||||
// Cancel and delete response timeout timer.
|
||||
|
|
|
|||
|
|
@ -1573,24 +1573,7 @@ public class XdsClientImplTest {
|
|||
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
|
||||
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
|
||||
|
||||
// All watchers received notification for cluster update.
|
||||
verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture());
|
||||
clusterUpdate1 = clusterUpdateCaptor1.getValue();
|
||||
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
|
||||
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
|
||||
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
|
||||
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
|
||||
assertThat(clusterUpdate1.getLrsServerName()).isNull();
|
||||
|
||||
clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
|
||||
verify(watcher2, times(2)).onClusterChanged(clusterUpdateCaptor2.capture());
|
||||
clusterUpdate2 = clusterUpdateCaptor2.getValue();
|
||||
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
|
||||
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
|
||||
assertThat(clusterUpdate2.getEdsServiceName()).isNull();
|
||||
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
|
||||
assertThat(clusterUpdate2.getLrsServerName()).isNull();
|
||||
|
||||
verifyNoMoreInteractions(watcher1, watcher2); // resource has no change
|
||||
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
|
||||
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
|
||||
ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
|
||||
|
|
@ -1728,14 +1711,7 @@ public class XdsClientImplTest {
|
|||
new DiscoveryRequestMatcher("1",
|
||||
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
|
||||
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
|
||||
|
||||
verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture());
|
||||
clusterUpdate1 = clusterUpdateCaptor1.getValue();
|
||||
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
|
||||
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
|
||||
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
|
||||
assertThat(clusterUpdate1.getLrsServerName()).isNull();
|
||||
|
||||
verifyNoMoreInteractions(watcher1); // resource has no change
|
||||
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
|
||||
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
|
||||
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
|
||||
|
|
|
|||
Loading…
Reference in New Issue