add dynamic watcher for various workload types (#47)

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>

Co-authored-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
This commit is contained in:
Wei-Xiang Sun 2022-06-22 13:46:09 +08:00 committed by GitHub
parent cf29580566
commit 149e5a48da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 148 additions and 14 deletions

View File

@ -2,5 +2,8 @@ resources:
- manifests.yaml
- service.yaml
patchesStrategicMerge:
- patch_manifests.yaml
configurations:
- kustomizeconfig.yaml

View File

@ -88,6 +88,27 @@ webhooks:
resources:
- statefulsets
sideEffects: None
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: webhook-service
namespace: system
path: /mutate-unified-workload
failurePolicy: Ignore
name: munifiedworload.kb.io
rules:
- apiGroups:
- '*'
apiVersions:
- '*'
operations:
- CREATE
- UPDATE
resources:
- '*'
sideEffects: None
---
apiVersion: admissionregistration.k8s.io/v1

View File

@ -0,0 +1,10 @@
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: mutating-webhook-configuration
webhooks:
- name: munifiedworload.kb.io
objectSelector:
matchExpressions:
- key: rollouts.kruise.io/workload-type
operator: Exists

View File

@ -21,6 +21,7 @@ import (
"encoding/json"
"flag"
"reflect"
"sync"
"time"
"github.com/openkruise/rollouts/api/v1alpha1"
@ -45,8 +46,20 @@ import (
var (
concurrentReconciles = 3
workloadHandler handler.EventHandler
runtimeController controller.Controller
watchedWorkload sync.Map
)
func init() {
watchedWorkload = sync.Map{}
watchedWorkload.LoadOrStore(util.ControllerKindDep.String(), struct{}{})
watchedWorkload.LoadOrStore(util.ControllerKindSts.String(), struct{}{})
watchedWorkload.LoadOrStore(util.ControllerKruiseKindCS.String(), struct{}{})
watchedWorkload.LoadOrStore(util.ControllerKruiseKindSts.String(), struct{}{})
watchedWorkload.LoadOrStore(util.ControllerKruiseOldKindSts.String(), struct{}{})
}
const ReleaseFinalizer = "rollouts.kruise.io/batch-release-finalizer"
func init() {
@ -101,7 +114,8 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}
workloadHandler := &workloadEventHandler{Reader: mgr.GetCache()}
runtimeController = c
workloadHandler = &workloadEventHandler{Reader: mgr.GetCache()}
return util.AddWorkloadWatcher(c, workloadHandler)
}
@ -146,6 +160,23 @@ func (r *BatchReleaseReconciler) Reconcile(ctx context.Context, req ctrl.Request
klog.Infof("Begin to reconcile BatchRelease(%v/%v), release-phase: %v", release.Namespace, release.Name, release.Status.Phase)
// If workload watcher does not exist, then add the watcher dynamically
workloadRef := release.Spec.TargetRef.WorkloadRef
workloadGVK := util.GetGVKFrom(workloadRef)
_, exists := watchedWorkload.Load(workloadGVK.String())
if workloadRef != nil && !exists {
succeeded, err := util.AddWatcherDynamically(runtimeController, workloadHandler, workloadGVK)
if err != nil {
return ctrl.Result{}, err
} else if succeeded {
watchedWorkload.LoadOrStore(workloadGVK.String(), struct{}{})
klog.Infof("Rollout controller begin to watch workload type: %s", workloadGVK.String())
// return, and wait informer cache to be synced
return ctrl.Result{}, nil
}
}
// finalizer will block the deletion of batchRelease
// util all canary resources and settings are cleaned up.
reconcileDone, err := r.handleFinalizer(release)

View File

@ -18,6 +18,7 @@ package rollout
import (
"context"
"sync"
"time"
rolloutv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
@ -25,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@ -34,8 +36,20 @@ import (
var (
concurrentReconciles = 3
runtimeController controller.Controller
workloadHandler handler.EventHandler
watchedWorkload sync.Map
)
func init() {
watchedWorkload = sync.Map{}
watchedWorkload.LoadOrStore(util.ControllerKindDep.String(), struct{}{})
watchedWorkload.LoadOrStore(util.ControllerKindSts.String(), struct{}{})
watchedWorkload.LoadOrStore(util.ControllerKruiseKindCS.String(), struct{}{})
watchedWorkload.LoadOrStore(util.ControllerKruiseKindSts.String(), struct{}{})
watchedWorkload.LoadOrStore(util.ControllerKruiseOldKindSts.String(), struct{}{})
}
// RolloutReconciler reconciles a Rollout object
type RolloutReconciler struct {
client.Client
@ -79,6 +93,23 @@ func (r *RolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}
// If workload watcher does not exist, then add the watcher dynamically
workloadRef := rollout.Spec.ObjectRef.WorkloadRef
workloadGVK := util.GetGVKFrom(workloadRef)
_, exists := watchedWorkload.Load(workloadGVK.String())
if workloadRef != nil && !exists {
succeeded, err := util.AddWatcherDynamically(runtimeController, workloadHandler, workloadGVK)
if err != nil {
return ctrl.Result{}, err
} else if succeeded {
watchedWorkload.LoadOrStore(workloadGVK.String(), struct{}{})
klog.Infof("Rollout controller begin to watch workload type: %s", workloadGVK.String())
// return, and wait informer cache to be synced
return ctrl.Result{}, nil
}
}
// handle finalizer
err = r.handleFinalizer(rollout)
if err != nil {
@ -125,6 +156,10 @@ func (r *RolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
return err
}
workloadHandler := &enqueueRequestForWorkload{reader: mgr.GetCache(), scheme: r.Scheme}
return util.AddWorkloadWatcher(c, workloadHandler)
runtimeController = c
workloadHandler = &enqueueRequestForWorkload{reader: mgr.GetCache(), scheme: r.Scheme}
if err = util.AddWorkloadWatcher(c, workloadHandler); err != nil {
return err
}
return nil
}

View File

@ -108,6 +108,7 @@ var (
ControllerKindSts = apps.SchemeGroupVersion.WithKind("StatefulSet")
ControllerKruiseKindCS = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
ControllerKruiseKindSts = appsv1beta1.SchemeGroupVersion.WithKind("StatefulSet")
ControllerKruiseOldKindSts = appsv1alpha1.SchemeGroupVersion.WithKind("StatefulSet")
)
// getKruiseCloneSet returns the kruise cloneSet referenced by the provided controllerRef.
@ -197,11 +198,6 @@ func (r *ControllerFinder) getDeployment(namespace string, ref *rolloutv1alpha1.
return workload, nil
}
// no need to progress
if stable.Status.Replicas == stable.Status.UpdatedReplicas {
return workload, nil
}
// in rollout progressing
workload.InRolloutProgressing = true
// workload is continuous release, indicates rollback(v1 -> v2 -> v1)

View File

@ -22,6 +22,7 @@ import (
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
kruiseappsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1"
rolloutv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util/client"
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
@ -49,6 +50,7 @@ const (
// Only when RolloutIDLabel is set, RolloutBatchIDLabel will be patched.
// Users can use RolloutIDLabel and RolloutBatchIDLabel to select the pods that are upgraded in some certain batch and release.
RolloutBatchIDLabel = "apps.kruise.io/rollout-batch-id"
WorkloadTypeLabel = "rollouts.kruise.io/workload-type"
)
// RolloutState is annotation[rollouts.kruise.io/in-progressing] value
@ -135,6 +137,24 @@ func DiscoverGVK(gvk schema.GroupVersionKind) bool {
return true
}
func GetGVKFrom(workloadRef *rolloutv1alpha1.WorkloadRef) schema.GroupVersionKind {
if workloadRef == nil {
return schema.GroupVersionKind{}
}
return schema.FromAPIVersionAndKind(workloadRef.APIVersion, workloadRef.Kind)
}
func AddWatcherDynamically(c controller.Controller, h handler.EventHandler, gvk schema.GroupVersionKind) (bool, error) {
if !DiscoverGVK(gvk) {
klog.Errorf("Failed to find GVK(%v) in cluster", gvk.String())
return false, nil
}
object := &unstructured.Unstructured{}
object.SetGroupVersionKind(gvk)
return true, c.Watch(&source.Kind{Type: object}, h)
}
func DumpJSON(o interface{}) string {
by, _ := json.Marshal(o)
return string(by)

View File

@ -25,6 +25,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"strings"
"github.com/davecgh/go-spew/spew"
appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
@ -61,6 +62,14 @@ const (
alphanums = "bcdfghjklmnpqrstvwxz2456789"
)
type WorkloadType string
const (
StatefulSetType WorkloadType = "statefulset"
DeploymentType WorkloadType = "deployment"
CloneSetType WorkloadType = "cloneset"
)
var (
knownWorkloadGVKs = []*schema.GroupVersionKind{
&ControllerKindDep,
@ -525,3 +534,7 @@ func GetOwnerWorkload(r client.Reader, object client.Object) (client.Object, err
}
return ownerObj, nil
}
func IsWorkloadType(object client.Object, t WorkloadType) bool {
return WorkloadType(strings.ToLower(object.GetLabels()[WorkloadTypeLabel])) == t
}

View File

@ -24,6 +24,7 @@ import (
// +kubebuilder:webhook:path=/mutate-apps-v1-deployment,mutating=true,failurePolicy=Ignore,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps,resources=deployments,verbs=update,versions=v1,name=mdeployment.kb.io
// +kubebuilder:webhook:path=/mutate-apps-v1-statefulset,mutating=true,failurePolicy=Ignore,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps,resources=statefulsets,verbs=update,versions=v1,name=mstatefulset.kb.io
// +kubebuilder:webhook:path=/mutate-apps-kruise-io-statefulset,mutating=true,failurePolicy=Ignore,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=statefulsets,verbs=create;update,versions=v1alpha1;v1beta1,name=madvancedstatefulset.kb.io
// +kubebuilder:webhook:path=/mutate-unified-workload,mutating=true,failurePolicy=Ignore,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=*,resources=*,verbs=create;update,versions=*,name=munifiedworload.kb.io
var (
// HandlerMap contains admission webhook handlers
@ -32,5 +33,6 @@ var (
"mutate-apps-v1-deployment": &WorkloadHandler{},
"mutate-apps-v1-statefulset": &WorkloadHandler{},
"mutate-apps-kruise-io-statefulset": &WorkloadHandler{},
"mutate-unified-workload": &WorkloadHandler{},
}
)

View File

@ -122,13 +122,16 @@ func (h *WorkloadHandler) Handle(ctx context.Context, req admission.Request) adm
}
}
// other statefulset-like workload, including advanced statefulset
// handle other workload types, including native/advanced statefulset
{
newObj := &unstructured.Unstructured{}
newObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind})
if err := h.Decoder.Decode(req, newObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if !util.IsWorkloadType(newObj, util.StatefulSetType) && req.Kind.Kind != util.ControllerKindSts.Kind {
return admission.Allowed("")
}
oldObj := &unstructured.Unstructured{}
oldObj.SetGroupVersionKind(schema.GroupVersionKind{Group: req.Kind.Group, Version: req.Kind.Version, Kind: req.Kind.Kind})
if err := h.Decoder.Decode(