xdsclient: delay resource cache deletion to handle immediate re-subscription of same resource

This commit is contained in:
Purnesh Dixit 2025-05-29 09:59:48 +05:30
parent 0a12fb0d84
commit d6a72f1af7
6 changed files with 127 additions and 24 deletions

View File

@ -71,6 +71,7 @@ type adsStreamEventHandler interface {
onStreamError(error) // Called when the ADS stream breaks. onStreamError(error) // Called when the ADS stream breaks.
onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource. onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource.
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream. onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
onRequiredToRemoveUnsubscribedCacheEntries(typeURL string) // Called when it is needed to remove unsubscribed cache entries.
} }
// state corresponding to a resource type. // state corresponding to a resource type.
@ -444,6 +445,9 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
} }
} }
// Call the event handler to remove unsubscribed cache entries.
s.eventHandler.onRequiredToRemoveUnsubscribedCacheEntries(url)
msg, err := proto.Marshal(req) msg, err := proto.Marshal(req)
if err != nil { if err != nil {
s.logger.Warningf("Failed to marshal DiscoveryRequest: %v", err) s.logger.Warningf("Failed to marshal DiscoveryRequest: %v", err)
@ -460,6 +464,7 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
} else if s.logger.V(2) { } else if s.logger.V(2) {
s.logger.Warningf("ADS request sent for type %q, resources: %v, version: %q, nonce: %q", url, names, version, nonce) s.logger.Warningf("ADS request sent for type %q, resources: %v, version: %q, nonce: %q", url, names, version, nonce)
} }
return nil return nil
} }

View File

@ -655,6 +655,17 @@ func (a *authority) watchResource(rType ResourceType, resourceName string, watch
} }
resources[resourceName] = state resources[resourceName] = state
xdsChannel.channel.subscribe(rType, resourceName) xdsChannel.channel.subscribe(rType, resourceName)
} else if len(state.watchers) == 0 {
if a.logger.V(2) {
a.logger.Infof("Re-watch for type %q, resource name %q before unsubscription", rType.TypeName, resourceName)
}
// Add the active channel to the resource's channel configs if not
// already present.
state.xdsChannelConfigs[xdsChannel] = true
// Ensure the resource is subscribed on the active channel. We do this
// even if resource is present in cache as re-watches might occur
// after unsubscribes or channel changes.
xdsChannel.channel.subscribe(rType, resourceName)
} }
// Always add the new watcher to the set of watchers. // Always add the new watcher to the set of watchers.
state.watchers[watcher] = true state.watchers[watcher] = true
@ -720,6 +731,10 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat
// there when the watch was registered. // there when the watch was registered.
resources := a.resources[rType] resources := a.resources[rType]
state := resources[resourceName] state := resources[resourceName]
if state == nil {
a.logger.Warningf("Attempting to unwatch resource %q of type %q which is not currently watched", resourceName, rType.TypeName)
return
}
// Delete this particular watcher from the list of watchers, so that its // Delete this particular watcher from the list of watchers, so that its
// callback will not be invoked in the future. // callback will not be invoked in the future.
@ -732,32 +747,16 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat
} }
// There are no more watchers for this resource. Unsubscribe this // There are no more watchers for this resource. Unsubscribe this
// resource from all channels where it was subscribed to and delete // resource from all channels where it was subscribed to but do not
// the state associated with it. // delete the state associated with it in case the resource is
// re-requested later before un-subscription request is completed by
// the management server.
if a.logger.V(2) { if a.logger.V(2) {
a.logger.Infof("Removing last watch for resource name %q", resourceName) a.logger.Infof("Removing last watch for resource name %q", resourceName)
} }
for xcc := range state.xdsChannelConfigs { for xcc := range state.xdsChannelConfigs {
xcc.channel.unsubscribe(rType, resourceName) xcc.channel.unsubscribe(rType, resourceName)
} }
delete(resources, resourceName)
// If there are no more watchers for this resource type, delete the
// resource type from the top-level map.
if len(resources) == 0 {
if a.logger.V(2) {
a.logger.Infof("Removing last watch for resource type %q", rType.TypeName)
}
delete(a.resources, rType)
}
// If there are no more watchers for any resource type, release the
// reference to the xdsChannels.
if len(a.resources) == 0 {
if a.logger.V(2) {
a.logger.Infof("Removing last watch for for any resource type, releasing reference to the xdsChannel")
}
a.closeXDSChannels()
}
}, func() { close(done) }) }, func() { close(done) })
<-done <-done
}) })
@ -874,6 +873,41 @@ func (a *authority) close() {
} }
} }
// removeUnsubscribedCacheEntries iterates through all resources of the given type and
// removes the state for resources that have no active watchers. This is called
// after sending a discovery request to ensure that resources that were
// unsubscribed (and thus have no watchers) are eventually removed from the
// authority's cache.
func (a *authority) removeUnsubscribedCacheEntries(rType ResourceType) {
resources := a.resources[rType]
if resources == nil {
return
}
for name, state := range resources {
if len(state.watchers) == 0 {
if a.logger.V(2) {
a.logger.Infof("Removing resource state for %q of type %q as it has no watchers after an update cycle", name, rType.TypeName)
}
delete(resources, name)
}
}
if len(resources) == 0 {
if a.logger.V(2) {
a.logger.Infof("Removing resource type %q from cache as it has no more resources", rType.TypeName)
}
delete(a.resources, rType)
}
if len(a.resources) == 0 {
if a.logger.V(2) {
a.logger.Infof("Removing last watch for any resource type, releasing reference to the xdsChannels")
}
a.closeXDSChannels()
}
}
func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus { func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus { switch serviceStatus {
case xdsresource.ServiceStatusUnknown: case xdsresource.ServiceStatusUnknown:

View File

@ -59,6 +59,10 @@ type xdsChannelEventHandler interface {
// adsResourceDoesNotExist is called when the xdsChannel determines that a // adsResourceDoesNotExist is called when the xdsChannel determines that a
// requested ADS resource does not exist. // requested ADS resource does not exist.
adsResourceDoesNotExist(ResourceType, string) adsResourceDoesNotExist(ResourceType, string)
// requiredToRemoveUnsubscribedCacheEntries is called when the xdsChannel
// needs to remove unsubscribed cache entries.
requiredToRemoveUnsubscribedCacheEntries(ResourceType)
} }
// xdsChannelOpts holds the options for creating a new xdsChannel. // xdsChannelOpts holds the options for creating a new xdsChannel.
@ -136,8 +140,30 @@ type xdsChannel struct {
} }
func (xc *xdsChannel) close() { func (xc *xdsChannel) close() {
if xc.closed.HasFired() {
return
}
xc.closed.Fire() xc.closed.Fire()
// Get the resource types that this specific ADS stream was handling
// before stopping it.
xc.ads.mu.Lock()
typesHandledByStream := make([]ResourceType, 0, len(xc.ads.resourceTypeState))
for typ := range xc.ads.resourceTypeState {
typesHandledByStream = append(typesHandledByStream, typ)
}
xc.ads.mu.Unlock()
xc.ads.Stop() xc.ads.Stop()
// Schedule removeUnsubscribedCacheEntries for the types this stream was handling,
// on all authorities that were interested in this channel.
if _, ok := xc.eventHandler.(*channelState); ok {
for _, typ := range typesHandledByStream {
xc.eventHandler.requiredToRemoveUnsubscribedCacheEntries(typ)
}
}
xc.transport.Close() xc.transport.Close()
xc.logger.Infof("Shutdown") xc.logger.Infof("Shutdown")
} }
@ -228,6 +254,24 @@ func (xc *xdsChannel) onResponse(resp response, onDone func()) ([]string, error)
return names, err return names, err
} }
func (xc *xdsChannel) onRequiredToRemoveUnsubscribedCacheEntries(typeURL string) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Received an update from the ADS stream on closed ADS stream")
}
return
}
// Lookup the resource parser based on the resource type.
rType, ok := xc.clientConfig.ResourceTypes[typeURL]
if !ok {
logger.Warningf("Resource type URL %q unknown in response from server", typeURL)
return
}
xc.eventHandler.requiredToRemoveUnsubscribedCacheEntries(rType)
}
// decodeResponse decodes the resources in the given ADS response. // decodeResponse decodes the resources in the given ADS response.
// //
// The opts parameter provides configuration options for decoding the resources. // The opts parameter provides configuration options for decoding the resources.

View File

@ -772,3 +772,6 @@ func (ta *testEventHandler) waitForResourceDoesNotExist(ctx context.Context) (Re
} }
return typ, name, nil return typ, name, nil
} }
func (*testEventHandler) requiredToRemoveUnsubscribedCacheEntries(ResourceType) {
}

View File

@ -469,8 +469,10 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
} }
// Cancel the watch on the listener resource. This should result in the // Cancel the watch on the listener resource. This should result in the
// existing connection to be management server getting closed. // existing connection to be management server getting closed after the
// unsubscription discovery request is sent.
ldsCancel() ldsCancel()
// Verify that the connection to the management server is closed.
if _, err := streamCloseCh.Receive(ctx); err != nil { if _, err := streamCloseCh.Receive(ctx); err != nil {
t.Fatalf("Timeout when expecting existing connection to be closed: %v", err) t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
} }

View File

@ -439,6 +439,21 @@ func (cs *channelState) adsResourceDoesNotExist(typ ResourceType, resourceName s
} }
} }
func (cs *channelState) requiredToRemoveUnsubscribedCacheEntries(rType ResourceType) {
if cs.parent.done.HasFired() {
return
}
cs.parent.channelsMu.Lock()
defer cs.parent.channelsMu.Unlock()
for authority := range cs.interestedAuthorities {
authority.xdsClientSerializer.TrySchedule(func(context.Context) {
authority.removeUnsubscribedCacheEntries(rType)
})
}
}
func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) { func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) {
c.channelsMu.Lock() c.channelsMu.Lock()
defer c.channelsMu.Unlock() defer c.channelsMu.Unlock()