diff --git a/internal/xds/clients/xdsclient/ads_stream.go b/internal/xds/clients/xdsclient/ads_stream.go index 4e88852fc..83f8a5df0 100644 --- a/internal/xds/clients/xdsclient/ads_stream.go +++ b/internal/xds/clients/xdsclient/ads_stream.go @@ -71,7 +71,6 @@ type adsStreamEventHandler interface { onStreamError(error) // Called when the ADS stream breaks. onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource. onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream. - onRequest(typeURL string) // Called when a request is about to be sent on the ADS stream. } // state corresponding to a resource type. @@ -445,11 +444,6 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string, } } - // Call the event handler to remove unsubscribed cache entries. It is to - // ensure the cache entries are deleted even if discovery request fails. In - // case of failure when the stream restarts, nonce is reset anyways. - s.eventHandler.onRequest(url) - msg, err := proto.Marshal(req) if err != nil { s.logger.Warningf("Failed to marshal DiscoveryRequest: %v", err) diff --git a/internal/xds/clients/xdsclient/authority.go b/internal/xds/clients/xdsclient/authority.go index 4ad55c350..49a4480cf 100644 --- a/internal/xds/clients/xdsclient/authority.go +++ b/internal/xds/clients/xdsclient/authority.go @@ -294,9 +294,6 @@ func (a *authority) fallbackToServer(xc *xdsChannelWithConfig) bool { // Subscribe to all existing resources from the new management server. 
for typ, resources := range a.resources { for name, state := range resources { - if len(state.watchers) == 0 { - continue - } if a.logger.V(2) { a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName, name) } @@ -686,17 +683,6 @@ func (a *authority) watchResource(rType ResourceType, resourceName string, watch } resources[resourceName] = state xdsChannel.channel.subscribe(rType, resourceName) - } else if len(state.watchers) == 0 { - if a.logger.V(2) { - a.logger.Infof("Re-watch for type %q, resource name %q before unsubscription", rType.TypeName, resourceName) - } - // Add the active channel to the resource's channel configs if not - // already present. - state.xdsChannelConfigs[xdsChannel] = true - // Ensure the resource is subscribed on the active channel. We do this - // even if resource is present in cache as re-watches might occur - // after unsubscribes or channel changes. - xdsChannel.channel.subscribe(rType, resourceName) } // Always add the new watcher to the set of watchers. state.watchers[watcher] = true @@ -774,16 +760,32 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat } // There are no more watchers for this resource. Unsubscribe this - // resource from all channels where it was subscribed to but do not - // delete the state associated with it in case the resource is - // re-requested later before un-subscription request is completed by - // the management server. + // resource from all channels where it was subscribed to and delete + // the state associated with it. if a.logger.V(2) { a.logger.Infof("Removing last watch for resource name %q", resourceName) } for xcc := range state.xdsChannelConfigs { xcc.channel.unsubscribe(rType, resourceName) } + delete(resources, resourceName) + + // If there are no more watchers for this resource type, delete the + // resource type from the top-level map. 
+ if len(resources) == 0 { + if a.logger.V(2) { + a.logger.Infof("Removing last watch for resource type %q", rType.TypeName) + } + delete(a.resources, rType) + } + // If there are no more watchers for any resource type, release the + // reference to the xdsChannels. + if len(a.resources) == 0 { + if a.logger.V(2) { + a.logger.Infof("Removing last watch for any resource type, releasing reference to the xdsChannel") + } + a.closeXDSChannels() + } }, func() { close(done) }) <-done }) @@ -835,7 +837,7 @@ func (a *authority) closeXDSChannels() { func (a *authority) watcherExistsForUncachedResource() bool { for _, resourceStates := range a.resources { for _, state := range resourceStates { - if len(state.watchers) > 0 && state.md.Status == xdsresource.ServiceStatusRequested { + if state.md.Status == xdsresource.ServiceStatusRequested { return true } } @@ -867,9 +869,6 @@ func (a *authority) resourceConfig() []*v3statuspb.ClientConfig_GenericXdsConfig for rType, resourceStates := range a.resources { typeURL := rType.TypeURL for name, state := range resourceStates { - if len(state.watchers) == 0 { - continue - } var raw *anypb.Any if state.cache != nil { raw = &anypb.Any{TypeUrl: typeURL, Value: state.cache.Bytes()} @@ -903,43 +902,6 @@ func (a *authority) close() { } } -// removeUnsubscribedCacheEntries iterates through all resources of the given type and -// removes the state for resources that have no active watchers. This is called -// after sending a discovery request to ensure that resources that were -// unsubscribed (and thus have no watchers) are eventually removed from the -// authority's cache. 
-func (a *authority) removeUnsubscribedCacheEntries(rType ResourceType) { - a.xdsClientSerializer.TrySchedule(func(context.Context) { - resources := a.resources[rType] - if resources == nil { - return - } - - for name, state := range resources { - if len(state.watchers) == 0 { - if a.logger.V(2) { - a.logger.Infof("Removing resource state for %q of type %q as it has no watchers", name, rType.TypeName) - } - delete(resources, name) - } - } - - if len(resources) == 0 { - if a.logger.V(2) { - a.logger.Infof("Removing resource type %q from cache as it has no more resources", rType.TypeName) - } - delete(a.resources, rType) - } - - if len(a.resources) == 0 { - if a.logger.V(2) { - a.logger.Infof("Removing last watch for any resource type, releasing reference to the xdsChannels") - } - a.closeXDSChannels() - } - }) -} - func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus { switch serviceStatus { case xdsresource.ServiceStatusUnknown: diff --git a/internal/xds/clients/xdsclient/channel.go b/internal/xds/clients/xdsclient/channel.go index f028b941b..e36e36220 100644 --- a/internal/xds/clients/xdsclient/channel.go +++ b/internal/xds/clients/xdsclient/channel.go @@ -59,10 +59,6 @@ type xdsChannelEventHandler interface { // adsResourceDoesNotExist is called when the xdsChannel determines that a // requested ADS resource does not exist. adsResourceDoesNotExist(ResourceType, string) - - // adsResourceRemoveUnsubscribedCacheEntries is called when the xdsChannel - // needs to remove unsubscribed cache entries. - adsResourceRemoveUnsubscribedCacheEntries(ResourceType) } // xdsChannelOpts holds the options for creating a new xdsChannel. @@ -140,32 +136,8 @@ type xdsChannel struct { } func (xc *xdsChannel) close() { - if xc.closed.HasFired() { - return - } xc.closed.Fire() - - // Get the resource types that this specific ADS stream was handling - // before stopping it. 
- // - // TODO: Revisit if we can avoid acquiring the lock of ads (another type). - xc.ads.mu.Lock() - typesHandledByStream := make([]ResourceType, 0, len(xc.ads.resourceTypeState)) - for typ := range xc.ads.resourceTypeState { - typesHandledByStream = append(typesHandledByStream, typ) - } - xc.ads.mu.Unlock() - xc.ads.Stop() - - // Schedule removeUnsubscribedCacheEntries for the types this stream was handling, - // on all authorities that were interested in this channel. - if _, ok := xc.eventHandler.(*channelState); ok { - for _, typ := range typesHandledByStream { - xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(typ) - } - } - xc.transport.Close() xc.logger.Infof("Shutdown") } @@ -256,26 +228,6 @@ func (xc *xdsChannel) onResponse(resp response, onDone func()) ([]string, error) return names, err } -// onRequest invoked when a request is about to be sent on the ADS stream. It -// removes the cache entries for the resource type that are no longer subscribed to. -func (xc *xdsChannel) onRequest(typeURL string) { - if xc.closed.HasFired() { - if xc.logger.V(2) { - xc.logger.Infof("Received an update from the ADS stream on closed ADS stream") - } - return - } - - // Lookup the resource parser based on the resource type. - rType, ok := xc.clientConfig.ResourceTypes[typeURL] - if !ok { - logger.Warningf("Resource type URL %q unknown in response from server", typeURL) - return - } - - xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(rType) -} - // decodeResponse decodes the resources in the given ADS response. // // The opts parameter provides configuration options for decoding the resources. 
diff --git a/internal/xds/clients/xdsclient/channel_test.go b/internal/xds/clients/xdsclient/channel_test.go index 2676f0039..153e7e824 100644 --- a/internal/xds/clients/xdsclient/channel_test.go +++ b/internal/xds/clients/xdsclient/channel_test.go @@ -772,6 +772,3 @@ func (ta *testEventHandler) waitForResourceDoesNotExist(ctx context.Context) (Re } return typ, name, nil } - -func (*testEventHandler) adsResourceRemoveUnsubscribedCacheEntries(ResourceType) { -} diff --git a/internal/xds/clients/xdsclient/test/misc_watchers_test.go b/internal/xds/clients/xdsclient/test/misc_watchers_test.go index b9e24957c..e611c2240 100644 --- a/internal/xds/clients/xdsclient/test/misc_watchers_test.go +++ b/internal/xds/clients/xdsclient/test/misc_watchers_test.go @@ -527,139 +527,3 @@ func (s) TestWatchErrorsContainNodeID_ChannelCreationFailure(t *testing.T) { } } } - -// TestUnsubscribeAndResubscribe tests the scenario where the client is busy -// processing a response (simulating a pending ACK at a higher level by holding -// the onDone callback from watchers). During this busy state, a resource is -// unsubscribed and then immediately resubscribed which causes the -// unsubscription and new subscription requests to be buffered due to flow -// control. -// -// The test verifies the following: -// - The resubscribed resource is served from the cache. -// - No "resource does not exist" error is generated for the resubscribed -// resource. 
-func (s) TestRaceUnsubscribeResubscribe(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) - nodeID := uuid.New().String() - - resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} - si := clients.ServerIdentifier{ - ServerURI: mgmtServer.Address, - Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, - } - - configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} - xdsClientConfig := xdsclient.Config{ - Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}}, - Node: clients.Node{ID: nodeID}, - TransportBuilder: grpctransport.NewBuilder(configs), - ResourceTypes: resourceTypes, - // Xdstp resource names used in this test do not specify an - // authority. These will end up looking up an entry with the - // empty key in the authorities map. Having an entry with an - // empty key and empty configuration, results in these - // resources also using the top-level configuration. - Authorities: map[string]xdsclient.Authority{ - "": {XDSServers: []xdsclient.ServerConfig{}}, - }, - } - - // Create an xDS client with the above config. - client, err := xdsclient.New(xdsClientConfig) - if err != nil { - t.Fatalf("Failed to create xDS client: %v", err) - } - defer client.Close() - - const ldsResourceName1 = "test-listener-resource1" - const ldsResourceName2 = "test-listener-resource2" - const rdsName1 = "test-route-configuration-resource1" - const rdsName2 = "test-route-configuration-resource2" - listenerResource1 := e2e.DefaultClientListener(ldsResourceName1, rdsName1) - listenerResource2 := e2e.DefaultClientListener(ldsResourceName2, rdsName2) - - // Watch ldsResourceName1 with a regular watcher to ensure it's in cache - // and ACKed. 
- watcherInitial := newListenerWatcher() - cancelInitial := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, watcherInitial) - if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID, Listeners: []*v3listenerpb.Listener{listenerResource1}, SkipValidation: true}); err != nil { - t.Fatalf("mgmtServer.Update() for %s failed: %v", ldsResourceName1, err) - } - if err := verifyListenerUpdate(ctx, watcherInitial.updateCh, listenerUpdateErrTuple{update: listenerUpdate{RouteConfigName: rdsName1}}); err != nil { - t.Fatalf("watcherR1Initial did not receive update for %s: %v", ldsResourceName1, err) - } - cancelInitial() - - // Watch ldsResourceName1 and ldsResourceName2 using blocking watchers. - // - Server sends {ldsResourceName1, ldsResourceName2}. - // - Watchers for both resources get the update but we HOLD on to their - // onDone callbacks. - blockingWatcherR1 := newBLockingListenerWatcher() - cancelR1 := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, blockingWatcherR1) - // defer cancelR1 later to create the race - - blockingWatcherR2 := newBLockingListenerWatcher() - cancelR2 := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName2, blockingWatcherR2) - defer cancelR2() - - // Configure the listener resources on the management server. 
- resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listenerResource1, listenerResource2}, - SkipValidation: true} - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatalf("mgmtServer.Update() for %s and %s failed: %v", ldsResourceName1, ldsResourceName2, err) - } - - var onDoneR1, onDoneR2 func() - select { - case <-blockingWatcherR1.updateCh: - onDoneR1 = <-blockingWatcherR1.doneNotifierCh - case <-ctx.Done(): - t.Fatalf("Timeout waiting for update for %s on blockingWatcherR1: %v", ldsResourceName1, ctx.Err()) - } - select { - case <-blockingWatcherR2.updateCh: - onDoneR2 = <-blockingWatcherR2.doneNotifierCh - case <-ctx.Done(): - t.Fatalf("Timeout waiting for update for %s on blockingWatcherR2: %v", ldsResourceName2, ctx.Err()) - } - - // At this point, ACK for {listenerResource1,listenerResource2} has been - // sent by the client but s.fc.pending.Load() is true because onDoneR1 and - // onDoneR2 are held. - // - // Unsubscribe listenerResource1. This request should be buffered by - // adsStreamImpl because s.fc.pending.Load() is true. - cancelR1() - - // Resubscribe listenerResource1 with a new regular watcher, which should - // be served from cache. - watcherR1New := newListenerWatcher() - cancelR1New := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, watcherR1New) - defer cancelR1New() - - if err := verifyListenerUpdate(ctx, watcherR1New.updateCh, listenerUpdateErrTuple{update: listenerUpdate{RouteConfigName: rdsName1}}); err != nil { - t.Fatalf("watcherR1New did not receive cached update for %s: %v", ldsResourceName1, err) - } - - // Release the onDone callbacks. - if onDoneR1 != nil { // onDoneR1 might be nil if cancelR1() completed very fast. - onDoneR1() - } - onDoneR2() - - // Verify watcherR1New does not get a "resource does not exist" error. 
- sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout*10) // Slightly longer to catch delayed errors - defer sCancel() - if err := verifyNoListenerUpdate(sCtx, watcherR1New.resourceErrCh); err != nil { - t.Fatalf("watcherR1New received unexpected resource error for %s: %v", ldsResourceName1, err) - } - if err := verifyNoListenerUpdate(sCtx, watcherR1New.ambientErrCh); err != nil { - t.Fatalf("watcherR1New received unexpected ambient error for %s: %v", ldsResourceName1, err) - } -} diff --git a/internal/xds/clients/xdsclient/xdsclient.go b/internal/xds/clients/xdsclient/xdsclient.go index b6f27f4d8..a1949cfa5 100644 --- a/internal/xds/clients/xdsclient/xdsclient.go +++ b/internal/xds/clients/xdsclient/xdsclient.go @@ -437,19 +437,6 @@ func (cs *channelState) adsResourceDoesNotExist(typ ResourceType, resourceName s } } -func (cs *channelState) adsResourceRemoveUnsubscribedCacheEntries(rType ResourceType) { - if cs.parent.done.HasFired() { - return - } - - cs.parent.channelsMu.Lock() - defer cs.parent.channelsMu.Unlock() - - for authority := range cs.interestedAuthorities { - authority.removeUnsubscribedCacheEntries(rType) - } -} - func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) { n := xdsresource.ParseName(resourceName) a := c.getAuthorityForResource(n)