From f04f33efe8e4f4d88099df68b6072416da468ad6 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 16 Sep 2020 10:51:38 -0700 Subject: [PATCH] xds: resource (with version info) should persist across ADS streams along with XdsClient lifetime (#7427) The version_info in the xDS protocol represents the client's knowledge for the state of that resource type. It should persist across ADS stream recreation. Even if the ADS stream is recreated, the XdsClient should persist its knowledge for resources it has received. With this implementation, client and server are stateful across the xDS communication. With persisted version_info, the management server knows resources that the client currently knows even after the stream is recreated. So it does not need to re-send resources that the client received with the previous stream. --- .../main/java/io/grpc/xds/XdsClientImpl.java | 30 ++++++---------- .../java/io/grpc/xds/XdsClientImplTest.java | 35 +++++++++--------- .../xds/XdsClientImplTestForListener.java | 12 ++++--- .../java/io/grpc/xds/XdsClientImplTestV2.java | 36 ++++++++++--------- 4 files changed, 55 insertions(+), 58 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 82addf72b3..1dcb0c656c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -163,6 +163,14 @@ final class XdsClientImpl extends XdsClient { private final LoadStatsManager loadStatsManager = new LoadStatsManager(); + // Last successfully applied version_info for each resource type. Starts with empty string. + // A version_info is used to update management server with client's most recent knowledge of + // resources. + private String ldsVersion = ""; + private String rdsVersion = ""; + private String cdsVersion = ""; + private String edsVersion = ""; + // Timer for concluding the currently requesting LDS resource not found. @Nullable private ScheduledHandle ldsRespTimer; @@ -226,7 +234,7 @@ final class XdsClientImpl extends XdsClient { if (adsStream != null) { adsStream.close(Status.CANCELLED.withDescription("shutdown").asException()); } - cleanUpResources(); + cleanUpResourceTimers(); if (lrsClient != null) { lrsClient.stopLoadReporting(); lrsClient = null; @@ -236,15 +244,7 @@ final class XdsClientImpl extends XdsClient { } } - /** - * Purge cache for resources and cancel resource fetch timers. - */ - private void cleanUpResources() { - clusterNamesToClusterUpdates.clear(); - absentCdsResources.clear(); - clusterNamesToEndpointUpdates.clear(); - absentEdsResources.clear(); - + private void cleanUpResourceTimers() { if (ldsRespTimer != null) { ldsRespTimer.cancel(); ldsRespTimer = null; @@ -1434,14 +1434,6 @@ final class XdsClientImpl extends XdsClient { private boolean responseReceived; private boolean closed; - // Last successfully applied version_info for each resource type. Starts with empty string. - // A version_info is used to update management server with client's most recent knowledge of - // resources. - private String ldsVersion = ""; - private String rdsVersion = ""; - private String cdsVersion = ""; - private String edsVersion = ""; - // Response nonce for the most recently received discovery responses of each resource type. // Client initiated requests start response nonce with empty string. // A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest @@ -1558,7 +1550,7 @@ final class XdsClientImpl extends XdsClient { } } cleanUp(); - cleanUpResources(); + cleanUpResourceTimers(); if (responseReceived || retryBackoffPolicy == null) { // Reset the backoff sequence if had received a response, or backoff sequence // has never been initialized. diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index b55a649741..a9b8e42b6b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -2672,7 +2672,7 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); // RPC stream closed immediately @@ -2689,10 +2689,11 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); // Management server sends an LDS response. + ldsResponse = buildDiscoveryResponse("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0001"); responseObserver.onNext(ldsResponse); // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) @@ -2724,7 +2725,7 @@ public class XdsClientImplTest { fakeClock.runDueTasks(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "1", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); @@ -2786,7 +2787,7 @@ public class XdsClientImplTest { // Retry resumes requests for all wanted resources. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2815,7 +2816,7 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2844,7 +2845,7 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2877,10 +2878,10 @@ public class XdsClientImplTest { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2905,10 +2906,10 @@ public class XdsClientImplTest { .streamAggregatedResources(responseObserverCaptor.capture()); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2950,7 +2951,7 @@ public class XdsClientImplTest { StreamObserver requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); // Management server becomes unreachable. @@ -2971,7 +2972,7 @@ public class XdsClientImplTest { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2995,7 +2996,7 @@ public class XdsClientImplTest { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -3038,10 +3039,10 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster2.googleapis.com", @@ -3065,10 +3066,10 @@ public class XdsClientImplTest { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver, never()) - .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, ""))); verify(requestObserver, never()) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster2.googleapis.com", diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java index ba729f0481..77c073cf7b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java @@ -708,6 +708,8 @@ public class XdsClientImplTestForListener { buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000"); responseObserver.onNext(response); + // Client sent an ACK CDS request (Omitted). + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null); // Management server closes the RPC stream with an error. @@ -725,7 +727,7 @@ public class XdsClientImplTestForListener { // Retry resumes requests for all wanted resources. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server becomes unreachable. @@ -744,7 +746,7 @@ public class XdsClientImplTestForListener { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server is still not reachable. @@ -763,7 +765,7 @@ public class XdsClientImplTestForListener { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server sends back a LDS response. @@ -786,7 +788,7 @@ public class XdsClientImplTestForListener { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server becomes unreachable again. @@ -804,7 +806,7 @@ public class XdsClientImplTestForListener { .streamAggregatedResources(responseObserverCaptor.capture()); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java index d3691a9a47..51b5b5db68 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java @@ -2682,7 +2682,7 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // RPC stream closed immediately @@ -2699,10 +2699,12 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server sends an LDS response. + ldsResponse = + buildDiscoveryResponseV2("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0001"); responseObserver.onNext(ldsResponse); // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) @@ -2734,7 +2736,7 @@ public class XdsClientImplTestV2 { fakeClock.runDueTasks(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "1", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); @@ -2796,7 +2798,7 @@ public class XdsClientImplTestV2 { // Retry resumes requests for all wanted resources. verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2825,7 +2827,7 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2854,7 +2856,7 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2887,10 +2889,10 @@ public class XdsClientImplTestV2 { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2915,10 +2917,10 @@ public class XdsClientImplTestV2 { .streamAggregatedResources(responseObserverCaptor.capture()); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2960,7 +2962,7 @@ public class XdsClientImplTestV2 { StreamObserver requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server becomes unreachable. @@ -2981,7 +2983,7 @@ public class XdsClientImplTestV2 { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -3005,7 +3007,7 @@ public class XdsClientImplTestV2 { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -3048,10 +3050,10 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster2.googleapis.com", @@ -3075,10 +3077,10 @@ public class XdsClientImplTestV2 { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver, never()) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, ""))); verify(requestObserver, never()) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster2.googleapis.com",