diff --git a/xds/internal/clients/xdsclient/ads_stream.go b/xds/internal/clients/xdsclient/ads_stream.go index 774f8ab24..b9dc81d9e 100644 --- a/xds/internal/clients/xdsclient/ads_stream.go +++ b/xds/internal/clients/xdsclient/ads_stream.go @@ -68,9 +68,10 @@ type dataAndErrTuple struct { // occur on the ADS stream. Methods on this interface may be invoked // concurrently and implementations need to handle them in a thread-safe manner. type adsStreamEventHandler interface { - onStreamError(error) // Called when the ADS stream breaks. - onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource. - onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream. + onStreamError(error) // Called when the ADS stream breaks. + onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource. + onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream. + onRequiredToRemoveUnsubscribedCacheEntries(typeURL string) // Called when it is needed to remove unsubscribed cache entries. } // state corresponding to a resource type. @@ -444,6 +445,9 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string, } } + // Call the event handler to remove unsubscribed cache entries. + s.eventHandler.onRequiredToRemoveUnsubscribedCacheEntries(url) + msg, err := proto.Marshal(req) if err != nil { s.logger.Warningf("Failed to marshal DiscoveryRequest: %v", err) @@ -460,6 +464,7 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string, } else if s.logger.V(2) { s.logger.Warningf("ADS request sent for type %q, resources: %v, version: %q, nonce: %q", url, names, version, nonce) } + return nil } diff --git a/xds/internal/clients/xdsclient/authority.go b/xds/internal/clients/xdsclient/authority.go index 7a3a29691..b8abe98b4 100644 --- a/xds/internal/clients/xdsclient/authority.go +++ b/xds/internal/clients/xdsclient/authority.go @@ -655,6 +655,17 @@ func (a *authority) watchResource(rType ResourceType, resourceName string, watch } resources[resourceName] = state xdsChannel.channel.subscribe(rType, resourceName) + } else if len(state.watchers) == 0 { + if a.logger.V(2) { + a.logger.Infof("Re-watch for type %q, resource name %q before unsubscription", rType.TypeName, resourceName) + } + // Add the active channel to the resource's channel configs if not + // already present. + state.xdsChannelConfigs[xdsChannel] = true + // Ensure the resource is subscribed on the active channel. We do this + // even if resource is present in cache as re-watches might occur + // after unsubscribes or channel changes. + xdsChannel.channel.subscribe(rType, resourceName) } // Always add the new watcher to the set of watchers. state.watchers[watcher] = true @@ -720,6 +731,10 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat // there when the watch was registered. resources := a.resources[rType] state := resources[resourceName] + if state == nil { + a.logger.Warningf("Attempting to unwatch resource %q of type %q which is not currently watched", resourceName, rType.TypeName) + return + } // Delete this particular watcher from the list of watchers, so that its // callback will not be invoked in the future. @@ -732,32 +747,16 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat } // There are no more watchers for this resource. Unsubscribe this - // resource from all channels where it was subscribed to and delete - // the state associated with it. + // resource from all channels where it was subscribed to but do not + // delete the state associated with it in case the resource is + // re-requested later before un-subscription request is completed by + // the management server. if a.logger.V(2) { a.logger.Infof("Removing last watch for resource name %q", resourceName) } for xcc := range state.xdsChannelConfigs { xcc.channel.unsubscribe(rType, resourceName) } - delete(resources, resourceName) - - // If there are no more watchers for this resource type, delete the - // resource type from the top-level map. - if len(resources) == 0 { - if a.logger.V(2) { - a.logger.Infof("Removing last watch for resource type %q", rType.TypeName) - } - delete(a.resources, rType) - } - // If there are no more watchers for any resource type, release the - // reference to the xdsChannels. - if len(a.resources) == 0 { - if a.logger.V(2) { - a.logger.Infof("Removing last watch for for any resource type, releasing reference to the xdsChannel") - } - a.closeXDSChannels() - } }, func() { close(done) }) <-done }) @@ -874,6 +873,41 @@ func (a *authority) close() { } } +// removeUnsubscribedCacheEntries iterates through all resources of the given type and +// removes the state for resources that have no active watchers. This is called +// after sending a discovery request to ensure that resources that were +// unsubscribed (and thus have no watchers) are eventually removed from the +// authority's cache. +func (a *authority) removeUnsubscribedCacheEntries(rType ResourceType) { + resources := a.resources[rType] + if resources == nil { + return + } + + for name, state := range resources { + if len(state.watchers) == 0 { + if a.logger.V(2) { + a.logger.Infof("Removing resource state for %q of type %q as it has no watchers after an update cycle", name, rType.TypeName) + } + delete(resources, name) + } + } + + if len(resources) == 0 { + if a.logger.V(2) { + a.logger.Infof("Removing resource type %q from cache as it has no more resources", rType.TypeName) + } + delete(a.resources, rType) + } + + if len(a.resources) == 0 { + if a.logger.V(2) { + a.logger.Infof("Removing last watch for any resource type, releasing reference to the xdsChannels") + } + a.closeXDSChannels() + } +} + func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus { switch serviceStatus { case xdsresource.ServiceStatusUnknown: diff --git a/xds/internal/clients/xdsclient/channel.go b/xds/internal/clients/xdsclient/channel.go index 2d424b811..3bc5378dc 100644 --- a/xds/internal/clients/xdsclient/channel.go +++ b/xds/internal/clients/xdsclient/channel.go @@ -59,6 +59,10 @@ type xdsChannelEventHandler interface { // adsResourceDoesNotExist is called when the xdsChannel determines that a // requested ADS resource does not exist. adsResourceDoesNotExist(ResourceType, string) + + // requiredToRemoveUnsubscribedCacheEntries is called when the xdsChannel + // needs to remove unsubscribed cache entries. + requiredToRemoveUnsubscribedCacheEntries(ResourceType) } // xdsChannelOpts holds the options for creating a new xdsChannel. @@ -136,8 +140,30 @@ type xdsChannel struct { } func (xc *xdsChannel) close() { + if xc.closed.HasFired() { + return + } xc.closed.Fire() + + // Get the resource types that this specific ADS stream was handling + // before stopping it. + xc.ads.mu.Lock() + typesHandledByStream := make([]ResourceType, 0, len(xc.ads.resourceTypeState)) + for typ := range xc.ads.resourceTypeState { + typesHandledByStream = append(typesHandledByStream, typ) + } + xc.ads.mu.Unlock() + xc.ads.Stop() + + // Schedule removeUnsubscribedCacheEntries for the types this stream was handling, + // on all authorities that were interested in this channel. + if _, ok := xc.eventHandler.(*channelState); ok { + for _, typ := range typesHandledByStream { + xc.eventHandler.requiredToRemoveUnsubscribedCacheEntries(typ) + } + } + xc.transport.Close() xc.logger.Infof("Shutdown") } @@ -228,6 +254,24 @@ func (xc *xdsChannel) onResponse(resp response, onDone func()) ([]string, error) return names, err } +func (xc *xdsChannel) onRequiredToRemoveUnsubscribedCacheEntries(typeURL string) { + if xc.closed.HasFired() { + if xc.logger.V(2) { + xc.logger.Infof("Received an update from the ADS stream on closed ADS stream") + } + return + } + + // Lookup the resource parser based on the resource type. + rType, ok := xc.clientConfig.ResourceTypes[typeURL] + if !ok { + logger.Warningf("Resource type URL %q unknown in response from server", typeURL) + return + } + + xc.eventHandler.requiredToRemoveUnsubscribedCacheEntries(rType) +} + // decodeResponse decodes the resources in the given ADS response. // // The opts parameter provides configuration options for decoding the resources. diff --git a/xds/internal/clients/xdsclient/channel_test.go b/xds/internal/clients/xdsclient/channel_test.go index 1a9f37f87..db3f8d7cd 100644 --- a/xds/internal/clients/xdsclient/channel_test.go +++ b/xds/internal/clients/xdsclient/channel_test.go @@ -772,3 +772,6 @@ func (ta *testEventHandler) waitForResourceDoesNotExist(ctx context.Context) (Re } return typ, name, nil } + +func (*testEventHandler) requiredToRemoveUnsubscribedCacheEntries(ResourceType) { +} diff --git a/xds/internal/clients/xdsclient/test/ads_stream_ack_nack_test.go b/xds/internal/clients/xdsclient/test/ads_stream_ack_nack_test.go index 467365d26..eaaec608b 100644 --- a/xds/internal/clients/xdsclient/test/ads_stream_ack_nack_test.go +++ b/xds/internal/clients/xdsclient/test/ads_stream_ack_nack_test.go @@ -469,8 +469,10 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) { } // Cancel the watch on the listener resource. This should result in the - // existing connection to be management server getting closed. + // existing connection to be management server getting closed after the + // unsubscription discovery request is sent. ldsCancel() + // Verify that the connection to the management server is closed. if _, err := streamCloseCh.Receive(ctx); err != nil { t.Fatalf("Timeout when expecting existing connection to be closed: %v", err) } diff --git a/xds/internal/clients/xdsclient/xdsclient.go b/xds/internal/clients/xdsclient/xdsclient.go index c9cd52a1e..7529ee569 100644 --- a/xds/internal/clients/xdsclient/xdsclient.go +++ b/xds/internal/clients/xdsclient/xdsclient.go @@ -439,6 +439,21 @@ func (cs *channelState) adsResourceDoesNotExist(typ ResourceType, resourceName s } } +func (cs *channelState) requiredToRemoveUnsubscribedCacheEntries(rType ResourceType) { + if cs.parent.done.HasFired() { + return + } + + cs.parent.channelsMu.Lock() + defer cs.parent.channelsMu.Unlock() + + for authority := range cs.interestedAuthorities { + authority.xdsClientSerializer.TrySchedule(func(context.Context) { + authority.removeUnsubscribedCacheEntries(rType) + }) + } +} + func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) { c.channelsMu.Lock() defer c.channelsMu.Unlock()