Merge pull request #2973 from carlory/operator

add operator skeleton
This commit is contained in:
karmada-bot 2022-12-23 09:35:48 +08:00 committed by GitHub
commit 64897987ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 531 additions and 0 deletions

View File

@ -86,3 +86,6 @@ issues:
- path: cmd/scheduler-estimator/main.go
linters:
- gci
- path: operator/cmd/operator/operator.go
linters:
- gci

View File

@ -0,0 +1,164 @@
package app
import (
"context"
"flag"
"fmt"
"net"
"os"
"strconv"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/sets"
restclient "k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
ctrlruntimecfg "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"github.com/karmada-io/karmada/operator/cmd/operator/app/options"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
ctrlctx "github.com/karmada-io/karmada/operator/pkg/controller/context"
"github.com/karmada-io/karmada/operator/pkg/controller/karmada"
"github.com/karmada-io/karmada/operator/pkg/scheme"
"github.com/karmada-io/karmada/pkg/sharedcli"
"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
)
// NewOperatorCommand creates a *cobra.Command object with default parameters
func NewOperatorCommand(ctx context.Context) *cobra.Command {
o := options.NewOptions()
cmd := &cobra.Command{
Use: "karmada-operator",
PersistentPreRunE: func(*cobra.Command, []string) error {
// silence client-go warnings.
// karmada-operator generically watches APIs (including deprecated ones),
// and CI ensures it works properly against matching kube-apiserver versions.
restclient.SetDefaultWarningHandler(restclient.NoWarnings{})
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
if err := o.Validate(); err != nil {
return err
}
return Run(ctx, o)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
fss := cliflag.NamedFlagSets{}
genericFlagSet := fss.FlagSet("generic")
// Add the flag(--kubeconfig) that is added by controller-runtime
// (https://github.com/kubernetes-sigs/controller-runtime/blob/v0.11.1/pkg/client/config/config.go#L39).
genericFlagSet.AddGoFlagSet(flag.CommandLine)
o.AddFlags(genericFlagSet, controllers.ControllerNames(), controllersDisabledByDefault.List())
// Set klog flags
logsFlagSet := fss.FlagSet("logs")
klogflag.Add(logsFlagSet)
cmd.Flags().AddFlagSet(genericFlagSet)
cmd.Flags().AddFlagSet(logsFlagSet)
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
sharedcli.SetUsageAndHelpFunc(cmd, fss, cols)
return cmd
}
// Run runs the karmada-operator. This should never exit.
func Run(ctx context.Context, o *options.Options) error {
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
manager, err := createControllerManager(ctx, o)
if err != nil {
klog.Errorf("failed to build controller manager: %v", err)
return err
}
if err := manager.AddHealthzCheck("ping", healthz.Ping); err != nil {
klog.Errorf("failed to add health check endpoint: %v", err)
return err
}
controllerCtx := ctrlctx.Context{
Controllers: o.Controllers,
Manager: manager,
}
if err := controllers.StartControllers(controllerCtx, controllersDisabledByDefault); err != nil {
klog.Errorf("failed to start controllers: %v", err)
return err
}
// blocks until the context is done.
if err := manager.Start(ctx); err != nil {
klog.Errorf("controller manager exits unexpectedly: %v", err)
return err
}
// never reach here
return nil
}
var controllers = make(ctrlctx.Initializers)
// controllersDisabledByDefault is the set of controllers which is disabled by default
var controllersDisabledByDefault = sets.NewString()
func init() {
controllers["karmada"] = startKarmadaController
}
func startKarmadaController(ctx ctrlctx.Context) (bool, error) {
ctrl := &karmada.Controller{
Client: ctx.Manager.GetClient(),
EventRecorder: ctx.Manager.GetEventRecorderFor(karmada.ControllerName),
}
if err := ctrl.SetupWithManager(ctx.Manager); err != nil {
klog.ErrorS(err, "unable to setup with manager", "controller", karmada.ControllerName)
return false, err
}
return true, nil
}
// createControllerManager creates controllerruntime.Manager from the given configuration
func createControllerManager(ctx context.Context, o *options.Options) (controllerruntime.Manager, error) {
config, err := controllerruntime.GetConfig()
if err != nil {
return nil, err
}
opts := controllerruntime.Options{
Logger: klog.Background(),
Scheme: scheme.Scheme,
BaseContext: func() context.Context {
return ctx
},
SyncPeriod: &o.ResyncPeriod.Duration,
LeaderElection: o.LeaderElection.LeaderElect,
LeaderElectionID: o.LeaderElection.ResourceName,
LeaderElectionNamespace: o.LeaderElection.ResourceNamespace,
LeaseDuration: &o.LeaderElection.LeaseDuration.Duration,
RenewDeadline: &o.LeaderElection.RenewDeadline.Duration,
RetryPeriod: &o.LeaderElection.RetryPeriod.Duration,
LeaderElectionResourceLock: o.LeaderElection.ResourceLock,
HealthProbeBindAddress: net.JoinHostPort(o.BindAddress, strconv.Itoa(o.SecurePort)),
LivenessEndpointName: "/healthz",
MetricsBindAddress: o.MetricsBindAddress,
Controller: ctrlruntimecfg.ControllerConfigurationSpec{
GroupKindConcurrency: map[string]int{
operatorv1alpha1.SchemeGroupVersion.WithKind("Karmada").GroupKind().String(): o.ConcurrentKarmadaSyncs,
},
},
}
return controllerruntime.NewManager(config, opts)
}

View File

@ -0,0 +1,89 @@
package options
import (
"fmt"
"strings"
"time"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/component-base/config/options"
)
// Options is the main context object for the karmada-operator.
type Options struct {
// Controllers is the list of controllers to enable or disable
// '*' means "all enabled by default controllers"
// 'foo' means "enable 'foo'"
// '-foo' means "disable 'foo'"
// first item for a particular name wins
Controllers []string
// LeaderElection defines the configuration of leader election client.
LeaderElection componentbaseconfig.LeaderElectionConfiguration
// BindAddress is the IP address on which to listen for the --secure-port port.
BindAddress string
// SecurePort is the port that the the server serves at.
// Note: We hope support https in the future once controller-runtime provides the functionality.
SecurePort int
// KubeAPIQPS is the QPS to use while talking with karmada-apiserver.
KubeAPIQPS float32
// KubeAPIBurst is the burst to allow while talking with karmada-apiserver.
KubeAPIBurst int32
// ResyncPeriod is the base frequency the informers are resynced.
// Defaults to 0, which means the created informer will never do resyncs.
ResyncPeriod metav1.Duration
// MetricsBindAddress is the TCP address that the controller should bind to
// for serving prometheus metrics.
// It can be set to "0" to disable the metrics serving.
// Defaults to ":8080".
MetricsBindAddress string
// ConcurrentKarmadaSyncs is the number of karmada objects that are allowed to sync concurrently.
ConcurrentKarmadaSyncs int
}
// NewOptions creates a new Options with a default config.
func NewOptions() *Options {
o := Options{
Controllers: []string{"*"},
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceNamespace: "karmada-system",
ResourceName: "karmada-operator",
},
BindAddress: "0.0.0.0",
SecurePort: 8443,
KubeAPIQPS: 50,
KubeAPIBurst: 100,
ConcurrentKarmadaSyncs: 5,
}
return &o
}
// AddFlags adds flags to the specified FlagSet.
func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string, disabledByDefaultControllers []string) {
fs.DurationVar(&o.ResyncPeriod.Duration, "resync-period", o.ResyncPeriod.Duration, "ResyncPeriod determines the minimum frequency at which watched resources are reconciled.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", o.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver.")
fs.Int32Var(&o.KubeAPIBurst, "kube-api-burst", o.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver.")
fs.StringSliceVar(&o.Controllers, "controllers", o.Controllers, fmt.Sprintf(""+
"A list of controllers to enable. '*' enables all on-by-default controllers, 'foo' enables the controller "+
"named 'foo', '-foo' disables the controller named 'foo'.\nAll controllers: %s\nDisabled-by-default controllers: %s",
strings.Join(allControllers, ", "), strings.Join(disabledByDefaultControllers, ", ")))
fs.IntVar(&o.ConcurrentKarmadaSyncs, "concurrent-karmada-syncs", o.ConcurrentKarmadaSyncs, "The number of karmada objects that are allowed to sync concurrently..")
options.BindLeaderElectionFlags(&o.LeaderElection, fs)
}
// Validate is used to validate the options and config before launching the controller manager
func (o *Options) Validate() error {
var errs []error
// do validation logic here
return utilerrors.NewAggregate(errs)
}

View File

@ -0,0 +1,29 @@
package main
import (
"os"
// Note that Kubernetes registers workqueue metrics to default prometheus Registry. And the registry will be
// initialized by the package 'k8s.io/apiserver/pkg/server'.
// See https://github.com/kubernetes/kubernetes/blob/release-1.26/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go#L25-L26
// But the controller-runtime registers workqueue metrics to its own Registry instead of default prometheus Registry.
// See https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.14/pkg/metrics/workqueue.go#L24-L26
// However, global workqueue metrics factory will be only initialized once.
// See https://github.com/kubernetes/kubernetes/blob/release-1.26/staging/src/k8s.io/client-go/util/workqueue/metrics.go#L257-L261
// So this package should be initialized before 'k8s.io/apiserver/pkg/server', thus the internal registry of
// controller-runtime could be set first.
_ "sigs.k8s.io/controller-runtime/pkg/metrics"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log format registration
"github.com/karmada-io/karmada/operator/cmd/operator/app"
)
func main() {
ctx := apiserver.SetupSignalContext()
command := app.NewOperatorCommand(ctx)
code := cli.Run(command)
os.Exit(code)
}

View File

@ -0,0 +1,77 @@
package context
import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
)
// Context defines the context object for controller
type Context struct {
// Controllers is the list of controllers to enable or disable
// '*' means "all enabled by default controllers"
// 'foo' means "enable 'foo'"
// '-foo' means "disable 'foo'"
// first item for a particular name wins
Controllers []string
Manager controllerruntime.Manager
}
// IsControllerEnabled checks if the context's controllers enabled or not
func (c Context) IsControllerEnabled(name string, disabledByDefaultControllers sets.String) bool {
hasStar := false
for _, ctrl := range c.Controllers {
if ctrl == name {
return true
}
if ctrl == "-"+name {
return false
}
if ctrl == "*" {
hasStar = true
}
}
// if we get here, there was no explicit choice
if !hasStar {
// nothing on by default
return false
}
return !disabledByDefaultControllers.Has(name)
}
// InitFunc is used to launch a particular controller.
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx Context) (enabled bool, err error)
// Initializers is a public map of named controller groups
type Initializers map[string]InitFunc
// ControllerNames returns all known controller names
func (i Initializers) ControllerNames() []string {
return sets.StringKeySet(i).List()
}
// StartControllers starts a set of controllers with a specified ControllerContext
func (i Initializers) StartControllers(ctx Context, controllersDisabledByDefault sets.String) error {
for controllerName, initFn := range i {
if !ctx.IsControllerEnabled(controllerName, controllersDisabledByDefault) {
klog.Warningf("%q is disabled", controllerName)
continue
}
klog.V(1).Infof("Starting %q", controllerName)
started, err := initFn(ctx)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
klog.Warningf("Skipping %q", controllerName)
continue
}
klog.Infof("Started %q", controllerName)
}
return nil
}

View File

@ -0,0 +1,98 @@
package karmada
import (
"context"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
)
const (
// ControllerName is the controller name that will be used when reporting events.
ControllerName = "karmada-operator-controller"
// ControllerFinalizerName is the name of the karmada controller finalizer
ControllerFinalizerName = "operator.karmada.io/finalizer"
)
// Controller controls the Karmada resource.
type Controller struct {
client.Client
EventRecorder record.EventRecorder
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (ctrl *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
startTime := time.Now()
klog.V(4).InfoS("Started syncing karmada", "karmada", req, "startTime", startTime)
defer func() {
klog.V(4).InfoS("Finished syncing karmada", "karmada", req, "duration", time.Since(startTime))
}()
karmada := &operatorv1alpha1.Karmada{}
if err := ctrl.Get(ctx, req.NamespacedName, karmada); err != nil {
// The resource may no longer exist, in which case we stop processing.
if errors.IsNotFound(err) {
klog.V(2).InfoS("Karmada has been deleted", "karmada", req)
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{}, err
}
// examine DeletionTimestamp to determine if object is under deletion
if karmada.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !controllerutil.ContainsFinalizer(karmada, ControllerFinalizerName) {
controllerutil.AddFinalizer(karmada, ControllerFinalizerName)
if err := ctrl.Update(ctx, karmada); err != nil {
return controllerruntime.Result{}, err
}
}
} else {
// The object is being deleted
if controllerutil.ContainsFinalizer(karmada, ControllerFinalizerName) {
// our finalizer is present, so lets handle any external dependency
if err := ctrl.deleteUnableGCResources(karmada); err != nil {
// if fail to delete the external dependency here, return with error
// so that it can be retried
return controllerruntime.Result{}, err
}
// remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(karmada, ControllerFinalizerName)
if err := ctrl.Update(ctx, karmada); err != nil {
return controllerruntime.Result{}, err
}
// Stop reconciliation as the item is being deleted
return controllerruntime.Result{}, nil
}
}
klog.V(2).InfoS("Reconciling karmada", "name", req.Name)
// do reconcile
return controllerruntime.Result{}, nil
}
func (ctrl *Controller) deleteUnableGCResources(karmada *operatorv1alpha1.Karmada) error {
klog.InfoS("Deleting unable gc resources", "karmada", klog.KObj(karmada))
return nil
}
// SetupWithManager creates a controller and register to controller manager.
func (ctrl *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&operatorv1alpha1.Karmada{}).Complete(ctrl)
}

View File

@ -0,0 +1,17 @@
package scheme
import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
)
// Scheme holds the aggregated Kubernetes's schemes and extended schemes.
var Scheme = runtime.NewScheme()
func init() {
utilruntime.Must(scheme.AddToScheme(Scheme))
utilruntime.Must(operatorv1alpha1.AddToScheme(Scheme))
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"github.com/spf13/pflag"
"k8s.io/component-base/config"
)
// BindLeaderElectionFlags binds the LeaderElectionConfiguration struct fields to a flagset
func BindLeaderElectionFlags(l *config.LeaderElectionConfiguration, fs *pflag.FlagSet) {
fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+
"Start a leader election client and gain leadership before "+
"executing the main loop. Enable this when running replicated "+
"components for high availability.")
fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+
"The duration that non-leader candidates will wait after observing a leadership "+
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
"slot. This is effectively the maximum duration that a leader can be stopped "+
"before it is replaced by another candidate. This is only applicable if leader "+
"election is enabled.")
fs.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+
"The interval between attempts by the acting master to renew a leadership slot "+
"before it stops leading. This must be less than or equal to the lease duration. "+
"This is only applicable if leader election is enabled.")
fs.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
fs.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, ""+
"The type of resource object that is used for locking during "+
"leader election. Supported options are 'leases', 'endpointsleases' "+
"and 'configmapsleases'.")
fs.StringVar(&l.ResourceName, "leader-elect-resource-name", l.ResourceName, ""+
"The name of resource object that is used for locking during "+
"leader election.")
fs.StringVar(&l.ResourceNamespace, "leader-elect-resource-namespace", l.ResourceNamespace, ""+
"The namespace of resource object that is used for locking during "+
"leader election.")
}

1
vendor/modules.txt vendored
View File

@ -1385,6 +1385,7 @@ k8s.io/code-generator/third_party/forked/golang/reflect
k8s.io/component-base/cli
k8s.io/component-base/cli/flag
k8s.io/component-base/config
k8s.io/component-base/config/options
k8s.io/component-base/config/v1alpha1
k8s.io/component-base/featuregate
k8s.io/component-base/logs