Cleanup rest storage resources on shutdown
Kubernetes-commit: 0527a0dd453c4b76259389ec8e8e6888c5e2a5ab
This commit is contained in:
		
							parent
							
								
									242a14763d
								
							
						
					
					
						commit
						fbdcc3ee50
					
				|  | @ -70,9 +70,12 @@ func StorageWithCacher() generic.StorageDecorator { | |||
| 		if err != nil { | ||||
| 			return nil, func() {}, err | ||||
| 		} | ||||
| 		var once sync.Once | ||||
| 		destroyFunc := func() { | ||||
| 			once.Do(func() { | ||||
| 				cacher.Stop() | ||||
| 				d() | ||||
| 			}) | ||||
| 		} | ||||
| 
 | ||||
| 		// TODO : Remove RegisterStorageCleanup below when PR
 | ||||
|  |  | |||
|  | @ -217,7 +217,10 @@ type Store struct { | |||
| 	// If the StorageVersioner is nil, apiserver will leave the
 | ||||
| 	// storageVersionHash as empty in the discovery document.
 | ||||
| 	StorageVersioner runtime.GroupVersioner | ||||
| 	// Called to cleanup clients used by the underlying Storage; optional.
 | ||||
| 
 | ||||
| 	// DestroyFunc cleans up clients used by the underlying Storage; optional.
 | ||||
| 	// If set, DestroyFunc has to be implemented in thread-safe way and
 | ||||
| 	// be prepared for being called more than once.
 | ||||
| 	DestroyFunc func() | ||||
| } | ||||
| 
 | ||||
|  | @ -279,6 +282,13 @@ func (e *Store) New() runtime.Object { | |||
| 	return e.NewFunc() | ||||
| } | ||||
| 
 | ||||
| // Destroy cleans up its resources on shutdown.
 | ||||
| func (e *Store) Destroy() { | ||||
| 	if e.DestroyFunc != nil { | ||||
| 		e.DestroyFunc() | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // NewList implements rest.Lister.
 | ||||
| func (e *Store) NewList() runtime.Object { | ||||
| 	return e.NewListFunc() | ||||
|  | @ -1433,11 +1443,14 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { | |||
| 		if opts.CountMetricPollPeriod > 0 { | ||||
| 			stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker) | ||||
| 			previousDestroy := e.DestroyFunc | ||||
| 			var once sync.Once | ||||
| 			e.DestroyFunc = func() { | ||||
| 				once.Do(func() { | ||||
| 					stopFunc() | ||||
| 					if previousDestroy != nil { | ||||
| 						previousDestroy() | ||||
| 					} | ||||
| 				}) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  |  | |||
|  | @ -89,6 +89,16 @@ type APIGroupInfo struct { | |||
| 	StaticOpenAPISpec *spec.Swagger | ||||
| } | ||||
| 
 | ||||
| func (a *APIGroupInfo) destroyStorage() { | ||||
| 	for _, stores := range a.VersionedResourcesStorageMap { | ||||
| 		for _, store := range stores { | ||||
| 			// TODO(wojtek-t): Uncomment once all storage support it.
 | ||||
| 			klog.Errorf("Destroying storage: %v", store) | ||||
| 			// store.Destroy()
 | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // GenericAPIServer contains state for a Kubernetes cluster api server.
 | ||||
| type GenericAPIServer struct { | ||||
| 	// discoveryAddresses is used to build cluster IPs for discovery.
 | ||||
|  | @ -222,6 +232,9 @@ type GenericAPIServer struct { | |||
| 	// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
 | ||||
| 	lifecycleSignals lifecycleSignals | ||||
| 
 | ||||
| 	// destroyFns contains a list of functions that should be called on shutdown to clean up resources.
 | ||||
| 	destroyFns []func() | ||||
| 
 | ||||
| 	// muxAndDiscoveryCompleteSignals holds signals that indicate all known HTTP paths have been registered.
 | ||||
| 	// it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
 | ||||
| 	// it is exposed for easier composition of the individual servers.
 | ||||
|  | @ -264,6 +277,11 @@ type DelegationTarget interface { | |||
| 
 | ||||
| 	// MuxAndDiscoveryCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed.
 | ||||
| 	MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} | ||||
| 
 | ||||
| 	// Destroy cleans up its resources on shutdown.
 | ||||
| 	// Destroy has to be implemented in thread-safe way and be prepared
 | ||||
| 	// for being called more than once.
 | ||||
| 	Destroy() | ||||
| } | ||||
| 
 | ||||
| func (s *GenericAPIServer) UnprotectedHandler() http.Handler { | ||||
|  | @ -301,6 +319,18 @@ func (s *GenericAPIServer) MuxAndDiscoveryCompleteSignals() map[string]<-chan st | |||
| 	return s.muxAndDiscoveryCompleteSignals | ||||
| } | ||||
| 
 | ||||
| // Destroy cleans up all its and its delegation target resources on shutdown.
 | ||||
| // It starts with destroying its own resources and later proceeds with
 | ||||
| // its delegation target.
 | ||||
| func (s *GenericAPIServer) Destroy() { | ||||
| 	for _, destroyFn := range s.destroyFns { | ||||
| 		destroyFn() | ||||
| 	} | ||||
| 	if s.delegationTarget != nil { | ||||
| 		s.delegationTarget.Destroy() | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| type emptyDelegate struct { | ||||
| 	// handler is called at the end of the delegation chain
 | ||||
| 	// when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler.
 | ||||
|  | @ -340,6 +370,8 @@ func (s emptyDelegate) PrepareRun() preparedGenericAPIServer { | |||
| func (s emptyDelegate) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} { | ||||
| 	return map[string]<-chan struct{}{} | ||||
| } | ||||
| func (s emptyDelegate) Destroy() { | ||||
| } | ||||
| 
 | ||||
| // preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
 | ||||
| type preparedGenericAPIServer struct { | ||||
|  | @ -395,6 +427,9 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { | |||
| 	delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration | ||||
| 	shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated | ||||
| 
 | ||||
| 	// Clean up resources on shutdown.
 | ||||
| 	defer s.Destroy() | ||||
| 
 | ||||
| 	// spawn a new goroutine for closing the MuxAndDiscoveryComplete signal
 | ||||
| 	// registration happens during construction of the generic api server
 | ||||
| 	// the last server in the chain aggregates signals from the previous instances
 | ||||
|  | @ -584,6 +619,8 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A | |||
| 		resourceInfos = append(resourceInfos, r...) | ||||
| 	} | ||||
| 
 | ||||
| 	s.destroyFns = append(s.destroyFns, apiGroupInfo.destroyStorage) | ||||
| 
 | ||||
| 	if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) && | ||||
| 		utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) { | ||||
| 		// API installation happens before we start listening on the handlers,
 | ||||
|  | @ -595,6 +632,9 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // InstallLegacyAPIGroup exposes the given legacy api group in the API.
 | ||||
| // The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
 | ||||
| // underlying storage will be destroyed on this servers shutdown.
 | ||||
| func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error { | ||||
| 	if !s.legacyAPIGroupPrefixes.Has(apiPrefix) { | ||||
| 		return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List()) | ||||
|  | @ -616,7 +656,9 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Exposes given api groups in the API.
 | ||||
| // InstallAPIGroups exposes given api groups in the API.
 | ||||
| // The <apiGroupInfos> passed into this function shouldn't be used elsewhere as the
 | ||||
| // underlying storage will be destroyed on this servers shutdown.
 | ||||
| func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error { | ||||
| 	for _, apiGroupInfo := range apiGroupInfos { | ||||
| 		// Do not register empty group or empty version.  Doing so claims /apis/ for the wrong entity to be returned.
 | ||||
|  | @ -669,7 +711,9 @@ func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) erro | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Exposes the given api group in the API.
 | ||||
| // InstallAPIGroup exposes the given api group in the API.
 | ||||
| // The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
 | ||||
| // underlying storage will be destroyed on this servers shutdown.
 | ||||
| func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error { | ||||
| 	return s.InstallAPIGroups(apiGroupInfo) | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue