add remedy controller

Signed-off-by: changzhen <changzhen5@huawei.com>
This commit is contained in:
changzhen 2024-02-26 16:28:36 +08:00
parent bc9316705c
commit 081610e33e
6 changed files with 382 additions and 0 deletions

View File

@ -19822,6 +19822,11 @@
"kind": "DeleteOptions",
"version": "v1beta1"
},
{
"group": "remedy.karmada.io",
"kind": "DeleteOptions",
"version": "v1alpha1"
},
{
"group": "resource.k8s.io",
"kind": "DeleteOptions",
@ -20561,6 +20566,11 @@
"kind": "WatchEvent",
"version": "v1beta1"
},
{
"group": "remedy.karmada.io",
"kind": "WatchEvent",
"version": "v1alpha1"
},
{
"group": "resource.k8s.io",
"kind": "WatchEvent",

View File

@ -66,6 +66,7 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/mcs"
"github.com/karmada-io/karmada/pkg/controllers/multiclusterservice"
"github.com/karmada-io/karmada/pkg/controllers/namespace"
"github.com/karmada-io/karmada/pkg/controllers/remediation"
"github.com/karmada-io/karmada/pkg/controllers/status"
"github.com/karmada-io/karmada/pkg/controllers/unifiedauth"
"github.com/karmada-io/karmada/pkg/dependenciesdistributor"
@ -229,6 +230,7 @@ func init() {
controllers["multiclusterservice"] = startMCSController
controllers["endpointsliceCollect"] = startEndpointSliceCollectController
controllers["endpointsliceDispatch"] = startEndpointSliceDispatchController
controllers["remedy"] = startRemedyController
}
func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
@ -689,6 +691,17 @@ func startMCSController(ctx controllerscontext.Context) (enabled bool, err error
return true, nil
}
func startRemedyController(ctx controllerscontext.Context) (enabled bool, err error) {
c := &remediation.RemedyController{
Client: ctx.Mgr.GetClient(),
RateLimitOptions: ctx.Opts.RateLimiterOptions,
}
if err = c.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
return true, nil
}
// setupControllers initialize controllers and setup one by one.
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig()

View File

@ -0,0 +1,122 @@
/*
Copyright 2024 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remediation
import (
"context"
"reflect"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1"
)
func newClusterEventHandler() handler.EventHandler {
return &clusterEventHandler{}
}
var _ handler.EventHandler = (*clusterEventHandler)(nil)
type clusterEventHandler struct {
}
func (h *clusterEventHandler) Create(_ context.Context, _ event.CreateEvent, _ workqueue.RateLimitingInterface) {
// Don't care about cluster creation events
}
func (h *clusterEventHandler) Update(_ context.Context, e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
oldCluster := e.ObjectOld.(*clusterv1alpha1.Cluster)
newCluster := e.ObjectNew.(*clusterv1alpha1.Cluster)
if reflect.DeepEqual(oldCluster.Status.Conditions, newCluster.Status.Conditions) {
return
}
queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: newCluster.Name,
}})
}
func (h *clusterEventHandler) Delete(_ context.Context, _ event.DeleteEvent, _ workqueue.RateLimitingInterface) {
// Don't care about cluster deletion events
}
func (h *clusterEventHandler) Generic(_ context.Context, e event.GenericEvent, queue workqueue.RateLimitingInterface) {
queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
}})
}
func newRemedyEventHandler(clusterChan chan<- event.GenericEvent) handler.EventHandler {
return &remedyEventHandler{
clusterChan: clusterChan,
}
}
var _ handler.EventHandler = (*remedyEventHandler)(nil)
type remedyEventHandler struct {
clusterChan chan<- event.GenericEvent
}
func (h *remedyEventHandler) Create(_ context.Context, e event.CreateEvent, _ workqueue.RateLimitingInterface) {
remedy := e.Object.(*remedyv1alpha1.Remedy)
for _, clusterName := range remedy.Spec.ClusterAffinity.ClusterNames {
h.clusterChan <- event.GenericEvent{Object: &clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
}}}
}
}
func (h *remedyEventHandler) Update(_ context.Context, e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
oldRemedy := e.ObjectOld.(*remedyv1alpha1.Remedy)
newRemedy := e.ObjectNew.(*remedyv1alpha1.Remedy)
clusters := sets.Set[string]{}
for _, clusterName := range oldRemedy.Spec.ClusterAffinity.ClusterNames {
clusters.Insert(clusterName)
}
for _, clusterName := range newRemedy.Spec.ClusterAffinity.ClusterNames {
clusters.Insert(clusterName)
}
for clusterName := range clusters {
h.clusterChan <- event.GenericEvent{Object: &clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
}}}
}
}
func (h *remedyEventHandler) Delete(_ context.Context, e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
remedy := e.Object.(*remedyv1alpha1.Remedy)
for _, clusterName := range remedy.Spec.ClusterAffinity.ClusterNames {
h.clusterChan <- event.GenericEvent{Object: &clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
}}}
}
}
func (h *remedyEventHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) {
}

View File

@ -0,0 +1,148 @@
/*
Copyright 2024 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remediation
import (
"context"
"reflect"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/source"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
)
// ControllerName is the controller name that will be used when reporting events.
const ControllerName = "remedy-controller"
// RemedyController is to sync Cluster resource, according to the cluster status
// condition, condition matching is performed through remedy, and then the actions
// required to be performed by the cluster are calculated.
type RemedyController struct {
client.Client
RateLimitOptions ratelimiterflag.Options
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (c *RemedyController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Start to reconcile cluster(%s)", req.NamespacedName.String())
cluster := &clusterv1alpha1.Cluster{}
if err := c.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{}, err
}
if !cluster.DeletionTimestamp.IsZero() {
return controllerruntime.Result{}, nil
}
clusterRelatedRemedies, err := c.getClusterRelatedRemedies(ctx, cluster)
if err != nil {
klog.Errorf("Failed to get cluster(%s) related remedies: %v", cluster.Name, err)
return controllerruntime.Result{}, err
}
actions := calculateActions(clusterRelatedRemedies, cluster)
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
if reflect.DeepEqual(actions, cluster.Status.RemedyActions) {
return nil
}
cluster.Status.RemedyActions = actions
updateErr := c.Client.Status().Update(ctx, cluster)
if updateErr == nil {
return nil
}
updatedCluster := &clusterv1alpha1.Cluster{}
err = c.Client.Get(ctx, types.NamespacedName{Name: cluster.Name}, updatedCluster)
if err == nil {
cluster = updatedCluster
} else {
klog.Errorf("Failed to get updated cluster(%s): %v", cluster.Name, err)
}
return updateErr
})
if err != nil {
klog.Errorf("Failed to sync cluster(%s) remedy actions: %v", cluster.Name, err)
return controllerruntime.Result{}, err
}
klog.V(4).Infof("Success to sync cluster(%s) remedy actions", cluster.Name)
return controllerruntime.Result{}, nil
}
func (c *RemedyController) getClusterRelatedRemedies(ctx context.Context, cluster *clusterv1alpha1.Cluster) ([]*remedyv1alpha1.Remedy, error) {
remedyList := &remedyv1alpha1.RemedyList{}
if err := c.Client.List(ctx, remedyList); err != nil {
return nil, err
}
var clusterRelatedRemedies []*remedyv1alpha1.Remedy
for index := range remedyList.Items {
remedy := remedyList.Items[index]
if isRemedyWorkOnCluster(&remedy, cluster) {
clusterRelatedRemedies = append(clusterRelatedRemedies, &remedy)
}
}
return clusterRelatedRemedies, nil
}
// SetupWithManager creates a controller and register to controller manager.
func (c *RemedyController) SetupWithManager(mgr controllerruntime.Manager) error {
remedyController, err := controller.New(ControllerName, mgr, controller.Options{
Reconciler: c,
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimitOptions),
})
if err != nil {
return err
}
err = c.setupWatches(remedyController, mgr)
if err != nil {
return err
}
return nil
}
func (c *RemedyController) setupWatches(remedyController controller.Controller, mgr controllerruntime.Manager) error {
clusterChan := make(chan event.GenericEvent)
clusterHandler := newClusterEventHandler()
remedyHandler := newRemedyEventHandler(clusterChan)
if err := remedyController.Watch(source.Kind(mgr.GetCache(), &clusterv1alpha1.Cluster{}), clusterHandler); err != nil {
return err
}
if err := remedyController.Watch(&source.Channel{Source: clusterChan}, clusterHandler); err != nil {
return err
}
if err := remedyController.Watch(source.Kind(mgr.GetCache(), &remedyv1alpha1.Remedy{}), remedyHandler); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,87 @@
/*
Copyright 2024 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remediation
import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1"
)
func isRemedyWorkOnCluster(remedy *remedyv1alpha1.Remedy, cluster *clusterv1alpha1.Cluster) bool {
if remedy.Spec.ClusterAffinity == nil {
return true
}
for _, clusterName := range remedy.Spec.ClusterAffinity.ClusterNames {
if clusterName == cluster.Name {
return true
}
}
return false
}
func remedyDecisionMatchWithCluster(decisionMatches []remedyv1alpha1.DecisionMatch, conditions []metav1.Condition) bool {
if decisionMatches == nil {
return true
}
if conditions == nil {
return false
}
for _, decisionMatch := range decisionMatches {
if decisionMatch.ClusterConditionMatch == nil {
continue
}
conditionType := decisionMatch.ClusterConditionMatch.ConditionType
findStatusCondition := meta.FindStatusCondition(conditions, string(conditionType))
if findStatusCondition == nil {
continue
}
status := decisionMatch.ClusterConditionMatch.ConditionStatus
switch decisionMatch.ClusterConditionMatch.Operator {
case remedyv1alpha1.ClusterConditionEqual:
if status == string(findStatusCondition.Status) {
return true
}
case remedyv1alpha1.ClusterConditionNotEqual:
if status != string(findStatusCondition.Status) {
return true
}
}
}
return false
}
func calculateActions(clusterRelatedRemedies []*remedyv1alpha1.Remedy, cluster *clusterv1alpha1.Cluster) []string {
actionSet := sets.NewString()
for _, remedy := range clusterRelatedRemedies {
if remedyDecisionMatchWithCluster(remedy.Spec.DecisionMatches, cluster.Status.Conditions) {
for _, action := range remedy.Spec.Actions {
actionSet.Insert(string(action))
}
}
}
return actionSet.List()
}

View File

@ -30,6 +30,7 @@ import (
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1"
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@ -50,6 +51,7 @@ func init() {
utilruntime.Must(mcsv1alpha1.AddToScheme(aggregatedScheme)) // add mcs-api schemes
utilruntime.Must(clusterapiv1beta1.AddToScheme(aggregatedScheme)) // add cluster-api v1beta1 schemes
utilruntime.Must(autoscalingv1alpha1.AddToScheme(aggregatedScheme)) // add autoscaling v1alpha1 schemes
utilruntime.Must(remedyv1alpha1.AddToScheme(aggregatedScheme)) // add autoscaling v1alpha1 schemes
}
// NewSchema returns a singleton schema set which aggregated Kubernetes's schemes and extended schemes.