diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 82addf72b3..1dcb0c656c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -163,6 +163,14 @@ final class XdsClientImpl extends XdsClient { private final LoadStatsManager loadStatsManager = new LoadStatsManager(); + // Last successfully applied version_info for each resource type. Starts with empty string. + // A version_info is used to update management server with client's most recent knowledge of + // resources. + private String ldsVersion = ""; + private String rdsVersion = ""; + private String cdsVersion = ""; + private String edsVersion = ""; + // Timer for concluding the currently requesting LDS resource not found. @Nullable private ScheduledHandle ldsRespTimer; @@ -226,7 +234,7 @@ final class XdsClientImpl extends XdsClient { if (adsStream != null) { adsStream.close(Status.CANCELLED.withDescription("shutdown").asException()); } - cleanUpResources(); + cleanUpResourceTimers(); if (lrsClient != null) { lrsClient.stopLoadReporting(); lrsClient = null; @@ -236,15 +244,7 @@ final class XdsClientImpl extends XdsClient { } } - /** - * Purge cache for resources and cancel resource fetch timers. - */ - private void cleanUpResources() { - clusterNamesToClusterUpdates.clear(); - absentCdsResources.clear(); - clusterNamesToEndpointUpdates.clear(); - absentEdsResources.clear(); - + private void cleanUpResourceTimers() { if (ldsRespTimer != null) { ldsRespTimer.cancel(); ldsRespTimer = null; @@ -1434,14 +1434,6 @@ final class XdsClientImpl extends XdsClient { private boolean responseReceived; private boolean closed; - // Last successfully applied version_info for each resource type. Starts with empty string. - // A version_info is used to update management server with client's most recent knowledge of - // resources. - private String ldsVersion = ""; - private String rdsVersion = ""; - private String cdsVersion = ""; - private String edsVersion = ""; - // Response nonce for the most recently received discovery responses of each resource type. // Client initiated requests start response nonce with empty string. // A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest @@ -1558,7 +1550,7 @@ final class XdsClientImpl extends XdsClient { } } cleanUp(); - cleanUpResources(); + cleanUpResourceTimers(); if (responseReceived || retryBackoffPolicy == null) { // Reset the backoff sequence if had received a response, or backoff sequence // has never been initialized. diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index b55a649741..a9b8e42b6b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -2672,7 +2672,7 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); // RPC stream closed immediately @@ -2689,10 +2689,11 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); // Management server sends an LDS response. + ldsResponse = buildDiscoveryResponse("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0001"); responseObserver.onNext(ldsResponse); // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) @@ -2724,7 +2725,7 @@ public class XdsClientImplTest { fakeClock.runDueTasks(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "1", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); @@ -2786,7 +2787,7 @@ public class XdsClientImplTest { // Retry resumes requests for all wanted resources. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2815,7 +2816,7 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2844,7 +2845,7 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2877,10 +2878,10 @@ public class XdsClientImplTest { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2905,10 +2906,10 @@ public class XdsClientImplTest { .streamAggregatedResources(responseObserverCaptor.capture()); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2950,7 +2951,7 @@ public class XdsClientImplTest { StreamObserver requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); // Management server becomes unreachable. @@ -2971,7 +2972,7 @@ public class XdsClientImplTest { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -2995,7 +2996,7 @@ public class XdsClientImplTest { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", @@ -3038,10 +3039,10 @@ public class XdsClientImplTest { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster2.googleapis.com", @@ -3065,10 +3066,10 @@ public class XdsClientImplTest { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequest(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS, ""))); verify(requestObserver, never()) - .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, ""))); verify(requestObserver, never()) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster2.googleapis.com", diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java index ba729f0481..77c073cf7b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestForListener.java @@ -708,6 +708,8 @@ public class XdsClientImplTestForListener { buildDiscoveryResponseV2("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0000"); responseObserver.onNext(response); + // Client sent an ACK CDS request (Omitted). + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null); // Management server closes the RPC stream with an error. @@ -725,7 +727,7 @@ public class XdsClientImplTestForListener { // Retry resumes requests for all wanted resources. verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server becomes unreachable. @@ -744,7 +746,7 @@ public class XdsClientImplTestForListener { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server is still not reachable. @@ -763,7 +765,7 @@ public class XdsClientImplTestForListener { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server sends back a LDS response. @@ -786,7 +788,7 @@ public class XdsClientImplTestForListener { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server becomes unreachable again. @@ -804,7 +806,7 @@ public class XdsClientImplTestForListener { .streamAggregatedResources(responseObserverCaptor.capture()); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", + .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1", XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java index d3691a9a47..51b5b5db68 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java @@ -2682,7 +2682,7 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // RPC stream closed immediately @@ -2699,10 +2699,12 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server sends an LDS response. + ldsResponse = + buildDiscoveryResponseV2("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS_V2, "0001"); responseObserver.onNext(ldsResponse); // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) @@ -2734,7 +2736,7 @@ public class XdsClientImplTestV2 { fakeClock.runDueTasks(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "1", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); @@ -2796,7 +2798,7 @@ public class XdsClientImplTestV2 { // Retry resumes requests for all wanted resources. verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2825,7 +2827,7 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2854,7 +2856,7 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2887,10 +2889,10 @@ public class XdsClientImplTestV2 { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2915,10 +2917,10 @@ public class XdsClientImplTestV2 { .streamAggregatedResources(responseObserverCaptor.capture()); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -2960,7 +2962,7 @@ public class XdsClientImplTestV2 { StreamObserver requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); // Management server becomes unreachable. @@ -2981,7 +2983,7 @@ public class XdsClientImplTestV2 { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -3005,7 +3007,7 @@ public class XdsClientImplTestV2 { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", @@ -3048,10 +3050,10 @@ public class XdsClientImplTestV2 { responseObserver = responseObserverCaptor.getValue(); requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, ""))); verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster2.googleapis.com", @@ -3075,10 +3077,10 @@ public class XdsClientImplTestV2 { requestObserver = requestObservers.poll(); verify(requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", TARGET_AUTHORITY, + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", TARGET_AUTHORITY, XdsClientImpl.ADS_TYPE_URL_LDS_V2, ""))); verify(requestObserver, never()) - .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster.googleapis.com", + .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, ""))); verify(requestObserver, never()) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster2.googleapis.com",