xds: resource (with version info) should persist across ADS streams along with XdsClient lifetime (#7427)

The version_info in the xDS protocol represents the client's knowledge for the state of that resource type. It should persist across ADS stream recreation. Even if the ADS stream is recreated, the XdsClient should persist its knowledge for resources it has received. With this implementation, client and server are stateful across the xDS communication. With persisted version_info, the management server knows resources that the client currently knows even after the stream is recreated. So it does not need to re-send resources that the client received with the previous stream.
This commit is contained in:
Chengyuan Zhang 2020-09-16 10:51:38 -07:00 committed by GitHub
parent eb871698e3
commit f04f33efe8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 58 deletions

View File

@ -163,6 +163,14 @@ final class XdsClientImpl extends XdsClient {
private final LoadStatsManager loadStatsManager = new LoadStatsManager();
// Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of
// resources.
private String ldsVersion = "";
private String rdsVersion = "";
private String cdsVersion = "";
private String edsVersion = "";
// Timer for concluding the currently requesting LDS resource not found.
@Nullable
private ScheduledHandle ldsRespTimer;
@ -226,7 +234,7 @@ final class XdsClientImpl extends XdsClient {
if (adsStream != null) {
adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
}
cleanUpResources();
cleanUpResourceTimers();
if (lrsClient != null) {
lrsClient.stopLoadReporting();
lrsClient = null;
@ -236,15 +244,7 @@ final class XdsClientImpl extends XdsClient {
}
}
/**
* Purge cache for resources and cancel resource fetch timers.
*/
private void cleanUpResources() {
clusterNamesToClusterUpdates.clear();
absentCdsResources.clear();
clusterNamesToEndpointUpdates.clear();
absentEdsResources.clear();
private void cleanUpResourceTimers() {
if (ldsRespTimer != null) {
ldsRespTimer.cancel();
ldsRespTimer = null;
@ -1434,14 +1434,6 @@ final class XdsClientImpl extends XdsClient {
private boolean responseReceived;
private boolean closed;
// Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of
// resources.
private String ldsVersion = "";
private String rdsVersion = "";
private String cdsVersion = "";
private String edsVersion = "";
// Response nonce for the most recently received discovery responses of each resource type.
// Client initiated requests start response nonce with empty string.
// A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest
@ -1558,7 +1550,7 @@ final class XdsClientImpl extends XdsClient {
}
}
cleanUp();
cleanUpResources();
cleanUpResourceTimers();
if (responseReceived || retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
// has never been initialized.

View File

@ -2672,7 +2672,7 @@ public class XdsClientImplTest {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
// RPC stream closed immediately
@ -2689,10 +2689,11 @@ public class XdsClientImplTest {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
// Management server sends an LDS response.
ldsResponse = buildDiscoveryResponse("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0001");
responseObserver.onNext(ldsResponse);
// Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted)
@ -2724,7 +2725,7 @@ public class XdsClientImplTest {
fakeClock.runDueTasks();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "1", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
@ -2786,7 +2787,7 @@ public class XdsClientImplTest {
// Retry resumes requests for all wanted resources.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
@ -2815,7 +2816,7 @@ public class XdsClientImplTest {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
@ -2844,7 +2845,7 @@ public class XdsClientImplTest {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
@ -2877,10 +2878,10 @@ public class XdsClientImplTest {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
@ -2905,10 +2906,10 @@ public class XdsClientImplTest {
.streamAggregatedResources(responseObserverCaptor.capture());
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
@ -2950,7 +2951,7 @@ public class XdsClientImplTest {
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
// Management server becomes unreachable.
@ -2971,7 +2972,7 @@ public class XdsClientImplTest {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
@ -2995,7 +2996,7 @@ public class XdsClientImplTest {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
@ -3038,10 +3039,10 @@ public class XdsClientImplTest {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster2.googleapis.com",
@ -3065,10 +3066,10 @@ public class XdsClientImplTest {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver, never())
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com",
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver, never())
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster2.googleapis.com",

View File

@ -708,6 +708,8 @@ public class XdsClientImplTestForListener {
buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000");
responseObserver.onNext(response);
// Client sent an ACK CDS request (Omitted).
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(null);
// Management server closes the RPC stream with an error.
@ -725,7 +727,7 @@ public class XdsClientImplTestForListener {
// Retry resumes requests for all wanted resources.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
// Management server becomes unreachable.
@ -744,7 +746,7 @@ public class XdsClientImplTestForListener {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
// Management server is still not reachable.
@ -763,7 +765,7 @@ public class XdsClientImplTestForListener {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
// Management server sends back a LDS response.
@ -786,7 +788,7 @@ public class XdsClientImplTestForListener {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
// Management server becomes unreachable again.
@ -804,7 +806,7 @@ public class XdsClientImplTestForListener {
.streamAggregatedResources(responseObserverCaptor.capture());
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,

View File

@ -2682,7 +2682,7 @@ public class XdsClientImplTestV2 {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
// RPC stream closed immediately
@ -2699,10 +2699,12 @@ public class XdsClientImplTestV2 {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
// Management server sends an LDS response.
ldsResponse =
buildDiscoveryResponseV2("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0001");
responseObserver.onNext(ldsResponse);
// Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted)
@ -2734,7 +2736,7 @@ public class XdsClientImplTestV2 {
fakeClock.runDueTasks();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "1", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
@ -2796,7 +2798,7 @@ public class XdsClientImplTestV2 {
// Retry resumes requests for all wanted resources.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
@ -2825,7 +2827,7 @@ public class XdsClientImplTestV2 {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
@ -2854,7 +2856,7 @@ public class XdsClientImplTestV2 {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
@ -2887,10 +2889,10 @@ public class XdsClientImplTestV2 {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
@ -2915,10 +2917,10 @@ public class XdsClientImplTestV2 {
.streamAggregatedResources(responseObserverCaptor.capture());
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
@ -2960,7 +2962,7 @@ public class XdsClientImplTestV2 {
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
// Management server becomes unreachable.
@ -2981,7 +2983,7 @@ public class XdsClientImplTestV2 {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
@ -3005,7 +3007,7 @@ public class XdsClientImplTestV2 {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
@ -3048,10 +3050,10 @@ public class XdsClientImplTestV2 {
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster2.googleapis.com",
@ -3075,10 +3077,10 @@ public class XdsClientImplTestV2 {
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY,
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY,
XdsClientImpl.ADS_TYPE_URL_LDS_V2, "")));
verify(requestObserver, never())
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com",
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "")));
verify(requestObserver, never())
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster2.googleapis.com",