support advanced daemonset

This commit is contained in:
Chenxi Jiang 2022-09-30 22:24:46 +08:00
parent c322b09f96
commit 7a554f1c25
5 changed files with 323 additions and 0 deletions

View File

@ -142,6 +142,8 @@ type BatchReleaseReconciler struct {
// +kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.kruise.io,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.kruise.io,resources=statefulsets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.kruise.io,resources=daemonsets,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=apps.kruise.io,resources=daemonsets/status,verbs=get;update;patch
// Reconcile reads that state of the cluster for a Rollout object and makes changes based on the state read
// and what is in the Rollout.Spec

View File

@ -0,0 +1,171 @@
package workloads
import (
"context"
"fmt"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// AdvancedDaemonSetController is responsible for handling rollout Advanced DaemonSet type of workloads
type AdvancedDaemonSetController struct {
advancedDaemonSetController
daemonSet *kruiseappsv1alpha1.DaemonSet
}
// NewAdvancedDaemonSetController creates a new AdvancedDaemonSet rollout controller
func NewAdvancedDaemonSetController(cli client.Client, recorder record.EventRecorder, release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus, targetNamespacedName types.NamespacedName) *AdvancedDaemonSetController {
return &AdvancedDaemonSetController{
advancedDaemonSetController: advancedDaemonSetController{
workloadController: workloadController{
client: cli,
recorder: recorder,
release: release,
newStatus: newStatus,
},
releasePlanKey: client.ObjectKeyFromObject(release),
targetNamespacedName: targetNamespacedName,
},
}
}
// VerifyWorkload verifies that the workload is ready to execute release plan
func (c *AdvancedDaemonSetController) VerifyWorkload() (bool, error) {
var err error
var message string
defer func() {
if err != nil {
c.recorder.Event(c.release, v1.EventTypeWarning, "VerifyFailed", err.Error())
} else if message != "" {
klog.Warningf(message)
}
}()
if err = c.fetchAdvanceDaemonSet(); err != nil {
return false, err
}
// if the workload status is untrustworthy, return and retry
if c.daemonSet.Status.ObservedGeneration != c.daemonSet.Generation {
message = fmt.Sprintf("advancedDaemonset(%v) is still reconciling, wait for it to be done", c.targetNamespacedName)
return false, nil
}
// if the workload has been promoted, return and not retry
if c.daemonSet.Status.UpdatedNumberScheduled == *&c.daemonSet.Status.DesiredNumberScheduled {
message = fmt.Sprintf("advancedDaemonset(%v) update revision has been promoted, no need to rollout", c.targetNamespacedName)
return false, nil
}
// if the workload is not paused, no need to progress it
if !*c.daemonSet.Spec.UpdateStrategy.RollingUpdate.Paused {
message = fmt.Sprintf("advancedDaemonset(%v) should be paused before execute the release plan", c.targetNamespacedName)
return false, nil
}
c.recorder.Event(c.release, v1.EventTypeNormal, "Verified", "ReleasePlan and the AdvancedDaemonset resource are verified")
return true, nil
}
// PrepareBeforeProgress makes sure that the source and target AdvancedDaemonSet is under our control
func (c *AdvancedDaemonSetController) PrepareBeforeProgress() (bool, error) {
if err := c.fetchAdvanceDaemonSet(); err != nil {
return false, err
}
// claim the AdvancedDaemonSet is under our control
if _, err := c.claimAdvancedDaemonSet(c.daemonSet); err != nil {
return false, err
}
// record revision info to BatchRelease.Status
c.recordAdvancedDaemonSetRevision()
c.recorder.Event(c.release, v1.EventTypeNormal, "InitializedSuccessfully", "Rollout resource are initialized")
return true, nil
}
// UpgradeOneBatch calculates the number of pods we can upgrade once according to the rollout spec
// and then set the partition accordingly
func (c *AdvancedDaemonSetController) UpgradeOneBatch() (bool, error) {
if err := c.fetchAdvanceDaemonSet(); err != nil {
return false, err
}
// todo
c.recorder.Eventf(c.release, v1.EventTypeNormal, "SetBatchDone",
"Finished submitting all upgrade quests for batch %d", c.newStatus.CanaryStatus.CurrentBatch)
return true, nil
}
// CheckOneBatchReady checks to see if the pods are all available according to the rollout plan
func (c *AdvancedDaemonSetController) CheckOneBatchReady() (bool, error) {
return true, nil
}
// FinalizeProgress makes sure the AdvancedDaemonSet is all upgraded
func (c *AdvancedDaemonSetController) FinalizeProgress(cleanup bool) (bool, error) {
c.recorder.Eventf(c.release, v1.EventTypeNormal, "FinalizedSuccessfully", "Rollout resource are finalized: cleanup=%v", cleanup)
return true, nil
}
// SyncWorkloadInfo return change type if workload was changed during release
func (c *AdvancedDaemonSetController) SyncWorkloadInfo() (WorkloadEventType, *util.WorkloadInfo, error) {
// ignore the sync if the release plan is deleted
if c.release.DeletionTimestamp != nil {
return IgnoreWorkloadEvent, nil, nil
}
if err := c.fetchAdvanceDaemonSet(); err != nil {
if apierrors.IsNotFound(err) {
return WorkloadHasGone, nil, err
}
return "", nil, err
}
return IgnoreWorkloadEvent, nil, nil
}
// fetchAdvancedDaemonSet fetch advancedDaemonSet to c.daemonSet
func (c *AdvancedDaemonSetController) fetchAdvanceDaemonSet() error {
daemonSet := &kruiseappsv1alpha1.DaemonSet{}
if err := c.client.Get(context.TODO(), c.targetNamespacedName, daemonSet); err != nil {
if !apierrors.IsNotFound(err) {
c.recorder.Event(c.release, v1.EventTypeWarning, "GetDaemonSetFailed", err.Error())
}
return err
}
c.daemonSet = daemonSet
return nil
}
func (c *AdvancedDaemonSetController) patchPodBatchLabel(canaryGoal int32) (bool, error) {
rolloutID, exist := c.release.Labels[util.RolloutIDLabel]
if !exist || rolloutID == "" {
return true, nil
}
pods, err := util.ListOwnedPods(c.client, c.daemonSet)
if err != nil {
klog.Errorf("Failed to list pods for CloneSet %v", c.targetNamespacedName)
return false, err
}
batchID := c.release.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.release.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, c.releasePlanKey)
}
func (c *AdvancedDaemonSetController) recordAdvancedDaemonSetRevision() {
c.newStatus.UpdateRevision = c.daemonSet.Status.DaemonSetHash
}

View File

@ -0,0 +1,91 @@
package workloads
import (
"context"
"encoding/json"
"k8s.io/apimachinery/pkg/types"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// advancedDaemonSetController is the place to hold fields needed for handle Advanced DaemonSet type of workloads
type advancedDaemonSetController struct {
workloadController
releasePlanKey types.NamespacedName
targetNamespacedName types.NamespacedName
}
// add the parent controller to the owner of the AdvancedDaemonSet, unpause it and initialize the size
// before kicking start the update and start from every pod in the old version
func (c *advancedDaemonSetController) claimAdvancedDaemonSet(daemonSet *kruiseappsv1alpha1.DaemonSet) (bool, error) {
var controlled bool
if controlInfo, ok := daemonSet.Annotations[util.BatchReleaseControlAnnotation]; ok && controlInfo != "" {
ref := &metav1.OwnerReference{}
err := json.Unmarshal([]byte(controlInfo), ref)
if err == nil && ref.UID == c.release.UID {
controlled = true
klog.V(3).Infof("AdvancedDaemonSet(%v) has been controlled by this BatchRelease(%v), no need to claim again",
c.targetNamespacedName, c.releasePlanKey)
} else {
klog.Errorf("Failed to parse controller info from AdvancedDaemonSet(%v) annotation, error: %v, controller info: %+v",
c.targetNamespacedName, err, *ref)
}
}
patch := map[string]interface{}{}
switch {
// if the cloneSet has been claimed by this release
case controlled:
// make sure paused=false
if *daemonSet.Spec.UpdateStrategy.RollingUpdate.Paused {
patch = map[string]interface{}{
"spec": map[string]interface{}{
"updateStrategy": map[string]interface{}{
"rollingUpdate": map[string]interface{}{
"paused": false,
},
},
},
}
}
default:
patch = map[string]interface{}{
"spec": map[string]interface{}{
"updateStrategy": map[string]interface{}{
"rollingUpdate": map[string]interface{}{
"partition": &intstr.IntOrString{Type: intstr.String, StrVal: "100%"},
"paused": false,
},
},
},
}
controlInfo := metav1.NewControllerRef(c.release, c.release.GetObjectKind().GroupVersionKind())
controlByte, _ := json.Marshal(controlInfo)
patch["metadata"] = map[string]interface{}{
"annotations": map[string]string{
util.BatchReleaseControlAnnotation: string(controlByte),
},
}
}
if len(patch) > 0 {
cloneObj := daemonSet.DeepCopy()
patchByte, _ := json.Marshal(patch)
if err := c.client.Patch(context.TODO(), cloneObj, client.RawPatch(types.MergePatchType, patchByte)); err != nil {
c.recorder.Eventf(c.release, v1.EventTypeWarning, "ClaimAdvancedDaemonSetFailed", err.Error())
return false, err
}
}
klog.V(3).Infof("Claim AdvancedDaemonSet(%v) Successfully", c.targetNamespacedName)
return true, nil
}

View File

@ -109,6 +109,7 @@ var (
ControllerKruiseKindCS = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
ControllerKruiseKindSts = appsv1beta1.SchemeGroupVersion.WithKind("StatefulSet")
ControllerKruiseOldKindSts = appsv1alpha1.SchemeGroupVersion.WithKind("StatefulSet")
ControllerKruiseKindDS = appsv1alpha1.SchemeGroupVersion.WithKind("DaemonSet")
)
// getKruiseCloneSet returns the kruise cloneSet referenced by the provided controllerRef.

View File

@ -90,6 +90,30 @@ func (h *WorkloadHandler) Handle(ctx context.Context, req admission.Request) adm
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.AdmissionRequest.Object.Raw, marshalled)
case util.ControllerKruiseKindDS.Kind:
// check daemonset
newObj := &kruiseappsv1alpha1.DaemonSet{}
if err := h.Decoder.Decode(req, newObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
oldObj := &kruiseappsv1alpha1.DaemonSet{}
if err := h.Decoder.Decode(
admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}},
oldObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
changed, err := h.handleAdvancedDaemonSet(newObj, oldObj)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if !changed {
return admission.Allowed("")
}
marshalled, err := json.Marshal(newObj)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.AdmissionRequest.Object.Raw, marshalled)
}
// native k8s deloyment
@ -286,6 +310,40 @@ func (h *WorkloadHandler) handleCloneSet(newObj, oldObj *kruiseappsv1alpha1.Clon
return
}
func (h *WorkloadHandler) handleAdvancedDaemonSet(newObj, oldObj *kruiseappsv1alpha1.DaemonSet) (changed bool, err error) {
// indicate whether the workload can enter the rollout process
// 1. advancedDaemonSet.spec.UpdateStrategy.type must be RollingUpdate
if newObj.Spec.UpdateStrategy.Type != kruiseappsv1alpha1.RollingUpdateDaemonSetStrategyType {
klog.Warningf("advanceDaemonSet(%s/%s) strategy type is not 'RollingUpdate', rollout will not work on it", newObj.Namespace, newObj.Name)
return
}
// 2. advancedDaemonSet.spec.PodTemplate is changed
if util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template) {
return
}
// 3. have matched rollout crd
rollout, err := h.fetchMatchedRollout(newObj)
if err != nil {
return
} else if rollout == nil {
return
}
klog.Infof("advancedDaemonSet(%s/%s) will be in rollout progressing, and paused", newObj.Namespace, newObj.Name)
changed = true
// need set workload paused = true
newObj.Spec.UpdateStrategy.RollingUpdate.Paused = &changed
state := &util.RolloutState{RolloutName: rollout.Name}
by, _ := json.Marshal(state)
if newObj.Annotations == nil {
newObj.Annotations = map[string]string{}
}
newObj.Annotations[util.InRolloutProgressingAnnotation] = string(by)
return
}
func (h *WorkloadHandler) fetchMatchedRollout(obj client.Object) (*appsv1alpha1.Rollout, error) {
oGv := obj.GetObjectKind().GroupVersionKind()
rolloutList := &appsv1alpha1.RolloutList{}