mirror of https://github.com/openkruise/kruise.git
opt sts updateStatefulSet (#1648)
Signed-off-by: Abner-1 <yuanyuxing.yyx@alibaba-inc.com>
parent 837b67192d
commit 0f6aada9d3
@@ -20,6 +20,13 @@ import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
	clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
@@ -27,12 +34,6 @@ import (
	sidecarsetcontroller "github.com/openkruise/kruise/pkg/controller/sidecarset"
	statefulsetcontroller "github.com/openkruise/kruise/pkg/controller/statefulset"
	"github.com/openkruise/kruise/pkg/util/updatesort"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// SortPods sorts the given Pods according the owner workload logic.
@@ -22,6 +22,7 @@ import (
	"fmt"
	"math"
	"sort"
	"sync"
	"time"

	apps "k8s.io/api/apps/v1"
@@ -32,6 +33,7 @@ import (
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/controller/history"
	"k8s.io/utils/integer"
	utilpointer "k8s.io/utils/pointer"

	appspub "github.com/openkruise/kruise/apis/apps/pub"
@@ -305,6 +307,39 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
	return currentRevision, updateRevision, collisionCount, nil
}

func (ssc *defaultStatefulSetControl) doPreDownload(set *appsv1beta1.StatefulSet, currentRevision, updateRevision *apps.ControllerRevision) {
	var err error
	if isPreDownloadDisabled || sigsruntimeClient == nil {
		return
	}
	if currentRevision.Name != updateRevision.Name {
		// get asts pre-download annotation
		minUpdatedReadyPodsCount := 0
		if minUpdatedReadyPods, ok := set.Annotations[appsv1alpha1.ImagePreDownloadMinUpdatedReadyPods]; ok {
			minUpdatedReadyPodsIntStr := intstrutil.Parse(minUpdatedReadyPods)
			minUpdatedReadyPodsCount, err = intstrutil.GetScaledValueFromIntOrPercent(&minUpdatedReadyPodsIntStr, int(*set.Spec.Replicas), true)
			if err != nil {
				klog.ErrorS(err, "Failed to GetScaledValueFromIntOrPercent of minUpdatedReadyPods for statefulSet", "statefulSet", klog.KObj(set))
			}
		}
		updatedReadyReplicas := set.Status.UpdatedReadyReplicas
		if updateRevision.Name != set.Status.UpdateRevision {
			updatedReadyReplicas = 0
		}
		if int32(minUpdatedReadyPodsCount) <= updatedReadyReplicas {
			// pre-download images for new revision
			if err := ssc.createImagePullJobsForInPlaceUpdate(set, currentRevision, updateRevision); err != nil {
				klog.ErrorS(err, "Failed to create ImagePullJobs for statefulSet", "statefulSet", klog.KObj(set))
			}
		}
	} else {
		// delete ImagePullJobs if revisions have been consistent
		if err := imagejobutilfunc.DeleteJobsForWorkload(sigsruntimeClient, set); err != nil {
			klog.ErrorS(err, "Failed to delete ImagePullJobs for statefulSet", "statefulSet", klog.KObj(set))
		}
	}
}
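The pre-download gate above scales the appsv1alpha1.ImagePreDownloadMinUpdatedReadyPods annotation value against spec.replicas before comparing it to updatedReadyReplicas. As a side note (not part of the commit), here is a minimal standalone sketch of that scaling with the same intstr helpers; the replica count and annotation values are made up for illustration:

```go
// Sketch only: turning an annotation value such as "3" or "50%" into a pod
// count against spec.replicas, rounding up as doPreDownload does.
package main

import (
	"fmt"

	intstrutil "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	replicas := 10 // assumed spec.replicas
	for _, raw := range []string{"3", "50%"} {
		v := intstrutil.Parse(raw)
		n, err := intstrutil.GetScaledValueFromIntOrPercent(&v, replicas, true)
		fmt.Println(raw, "->", n, err) // "3" -> 3 <nil>, "50%" -> 5 <nil>
	}
}
```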
// updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
// the set in order to conform the system to the target state for the set. The target state always contains
// set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
@@ -314,8 +349,6 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
// all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
// Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
// update must be recorded. If the error is not nil, the method should be retried until successful.

// TODO (RZ): Break the below spaghetti code into smaller chucks with unit tests
func (ssc *defaultStatefulSetControl) updateStatefulSet(
	ctx context.Context,
	set *appsv1beta1.StatefulSet,
@@ -339,34 +372,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
		return set.Status.DeepCopy(), err
	}

	if !isPreDownloadDisabled && sigsruntimeClient != nil {
		if currentRevision.Name != updateRevision.Name {
			// get asts pre-download annotation
			minUpdatedReadyPodsCount := 0
			if minUpdatedReadyPods, ok := set.Annotations[appsv1alpha1.ImagePreDownloadMinUpdatedReadyPods]; ok {
				minUpdatedReadyPodsIntStr := intstrutil.Parse(minUpdatedReadyPods)
				minUpdatedReadyPodsCount, err = intstrutil.GetScaledValueFromIntOrPercent(&minUpdatedReadyPodsIntStr, int(*set.Spec.Replicas), true)
				if err != nil {
					klog.Errorf("Failed to GetScaledValueFromIntOrPercent of minUpdatedReadyPods for %v: %v", set, err)
				}
			}
			updatedReadyReplicas := set.Status.UpdatedReadyReplicas
			if updateRevision.Name != set.Status.UpdateRevision {
				updatedReadyReplicas = 0
			}
			if int32(minUpdatedReadyPodsCount) <= updatedReadyReplicas {
				// pre-download images for new revision
				if err := ssc.createImagePullJobsForInPlaceUpdate(set, currentRevision, updateRevision); err != nil {
					klog.Errorf("Failed to create ImagePullJobs for %v: %v", set, err)
				}
			}
		} else {
			// delete ImagePullJobs if revisions have been consistent
			if err := imagejobutilfunc.DeleteJobsForWorkload(sigsruntimeClient, set); err != nil {
				klog.Errorf("Failed to delete ImagePullJobs for %v: %v", set, err)
			}
		}
	}
	ssc.doPreDownload(set, currentRevision, updateRevision)

	// set the generation, and revisions in the returned status
	status := appsv1beta1.StatefulSetStatus{}
@@ -375,6 +381,9 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
	status.UpdateRevision = updateRevision.Name
	status.CollisionCount = utilpointer.Int32Ptr(collisionCount)
	status.LabelSelector = selector.String()
	minReadySeconds := getMinReadySeconds(set)

	updateStatus(&status, minReadySeconds, currentRevision, updateRevision, pods)

	startOrdinal, endOrdinal, reserveOrdinals := getStatefulSetReplicasRange(set)
	// slice that will contain all Pods such that startOrdinal <= getOrdinal(pod) < endOrdinal and not in reserveOrdinals
@@ -385,48 +394,9 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
	firstUnhealthyOrdinal := math.MaxInt32
	var firstUnhealthyPod *v1.Pod
	monotonic := !allowsBurst(set)
	minReadySeconds := getMinReadySeconds(set)
	var scaleMaxUnavailable *int
	if set.Spec.ScaleStrategy != nil && set.Spec.ScaleStrategy.MaxUnavailable != nil {
		maxUnavailable, err := intstrutil.GetValueFromIntOrPercent(set.Spec.ScaleStrategy.MaxUnavailable, int(*set.Spec.Replicas), false)
		if err != nil {
			return &status, err
		}
		// maxUnavailable should not be less than 1
		if maxUnavailable < 1 {
			maxUnavailable = 1
		}
		scaleMaxUnavailable = &maxUnavailable
	}

	// First we partition pods into two lists valid replicas and condemned Pods
	for i := range pods {
		status.Replicas++

		// count the number of running and ready replicas
		if isRunningAndReady(pods[i]) {
			status.ReadyReplicas++
			if getPodRevision(pods[i]) == updateRevision.Name {
				status.UpdatedReadyReplicas++
				if avail, _ := isRunningAndAvailable(pods[i], minReadySeconds); avail {
					status.UpdatedAvailableReplicas++
				}
			}
			if avail, _ := isRunningAndAvailable(pods[i], minReadySeconds); avail {
				status.AvailableReplicas++
			}
		}

		// count the number of current and update replicas
		if isCreated(pods[i]) && !isTerminating(pods[i]) {
			if getPodRevision(pods[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(pods[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}
		}

		if ord := getOrdinal(pods[i]); podInOrdinalRangeWithParams(pods[i], startOrdinal, endOrdinal, reserveOrdinals) {
			// if the ordinal of the pod is within the range of the current number of replicas and not in reserveOrdinals,
			// insert it at the indirection of its ordinal
@@ -456,7 +426,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
	}

	// sort the condemned Pods by their ordinals
	sort.Sort(ascendingOrdinal(condemned))
	sort.Sort(descendingOrdinal(condemned))

	// find the first unhealthy Pod
	for i := range replicas {
@@ -472,7 +442,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
		}
	}

	for i := range condemned {
	// or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use)
	for i := len(condemned) - 1; i >= 0; i-- {
		if !isHealthy(condemned[i]) {
			unhealthy++
			if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal {
@@ -492,229 +463,52 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
		return &status, nil
	}

	// Examine each replica with respect to its ordinal
	for i := range replicas {
		if replicas[i] == nil {
			continue
		}
		// delete and recreate failed pods
		if isFailed(replicas[i]) {
			ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
				"StatefulSet %s/%s is recreating failed Pod %s",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			if _, err := ssc.deletePod(set, replicas[i]); err != nil {
				return &status, err
			}
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas--
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas--
			}
			status.Replicas--
			replicas[i] = newVersionedStatefulSetPod(
				currentSet,
				updateSet,
				currentRevision.Name,
				updateRevision.Name,
				i, replicas)
		}
		// If we find a Pod that has not been created we create the Pod
		if !isCreated(replicas[i]) {
			if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
				if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
					return &status, err
				} else if isStale {
					// If a pod has a stale PVC, no more work can be done this round.
					return &status, err
				}
			}

			lifecycle.SetPodLifecycle(appspub.LifecycleStateNormal)(replicas[i])
			if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
				msg := fmt.Sprintf("StatefulPodControl failed to create Pod error: %s", err)
				condition := NewStatefulsetCondition(appsv1beta1.FailedCreatePod, v1.ConditionTrue, "", msg)
				SetStatefulsetCondition(&status, condition)
				return &status, err
			}
			status.Replicas++
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}
			// if the set does not allow bursting, return immediately
			if monotonic {
				return &status, nil
			} else if decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
				klog.V(4).Infof(
					"StatefulSet %s/%s Pod %s is Creating, and break pods scale",
					set.Namespace,
					set.Name,
					replicas[i].Name)
				break
			}
			// pod created, no more work possible for this round
			continue
		}

		// If the Pod is in pending state then trigger PVC creation to create missing PVCs
		if isPending(replicas[i]) {
			klog.V(4).Info(
				"StatefulSet is triggering PVC creation for pending Pod",
				"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
			if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
				return &status, err
			}
		}

		// If we find a Pod that is currently terminating, we must wait until graceful deletion
		// completes before we continue to make progress.
		if isTerminating(replicas[i]) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to Terminate",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		} else if isTerminating(replicas[i]) && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
			klog.V(4).Infof(
				"StatefulSet %s/%s Pod %s is Terminating, and break pods scale",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			break
		}
		// Update InPlaceUpdateReady condition for pod
		if res := ssc.inplaceControl.Refresh(replicas[i], nil); res.RefreshErr != nil {
			klog.Errorf("StatefulSet %s/%s failed to update pod %s condition for inplace: %v",
				set.Namespace, set.Name, replicas[i].Name, res.RefreshErr)
			return &status, res.RefreshErr
		} else if res.DelayDuration > 0 {
			durationStore.Push(getStatefulSetKey(set), res.DelayDuration)
		}
		// If we have a Pod that has been created but is not running and available we can not make progress.
		// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
		// ordinal, are Running and Available.
		if monotonic || scaleMaxUnavailable != nil {
			isAvailable, waitTime := isRunningAndAvailable(replicas[i], minReadySeconds)
			if !isAvailable && monotonic {
				if waitTime > 0 {
					// make sure we check later
					durationStore.Push(getStatefulSetKey(set), waitTime)
					klog.V(4).Infof(
						"StatefulSet %s/%s needs to wait %s for the Pod %s to be Running and Available after being"+
							" Ready for %d seconds",
						set.Namespace,
						set.Name,
						waitTime,
						replicas[i].Name,
						minReadySeconds)
				} else {
					klog.V(4).Infof(
						"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
						set.Namespace,
						set.Name,
						replicas[i].Name)
				}
				return &status, nil
			} else if !isAvailable && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
				klog.V(4).Infof(
					"StatefulSet %s/%s Pod %s is unavailable, and break pods scale",
					set.Namespace,
					set.Name,
					replicas[i].Name)
				if waitTime > 0 {
					// make sure we check later
					durationStore.Push(getStatefulSetKey(set), waitTime)
				}
				break
			}
		}
		// Enforce the StatefulSet invariants
		retentionMatch := true
		if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
			var err error
			retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, replicas[i])
			// An error is expected if the pod is not yet fully updated, and so return is treated as matching.
			if err != nil {
				retentionMatch = true
			}
		}
		if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
			continue
		}
		// Make a deep copy so we don't mutate the shared cache
		replica := replicas[i].DeepCopy()
		if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
			msg := fmt.Sprintf("StatefulPodControl failed to update Pod error: %s", err)
			condition := NewStatefulsetCondition(appsv1beta1.FailedUpdatePod, v1.ConditionTrue, "", msg)
			SetStatefulsetCondition(&status, condition)
			return &status, err
		}
	// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
	scaleMaxUnavailable, err := getScaleMaxUnavailable(set)
	if err != nil {
		return &status, err
	}
	processReplicaFn := func(i int) (bool, bool, error) {
		return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i, &status, scaleMaxUnavailable)
	}
	if shouldExit, err := runForAllWithBreak(replicas, processReplicaFn); shouldExit || err != nil {
		updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, condemned)
		return &status, err
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
		// Ensure ownerRefs are set correctly for the condemned pods.
		for i := range condemned {
		fixPodClaim := func(i int) (bool, error) {
			if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, condemned[i]); err != nil {
				return &status, err
				return true, err
			} else if !matchPolicy {
				if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(updateSet, condemned[i]); err != nil {
					return &status, err
					return true, err
				}
			}
			return false, nil
		}
	}

	// At this point, all of the current Replicas are Running and Ready, we can consider termination.
	// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
	// We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
	// Note that we do not resurrect Pods in this interval. Also not that scaling will take precedence over
	// updates.
	for target := len(condemned) - 1; target >= 0; target-- {
		// wait for terminating pods to expire
		if isTerminating(condemned[target]) {
			klog.V(4).InfoS("StatefulSet is waiting for Pod to Terminate prior to scale down",
				"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
			// block if we are in monotonic mode
			if monotonic {
				return &status, nil
			}
			continue
		}
		// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
		if avail, waitTime := isRunningAndAvailable(condemned[target], minReadySeconds); !avail && monotonic && condemned[target] != firstUnhealthyPod {
			klog.V(4).InfoS("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
				"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
			if waitTime > 0 {
				durationStore.Push(getStatefulSetKey(condemned[target]), waitTime)
			}
			return &status, nil
		}
		klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
			set.Namespace,
			set.Name,
			condemned[target].Name)

		modified, err := ssc.deletePod(set, condemned[target])
		if err != nil || modified {
		if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
			updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, condemned)
			return &status, err
		}
		if getPodRevision(condemned[target]) == currentRevision.Name {
			status.CurrentReplicas--
		}
		if getPodRevision(condemned[target]) == updateRevision.Name {
			status.UpdatedReplicas--
		}
		if monotonic {
			return &status, nil
		}
	}

	// At this point, in monotonic mode all of the current Replicas are Running, Ready and Available,
	// and we can consider termination.
	// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
	// We will terminate Pods in a monotonically decreasing order.
	// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
	// updates.
	processCondemnedFn := func(i int) (bool, error) {
		return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i)
	}
	if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
		updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, condemned)
		return &status, err
	}
	updateStatus(&status, minReadySeconds, currentRevision, updateRevision, replicas, condemned)

	// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
	if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
		return &status, nil
@@ -1055,6 +849,77 @@ func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
	return nil
}

type replicaStatus struct {
	replicas int32
	readyReplicas int32
	availableReplicas int32
	currentReplicas int32

	updatedReplicas int32
	updatedReadyReplicas int32
	updatedAvailableReplicas int32
}

func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
	status := replicaStatus{}
	for _, pod := range pods {
		if pod == nil {
			continue
		}
		if isCreated(pod) {
			status.replicas++
		}

		// count the number of running and ready replicas
		if isRunningAndReady(pod) {
			status.readyReplicas++
			if getPodRevision(pod) == updateRevision.Name {
				status.updatedReadyReplicas++
				if avail, _ := isRunningAndAvailable(pod, minReadySeconds); avail {
					status.updatedAvailableReplicas++
				}
			}
			// count the number of running and available replicas
			ok, _ := isRunningAndAvailable(pod, minReadySeconds)
			if ok {
				status.availableReplicas++
			}
		}

		// count the number of current and update replicas
		if isCreated(pod) && !isTerminating(pod) {
			revision := getPodRevision(pod)
			if revision == currentRevision.Name {
				status.currentReplicas++
			}
			if revision == updateRevision.Name {
				status.updatedReplicas++
			}
		}
	}
	return status
}

func updateStatus(status *appsv1beta1.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
	status.Replicas = 0
	status.ReadyReplicas = 0
	status.AvailableReplicas = 0
	status.CurrentReplicas = 0
	status.UpdatedReplicas = 0
	status.UpdatedReadyReplicas = 0
	status.UpdatedAvailableReplicas = 0
	for _, list := range podLists {
		replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
		status.Replicas += replicaStatus.replicas
		status.ReadyReplicas += replicaStatus.readyReplicas
		status.AvailableReplicas += replicaStatus.availableReplicas
		status.UpdatedReadyReplicas += replicaStatus.updatedReadyReplicas
		status.UpdatedAvailableReplicas += replicaStatus.updatedAvailableReplicas
		status.CurrentReplicas += replicaStatus.currentReplicas
		status.UpdatedReplicas += replicaStatus.updatedReplicas
	}
}

// getStartOrdinal gets the first possible ordinal (inclusive).
// Returns spec.ordinals.start if spec.ordinals is set, otherwise returns 0.
func getStartOrdinal(set *appsv1beta1.StatefulSet) int {
@@ -1065,3 +930,289 @@ func getStartOrdinal(set *appsv1beta1.StatefulSet) int {
	}
	return 0
}

func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *appsv1beta1.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
	logger := klog.FromContext(ctx)
	if isTerminating(condemned[i]) {
		// if we are in monotonic mode, block and wait for terminating pods to expire
		if monotonic {
			logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
				"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
			return true, nil
		}
		return false, nil
	}
	// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
	if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod {
		logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
		return true, nil
	}
	// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
	if avail, waitTime := isRunningAndAvailable(condemned[i], getMinReadySeconds(set)); !avail && monotonic && condemned[i] != firstUnhealthyPod {
		logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
		if waitTime > 0 {
			durationStore.Push(getStatefulSetKey(condemned[i]), waitTime)
		}
		return true, nil
	}

	logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
		"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))

	modified, err := ssc.deletePod(set, condemned[i])
	if err != nil || (monotonic && modified) {
		return true, err
	}
	return false, nil
}

// processReplica handles an individual replica within a StatefulSet based on its current state.
// It decides whether to delete, create, update the replica, or await its readiness.
//
// different from stateful set:
//
// If decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) returns true,
// break the pod for-loop and proceed with the update logic,
// which will apply in-place conditions to make the pod ready.
// For example: update unhealthy pods or add some conditions
//
// Returns:
// - bool shouldExit: whether to exit.
// - bool shouldBreak: whether to break the pod for-loop and proceed with the update logic.
// - An error if encountered during processing; nil otherwise.
func (ssc *defaultStatefulSetControl) processReplica(
	ctx context.Context,
	set *appsv1beta1.StatefulSet,
	updateSet *appsv1beta1.StatefulSet,
	monotonic bool,
	replicas []*v1.Pod,
	i int,
	status *appsv1beta1.StatefulSetStatus,
	scaleMaxUnavailable *int) (bool, bool, error) {
	minReadySeconds := getMinReadySeconds(set)
	logger := klog.FromContext(ctx)

	if replicas[i] == nil {
		return false, false, nil
	}
	// Note that pods with phase Succeeded will also trigger this event. This is
	// because final pod phase of evicted or otherwise forcibly stopped pods
	// (e.g. terminated on node reboot) is determined by the exit code of the
	// container, not by the reason for pod termination. We should restart the pod
	// regardless of the exit code.
	if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
		if replicas[i].DeletionTimestamp == nil {
			if _, err := ssc.deletePod(set, replicas[i]); err != nil {
				return true, false, err
			}
		}
		// New pod should be generated on the next sync after the current pod is removed from etcd.
		return true, false, nil
	}
	// If we find a Pod that has not been created we create the Pod
	if !isCreated(replicas[i]) {
		if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
			if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
				return true, false, err
			} else if isStale {
				// If a pod has a stale PVC, no more work can be done this round.
				return true, false, err
			}
		}
		lifecycle.SetPodLifecycle(appspub.LifecycleStateNormal)(replicas[i])
		if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
			msg := fmt.Sprintf("StatefulPodControl failed to create Pod error: %s", err)
			condition := NewStatefulsetCondition(appsv1beta1.FailedCreatePod, v1.ConditionTrue, "", msg)
			SetStatefulsetCondition(status, condition)
			return true, false, err
		}
		if monotonic {
			// if the set does not allow bursting, return immediately
			return true, false, nil
		} else if decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
			logger.V(4).Info(
				"StatefulSet pod is Creating, and break pods scale",
				"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
			return false, true, nil
		}
		// pod created, no more work possible for this round
		return false, false, nil
	}

	// If the Pod is in pending state then trigger PVC creation to create missing PVCs
	if isPending(replicas[i]) {
		logger.V(4).Info(
			"StatefulSet is triggering PVC creation for pending Pod",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
		if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
			return true, false, err
		}
	}

	// If we find a Pod that is currently terminating, we must wait until graceful deletion
	// completes before we continue to make progress.
	if isTerminating(replicas[i]) && monotonic {
		logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
		return true, false, nil
	} else if isTerminating(replicas[i]) && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
		logger.V(4).Info(
			"StatefulSet pod is Terminating, and break pods scale",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
		return false, true, nil
	}

	// Update InPlaceUpdateReady condition for pod
	if res := ssc.inplaceControl.Refresh(replicas[i], nil); res.RefreshErr != nil {
		logger.Error(res.RefreshErr, "StatefulSet failed to update pod condition for inplace",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
		return true, false, res.RefreshErr
	} else if res.DelayDuration > 0 {
		durationStore.Push(getStatefulSetKey(set), res.DelayDuration)
	}

	// If we have a Pod that has been created but is not running and available we can not make progress.
	// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
	// ordinal, are Running and Available.
	if monotonic || scaleMaxUnavailable != nil {
		isAvailable, waitTime := isRunningAndAvailable(replicas[i], minReadySeconds)
		if !isAvailable && monotonic {
			if waitTime > 0 {
				// make sure we check later
				durationStore.Push(getStatefulSetKey(set), waitTime)
				logger.V(4).Info(
					"StatefulSet needs to wait for the pod to be Running and Available after being"+
						" Ready for minReadySeconds", "statefulSet", klog.KObj(set), "waitTime", waitTime,
					"pod", klog.KObj(replicas[i]), "minReadySeconds", minReadySeconds)
			} else {
				logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
					"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
			}
			return true, false, nil
		} else if !isAvailable && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
			logger.V(4).Info(
				"StatefulSet pod is unavailable, and break pods scale",
				"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
			if waitTime > 0 {
				// make sure we check later
				durationStore.Push(getStatefulSetKey(set), waitTime)
			}
			return false, true, nil
		}
	}

	// Enforce the StatefulSet invariants
	retentionMatch := true
	if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
		var err error
		retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, replicas[i])
		// An error is expected if the pod is not yet fully updated, and so return is treated as matching.
		if err != nil {
			retentionMatch = true
		}
	}

	if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
		return false, false, nil
	}

	// Make a deep copy so we don't mutate the shared cache
	replica := replicas[i].DeepCopy()
	if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
		msg := fmt.Sprintf("StatefulPodControl failed to update Pod error: %s", err)
		condition := NewStatefulsetCondition(appsv1beta1.FailedUpdatePod, v1.ConditionTrue, "", msg)
		SetStatefulsetCondition(status, condition)
		return true, false, err
	}

	return false, false, nil
}

func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) {
	successes := 0
	j := 0
	for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(integer.IntMin(2*batchSize, remaining), MaxBatchSize) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func(k int) {
				defer wg.Done()
				// Ignore the first parameter - relevant for monotonic only.
				if _, err := fn(k); err != nil {
					errCh <- err
				}
			}(j)
			j++
		}
		wg.Wait()
		successes += batchSize - len(errCh)
		close(errCh)
		if len(errCh) > 0 {
			errs := make([]error, 0)
			for err := range errCh {
				errs = append(errs, err)
			}
			return successes, utilerrors.NewAggregate(errs)
		}
		remaining -= batchSize
	}
	return successes, nil
}
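slowStartBatch runs fn in goroutine batches that double in size after each fully successful batch, capped by the remaining pod count and by MaxBatchSize, and returns an aggregated error as soon as a batch fails. A toy illustration of the batch-size growth (not from the commit; it assumes a cap of 500, 20 pods, no failures, and Go 1.21+ for the built-in min):

```go
// Toy illustration of slow-start batch sizes for 20 pods with an initial batch of 1.
package main

import "fmt"

func main() {
	const maxBatchSize = 500 // assumed cap, standing in for MaxBatchSize
	remaining := 20
	for batch := min(remaining, 1); batch > 0; batch = min(min(2*batch, remaining), maxBatchSize) {
		fmt.Print(batch, " ") // prints: 1 2 4 8 5
		remaining -= batch
	}
	fmt.Println()
}
```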
// runForAllWithBreak iterates through all pod objects, applying the given function until a specified condition is met.
// The function can decide whether to continue, break out of the loop, or return an error.
// Parameters:
// - pods: An array of pointers to Pod objects, representing the collection of pods to be processed.
// - fn: A function that takes an index as a parameter and returns three values:
// 1. A boolean indicating whether to exit the current iteration.
// 2. A boolean indicating whether to break out of the loop.
// 3. An error object, in case an error occurs during function execution.
//
// Returns:
// - A boolean indicating whether an exit condition was met or an error occurred during iteration.
// - An error object, if an error was encountered during the execution of the provided function.
func runForAllWithBreak(pods []*v1.Pod, fn func(i int) (bool, bool, error)) (bool, error) {
	for i := range pods {
		if shouldExit, shouldBreak, err := fn(i); shouldExit || err != nil {
			return true, err
		} else if shouldBreak {
			// Introduce this branch to exit the for-loop while proceeding with subsequent update logic.
			break
		}
	}
	return false, nil
}
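The shouldExit / shouldBreak contract is easiest to see in isolation. The toy below (indices instead of pods, not part of the commit) mirrors the loop above: an error or shouldExit aborts the whole pass, while shouldBreak only stops the per-pod scan so the caller can still proceed to the later update phase:

```go
// Toy model of the shouldExit / shouldBreak contract, using indices instead of pods.
package main

import "fmt"

func runOverIndices(n int, fn func(i int) (bool, bool, error)) (bool, error) {
	for i := 0; i < n; i++ {
		if shouldExit, shouldBreak, err := fn(i); shouldExit || err != nil {
			return true, err
		} else if shouldBreak {
			break // stop the scan, but let the caller continue with later work
		}
	}
	return false, nil
}

func main() {
	shouldExit, err := runOverIndices(5, func(i int) (bool, bool, error) {
		fmt.Println("processing", i)
		if i == 2 {
			return false, true, nil // analogous to hitting the scale maxUnavailable budget
		}
		return false, false, nil
	})
	fmt.Println("shouldExit:", shouldExit, "err:", err) // shouldExit: false err: <nil>
}
```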
func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) {
	if monotonic {
		for i := range pods {
			if shouldExit, err := fn(i); shouldExit || err != nil {
				return true, err
			}
		}
	} else {
		if _, err := slowStartBatch(1, len(pods), fn); err != nil {
			return true, err
		}
	}
	return false, nil
}

func getScaleMaxUnavailable(set *appsv1beta1.StatefulSet) (*int, error) {
	var scaleMaxUnavailable *int
	if set.Spec.ScaleStrategy != nil && set.Spec.ScaleStrategy.MaxUnavailable != nil {
		maxUnavailable, err := intstrutil.GetValueFromIntOrPercent(set.Spec.ScaleStrategy.MaxUnavailable, int(*set.Spec.Replicas), false)
		if err != nil {
			return scaleMaxUnavailable, err
		}
		// maxUnavailable should not be less than 1
		if maxUnavailable < 1 {
			maxUnavailable = 1
		}
		scaleMaxUnavailable = &maxUnavailable
	}
	return scaleMaxUnavailable, nil
}
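One detail worth calling out: GetValueFromIntOrPercent with roundUp=false can round a small percentage down to zero, which is why the helper clamps the result to at least 1. A hedged sketch of that edge case (the replica count and percentage are invented for illustration):

```go
// Sketch only: "10%" of 5 replicas rounds down to 0, so the value is clamped to 1.
package main

import (
	"fmt"

	intstrutil "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	mu := intstrutil.FromString("10%")
	n, err := intstrutil.GetValueFromIntOrPercent(&mu, 5, false)
	if err != nil {
		panic(err)
	}
	if n < 1 {
		n = 1 // maxUnavailable should not be less than 1
	}
	fmt.Println(n) // 1
}
```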
@@ -423,6 +423,11 @@ func isFailed(pod *v1.Pod) bool {
	return pod.Status.Phase == v1.PodFailed
}

// isSucceeded returns true if pod has a Phase of PodSucceeded
func isSucceeded(pod *v1.Pod) bool {
	return pod.Status.Phase == v1.PodSucceeded
}

// isTerminating returns true if pod's DeletionTimestamp has been set
func isTerminating(pod *v1.Pod) bool {
	return pod.DeletionTimestamp != nil
@@ -658,6 +663,20 @@ func (ao ascendingOrdinal) Less(i, j int) bool {
	return getOrdinal(ao[i]) < getOrdinal(ao[j])
}

type descendingOrdinal []*v1.Pod

func (do descendingOrdinal) Len() int {
	return len(do)
}

func (do descendingOrdinal) Swap(i, j int) {
	do[i], do[j] = do[j], do[i]
}

func (do descendingOrdinal) Less(i, j int) bool {
	return getOrdinal(do[i]) > getOrdinal(do[j])
}
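descendingOrdinal is the mirror image of ascendingOrdinal: after sort.Sort, the condemned pod with the highest ordinal sits at index 0, so scale-down deletes from the front of the slice while the first-unhealthy scan walks from the end. A standalone stand-in over plain ints (illustrative only, not the controller's type):

```go
// Illustrative stand-in for descendingOrdinal using plain ints.
package main

import (
	"fmt"
	"sort"
)

type descending []int

func (d descending) Len() int           { return len(d) }
func (d descending) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d descending) Less(i, j int) bool { return d[i] > d[j] }

func main() {
	ordinals := []int{4, 6, 5}
	sort.Sort(descending(ordinals))
	fmt.Println(ordinals) // [6 5 4]: highest ordinal first, deleted first on scale down
}
```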
// NewStatefulsetCondition creates a new statefulset condition.
func NewStatefulsetCondition(conditionType apps.StatefulSetConditionType, conditionStatus v1.ConditionStatus, reason, message string) apps.StatefulSetCondition {
	return apps.StatefulSetCondition{