Support lifecycle hooks for CloneSet (#362)

Signed-off-by: Siyu Wang <FillZpp.pub@gmail.com>
This commit is contained in:
Siyu Wang 2020-09-05 01:38:23 +08:00 committed by GitHub
parent fc284a3704
commit 1bc2c39a56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 850 additions and 125 deletions

View File

@ -69,6 +69,9 @@ type CloneSetSpec struct {
// without any of its container crashing, for it to be considered available.
// Defaults to 0 (pod will be considered available as soon as it is ready)
MinReadySeconds int32 `json:"minReadySeconds,omitempty"`
// Lifecycle defines the lifecycle hooks for Pods pre-delete, in-place update.
Lifecycle *Lifecycle `json:"lifecycle,omitempty"`
}
// CloneSetScaleStrategy defines strategies for pods scale.

View File

@ -0,0 +1,43 @@
/*
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
const (
LifecycleStateKey = "lifecycle.apps.kruise.io/state"
LifecycleTimestampKey = "lifecycle.apps.kruise.io/timestamp"
LifecycleStateNormal LifecycleStateType = "Normal"
LifecycleStatePreparingUpdate LifecycleStateType = "PreparingUpdate"
LifecycleStateUpdating LifecycleStateType = "Updating"
LifecycleStateUpdated LifecycleStateType = "Updated"
LifecycleStatePreparingDelete LifecycleStateType = "PreparingDelete"
)
type LifecycleStateType string
// Lifecycle contains the hooks for Pod lifecycle.
type Lifecycle struct {
// PreDelete is the hook before Pod to be deleted.
PreDelete *LifecycleHook `json:"preDelete,omitempty"`
// InPlaceUpdate is the hook before Pod to update and after Pod has been updated.
InPlaceUpdate *LifecycleHook `json:"inPlaceUpdate,omitempty"`
}
type LifecycleHook struct {
LabelsHandler map[string]string `json:"labelsHandler,omitempty"`
FinalizersHandler []string `json:"finalizersHandler,omitempty"`
}

View File

@ -6,4 +6,7 @@ const (
// SubSetNameLabelKey is used to record the name of current subset.
SubSetNameLabelKey = "apps.kruise.io/subset-name"
// SpecifiedDeleteKey indicates this object should be deleted, and the value could be the deletion option.
SpecifiedDeleteKey = "apps.kruise.io/specified-delete"
)

View File

@ -280,6 +280,11 @@ func (in *CloneSetSpec) DeepCopyInto(out *CloneSetSpec) {
*out = new(int32)
**out = **in
}
if in.Lifecycle != nil {
in, out := &in.Lifecycle, &out.Lifecycle
*out = new(Lifecycle)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CloneSetSpec.
@ -921,6 +926,58 @@ func (in *JobCondition) DeepCopy() *JobCondition {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Lifecycle) DeepCopyInto(out *Lifecycle) {
*out = *in
if in.PreDelete != nil {
in, out := &in.PreDelete, &out.PreDelete
*out = new(LifecycleHook)
(*in).DeepCopyInto(*out)
}
if in.InPlaceUpdate != nil {
in, out := &in.InPlaceUpdate, &out.InPlaceUpdate
*out = new(LifecycleHook)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Lifecycle.
func (in *Lifecycle) DeepCopy() *Lifecycle {
if in == nil {
return nil
}
out := new(Lifecycle)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LifecycleHook) DeepCopyInto(out *LifecycleHook) {
*out = *in
if in.LabelsHandler != nil {
in, out := &in.LabelsHandler, &out.LabelsHandler
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
if in.FinalizersHandler != nil {
in, out := &in.FinalizersHandler, &out.FinalizersHandler
*out = make([]string, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LifecycleHook.
func (in *LifecycleHook) DeepCopy() *LifecycleHook {
if in == nil {
return nil
}
out := new(LifecycleHook)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ManualUpdate) DeepCopyInto(out *ManualUpdate) {
*out = *in

View File

@ -70,6 +70,33 @@ spec:
spec:
description: CloneSetSpec defines the desired state of CloneSet
properties:
lifecycle:
description: Lifecycle defines the lifecycle hooks for Pods pre-delete,
in-place update.
properties:
inPlaceUpdate:
properties:
finalizersHandler:
items:
type: string
type: array
labelsHandler:
additionalProperties:
type: string
type: object
type: object
preDelete:
properties:
finalizersHandler:
items:
type: string
type: array
labelsHandler:
additionalProperties:
type: string
type: object
type: object
type: object
minReadySeconds:
description: Minimum number of seconds for which a newly created pod
should be ready without any of its container crashing, for it to be

View File

@ -29,7 +29,6 @@ import (
scalecontrol "github.com/openkruise/kruise/pkg/controller/cloneset/scale"
updatecontrol "github.com/openkruise/kruise/pkg/controller/cloneset/update"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/fieldindex"
"github.com/openkruise/kruise/pkg/util/gate"
historyutil "github.com/openkruise/kruise/pkg/util/history"
@ -60,9 +59,6 @@ func init() {
var (
concurrentReconciles = 3
scaleExpectations = expectations.NewScaleExpectations()
updateExpectations = expectations.NewUpdateExpectations(clonesetutils.GetPodRevision)
)
// Add creates a new CloneSet Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
@ -92,8 +88,8 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
controllerHistory: historyutil.NewHistory(mgr.GetClient()),
revisionControl: revisioncontrol.NewRevisionControl(),
}
reconciler.scaleControl = scalecontrol.New(mgr.GetClient(), reconciler.recorder, scaleExpectations)
reconciler.updateControl = updatecontrol.New(mgr.GetClient(), reconciler.recorder, scaleExpectations, updateExpectations)
reconciler.scaleControl = scalecontrol.New(mgr.GetClient(), reconciler.recorder)
reconciler.updateControl = updatecontrol.New(mgr.GetClient(), reconciler.recorder)
reconciler.reconcileFunc = reconciler.doReconcile
return reconciler
}
@ -189,8 +185,8 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
klog.V(3).Infof("CloneSet %s has been deleted.", request)
scaleExpectations.DeleteExpectations(request.String())
updateExpectations.DeleteExpectations(request.String())
clonesetutils.ScaleExpectations.DeleteExpectations(request.String())
clonesetutils.UpdateExpectations.DeleteExpectations(request.String())
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
@ -210,7 +206,7 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
}
// If scaling expectations have not satisfied yet, just skip this reconcile.
if scaleSatisfied, scaleDirtyPods := scaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied {
if scaleSatisfied, scaleDirtyPods := clonesetutils.ScaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied {
klog.V(4).Infof("Not satisfied scale for %v, scaleDirtyPods=%v", request.String(), scaleDirtyPods)
return reconcile.Result{}, nil
}
@ -236,13 +232,20 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
// Refresh update expectations
for _, pod := range filteredPods {
updateExpectations.ObserveUpdated(request.String(), updateRevision.Name, pod)
clonesetutils.UpdateExpectations.ObserveUpdated(request.String(), updateRevision.Name, pod)
}
// If update expectations have not satisfied yet, just skip this reconcile.
if updateSatisfied, updateDirtyPods := updateExpectations.SatisfiedExpectations(request.String(), updateRevision.Name); !updateSatisfied {
if updateSatisfied, updateDirtyPods := clonesetutils.UpdateExpectations.SatisfiedExpectations(request.String(), updateRevision.Name); !updateSatisfied {
klog.V(4).Infof("Not satisfied update for %v, updateDirtyPods=%v", request.String(), updateDirtyPods)
return reconcile.Result{}, nil
}
// If resourceVersion expectations have not satisfied yet, just skip this reconcile
for _, pod := range filteredPods {
if !clonesetutils.ResourceVersionExpectations.IsSatisfied(pod) {
klog.V(4).Infof("Not satisfied resourceVersion for %v, wait for pod %v updating", request.String(), pod.Name)
return reconcile.Result{}, nil
}
}
newStatus := appsv1alpha1.CloneSetStatus{
ObservedGeneration: instance.Generation,

View File

@ -59,7 +59,7 @@ func (e *podEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimiting
return
}
klog.V(4).Infof("Pod %s/%s created, owner: %s", pod.Namespace, pod.Name, req.Name)
scaleExpectations.ObserveScale(req.String(), expectations.Create, pod.Name)
clonesetutils.ScaleExpectations.ObserveScale(req.String(), expectations.Create, pod.Name)
q.Add(*req)
return
}
@ -89,6 +89,7 @@ func (e *podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimiting
// Two different versions of the same pod will always have different RVs.
return
}
clonesetutils.ResourceVersionExpectations.Observe(curPod)
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if curPod.DeletionTimestamp != nil {
@ -150,6 +151,7 @@ func (e *podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimiting
klog.Errorf("DeleteEvent parse pod failed, DeleteStateUnknown: %#v, obj: %#v", evt.DeleteStateUnknown, evt.Object)
return
}
clonesetutils.ResourceVersionExpectations.Delete(pod)
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
@ -162,7 +164,7 @@ func (e *podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimiting
}
klog.V(4).Infof("Pod %s/%s deleted, owner: %s", pod.Namespace, pod.Name, req.Name)
scaleExpectations.ObserveScale(req.String(), expectations.Delete, pod.Name)
clonesetutils.ScaleExpectations.ObserveScale(req.String(), expectations.Delete, pod.Name)
q.Add(*req)
}
@ -240,7 +242,7 @@ func (e *pvcEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimiting
if controllerRef := metav1.GetControllerOf(pvc); controllerRef != nil {
if req := resoleControllerRef(pvc.Namespace, controllerRef); req != nil {
scaleExpectations.ObserveScale(req.String(), expectations.Create, pvc.Name)
clonesetutils.ScaleExpectations.ObserveScale(req.String(), expectations.Create, pvc.Name)
q.Add(*req)
}
}
@ -263,7 +265,7 @@ func (e *pvcEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimiting
if controllerRef := metav1.GetControllerOf(pvc); controllerRef != nil {
if req := resoleControllerRef(pvc.Namespace, controllerRef); req != nil {
scaleExpectations.ObserveScale(req.String(), expectations.Delete, pvc.Name)
clonesetutils.ScaleExpectations.ObserveScale(req.String(), expectations.Delete, pvc.Name)
q.Add(*req)
}
}

View File

@ -21,9 +21,9 @@ import (
"testing"
"time"
"github.com/openkruise/kruise/pkg/util/expectations"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util/expectations"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
@ -172,9 +172,9 @@ func TestEnqueueRequestForPodCreate(t *testing.T) {
modifySatisfied := false
if testCase.alterExpectationCreationsKey != "" && len(testCase.alterExpectationCreationsAdds) > 0 {
for _, n := range testCase.alterExpectationCreationsAdds {
scaleExpectations.ExpectScale(testCase.alterExpectationCreationsKey, expectations.Create, n)
clonesetutils.ScaleExpectations.ExpectScale(testCase.alterExpectationCreationsKey, expectations.Create, n)
}
if ok, _ := scaleExpectations.SatisfiedExpectations(testCase.alterExpectationCreationsKey); ok {
if ok, _ := clonesetutils.ScaleExpectations.SatisfiedExpectations(testCase.alterExpectationCreationsKey); ok {
t.Fatalf("%s before execute, should not be satisfied", testCase.name)
}
modifySatisfied = true
@ -185,7 +185,7 @@ func TestEnqueueRequestForPodCreate(t *testing.T) {
t.Fatalf("%s failed, expected queue len %d, got queue len %d", testCase.name, testCase.expectedQueueLen, q.Len())
}
if modifySatisfied {
if ok, _ := scaleExpectations.SatisfiedExpectations(testCase.alterExpectationCreationsKey); !ok {
if ok, _ := clonesetutils.ScaleExpectations.SatisfiedExpectations(testCase.alterExpectationCreationsKey); !ok {
t.Fatalf("%s expected satisfied, but it is not", testCase.name)
}
}

View File

@ -11,6 +11,7 @@ import (
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/lifecycle"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
@ -37,14 +38,13 @@ type Interface interface {
}
// New returns a scale control.
func New(c client.Client, recorder record.EventRecorder, exp expectations.ScaleExpectations) Interface {
return &realControl{Client: c, recorder: recorder, exp: exp}
func New(c client.Client, recorder record.EventRecorder) Interface {
return &realControl{Client: c, recorder: recorder}
}
type realControl struct {
client.Client
recorder record.EventRecorder
exp expectations.ScaleExpectations
}
func (r *realControl) Manage(
@ -63,13 +63,22 @@ func (r *realControl) Manage(
return false, nil
}
if podsToDelete := getPodsToDelete(updateCS, pods); len(podsToDelete) > 0 {
klog.V(3).Infof("CloneSet %s begin to delete pods in podsToDelete: %v", controllerKey, podsToDelete)
return r.deletePods(updateCS, podsToDelete, pvcs)
podsSpecifiedToDelete, podsInPreDelete := getPlannedDeletedPods(updateCS, pods)
podsToDelete := util.MergePods(podsSpecifiedToDelete, podsInPreDelete)
if len(podsToDelete) > 0 {
klog.V(3).Infof("CloneSet %s find pods %v specified to delete and pods %v in preDelete",
controllerKey, util.GetPodNames(podsSpecifiedToDelete).List(), util.GetPodNames(podsInPreDelete).List())
if modified, err := r.managePreparingDelete(updateCS, pods, podsInPreDelete, len(podsToDelete)); err != nil || modified {
return modified, err
}
if modified, err := r.deletePods(updateCS, podsToDelete, pvcs); err != nil || modified {
return modified, err
}
}
updatedPods, notUpdatedPods := clonesetutils.SplitPodsByRevision(pods, updateRevision)
diff, currentRevDiff := calculateDiffs(updateCS, updateRevision == currentRevision, len(pods), len(notUpdatedPods))
if diff < 0 {
@ -96,6 +105,11 @@ func (r *realControl) Manage(
currentCS, updateCS, currentRevision, updateRevision, availableIDs.List(), existingPVCNames)
} else if diff > 0 {
if len(podsToDelete) > 0 {
klog.V(3).Infof("CloneSet %s skip to scale in %d for existing pods to delete", controllerKey, diff)
return false, nil
}
klog.V(3).Infof("CloneSet %s begin to scale in %d pods including %d (current rev)",
controllerKey, diff, currentRevDiff)
@ -107,6 +121,30 @@ func (r *realControl) Manage(
return false, nil
}
func (r *realControl) managePreparingDelete(cs *appsv1alpha1.CloneSet, pods, podsInPreDelete []*v1.Pod, numToDelete int) (bool, error) {
diff := int(*cs.Spec.Replicas) - len(pods) + numToDelete
var modified bool
for _, pod := range podsInPreDelete {
if diff <= 0 {
return modified, nil
}
if isPodSpecifiedDelete(cs, pod) {
continue
}
klog.V(3).Infof("CloneSet %s patch pod %s lifecycle from PreparingDelete to Normal",
clonesetutils.GetControllerKey(cs), pod.Name)
if patched, err := lifecycle.PatchPodLifecycle(r, pod, appsv1alpha1.LifecycleStateNormal); err != nil {
return modified, err
} else if patched {
modified = true
clonesetutils.ResourceVersionExpectations.Expect(pod)
}
diff--
}
return modified, nil
}
func (r *realControl) createPods(
expectedCreations, expectedCurrentCreations int,
currentCS, updateCS *appsv1alpha1.CloneSet,
@ -123,7 +161,7 @@ func (r *realControl) createPods(
podsCreationChan := make(chan *v1.Pod, len(newPods))
for _, p := range newPods {
r.exp.ExpectScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, p.Name)
clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, p.Name)
podsCreationChan <- p
}
@ -136,6 +174,7 @@ func (r *realControl) createPods(
if pod.Labels[apps.ControllerRevisionHashLabelKey] == currentRevision {
cs = currentCS
}
lifecycle.SetPodLifecycle(appsv1alpha1.LifecycleStateNormal)(pod)
var createErr error
if createErr = r.createOnePod(cs, pod, existingPVCNames); createErr != nil {
@ -151,7 +190,7 @@ func (r *realControl) createPods(
// rollback to ignore failure pods because the informer won't observe these pods
for _, pod := range newPods {
if _, ok := successPodNames.Load(pod.Name); !ok {
r.exp.ObserveScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, pod.Name)
clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, pod.Name)
}
}
@ -167,9 +206,9 @@ func (r *realControl) createOnePod(cs *appsv1alpha1.CloneSet, pod *v1.Pod, exist
if existingPVCNames.Has(c.Name) {
continue
}
r.exp.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Create, c.Name)
clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Create, c.Name)
if err := r.Create(context.TODO(), &c); err != nil {
r.exp.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Create, c.Name)
clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Create, c.Name)
r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedCreate", "failed to create pvc: %v, pvc: %v", err, util.DumpJSON(c))
return err
}
@ -185,15 +224,27 @@ func (r *realControl) createOnePod(cs *appsv1alpha1.CloneSet, pod *v1.Pod, exist
}
func (r *realControl) deletePods(cs *appsv1alpha1.CloneSet, podsToDelete []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) (bool, error) {
var deleted bool
var modified bool
for _, pod := range podsToDelete {
r.exp.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
if err := r.Delete(context.TODO(), pod); err != nil {
r.exp.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", pod.Name, err)
return deleted, err
if cs.Spec.Lifecycle != nil && lifecycle.IsPodHooked(cs.Spec.Lifecycle.PreDelete, pod) {
if patched, err := lifecycle.PatchPodLifecycle(r, pod, appsv1alpha1.LifecycleStatePreparingDelete); err != nil {
return false, err
} else if patched {
klog.V(3).Infof("CloneSet %s scaling patch pod %s lifecycle to PreparingDelete",
clonesetutils.GetControllerKey(cs), pod.Name)
modified = true
clonesetutils.ResourceVersionExpectations.Expect(pod)
}
continue
}
deleted = true
clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
if err := r.Delete(context.TODO(), pod); err != nil {
clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", pod.Name, err)
return modified, err
}
modified = true
r.recorder.Event(cs, v1.EventTypeNormal, "SuccessfulDelete", fmt.Sprintf("succeed to delete pod %s", pod.Name))
// delete pvcs which have the same instance-id
@ -202,14 +253,14 @@ func (r *realControl) deletePods(cs *appsv1alpha1.CloneSet, podsToDelete []*v1.P
continue
}
r.exp.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
if err := r.Delete(context.TODO(), pvc); err != nil {
r.exp.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pvc %s: %v", pvc.Name, err)
return deleted, err
return modified, err
}
}
}
return deleted, nil
return modified, nil
}

View File

@ -8,6 +8,7 @@ import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesettest "github.com/openkruise/kruise/pkg/controller/cloneset/test"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/expectations"
apps "k8s.io/api/apps/v1"
@ -25,7 +26,6 @@ func newFakeControl() *realControl {
return &realControl{
Client: fake.NewFakeClient(),
recorder: record.NewFakeRecorder(10),
exp: expectations.NewScaleExpectations(),
}
}
@ -67,6 +67,7 @@ func TestCreatePods(t *testing.T) {
appsv1alpha1.CloneSetInstanceID: "id1",
apps.ControllerRevisionHashLabelKey: "revision-abc",
"foo": "bar",
appsv1alpha1.LifecycleStateKey: string(appsv1alpha1.LifecycleStateNormal),
},
OwnerReferences: []metav1.OwnerReference{
{
@ -122,6 +123,7 @@ func TestCreatePods(t *testing.T) {
appsv1alpha1.CloneSetInstanceID: "id3",
apps.ControllerRevisionHashLabelKey: "revision-xyz",
"foo": "bar",
appsv1alpha1.LifecycleStateKey: string(appsv1alpha1.LifecycleStateNormal),
},
OwnerReferences: []metav1.OwnerReference{
{
@ -178,6 +180,7 @@ func TestCreatePods(t *testing.T) {
appsv1alpha1.CloneSetInstanceID: "id4",
apps.ControllerRevisionHashLabelKey: "revision-xyz",
"foo": "bar",
appsv1alpha1.LifecycleStateKey: string(appsv1alpha1.LifecycleStateNormal),
},
OwnerReferences: []metav1.OwnerReference{
{
@ -226,11 +229,22 @@ func TestCreatePods(t *testing.T) {
},
},
}
for i := range expectedPods {
appsv1alpha1.SetDefaultPod(&expectedPods[i])
}
sort.Slice(pods.Items, func(i, j int) bool { return pods.Items[i].Name < pods.Items[j].Name })
if len(pods.Items) != len(expectedPods) {
t.Fatalf("expected pods \n%s\ngot pods\n%s", util.DumpJSON(expectedPods), util.DumpJSON(pods.Items))
}
for i := range expectedPods {
appsv1alpha1.SetDefaultPod(&expectedPods[i])
if v, ok := pods.Items[i].Annotations[appsv1alpha1.LifecycleTimestampKey]; ok {
if expectedPods[i].Annotations == nil {
expectedPods[i].Annotations = make(map[string]string)
}
expectedPods[i].Annotations[appsv1alpha1.LifecycleTimestampKey] = v
}
}
if !reflect.DeepEqual(expectedPods, pods.Items) {
t.Fatalf("expected pods \n%s\ngot pods\n%s", util.DumpJSON(expectedPods), util.DumpJSON(pods.Items))
}
@ -305,7 +319,7 @@ func TestCreatePods(t *testing.T) {
t.Fatalf("expected pvcs \n%s\ngot pvcs\n%s", util.DumpJSON(expectedPVCs), util.DumpJSON(pvcs.Items))
}
exp := ctrl.exp.GetExpectations("default/foo")
exp := clonesetutils.ScaleExpectations.GetExpectations("default/foo")
expectedExp := map[expectations.ScaleAction]sets.String{
expectations.Create: sets.NewString("foo-id1", "foo-id3", "foo-id4", "datadir-foo-id1", "datadir-foo-id4"),
}

View File

@ -4,6 +4,8 @@ import (
"sort"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
v1 "k8s.io/api/core/v1"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/rand"
@ -13,15 +15,30 @@ import (
"k8s.io/utils/integer"
)
func getPodsToDelete(cs *appsv1alpha1.CloneSet, pods []*v1.Pod) []*v1.Pod {
var podsToDelete []*v1.Pod
s := sets.NewString(cs.Spec.ScaleStrategy.PodsToDelete...)
for _, p := range pods {
if s.Has(p.Name) {
podsToDelete = append(podsToDelete, p)
func isPodSpecifiedDelete(cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {
if specifieddelete.IsSpecifiedDelete(pod) {
return true
}
for _, name := range cs.Spec.ScaleStrategy.PodsToDelete {
if name == pod.Name {
return true
}
}
return podsToDelete
return false
}
func getPlannedDeletedPods(cs *appsv1alpha1.CloneSet, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) {
var podsSpecifiedToDelete []*v1.Pod
var podsInPreDelete []*v1.Pod
for _, pod := range pods {
if isPodSpecifiedDelete(cs, pod) {
podsSpecifiedToDelete = append(podsSpecifiedToDelete, pod)
}
if lifecycle.GetPodLifecycleState(pod) == appsv1alpha1.LifecycleStatePreparingDelete {
podsInPreDelete = append(podsInPreDelete, pod)
}
}
return podsSpecifiedToDelete, podsInPreDelete
}
// Get available IDs, if the a PVC exists but the corresponding pod does not exist, then reusing the ID, i.e., reuse the pvc.

View File

@ -17,7 +17,6 @@ limitations under the License.
package update
import (
"context"
"fmt"
"sort"
"time"
@ -25,9 +24,10 @@ import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/requeueduration"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
"github.com/openkruise/kruise/pkg/util/updatesort"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@ -45,13 +45,11 @@ type Interface interface {
) (time.Duration, error)
}
func New(c client.Client, recorder record.EventRecorder, scaleExp expectations.ScaleExpectations, updateExp expectations.UpdateExpectations) Interface {
func New(c client.Client, recorder record.EventRecorder) Interface {
return &realControl{
inplaceControl: inplaceupdate.New(c, apps.ControllerRevisionHashLabelKey),
Client: c,
recorder: recorder,
scaleExp: scaleExp,
updateExp: updateExp,
}
}
@ -59,8 +57,6 @@ type realControl struct {
client.Client
inplaceControl inplaceupdate.Interface
recorder record.EventRecorder
scaleExp expectations.ScaleExpectations
updateExp expectations.UpdateExpectations
}
func (c *realControl) Manage(cs *appsv1alpha1.CloneSet,
@ -75,36 +71,51 @@ func (c *realControl) Manage(cs *appsv1alpha1.CloneSet,
return requeueDuration.Get(), nil
}
// 1. find currently updated and not-ready count and all pods waiting to update
// 1. refresh states for all pods
var modified bool
for _, pod := range pods {
patched, duration, err := c.refreshPodState(cs, coreControl, pod)
if err != nil {
return 0, err
} else if duration > 0 {
requeueDuration.Update(duration)
}
if patched {
modified = true
}
}
if modified {
return requeueDuration.Get(), nil
}
// 2. find currently updated and not-ready count and all pods waiting to update
var waitUpdateIndexes []int
for i := range pods {
if coreControl.IsPodUpdatePaused(pods[i]) {
continue
}
if res := c.inplaceControl.Refresh(pods[i], coreControl.GetUpdateOptions()); res.RefreshErr != nil {
klog.Errorf("CloneSet %s/%s failed to update pod %s condition for inplace: %v",
cs.Namespace, cs.Name, pods[i].Name, res.RefreshErr)
return requeueDuration.Get(), res.RefreshErr
} else if res.DelayDuration > 0 {
requeueDuration.Update(res.DelayDuration)
}
if clonesetutils.GetPodRevision(pods[i]) != updateRevision.Name {
waitUpdateIndexes = append(waitUpdateIndexes, i)
switch lifecycle.GetPodLifecycleState(pods[i]) {
case appsv1alpha1.LifecycleStatePreparingDelete, appsv1alpha1.LifecycleStateUpdated:
klog.V(3).Infof("CloneSet %s/%s find pod %s in state %s, so skip to update it",
cs.Namespace, cs.Name, pods[i].Name, lifecycle.GetPodLifecycleState(pods[i]))
default:
waitUpdateIndexes = append(waitUpdateIndexes, i)
}
}
}
// 2. sort all pods waiting to update
// 3. sort all pods waiting to update
waitUpdateIndexes = sortUpdateIndexes(coreControl, cs.Spec.UpdateStrategy, pods, waitUpdateIndexes)
// 3. calculate max count of pods can update
// 4. calculate max count of pods can update
needToUpdateCount := calculateUpdateCount(coreControl, cs.Spec.UpdateStrategy, cs.Spec.MinReadySeconds, int(*cs.Spec.Replicas), waitUpdateIndexes, pods)
if needToUpdateCount < len(waitUpdateIndexes) {
waitUpdateIndexes = waitUpdateIndexes[:needToUpdateCount]
}
// 4. update pods
// 5. update pods
for _, idx := range waitUpdateIndexes {
pod := pods[idx]
if duration, err := c.updatePod(cs, coreControl, updateRevision, revisions, pod, pvcs); err != nil {
@ -117,6 +128,49 @@ func (c *realControl) Manage(cs *appsv1alpha1.CloneSet,
return requeueDuration.Get(), nil
}
func (c *realControl) refreshPodState(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control, pod *v1.Pod) (bool, time.Duration, error) {
opts := coreControl.GetUpdateOptions()
res := c.inplaceControl.Refresh(pod, opts)
if res.RefreshErr != nil {
klog.Errorf("CloneSet %s/%s failed to update pod %s condition for inplace: %v",
cs.Namespace, cs.Name, pod.Name, res.RefreshErr)
return false, 0, res.RefreshErr
}
var state appsv1alpha1.LifecycleStateType
switch lifecycle.GetPodLifecycleState(pod) {
case appsv1alpha1.LifecycleStateUpdating:
checkFunc := inplaceupdate.CheckInPlaceUpdateCompleted
if opts != nil && opts.CustomizeCheckUpdateCompleted != nil {
checkFunc = opts.CustomizeCheckUpdateCompleted
}
if checkFunc(pod) == nil {
if cs.Spec.Lifecycle != nil && !lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {
state = appsv1alpha1.LifecycleStateUpdated
} else {
state = appsv1alpha1.LifecycleStateNormal
}
}
case appsv1alpha1.LifecycleStateUpdated:
if cs.Spec.Lifecycle == nil || lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {
state = appsv1alpha1.LifecycleStateNormal
}
}
if state != "" {
if patched, err := lifecycle.PatchPodLifecycle(c, pod, state); err != nil {
return false, 0, err
} else if patched {
clonesetutils.ResourceVersionExpectations.Expect(pod)
klog.V(3).Infof("CloneSet %s patch pod %s lifecycle to %s",
clonesetutils.GetControllerKey(cs), pod.Name, state)
return true, res.DelayDuration, nil
}
}
return false, res.DelayDuration, nil
}
func sortUpdateIndexes(coreControl clonesetcore.Control, strategy appsv1alpha1.CloneSetUpdateStrategy, pods []*v1.Pod, waitUpdateIndexes []int) []int {
// Sort Pods with default sequence
sort.Slice(waitUpdateIndexes, coreControl.GetPodsSortFunc(pods, waitUpdateIndexes))
@ -127,6 +181,16 @@ func sortUpdateIndexes(coreControl clonesetcore.Control, strategy appsv1alpha1.C
if strategy.ScatterStrategy != nil {
waitUpdateIndexes = updatesort.NewScatterSorter(strategy.ScatterStrategy).Sort(pods, waitUpdateIndexes)
}
// PreparingUpdate first
sort.Slice(waitUpdateIndexes, func(i, j int) bool {
preparingUpdateI := lifecycle.GetPodLifecycleState(pods[waitUpdateIndexes[i]]) == appsv1alpha1.LifecycleStatePreparingUpdate
preparingUpdateJ := lifecycle.GetPodLifecycleState(pods[waitUpdateIndexes[j]]) == appsv1alpha1.LifecycleStatePreparingUpdate
if preparingUpdateI != preparingUpdateJ {
return preparingUpdateI
}
return false
})
return waitUpdateIndexes
}
@ -152,12 +216,12 @@ func calculateUpdateCount(coreControl clonesetcore.Control, strategy appsv1alpha
var notReadyCount, updateCount int
for _, p := range pods {
if !coreControl.IsPodUpdateReady(p, minReadySeconds) {
if !isPodReady(coreControl, p, minReadySeconds) {
notReadyCount++
}
}
for _, i := range waitUpdateIndexes {
if coreControl.IsPodUpdateReady(pods[i], minReadySeconds) {
if isPodReady(coreControl, pods[i], minReadySeconds) {
if notReadyCount >= (maxUnavailable + usedSurge) {
break
} else {
@ -170,6 +234,14 @@ func calculateUpdateCount(coreControl clonesetcore.Control, strategy appsv1alpha
return updateCount
}
func isPodReady(coreControl clonesetcore.Control, pod *v1.Pod, minReadySeconds int32) bool {
state := lifecycle.GetPodLifecycleState(pod)
if state != "" && state != appsv1alpha1.LifecycleStateNormal {
return false
}
return coreControl.IsPodUpdateReady(pod, minReadySeconds)
}
func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control,
updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim,
@ -185,51 +257,72 @@ func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetc
}
}
res := c.inplaceControl.Update(pod, oldRevision, updateRevision, coreControl.GetUpdateOptions())
if res.InPlaceUpdate {
if res.UpdateErr == nil {
c.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulUpdatePodInPlace", "successfully update pod %s in-place", pod.Name)
c.updateExp.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod)
return res.DelayDuration, nil
if c.inplaceControl.CanUpdateInPlace(oldRevision, updateRevision, coreControl.GetUpdateOptions()) {
if cs.Spec.Lifecycle != nil && lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {
if patched, err := lifecycle.PatchPodLifecycle(c, pod, appsv1alpha1.LifecycleStatePreparingUpdate); err != nil {
return 0, err
} else if patched {
clonesetutils.ResourceVersionExpectations.Expect(pod)
klog.V(3).Infof("CloneSet %s patch pod %s lifecycle to PreparingUpdate",
clonesetutils.GetControllerKey(cs), pod.Name)
}
return 0, nil
}
c.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdatePodInPlace", "failed to update pod %s in-place: %v", pod.Name, res.UpdateErr)
return res.DelayDuration, res.UpdateErr
opts := coreControl.GetUpdateOptions()
opts.AdditionalFuncs = append(opts.AdditionalFuncs, lifecycle.SetPodLifecycle(appsv1alpha1.LifecycleStateUpdating))
res := c.inplaceControl.Update(pod, oldRevision, updateRevision, opts)
if res.InPlaceUpdate {
if res.UpdateErr == nil {
c.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulUpdatePodInPlace", "successfully update pod %s in-place(revision %v)", pod.Name, updateRevision.Name)
clonesetutils.UpdateExpectations.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod)
return res.DelayDuration, nil
}
c.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdatePodInPlace", "failed to update pod %s in-place(revision %v): %v", pod.Name, updateRevision.Name, res.UpdateErr)
return res.DelayDuration, res.UpdateErr
}
}
if cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType {
return res.DelayDuration, fmt.Errorf("find Pod %s update strategy is InPlaceOnly but can not update in-place", pod.Name)
return 0, fmt.Errorf("find Pod %s update strategy is InPlaceOnly but can not update in-place", pod.Name)
}
klog.Warningf("CloneSet %s/%s can not update Pod %s in-place, so it will back off to ReCreate", cs.Namespace, cs.Name, pod.Name)
}
klog.V(2).Infof("CloneSet %s/%s deleting Pod %s for update %s", cs.Namespace, cs.Name, pod.Name, updateRevision.Name)
klog.V(2).Infof("CloneSet %s/%s start to patch Pod %s specified-delete for update %s", cs.Namespace, cs.Name, pod.Name, updateRevision.Name)
c.scaleExp.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
if err := c.Delete(context.TODO(), pod); err != nil {
c.scaleExp.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
if patched, err := specifieddelete.PatchPodSpecifiedDelete(c, pod, "true"); err != nil {
c.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdatePodReCreate",
"failed to delete pod %s for update: %v", pod.Name, err)
"failed to patch pod specified-delete %s for update(revision %s): %v", pod.Name, updateRevision.Name, err)
return 0, err
} else if patched {
clonesetutils.ResourceVersionExpectations.Expect(pod)
}
// TODO(FillZpp): add a strategy controlling if the PVCs of this pod should be deleted
for _, pvc := range pvcs {
if pvc.Labels[appsv1alpha1.CloneSetInstanceID] != pod.Labels[appsv1alpha1.CloneSetInstanceID] {
continue
}
c.scaleExp.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
if err := c.Delete(context.TODO(), pvc); err != nil {
c.scaleExp.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
c.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pvc %s: %v", pvc.Name, err)
return 0, err
}
}
//clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
//if err := c.Delete(context.TODO(), pod); err != nil {
// clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
// c.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdatePodReCreate",
// "failed to delete pod %s for update: %v", pod.Name, err)
// return 0, err
//}
//
//// TODO(FillZpp): add a strategy controlling if the PVCs of this pod should be deleted
//for _, pvc := range pvcs {
// if pvc.Labels[appsv1alpha1.CloneSetInstanceID] != pod.Labels[appsv1alpha1.CloneSetInstanceID] {
// continue
// }
//
// clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
// if err := c.Delete(context.TODO(), pvc); err != nil {
// clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
// c.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pvc %s: %v", pvc.Name, err)
// return 0, err
// }
//}
c.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulUpdatePodReCreate",
"successfully delete pod %s for update", pod.Name)
"successfully patch pod %s specified-delete for update(revision %s)", pod.Name, updateRevision.Name)
return 0, nil
}

View File

@ -25,9 +25,7 @@ import (
"github.com/openkruise/kruise/apis"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@ -150,9 +148,24 @@ func TestMange(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-1", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-1"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-2", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-0"}}},
},
expectedPods: []*v1.Pod{},
expectedPods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod-0", ResourceVersion: "1", Labels: map[string]string{
apps.ControllerRevisionHashLabelKey: "rev-old",
appsv1alpha1.CloneSetInstanceID: "id-0",
appsv1alpha1.SpecifiedDeleteKey: "true",
}},
Spec: v1.PodSpec{ReadinessGates: []v1.PodReadinessGate{{ConditionType: appsv1alpha1.InPlaceUpdateReady}}},
Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{
{Type: v1.PodReady, Status: v1.ConditionTrue},
{Type: appsv1alpha1.InPlaceUpdateReady, Status: v1.ConditionTrue},
}},
},
},
expectedPVCs: []*v1.PersistentVolumeClaim{
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-0", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-0"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-1", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-1"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-2", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-0"}}},
},
},
{
@ -196,9 +209,31 @@ func TestMange(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-1", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-1"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-2", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-0"}}},
},
expectedPods: []*v1.Pod{},
expectedPods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod-0", ResourceVersion: "1", Labels: map[string]string{
apps.ControllerRevisionHashLabelKey: "rev-old",
appsv1alpha1.CloneSetInstanceID: "id-0",
appsv1alpha1.SpecifiedDeleteKey: "true",
}},
Spec: v1.PodSpec{
ReadinessGates: []v1.PodReadinessGate{{ConditionType: appsv1alpha1.InPlaceUpdateReady}},
Containers: []v1.Container{{Name: "c1", Image: "foo1"}},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{Type: v1.PodReady, Status: v1.ConditionTrue},
{Type: appsv1alpha1.InPlaceUpdateReady, Status: v1.ConditionTrue},
},
ContainerStatuses: []v1.ContainerStatus{{Name: "c1", ImageID: "image-id-xyz"}},
},
},
},
expectedPVCs: []*v1.PersistentVolumeClaim{
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-0", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-0"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-1", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-1"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "pvc-2", Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "id-0"}}},
},
},
{
@ -245,7 +280,11 @@ func TestMange(t *testing.T) {
expectedPods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod-0",
Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "rev-new", appsv1alpha1.CloneSetInstanceID: "id-0"},
Labels: map[string]string{
apps.ControllerRevisionHashLabelKey: "rev-new",
appsv1alpha1.CloneSetInstanceID: "id-0",
appsv1alpha1.LifecycleStateKey: string(appsv1alpha1.LifecycleStateUpdating),
},
Annotations: map[string]string{appsv1alpha1.InPlaceUpdateStateKey: util.DumpJSON(appsv1alpha1.InPlaceUpdateState{
Revision: "rev-new",
UpdateTimestamp: now,
@ -317,7 +356,11 @@ func TestMange(t *testing.T) {
expectedPods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod-0",
Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "rev-new", appsv1alpha1.CloneSetInstanceID: "id-0"},
Labels: map[string]string{
apps.ControllerRevisionHashLabelKey: "rev-new",
appsv1alpha1.CloneSetInstanceID: "id-0",
appsv1alpha1.LifecycleStateKey: string(appsv1alpha1.LifecycleStateUpdating),
},
Annotations: map[string]string{
appsv1alpha1.InPlaceUpdateStateKey: util.DumpJSON(appsv1alpha1.InPlaceUpdateState{
Revision: "rev-new",
@ -399,7 +442,10 @@ func TestMange(t *testing.T) {
expectedPods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod-0",
Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "rev-new", appsv1alpha1.CloneSetInstanceID: "id-0"},
Labels: map[string]string{
apps.ControllerRevisionHashLabelKey: "rev-new",
appsv1alpha1.CloneSetInstanceID: "id-0",
},
Annotations: map[string]string{
appsv1alpha1.InPlaceUpdateStateKey: util.DumpJSON(appsv1alpha1.InPlaceUpdateState{
Revision: "rev-new",
@ -480,7 +526,10 @@ func TestMange(t *testing.T) {
expectedPods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod-0",
Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "rev-new", appsv1alpha1.CloneSetInstanceID: "id-0"},
Labels: map[string]string{
apps.ControllerRevisionHashLabelKey: "rev-new",
appsv1alpha1.CloneSetInstanceID: "id-0",
},
Annotations: map[string]string{
appsv1alpha1.InPlaceUpdateStateKey: util.DumpJSON(appsv1alpha1.InPlaceUpdateState{
Revision: "rev-new",
@ -519,8 +568,6 @@ func TestMange(t *testing.T) {
fakeClient,
inplaceupdate.NewForTest(fakeClient, apps.ControllerRevisionHashLabelKey, func() metav1.Time { return now }),
record.NewFakeRecorder(10),
expectations.NewScaleExpectations(),
expectations.NewUpdateExpectations(clonesetutils.GetPodRevision),
}
if _, err := ctrl.Manage(mc.cs, mc.updateRevision, mc.revisions, mc.pods, mc.pvcs); err != nil {
t.Fatalf("Failed to test %s, manage error: %v", mc.name, err)
@ -541,6 +588,14 @@ func TestMange(t *testing.T) {
if err := ctrl.Client.Get(context.TODO(), types.NamespacedName{Namespace: p.Namespace, Name: p.Name}, gotPod); err != nil {
t.Fatalf("Failed to test %s, get pod %s error: %v", mc.name, p.Name, err)
}
if v, ok := gotPod.Annotations[appsv1alpha1.LifecycleTimestampKey]; ok {
if p.Annotations == nil {
p.Annotations = map[string]string{}
}
p.Annotations[appsv1alpha1.LifecycleTimestampKey] = v
}
if !reflect.DeepEqual(gotPod, p) {
t.Fatalf("Failed to test %s, unexpected pod %s, expected \n%v\n got \n%v", mc.name, p.Name, util.DumpJSON(p), util.DumpJSON(gotPod))
}

View File

@ -22,6 +22,7 @@ import (
"sync"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util/expectations"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -33,8 +34,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)
// ControllerKind is GroupVersionKind for CloneSet.
var ControllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
var (
// ControllerKind is GroupVersionKind for CloneSet.
ControllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
ScaleExpectations = expectations.NewScaleExpectations()
UpdateExpectations = expectations.NewUpdateExpectations(GetPodRevision)
ResourceVersionExpectations = expectations.NewResourceVersionExpectation()
)
// GetControllerKey return key of CloneSet.
func GetControllerKey(cs *appsv1alpha1.CloneSet) string {

View File

@ -0,0 +1,91 @@
/*
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package expectations
import (
"strconv"
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
type ResourceVersionExpectation interface {
Expect(obj metav1.Object)
Observe(obj metav1.Object)
IsSatisfied(obj metav1.Object) bool
Delete(obj metav1.Object)
}
func NewResourceVersionExpectation() ResourceVersionExpectation {
return &realResourceVersionExpectation{objectVersions: make(map[types.UID]string, 100)}
}
type realResourceVersionExpectation struct {
sync.RWMutex
objectVersions map[types.UID]string
}
func (r *realResourceVersionExpectation) Expect(obj metav1.Object) {
r.Lock()
defer r.Unlock()
if isResourceVersionNewer(r.objectVersions[obj.GetUID()], obj.GetResourceVersion()) {
r.objectVersions[obj.GetUID()] = obj.GetResourceVersion()
}
}
func (r *realResourceVersionExpectation) Observe(obj metav1.Object) {
r.Lock()
defer r.Unlock()
if isResourceVersionNewer(r.objectVersions[obj.GetUID()], obj.GetResourceVersion()) {
delete(r.objectVersions, obj.GetUID())
}
}
func (r *realResourceVersionExpectation) IsSatisfied(obj metav1.Object) bool {
r.RLock()
defer r.RUnlock()
if isResourceVersionNewer(r.objectVersions[obj.GetUID()], obj.GetResourceVersion()) {
delete(r.objectVersions, obj.GetUID())
}
_, existing := r.objectVersions[obj.GetUID()]
return !existing
}
func (r *realResourceVersionExpectation) Delete(obj metav1.Object) {
r.Lock()
defer r.Unlock()
delete(r.objectVersions, obj.GetUID())
}
func isResourceVersionNewer(old, new string) bool {
if len(old) == 0 {
return true
}
oldCount, err := strconv.ParseUint(old, 10, 64)
if err != nil {
return true
}
newCount, err := strconv.ParseUint(new, 10, 64)
if err != nil {
return false
}
return newCount >= oldCount
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package expectations
import (
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestResourceVersionExpectation(t *testing.T) {
cases := []struct {
expect *v1.Pod
observe *v1.Pod
isSatisfied *v1.Pod
result bool
}{
{
expect: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
observe: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
isSatisfied: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
result: false,
},
{
expect: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
observe: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
isSatisfied: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
result: true,
},
{
expect: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
observe: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
isSatisfied: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
result: true,
},
{
expect: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
observe: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
isSatisfied: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
result: true,
},
}
for i, testCase := range cases {
c := NewResourceVersionExpectation()
c.Expect(testCase.expect)
c.Observe(testCase.observe)
got := c.IsSatisfied(testCase.isSatisfied)
if got != testCase.result {
t.Fatalf("#%d expected %v, got %v", i, testCase.result, got)
}
}
}

View File

@ -48,6 +48,7 @@ type (
type UpdateOptions struct {
GracePeriodSeconds int32
AdditionalFuncs []func(*v1.Pod)
CustomizeSpecCalculate CustomizeSpecCalculateFunc
CustomizeSpecPatch CustomizeSpecPatchFunc
@ -69,6 +70,7 @@ type UpdateResult struct {
// Interface for managing pods in-place update.
type Interface interface {
Refresh(pod *v1.Pod, opts *UpdateOptions) RefreshResult
CanUpdateInPlace(oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) bool
Update(pod *v1.Pod, oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) UpdateResult
}
@ -215,14 +217,13 @@ func (c *realControl) finishGracePeriod(pod *v1.Pod, opts *UpdateOptions) (time.
return delayDuration, err
}
func (c *realControl) CanUpdateInPlace(oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) bool {
return calculateInPlaceUpdateSpec(oldRevision, newRevision, opts) != nil
}
func (c *realControl) Update(pod *v1.Pod, oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) UpdateResult {
// 1. calculate inplace update spec
var spec *UpdateSpec
if opts == nil || opts.CustomizeSpecCalculate == nil {
spec = calculateInPlaceUpdateSpec(oldRevision, newRevision, opts)
} else {
spec = opts.CustomizeSpecCalculate(oldRevision, newRevision)
}
spec := calculateInPlaceUpdateSpec(oldRevision, newRevision, opts)
if spec == nil {
return UpdateResult{}
}
@ -271,6 +272,11 @@ func (c *realControl) updatePodInPlace(pod *v1.Pod, spec *UpdateSpec, opts *Upda
if clone.Annotations == nil {
clone.Annotations = map[string]string{}
}
if opts != nil {
for _, f := range opts.AdditionalFuncs {
f(clone)
}
}
// record old containerStatuses
inPlaceUpdateState := appsv1alpha1.InPlaceUpdateState{
@ -331,6 +337,10 @@ func patchUpdateSpecToPod(pod *v1.Pod, spec *UpdateSpec, opts *UpdateOptions) (*
// If the diff just contains replace operation of spec.containers[x].image, it will returns an UpdateSpec.
// Otherwise, it returns nil which means can not use in-place update.
func calculateInPlaceUpdateSpec(oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) *UpdateSpec {
if opts != nil && opts.CustomizeSpecCalculate != nil {
return opts.CustomizeSpecCalculate(oldRevision, newRevision)
}
if oldRevision == nil || newRevision == nil {
return nil
}
@ -419,10 +429,16 @@ func CheckInPlaceUpdateCompleted(pod *v1.Pod) error {
pod.Labels[apps.StatefulSetRevisionLabel], inPlaceUpdateState.Revision)
}
containerImages := make(map[string]string, len(pod.Spec.Containers))
for i := range pod.Spec.Containers {
c := &pod.Spec.Containers[i]
containerImages[c.Name] = c.Image
}
for _, cs := range pod.Status.ContainerStatuses {
if oldStatus, ok := inPlaceUpdateState.LastContainerStatuses[cs.Name]; ok {
// TODO: we assume that users should not update workload template with new image which actually has the same imageID as the old image
if oldStatus.ImageID == cs.ImageID {
if oldStatus.ImageID == cs.ImageID && containerImages[cs.Name] != cs.Image {
return fmt.Errorf("container %s imageID not changed", cs.Name)
}
delete(inPlaceUpdateState.LastContainerStatuses, cs.Name)

View File

@ -191,6 +191,7 @@ func TestCheckInPlaceUpdateCompleted(t *testing.T) {
{
Name: "c1",
ImageID: "img01",
Image: "image01",
},
},
},

View File

@ -0,0 +1,77 @@
/*
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lifecycle
import (
"fmt"
"time"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
func GetPodLifecycleState(pod *v1.Pod) appsv1alpha1.LifecycleStateType {
return appsv1alpha1.LifecycleStateType(pod.Labels[appsv1alpha1.LifecycleStateKey])
}
func SetPodLifecycle(state appsv1alpha1.LifecycleStateType) func(*v1.Pod) {
return func(pod *v1.Pod) {
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Labels[appsv1alpha1.LifecycleStateKey] = string(state)
pod.Annotations[appsv1alpha1.LifecycleTimestampKey] = time.Now().Format(time.RFC3339)
}
}
func PatchPodLifecycle(c client.Client, pod *v1.Pod, state appsv1alpha1.LifecycleStateType) (bool, error) {
if GetPodLifecycleState(pod) == state {
return false, nil
}
body := fmt.Sprintf(
`{"metadata":{"labels":{"%s":"%s"},"annotations":{"%s":"%s"}}}`,
appsv1alpha1.LifecycleStateKey,
string(state),
appsv1alpha1.LifecycleTimestampKey,
time.Now().Format(time.RFC3339),
)
return true, c.Patch(nil, pod, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}
func IsPodHooked(hook *appsv1alpha1.LifecycleHook, pod *v1.Pod) bool {
if hook == nil || pod == nil {
return false
}
for _, f := range hook.FinalizersHandler {
if controllerutil.ContainsFinalizer(pod, f) {
return true
}
}
for k, v := range hook.LabelsHandler {
if pod.Labels[k] == v {
return true
}
}
return false
}

51
pkg/util/pods.go Normal file
View File

@ -0,0 +1,51 @@
/*
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
)
// GetPodNames returns names of the given Pods array
func GetPodNames(pods []*v1.Pod) sets.String {
set := sets.NewString()
for _, pod := range pods {
set.Insert(pod.Name)
}
return set
}
// MergePods merges two pods arrays
func MergePods(pods1, pods2 []*v1.Pod) []*v1.Pod {
var ret []*v1.Pod
names := sets.NewString()
for _, pod := range pods1 {
if !names.Has(pod.Name) {
ret = append(ret, pod)
names.Insert(pod.Name)
}
}
for _, pod := range pods2 {
if !names.Has(pod.Name) {
ret = append(ret, pod)
names.Insert(pod.Name)
}
}
return ret
}

View File

@ -0,0 +1,45 @@
/*
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package specifieddelete
import (
"fmt"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
func IsSpecifiedDelete(obj metav1.Object) bool {
_, ok := obj.GetLabels()[appsv1alpha1.SpecifiedDeleteKey]
return ok
}
func PatchPodSpecifiedDelete(c client.Client, pod *v1.Pod, value string) (bool, error) {
if _, ok := pod.Labels[appsv1alpha1.SpecifiedDeleteKey]; ok {
return false, nil
}
body := fmt.Sprintf(
`{"metadata":{"labels":{"%s":"%s"}}}`,
appsv1alpha1.SpecifiedDeleteKey,
value,
)
return true, c.Patch(nil, pod, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}

View File

@ -159,8 +159,9 @@ func (h *CloneSetCreateUpdateHandler) validateCloneSetUpdate(cloneSet, oldCloneS
clone.Spec.ScaleStrategy = oldCloneSet.Spec.ScaleStrategy
clone.Spec.UpdateStrategy = oldCloneSet.Spec.UpdateStrategy
clone.Spec.MinReadySeconds = oldCloneSet.Spec.MinReadySeconds
clone.Spec.Lifecycle = oldCloneSet.Spec.Lifecycle
if !reflect.DeepEqual(clone.Spec, oldCloneSet.Spec) {
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to cloneset spec for fields other than 'replicas', 'template', 'scaleStrategy', and 'updateStrategy' are forbidden"))
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to cloneset spec for fields other than 'replicas', 'template', 'lifecycle', 'scaleStrategy', and 'updateStrategy' are forbidden"))
}
coreControl := clonesetcore.New(cloneSet)