Call shutdown after being stopped

Signed-off-by: whitewindmills <jayfantasyhjh@gmail.com>
This commit is contained in:
whitewindmills 2024-05-08 14:54:33 +08:00
parent 81b8c4c811
commit 0708b7d55b
3 changed files with 11 additions and 8 deletions

View File

@ -80,7 +80,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
} }
// Config returns config for the metrics-adapter server given Options // Config returns config for the metrics-adapter server given Options
func (o *Options) Config() (*metricsadapter.MetricsServer, error) { func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer, error) {
restConfig, err := clientcmd.BuildConfigFromFlags("", o.KubeConfig) restConfig, err := clientcmd.BuildConfigFromFlags("", o.KubeConfig)
if err != nil { if err != nil {
klog.Errorf("Unable to build restConfig: %v", err) klog.Errorf("Unable to build restConfig: %v", err)
@ -92,7 +92,7 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) {
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
kubeClient := kubernetes.NewForConfigOrDie(restConfig) kubeClient := kubernetes.NewForConfigOrDie(restConfig)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0) kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
metricsController := metricsadapter.NewMetricsController(restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst}) metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst})
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions) metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme)) metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme)) metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
@ -138,10 +138,11 @@ func (o *Options) Run(ctx context.Context) error {
profileflag.ListenAndServe(o.ProfileOpts) profileflag.ListenAndServe(o.ProfileOpts)
metricsServer, err := o.Config() stopCh := ctx.Done()
metricsServer, err := o.Config(stopCh)
if err != nil { if err != nil {
return err return err
} }
return metricsServer.StartServer(ctx.Done()) return metricsServer.StartServer(stopCh)
} }

View File

@ -61,14 +61,14 @@ type MetricsController struct {
} }
// NewMetricsController creates a new metrics controller // NewMetricsController creates a new metrics controller
func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory, clusterClientOption *util.ClientOption) *MetricsController { func NewMetricsController(stopCh <-chan struct{}, restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory, clusterClientOption *util.ClientOption) *MetricsController {
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister() clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
controller := &MetricsController{ controller := &MetricsController{
InformerFactory: factory, InformerFactory: factory,
ClusterLister: clusterLister, ClusterLister: clusterLister,
MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory, clusterClientOption), MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory, clusterClientOption),
InformerManager: genericmanager.GetInstance(), InformerManager: genericmanager.GetInstance(),
TypedInformerManager: newInstance(), TypedInformerManager: newInstance(stopCh),
restConfig: restConfig, restConfig: restConfig,
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{ queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: "metrics-adapter", Name: "metrics-adapter",
@ -131,12 +131,12 @@ func podTransformFunc(obj interface{}) (interface{}, error) {
return aggregatedPod, nil return aggregatedPod, nil
} }
func newInstance() typedmanager.MultiClusterInformerManager { func newInstance(stopCh <-chan struct{}) typedmanager.MultiClusterInformerManager {
transforms := map[schema.GroupVersionResource]cache.TransformFunc{ transforms := map[schema.GroupVersionResource]cache.TransformFunc{
provider.NodesGVR: cache.TransformFunc(nodeTransformFunc), provider.NodesGVR: cache.TransformFunc(nodeTransformFunc),
provider.PodsGVR: cache.TransformFunc(podTransformFunc), provider.PodsGVR: cache.TransformFunc(podTransformFunc),
} }
return typedmanager.NewMultiClusterInformerManager(context.TODO().Done(), transforms) return typedmanager.NewMultiClusterInformerManager(stopCh, transforms)
} }
// addEventHandler adds event handler for cluster // addEventHandler adds event handler for cluster
@ -188,6 +188,7 @@ func (m *MetricsController) startController(stopCh <-chan struct{}) {
go func() { go func() {
<-stopCh <-stopCh
m.queue.ShutDown()
genericmanager.StopInstance() genericmanager.StopInstance()
klog.Infof("Shutting down karmada-metrics-adapter") klog.Infof("Shutting down karmada-metrics-adapter")
}() }()

View File

@ -293,6 +293,7 @@ func (s *Scheduler) Run(ctx context.Context) {
go wait.Until(s.worker, time.Second, stopCh) go wait.Until(s.worker, time.Second, stopCh)
<-stopCh <-stopCh
s.queue.ShutDown()
} }
func (s *Scheduler) worker() { func (s *Scheduler) worker() {