Merge pull request #2319 from XiShanYongYe-Chang/graceful-evicting

Add grace-eviction-controller to evict cluster workload
This commit is contained in:
karmada-bot 2022-08-15 11:06:43 +08:00 committed by GitHub
commit f4e7d5a5df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 586 additions and 1 deletions

View File

@ -29,7 +29,7 @@ spec:
- --bind-address=0.0.0.0
- --cluster-status-update-frequency=10s
- --secure-port=10357
- --feature-gates=PropagateDeps=true
- --feature-gates=PropagateDeps=true,Failover=true,GracefulEviction=true
- --v=4
livenessProbe:
httpGet:

View File

@ -33,6 +33,7 @@ import (
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
"github.com/karmada-io/karmada/pkg/controllers/execution"
"github.com/karmada-io/karmada/pkg/controllers/federatedresourcequota"
"github.com/karmada-io/karmada/pkg/controllers/gracefuleviction"
"github.com/karmada-io/karmada/pkg/controllers/hpa"
"github.com/karmada-io/karmada/pkg/controllers/mcs"
"github.com/karmada-io/karmada/pkg/controllers/namespace"
@ -177,6 +178,9 @@ func init() {
controllers["unifiedAuth"] = startUnifiedAuthController
controllers["federatedResourceQuotaSync"] = startFederatedResourceQuotaSyncController
controllers["federatedResourceQuotaStatus"] = startFederatedResourceQuotaStatusController
if features.FeatureGate.Enabled(features.Failover) && features.FeatureGate.Enabled(features.GracefulEviction) {
controllers["gracefulEviction"] = startGracefulEvictionController
}
}
func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
@ -452,6 +456,30 @@ func startFederatedResourceQuotaStatusController(ctx controllerscontext.Context)
return true, nil
}
// startGracefulEvictionController registers the graceful eviction controllers
// with the manager: first the one for ResourceBinding, then the one for
// ClusterResourceBinding. It reports enabled=true only after both have been
// set up; the first setup failure is returned unchanged with enabled=false.
func startGracefulEvictionController(ctx controllerscontext.Context) (enabled bool, err error) {
	mgr := ctx.Mgr
	evictionTimeout := ctx.Opts.GracefulEvictionTimeout.Duration

	// Namespace-scoped bindings.
	rbController := &gracefuleviction.RBGracefulEvictionController{
		Client:                  mgr.GetClient(),
		EventRecorder:           mgr.GetEventRecorderFor(gracefuleviction.RBGracefulEvictionControllerName),
		RateLimiterOptions:      ctx.Opts.RateLimiterOptions,
		GracefulEvictionTimeout: evictionTimeout,
	}
	if err = rbController.SetupWithManager(mgr); err != nil {
		return false, err
	}

	// Cluster-scoped bindings.
	crbController := &gracefuleviction.CRBGracefulEvictionController{
		Client:                  mgr.GetClient(),
		EventRecorder:           mgr.GetEventRecorderFor(gracefuleviction.CRBGracefulEvictionControllerName),
		RateLimiterOptions:      ctx.Opts.RateLimiterOptions,
		GracefulEvictionTimeout: evictionTimeout,
	}
	if err = crbController.SetupWithManager(mgr); err != nil {
		return false, err
	}

	return true, nil
}
// setupControllers initialize controllers and setup one by one.
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig()
@ -532,6 +560,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
EnableTaintManager: opts.EnableTaintManager,
RateLimiterOptions: opts.RateLimiterOpts,
GracefulEvictionTimeout: opts.GracefulEvictionTimeout,
},
StopChan: stopChan,
DynamicClientSet: dynamicClientSet,

View File

@ -114,6 +114,9 @@ type Options struct {
// If set to true enables NoExecute Taints and will evict all not-tolerating
// objects propagating on Clusters tainted with this kind of Taints.
EnableTaintManager bool
// GracefulEvictionTimeout is how long the graceful-eviction-controller waits, after a workload (resource) has
// been moved to the graceful eviction tasks, before it performs the final removal.
GracefulEvictionTimeout metav1.Duration
RateLimiterOpts ratelimiterflag.Options
ProfileOpts profileflag.Options
@ -194,6 +197,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau
flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 1, "The number of Namespaces that are allowed to sync concurrently.")
flags.IntVar(&o.ConcurrentResourceTemplateSyncs, "concurrent-resource-template-syncs", 5, "The number of resource templates that are allowed to sync concurrently.")
flags.BoolVar(&o.EnableTaintManager, "enable-taint-manager", true, "If set to true enables NoExecute Taints and will evict all not-tolerating objects propagating on Clusters tainted with this kind of Taints.")
flags.DurationVar(&o.GracefulEvictionTimeout.Duration, "graceful-eviction-timeout", 10*time.Minute, "Specifies the timeout period waiting for the graceful-eviction-controller performs the final removal since the workload(resource) has been moved to the graceful eviction tasks.")
o.RateLimiterOpts.AddFlags(flags)
o.ProfileOpts.AddFlags(flags)

View File

@ -61,6 +61,9 @@ type Options struct {
// If set to true enables NoExecute Taints and will evict all not-tolerating
// objects propagating on Clusters tainted with this kind of Taints.
EnableTaintManager bool
// GracefulEvictionTimeout is how long the graceful-eviction-controller waits, after a workload (resource) has
// been moved to the graceful eviction tasks, before it performs the final removal.
GracefulEvictionTimeout metav1.Duration
}
// Context defines the context object for controller.

View File

@ -0,0 +1,106 @@
package gracefuleviction
import (
"context"
"reflect"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
)
// CRBGracefulEvictionControllerName is the name of the ClusterResourceBinding
// graceful eviction controller; it is the name used when reporting events.
const CRBGracefulEvictionControllerName = "cluster-resource-binding-graceful-eviction-controller"
// CRBGracefulEvictionController is to sync ClusterResourceBinding.spec.gracefulEvictionTasks.
type CRBGracefulEvictionController struct {
	// Client reads the ClusterResourceBinding being reconciled and patches
	// its spec.gracefulEvictionTasks.
	client.Client
	// EventRecorder is supplied by whoever constructs the controller; the
	// methods in this file do not emit events through it.
	EventRecorder record.EventRecorder
	// RateLimiterOptions configures the rate limiter installed on the
	// controller in SetupWithManager.
	RateLimiterOptions ratelimiterflag.Options
	// GracefulEvictionTimeout is how long a task may stay in
	// spec.gracefulEvictionTasks, counted from its creation timestamp,
	// before it is dropped.
	GracefulEvictionTimeout time.Duration
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
//
// A binding that no longer exists or is being deleted is ignored. Otherwise the
// binding's graceful eviction tasks are synced, and if syncBinding reports a
// positive retry duration the request is requeued after that duration.
func (c *CRBGracefulEvictionController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
	klog.V(4).Infof("Reconciling ClusterResourceBinding %s.", req.NamespacedName.String())

	binding := &workv1alpha2.ClusterResourceBinding{}
	// Use the context handed to Reconcile (not context.TODO()) so that any
	// cancellation or deadline attached by the caller applies to the read.
	if err := c.Client.Get(ctx, req.NamespacedName, binding); err != nil {
		if apierrors.IsNotFound(err) {
			// The binding is gone; there is nothing left to evict.
			return controllerruntime.Result{}, nil
		}
		return controllerruntime.Result{Requeue: true}, err
	}

	// Leave bindings that are being deleted alone.
	if !binding.DeletionTimestamp.IsZero() {
		return controllerruntime.Result{}, nil
	}

	retryDuration, err := c.syncBinding(binding)
	if err != nil {
		return controllerruntime.Result{Requeue: true}, err
	}
	if retryDuration > 0 {
		klog.V(4).Infof("Retry to evict task after %v minutes.", retryDuration.Minutes())
		return controllerruntime.Result{RequeueAfter: retryDuration}, nil
	}
	return controllerruntime.Result{}, nil
}
// syncBinding re-evaluates the binding's graceful eviction tasks and, when the
// set of tasks to keep differs from what is stored in the spec, patches the
// binding with the new list. It returns the duration after which the binding
// should be looked at again (as computed by nextRetry), or the patch error.
func (c *CRBGracefulEvictionController) syncBinding(binding *workv1alpha2.ClusterResourceBinding) (time.Duration, error) {
	remaining := assessEvictionTasks(binding.Spec, binding.Status.AggregatedStatus, c.GracefulEvictionTimeout, metav1.Now())

	// Only talk to the API server when the task list actually changed.
	if !reflect.DeepEqual(binding.Spec.GracefulEvictionTasks, remaining) {
		updated := binding.DeepCopy()
		updated.Spec.GracefulEvictionTasks = remaining
		if err := c.Client.Patch(context.TODO(), updated, client.MergeFrom(binding)); err != nil {
			return 0, err
		}
	}

	return nextRetry(remaining, c.GracefulEvictionTimeout, metav1.Now().Time), nil
}
// SetupWithManager creates a controller and register to controller manager.
//
// The controller watches ClusterResourceBinding objects through a predicate
// that only lets through bindings which carry graceful eviction tasks and
// whose latest generation has been observed by the scheduler. Delete and
// generic events are always ignored.
func (c *CRBGracefulEvictionController) SetupWithManager(mgr controllerruntime.Manager) error {
	clusterResourceBindingPredicateFn := predicate.Funcs{
		CreateFunc: func(createEvent event.CreateEvent) bool {
			// Objects that already exist when the controller starts are
			// delivered as Create events. Accept the ones that still have
			// pending tasks; otherwise a restart would lose the previously
			// scheduled requeue and the tasks would sit in the spec until
			// some unrelated update happened.
			newObj := createEvent.Object.(*workv1alpha2.ClusterResourceBinding)
			if len(newObj.Spec.GracefulEvictionTasks) == 0 {
				return false
			}
			return newObj.Status.SchedulerObservedGeneration == newObj.Generation
		},
		UpdateFunc: func(updateEvent event.UpdateEvent) bool {
			newObj := updateEvent.ObjectNew.(*workv1alpha2.ClusterResourceBinding)
			oldObj := updateEvent.ObjectOld.(*workv1alpha2.ClusterResourceBinding)

			// Nothing to evict.
			if len(newObj.Spec.GracefulEvictionTasks) == 0 {
				return false
			}

			// Wait until the scheduler has observed the current generation.
			if newObj.Status.SchedulerObservedGeneration != newObj.Generation {
				return false
			}

			// NOTE(review): this only fires when the task list changes in the
			// same event in which the observed generation already matches;
			// confirm that combination actually occurs for the update that
			// adds tasks.
			return !reflect.DeepEqual(newObj.Spec.GracefulEvictionTasks, oldObj.Spec.GracefulEvictionTasks)
		},
		DeleteFunc:  func(deleteEvent event.DeleteEvent) bool { return false },
		GenericFunc: func(genericEvent event.GenericEvent) bool { return false },
	}

	return controllerruntime.NewControllerManagedBy(mgr).
		For(&workv1alpha2.ClusterResourceBinding{}, builder.WithPredicates(clusterResourceBindingPredicateFn)).
		WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}).
		Complete(c)
}

View File

@ -0,0 +1,73 @@
package gracefuleviction
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)
// assessmentOption bundles the inputs used when assessing a single graceful
// eviction task.
type assessmentOption struct {
	// timeout is how long a task may live, counted from its creation
	// timestamp, before it is dropped.
	timeout time.Duration
	// scheduleResult is filled from the binding's Spec.Clusters; the current
	// assessment logic does not read it.
	scheduleResult []workv1alpha2.TargetCluster
	// observedStatus is the aggregated status passed in by the caller; the
	// current assessment logic does not read it.
	observedStatus []workv1alpha2.AggregatedStatusItem
}
// assessEvictionTasks walks the graceful eviction tasks of bindingSpec and
// returns the ones that should remain, in their original order.
//
// A task without a creation timestamp is considered new: it is stamped with
// now and kept without further assessment. Every other task is handed to
// assessSingleTask and kept only if that returns a non-nil task. The result is
// nil when no task is kept.
func assessEvictionTasks(bindingSpec workv1alpha2.ResourceBindingSpec,
	observedStatus []workv1alpha2.AggregatedStatusItem,
	timeout time.Duration,
	now metav1.Time,
) []workv1alpha2.GracefulEvictionTask {
	// The assessment inputs are the same for every task.
	opt := assessmentOption{
		scheduleResult: bindingSpec.Clusters,
		timeout:        timeout,
		observedStatus: observedStatus,
	}

	var remaining []workv1alpha2.GracefulEvictionTask
	for i := range bindingSpec.GracefulEvictionTasks {
		// Work on a copy so the caller's spec is never modified.
		current := bindingSpec.GracefulEvictionTasks[i]

		if current.CreationTimestamp.IsZero() {
			// set creation timestamp for new task
			current.CreationTimestamp = now
			remaining = append(remaining, current)
		} else if kept := assessSingleTask(current, opt); kept != nil {
			remaining = append(remaining, *kept)
		}
	}
	return remaining
}
// assessSingleTask decides whether a single task should be kept. It returns
// nil once the current time is past the task's creation timestamp plus
// opt.timeout, and a pointer to the (copied) task otherwise.
func assessSingleTask(task workv1alpha2.GracefulEvictionTask, opt assessmentOption) *workv1alpha2.GracefulEvictionTask {
	// TODO(): gradually evict replica as per observed status.

	current := metav1.Now()
	deadline := task.CreationTimestamp.Add(opt.timeout)
	if !current.After(deadline) {
		// Still within the timeout window: keep the task.
		return &task
	}

	// task exceeds timeout
	return nil
}
// nextRetry returns how long to wait before the given tasks should be assessed
// again.
//
// With no tasks it returns 0, meaning no retry is needed. Otherwise it returns
// the smaller of timeout/10 and the shortest time remaining until any task
// reaches its deadline (creation timestamp + timeout), measured from timeNow.
//
// The result for a non-empty task list is always positive. Reconcile only
// requeues when the returned duration is greater than zero, so a zero or
// negative value here (a task already past its deadline at timeNow, or a
// timeout so small that timeout/10 is 0) would leave the remaining tasks
// without any further reconciliation. Such values are replaced by a short
// positive interval so the tasks are re-assessed promptly.
func nextRetry(tasks []workv1alpha2.GracefulEvictionTask, timeout time.Duration, timeNow time.Time) time.Duration {
	if len(tasks) == 0 {
		return 0
	}

	// minRetryInterval is used when the computed interval is not positive.
	const minRetryInterval = time.Second

	retryInterval := timeout / 10
	for i := range tasks {
		// Time left until this task reaches its deadline.
		next := tasks[i].CreationTimestamp.Add(timeout).Sub(timeNow)
		if next < retryInterval {
			retryInterval = next
		}
	}

	if retryInterval <= 0 {
		return minRetryInterval
	}
	return retryInterval
}

View File

@ -0,0 +1,264 @@
package gracefuleviction
import (
"reflect"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)
// Test_assessSingleTask checks that a task inside the timeout window is
// returned unchanged and that a task past the timeout yields nil.
func Test_assessSingleTask(t *testing.T) {
	const timeout = 3 * time.Minute
	base := metav1.Now()

	// member1Created builds a task for "member1" created `age` before base.
	member1Created := func(age time.Duration) workv1alpha2.GracefulEvictionTask {
		return workv1alpha2.GracefulEvictionTask{
			FromCluster:       "member1",
			CreationTimestamp: metav1.Time{Time: base.Add(-age)},
		}
	}
	withinTimeout := member1Created(time.Minute)

	cases := []struct {
		name string
		task workv1alpha2.GracefulEvictionTask
		opt  assessmentOption
		want *workv1alpha2.GracefulEvictionTask
	}{
		{
			name: "task that doesn't exceed the timeout",
			task: member1Created(time.Minute),
			opt:  assessmentOption{timeout: timeout},
			want: &withinTimeout,
		},
		{
			name: "task that exceeds the timeout",
			task: member1Created(4 * time.Minute),
			opt:  assessmentOption{timeout: timeout},
			want: nil,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			got := assessSingleTask(tc.task, tc.opt)
			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("assessSingleTask() = %v, want %v", got, tc.want)
			}
		})
	}
}
// Test_assessEvictionTasks covers the four situations assessEvictionTasks has
// to handle: tasks without a creation timestamp, tasks within the timeout,
// tasks past the timeout, and a mixture of all three.
func Test_assessEvictionTasks(t *testing.T) {
	const timeout = 3 * time.Minute
	now := metav1.Now()

	// unstamped builds a task that has no creation timestamp yet.
	unstamped := func(cluster string) workv1alpha2.GracefulEvictionTask {
		return workv1alpha2.GracefulEvictionTask{FromCluster: cluster}
	}
	// stampedNow builds a task whose creation timestamp is exactly now.
	stampedNow := func(cluster string) workv1alpha2.GracefulEvictionTask {
		return workv1alpha2.GracefulEvictionTask{FromCluster: cluster, CreationTimestamp: now}
	}
	// createdAgo builds a task created `age` before now.
	createdAgo := func(cluster string, age time.Duration) workv1alpha2.GracefulEvictionTask {
		return workv1alpha2.GracefulEvictionTask{
			FromCluster:       cluster,
			CreationTimestamp: metav1.Time{Time: now.Add(-age)},
		}
	}

	cases := []struct {
		name  string
		tasks []workv1alpha2.GracefulEvictionTask
		want  []workv1alpha2.GracefulEvictionTask
	}{
		{
			name: "tasks without creation timestamp",
			tasks: []workv1alpha2.GracefulEvictionTask{
				unstamped("member1"),
				unstamped("member2"),
			},
			want: []workv1alpha2.GracefulEvictionTask{
				stampedNow("member1"),
				stampedNow("member2"),
			},
		},
		{
			name: "tasks that do not exceed the timeout should do nothing",
			tasks: []workv1alpha2.GracefulEvictionTask{
				createdAgo("member1", 1*time.Minute),
				createdAgo("member2", 2*time.Minute),
			},
			want: []workv1alpha2.GracefulEvictionTask{
				createdAgo("member1", 1*time.Minute),
				createdAgo("member2", 2*time.Minute),
			},
		},
		{
			name: "tasks that exceed the timeout should be removed",
			tasks: []workv1alpha2.GracefulEvictionTask{
				createdAgo("member1", 4*time.Minute),
				createdAgo("member2", 5*time.Minute),
			},
			want: nil,
		},
		{
			name: "mixed tasks",
			tasks: []workv1alpha2.GracefulEvictionTask{
				unstamped("member1"),
				createdAgo("member2", 2*time.Minute),
				createdAgo("member3", 4*time.Minute),
			},
			want: []workv1alpha2.GracefulEvictionTask{
				stampedNow("member1"),
				createdAgo("member2", 2*time.Minute),
			},
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			spec := workv1alpha2.ResourceBindingSpec{GracefulEvictionTasks: tc.tasks}
			got := assessEvictionTasks(spec, []workv1alpha2.AggregatedStatusItem{}, timeout, now)
			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("assessEvictionTasks() = %v, want %v", got, tc.want)
			}
		})
	}
}
// Test_nextRetry checks the retry interval for an empty task list, for a task
// whose deadline is closer than timeout/10, and for tasks whose deadlines are
// all further away than timeout/10.
func Test_nextRetry(t *testing.T) {
	const timeout = 20 * time.Minute
	base := metav1.Now()

	// createdAgo builds a task created `age` before base.
	createdAgo := func(cluster string, age time.Duration) workv1alpha2.GracefulEvictionTask {
		return workv1alpha2.GracefulEvictionTask{
			FromCluster:       cluster,
			CreationTimestamp: metav1.Time{Time: base.Add(-age)},
		}
	}

	cases := []struct {
		name  string
		tasks []workv1alpha2.GracefulEvictionTask
		want  time.Duration
	}{
		{
			name:  "empty tasks",
			tasks: []workv1alpha2.GracefulEvictionTask{},
			want:  0,
		},
		{
			name: "retry interval is less than timeout / 10",
			tasks: []workv1alpha2.GracefulEvictionTask{
				createdAgo("member1", 19*time.Minute),
				createdAgo("member2", 10*time.Minute),
			},
			want: time.Minute * 1,
		},
		{
			name: "retry interval is equal to timeout / 10",
			tasks: []workv1alpha2.GracefulEvictionTask{
				createdAgo("member1", 10*time.Minute),
				createdAgo("member2", 5*time.Minute),
			},
			want: timeout / 10,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			got := nextRetry(tc.tasks, timeout, base.Time)
			if got != tc.want {
				t.Errorf("nextRetry() = %v, want %v", got, tc.want)
			}
		})
	}
}

View File

@ -0,0 +1,106 @@
package gracefuleviction
import (
"context"
"reflect"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
)
// RBGracefulEvictionControllerName is the name of the ResourceBinding graceful
// eviction controller; it is the name used when reporting events.
const RBGracefulEvictionControllerName = "resource-binding-graceful-eviction-controller"
// RBGracefulEvictionController is to sync ResourceBinding.spec.gracefulEvictionTasks.
type RBGracefulEvictionController struct {
	// Client reads the ResourceBinding being reconciled and patches its
	// spec.gracefulEvictionTasks.
	client.Client
	// EventRecorder is supplied by whoever constructs the controller; the
	// methods in this file do not emit events through it.
	EventRecorder record.EventRecorder
	// RateLimiterOptions configures the rate limiter installed on the
	// controller in SetupWithManager.
	RateLimiterOptions ratelimiterflag.Options
	// GracefulEvictionTimeout is how long a task may stay in
	// spec.gracefulEvictionTasks, counted from its creation timestamp,
	// before it is dropped.
	GracefulEvictionTimeout time.Duration
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
//
// A binding that no longer exists or is being deleted is ignored. Otherwise the
// binding's graceful eviction tasks are synced, and if syncBinding reports a
// positive retry duration the request is requeued after that duration.
func (c *RBGracefulEvictionController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
	klog.V(4).Infof("Reconciling ResourceBinding %s.", req.NamespacedName.String())

	binding := &workv1alpha2.ResourceBinding{}
	// Use the context handed to Reconcile (not context.TODO()) so that any
	// cancellation or deadline attached by the caller applies to the read.
	if err := c.Client.Get(ctx, req.NamespacedName, binding); err != nil {
		if apierrors.IsNotFound(err) {
			// The binding is gone; there is nothing left to evict.
			return controllerruntime.Result{}, nil
		}
		return controllerruntime.Result{Requeue: true}, err
	}

	// Leave bindings that are being deleted alone.
	if !binding.DeletionTimestamp.IsZero() {
		return controllerruntime.Result{}, nil
	}

	retryDuration, err := c.syncBinding(binding)
	if err != nil {
		return controllerruntime.Result{Requeue: true}, err
	}
	if retryDuration > 0 {
		klog.V(4).Infof("Retry to evict task after %v minutes.", retryDuration.Minutes())
		return controllerruntime.Result{RequeueAfter: retryDuration}, nil
	}
	return controllerruntime.Result{}, nil
}
// syncBinding re-evaluates the binding's graceful eviction tasks and, when the
// set of tasks to keep differs from what is stored in the spec, patches the
// binding with the new list. It returns the duration after which the binding
// should be looked at again (as computed by nextRetry), or the patch error.
func (c *RBGracefulEvictionController) syncBinding(binding *workv1alpha2.ResourceBinding) (time.Duration, error) {
	remaining := assessEvictionTasks(binding.Spec, binding.Status.AggregatedStatus, c.GracefulEvictionTimeout, metav1.Now())

	// Only talk to the API server when the task list actually changed.
	if !reflect.DeepEqual(binding.Spec.GracefulEvictionTasks, remaining) {
		updated := binding.DeepCopy()
		updated.Spec.GracefulEvictionTasks = remaining
		if err := c.Client.Patch(context.TODO(), updated, client.MergeFrom(binding)); err != nil {
			return 0, err
		}
	}

	return nextRetry(remaining, c.GracefulEvictionTimeout, metav1.Now().Time), nil
}
// SetupWithManager creates a controller and register to controller manager.
//
// The controller watches ResourceBinding objects through a predicate that only
// lets through bindings which carry graceful eviction tasks and whose latest
// generation has been observed by the scheduler. Delete and generic events are
// always ignored.
func (c *RBGracefulEvictionController) SetupWithManager(mgr controllerruntime.Manager) error {
	resourceBindingPredicateFn := predicate.Funcs{
		CreateFunc: func(createEvent event.CreateEvent) bool {
			// Objects that already exist when the controller starts are
			// delivered as Create events. Accept the ones that still have
			// pending tasks; otherwise a restart would lose the previously
			// scheduled requeue and the tasks would sit in the spec until
			// some unrelated update happened.
			newObj := createEvent.Object.(*workv1alpha2.ResourceBinding)
			if len(newObj.Spec.GracefulEvictionTasks) == 0 {
				return false
			}
			return newObj.Status.SchedulerObservedGeneration == newObj.Generation
		},
		UpdateFunc: func(updateEvent event.UpdateEvent) bool {
			newObj := updateEvent.ObjectNew.(*workv1alpha2.ResourceBinding)
			oldObj := updateEvent.ObjectOld.(*workv1alpha2.ResourceBinding)

			// Nothing to evict.
			if len(newObj.Spec.GracefulEvictionTasks) == 0 {
				return false
			}

			// Wait until the scheduler has observed the current generation.
			if newObj.Status.SchedulerObservedGeneration != newObj.Generation {
				return false
			}

			// NOTE(review): this only fires when the task list changes in the
			// same event in which the observed generation already matches;
			// confirm that combination actually occurs for the update that
			// adds tasks.
			return !reflect.DeepEqual(newObj.Spec.GracefulEvictionTasks, oldObj.Spec.GracefulEvictionTasks)
		},
		DeleteFunc:  func(deleteEvent event.DeleteEvent) bool { return false },
		GenericFunc: func(genericEvent event.GenericEvent) bool { return false },
	}

	return controllerruntime.NewControllerManagedBy(mgr).
		For(&workv1alpha2.ResourceBinding{}, builder.WithPredicates(resourceBindingPredicateFn)).
		WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}).
		Complete(c)
}