From 0708b7d55b5f9263621ce7a0b8dfe4764ea9c967 Mon Sep 17 00:00:00 2001 From: whitewindmills Date: Wed, 8 May 2024 14:54:33 +0800 Subject: [PATCH] Call shutdown after being stopped Signed-off-by: whitewindmills --- cmd/metrics-adapter/app/options/options.go | 9 +++++---- pkg/metricsadapter/controller.go | 9 +++++---- pkg/scheduler/scheduler.go | 1 + 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/cmd/metrics-adapter/app/options/options.go b/cmd/metrics-adapter/app/options/options.go index c2a829dc0..f88f070ba 100755 --- a/cmd/metrics-adapter/app/options/options.go +++ b/cmd/metrics-adapter/app/options/options.go @@ -80,7 +80,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { } // Config returns config for the metrics-adapter server given Options -func (o *Options) Config() (*metricsadapter.MetricsServer, error) { +func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer, error) { restConfig, err := clientcmd.BuildConfigFromFlags("", o.KubeConfig) if err != nil { klog.Errorf("Unable to build restConfig: %v", err) @@ -92,7 +92,7 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) { factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) kubeClient := kubernetes.NewForConfigOrDie(restConfig) kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0) - metricsController := metricsadapter.NewMetricsController(restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst}) + metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst}) metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions) metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme)) metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme)) @@ -138,10 +138,11 @@ func (o *Options) Run(ctx context.Context) error { profileflag.ListenAndServe(o.ProfileOpts) - metricsServer, err := o.Config() + stopCh := ctx.Done() + metricsServer, err := o.Config(stopCh) if err != nil { return err } - return metricsServer.StartServer(ctx.Done()) + return metricsServer.StartServer(stopCh) } diff --git a/pkg/metricsadapter/controller.go b/pkg/metricsadapter/controller.go index e1e392fab..13c9c54ea 100755 --- a/pkg/metricsadapter/controller.go +++ b/pkg/metricsadapter/controller.go @@ -61,14 +61,14 @@ type MetricsController struct { } // NewMetricsController creates a new metrics controller -func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory, clusterClientOption *util.ClientOption) *MetricsController { +func NewMetricsController(stopCh <-chan struct{}, restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory, clusterClientOption *util.ClientOption) *MetricsController { clusterLister := factory.Cluster().V1alpha1().Clusters().Lister() controller := &MetricsController{ InformerFactory: factory, ClusterLister: clusterLister, MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory, clusterClientOption), InformerManager: genericmanager.GetInstance(), - TypedInformerManager: newInstance(), + TypedInformerManager: newInstance(stopCh), restConfig: restConfig, queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{ Name: "metrics-adapter", @@ -131,12 +131,12 @@ func podTransformFunc(obj interface{}) (interface{}, error) { return aggregatedPod, nil } -func newInstance() typedmanager.MultiClusterInformerManager { +func newInstance(stopCh <-chan struct{}) typedmanager.MultiClusterInformerManager { transforms := map[schema.GroupVersionResource]cache.TransformFunc{ provider.NodesGVR: cache.TransformFunc(nodeTransformFunc), provider.PodsGVR: cache.TransformFunc(podTransformFunc), } - return typedmanager.NewMultiClusterInformerManager(context.TODO().Done(), transforms) + return typedmanager.NewMultiClusterInformerManager(stopCh, transforms) } // addEventHandler adds event handler for cluster @@ -188,6 +188,7 @@ func (m *MetricsController) startController(stopCh <-chan struct{}) { go func() { <-stopCh + m.queue.ShutDown() genericmanager.StopInstance() klog.Infof("Shutting down karmada-metrics-adapter") }() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 1086befc7..b03dbf7cf 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -293,6 +293,7 @@ func (s *Scheduler) Run(ctx context.Context) { go wait.Until(s.worker, time.Second, stopCh) <-stopCh + s.queue.ShutDown() } func (s *Scheduler) worker() {