Merge pull request #1131 from iawia002/agent-controllers-flag
Add --controllers flag to karmada-agent and move ControllerContext to a separate package
This commit is contained in:
commit
305439ae0a
|
@ -17,6 +17,7 @@ import (
|
|||
|
||||
"github.com/karmada-io/karmada/cmd/agent/app/options"
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/execution"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/mcs"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/status"
|
||||
|
@ -60,12 +61,21 @@ func NewAgentCommand(ctx context.Context) *cobra.Command {
|
|||
},
|
||||
}
|
||||
|
||||
opts.AddFlags(cmd.Flags())
|
||||
opts.AddFlags(cmd.Flags(), controllers.ControllerNames())
|
||||
cmd.AddCommand(sharedcommand.NewCmdVersion(os.Stdout, "karmada-agent"))
|
||||
cmd.Flags().AddGoFlagSet(flag.CommandLine)
|
||||
return cmd
|
||||
}
|
||||
|
||||
var controllers = make(controllerscontext.Initializers)
|
||||
|
||||
func init() {
|
||||
controllers["clusterStatus"] = startClusterStatusController
|
||||
controllers["execution"] = startExecutionController
|
||||
controllers["workStatus"] = startWorkStatusController
|
||||
controllers["serviceExport"] = startServiceExportController
|
||||
}
|
||||
|
||||
func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *options.Options) error {
|
||||
klog.Infof("karmada-agent version: %s", version.Get())
|
||||
controlPlaneRestConfig, err := karmadaConfig.GetRestConfig(opts.KarmadaContext, opts.KarmadaKubeConfig)
|
||||
|
@ -110,25 +120,6 @@ func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *opti
|
|||
}
|
||||
|
||||
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
|
||||
clusterStatusController := &status.ClusterStatusController{
|
||||
Client: mgr.GetClient(),
|
||||
KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()),
|
||||
EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
|
||||
PredicateFunc: helper.NewClusterPredicateOnAgent(opts.ClusterName),
|
||||
InformerManager: informermanager.GetInstance(),
|
||||
StopChan: stopChan,
|
||||
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
|
||||
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
||||
ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
|
||||
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
|
||||
ClusterLeaseDuration: opts.ClusterLeaseDuration,
|
||||
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
|
||||
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
|
||||
}
|
||||
if err := clusterStatusController.SetupWithManager(mgr); err != nil {
|
||||
klog.Fatalf("Failed to setup cluster status controller: %v", err)
|
||||
}
|
||||
|
||||
restConfig := mgr.GetConfig()
|
||||
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
|
||||
controlPlaneInformerManager := informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
|
||||
|
@ -138,50 +129,24 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
}
|
||||
|
||||
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
|
||||
executionController := &execution.Controller{
|
||||
Client: mgr.GetClient(),
|
||||
EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName),
|
||||
RESTMapper: mgr.GetRESTMapper(),
|
||||
ObjectWatcher: objectWatcher,
|
||||
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
|
||||
InformerManager: informermanager.GetInstance(),
|
||||
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
||||
}
|
||||
if err := executionController.SetupWithManager(mgr); err != nil {
|
||||
klog.Fatalf("Failed to setup execution controller: %v", err)
|
||||
controllerContext := controllerscontext.Context{
|
||||
Mgr: mgr,
|
||||
ObjectWatcher: objectWatcher,
|
||||
Opts: controllerscontext.Options{
|
||||
Controllers: opts.Controllers,
|
||||
ClusterName: opts.ClusterName,
|
||||
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
|
||||
ClusterLeaseDuration: opts.ClusterLeaseDuration,
|
||||
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
|
||||
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
|
||||
ClusterAPIQPS: opts.ClusterAPIQPS,
|
||||
ClusterAPIBurst: opts.ClusterAPIBurst,
|
||||
},
|
||||
StopChan: stopChan,
|
||||
}
|
||||
|
||||
workStatusController := &status.WorkStatusController{
|
||||
Client: mgr.GetClient(),
|
||||
EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName),
|
||||
RESTMapper: mgr.GetRESTMapper(),
|
||||
InformerManager: informermanager.GetInstance(),
|
||||
StopChan: stopChan,
|
||||
WorkerNumber: 1,
|
||||
ObjectWatcher: objectWatcher,
|
||||
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
|
||||
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
||||
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
|
||||
}
|
||||
workStatusController.RunWorkQueue()
|
||||
if err := workStatusController.SetupWithManager(mgr); err != nil {
|
||||
klog.Fatalf("Failed to setup work status controller: %v", err)
|
||||
}
|
||||
|
||||
serviceExportController := &mcs.ServiceExportController{
|
||||
Client: mgr.GetClient(),
|
||||
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
|
||||
RESTMapper: mgr.GetRESTMapper(),
|
||||
InformerManager: informermanager.GetInstance(),
|
||||
StopChan: stopChan,
|
||||
WorkerNumber: 1,
|
||||
PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(opts.ClusterName),
|
||||
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
||||
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
|
||||
}
|
||||
serviceExportController.RunWorkQueue()
|
||||
if err := serviceExportController.SetupWithManager(mgr); err != nil {
|
||||
klog.Fatalf("Failed to setup ServiceExport controller: %v", err)
|
||||
if err := controllers.StartControllers(controllerContext); err != nil {
|
||||
klog.Fatalf("error starting controllers: %v", err)
|
||||
}
|
||||
|
||||
// Ensure the InformerManager stops when the stop channel closes
|
||||
|
@ -191,6 +156,83 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
}()
|
||||
}
|
||||
|
||||
func startClusterStatusController(ctx controllerscontext.Context) (bool, error) {
|
||||
clusterStatusController := &status.ClusterStatusController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
KubeClient: kubeclientset.NewForConfigOrDie(ctx.Mgr.GetConfig()),
|
||||
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.ControllerName),
|
||||
PredicateFunc: helper.NewClusterPredicateOnAgent(ctx.Opts.ClusterName),
|
||||
InformerManager: informermanager.GetInstance(),
|
||||
StopChan: ctx.StopChan,
|
||||
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
|
||||
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
||||
ClusterClientOption: &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst},
|
||||
ClusterStatusUpdateFrequency: ctx.Opts.ClusterStatusUpdateFrequency,
|
||||
ClusterLeaseDuration: ctx.Opts.ClusterLeaseDuration,
|
||||
ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction,
|
||||
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
|
||||
}
|
||||
if err := clusterStatusController.SetupWithManager(ctx.Mgr); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func startExecutionController(ctx controllerscontext.Context) (bool, error) {
|
||||
executionController := &execution.Controller{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
|
||||
RESTMapper: ctx.Mgr.GetRESTMapper(),
|
||||
ObjectWatcher: ctx.ObjectWatcher,
|
||||
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
|
||||
InformerManager: informermanager.GetInstance(),
|
||||
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
||||
}
|
||||
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
|
||||
workStatusController := &status.WorkStatusController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
|
||||
RESTMapper: ctx.Mgr.GetRESTMapper(),
|
||||
InformerManager: informermanager.GetInstance(),
|
||||
StopChan: ctx.StopChan,
|
||||
WorkerNumber: 1,
|
||||
ObjectWatcher: ctx.ObjectWatcher,
|
||||
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
|
||||
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
||||
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
|
||||
}
|
||||
workStatusController.RunWorkQueue()
|
||||
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func startServiceExportController(ctx controllerscontext.Context) (bool, error) {
|
||||
serviceExportController := &mcs.ServiceExportController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
|
||||
RESTMapper: ctx.Mgr.GetRESTMapper(),
|
||||
InformerManager: informermanager.GetInstance(),
|
||||
StopChan: ctx.StopChan,
|
||||
WorkerNumber: 1,
|
||||
PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName),
|
||||
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
|
||||
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
|
||||
}
|
||||
serviceExportController.RunWorkQueue()
|
||||
if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func registerWithControlPlaneAPIServer(controlPlaneRestConfig *restclient.Config, memberClusterName string) error {
|
||||
client := gclient.NewForConfigOrDie(controlPlaneRestConfig)
|
||||
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package options
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
|
@ -13,6 +15,8 @@ import (
|
|||
|
||||
// Options contains everything necessary to create and run controller-manager.
|
||||
type Options struct {
|
||||
// Controllers contains all controller names.
|
||||
Controllers []string
|
||||
LeaderElection componentbaseconfig.LeaderElectionConfiguration
|
||||
KarmadaKubeConfig string
|
||||
// ClusterContext is the name of the cluster context in control plane KUBECONFIG file.
|
||||
|
@ -53,11 +57,15 @@ func NewOptions() *Options {
|
|||
}
|
||||
|
||||
// AddFlags adds flags of scheduler to the specified FlagSet
|
||||
func (o *Options) AddFlags(fs *pflag.FlagSet) {
|
||||
func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) {
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
|
||||
fs.StringSliceVar(&o.Controllers, "controllers", []string{"*"}, fmt.Sprintf(
|
||||
"A list of controllers to enable. '*' enables all on-by-default controllers, 'foo' enables the controller named 'foo', '-foo' disables the controller named 'foo'. All controllers: %s.",
|
||||
strings.Join(allControllers, ", "),
|
||||
))
|
||||
fs.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.")
|
||||
fs.StringVar(&o.LeaderElection.ResourceNamespace, "leader-elect-resource-namespace", util.NamespaceKarmadaSystem, "The namespace of resource object that is used for locking during leader election.")
|
||||
fs.StringVar(&o.KarmadaKubeConfig, "karmada-kubeconfig", o.KarmadaKubeConfig, "Path to karmada control plane kubeconfig file.")
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"strconv"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/dynamic"
|
||||
kubeclientset "k8s.io/client-go/kubernetes"
|
||||
|
@ -24,6 +23,7 @@ import (
|
|||
"github.com/karmada-io/karmada/pkg/clusterdiscovery/clusterapi"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/binding"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/cluster"
|
||||
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/execution"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/hpa"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/mcs"
|
||||
|
@ -62,7 +62,7 @@ func NewControllerManagerCommand(ctx context.Context) *cobra.Command {
|
|||
|
||||
cmd.Flags().AddGoFlagSet(flag.CommandLine)
|
||||
cmd.AddCommand(sharedcommand.NewCmdVersion(os.Stdout, "karmada-controller-manager"))
|
||||
opts.AddFlags(cmd.Flags(), KnownControllers())
|
||||
opts.AddFlags(cmd.Flags(), controllers.ControllerNames())
|
||||
return cmd
|
||||
}
|
||||
|
||||
|
@ -105,42 +105,9 @@ func Run(ctx context.Context, opts *options.Options) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ControllerContext defines the context object for controller
|
||||
type ControllerContext struct {
|
||||
Mgr controllerruntime.Manager
|
||||
ObjectWatcher objectwatcher.ObjectWatcher
|
||||
Opts *options.Options
|
||||
StopChan <-chan struct{}
|
||||
DynamicClientSet dynamic.Interface
|
||||
OverrideManager overridemanager.OverrideManager
|
||||
ControlPlaneInformerManager informermanager.SingleClusterInformerManager
|
||||
}
|
||||
var controllers = make(controllerscontext.Initializers)
|
||||
|
||||
// IsControllerEnabled check if a specified controller enabled or not.
|
||||
func (c ControllerContext) IsControllerEnabled(name string) bool {
|
||||
hasStar := false
|
||||
for _, ctrl := range c.Opts.Controllers {
|
||||
if ctrl == name {
|
||||
return true
|
||||
}
|
||||
if ctrl == "-"+name {
|
||||
return false
|
||||
}
|
||||
if ctrl == "*" {
|
||||
hasStar = true
|
||||
}
|
||||
}
|
||||
return hasStar
|
||||
}
|
||||
|
||||
// InitFunc is used to launch a particular controller.
|
||||
// Any error returned will cause the controller process to `Fatal`
|
||||
// The bool indicates whether the controller was enabled.
|
||||
type InitFunc func(ctx ControllerContext) (enabled bool, err error)
|
||||
|
||||
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
|
||||
func NewControllerInitializers() map[string]InitFunc {
|
||||
controllers := map[string]InitFunc{}
|
||||
func init() {
|
||||
controllers["cluster"] = startClusterController
|
||||
controllers["clusterStatus"] = startClusterStatusController
|
||||
controllers["hpa"] = startHpaController
|
||||
|
@ -152,15 +119,9 @@ func NewControllerInitializers() map[string]InitFunc {
|
|||
controllers["endpointSlice"] = startEndpointSliceController
|
||||
controllers["serviceImport"] = startServiceImportController
|
||||
controllers["unifiedAuth"] = startUnifiedAuthController
|
||||
return controllers
|
||||
}
|
||||
|
||||
// KnownControllers returns all known controllers's name
|
||||
func KnownControllers() []string {
|
||||
return sets.StringKeySet(NewControllerInitializers()).List()
|
||||
}
|
||||
|
||||
func startClusterController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
mgr := ctx.Mgr
|
||||
opts := ctx.Opts
|
||||
clusterController := &cluster.Controller{
|
||||
|
@ -176,7 +137,7 @@ func startClusterController(ctx ControllerContext) (enabled bool, err error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startClusterStatusController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startClusterStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
mgr := ctx.Mgr
|
||||
opts := ctx.Opts
|
||||
stopChan := ctx.StopChan
|
||||
|
@ -218,7 +179,7 @@ func startClusterStatusController(ctx ControllerContext) (enabled bool, err erro
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startHpaController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startHpaController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
hpaController := &hpa.HorizontalPodAutoscalerController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
DynamicClient: ctx.DynamicClientSet,
|
||||
|
@ -232,7 +193,7 @@ func startHpaController(ctx ControllerContext) (enabled bool, err error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startBindingController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startBindingController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
bindingController := &binding.ResourceBindingController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
DynamicClient: ctx.DynamicClientSet,
|
||||
|
@ -259,7 +220,7 @@ func startBindingController(ctx ControllerContext) (enabled bool, err error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startExecutionController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startExecutionController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
executionController := &execution.Controller{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
EventRecorder: ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
|
||||
|
@ -275,7 +236,7 @@ func startExecutionController(ctx ControllerContext) (enabled bool, err error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startWorkStatusController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
opts := ctx.Opts
|
||||
workStatusController := &status.WorkStatusController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
|
@ -296,7 +257,7 @@ func startWorkStatusController(ctx ControllerContext) (enabled bool, err error)
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startNamespaceController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startNamespaceController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
skippedPropagatingNamespaces := map[string]struct{}{}
|
||||
for _, ns := range ctx.Opts.SkippedPropagatingNamespaces {
|
||||
skippedPropagatingNamespaces[ns] = struct{}{}
|
||||
|
@ -312,7 +273,7 @@ func startNamespaceController(ctx ControllerContext) (enabled bool, err error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startServiceExportController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startServiceExportController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
opts := ctx.Opts
|
||||
serviceExportController := &mcs.ServiceExportController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
|
@ -332,7 +293,7 @@ func startServiceExportController(ctx ControllerContext) (enabled bool, err erro
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startEndpointSliceController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startEndpointSliceController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
endpointSliceController := &mcs.EndpointSliceController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName),
|
||||
|
@ -343,7 +304,7 @@ func startEndpointSliceController(ctx ControllerContext) (enabled bool, err erro
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startServiceImportController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startServiceImportController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
serviceImportController := &mcs.ServiceImportController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName),
|
||||
|
@ -354,7 +315,7 @@ func startServiceImportController(ctx ControllerContext) (enabled bool, err erro
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startUnifiedAuthController(ctx ControllerContext) (enabled bool, err error) {
|
||||
func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
unifiedAuthController := &unifiedauth.Controller{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
EventRecorder: ctx.Mgr.GetEventRecorderFor(unifiedauth.ControllerName),
|
||||
|
@ -408,17 +369,29 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
}
|
||||
|
||||
setupClusterAPIClusterDetector(mgr, opts, stopChan)
|
||||
controllerContext := ControllerContext{
|
||||
Mgr: mgr,
|
||||
ObjectWatcher: objectWatcher,
|
||||
Opts: opts,
|
||||
controllerContext := controllerscontext.Context{
|
||||
Mgr: mgr,
|
||||
ObjectWatcher: objectWatcher,
|
||||
Opts: controllerscontext.Options{
|
||||
Controllers: opts.Controllers,
|
||||
ClusterMonitorPeriod: opts.ClusterMonitorPeriod,
|
||||
ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod,
|
||||
ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod,
|
||||
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
|
||||
ClusterLeaseDuration: opts.ClusterLeaseDuration,
|
||||
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
|
||||
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
|
||||
ClusterAPIQPS: opts.ClusterAPIQPS,
|
||||
ClusterAPIBurst: opts.ClusterAPIBurst,
|
||||
SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces,
|
||||
},
|
||||
StopChan: stopChan,
|
||||
DynamicClientSet: dynamicClientSet,
|
||||
OverrideManager: overrideManager,
|
||||
ControlPlaneInformerManager: controlPlaneInformerManager,
|
||||
}
|
||||
|
||||
if err := StartControllers(controllerContext, NewControllerInitializers()); err != nil {
|
||||
if err := controllers.StartControllers(controllerContext); err != nil {
|
||||
klog.Fatalf("error starting controllers: %v", err)
|
||||
}
|
||||
|
||||
|
@ -429,29 +402,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
}()
|
||||
}
|
||||
|
||||
// StartControllers starts a set of controllers with a specified ControllerContext
|
||||
func StartControllers(ctx ControllerContext, controllers map[string]InitFunc) error {
|
||||
for controllerName, initFn := range controllers {
|
||||
if !ctx.IsControllerEnabled(controllerName) {
|
||||
klog.Warningf("%q is disabled", controllerName)
|
||||
continue
|
||||
}
|
||||
klog.V(1).Infof("Starting %q", controllerName)
|
||||
started, err := initFn(ctx)
|
||||
if err != nil {
|
||||
klog.Errorf("Error starting %q", controllerName)
|
||||
return err
|
||||
}
|
||||
if !started {
|
||||
klog.Warningf("Skipping %q", controllerName)
|
||||
continue
|
||||
}
|
||||
klog.Infof("Started %q", controllerName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupClusterAPIClusterDetector initialize Cluster detector with the cluster-api management cluster.
|
||||
func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
|
||||
if len(opts.ClusterAPIKubeconfig) == 0 {
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
package context
|
||||
|
||||
import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/klog/v2"
|
||||
controllerruntime "sigs.k8s.io/controller-runtime"
|
||||
|
||||
"github.com/karmada-io/karmada/pkg/util/informermanager"
|
||||
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
|
||||
"github.com/karmada-io/karmada/pkg/util/overridemanager"
|
||||
)
|
||||
|
||||
// Options defines all the parameters required by our controllers.
|
||||
type Options struct {
|
||||
// Controllers contains all controller names.
|
||||
Controllers []string
|
||||
// ClusterMonitorPeriod represents cluster-controller monitoring period, i.e. how often does
|
||||
// cluster-controller check cluster health signal posted from cluster-status-controller.
|
||||
// This value should be lower than ClusterMonitorGracePeriod.
|
||||
ClusterMonitorPeriod metav1.Duration
|
||||
// ClusterMonitorGracePeriod represents the grace period after last cluster health probe time.
|
||||
// If it doesn't receive update for this amount of time, it will start posting
|
||||
// "ClusterReady==ConditionUnknown".
|
||||
ClusterMonitorGracePeriod metav1.Duration
|
||||
// ClusterStartupGracePeriod specifies the grace period of allowing a cluster to be unresponsive during
|
||||
// startup before marking it unhealthy.
|
||||
ClusterStartupGracePeriod metav1.Duration
|
||||
// ClusterStatusUpdateFrequency is the frequency that controller computes and report cluster status.
|
||||
// It must work with ClusterMonitorGracePeriod.
|
||||
ClusterStatusUpdateFrequency metav1.Duration
|
||||
// ClusterLeaseDuration is a duration that candidates for a lease need to wait to force acquire it.
|
||||
// This is measure against time of last observed lease RenewTime.
|
||||
ClusterLeaseDuration metav1.Duration
|
||||
// ClusterLeaseRenewIntervalFraction is a fraction coordinated with ClusterLeaseDuration that
|
||||
// how long the current holder of a lease has last updated the lease.
|
||||
ClusterLeaseRenewIntervalFraction float64
|
||||
// ClusterCacheSyncTimeout is the timeout period waiting for cluster cache to sync.
|
||||
ClusterCacheSyncTimeout metav1.Duration
|
||||
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.
|
||||
ClusterAPIQPS float32
|
||||
// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
|
||||
ClusterAPIBurst int
|
||||
// SkippedPropagatingNamespaces is a list of namespaces that will be skipped for propagating.
|
||||
SkippedPropagatingNamespaces []string
|
||||
// ClusterName is the name of cluster.
|
||||
ClusterName string
|
||||
}
|
||||
|
||||
// Context defines the context object for controller.
|
||||
type Context struct {
|
||||
Mgr controllerruntime.Manager
|
||||
ObjectWatcher objectwatcher.ObjectWatcher
|
||||
Opts Options
|
||||
StopChan <-chan struct{}
|
||||
DynamicClientSet dynamic.Interface
|
||||
OverrideManager overridemanager.OverrideManager
|
||||
ControlPlaneInformerManager informermanager.SingleClusterInformerManager
|
||||
}
|
||||
|
||||
// IsControllerEnabled check if a specified controller enabled or not.
|
||||
func (c Context) IsControllerEnabled(name string) bool {
|
||||
hasStar := false
|
||||
for _, ctrl := range c.Opts.Controllers {
|
||||
if ctrl == name {
|
||||
return true
|
||||
}
|
||||
if ctrl == "-"+name {
|
||||
return false
|
||||
}
|
||||
if ctrl == "*" {
|
||||
hasStar = true
|
||||
}
|
||||
}
|
||||
return hasStar
|
||||
}
|
||||
|
||||
// InitFunc is used to launch a particular controller.
|
||||
// Any error returned will cause the controller process to `Fatal`
|
||||
// The bool indicates whether the controller was enabled.
|
||||
type InitFunc func(ctx Context) (enabled bool, err error)
|
||||
|
||||
// Initializers is a public map of named controller groups
|
||||
type Initializers map[string]InitFunc
|
||||
|
||||
// ControllerNames returns all known controller names
|
||||
func (i Initializers) ControllerNames() []string {
|
||||
return sets.StringKeySet(i).List()
|
||||
}
|
||||
|
||||
// StartControllers starts a set of controllers with a specified ControllerContext
|
||||
func (i Initializers) StartControllers(ctx Context) error {
|
||||
for controllerName, initFn := range i {
|
||||
if !ctx.IsControllerEnabled(controllerName) {
|
||||
klog.Warningf("%q is disabled", controllerName)
|
||||
continue
|
||||
}
|
||||
klog.V(1).Infof("Starting %q", controllerName)
|
||||
started, err := initFn(ctx)
|
||||
if err != nil {
|
||||
klog.Errorf("Error starting %q", controllerName)
|
||||
return err
|
||||
}
|
||||
if !started {
|
||||
klog.Warningf("Skipping %q", controllerName)
|
||||
continue
|
||||
}
|
||||
klog.Infof("Started %q", controllerName)
|
||||
}
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue