batchrelease controller (#9)

* add batchrelease controller and test

Signed-off-by: veophi <vec.g.sun@gmail.com>

* improve batchrelease controller

Signed-off-by: veophi <vec.g.sun@gmail.com>
Wei-Xiang Sun 2022-03-10 14:15:41 +08:00 committed by GitHub
parent 40748424e4
commit 7cc32dc4ce
26 changed files with 6207 additions and 0 deletions


@@ -0,0 +1,225 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package batchrelease
import (
"context"
"flag"
"reflect"
"time"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
var (
concurrentReconciles = 3
)
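// ReleaseFinalizer blocks the deletion of a BatchRelease until all of its canary
// resources and settings have been cleaned up (see handleFinalizer below).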
const ReleaseFinalizer = "rollouts.kruise.io/batch-release-finalizer"
func init() {
flag.IntVar(&concurrentReconciles, "batchrelease-workers", concurrentReconciles, "Max concurrent workers for BatchRelease controller.")
}
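// As an illustration (assuming the manager binary parses the standard flag set), the
// worker count could be raised at startup with: --batchrelease-workers=5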
// Add creates a new BatchRelease Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
return add(mgr, newReconciler(mgr))
}
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
recorder := mgr.GetEventRecorderFor("batchrelease-controller")
cli := mgr.GetClient()
return &BatchReleaseReconciler{
Client: cli,
Scheme: mgr.GetScheme(),
recorder: recorder,
executor: NewReleasePlanExecutor(cli, recorder),
}
}
// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New("batchrelease-controller", mgr, controller.Options{
Reconciler: r, MaxConcurrentReconciles: concurrentReconciles})
if err != nil {
return err
}
// Watch for changes to BatchRelease
err = c.Watch(&source.Kind{Type: &v1alpha1.BatchRelease{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
oldObject := e.ObjectOld.(*v1alpha1.BatchRelease)
newObject := e.ObjectNew.(*v1alpha1.BatchRelease)
if oldObject.Generation != newObject.Generation || newObject.DeletionTimestamp != nil {
klog.V(3).Infof("Observed updated Spec for BatchRelease: %s/%s", newObject.Namespace, newObject.Name)
return true
}
return false
},
})
if err != nil {
return err
}
if util.DiscoverGVK(util.CloneSetGVK) {
// Watch changes to CloneSet
err = c.Watch(&source.Kind{Type: &kruiseappsv1alpha1.CloneSet{}}, &workloadEventHandler{Reader: mgr.GetCache()})
if err != nil {
return err
}
}
// Watch changes to Deployment
err = c.Watch(&source.Kind{Type: &apps.Deployment{}}, &workloadEventHandler{Reader: mgr.GetCache()})
if err != nil {
return err
}
return nil
}
var _ reconcile.Reconciler = &BatchReleaseReconciler{}
// BatchReleaseReconciler reconciles a BatchRelease object
type BatchReleaseReconciler struct {
client.Client
Scheme *runtime.Scheme
recorder record.EventRecorder
executor *Executor
}
// +kubebuilder:rbac:groups="*",resources="events",verbs=create;update;patch
// +kubebuilder:rbac:groups=rollouts.kruise.io,resources=batchreleases,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=rollouts.kruise.io,resources=batchreleases/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.kruise.io,resources=clonesets,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=apps.kruise.io,resources=clonesets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=replicasets,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=replicasets/status,verbs=get;update;patch
// Reconcile reads the state of the cluster for a BatchRelease object and makes changes based on the state read
// and what is in the BatchRelease.Spec
func (r *BatchReleaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
release := new(v1alpha1.BatchRelease)
err := r.Get(context.TODO(), req.NamespacedName, release)
if err != nil {
if errors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
return ctrl.Result{}, err
}
klog.Infof("Begin to reconcile BatchRelease(%v/%v), release-phase: %v", release.Namespace, release.Name, release.Status.Phase)
// The finalizer blocks the deletion of the BatchRelease
// until all canary resources and settings are cleaned up.
reconcileDone, err := r.handleFinalizer(release)
if reconcileDone {
return reconcile.Result{}, err
}
// set the release info for executor before executing.
r.executor.SetReleaseInfo(release)
// executor start to execute the batch release plan.
startTimestamp := time.Now()
result, currentStatus := r.executor.Do()
defer func() {
klog.InfoS("Finished one round of reconciling release plan",
"BatchRelease", client.ObjectKeyFromObject(release),
"phase", currentStatus.Phase,
"current-batch", currentStatus.CanaryStatus.CurrentBatch,
"current-batch-state", currentStatus.CanaryStatus.CurrentBatchState,
"reconcile-result ", result, "time-cost", time.Since(startTimestamp))
}()
return result, r.updateStatus(release, currentStatus)
}
func (r *BatchReleaseReconciler) updateStatus(release *v1alpha1.BatchRelease, newStatus *v1alpha1.BatchReleaseStatus) error {
var err error
defer func() {
if err != nil {
klog.Errorf("Failed to update status for BatchRelease(%v), error: %v", client.ObjectKeyFromObject(release), err)
}
}()
// observe and record the latest changes for generation and release plan
newStatus.ObservedGeneration = release.Generation
// update only when the status has actually changed, to avoid unnecessary retries
if !reflect.DeepEqual(release.Status, *newStatus) {
releaseClone := release.DeepCopy()
releaseClone.Status = *newStatus
return r.Status().Update(context.TODO(), releaseClone)
}
return nil
}
func (r *BatchReleaseReconciler) handleFinalizer(release *v1alpha1.BatchRelease) (bool, error) {
var err error
defer func() {
if err != nil {
klog.Errorf("Failed to handle finalizer for BatchRelease(%v), error: %v", client.ObjectKeyFromObject(release), err)
}
}()
// remove the release finalizer if needed
if !release.DeletionTimestamp.IsZero() &&
HasTerminatingCondition(release.Status) &&
controllerutil.ContainsFinalizer(release, ReleaseFinalizer) {
err = util.UpdateFinalizer(r.Client, release, "Remove", ReleaseFinalizer)
if client.IgnoreNotFound(err) != nil {
return true, err
}
return true, nil
}
// add the release finalizer if needed
if !controllerutil.ContainsFinalizer(release, ReleaseFinalizer) {
err = util.UpdateFinalizer(r.Client, release, "Add", ReleaseFinalizer)
if client.IgnoreNotFound(err) != nil {
return true, err
}
}
return false, nil
}

File diff suppressed because it is too large


@@ -0,0 +1,287 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package batchrelease
import (
"context"
"encoding/json"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/controllers/batchrelease/workloads"
"github.com/openkruise/rollouts/pkg/util"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
type EventAction string
const (
CreateEventAction EventAction = "Create"
DeleteEventAction EventAction = "Delete"
)
var (
controllerKruiseKindCS = kruiseappsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
controllerKindDep = appsv1.SchemeGroupVersion.WithKind("Deployment")
)
var _ handler.EventHandler = &workloadEventHandler{}
type workloadEventHandler struct {
client.Reader
}
func (w workloadEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
w.handleWorkload(q, evt.Object, CreateEventAction)
}
func (w workloadEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
var oldAccessor, newAccessor *workloads.WorkloadInfo
var gvk schema.GroupVersionKind
switch evt.ObjectNew.(type) {
case *kruiseappsv1alpha1.CloneSet:
gvk = controllerKruiseKindCS
oldClone := evt.ObjectOld.(*kruiseappsv1alpha1.CloneSet)
newClone := evt.ObjectNew.(*kruiseappsv1alpha1.CloneSet)
var oldReplicas, newReplicas int32
if oldClone.Spec.Replicas != nil {
oldReplicas = *oldClone.Spec.Replicas
}
if newClone.Spec.Replicas != nil {
newReplicas = *newClone.Spec.Replicas
}
oldAccessor = &workloads.WorkloadInfo{
Replicas: &oldReplicas,
Paused: oldClone.Spec.UpdateStrategy.Paused,
Status: &workloads.WorkloadStatus{
Replicas: oldClone.Status.Replicas,
ReadyReplicas: oldClone.Status.ReadyReplicas,
UpdatedReplicas: oldClone.Status.UpdatedReplicas,
UpdatedReadyReplicas: oldClone.Status.UpdatedReadyReplicas,
ObservedGeneration: oldClone.Status.ObservedGeneration,
},
Metadata: &oldClone.ObjectMeta,
}
newAccessor = &workloads.WorkloadInfo{
Replicas: &newReplicas,
Paused: newClone.Spec.UpdateStrategy.Paused,
Status: &workloads.WorkloadStatus{
Replicas: newClone.Status.Replicas,
ReadyReplicas: newClone.Status.ReadyReplicas,
UpdatedReplicas: newClone.Status.UpdatedReplicas,
UpdatedReadyReplicas: newClone.Status.UpdatedReadyReplicas,
ObservedGeneration: newClone.Status.ObservedGeneration,
},
Metadata: &newClone.ObjectMeta,
}
case *appsv1.Deployment:
gvk = controllerKindDep
oldDeploy := evt.ObjectOld.(*appsv1.Deployment)
newDeploy := evt.ObjectNew.(*appsv1.Deployment)
var oldReplicas, newReplicas int32
if oldDeploy.Spec.Replicas != nil {
oldReplicas = *oldDeploy.Spec.Replicas
}
if newDeploy.Spec.Replicas != nil {
newReplicas = *newDeploy.Spec.Replicas
}
oldAccessor = &workloads.WorkloadInfo{
Replicas: &oldReplicas,
Paused: oldDeploy.Spec.Paused,
Status: &workloads.WorkloadStatus{
Replicas: oldDeploy.Status.Replicas,
ReadyReplicas: oldDeploy.Status.AvailableReplicas,
UpdatedReplicas: oldDeploy.Status.UpdatedReplicas,
ObservedGeneration: oldDeploy.Status.ObservedGeneration,
},
Metadata: &oldDeploy.ObjectMeta,
}
newAccessor = &workloads.WorkloadInfo{
Replicas: &newReplicas,
Paused: newDeploy.Spec.Paused,
Status: &workloads.WorkloadStatus{
Replicas: newDeploy.Status.Replicas,
ReadyReplicas: newDeploy.Status.AvailableReplicas,
UpdatedReplicas: newDeploy.Status.UpdatedReplicas,
ObservedGeneration: newDeploy.Status.ObservedGeneration,
},
Metadata: &newDeploy.ObjectMeta,
}
default:
return
}
if newAccessor.Metadata.ResourceVersion == oldAccessor.Metadata.ResourceVersion {
return
}
if observeGenerationChanged(newAccessor, oldAccessor) ||
observeLatestGeneration(newAccessor, oldAccessor) ||
observeScaleEventDone(newAccessor, oldAccessor) ||
observeReplicasChanged(newAccessor, oldAccessor) {
workloadNamespacedName := types.NamespacedName{
Namespace: newAccessor.Metadata.Namespace,
Name: newAccessor.Metadata.Name,
}
brNsn, err := w.getBatchRelease(workloadNamespacedName, gvk, newAccessor.Metadata.Annotations[util.BatchReleaseControlAnnotation])
if err != nil {
klog.Errorf("unable to get BatchRelease related with %s (%s/%s), error: %v",
gvk.Kind, workloadNamespacedName.Namespace, workloadNamespacedName.Name, err)
return
}
if len(brNsn.Name) != 0 {
klog.V(3).Infof("%s (%s/%s) changed generation from %d to %d managed by BatchRelease (%v)",
gvk.Kind, workloadNamespacedName.Namespace, workloadNamespacedName.Name, oldAccessor.Metadata.Generation, newAccessor.Metadata.Generation, brNsn)
q.Add(reconcile.Request{NamespacedName: brNsn})
}
}
}
func (w workloadEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
w.handleWorkload(q, evt.Object, DeleteEventAction)
}
func (w workloadEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
}
func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
obj client.Object, action EventAction) {
var controlInfo string
var gvk schema.GroupVersionKind
switch obj.(type) {
case *kruiseappsv1alpha1.CloneSet:
gvk = controllerKruiseKindCS
controlInfo = obj.(*kruiseappsv1alpha1.CloneSet).Annotations[util.BatchReleaseControlAnnotation]
case *appsv1.Deployment:
gvk = controllerKindDep
controlInfo = obj.(*appsv1.Deployment).Annotations[util.BatchReleaseControlAnnotation]
default:
return
}
workloadNamespacedName := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
brNsn, err := w.getBatchRelease(workloadNamespacedName, gvk, controlInfo)
if err != nil {
klog.Errorf("Unable to get BatchRelease related with %s (%s/%s), err: %v",
gvk.Kind, workloadNamespacedName.Namespace, workloadNamespacedName.Name, err)
return
}
if len(brNsn.Name) != 0 {
klog.V(5).Infof("Something related %s %s (%s/%s) happen and will reconcile BatchRelease (%v)",
action, gvk.Kind, workloadNamespacedName.Namespace, workloadNamespacedName.Namespace, brNsn)
q.Add(reconcile.Request{NamespacedName: brNsn})
}
}
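// getBatchRelease resolves the BatchRelease that manages the given workload. It first tries the
// control-info annotation, whose value is a JSON-encoded OwnerReference, for example (illustrative
// value only): {"apiVersion":"rollouts.kruise.io/v1alpha1","kind":"BatchRelease","name":"release-demo"}.
// If the annotation is missing or does not point at a BatchRelease, it falls back to listing the
// BatchReleases in the namespace and matching their spec.targetRef against the workload.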
func (w *workloadEventHandler) getBatchRelease(workloadNamespaceName types.NamespacedName, gvk schema.GroupVersionKind, controlInfo string) (nsn types.NamespacedName, err error) {
if len(controlInfo) > 0 {
br := &metav1.OwnerReference{}
err = json.Unmarshal([]byte(controlInfo), br)
if err != nil {
klog.Errorf("Failed to unmarshal controller info annotations for %v(%v)", gvk, workloadNamespaceName)
}
if br.APIVersion == v1alpha1.GroupVersion.String() && br.Kind == "BatchRelease" {
klog.V(3).Infof("%s (%v) is managed by BatchRelease (%s), append queue and will reconcile BatchRelease", gvk.Kind, workloadNamespaceName, br.Name)
nsn = types.NamespacedName{Namespace: workloadNamespaceName.Namespace, Name: br.Name}
return
}
}
brList := &v1alpha1.BatchReleaseList{}
listOptions := &client.ListOptions{Namespace: workloadNamespaceName.Namespace}
if err = w.List(context.TODO(), brList, listOptions); err != nil {
klog.Errorf("List BatchRelease failed: %s", err.Error())
return
}
for i := range brList.Items {
br := &brList.Items[i]
targetRef := br.Spec.TargetRef
targetGV, err := schema.ParseGroupVersion(targetRef.WorkloadRef.APIVersion)
if err != nil {
klog.Errorf("Failed to parse targetRef's group version: %s for BatchRelease(%v)", targetRef.WorkloadRef.APIVersion, client.ObjectKeyFromObject(br))
continue
}
if targetRef.WorkloadRef.Kind == gvk.Kind && targetGV.Group == gvk.Group && targetRef.WorkloadRef.Name == workloadNamespaceName.Name {
nsn = client.ObjectKeyFromObject(br)
}
}
return
}
func observeGenerationChanged(newOne, oldOne *workloads.WorkloadInfo) bool {
return newOne.Metadata.Generation != oldOne.Metadata.Generation
}
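// observeLatestGeneration returns true when the workload controller has just caught up with the
// latest spec, i.e. status.observedGeneration newly equals metadata.generation.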
func observeLatestGeneration(newOne, oldOne *workloads.WorkloadInfo) bool {
oldNot := oldOne.Metadata.Generation != oldOne.Status.ObservedGeneration
newDid := newOne.Metadata.Generation == newOne.Status.ObservedGeneration
return oldNot && newDid
}
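// observeScaleEventDone returns true when a workload under BatchRelease control has just finished
// scaling: the old object was still scaling while the new object's spec and status replicas now agree.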
func observeScaleEventDone(newOne, oldOne *workloads.WorkloadInfo) bool {
_, controlled := newOne.Metadata.Annotations[util.BatchReleaseControlAnnotation]
if !controlled {
return false
}
oldScaling := *oldOne.Replicas != *newOne.Replicas ||
*oldOne.Replicas != oldOne.Status.Replicas
newDone := newOne.Metadata.Generation == newOne.Status.ObservedGeneration &&
*newOne.Replicas == newOne.Status.Replicas
return oldScaling && newDone
}
func observeReplicasChanged(newOne, oldOne *workloads.WorkloadInfo) bool {
_, controlled := newOne.Metadata.Annotations[util.BatchReleaseControlAnnotation]
if !controlled {
return false
}
return *oldOne.Replicas != *newOne.Replicas ||
oldOne.Status.Replicas != newOne.Status.Replicas ||
oldOne.Status.ReadyReplicas != newOne.Status.ReadyReplicas ||
oldOne.Status.UpdatedReplicas != newOne.Status.UpdatedReplicas ||
oldOne.Status.UpdatedReadyReplicas != newOne.Status.UpdatedReadyReplicas
}


@@ -0,0 +1,254 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package batchrelease
import (
"encoding/json"
"fmt"
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/event"
)
func TestEventHandler_Update(t *testing.T) {
RegisterFailHandler(Fail)
cases := []struct {
Name string
GetOldWorkload func() client.Object
GetNewWorkload func() client.Object
ExpectedQueueLen int
}{
{
Name: "Deployment Batch NotReady -> Ready",
GetOldWorkload: func() client.Object {
return getCanaryWithStage(stableDeploy, "v2", 0, false)
},
GetNewWorkload: func() client.Object {
return getCanaryWithStage(stableDeploy, "v2", 0, true)
},
ExpectedQueueLen: 1,
},
{
Name: "Deployment Generation 1 -> 2",
GetOldWorkload: func() client.Object {
oldObject := getStableWithReady(stableDeploy, "v2")
oldObject.SetGeneration(1)
return oldObject
},
GetNewWorkload: func() client.Object {
newObject := getStableWithReady(stableDeploy, "v2")
newObject.SetGeneration(2)
return newObject
},
ExpectedQueueLen: 1,
},
{
Name: "Deployment watched the latest generation",
GetOldWorkload: func() client.Object {
oldObject := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
oldObject.SetGeneration(2)
oldObject.Status.ObservedGeneration = 1
return oldObject
},
GetNewWorkload: func() client.Object {
newObject := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
newObject.SetGeneration(2)
newObject.Status.ObservedGeneration = 2
return newObject
},
ExpectedQueueLen: 1,
},
{
Name: "Deployment scaling done",
GetOldWorkload: func() client.Object {
oldObject := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
oldObject.SetGeneration(2)
oldObject.Status.ObservedGeneration = 2
oldObject.Spec.Replicas = pointer.Int32Ptr(1000)
return oldObject
},
GetNewWorkload: func() client.Object {
newObject := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
newObject.SetGeneration(2)
newObject.Status.ObservedGeneration = 2
newObject.Spec.Replicas = pointer.Int32Ptr(1000)
newObject.Status.Replicas = 1000
return newObject
},
ExpectedQueueLen: 1,
},
{
Name: "Deployment available pod changed",
GetOldWorkload: func() client.Object {
oldObject := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
oldObject.SetGeneration(2)
oldObject.Status.ObservedGeneration = 2
oldObject.Status.AvailableReplicas = 20
return oldObject
},
GetNewWorkload: func() client.Object {
newObject := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
newObject.SetGeneration(2)
newObject.Status.ObservedGeneration = 2
newObject.Status.AvailableReplicas = 50
return newObject
},
ExpectedQueueLen: 1,
},
}
for _, cs := range cases {
t.Run(cs.Name, func(t *testing.T) {
oldObject := cs.GetOldWorkload()
newObject := cs.GetNewWorkload()
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(releaseDeploy.DeepCopy()).Build()
handler := workloadEventHandler{Reader: cli}
updateQ := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
updateEvt := event.UpdateEvent{
ObjectOld: oldObject,
ObjectNew: newObject,
}
handler.Update(updateEvt, updateQ)
Expect(updateQ.Len()).Should(Equal(cs.ExpectedQueueLen))
})
}
}
func TestEventHandler_Create(t *testing.T) {
RegisterFailHandler(Fail)
cases := []struct {
Name string
GetNewWorkload func() client.Object
ExpectedQueueLen int
}{
{
Name: "Deployment with controller info annotation",
GetNewWorkload: func() client.Object {
return getCanaryWithStage(stableDeploy, "v2", 0, true)
},
ExpectedQueueLen: 1,
},
{
Name: "Deployment without controller info annotation",
GetNewWorkload: func() client.Object {
object := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
object.Annotations = nil
return object
},
ExpectedQueueLen: 1,
},
{
Name: "Deployment with wrong controller info annotation",
GetNewWorkload: func() client.Object {
object := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
controlInfo, _ := json.Marshal(&metav1.OwnerReference{
APIVersion: v1alpha1.GroupVersion.String(),
Kind: "Rollout",
Name: "whatever",
})
object.SetName("another")
object.Annotations[util.BatchReleaseControlAnnotation] = string(controlInfo)
return object
},
ExpectedQueueLen: 0,
},
}
for _, cs := range cases {
t.Run(cs.Name, func(t *testing.T) {
newObject := cs.GetNewWorkload()
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(releaseDeploy.DeepCopy()).Build()
handler := workloadEventHandler{Reader: cli}
createQ := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
createEvt := event.CreateEvent{
Object: newObject,
}
handler.Create(createEvt, createQ)
Expect(createQ.Len()).Should(Equal(cs.ExpectedQueueLen))
})
}
}
func TestEventHandler_Delete(t *testing.T) {
RegisterFailHandler(Fail)
cases := []struct {
Name string
GetNewWorkload func() client.Object
ExpectedQueueLen int
}{
{
Name: "Deployment with controller info annotation",
GetNewWorkload: func() client.Object {
return getCanaryWithStage(stableDeploy, "v2", 0, true)
},
ExpectedQueueLen: 1,
},
{
Name: "Deployment without controller info annotation",
GetNewWorkload: func() client.Object {
object := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
object.Annotations = nil
return object
},
ExpectedQueueLen: 1,
},
{
Name: "Deployment with wrong controller info annotation",
GetNewWorkload: func() client.Object {
object := getStableWithReady(stableDeploy, "v2").(*apps.Deployment)
controlInfo, _ := json.Marshal(&metav1.OwnerReference{
APIVersion: v1alpha1.GroupVersion.String(),
Kind: "Rollout",
Name: "whatever",
})
object.SetName("another")
object.Annotations[util.BatchReleaseControlAnnotation] = string(controlInfo)
return object
},
ExpectedQueueLen: 0,
},
}
for _, cs := range cases {
t.Run(cs.Name, func(t *testing.T) {
newObject := cs.GetNewWorkload()
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(releaseDeploy.DeepCopy()).Build()
handler := workloadEventHandler{Reader: cli}
deleteQ := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
deleteEvt := event.DeleteEvent{
Object: newObject,
}
handler.Delete(deleteEvt, deleteQ)
Expect(deleteQ.Len()).Should(Equal(cs.ExpectedQueueLen))
})
}
}


@@ -0,0 +1,312 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package batchrelease
import (
"fmt"
"reflect"
"time"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/controllers/batchrelease/workloads"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
const (
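// DefaultDuration is the default requeue interval (50ms) used while waiting for the
// workload or the release plan to make progress.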
DefaultDuration = 50 * time.Millisecond
)
// Executor is the controller that controls the release plan resource
type Executor struct {
client client.Client
recorder record.EventRecorder
release *v1alpha1.BatchRelease
releasePlan *v1alpha1.ReleasePlan
releaseStatus *v1alpha1.BatchReleaseStatus
releaseKey types.NamespacedName
workloadKey types.NamespacedName
}
// NewReleasePlanExecutor creates a release plan Executor
func NewReleasePlanExecutor(client client.Client, recorder record.EventRecorder) *Executor {
return &Executor{
client: client,
recorder: recorder,
}
}
func (r *Executor) SetReleaseInfo(release *v1alpha1.BatchRelease) {
r.release = release
r.releaseStatus = release.Status.DeepCopy()
r.releasePlan = release.Spec.ReleasePlan.DeepCopy()
initializeStatusIfNeeds(r.releaseStatus)
r.releaseKey = client.ObjectKeyFromObject(release)
if release.Spec.TargetRef.WorkloadRef != nil {
r.workloadKey = types.NamespacedName{
Namespace: release.Namespace,
Name: release.Spec.TargetRef.WorkloadRef.Name,
}
}
}
// Do executes the release plan
func (r *Executor) Do() (reconcile.Result, *v1alpha1.BatchReleaseStatus) {
klog.InfoS("Starting one round of reconciling release plan",
"BatchRelease", client.ObjectKeyFromObject(r.release),
"phase", r.releaseStatus.Phase,
"current-batch", r.releaseStatus.CanaryStatus.CurrentBatch,
"current-batch-state", r.releaseStatus.CanaryStatus.CurrentBatchState)
workloadController, err := r.GetWorkloadController()
if err != nil || workloadController == nil {
return reconcile.Result{}, r.releaseStatus
}
shouldStopThisRound, result := r.checkHealthyBeforeExecution(workloadController)
if shouldStopThisRound {
return result, r.releaseStatus
}
return r.executeBatchReleasePlan(workloadController)
}
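// The release plan is driven as a state machine over the BatchRelease phases:
// Initial -> Healthy -> Preparing -> Progressing -> Finalizing -> Completed,
// while Abort -> Cancelled and Terminating are handled as separate exit paths (see the cases below).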
func (r *Executor) executeBatchReleasePlan(workloadController workloads.WorkloadController) (reconcile.Result, *v1alpha1.BatchReleaseStatus) {
status := r.releaseStatus
result := reconcile.Result{}
switch status.Phase {
case v1alpha1.RolloutPhaseInitial:
// if this batchRelease was created but the workload does not exist,
// keep this phase and do nothing until the workload is created.
case v1alpha1.RolloutPhaseHealthy:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, v1alpha1.RolloutPhaseHealthy)
// verify whether the workload is ready to execute the release plan in this state.
needed, err := workloadController.IfNeedToProgress()
switch {
case err != nil:
setCondition(status, "VerifyWorkloadError", err.Error(), v1.ConditionFalse)
case needed:
setCondition(status, "VerifyWorkloadSuccessfully", "", v1.ConditionTrue)
status.Phase = v1alpha1.RolloutPhasePreparing
fallthrough
default:
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
case v1alpha1.RolloutPhasePreparing:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, v1alpha1.RolloutPhasePreparing)
// prepare and initialize something before progressing in this state.
initialized, err := workloadController.PrepareBeforeProgress()
switch {
case err != nil:
setCondition(status, "InitializeError", err.Error(), v1.ConditionFalse)
case initialized:
setCondition(status, "InitializeSuccessfully", "", v1.ConditionTrue)
status.Phase = v1alpha1.RolloutPhaseProgressing
fallthrough
default:
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
case v1alpha1.RolloutPhaseProgressing:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, v1alpha1.RolloutPhaseProgressing)
// progress the release plan in this state.
var progressDone bool
if progressDone, result = r.progressBatches(workloadController); progressDone {
setCondition(status, "ProgressSuccessfully", "", v1.ConditionTrue)
status.Phase = v1alpha1.RolloutPhaseFinalizing
}
case v1alpha1.RolloutPhaseFinalizing:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, v1alpha1.RolloutPhaseFinalizing)
// finalize the canary resources when progressing is done.
// Do not clean up the canary resources or set 'paused=false' if the release is controlled by a rollout,
// because the rollout controller should route the traffic first.
clean := false
pause := pointer.BoolPtr(false)
if util.IsControlledByRollout(r.release) {
pause = nil
clean = false
}
if succeed := workloadController.FinalizeProgress(pause, clean); succeed {
cleanupConditions(status)
status.Phase = v1alpha1.RolloutPhaseCompleted
} else {
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
case v1alpha1.RolloutPhaseAbort:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, v1alpha1.RolloutPhaseAbort)
// Abort the release plan.
// do not clean up the canary resources if the release is controlled by a rollout,
// because the rollout controller should route the traffic first.
clean := true
pause := pointer.BoolPtr(false)
if util.IsControlledByRollout(r.release) {
pause = nil
clean = false
}
if succeed := workloadController.FinalizeProgress(pause, clean); succeed {
cleanupConditions(status)
status.Phase = v1alpha1.RolloutPhaseCancelled
} else {
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
case v1alpha1.RolloutPhaseTerminating:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, v1alpha1.RolloutPhaseTerminating)
if succeed := workloadController.FinalizeProgress(nil, true); succeed {
if r.release.DeletionTimestamp != nil {
setCondition(status, v1alpha1.TerminatingReasonInTerminating, "Release plan was cancelled or deleted", v1.ConditionTrue)
} else {
status.Phase = v1alpha1.RolloutPhaseCancelled
}
} else {
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
case v1alpha1.RolloutPhaseCompleted:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, v1alpha1.RolloutPhaseCompleted)
// this state indicates that the plan is executed successfully, should do nothing in this state.
case v1alpha1.RolloutPhaseCancelled:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, v1alpha1.RolloutPhaseCancelled)
// this state indicates that the plan is cancelled successfully, should do nothing in this state.
default:
klog.V(3).Infof("BatchRelease(%v) State Machine into %s state", r.releaseKey, "Unknown")
panic(fmt.Sprintf("illegal release status %+v", status))
}
return result, status
}
// progressBatches handles the reconcile logic while we are in the middle of a release; we have to go through the finalizing state before we succeed or fail
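// Within a batch, the state machine is: Initialize -> DoCanary -> Verify -> Ready;
// a failed verification sends the batch back to Initialize.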
func (r *Executor) progressBatches(workloadController workloads.WorkloadController) (bool, reconcile.Result) {
progressDone := false
result := reconcile.Result{}
switch r.releaseStatus.CanaryStatus.CurrentBatchState {
case "", v1alpha1.InitializeBatchState:
klog.V(3).Infof("BatchRelease(%v) Batch State Machine into %s state", r.releaseKey, v1alpha1.InitializeBatchState)
// prepare something before modifying the workload for the canary, such as calculating a suitable batch index.
r.releaseStatus.CanaryStatus.CurrentBatchState = v1alpha1.DoCanaryBatchState
fallthrough
case v1alpha1.DoCanaryBatchState:
klog.V(3).Infof("BatchRelease(%v) Batch State Machine into %s state", r.releaseKey, v1alpha1.DoCanaryBatchState)
// modify workload replicas/partition based on release plan in this state.
upgradeDone, err := workloadController.ProgressOneBatchReplicas()
switch {
case err != nil:
setCondition(r.releaseStatus, "DoCanaryError", err.Error(), v1.ConditionFalse)
case upgradeDone:
r.releaseStatus.CanaryStatus.CurrentBatchState = v1alpha1.VerifyBatchState
fallthrough
default:
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
case v1alpha1.VerifyBatchState:
klog.V(3).Infof("BatchRelease(%v) Batch State Machine into %s state", r.releaseKey, v1alpha1.VerifyBatchState)
// TODO: metrics analysis
// replicas/partition has been modified; wait for the pods in this batch to be ready in this state.
verified, err := workloadController.CheckOneBatchReplicas()
switch {
case err != nil:
setCondition(r.releaseStatus, "VerifyBatchReadyError", err.Error(), v1.ConditionFalse)
case verified:
result = reconcile.Result{RequeueAfter: DefaultDuration}
r.releaseStatus.CanaryStatus.BatchReadyTime = metav1.Now()
r.releaseStatus.CanaryStatus.CurrentBatchState = v1alpha1.ReadyBatchState
default:
r.releaseStatus.CanaryStatus.CurrentBatchState = v1alpha1.InitializeBatchState
}
case v1alpha1.ReadyBatchState:
klog.V(3).Infof("BatchRelease(%v) Batch State Machine into %s state", r.releaseKey, v1alpha1.ReadyBatchState)
// the expected pods in this batch have been upgraded and are ready;
// move to the next batch if there is one
progressDone = r.moveToNextBatch()
result = reconcile.Result{RequeueAfter: DefaultDuration}
default:
klog.V(3).Infof("ReleasePlan(%v) Batch State Machine into %s state", "Unknown")
panic(fmt.Sprintf("illegal status %+v", r.releaseStatus))
}
return progressDone, result
}
// GetWorkloadController picks the right workload controller to work on the workload
func (r *Executor) GetWorkloadController() (workloads.WorkloadController, error) {
targetRef := r.release.Spec.TargetRef.WorkloadRef
if targetRef == nil {
return nil, nil
}
targetKey := types.NamespacedName{
Namespace: r.release.Namespace,
Name: targetRef.Name,
}
switch targetRef.APIVersion {
case kruiseappsv1alpha1.GroupVersion.String():
if targetRef.Kind == reflect.TypeOf(kruiseappsv1alpha1.CloneSet{}).Name() {
klog.InfoS("using cloneset batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return workloads.NewCloneSetRolloutController(r.client, r.recorder, r.release, r.releasePlan, r.releaseStatus, targetKey), nil
}
case apps.SchemeGroupVersion.String():
if targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() {
klog.InfoS("using deployment batch release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace)
return workloads.NewDeploymentRolloutController(r.client, r.recorder, r.release, r.releasePlan, r.releaseStatus, targetKey), nil
}
}
message := fmt.Sprintf("the workload `%v.%v/%v` is not supported", targetRef.APIVersion, targetRef.Kind, targetRef.Name)
r.recorder.Event(r.release, v1.EventTypeWarning, "UnsupportedWorkload", message)
return nil, fmt.Errorf(message)
}
func (r *Executor) moveToNextBatch() bool {
currentBatch := int(r.releaseStatus.CanaryStatus.CurrentBatch)
if currentBatch >= len(r.releasePlan.Batches)-1 {
klog.V(3).Infof("BatchRelease(%v) finished all batch, release current batch: %v", r.releasePlan, r.releaseStatus.CanaryStatus.CurrentBatch)
return true
} else {
if r.releasePlan.BatchPartition == nil || *r.releasePlan.BatchPartition > r.releaseStatus.CanaryStatus.CurrentBatch {
r.releaseStatus.CanaryStatus.CurrentBatch++
}
r.releaseStatus.CanaryStatus.CurrentBatchState = v1alpha1.InitializeBatchState
klog.V(3).Infof("BatchRelease(%v) finished one batch, release current batch: %v", r.releasePlan, r.releaseStatus.CanaryStatus.CurrentBatch)
return false
}
}


@@ -0,0 +1,231 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package batchrelease
import (
"time"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/controllers/batchrelease/workloads"
"github.com/openkruise/rollouts/pkg/util"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
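// The following actions are the corrective decisions made by checkHealthyBeforeExecution
// before the normal release state machine is allowed to run.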
const (
Keep = "Keep"
Abort = "Abort"
Start = "Start"
Restart = "Restart"
Finalize = "Finalize"
Terminating = "Terminating"
Recalculate = "Recalculate"
)
func (r *Executor) checkHealthyBeforeExecution(controller workloads.WorkloadController) (needStopThisRound bool, result reconcile.Result) {
var reason string
var message string
var action string
var needRetry bool
// sync the workload info and watch the workload change event
workloadEvent, workloadInfo, err := controller.SyncWorkloadInfo()
// Note: must keep the order of the following cases
switch {
case r.releasePlanTerminating():
reason = "PlanTerminating"
message = "release plan is terminating, terminate the release plan"
action = Terminating
case r.workloadHasGone(err):
reason = "WorkloadGone"
message = "target workload has gone, cancel the release plan"
action = Terminating
case client.IgnoreNotFound(err) != nil:
reason = "GetWorkloadError"
message = err.Error()
needRetry = true
action = Keep
case workloadEvent == workloads.WorkloadReplicasChanged:
reason = "ReplicasChanged"
message = "workload is scaling, recalculate the canary batch"
action = Recalculate
case r.releasePlanPaused():
reason = "PlanPaused"
message = "release plan is paused, no need to reconcile"
action = Keep
case r.releasePlanUnhealthy():
reason = "PlanStatusUnhealthy"
message = "release plan status is unhealthy, try to restart the release plan"
action = Restart
case r.releasePlanChanged():
reason = "PlanChanged"
message = "release plan was changed, try to recalculate the canary status"
action = Recalculate
case r.locatedWorkloadAndStart(err):
action = Start
case workloadEvent == workloads.WorkloadRollback:
reason = "StableOrRollback"
message = "workload is table or rolling back, abort the release plan"
action = Abort
case workloadEvent == workloads.WorkloadPodTemplateChanged:
reason = "TargetRevisionChanged"
// The rollout controller needs to route the traffic first
if !util.IsControlledByRollout(r.release) {
message = "workload revision was changed, try to restart the release plan"
action = Restart
} else {
message = "workload revision was changed, stop the release plan"
action = Finalize
}
case workloadEvent == workloads.WorkloadUnHealthy:
reason = "WorkloadUnHealthy"
message = "workload is UnHealthy, should stop the release plan"
action = Keep
case workloadEvent == workloads.WorkloadStillReconciling:
if r.releaseStatus.Phase != v1alpha1.RolloutPhaseCompleted {
reason = "WorkloadNotStable"
message = "workload status is not stable, wait for it to be stable"
}
needRetry = true
action = Keep
default:
// check canary batch pause seconds
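// e.g. if the current batch declares PauseSeconds=60 and became ready 20 seconds ago,
// stop this round and requeue after the remaining ~40 seconds.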
if r.releaseStatus.Phase == v1alpha1.RolloutPhaseProgressing &&
r.releaseStatus.CanaryStatus.CurrentBatchState == v1alpha1.ReadyBatchState &&
int(r.releaseStatus.CanaryStatus.CurrentBatch) < len(r.releasePlan.Batches) {
currentTimestamp := time.Now()
currentBatch := r.releasePlan.Batches[r.releaseStatus.CanaryStatus.CurrentBatch]
waitDuration := time.Duration(currentBatch.PauseSeconds) * time.Second
if waitDuration > 0 && r.releaseStatus.CanaryStatus.BatchReadyTime.Time.Add(waitDuration).After(currentTimestamp) {
needStopThisRound = true
restDuration := r.releaseStatus.CanaryStatus.BatchReadyTime.Time.Add(waitDuration).Sub(currentTimestamp)
result = reconcile.Result{RequeueAfter: restDuration}
klog.V(3).Infof("BatchRelease %v/%v paused and will continue to reconcile after %v", r.release.Namespace, r.release.Name, restDuration)
}
}
}
if len(message) > 0 {
setCondition(r.releaseStatus, reason, message, v1.ConditionFalse)
r.recorder.Eventf(r.release, v1.EventTypeWarning, reason, message)
klog.Warningf("Special case occurred in BatchRelease(%v), message: %v", r.releaseKey, message)
}
// refresh workload info
if workloadInfo != nil {
if workloadInfo.Replicas != nil {
r.releaseStatus.ObservedWorkloadReplicas = *workloadInfo.Replicas
}
if workloadInfo.UpdateRevision != nil {
r.releaseStatus.UpdateRevision = *workloadInfo.UpdateRevision
}
if workloadInfo.Status != nil {
r.releaseStatus.CanaryStatus.UpdatedReplicas = workloadInfo.Status.UpdatedReplicas
r.releaseStatus.CanaryStatus.UpdatedReadyReplicas = workloadInfo.Status.UpdatedReadyReplicas
}
planHash := hashReleasePlanBatches(r.releasePlan)
if r.releaseStatus.ObservedReleasePlanHash != planHash {
r.releaseStatus.ObservedReleasePlanHash = planHash
}
}
// if the status phase or state changed, we should stop and retry.
// This is because we must ensure that the phase and state are
// persisted in etcd, otherwise the state machine may become inconsistent.
switch action {
case Keep:
// keep current status, do nothing
needStopThisRound = true
case Start:
needRetry = signalStart(r.releaseStatus)
case Abort:
needRetry = signalAbort(r.releaseStatus)
case Restart:
needRetry = signalRestart(r.releaseStatus)
case Finalize:
needRetry = signalFinalize(r.releaseStatus)
case Recalculate:
needRetry = signalRecalculate(r.releaseStatus)
case Terminating:
needRetry = signalTerminating(r.releaseStatus)
}
// If it needs to retry
if needRetry {
needStopThisRound = true
result = reconcile.Result{RequeueAfter: DefaultDuration}
}
return needStopThisRound, result
}
func (r *Executor) releasePlanTerminating() bool {
return r.isTerminating()
}
func (r *Executor) releasePlanUnhealthy() bool {
return r.isProgressing() && int(r.release.Status.CanaryStatus.CurrentBatch) >= len(r.releasePlan.Batches)
}
func (r *Executor) releasePlanChanged() bool {
return r.isProgressing() && r.releaseStatus.ObservedReleasePlanHash != hashReleasePlanBatches(r.releasePlan)
}
func (r *Executor) locatedWorkloadAndStart(err error) bool {
return err == nil && r.releaseStatus.Phase == v1alpha1.RolloutPhaseInitial
}
func (r *Executor) workloadHasGone(err error) bool {
return !r.isTerminating() && r.releaseStatus.Phase != v1alpha1.RolloutPhaseInitial && errors.IsNotFound(err)
}
func (r *Executor) releasePlanPaused() bool {
partitioned := r.releasePlan.BatchPartition != nil &&
r.releaseStatus.Phase == v1alpha1.RolloutPhaseProgressing &&
r.releaseStatus.CanaryStatus.CurrentBatchState == v1alpha1.ReadyBatchState &&
r.releaseStatus.CanaryStatus.CurrentBatch >= *r.releasePlan.BatchPartition
return !r.isTerminating() && (r.releasePlan.Paused || partitioned)
}
func (r *Executor) isTerminating() bool {
return r.release.DeletionTimestamp != nil ||
r.release.Status.Phase == v1alpha1.RolloutPhaseTerminating ||
(r.release.Spec.Cancelled && r.releaseStatus.Phase != v1alpha1.RolloutPhaseCancelled)
}
func (r *Executor) isProgressing() bool {
return !r.release.Spec.Cancelled &&
r.release.DeletionTimestamp != nil &&
r.releaseStatus.Phase == v1alpha1.RolloutPhaseProgressing
}


@@ -0,0 +1,147 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package batchrelease
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/openkruise/rollouts/api/v1alpha1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func HasTerminatingCondition(status v1alpha1.BatchReleaseStatus) bool {
for i := range status.Conditions {
c := status.Conditions[i]
if c.Status == v1.ConditionTrue && c.Reason == v1alpha1.TerminatingReasonInTerminating {
return true
}
}
return false
}
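// hashReleasePlanBatches hashes the batches of a release plan so that plan changes can be
// detected later by comparing against status.ObservedReleasePlanHash.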
func hashReleasePlanBatches(releasePlan *v1alpha1.ReleasePlan) string {
by, _ := json.Marshal(releasePlan.Batches)
hash := sha256.Sum256(by)
return hex.EncodeToString(hash[:])
}
func initializeStatusIfNeeds(status *v1alpha1.BatchReleaseStatus) {
if len(status.Phase) == 0 {
resetStatus(status)
}
}
func signalStart(status *v1alpha1.BatchReleaseStatus) bool {
if status.Phase == v1alpha1.RolloutPhaseHealthy {
return false
}
status.Phase = v1alpha1.RolloutPhaseHealthy
return true
}
func signalRestart(status *v1alpha1.BatchReleaseStatus) bool {
if status.Phase == v1alpha1.RolloutPhaseInitial {
return false
}
resetStatus(status)
return true
}
func signalRecalculate(status *v1alpha1.BatchReleaseStatus) bool {
if status.CanaryStatus.CurrentBatchState == v1alpha1.InitializeBatchState {
return false
}
status.CanaryStatus.CurrentBatchState = v1alpha1.InitializeBatchState
return true
}
func signalTerminating(status *v1alpha1.BatchReleaseStatus) bool {
if status.Phase == v1alpha1.RolloutPhaseTerminating {
return false
}
status.Phase = v1alpha1.RolloutPhaseTerminating
return true
}
func signalFinalize(status *v1alpha1.BatchReleaseStatus) bool {
if status.Phase == v1alpha1.RolloutPhaseFinalizing {
return false
}
status.Phase = v1alpha1.RolloutPhaseFinalizing
return true
}
func signalAbort(status *v1alpha1.BatchReleaseStatus) bool {
if status.Phase == v1alpha1.RolloutPhaseAbort {
return false
}
status.Phase = v1alpha1.RolloutPhaseAbort
return true
}
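// resetStatus resets the status back to the Initial phase and clears the recorded revisions,
// plan hash, and canary status; ObservedWorkloadReplicas is set to -1 to mark the workload
// replicas as not yet observed.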
func resetStatus(status *v1alpha1.BatchReleaseStatus) {
status.Phase = v1alpha1.RolloutPhaseInitial
status.StableRevision = ""
status.UpdateRevision = ""
status.ObservedReleasePlanHash = ""
status.ObservedWorkloadReplicas = -1
status.CanaryStatus = v1alpha1.BatchReleaseCanaryStatus{}
}
func setCondition(status *v1alpha1.BatchReleaseStatus, reason, message string, conditionStatusType v1.ConditionStatus) {
if status == nil {
return
}
var suitableCondition *v1alpha1.RolloutCondition
for i := range status.Conditions {
condition := &status.Conditions[i]
if condition.Type == getConditionType(status.Phase) {
suitableCondition = condition
}
}
if suitableCondition == nil {
status.Conditions = append(status.Conditions, v1alpha1.RolloutCondition{
Type: getConditionType(status.Phase),
Status: conditionStatusType,
Reason: reason,
Message: message,
LastUpdateTime: metav1.Now(),
})
} else {
suitableCondition.Reason = reason
suitableCondition.Message = message
suitableCondition.LastUpdateTime = metav1.Now()
if suitableCondition.Status != conditionStatusType {
suitableCondition.LastTransitionTime = metav1.Now()
}
suitableCondition.Status = conditionStatusType
}
}
func cleanupConditions(status *v1alpha1.BatchReleaseStatus) {
status.Conditions = nil
}
func getConditionType(phase v1alpha1.RolloutPhase) v1alpha1.RolloutConditionType {
return v1alpha1.RolloutConditionType(fmt.Sprintf("%sPhaseCompleted", phase))
}


@@ -0,0 +1,338 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"context"
"fmt"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// CloneSetRolloutController is responsible for handling the rollout of CloneSet workloads
type CloneSetRolloutController struct {
cloneSetController
clone *kruiseappsv1alpha1.CloneSet
}
//TODO: scale during releasing: workload replicas changed -> Finalising CloneSet with Paused=true
// NewCloneSetRolloutController creates a new CloneSet rollout controller
func NewCloneSetRolloutController(cli client.Client, recorder record.EventRecorder, release *v1alpha1.BatchRelease, plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus, targetNamespacedName types.NamespacedName) *CloneSetRolloutController {
return &CloneSetRolloutController{
cloneSetController: cloneSetController{
workloadController: workloadController{
client: cli,
recorder: recorder,
parentController: release,
releasePlan: plan,
releaseStatus: status,
},
releasePlanKey: client.ObjectKeyFromObject(release),
targetNamespacedName: targetNamespacedName,
},
}
}
// IfNeedToProgress verifies that the workload is ready to execute the release plan
func (c *CloneSetRolloutController) IfNeedToProgress() (bool, error) {
var verifyErr error
defer func() {
if verifyErr != nil {
klog.Warningf(verifyErr.Error())
c.recorder.Event(c.parentController, v1.EventTypeWarning, "VerifyFailed", verifyErr.Error())
}
}()
if err := c.fetchCloneSet(); err != nil {
//c.releaseStatus.RolloutRetry(err.Error())
return false, nil
}
// if the workload status is untrustworthy
if c.clone.Status.ObservedGeneration != c.clone.Generation {
klog.Warningf("CloneSet(%v) is still reconciling, wait for it to be done", c.targetNamespacedName)
return false, nil
}
// if the cloneSet has been promoted, no need to go on
if c.clone.Status.UpdatedReplicas == *c.clone.Spec.Replicas {
verifyErr = fmt.Errorf("CloneSet(%v) update revision has been promoted, no need to reconcile", c.targetNamespacedName)
return false, verifyErr
}
// if the cloneSet is not paused and is not under our control
if !c.clone.Spec.UpdateStrategy.Paused && !util.IsControlledBy(c.clone, c.parentController) {
verifyErr = fmt.Errorf("cloneSet(%v) should be paused before execute the release plan", c.targetNamespacedName)
return false, verifyErr
}
klog.V(3).Infof("Verified CloneSet Successfully, Status %+v", c.targetNamespacedName, c.releaseStatus)
c.recorder.Event(c.parentController, v1.EventTypeNormal, "VerifiedSuccessfully", "ReleasePlan and the CloneSet resource are verified")
return true, nil
}
// PrepareBeforeProgress makes sure that the source and target CloneSet is under our control
func (c *CloneSetRolloutController) PrepareBeforeProgress() (bool, error) {
if err := c.fetchCloneSet(); err != nil {
//c.releaseStatus.RolloutRetry(err.Error())
return false, nil
}
// claim the cloneSet is under our control
if _, err := c.claimCloneSet(c.clone); err != nil {
return false, nil
}
// record revisions and replicas info to BatchRelease.Status
c.recordCloneSetRevisionAndReplicas()
c.recorder.Event(c.parentController, v1.EventTypeNormal, "InitializedSuccessfully", "Rollout resource are initialized")
return true, nil
}
// ProgressOneBatchReplicas calculates the number of pods we can upgrade at once according to the release plan
// and then sets the CloneSet partition accordingly
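// For illustration (numbers assumed, not taken from this commit): with 10 observed replicas and a
// batch whose canary goal works out to 3 pods, the stable goal is 7, so the CloneSet partition is
// patched to 7 and only 3 pods are moved to the update revision.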
func (c *CloneSetRolloutController) ProgressOneBatchReplicas() (bool, error) {
if err := c.fetchCloneSet(); err != nil {
return false, nil
}
// if the workload status is untrustworthy
if c.clone.Status.ObservedGeneration != c.clone.Generation {
return false, nil
}
// the number of canary pods we should have in the current batch
canaryGoal := c.calculateCurrentCanary(c.releaseStatus.ObservedWorkloadReplicas)
// the number of stable pods we should have in the current batch
stableGoal := c.calculateCurrentStable(c.releaseStatus.ObservedWorkloadReplicas)
// the number of canary pods we currently have
currentCanaryReplicas := c.clone.Status.UpdatedReplicas
// the current partition of the CloneSet (the number of pods kept at the stable revision)
workloadPartition, _ := intstr.GetValueFromIntOrPercent(c.clone.Spec.UpdateStrategy.Partition,
int(c.releaseStatus.ObservedWorkloadReplicas), true)
// in case of no need to upgrade pods
if currentCanaryReplicas >= canaryGoal && int32(workloadPartition) <= stableGoal {
klog.V(3).InfoS("upgraded one batch, but no need to update partition of cloneset",
"BatchRelease", c.releasePlanKey, "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"canary-goal", canaryGoal, "stable-goal", stableGoal, "canary-replicas", currentCanaryReplicas, "partition", workloadPartition)
return true, nil
}
// upgrade pods
if err := c.patchCloneSetPartition(c.clone, stableGoal); err != nil {
return false, nil
}
klog.V(3).InfoS("upgraded one batch", "BatchRelease", c.releasePlanKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "updateRevision size", canaryGoal)
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "SetBatchDone",
"Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
}
// CheckOneBatchReplicas checks to see if the pods are all available according to the rollout plan
func (c *CloneSetRolloutController) CheckOneBatchReplicas() (bool, error) {
if err := c.fetchCloneSet(); err != nil {
return false, nil
}
// if the workload status is untrustworthy
if c.clone.Status.ObservedGeneration != c.clone.Generation {
return false, nil
}
// the number of canary pods we currently have
canaryReplicas := c.clone.Status.UpdatedReplicas
// the number of stable pods we currently have
stableReplicas := c.clone.Status.Replicas - canaryReplicas
// the number of canary pods that are currently ready
canaryReadyReplicas := c.clone.Status.UpdatedReadyReplicas
// the number of canary pods we should have in the current batch
canaryGoal := c.calculateCurrentCanary(c.releaseStatus.ObservedWorkloadReplicas)
// the number of stable pods we should have in the current batch
stableGoal := c.calculateCurrentStable(c.releaseStatus.ObservedWorkloadReplicas)
// the maximum number of unavailable canary pods allowed by this workload
maxUnavailable := 0
if c.clone.Spec.UpdateStrategy.MaxUnavailable != nil {
maxUnavailable, _ = intstr.GetValueFromIntOrPercent(c.clone.Spec.UpdateStrategy.MaxUnavailable, int(c.releaseStatus.ObservedWorkloadReplicas), true)
}
klog.InfoS("checking the batch releasing progress", "BatchRelease", c.releasePlanKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch, "canary-ready-replicas", canaryReadyReplicas,
"stable-replicas", stableReplicas, "max-unavailable", maxUnavailable, "canary-goal", canaryGoal, "stable-goal", stableGoal)
if c.clone.Status.Replicas != c.releaseStatus.ObservedWorkloadReplicas {
err := fmt.Errorf("CloneSet(%v) replicas don't match ObservedWorkloadReplicas, workload status replicas: %v, observed workload replicas: %v",
c.targetNamespacedName, c.clone.Status.Replicas, c.releaseStatus.ObservedWorkloadReplicas)
klog.ErrorS(err, "the batch is not valid", "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch)
return false, nil
}
currentBatchNotReadyYet := func() bool {
return canaryGoal > canaryReplicas || stableGoal < stableReplicas || canaryReadyReplicas+int32(maxUnavailable) < canaryGoal || (canaryGoal > 0 && canaryReadyReplicas == 0)
}
if currentBatchNotReadyYet() {
klog.InfoS("the batch is not ready yet", "CloneSet", c.targetNamespacedName,
"ReleasePlan", c.releasePlanKey, "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch)
return false, nil
}
klog.Infof("All pods of CloneSet(%v) in current batch are ready, BatchRelease(%v), current-batch=%v",
c.targetNamespacedName, c.releasePlanKey, c.releaseStatus.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "BatchAvailable", "Batch %d is available", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
}
// FinalizeOneBatch isn't needed in this mode.
func (c *CloneSetRolloutController) FinalizeOneBatch() (bool, error) {
return true, nil
}
// FinalizeProgress releases the CloneSet from our control and restores its settings when the release finishes
func (c *CloneSetRolloutController) FinalizeProgress(pause *bool, cleanup bool) bool {
if err := c.fetchCloneSet(); client.IgnoreNotFound(err) != nil {
return false
}
// release the CloneSet from our control, restoring the stashed partition if cleanup is requested
if _, err := c.releaseCloneSet(c.clone, pause, cleanup); err != nil {
return false
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "FinalizedSuccessfully", "Rollout resource are finalized, "+
"pause=%v, cleanup=%v", pause, cleanup)
return true
}
// SyncWorkloadInfo returns the change type and workload info if the workload was changed during the release
func (c *CloneSetRolloutController) SyncWorkloadInfo() (WorkloadChangeEventType, *WorkloadInfo, error) {
if c.parentController.Spec.Cancelled ||
c.parentController.DeletionTimestamp != nil ||
c.releaseStatus.Phase == v1alpha1.RolloutPhaseAbort ||
c.releaseStatus.Phase == v1alpha1.RolloutPhaseFinalizing ||
c.releaseStatus.Phase == v1alpha1.RolloutPhaseTerminating {
return IgnoreWorkloadEvent, nil, nil
}
workloadInfo := &WorkloadInfo{}
err := c.fetchCloneSet()
if client.IgnoreNotFound(err) != nil {
return "", nil, err
} else if apierrors.IsNotFound(err) {
workloadInfo.Status = &WorkloadStatus{}
return "", workloadInfo, err
}
if c.clone.Status.ObservedGeneration != c.clone.Generation {
klog.Warningf("CloneSet(%v) is still reconciling, waiting for it to complete, generation: %v, observed: %v",
c.targetNamespacedName, c.clone.Generation, c.clone.Status.ObservedGeneration)
return WorkloadStillReconciling, nil, nil
}
workloadInfo.Status = &WorkloadStatus{
UpdatedReplicas: c.clone.Status.UpdatedReplicas,
UpdatedReadyReplicas: c.clone.Status.UpdatedReadyReplicas,
}
if !c.clone.Spec.UpdateStrategy.Paused && c.clone.Status.UpdatedReplicas == c.clone.Status.Replicas {
return IgnoreWorkloadEvent, workloadInfo, nil
}
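// Note: the switch below deliberately puts 'default' first and falls through into
// the Completed/Cancelled case, so rollback and replicas changes are only checked
// for in-progress phases, while an updateRevision change is checked in both.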
switch c.releaseStatus.Phase {
default:
if c.clone.Status.CurrentRevision == c.clone.Status.UpdateRevision &&
c.parentController.Status.UpdateRevision != c.clone.Status.UpdateRevision {
workloadInfo.UpdateRevision = &c.clone.Status.UpdateRevision
klog.Warningf("CloneSet(%v) is stable or is rolling back", c.targetNamespacedName)
return WorkloadRollback, workloadInfo, nil
}
if *c.clone.Spec.Replicas != c.releaseStatus.ObservedWorkloadReplicas {
workloadInfo.Replicas = c.clone.Spec.Replicas
klog.Warningf("CloneSet(%v) replicas changed during releasing, should pause and wait for it to complete, replicas from: %v -> %v",
c.targetNamespacedName, c.releaseStatus.ObservedWorkloadReplicas, *c.clone.Spec.Replicas)
return WorkloadReplicasChanged, workloadInfo, nil
}
fallthrough
case v1alpha1.RolloutPhaseCompleted, v1alpha1.RolloutPhaseCancelled:
if c.clone.Status.UpdateRevision != c.releaseStatus.UpdateRevision {
workloadInfo.UpdateRevision = &c.clone.Status.UpdateRevision
klog.Warningf("CloneSet(%v) updateRevision changed during releasing, should try to restart the release plan, updateRevision from: %v -> %v",
c.targetNamespacedName, c.releaseStatus.UpdateRevision, c.clone.Status.UpdateRevision)
return WorkloadPodTemplateChanged, workloadInfo, nil
}
case v1alpha1.RolloutPhaseHealthy, v1alpha1.RolloutPhaseInitial, v1alpha1.RolloutPhasePreparing:
return IgnoreWorkloadEvent, workloadInfo, nil
}
return IgnoreWorkloadEvent, workloadInfo, nil
}
/* ----------------------------------
The functions below are helper functions
------------------------------------- */
// fetch the CloneSet into c.clone
func (c *CloneSetRolloutController) fetchCloneSet() error {
clone := &kruiseappsv1alpha1.CloneSet{}
if err := c.client.Get(context.TODO(), c.targetNamespacedName, clone); err != nil {
if !apierrors.IsNotFound(err) {
c.recorder.Event(c.parentController, v1.EventTypeWarning, "GetCloneSetFailed", err.Error())
}
return err
}
c.clone = clone
return nil
}
// the canary workload size for the current batch
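// e.g. with a batch whose canaryReplicas is "50%" and 10 observed workload replicas,
// the target is expected to resolve to 5 pods; the exact rounding of percentages is
// up to util.CalculateNewBatchTarget.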
func (c *CloneSetRolloutController) calculateCurrentCanary(totalSize int32) int32 {
targetSize := int32(util.CalculateNewBatchTarget(c.releasePlan, int(totalSize), int(c.releaseStatus.CanaryStatus.CurrentBatch)))
klog.InfoS("Calculated the number of pods in the target CloneSet after current batch",
"CloneSet", c.targetNamespacedName, "BatchRelease", c.releasePlanKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "workload updateRevision size", targetSize)
return targetSize
}
// the source workload size for the current batch
func (c *CloneSetRolloutController) calculateCurrentStable(totalSize int32) int32 {
sourceSize := totalSize - c.calculateCurrentCanary(totalSize)
klog.InfoS("Calculated the number of pods in the source CloneSet after current batch",
"CloneSet", c.targetNamespacedName, "BatchRelease", c.releasePlanKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "workload stableRevision size", sourceSize)
return sourceSize
}
func (c *CloneSetRolloutController) recordCloneSetRevisionAndReplicas() {
c.releaseStatus.ObservedWorkloadReplicas = *c.clone.Spec.Replicas
c.releaseStatus.StableRevision = c.clone.Status.CurrentRevision
c.releaseStatus.UpdateRevision = c.clone.Status.UpdateRevision
}


@ -0,0 +1,191 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"context"
"encoding/json"
"fmt"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// cloneSetController holds the fields needed to handle CloneSet-type workloads
type cloneSetController struct {
workloadController
releasePlanKey types.NamespacedName
targetNamespacedName types.NamespacedName
}
// claimCloneSet claims the CloneSet for this BatchRelease: it records the control info in an annotation,
// un-pauses the CloneSet, and initializes the partition so that the update starts with every pod on the old revision
func (c *cloneSetController) claimCloneSet(clone *kruiseappsv1alpha1.CloneSet) (bool, error) {
var controlled bool
if controlInfo, ok := clone.Annotations[util.BatchReleaseControlAnnotation]; ok && controlInfo != "" {
ref := &metav1.OwnerReference{}
err := json.Unmarshal([]byte(controlInfo), ref)
if err == nil && ref.UID == c.parentController.UID {
controlled = true
klog.V(3).Infof("CloneSet(%v) has been controlled by this BatchRelease(%v), no need to claim again",
c.targetNamespacedName, c.releasePlanKey)
} else {
klog.Errorf("Failed to parse controller info from CloneSet(%v) annotation, error: %v, controller info: %+v",
c.targetNamespacedName, err, *ref)
}
}
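// Build a merge patch: if this BatchRelease already controls the CloneSet, only make
// sure it is un-paused; otherwise take control by setting the partition to 100% (so
// no pod is upgraded yet), un-pausing it, stashing the original partition in an
// annotation, and recording this BatchRelease as the controller in another annotation.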
patch := map[string]interface{}{}
switch {
// if the cloneSet has been claimed by this parentController
case controlled:
// make sure paused=false
if clone.Spec.UpdateStrategy.Paused {
patch = map[string]interface{}{
"spec": map[string]interface{}{
"updateStrategy": map[string]interface{}{
"paused": false,
},
},
}
}
default:
patch = map[string]interface{}{
"spec": map[string]interface{}{
"updateStrategy": map[string]interface{}{
"partition": &intstr.IntOrString{Type: intstr.String, StrVal: "100%"},
"paused": false,
},
},
}
controlInfo := metav1.NewControllerRef(c.parentController, c.parentController.GetObjectKind().GroupVersionKind())
controlByte, _ := json.Marshal(controlInfo)
patch["metadata"] = map[string]interface{}{
"annotations": map[string]string{
util.BatchReleaseControlAnnotation: string(controlByte),
},
}
if clone.Spec.UpdateStrategy.Partition != nil {
partitionByte, _ := json.Marshal(clone.Spec.UpdateStrategy.Partition)
metadata := patch["metadata"].(map[string]interface{})
annotations := metadata["annotations"].(map[string]string)
annotations[util.StashCloneSetPartition] = string(partitionByte)
annotations[util.BatchReleaseControlAnnotation] = string(controlByte)
}
}
if len(patch) > 0 {
patchByte, _ := json.Marshal(patch)
if err := c.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, patchByte)); err != nil {
c.recorder.Eventf(c.parentController, v1.EventTypeWarning, "ClaimCloneSetFailed", err.Error())
return false, err
}
}
klog.V(3).Infof("Claim CloneSet(%v) Successfully", c.targetNamespacedName)
return true, nil
}
// releaseCloneSet removes this BatchRelease's control info from the CloneSet and, if cleanup is requested, restores the stashed partition
func (c *cloneSetController) releaseCloneSet(clone *kruiseappsv1alpha1.CloneSet, pause *bool, cleanup bool) (bool, error) {
if clone == nil {
return true, nil
}
var found bool
var refByte string
if refByte, found = clone.Annotations[util.BatchReleaseControlAnnotation]; found && refByte != "" {
ref := &metav1.OwnerReference{}
if err := json.Unmarshal([]byte(refByte), ref); err != nil {
found = false
klog.Errorf("failed to decode controller annotations of BatchRelease")
} else if ref.UID != c.parentController.UID {
found = false
}
}
if !found {
klog.V(3).Infof("the CloneSet(%v) is already released", c.targetNamespacedName)
return true, nil
}
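// Two ways to hand the CloneSet back: with cleanup, also restore the partition that
// was stashed when the CloneSet was claimed and drop both annotations; otherwise only
// remove the control-info annotation.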
var patchByte string
switch {
case cleanup:
patchSpec := map[string]interface{}{
"updateStrategy": map[string]interface{}{
"partition": nil,
},
}
if len(clone.Annotations[util.StashCloneSetPartition]) > 0 {
restoredPartition := &intstr.IntOrString{}
if err := json.Unmarshal([]byte(clone.Annotations[util.StashCloneSetPartition]), restoredPartition); err == nil {
updateStrategy := patchSpec["updateStrategy"].(map[string]interface{})
updateStrategy["partition"] = restoredPartition
}
}
patchSpecByte, _ := json.Marshal(patchSpec)
patchByte = fmt.Sprintf(`{"metadata":{"annotations":{"%s":null, "%s":null}},"spec":%s}`,
util.BatchReleaseControlAnnotation, util.StashCloneSetPartition, string(patchSpecByte))
default:
patchByte = fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, util.BatchReleaseControlAnnotation)
}
if err := c.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, []byte(patchByte))); err != nil {
c.recorder.Eventf(c.parentController, v1.EventTypeWarning, "ReleaseCloneSetFailed", err.Error())
return false, err
}
klog.V(3).Infof("Release CloneSet(%v) Successfully", client.ObjectKeyFromObject(clone))
return true, nil
}
// patchCloneSetPartition patches spec.updateStrategy.partition of the CloneSet to the given value
func (c *cloneSetController) patchCloneSetPartition(clone *kruiseappsv1alpha1.CloneSet, partition int32) error {
patch := map[string]interface{}{
"spec": map[string]interface{}{
"updateStrategy": map[string]interface{}{
"partition": &intstr.IntOrString{Type: intstr.Int, IntVal: partition},
},
},
}
patchByte, _ := json.Marshal(patch)
if err := c.client.Patch(context.TODO(), clone, client.RawPatch(types.MergePatchType, patchByte)); err != nil {
c.recorder.Eventf(c.parentController, v1.EventTypeWarning, "PatchPartitionFailed",
"Failed to update the CloneSet(%v) to the correct target partition %d, error: %v",
c.targetNamespacedName, partition, err)
return err
}
klog.InfoS("Submitted modified partition quest for CloneSet", "CloneSet", c.targetNamespacedName,
"target partition size", partition, "batch", c.releaseStatus.CanaryStatus.CurrentBatch)
return nil
}


@ -0,0 +1,218 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"context"
"fmt"
"reflect"
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
apimachineryruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
var (
scheme *runtime.Scheme
releaseClone = &v1alpha1.BatchRelease{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.GroupVersion.String(),
Kind: "BatchRelease",
},
ObjectMeta: metav1.ObjectMeta{
Name: "release",
Namespace: "application",
UID: uuid.NewUUID(),
},
Spec: v1alpha1.BatchReleaseSpec{
TargetRef: v1alpha1.ObjectRef{
WorkloadRef: &v1alpha1.WorkloadRef{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
Name: "sample",
},
},
ReleasePlan: v1alpha1.ReleasePlan{
Batches: []v1alpha1.ReleaseBatch{
{
CanaryReplicas: intstr.FromString("10%"),
PauseSeconds: 100,
},
{
CanaryReplicas: intstr.FromString("50%"),
PauseSeconds: 100,
},
{
CanaryReplicas: intstr.FromString("80%"),
PauseSeconds: 100,
},
},
},
},
}
stableClone = &kruiseappsv1alpha1.CloneSet{
TypeMeta: metav1.TypeMeta{
APIVersion: kruiseappsv1alpha1.SchemeGroupVersion.String(),
Kind: "CloneSet",
},
ObjectMeta: metav1.ObjectMeta{
Name: "sample",
Namespace: "application",
UID: types.UID("87076677"),
Generation: 1,
Labels: map[string]string{
"app": "busybox",
},
Annotations: map[string]string{
"something": "whatever",
},
},
Spec: kruiseappsv1alpha1.CloneSetSpec{
Replicas: pointer.Int32Ptr(100),
UpdateStrategy: kruiseappsv1alpha1.CloneSetUpdateStrategy{
Partition: &intstr.IntOrString{Type: intstr.Int, IntVal: int32(1)},
MaxSurge: &intstr.IntOrString{Type: intstr.Int, IntVal: int32(2)},
MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: int32(2)},
},
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "busybox",
},
},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: containers("v2"),
},
},
},
Status: kruiseappsv1alpha1.CloneSetStatus{
Replicas: 100,
ReadyReplicas: 100,
UpdatedReplicas: 0,
UpdatedReadyReplicas: 0,
ObservedGeneration: 1,
},
}
)
func init() {
scheme = runtime.NewScheme()
apimachineryruntime.Must(apps.AddToScheme(scheme))
apimachineryruntime.Must(v1alpha1.AddToScheme(scheme))
apimachineryruntime.Must(kruiseappsv1alpha1.AddToScheme(scheme))
canaryTemplate := stableClone.Spec.Template.DeepCopy()
stableTemplate := canaryTemplate.DeepCopy()
stableTemplate.Spec.Containers = containers("v1")
stableClone.Status.CurrentRevision = util.ComputeHash(stableTemplate, nil)
stableClone.Status.UpdateRevision = util.ComputeHash(canaryTemplate, nil)
}
func TestCloneSetController(t *testing.T) {
RegisterFailHandler(Fail)
cases := []struct {
Name string
Paused bool
Cleanup bool
}{
{
Name: "paused=true, cleanup=true",
Paused: true,
Cleanup: true,
},
{
Name: "paused=true, cleanup=false",
Paused: true,
Cleanup: false,
},
{
Name: "paused=false cleanup=true",
Paused: false,
Cleanup: true,
},
{
Name: "paused=false , cleanup=false",
Paused: false,
Cleanup: false,
},
}
for _, cs := range cases {
t.Run(cs.Name, func(t *testing.T) {
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(releaseClone.DeepCopy(), stableClone.DeepCopy()).Build()
rec := record.NewFakeRecorder(100)
c := cloneSetController{
workloadController: workloadController{
client: cli,
recorder: rec,
parentController: releaseClone,
releasePlan: &releaseClone.Spec.ReleasePlan,
releaseStatus: &releaseClone.Status,
},
targetNamespacedName: client.ObjectKeyFromObject(stableClone),
}
oldObject := &kruiseappsv1alpha1.CloneSet{}
Expect(cli.Get(context.TODO(), c.targetNamespacedName, oldObject)).NotTo(HaveOccurred())
succeed, err := c.claimCloneSet(oldObject.DeepCopy())
Expect(succeed).Should(BeTrue())
Expect(err).NotTo(HaveOccurred())
newObject := &kruiseappsv1alpha1.CloneSet{}
Expect(cli.Get(context.TODO(), c.targetNamespacedName, newObject)).NotTo(HaveOccurred())
succeed, err = c.releaseCloneSet(newObject.DeepCopy(), &cs.Paused, cs.Cleanup)
Expect(succeed).Should(BeTrue())
Expect(err).NotTo(HaveOccurred())
newObject = &kruiseappsv1alpha1.CloneSet{}
Expect(cli.Get(context.TODO(), c.targetNamespacedName, newObject)).NotTo(HaveOccurred())
newObject.Spec.UpdateStrategy.Paused = oldObject.Spec.UpdateStrategy.Paused
newObject.Spec.UpdateStrategy.Partition = oldObject.Spec.UpdateStrategy.Partition
Expect(reflect.DeepEqual(oldObject.Spec, newObject.Spec)).Should(BeTrue())
Expect(reflect.DeepEqual(oldObject.Labels, newObject.Labels)).Should(BeTrue())
Expect(reflect.DeepEqual(oldObject.Finalizers, newObject.Finalizers)).Should(BeTrue())
oldObject.Annotations[util.StashCloneSetPartition] = ""
newObject.Annotations[util.StashCloneSetPartition] = ""
Expect(reflect.DeepEqual(oldObject.Annotations, newObject.Annotations)).Should(BeTrue())
})
}
}
func containers(version string) []corev1.Container {
return []corev1.Container{
{
Name: "busybox",
Image: fmt.Sprintf("busybox:%v", version),
},
}
}


@ -0,0 +1,498 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"context"
"fmt"
"sort"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// DeploymentsRolloutController is responsible for handling Deployment-type workloads
type DeploymentsRolloutController struct {
deploymentController
stable *apps.Deployment
canary *apps.Deployment
}
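// Unlike the CloneSet controller, which upgrades pods in place by lowering the
// partition, this controller keeps the stable Deployment paused and releases the
// new revision through a separate canary Deployment that is scaled up batch by batch.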
//TODO: scale during releasing: workload replicas changed -> Finalising Deployment with Paused=true
// NewDeploymentRolloutController creates a new Deployment rollout controller
func NewDeploymentRolloutController(cli client.Client, recorder record.EventRecorder, release *v1alpha1.BatchRelease, plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus, stableNamespacedName types.NamespacedName) *DeploymentsRolloutController {
return &DeploymentsRolloutController{
deploymentController: deploymentController{
workloadController: workloadController{
client: cli,
recorder: recorder,
parentController: release,
releasePlan: plan,
releaseStatus: status,
},
stableNamespacedName: stableNamespacedName,
canaryNamespacedName: stableNamespacedName,
releaseKey: client.ObjectKeyFromObject(release),
},
}
}
// IfNeedToProgress verifies that the workload is ready to execute the release plan
func (c *DeploymentsRolloutController) IfNeedToProgress() (bool, error) {
var verifyErr error
defer func() {
if verifyErr != nil {
klog.Error(verifyErr)
c.recorder.Event(c.parentController, v1.EventTypeWarning, "VerifyFailed", verifyErr.Error())
}
}()
if err := c.fetchStableDeployment(); err != nil {
return false, nil
}
// if the workload status is untrustworthy, return and retry
if c.stable.Status.ObservedGeneration != c.stable.Generation {
klog.Warningf("Deployment(%v) is still reconciling, wait for it to be done", c.stableNamespacedName)
return false, nil
}
// if the workload has already been promoted, return and do not retry
if c.stable.Status.UpdatedReplicas == *c.stable.Spec.Replicas {
verifyErr = fmt.Errorf("deployment(%v) update revision has been promoted, no need to rollout", c.stableNamespacedName)
return false, verifyErr
}
// if the workload is not paused, no need to progress it
if !c.stable.Spec.Paused {
verifyErr = fmt.Errorf("deployment(%v) should be paused before execute the release plan", c.stableNamespacedName)
return false, verifyErr
}
klog.Infof("Verified Deployment(%v) Successfully, Status %+v", c.stableNamespacedName, c.releaseStatus)
c.recorder.Event(c.parentController, v1.EventTypeNormal, "RolloutVerified", "ReleasePlan and the Deployment resource are verified")
return true, nil
}
// PrepareBeforeProgress makes sure that the Deployment is under our control
func (c *DeploymentsRolloutController) PrepareBeforeProgress() (bool, error) {
if err := c.fetchStableDeployment(); err != nil {
return false, nil
}
if err := c.fetchCanaryDeployment(); client.IgnoreNotFound(err) != nil {
return false, nil
}
// claim the deployment under our control, and create the canary deployment if needed
if _, err := c.claimDeployment(c.stable, c.canary); err != nil {
return false, nil
}
// the workload is verified, and we will record revision and replicas info before progressing
if err := c.recordDeploymentRevisionAndReplicas(); err != nil {
klog.Errorf("Failed to record deployment(%v) revision and replicas info, error: %v", c.stableNamespacedName, err)
return false, nil
}
c.recorder.Event(c.parentController, v1.EventTypeNormal, "Rollout Initialized", "Rollout resource are initialized")
return true, nil
}
// ProgressOneBatchReplicas calculates the number of pods we can upgrade once
// according to the release plan and then sets the canary Deployment replicas
func (c *DeploymentsRolloutController) ProgressOneBatchReplicas() (bool, error) {
if err := c.fetchStableDeployment(); err != nil {
return false, nil
}
if err := c.fetchCanaryDeployment(); err != nil {
return false, nil
}
// the number of canary replicas we currently have
currentCanaryReplicas := *c.canary.Spec.Replicas
// canary goal we should achieve
canaryGoal := c.calculateCurrentCanary(c.releaseStatus.ObservedWorkloadReplicas)
// nothing to upgrade if the canary has already reached the goal of the current batch
if currentCanaryReplicas >= canaryGoal {
klog.V(3).InfoS("upgraded one batch, but no need to update replicas of canary Deployment",
"Deployment", client.ObjectKeyFromObject(c.canary), "BatchRelease", c.releaseKey,
"current-batch", c.releaseStatus.CanaryStatus.CurrentBatch, "goal-canary-replicas", canaryGoal,
"current-canary-replicas", currentCanaryReplicas, "current-canary-pod-count", c.canary.Status.UpdatedReplicas)
return true, nil
}
// upgrade pods if needed
if err := c.patchCanaryReplicas(c.canary, canaryGoal); err != nil {
return false, nil
}
klog.V(3).Infof("Deployment(%v) upgraded one batch, BatchRelease(%v), current batch=%v, canary goal size=%v",
client.ObjectKeyFromObject(c.canary), c.releaseKey, c.releaseStatus.CanaryStatus.CurrentBatch, canaryGoal)
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Batch Rollout", "Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
}
// CheckOneBatchReplicas checks whether all pods of the current batch are available according to the release plan
func (c *DeploymentsRolloutController) CheckOneBatchReplicas() (bool, error) {
if err := c.fetchStableDeployment(); err != nil {
return false, nil
}
if err := c.fetchCanaryDeployment(); err != nil {
return false, nil
}
// if the workload status is untrustworthy, wait for the next sync
if c.canary.Status.ObservedGeneration != c.canary.Generation {
return false, nil
}
// canary pods that have been created
canaryPodCount := c.canary.Status.Replicas
// canary pods that are currently available
availableCanaryPodCount := c.canary.Status.AvailableReplicas
// the number of canary pods we should have in the current batch
canaryGoal := c.calculateCurrentCanary(c.releaseStatus.ObservedWorkloadReplicas)
// the maximum number of unavailable replicas allowed
maxUnavailable := 0
if c.canary.Spec.Strategy.RollingUpdate != nil &&
c.canary.Spec.Strategy.RollingUpdate.MaxUnavailable != nil {
maxUnavailable, _ = intstr.GetValueFromIntOrPercent(c.canary.Spec.Strategy.RollingUpdate.MaxUnavailable, int(*c.canary.Spec.Replicas), true)
}
klog.InfoS("checking the batch releasing progress",
"BatchRelease", c.releaseKey, "current-batch", c.releaseStatus.CanaryStatus.CurrentBatch,
"canary-available-pod-count", availableCanaryPodCount, "stable-pod-count", c.stable.Status.Replicas,
"maxUnavailable-pod-allowed", maxUnavailable, "canary-goal", canaryGoal)
// the batch is ready only when enough canary pods have been created and are available (within maxUnavailable of the goal), and at least one canary pod is available whenever the goal is non-zero
if canaryPodCount < canaryGoal || availableCanaryPodCount+int32(maxUnavailable) < canaryGoal || (canaryGoal > 0 && availableCanaryPodCount == 0) {
klog.Infof("BatchRelease(%v) batch is not ready yet, current batch=%v", c.releaseKey, c.releaseStatus.CanaryStatus.CurrentBatch)
return false, nil
}
klog.InfoS("Deployment all pods in current batch are ready", "Deployment", client.ObjectKeyFromObject(c.canary), "current batch", c.releaseStatus.CanaryStatus.CurrentBatch)
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Batch Available", "Batch %d is available", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
}
// FinalizeOneBatch isn't needed in this mode.
func (c *DeploymentsRolloutController) FinalizeOneBatch() (bool, error) {
return true, nil
}
// FinalizeProgress restores the stable Deployment and cleans up the canary resources and settings
func (c *DeploymentsRolloutController) FinalizeProgress(pause *bool, cleanup bool) bool {
if err := c.fetchStableDeployment(); client.IgnoreNotFound(err) != nil {
return false
}
// release the Deployment from our control, and clean up the canary resources
succeed, err := c.releaseDeployment(c.stable, pause, cleanup)
if !succeed || err != nil {
klog.Errorf("Failed to finalize deployment(%v), error: %v", c.stableNamespacedName, err)
return false
}
c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Finalized", "Finalized: paused=%v, cleanup=%v", pause, cleanup)
return true
}
// SyncWorkloadInfo returns the change type and workload info if the workload was changed during the rollout
func (c *DeploymentsRolloutController) SyncWorkloadInfo() (WorkloadChangeEventType, *WorkloadInfo, error) {
// ignore the sync if the release plan is in any of the following states
if c.parentController.Spec.Cancelled ||
c.parentController.DeletionTimestamp != nil ||
c.releaseStatus.Phase == v1alpha1.RolloutPhaseAbort ||
c.releaseStatus.Phase == v1alpha1.RolloutPhaseFinalizing ||
c.releaseStatus.Phase == v1alpha1.RolloutPhaseTerminating {
return IgnoreWorkloadEvent, nil, nil
}
var err error
workloadInfo := &WorkloadInfo{}
err = c.fetchStableDeployment()
if err != nil {
return "", nil, err
}
err = c.fetchCanaryDeployment()
if err != nil {
if !apierrors.IsNotFound(err) {
return "", nil, err
}
} else {
workloadInfo.Status = &WorkloadStatus{
UpdatedReplicas: c.canary.Status.Replicas,
UpdatedReadyReplicas: c.canary.Status.AvailableReplicas,
}
}
// if the canary Deployment is being deleted but still has our finalizer, something unexpected has happened
if c.canary != nil && c.canary.DeletionTimestamp != nil &&
controllerutil.ContainsFinalizer(c.canary, util.CanaryDeploymentFinalizer) {
return WorkloadUnHealthy, workloadInfo, nil
}
// check whether the workload status is trustworthy
if c.stable.Status.ObservedGeneration != c.stable.Generation {
klog.Warningf("Deployment(%v) is still reconciling, waiting for it to complete, generation: %v, observed: %v",
c.stableNamespacedName, c.stable.Generation, c.stable.Status.ObservedGeneration)
return WorkloadStillReconciling, workloadInfo, nil
}
// check whether the workload has been promoted
if !c.stable.Spec.Paused && c.stable.Status.UpdatedReplicas == c.stable.Status.Replicas {
return IgnoreWorkloadEvent, workloadInfo, nil
}
var updateRevision string
switch c.releaseStatus.Phase {
// these phases do not need to care about most event types
case v1alpha1.RolloutPhaseHealthy, v1alpha1.RolloutPhaseInitial, v1alpha1.RolloutPhasePreparing:
return IgnoreWorkloadEvent, workloadInfo, nil
// any other phase should care about the following event types
default:
// check if workload needs rollback
if needsRollingBack, err := c.NeedsRollingBack(); err != nil {
return "", workloadInfo, err
} else if needsRollingBack {
workloadInfo.UpdateRevision = &updateRevision
return WorkloadRollback, workloadInfo, nil
}
// check if workload is scaling up/down
if *c.stable.Spec.Replicas != c.releaseStatus.ObservedWorkloadReplicas {
workloadInfo.Replicas = c.stable.Spec.Replicas
klog.Warningf("Deployment(%v) replicas changed during releasing, should pause and wait for it to complete, replicas from: %v -> %v",
c.stableNamespacedName, c.releaseStatus.ObservedWorkloadReplicas, *c.stable.Spec.Replicas)
return WorkloadReplicasChanged, workloadInfo, nil
}
fallthrough
// these two phases only care about the workload revision changed event
case v1alpha1.RolloutPhaseCompleted, v1alpha1.RolloutPhaseCancelled:
// check if workload revision was changed
_, err = c.GetPodTemplateHash(c.stable, Latest)
if (c.canary == nil || !util.EqualIgnoreHash(&c.stable.Spec.Template, &c.canary.Spec.Template)) && apierrors.IsNotFound(err) {
workloadInfo.UpdateRevision = &updateRevision
klog.Warningf("Deployment(%v) updateRevision changed during releasing", c.stableNamespacedName)
return WorkloadPodTemplateChanged, workloadInfo, nil
}
}
return IgnoreWorkloadEvent, workloadInfo, nil
}
/* ----------------------------------
The functions below are helper functions
------------------------------------- */
// NeedsRollingBack returns 'true' if the workload needs to roll back
func (c *DeploymentsRolloutController) NeedsRollingBack() (bool, error) {
rss, err := c.listReplicaSetsFor(c.stable)
if err != nil {
return false, err
}
for _, rs := range rss {
if c.releaseStatus.StableRevision != "" && *rs.Spec.Replicas > 0 &&
util.EqualIgnoreHash(&rs.Spec.Template, &c.stable.Spec.Template) {
return true, nil
}
}
return false, nil
}
// fetch the stable Deployment into c.stable
func (c *DeploymentsRolloutController) fetchStableDeployment() error {
if c.stable != nil {
return nil
}
stable := &apps.Deployment{}
if err := c.client.Get(context.TODO(), c.stableNamespacedName, stable); err != nil {
if !apierrors.IsNotFound(err) {
c.recorder.Event(c.parentController, v1.EventTypeWarning, "GetStableDeploymentFailed", err.Error())
}
return err
}
c.stable = stable
return nil
}
// fetch the canary Deployment into c.canary
func (c *DeploymentsRolloutController) fetchCanaryDeployment() error {
err := c.fetchStableDeployment()
if err != nil {
return err
}
ds, err := c.listCanaryDeployment(client.InNamespace(c.stable.Namespace))
if err != nil {
return err
}
ds = util.FilterActiveDeployment(ds)
sort.Slice(ds, func(i, j int) bool {
return ds[i].CreationTimestamp.After(ds[j].CreationTimestamp.Time)
})
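// the canary is the most recently created active Deployment owned by this
// BatchRelease whose pod template matches the stable Deployment's; if none matches,
// the canary has not been created yet (or is stale), so report NotFound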
if len(ds) == 0 || !util.EqualIgnoreHash(&ds[0].Spec.Template, &c.stable.Spec.Template) {
err := apierrors.NewNotFound(schema.GroupResource{
Group: apps.SchemeGroupVersion.Group,
Resource: c.stable.Kind,
}, c.canaryNamespacedName.Name)
c.recorder.Event(c.parentController, v1.EventTypeWarning, "GetCanaryDeploymentFailed", err.Error())
return err
}
c.canary = ds[0]
return nil
}
// the target workload size for the current batch
func (c *DeploymentsRolloutController) calculateCurrentCanary(totalSize int32) int32 {
targetSize := int32(util.CalculateNewBatchTarget(c.releasePlan, int(totalSize), int(c.releaseStatus.CanaryStatus.CurrentBatch)))
klog.InfoS("Calculated the number of pods in the canary Deployment after current batch",
"Deployment", c.stableNamespacedName, "BatchRelease", c.releaseKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "workload updateRevision size", targetSize)
return targetSize
}
// the source workload size for the current batch
func (c *DeploymentsRolloutController) calculateCurrentStable(totalSize int32) int32 {
sourceSize := totalSize - c.calculateCurrentCanary(totalSize)
klog.InfoS("Calculated the number of pods in the stable Deployment after current batch",
"Deployment", c.stableNamespacedName, "BatchRelease", c.releaseKey,
"current batch", c.releaseStatus.CanaryStatus.CurrentBatch, "workload stableRevision size", sourceSize)
return sourceSize
}
// recordDeploymentRevisionAndReplicas records stableRevision, canaryRevision, workloadReplicas to BatchRelease.Status
func (c *DeploymentsRolloutController) recordDeploymentRevisionAndReplicas() error {
err := c.fetchStableDeployment()
if err != nil {
return err
}
err = c.fetchCanaryDeployment()
if err != nil {
return err
}
c.releaseStatus.StableRevision, err = c.GetPodTemplateHash(c.stable, Stable)
if err != nil {
return err
}
c.releaseStatus.UpdateRevision, err = c.GetPodTemplateHash(c.canary, Latest)
if err != nil {
return err
}
c.releaseStatus.ObservedWorkloadReplicas = *c.stable.Spec.Replicas
return nil
}
// GetPodTemplateHash returns the latest/stable revision hash of the Deployment
func (c *DeploymentsRolloutController) GetPodTemplateHash(deploy *apps.Deployment, kind PodTemplateHashType) (string, error) {
switch kind {
case Latest, Stable:
default:
panic("wrong kind type, must be 'stable' or 'canary'")
}
if deploy == nil {
return "", fmt.Errorf("workload cannot be found, may be deleted or not be created yet")
}
rss, err := c.listReplicaSetsFor(deploy)
if err != nil {
return "", err
}
sort.Slice(rss, func(i, j int) bool {
return rss[i].CreationTimestamp.Before(&rss[j].CreationTimestamp)
})
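// ReplicaSets are scanned from oldest to newest: the stable hash comes from the
// oldest ReplicaSet that still has replicas, while the latest hash comes from the
// ReplicaSet whose pod template matches the Deployment's current template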
for _, rs := range rss {
switch kind {
case Stable:
if rs.Spec.Replicas != nil && *rs.Spec.Replicas > 0 {
return rs.Labels[apps.DefaultDeploymentUniqueLabelKey], nil
}
case Latest:
if util.EqualIgnoreHash(&deploy.Spec.Template, &rs.Spec.Template) {
return rs.Labels[apps.DefaultDeploymentUniqueLabelKey], nil
}
}
}
notFoundErr := apierrors.NewNotFound(schema.GroupResource{
Group: apps.SchemeGroupVersion.Group,
Resource: fmt.Sprintf("%v-ReplicaSet", kind),
}, c.canaryNamespacedName.Name)
return "", notFoundErr
}
// listReplicaSetsFor lists all ReplicaSets owned by the Deployment, excluding those that are being deleted
func (c *DeploymentsRolloutController) listReplicaSetsFor(deploy *apps.Deployment) ([]*apps.ReplicaSet, error) {
deploySelector, err := metav1.LabelSelectorAsSelector(deploy.Spec.Selector)
if err != nil {
return nil, err
}
rsList := &apps.ReplicaSetList{}
err = c.client.List(context.TODO(), rsList, &client.ListOptions{Namespace: deploy.Namespace, LabelSelector: deploySelector})
if err != nil {
return nil, err
}
var rss []*apps.ReplicaSet
for i := range rsList.Items {
rs := &rsList.Items[i]
if rs.DeletionTimestamp != nil {
continue
}
if owner := metav1.GetControllerOf(rs); owner == nil || owner.UID != deploy.UID {
continue
}
rss = append(rss, rs)
}
return rss, nil
}
type PodTemplateHashType string
const (
Latest PodTemplateHashType = "Latest"
Stable PodTemplateHashType = "Stable"
)


@ -0,0 +1,262 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"context"
"encoding/json"
"fmt"
"sort"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// deploymentController holds the fields needed to handle Deployment-type workloads
type deploymentController struct {
workloadController
releaseKey types.NamespacedName
stableNamespacedName types.NamespacedName
canaryNamespacedName types.NamespacedName
}
// claimDeployment adds this BatchRelease's control info to the stable Deployment and creates the
// canary Deployment if necessary, so that the release can start from every pod on the old revision
func (c *deploymentController) claimDeployment(stableDeploy, canaryDeploy *apps.Deployment) (*apps.Deployment, error) {
var controlled bool
if controlInfo, ok := stableDeploy.Annotations[util.BatchReleaseControlAnnotation]; ok && controlInfo != "" {
ref := &metav1.OwnerReference{}
err := json.Unmarshal([]byte(controlInfo), ref)
if err == nil && ref.UID == c.parentController.UID {
klog.V(3).Infof("Deployment(%v) has been controlled by this BatchRelease(%v), no need to claim again",
c.stableNamespacedName, c.releaseKey)
controlled = true
} else {
klog.Errorf("Failed to parse controller info from Deployment(%v) annotation, error: %v, controller info: %+v",
c.stableNamespacedName, err, *ref)
}
}
// patch control info onto the stable Deployment if needed
if !controlled {
controlInfo, _ := json.Marshal(metav1.NewControllerRef(c.parentController, c.parentController.GetObjectKind().GroupVersionKind()))
patchedInfo := map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]string{
util.BatchReleaseControlAnnotation: string(controlInfo),
},
},
}
patchedBody, _ := json.Marshal(patchedInfo)
if err := c.client.Patch(context.TODO(), stableDeploy, client.RawPatch(types.StrategicMergePatchType, patchedBody)); err != nil {
klog.Errorf("Failed to patch controller info annotations to stable deployment(%v), error: %v", client.ObjectKeyFromObject(canaryDeploy), err)
return canaryDeploy, err
}
}
// create the canary Deployment if needed
if canaryDeploy == nil || !util.EqualIgnoreHash(&stableDeploy.Spec.Template, &canaryDeploy.Spec.Template) {
var err error
var collisionCount int32
if c.releaseStatus.CollisionCount != nil {
collisionCount = *c.releaseStatus.CollisionCount
}
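// the canary name suffix is generated from the collision count; if a Deployment with
// that name already exists, bump the count and retry until a free name is found, then
// persist the new count in the release status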
for {
canaryDeploy, err = c.createCanaryDeployment(stableDeploy, &collisionCount)
if errors.IsAlreadyExists(err) {
collisionCount++
continue
} else if err != nil {
return nil, err
}
break
}
if collisionCount > 0 {
c.releaseStatus.CollisionCount = pointer.Int32Ptr(collisionCount)
}
}
return canaryDeploy, nil
}
func (c *deploymentController) createCanaryDeployment(stableDeploy *apps.Deployment, collisionCount *int32) (*apps.Deployment, error) {
// TODO: find a better way to generate canary deployment name
suffix := util.ShortRandomStr(collisionCount)
canaryDeploy := &apps.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v-%v", c.canaryNamespacedName.Name, suffix),
Namespace: c.stableNamespacedName.Namespace,
Labels: map[string]string{},
Annotations: map[string]string{},
},
}
for k, v := range stableDeploy.Labels {
canaryDeploy.Labels[k] = v
}
for k, v := range stableDeploy.Annotations {
canaryDeploy.Annotations[k] = v
}
for _, f := range stableDeploy.Finalizers {
canaryDeploy.Finalizers = append(canaryDeploy.Finalizers, f)
}
for _, o := range stableDeploy.OwnerReferences {
canaryDeploy.OwnerReferences = append(canaryDeploy.OwnerReferences, *o.DeepCopy())
}
canaryDeploy.Finalizers = append(canaryDeploy.Finalizers, util.CanaryDeploymentFinalizer)
canaryDeploy.OwnerReferences = append(canaryDeploy.OwnerReferences, *metav1.NewControllerRef(
c.parentController, c.parentController.GroupVersionKind()))
// set extra labels & annotations
canaryDeploy.Labels[util.CanaryDeploymentLabelKey] = c.stableNamespacedName.Name
owner := metav1.NewControllerRef(c.parentController, c.parentController.GroupVersionKind())
if owner != nil {
ownerInfo, _ := json.Marshal(owner)
canaryDeploy.Annotations[util.BatchReleaseControlAnnotation] = string(ownerInfo)
}
// copy spec
canaryDeploy.Spec = *stableDeploy.Spec.DeepCopy()
canaryDeploy.Spec.Replicas = pointer.Int32Ptr(0)
canaryDeploy.Spec.Paused = false
canaryKey := client.ObjectKeyFromObject(canaryDeploy)
// create canary Deployment
err := c.client.Create(context.TODO(), canaryDeploy)
if err != nil {
klog.Errorf("Failed to create canary Deployment(%v), error: %v", canaryKey, err)
return nil, err
}
canaryDeployInfo, _ := json.Marshal(canaryDeploy)
klog.V(3).Infof("Create canary Deployment(%v) successfully, details: %v", canaryKey, string(canaryDeployInfo))
// fetch the canary Deployment
var fetchedCanary *apps.Deployment
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
fetchedCanary = &apps.Deployment{}
return c.client.Get(context.TODO(), canaryKey, fetchedCanary)
})
return fetchedCanary, err
}
func (c *deploymentController) releaseDeployment(stableDeploy *apps.Deployment, pause *bool, cleanup bool) (bool, error) {
var patchErr, deleteErr error
// clean up the control info of the stable Deployment if needed
if stableDeploy != nil && (len(stableDeploy.Annotations[util.BatchReleaseControlAnnotation]) > 0 || (pause != nil && stableDeploy.Spec.Paused != *pause)) {
var patchByte []byte
if pause == nil {
patchByte = []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%v":null}}}`, util.BatchReleaseControlAnnotation))
} else {
patchByte = []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%v":null}},"spec":{"paused":%v}}`, util.BatchReleaseControlAnnotation, *pause))
}
patchErr = c.client.Patch(context.TODO(), stableDeploy, client.RawPatch(types.StrategicMergePatchType, patchByte))
if patchErr != nil {
klog.Errorf("Error occurred when patching Deployment(%v), error: %v", c.stableNamespacedName, patchErr)
return false, patchErr
}
}
// clean up the canary Deployments if needed
if cleanup {
ds, err := c.listCanaryDeployment(client.InNamespace(c.stableNamespacedName.Namespace))
if err != nil {
return false, err
}
// make sure the older ones are deleted first
sort.Slice(ds, func(i, j int) bool {
return ds[i].CreationTimestamp.Before(&ds[j].CreationTimestamp)
})
// delete all the canary deployments
for _, d := range ds {
// remove our finalizer first and return; the actual deletion happens in a later reconcile once the finalizer removal has been observed
if controllerutil.ContainsFinalizer(d, util.CanaryDeploymentFinalizer) {
updateErr := util.UpdateFinalizer(c.client, d, "Remove", util.CanaryDeploymentFinalizer)
if updateErr != nil && !errors.IsNotFound(updateErr) {
klog.Error("Error occurred when updating Deployment(%v), error: %v", client.ObjectKeyFromObject(d), updateErr)
return false, updateErr
}
return false, nil
}
// delete the deployment
deleteErr = c.client.Delete(context.TODO(), d)
if deleteErr != nil && !errors.IsNotFound(deleteErr) {
klog.Errorf("Error occurred when deleting Deployment(%v), error: %v", client.ObjectKeyFromObject(d), deleteErr)
return false, deleteErr
}
}
}
klog.V(3).Infof("Release Deployment(%v) Successfully", c.stableNamespacedName)
return true, nil
}
// patchCanaryReplicas scales the canary Deployment to the given number of replicas
func (c *deploymentController) patchCanaryReplicas(canaryDeploy *apps.Deployment, replicas int32) error {
patch := map[string]interface{}{
"spec": map[string]interface{}{
"replicas": pointer.Int32Ptr(replicas),
},
}
patchByte, _ := json.Marshal(patch)
if err := c.client.Patch(context.TODO(), canaryDeploy, client.RawPatch(types.MergePatchType, patchByte)); err != nil {
c.recorder.Eventf(c.parentController, v1.EventTypeWarning, "PatchPartitionFailed",
"Failed to update the canary Deployment to the correct canary replicas %d, error: %v", replicas, err)
return err
}
klog.InfoS("Submitted modified partition quest for canary Deployment", "Deployment",
client.ObjectKeyFromObject(canaryDeploy), "target canary replicas size", replicas, "batch", c.releaseStatus.CanaryStatus.CurrentBatch)
return nil
}
func (c *deploymentController) listCanaryDeployment(options ...client.ListOption) ([]*apps.Deployment, error) {
dList := &apps.DeploymentList{}
if err := c.client.List(context.TODO(), dList, options...); err != nil {
return nil, err
}
var ds []*apps.Deployment
for i := range dList.Items {
d := &dList.Items[i]
o := metav1.GetControllerOf(d)
if o == nil || o.UID != c.parentController.UID {
continue
}
ds = append(ds, d)
}
return ds, nil
}


@ -0,0 +1,202 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"context"
"reflect"
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/openkruise/rollouts/api/v1alpha1"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
var (
releaseDeploy = &v1alpha1.BatchRelease{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.GroupVersion.String(),
Kind: "BatchRelease",
},
ObjectMeta: metav1.ObjectMeta{
Name: "release",
Namespace: "application",
UID: uuid.NewUUID(),
},
Spec: v1alpha1.BatchReleaseSpec{
TargetRef: v1alpha1.ObjectRef{
WorkloadRef: &v1alpha1.WorkloadRef{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: "sample",
},
},
ReleasePlan: v1alpha1.ReleasePlan{
Batches: []v1alpha1.ReleaseBatch{
{
CanaryReplicas: intstr.FromString("10%"),
PauseSeconds: 100,
},
{
CanaryReplicas: intstr.FromString("50%"),
PauseSeconds: 100,
},
{
CanaryReplicas: intstr.FromString("80%"),
PauseSeconds: 100,
},
},
},
},
}
stableDeploy = &apps.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: apps.SchemeGroupVersion.String(),
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
Name: "sample",
Namespace: "application",
UID: types.UID("87076677"),
Generation: 2,
Labels: map[string]string{
"app": "busybox",
apps.DefaultDeploymentUniqueLabelKey: "update-pod-hash",
},
},
Spec: apps.DeploymentSpec{
Replicas: pointer.Int32Ptr(100),
Strategy: apps.DeploymentStrategy{
Type: apps.RollingUpdateDeploymentStrategyType,
RollingUpdate: &apps.RollingUpdateDeployment{
MaxSurge: &intstr.IntOrString{Type: intstr.Int, IntVal: int32(1)},
MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: int32(2)},
},
},
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "busybox",
},
},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: containers("v2"),
},
},
},
Status: apps.DeploymentStatus{
Replicas: 100,
ReadyReplicas: 100,
UpdatedReplicas: 0,
AvailableReplicas: 100,
},
}
)
func TestDeploymentController(t *testing.T) {
RegisterFailHandler(Fail)
cases := []struct {
Name string
Paused bool
Cleanup bool
}{
{
Name: "paused=true, cleanup=true",
Paused: true,
Cleanup: true,
},
{
Name: "paused=true, cleanup=false",
Paused: true,
Cleanup: false,
},
{
Name: "paused=false cleanup=true",
Paused: false,
Cleanup: true,
},
{
Name: "paused=false , cleanup=false",
Paused: false,
Cleanup: false,
},
}
for _, cs := range cases {
t.Run(cs.Name, func(t *testing.T) {
release := releaseDeploy.DeepCopy()
deploy := stableDeploy.DeepCopy()
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(release, deploy).Build()
rec := record.NewFakeRecorder(100)
c := deploymentController{
workloadController: workloadController{
client: cli,
recorder: rec,
parentController: releaseDeploy,
releasePlan: &releaseDeploy.Spec.ReleasePlan,
releaseStatus: &releaseDeploy.Status,
},
stableNamespacedName: client.ObjectKeyFromObject(stableDeploy),
canaryNamespacedName: client.ObjectKeyFromObject(stableDeploy),
}
oldObject := &apps.Deployment{}
Expect(cli.Get(context.TODO(), c.stableNamespacedName, oldObject)).NotTo(HaveOccurred())
canary, err := c.claimDeployment(oldObject.DeepCopy(), nil)
Expect(canary).ShouldNot(BeNil())
Expect(err).NotTo(HaveOccurred())
// The following logic should have been done in controller-runtime
{
dList := &apps.DeploymentList{}
Expect(cli.List(context.TODO(), dList)).NotTo(HaveOccurred())
for i := range dList.Items {
d := &dList.Items[i]
d.SetGroupVersionKind(schema.GroupVersionKind{
Group: "apps", Version: "v1", Kind: "Deployment",
})
Expect(cli.Update(context.TODO(), d)).NotTo(HaveOccurred())
}
}
newObject := &apps.Deployment{}
Expect(cli.Get(context.TODO(), c.stableNamespacedName, newObject)).NotTo(HaveOccurred())
_, err = c.releaseDeployment(newObject.DeepCopy(), &cs.Paused, cs.Cleanup)
Expect(err).NotTo(HaveOccurred())
newObject = &apps.Deployment{}
Expect(cli.Get(context.TODO(), c.stableNamespacedName, newObject)).NotTo(HaveOccurred())
newObject.Spec.Paused = oldObject.Spec.Paused
Expect(reflect.DeepEqual(oldObject.Spec, newObject.Spec)).Should(BeTrue())
Expect(reflect.DeepEqual(oldObject.Labels, newObject.Labels)).Should(BeTrue())
Expect(reflect.DeepEqual(oldObject.Finalizers, newObject.Finalizers)).Should(BeTrue())
Expect(reflect.DeepEqual(oldObject.Annotations, newObject.Annotations)).Should(BeTrue())
})
}
}


@ -0,0 +1,108 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"github.com/openkruise/rollouts/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type WorkloadChangeEventType string
const (
IgnoreWorkloadEvent WorkloadChangeEventType = "workload-not-cared"
WorkloadRollback WorkloadChangeEventType = "workload-rollback"
WorkloadPodTemplateChanged WorkloadChangeEventType = "workload-pod-template-changed"
WorkloadReplicasChanged WorkloadChangeEventType = "workload-replicas-changed"
WorkloadStillReconciling WorkloadChangeEventType = "workload-is-reconciling"
WorkloadUnHealthy WorkloadChangeEventType = "workload-is-unhealthy"
)
type WorkloadStatus struct {
Replicas int32
ReadyReplicas int32
UpdatedReplicas int32
UpdatedReadyReplicas int32
ObservedGeneration int64
}
type WorkloadInfo struct {
Paused bool
Replicas *int32
UpdateRevision *string
Status *WorkloadStatus
Metadata *metav1.ObjectMeta
}
type workloadController struct {
client client.Client
recorder record.EventRecorder
releasePlan *v1alpha1.ReleasePlan
releaseStatus *v1alpha1.BatchReleaseStatus
parentController *v1alpha1.BatchRelease
}
// WorkloadController is the interface that every workload rollout controller (e.g. CloneSet, Deployment) implements
type WorkloadController interface {
// IfNeedToProgress makes sure that the resources can be upgraded according to the release plan.
// it returns 'true' if the verification succeeded.
// it returns 'false' if the verification should be retried.
// it returns a non-nil error if the verification went wrong and should not be retried.
IfNeedToProgress() (bool, error)
// PrepareBeforeProgress makes sure that the resource is ready to be progressed.
// this function is tasked to do any initialization work on the resources.
// it returns 'true' if the preparation succeeded.
// it returns 'false' if the preparation should be retried.
// it returns a non-nil error if the preparation went wrong and should not be retried.
PrepareBeforeProgress() (bool, error)
// ProgressOneBatchReplicas tries to upgrade old replicas following the release plan.
// it will upgrade as many old replicas as the release plan allows in the current batch.
// it returns 'true' if the progress succeeded.
// it returns 'false' if the progress should be retried.
// it returns a non-nil error if the progress went wrong and should not be retried.
ProgressOneBatchReplicas() (bool, error)
// CheckOneBatchReplicas checks how many replicas are ready to serve requests in the current batch.
// it returns 'true' if the batch is ready.
// it returns 'false' if the batch should be reset and rechecked.
// it returns a non-nil error if the check went wrong and should not be retried.
CheckOneBatchReplicas() (bool, error)
// FinalizeOneBatch makes sure that the rollout can start the next batch.
// it returns 'true' if the operation succeeded.
// it returns 'false' if the operation should be retried.
// it returns a non-nil error if the operation went wrong and should not be retried.
FinalizeOneBatch() (bool, error)
// FinalizeProgress makes sure the resources are in a good final state.
// It might depend on whether the rollout succeeded or not.
// For example, we may remove the objects that were created by the BatchRelease.
// this function will always be retried until it returns 'true'.
// parameters:
// - pause: 'nil' means keep the current state, 'true' means pause the workload, 'false' means do not pause the workload
// - cleanup: 'true' means clean up the canary settings, 'false' means do not clean up.
FinalizeProgress(pause *bool, cleanup bool) bool
// SyncWorkloadInfo watches and compares the status recorded in BatchRelease.Status
// with the real-time workload info. If the workload status is inconsistent with that
// recorded in release.status, it returns the corresponding WorkloadChangeEventType and info.
SyncWorkloadInfo() (WorkloadChangeEventType, *WorkloadInfo, error)
}
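// A minimal sketch (not part of this commit) of how an executor might drive one
// batch through this interface; runOneBatch is an illustrative name, and the real
// executor is assumed to requeue whenever a step reports (false, nil):
//
//	func runOneBatch(c WorkloadController) (bool, error) {
//		if ok, err := c.ProgressOneBatchReplicas(); !ok || err != nil {
//			return false, err
//		}
//		if ok, err := c.CheckOneBatchReplicas(); !ok || err != nil {
//			return false, err
//		}
//		return c.FinalizeOneBatch()
//	}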

go.mod

@ -7,6 +7,7 @@ require (
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.17.0
github.com/openkruise/kruise-api v1.0.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.22.5
k8s.io/apiextensions-apiserver v0.22.5
k8s.io/apimachinery v0.22.5

main.go

@ -22,8 +22,10 @@ import (
kruisev1aplphal "github.com/openkruise/kruise-api/apps/v1alpha1"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
br "github.com/openkruise/rollouts/controllers/batchrelease"
"github.com/openkruise/rollouts/controllers/rollout"
"github.com/openkruise/rollouts/pkg/util"
"github.com/openkruise/rollouts/webhook"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@ -92,6 +94,18 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "Rollout")
}
if err = br.Add(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "BatchRelease")
os.Exit(1)
}
//+kubebuilder:scaffold:builder
setupLog.Info("setup webhook")
if err = webhook.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to setup webhook")
os.Exit(1)
}
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)

File diff suppressed because it is too large


@ -0,0 +1,25 @@
apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
metadata:
labels:
app: busybox
name: sample
spec:
replicas: 5
selector:
matchLabels:
app: busybox
template:
metadata:
labels:
app: busybox
spec:
containers:
- name: busybox
image: busybox:1.32
imagePullPolicy: IfNotPresent
command: ["bin/sh", "-c", "sleep 10000000"]
resources:
limits:
memory: "10Mi"
cpu: "10m"


@ -0,0 +1,23 @@
apiVersion: rollouts.kruise.io/v1alpha1
kind: BatchRelease
metadata:
name: release-cloneset-100
spec:
targetReference:
type: workloadRef
workloadRef:
apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
name: sample
releasePlan:
batches:
- canaryReplicas: 1
pauseSeconds: 20
- canaryReplicas: 2
pauseSeconds: 20
- canaryReplicas: 3
pauseSeconds: 20
- canaryReplicas: 5
pauseSeconds: 20


@ -0,0 +1,23 @@
apiVersion: rollouts.kruise.io/v1alpha1
kind: BatchRelease
metadata:
name: release-cloneset-100
spec:
targetReference:
type: workloadRef
workloadRef:
apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
name: sample
releasePlan:
batches:
- canaryReplicas: 20%
pauseSeconds: 20
- canaryReplicas: 40%
pauseSeconds: 20
- canaryReplicas: 60%
pauseSeconds: 20
- canaryReplicas: 100%
pauseSeconds: 10


@ -0,0 +1,22 @@
apiVersion: rollouts.kruise.io/v1alpha1
kind: BatchRelease
metadata:
name: release-cloneset-50
spec:
targetReference:
type: workloadRef
workloadRef:
apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
name: sample
releasePlan:
batches:
- canaryReplicas: 20%
pauseSeconds: 30
- canaryReplicas: 40%
pauseSeconds: 60
- canaryReplicas: 50%
pauseSeconds: 30

View File

@ -0,0 +1,25 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: sample
labels:
app: busybox
spec:
replicas: 5
selector:
matchLabels:
app: busybox
template:
metadata:
labels:
app: busybox
spec:
containers:
- name: busybox
image: busybox:1.32
imagePullPolicy: IfNotPresent
command: ["/bin/sh", "-c", "sleep 10000"]
resources:
limits:
memory: "10Mi"
cpu: "10m"

View File

@ -0,0 +1,23 @@
apiVersion: rollouts.kruise.io/v1alpha1
kind: BatchRelease
metadata:
name: release-deployment-number-100
spec:
targetReference:
type: workloadRef
workloadRef:
apiVersion: apps/v1
kind: Deployment
name: sample
releasePlan:
batches:
- canaryReplicas: 1
pauseSeconds: 20
- canaryReplicas: 2
pauseSeconds: 20
- canaryReplicas: 3
pauseSeconds: 20
- canaryReplicas: 5
pauseSeconds: 10

View File

@ -0,0 +1,21 @@
apiVersion: rollouts.kruise.io/v1alpha1
kind: BatchRelease
metadata:
name: release-deployment-percentage-100
spec:
targetReference:
type: workloadRef
workloadRef:
apiVersion: apps/v1
kind: Deployment
name: sample
releasePlan:
batches:
- canaryReplicas: 20%
pauseSeconds: 20
- canaryReplicas: 40%
pauseSeconds: 20
- canaryReplicas: 60%
pauseSeconds: 20
- canaryReplicas: 100%
pauseSeconds: 10

View File

@ -0,0 +1,22 @@
apiVersion: rollouts.kruise.io/v1alpha1
kind: BatchRelease
metadata:
name: release-deployment-percentage-50
spec:
targetReference:
type: workloadRef
workloadRef:
apiVersion: apps/v1
kind: Deployment
name: sample
releasePlan:
batches:
- canaryReplicas: 20%
pauseSeconds: 30
- canaryReplicas: 40%
pauseSeconds: 60
- canaryReplicas: 50%
pauseSeconds: 30

324
test/images/image_utils.go Normal file
View File

@ -0,0 +1,324 @@
/*
Copyright 2022 The Kruise Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"fmt"
"io/ioutil"
"os"
"strings"
yaml "gopkg.in/yaml.v2"
)
// RegistryList holds public and private image registries
type RegistryList struct {
GcAuthenticatedRegistry string `yaml:"gcAuthenticatedRegistry"`
DockerLibraryRegistry string `yaml:"dockerLibraryRegistry"`
DockerGluster string `yaml:"dockerGluster"`
E2eRegistry string `yaml:"e2eRegistry"`
E2eVolumeRegistry string `yaml:"e2eVolumeRegistry"`
PromoterE2eRegistry string `yaml:"promoterE2eRegistry"`
BuildImageRegistry string `yaml:"buildImageRegistry"`
InvalidRegistry string `yaml:"invalidRegistry"`
GcRegistry string `yaml:"gcRegistry"`
SigStorageRegistry string `yaml:"sigStorageRegistry"`
GcrReleaseRegistry string `yaml:"gcrReleaseRegistry"`
PrivateRegistry string `yaml:"privateRegistry"`
SampleRegistry string `yaml:"sampleRegistry"`
}
// Config holds an images registry, name, and version
type Config struct {
registry string
name string
version string
}
// SetRegistry sets an image registry in a Config struct
func (i *Config) SetRegistry(registry string) {
i.registry = registry
}
// SetName sets an image name in a Config struct
func (i *Config) SetName(name string) {
i.name = name
}
// SetVersion sets an image version in a Config struct
func (i *Config) SetVersion(version string) {
i.version = version
}
func initReg() RegistryList {
registry := RegistryList{
GcAuthenticatedRegistry: "gcr.io/authenticated-image-pulling",
DockerLibraryRegistry: "docker.io/library",
DockerGluster: "docker.io/gluster",
E2eRegistry: "gcr.io/kubernetes-e2e-test-images",
E2eVolumeRegistry: "gcr.io/kubernetes-e2e-test-images/volume",
PromoterE2eRegistry: "k8s.gcr.io/e2e-test-images",
BuildImageRegistry: "k8s.gcr.io/build-image",
InvalidRegistry: "invalid.com/invalid",
GcRegistry: "k8s.gcr.io",
SigStorageRegistry: "k8s.gcr.io/sig-storage",
GcrReleaseRegistry: "gcr.io/gke-release",
PrivateRegistry: "gcr.io/k8s-authenticated-test",
SampleRegistry: "gcr.io/google-samples",
}
repoList := os.Getenv("KUBE_TEST_REPO_LIST")
if repoList == "" {
return registry
}
fileContent, err := ioutil.ReadFile(repoList)
if err != nil {
panic(fmt.Errorf("Error reading '%v' file contents: %v", repoList, err))
}
err = yaml.Unmarshal(fileContent, &registry)
if err != nil {
panic(fmt.Errorf("Error unmarshalling '%v' YAML file: %v", repoList, err))
}
return registry
}
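// Illustrative only (not part of this commit): a hypothetical override file
// pointed to by KUBE_TEST_REPO_LIST; its keys map to the yaml tags on
// RegistryList above, for example:
//
//	dockerLibraryRegistry: registry.example.com/library
//	gcRegistry: registry.example.com/k8s
//	e2eRegistry: registry.example.com/e2e-test-images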
var (
registry = initReg()
dockerLibraryRegistry = registry.DockerLibraryRegistry
dockerGluster = registry.DockerGluster
e2eRegistry = registry.E2eRegistry
e2eVolumeRegistry = registry.E2eVolumeRegistry
promoterE2eRegistry = registry.PromoterE2eRegistry
buildImageRegistry = registry.BuildImageRegistry
gcAuthenticatedRegistry = registry.GcAuthenticatedRegistry
gcRegistry = registry.GcRegistry
sigStorageRegistry = registry.SigStorageRegistry
gcrReleaseRegistry = registry.GcrReleaseRegistry
invalidRegistry = registry.InvalidRegistry
// PrivateRegistry is an image repository that requires authentication
PrivateRegistry = registry.PrivateRegistry
sampleRegistry = registry.SampleRegistry
// Preconfigured image configs
imageConfigs = initImageConfigs()
)
const (
// None is to be used for unset/default images
None = iota
// Agnhost image
Agnhost
// AgnhostPrivate image
AgnhostPrivate
// APIServer image
APIServer
// AppArmorLoader image
AppArmorLoader
// AuthenticatedAlpine image
AuthenticatedAlpine
// AuthenticatedWindowsNanoServer image
AuthenticatedWindowsNanoServer
// BusyBox image
BusyBox
// CheckMetadataConcealment image
CheckMetadataConcealment
// CudaVectorAdd image
CudaVectorAdd
// CudaVectorAdd2 image
CudaVectorAdd2
// DebianIptables Image
DebianIptables
// EchoServer image
EchoServer
// Etcd image
Etcd
// GlusterDynamicProvisioner image
GlusterDynamicProvisioner
// Httpd image
Httpd
// HttpdNew image
HttpdNew
// InvalidRegistryImage image
InvalidRegistryImage
// IpcUtils image
IpcUtils
// JessieDnsutils image
JessieDnsutils
// Kitten image
Kitten
// Nautilus image
Nautilus
// NFSProvisioner image
NFSProvisioner
// Nginx image
Nginx
// NginxNew image
NginxNew
// Nonewprivs image
Nonewprivs
// NonRoot runs with a default user of 1234
NonRoot
// Pause - when these values are updated, also update cmd/kubelet/app/options/container_runtime.go
// Pause image
Pause
// Perl image
Perl
// PrometheusDummyExporter image
PrometheusDummyExporter
// PrometheusToSd image
PrometheusToSd
// Redis image
Redis
// RegressionIssue74839 image
RegressionIssue74839
// ResourceConsumer image
ResourceConsumer
// SdDummyExporter image
SdDummyExporter
// VolumeNFSServer image
VolumeNFSServer
// VolumeISCSIServer image
VolumeISCSIServer
// VolumeGlusterServer image
VolumeGlusterServer
// VolumeRBDServer image
VolumeRBDServer
// BusyBoxV1 is busybox:1.29
BusyBoxV1
// BusyBoxV2 is busybox:1.30
BusyBoxV2
// BusyBoxV3 is busybox:1.31
BusyBoxV3
// BusyBoxV4 is busybox:1.32
BusyBoxV4
// FailedImage is a fake image
FailedImage
)
func initImageConfigs() map[int]Config {
configs := map[int]Config{}
configs[Agnhost] = Config{promoterE2eRegistry, "agnhost", "2.21"}
configs[AgnhostPrivate] = Config{PrivateRegistry, "agnhost", "2.6"}
configs[AuthenticatedAlpine] = Config{gcAuthenticatedRegistry, "alpine", "3.7"}
configs[AuthenticatedWindowsNanoServer] = Config{gcAuthenticatedRegistry, "windows-nanoserver", "v1"}
configs[APIServer] = Config{e2eRegistry, "sample-apiserver", "1.17"}
configs[AppArmorLoader] = Config{e2eRegistry, "apparmor-loader", "1.0"}
configs[BusyBox] = Config{dockerLibraryRegistry, "busybox", "1.29"}
configs[CheckMetadataConcealment] = Config{promoterE2eRegistry, "metadata-concealment", "1.6"}
configs[CudaVectorAdd] = Config{e2eRegistry, "cuda-vector-add", "1.0"}
configs[CudaVectorAdd2] = Config{e2eRegistry, "cuda-vector-add", "2.0"}
configs[DebianIptables] = Config{buildImageRegistry, "debian-iptables", "buster-v1.6.5"}
configs[EchoServer] = Config{e2eRegistry, "echoserver", "2.2"}
configs[Etcd] = Config{gcRegistry, "etcd", "3.4.13-0"}
configs[GlusterDynamicProvisioner] = Config{dockerGluster, "glusterdynamic-provisioner", "v1.0"}
configs[Httpd] = Config{dockerLibraryRegistry, "httpd", "2.4.38-alpine"}
configs[HttpdNew] = Config{dockerLibraryRegistry, "httpd", "2.4.39-alpine"}
configs[InvalidRegistryImage] = Config{invalidRegistry, "alpine", "3.1"}
configs[IpcUtils] = Config{e2eRegistry, "ipc-utils", "1.0"}
configs[JessieDnsutils] = Config{e2eRegistry, "jessie-dnsutils", "1.0"}
configs[Kitten] = Config{e2eRegistry, "kitten", "1.0"}
configs[Nautilus] = Config{e2eRegistry, "nautilus", "1.0"}
configs[NFSProvisioner] = Config{sigStorageRegistry, "nfs-provisioner", "v2.2.2"}
configs[Nginx] = Config{dockerLibraryRegistry, "nginx", "1.14-alpine"}
configs[NginxNew] = Config{dockerLibraryRegistry, "nginx", "1.15-alpine"}
configs[Nonewprivs] = Config{e2eRegistry, "nonewprivs", "1.0"}
configs[NonRoot] = Config{e2eRegistry, "nonroot", "1.0"}
// Pause - when these values are updated, also update cmd/kubelet/app/options/container_runtime.go
configs[Pause] = Config{gcRegistry, "pause", "3.2"}
configs[Perl] = Config{dockerLibraryRegistry, "perl", "5.26"}
configs[PrometheusDummyExporter] = Config{gcRegistry, "prometheus-dummy-exporter", "v0.1.0"}
configs[PrometheusToSd] = Config{gcRegistry, "prometheus-to-sd", "v0.5.0"}
configs[Redis] = Config{dockerLibraryRegistry, "redis", "5.0.5-alpine"}
configs[RegressionIssue74839] = Config{e2eRegistry, "regression-issue-74839-amd64", "1.0"}
configs[ResourceConsumer] = Config{e2eRegistry, "resource-consumer", "1.5"}
configs[SdDummyExporter] = Config{gcRegistry, "sd-dummy-exporter", "v0.2.0"}
configs[VolumeNFSServer] = Config{e2eVolumeRegistry, "nfs", "1.0"}
configs[VolumeISCSIServer] = Config{e2eVolumeRegistry, "iscsi", "2.0"}
configs[VolumeGlusterServer] = Config{e2eVolumeRegistry, "gluster", "1.0"}
configs[VolumeRBDServer] = Config{e2eVolumeRegistry, "rbd", "1.0.1"}
// Rollout - images used by the kruise rollout e2e test cases
configs[BusyBoxV1] = Config{dockerLibraryRegistry, "busybox", "1.29"}
configs[BusyBoxV2] = Config{dockerLibraryRegistry, "busybox", "1.30"}
configs[BusyBoxV3] = Config{dockerLibraryRegistry, "busybox", "1.31"}
configs[BusyBoxV4] = Config{dockerLibraryRegistry, "busybox", "1.32"}
configs[FailedImage] = Config{dockerLibraryRegistry, "failed", "never-started"}
return configs
}
// GetImageConfigs returns the map of imageConfigs
func GetImageConfigs() map[int]Config {
return imageConfigs
}
// GetConfig returns the Config object for an image
func GetConfig(image int) Config {
return imageConfigs[image]
}
// GetE2EImage returns the fully qualified URI to an image (including version)
func GetE2EImage(image int) string {
return fmt.Sprintf("%s/%s:%s", imageConfigs[image].registry, imageConfigs[image].name, imageConfigs[image].version)
}
// GetE2EImage returns the fully qualified URI to an image (including version)
func (i *Config) GetE2EImage() string {
return fmt.Sprintf("%s/%s:%s", i.registry, i.name, i.version)
}
// GetPauseImageName returns the pause image name with proper version
func GetPauseImageName() string {
return GetE2EImage(Pause)
}
// ReplaceRegistryInImageURL replaces the registry in the image URL with a custom one
func ReplaceRegistryInImageURL(imageURL string) (string, error) {
parts := strings.Split(imageURL, "/")
countParts := len(parts)
registryAndUser := strings.Join(parts[:countParts-1], "/")
switch registryAndUser {
case "gcr.io/kubernetes-e2e-test-images":
registryAndUser = e2eRegistry
case "gcr.io/kubernetes-e2e-test-images/volume":
registryAndUser = e2eVolumeRegistry
case "k8s.gcr.io":
registryAndUser = gcRegistry
case "k8s.gcr.io/sig-storage":
registryAndUser = sigStorageRegistry
case "gcr.io/k8s-authenticated-test":
registryAndUser = PrivateRegistry
case "gcr.io/google-samples":
registryAndUser = sampleRegistry
case "gcr.io/gke-release":
registryAndUser = gcrReleaseRegistry
case "docker.io/library":
registryAndUser = dockerLibraryRegistry
default:
if countParts == 1 {
// We assume we found an image from docker hub library
// e.g. openjdk -> docker.io/library/openjdk
registryAndUser = dockerLibraryRegistry
break
}
return "", fmt.Errorf("Registry: %s is missing in test/utils/image/manifest.go, please add the registry, otherwise the test will fail on air-gapped clusters", registryAndUser)
}
return fmt.Sprintf("%s/%s", registryAndUser, parts[countParts-1]), nil
}
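
For context, a small hypothetical usage sketch (not part of this commit); the import path assumes the package follows the module layout github.com/openkruise/rollouts/test/images, and the printed values assume the default registries above (no KUBE_TEST_REPO_LIST override):

package main

import (
    "fmt"

    images "github.com/openkruise/rollouts/test/images"
)

func main() {
    // Resolve a rollout e2e test image through the configured registry list;
    // with the defaults above this prints docker.io/library/busybox:1.32.
    fmt.Println(images.GetE2EImage(images.BusyBoxV4))

    // Rewrite a hard-coded registry so the image resolves on clusters that
    // supply their own registry list (e.g. air-gapped environments).
    if url, err := images.ReplaceRegistryInImageURL("k8s.gcr.io/pause:3.2"); err == nil {
        fmt.Println(url)
    }
}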