Adaptive scheduling strategy for UnitedDeployment and refactor subset adapter (#1720)

Signed-off-by: AiRanthem <zhongtianyun.zty@alibaba-inc.com>
Co-authored-by: AiRanthem <zhongtianyun.zty@alibaba-inc.com>
Ai Ranthem 2024-10-15 10:38:01 +08:00 committed by GitHub
parent 0964df6da6
commit 29f2323d59
25 changed files with 1545 additions and 193 deletions

View File

@ -17,13 +17,14 @@ limitations under the License.
package v1alpha1
import (
"time"
"github.com/openkruise/kruise/apis/apps/v1beta1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"github.com/openkruise/kruise/apis/apps/v1beta1"
)
// UpdateStrategyType is a string enumeration type that enumerates
@ -165,6 +166,10 @@ type Topology struct {
// +patchStrategy=merge
// +optional
Subsets []Subset `json:"subsets,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
// ScheduleStrategy indicates the strategy the UnitedDeployment uses to schedule Pods among its subsets.
// +optional
ScheduleStrategy UnitedDeploymentScheduleStrategy `json:"scheduleStrategy,omitempty"`
}
// Subset defines the detail of a subset.
@ -218,6 +223,69 @@ type Subset struct {
Patch runtime.RawExtension `json:"patch,omitempty"`
}
// UnitedDeploymentScheduleStrategyType is a string enumeration type that enumerates
// all possible schedule strategies for the UnitedDeployment controller.
// +kubebuilder:validation:Enum=Adaptive;Fixed;""
type UnitedDeploymentScheduleStrategyType string
const (
// AdaptiveUnitedDeploymentScheduleStrategyType represents that when a pod is stuck in the pending status and cannot
// be scheduled, allow it to be rescheduled to another subset.
AdaptiveUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Adaptive"
// FixedUnitedDeploymentScheduleStrategyType represents that pods are strictly scheduled to the selected subset
// even if scheduling fails.
FixedUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Fixed"
)
const (
DefaultRescheduleCriticalDuration = 30 * time.Second
DefaultUnschedulableStatusLastDuration = 300 * time.Second
)
// AdaptiveUnitedDeploymentStrategy is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType.
type AdaptiveUnitedDeploymentStrategy struct {
// RescheduleCriticalSeconds indicates how long the controller waits before rescheduling a Pod that failed to be scheduled
// in its subset to another subset that has redundant capacity. If a Pod failed to be scheduled and has stayed in an
// unschedulable status for longer than RescheduleCriticalSeconds, the controller will reschedule it to a suitable subset. Default is 30 seconds.
// +optional
RescheduleCriticalSeconds *int32 `json:"rescheduleCriticalSeconds,omitempty"`
// UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state,
// with a default value of 300 seconds.
// +optional
UnschedulableLastSeconds *int32 `json:"unschedulableLastSeconds,omitempty"`
}
// UnitedDeploymentScheduleStrategy defines the schedule performance of UnitedDeployment.
type UnitedDeploymentScheduleStrategy struct {
// Type indicates the type of the UnitedDeploymentScheduleStrategy.
// Default is Fixed
// +optional
Type UnitedDeploymentScheduleStrategyType `json:"type,omitempty"`
// Adaptive is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType.
// +optional
Adaptive *AdaptiveUnitedDeploymentStrategy `json:"adaptive,omitempty"`
}
func (s *UnitedDeploymentScheduleStrategy) IsAdaptive() bool {
return s.Type == AdaptiveUnitedDeploymentScheduleStrategyType
}
func (s *UnitedDeploymentScheduleStrategy) GetRescheduleCriticalDuration() time.Duration {
if s.Adaptive == nil || s.Adaptive.RescheduleCriticalSeconds == nil {
return DefaultRescheduleCriticalDuration
}
return time.Duration(*s.Adaptive.RescheduleCriticalSeconds) * time.Second
}
func (s *UnitedDeploymentScheduleStrategy) GetUnschedulableLastDuration() time.Duration {
if s.Adaptive == nil || s.Adaptive.UnschedulableLastSeconds == nil {
return DefaultUnschedulableStatusLastDuration
}
return time.Duration(*s.Adaptive.UnschedulableLastSeconds) * time.Second
}
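For illustration, a minimal sketch (not part of this commit) of how the two getters above fall back to the package defaults when the Adaptive parameters are left unset:

package main

import (
	"fmt"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)

func main() {
	strategy := appsv1alpha1.UnitedDeploymentScheduleStrategy{
		Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType,
	}
	fmt.Println(strategy.IsAdaptive())                    // true
	fmt.Println(strategy.GetRescheduleCriticalDuration()) // 30s (DefaultRescheduleCriticalDuration)
	fmt.Println(strategy.GetUnschedulableLastDuration())  // 5m0s (DefaultUnschedulableStatusLastDuration)
}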
// UnitedDeploymentStatus defines the observed state of UnitedDeployment.
type UnitedDeploymentStatus struct {
// ObservedGeneration is the most recent generation observed for this UnitedDeployment. It corresponds to the
@ -252,6 +320,8 @@ type UnitedDeploymentStatus struct {
// +optional
SubsetReplicas map[string]int32 `json:"subsetReplicas,omitempty"`
// Records the conditions of each subset.
SubsetStatuses []UnitedDeploymentSubsetStatus `json:"subsetStatuses,omitempty"`
// Represents the latest available observations of a UnitedDeployment's current state.
// +optional
Conditions []UnitedDeploymentCondition `json:"conditions,omitempty"`
@ -264,6 +334,23 @@ type UnitedDeploymentStatus struct {
LabelSelector string `json:"labelSelector,omitempty"`
}
func (s *UnitedDeploymentStatus) GetSubsetStatus(subset string) *UnitedDeploymentSubsetStatus {
for i, subsetStatus := range s.SubsetStatuses {
if subsetStatus.Name == subset {
return &s.SubsetStatuses[i]
}
}
return nil
}
func (u *UnitedDeployment) InitSubsetStatuses() {
for _, subset := range u.Spec.Topology.Subsets {
if u.Status.GetSubsetStatus(subset.Name) == nil {
u.Status.SubsetStatuses = append(u.Status.SubsetStatuses, UnitedDeploymentSubsetStatus{Name: subset.Name})
}
}
}
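A small usage sketch (hypothetical, not part of this commit; "subset-a" and "subset-b" are example names) showing that InitSubsetStatuses seeds one entry per subset defined in the topology, and that GetSubsetStatus returns a pointer into that slice so callers can mutate the entry in place:

package main

import (
	"fmt"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)

func main() {
	ud := &appsv1alpha1.UnitedDeployment{
		Spec: appsv1alpha1.UnitedDeploymentSpec{
			Topology: appsv1alpha1.Topology{
				Subsets: []appsv1alpha1.Subset{{Name: "subset-a"}, {Name: "subset-b"}},
			},
		},
	}
	ud.InitSubsetStatuses()                    // one UnitedDeploymentSubsetStatus per subset
	fmt.Println(len(ud.Status.SubsetStatuses)) // 2
	if st := ud.Status.GetSubsetStatus("subset-a"); st != nil {
		st.Replicas = 3 // visible through ud.Status because st points into the slice
	}
}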
// UnitedDeploymentCondition describes current state of a UnitedDeployment.
type UnitedDeploymentCondition struct {
// Type of in place set condition.
@ -278,7 +365,7 @@ type UnitedDeploymentCondition struct {
// The reason for the condition's last transition.
Reason string `json:"reason,omitempty"`
// A human readable message indicating details about the transition.
// A human-readable message indicating details about the transition.
Message string `json:"message,omitempty"`
}
@ -293,6 +380,62 @@ type UpdateStatus struct {
CurrentPartitions map[string]int32 `json:"currentPartitions,omitempty"`
}
type UnitedDeploymentSubsetStatus struct {
// Subset name specified in Topology.Subsets
Name string `json:"name,omitempty"`
// Records the current replicas. Currently unused.
Replicas int32 `json:"replicas,omitempty"`
// Records the current partition. Currently unused.
Partition int32 `json:"partition,omitempty"`
// Conditions is an array of current observed subset conditions.
Conditions []UnitedDeploymentSubsetCondition `json:"conditions,omitempty"`
}
func (s *UnitedDeploymentSubsetStatus) GetCondition(condType UnitedDeploymentSubsetConditionType) *UnitedDeploymentSubsetCondition {
for _, condition := range s.Conditions {
if condition.Type == condType {
return &condition
}
}
return nil
}
func (s *UnitedDeploymentSubsetStatus) SetCondition(condType UnitedDeploymentSubsetConditionType, status corev1.ConditionStatus, reason, message string) {
var currentCond *UnitedDeploymentSubsetCondition
for i, c := range s.Conditions {
if c.Type == condType {
currentCond = &s.Conditions[i]
break
}
}
if currentCond != nil && currentCond.Status == status && currentCond.Reason == reason {
return
}
if currentCond == nil {
s.Conditions = append(s.Conditions, UnitedDeploymentSubsetCondition{Type: condType})
currentCond = &s.Conditions[len(s.Conditions)-1]
}
currentCond.LastTransitionTime = metav1.Now()
currentCond.Status = status
currentCond.Reason = reason
currentCond.Message = message
}
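A hedged sketch of the transition semantics above: SetCondition returns early when both Status and Reason are unchanged, so neither LastTransitionTime nor Message moves until a real transition happens. The condition type and reasons below are taken from this diff; the values are illustrative only.

package main

import (
	"fmt"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	corev1 "k8s.io/api/core/v1"
)

func main() {
	st := &appsv1alpha1.UnitedDeploymentSubsetStatus{Name: "subset-a"}
	st.SetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable, corev1.ConditionFalse, "reschedule", "pods pending")
	first := st.GetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable).LastTransitionTime

	// Same Status and Reason: this call is a no-op, nothing is overwritten.
	st.SetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable, corev1.ConditionFalse, "reschedule", "still pending")
	fmt.Println(first.Equal(&st.Conditions[0].LastTransitionTime)) // true

	// Status flips: the condition transitions and LastTransitionTime is refreshed.
	st.SetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable, corev1.ConditionTrue, "recover", "subset recovered")
}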
type UnitedDeploymentSubsetConditionType string
const (
// UnitedDeploymentSubsetSchedulable indicates whether the subset is schedulable. When it is False,
// new pods allocated into the subset will keep pending.
UnitedDeploymentSubsetSchedulable UnitedDeploymentSubsetConditionType = "Schedulable"
)
type UnitedDeploymentSubsetCondition struct {
Type UnitedDeploymentSubsetConditionType `json:"type"`
Status corev1.ConditionStatus `json:"status"`
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`
}
// +genclient
// +genclient:method=GetScale,verb=get,subresource=scale,result=k8s.io/api/autoscaling/v1.Scale
// +genclient:method=UpdateScale,verb=update,subresource=scale,input=k8s.io/api/autoscaling/v1.Scale,result=k8s.io/api/autoscaling/v1.Scale

View File

@ -30,6 +30,31 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AdaptiveUnitedDeploymentStrategy) DeepCopyInto(out *AdaptiveUnitedDeploymentStrategy) {
*out = *in
if in.RescheduleCriticalSeconds != nil {
in, out := &in.RescheduleCriticalSeconds, &out.RescheduleCriticalSeconds
*out = new(int32)
**out = **in
}
if in.UnschedulableLastSeconds != nil {
in, out := &in.UnschedulableLastSeconds, &out.UnschedulableLastSeconds
*out = new(int32)
**out = **in
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdaptiveUnitedDeploymentStrategy.
func (in *AdaptiveUnitedDeploymentStrategy) DeepCopy() *AdaptiveUnitedDeploymentStrategy {
if in == nil {
return nil
}
out := new(AdaptiveUnitedDeploymentStrategy)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AdaptiveWorkloadSpreadStrategy) DeepCopyInto(out *AdaptiveWorkloadSpreadStrategy) {
*out = *in
@ -3268,6 +3293,7 @@ func (in *Topology) DeepCopyInto(out *Topology) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
in.ScheduleStrategy.DeepCopyInto(&out.ScheduleStrategy)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Topology.
@ -3380,6 +3406,26 @@ func (in *UnitedDeploymentList) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *UnitedDeploymentScheduleStrategy) DeepCopyInto(out *UnitedDeploymentScheduleStrategy) {
*out = *in
if in.Adaptive != nil {
in, out := &in.Adaptive, &out.Adaptive
*out = new(AdaptiveUnitedDeploymentStrategy)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnitedDeploymentScheduleStrategy.
func (in *UnitedDeploymentScheduleStrategy) DeepCopy() *UnitedDeploymentScheduleStrategy {
if in == nil {
return nil
}
out := new(UnitedDeploymentScheduleStrategy)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *UnitedDeploymentSpec) DeepCopyInto(out *UnitedDeploymentSpec) {
*out = *in
@ -3428,6 +3474,13 @@ func (in *UnitedDeploymentStatus) DeepCopyInto(out *UnitedDeploymentStatus) {
(*out)[key] = val
}
}
if in.SubsetStatuses != nil {
in, out := &in.SubsetStatuses, &out.SubsetStatuses
*out = make([]UnitedDeploymentSubsetStatus, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]UnitedDeploymentCondition, len(*in))
@ -3452,6 +3505,44 @@ func (in *UnitedDeploymentStatus) DeepCopy() *UnitedDeploymentStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *UnitedDeploymentSubsetCondition) DeepCopyInto(out *UnitedDeploymentSubsetCondition) {
*out = *in
in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnitedDeploymentSubsetCondition.
func (in *UnitedDeploymentSubsetCondition) DeepCopy() *UnitedDeploymentSubsetCondition {
if in == nil {
return nil
}
out := new(UnitedDeploymentSubsetCondition)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *UnitedDeploymentSubsetStatus) DeepCopyInto(out *UnitedDeploymentSubsetStatus) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]UnitedDeploymentSubsetCondition, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnitedDeploymentSubsetStatus.
func (in *UnitedDeploymentSubsetStatus) DeepCopy() *UnitedDeploymentSubsetStatus {
if in == nil {
return nil
}
out := new(UnitedDeploymentSubsetStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *UnitedDeploymentUpdateStrategy) DeepCopyInto(out *UnitedDeploymentUpdateStrategy) {
*out = *in

View File

@ -955,6 +955,38 @@ spec:
description: Topology describes the pods distribution detail between
each of subsets.
properties:
scheduleStrategy:
description: ScheduleStrategy indicates the strategy the UnitedDeployment
uses to schedule Pods among its subsets.
properties:
adaptive:
description: Adaptive is used to communicate parameters when
Type is AdaptiveUnitedDeploymentScheduleStrategyType.
properties:
rescheduleCriticalSeconds:
description: |-
RescheduleCriticalSeconds indicates how long the controller waits before rescheduling a Pod that failed to be scheduled
in its subset to another subset that has redundant capacity. If a Pod failed to be scheduled and has stayed in an
unschedulable status for longer than RescheduleCriticalSeconds, the controller will reschedule it to a suitable subset. Default is 30 seconds.
format: int32
type: integer
unschedulableLastSeconds:
description: |-
UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state,
with a default value of 300 seconds.
format: int32
type: integer
type: object
type:
description: |-
Type indicates the type of the UnitedDeploymentScheduleStrategy.
Default is Fixed
enum:
- Adaptive
- Fixed
- ""
type: string
type: object
subsets:
description: |-
Contains the details of each subset. Each element in this array represents one subset
@ -1173,7 +1205,7 @@ spec:
format: date-time
type: string
message:
description: A human readable message indicating details about
description: A human-readable message indicating details about
the transition.
type: string
reason:
@ -1216,6 +1248,44 @@ spec:
description: Records the topology detail information of the replicas
of each subset.
type: object
subsetStatuses:
description: Records the conditions of each subset.
items:
properties:
conditions:
description: Conditions is an array of current observed subset
conditions.
items:
properties:
lastTransitionTime:
format: date-time
type: string
message:
type: string
reason:
type: string
status:
type: string
type:
type: string
required:
- status
- type
type: object
type: array
name:
description: Subset name specified in Topology.Subsets
type: string
partition:
description: Records the current partition. Currently unused.
format: int32
type: integer
replicas:
description: Records the current replicas. Currently unused.
format: int32
type: integer
type: object
type: array
updateStatus:
description: Records the information of update progress.
properties:

View File

@ -17,6 +17,7 @@ limitations under the License.
package adapter
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -25,14 +26,22 @@ import (
)
type Adapter interface {
// NewResourceObject creates a empty subset object.
// NewResourceObject creates an empty subset object.
NewResourceObject() client.Object
// NewResourceListObject creates a empty subset list object.
// NewResourceListObject creates an empty subset list object.
NewResourceListObject() client.ObjectList
// GetStatusObservedGeneration returns the observed generation of the subset.
GetStatusObservedGeneration(subset metav1.Object) int64
// GetReplicaDetails returns the replicas information of the subset status.
GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error)
// GetSubsetPods returns all pods of the subset workload.
GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error)
// GetSpecReplicas returns the replicas information of the subset workload.
GetSpecReplicas(obj metav1.Object) *int32
// GetSpecPartition returns the partition information of the subset workload if possible.
GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32
// GetStatusReplicas returns the replicas from the subset workload status.
GetStatusReplicas(obj metav1.Object) int32
// GetStatusReadyReplicas returns the ready replicas information from the subset workload status.
GetStatusReadyReplicas(obj metav1.Object) int32
// GetSubsetFailure returns failure information of the subset.
GetSubsetFailure() *string
// ApplySubsetTemplate updates the subset to the latest revision.

View File

@ -19,11 +19,12 @@ package adapter
import (
"fmt"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
)
func getSubsetPrefix(controllerName, subsetName string) string {
@ -97,3 +98,19 @@ func getCurrentPartition(pods []*corev1.Pod, revision string) *int32 {
return &partition
}
func CalculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (updatedReplicas, updatedReadyReplicas int32) {
for _, pod := range podList {
revision := getRevision(&pod.ObjectMeta)
// Only count pods that are updated and are not terminating
if revision == updatedRevision && controller.IsPodActive(pod) {
updatedReplicas++
if podutil.IsPodReady(pod) {
updatedReadyReplicas++
}
}
}
return
}

View File

@ -85,3 +85,50 @@ func buildPodList(ordinals []int, revisions []string, t *testing.T) []*corev1.Po
return pods
}
// mockPodList creates a mocked Pod list
func mockPodList(updatedRevision string, replicas int32, readyReplicas int32) []*corev1.Pod {
pods := make([]*corev1.Pod, replicas)
for i := int32(0); i < replicas; i++ {
var status corev1.ConditionStatus
if i < readyReplicas {
status = corev1.ConditionTrue
} else {
status = corev1.ConditionFalse
}
pods[i] = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
appsv1alpha1.ControllerRevisionHashLabelKey: updatedRevision,
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: status,
},
},
},
}
}
return pods
}
func TestCalculateUpdatedReplicas(t *testing.T) {
updatedRevision := "updated-revision"
replicas := int32(5)
readyReplicas := int32(3)
podList := mockPodList(updatedRevision, replicas, readyReplicas)
podList = append(podList, mockPodList("old-revision", 2, 3)...)
updated, updatedReady := CalculateUpdatedReplicas(podList, updatedRevision)
if updated != replicas {
t.Errorf("Expected %d updated replicas, got %d", replicas, updated)
}
if updatedReady != readyReplicas {
t.Errorf("Expected %d updated ready replicas, got %d", readyReplicas, updatedReady)
}
}

View File

@ -62,29 +62,32 @@ func (a *AdvancedStatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Obje
return obj.(*v1beta1.StatefulSet).Status.ObservedGeneration
}
// GetReplicaDetails returns the replicas detail the subset needs.
func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
set := obj.(*v1beta1.StatefulSet)
var pods []*corev1.Pod
pods, err = a.getStatefulSetPods(set)
if err != nil {
return
}
func (a *AdvancedStatefulSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
return a.getStatefulSetPods(obj.(*v1beta1.StatefulSet))
}
specReplicas = set.Spec.Replicas
func (a *AdvancedStatefulSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
return obj.(*v1beta1.StatefulSet).Spec.Replicas
}
func (a *AdvancedStatefulSetAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 {
set := obj.(*v1beta1.StatefulSet)
if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
revision := getRevision(&set.ObjectMeta)
specPartition = getCurrentPartition(pods, revision)
return getCurrentPartition(pods, revision)
} else if set.Spec.UpdateStrategy.RollingUpdate != nil &&
set.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
specPartition = set.Spec.UpdateStrategy.RollingUpdate.Partition
return set.Spec.UpdateStrategy.RollingUpdate.Partition
}
return nil
}
statusReplicas = set.Status.Replicas
statusReadyReplicas = set.Status.ReadyReplicas
statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
func (a *AdvancedStatefulSetAdapter) GetStatusReplicas(obj metav1.Object) int32 {
return obj.(*v1beta1.StatefulSet).Status.Replicas
}
return
func (a *AdvancedStatefulSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
return obj.(*v1beta1.StatefulSet).Status.ReadyReplicas
}
// GetSubsetFailure returns the failure information of the subset.

View File

@ -41,30 +41,29 @@ func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 {
return obj.(*alpha1.CloneSet).Status.ObservedGeneration
}
func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
func (a *CloneSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
return a.getCloneSetPods(obj.(*alpha1.CloneSet))
}
func (a *CloneSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
return obj.(*alpha1.CloneSet).Spec.Replicas
}
func (a *CloneSetAdapter) GetSpecPartition(obj metav1.Object, _ []*corev1.Pod) *int32 {
set := obj.(*alpha1.CloneSet)
var pods []*corev1.Pod
pods, err = a.getCloneSetPods(set)
if err != nil {
return
}
specReplicas = set.Spec.Replicas
if set.Spec.UpdateStrategy.Partition != nil {
partition, _ := intstr.GetValueFromIntOrPercent(set.Spec.UpdateStrategy.Partition, int(*set.Spec.Replicas), true)
specPartition = utilpointer.Int32Ptr(int32(partition))
partition, _ := intstr.GetScaledValueFromIntOrPercent(set.Spec.UpdateStrategy.Partition, int(*set.Spec.Replicas), true)
return utilpointer.Int32Ptr(int32(partition))
}
return nil
}
statusReplicas = set.Status.Replicas
statusReadyReplicas = set.Status.ReadyReplicas
statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
func (a *CloneSetAdapter) GetStatusReplicas(obj metav1.Object) int32 {
return obj.(*alpha1.CloneSet).Status.Replicas
}
return
func (a *CloneSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
return obj.(*alpha1.CloneSet).Status.ReadyReplicas
}
func (a *CloneSetAdapter) GetSubsetFailure() *string {

View File

@ -57,25 +57,28 @@ func (a *DeploymentAdapter) GetStatusObservedGeneration(obj metav1.Object) int64
return obj.(*appsv1.Deployment).Status.ObservedGeneration
}
// GetReplicaDetails returns the replicas detail the subset needs.
func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
// Convert to Deployment Object
func (a *DeploymentAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
set := obj.(*appsv1.Deployment)
return a.getDeploymentPods(set)
}
// Get all pods belonging to deployment
var pods []*corev1.Pod
pods, err = a.getDeploymentPods(set)
if err != nil {
return
}
func (a *DeploymentAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
set := obj.(*appsv1.Deployment)
return set.Spec.Replicas
}
// Set according replica counts
specReplicas = set.Spec.Replicas
statusReplicas = set.Status.Replicas
statusReadyReplicas = set.Status.ReadyReplicas
statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
func (a *DeploymentAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 {
return nil
}
return
func (a *DeploymentAdapter) GetStatusReplicas(obj metav1.Object) int32 {
set := obj.(*appsv1.Deployment)
return set.Status.Replicas
}
func (a *DeploymentAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
set := obj.(*appsv1.Deployment)
return set.Status.ReadyReplicas
}
// GetSubsetFailure returns the failure information of the subset.

View File

@ -58,29 +58,32 @@ func (a *StatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int6
return obj.(*appsv1.StatefulSet).Status.ObservedGeneration
}
// GetReplicaDetails returns the replicas detail the subset needs.
func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
set := obj.(*appsv1.StatefulSet)
var pods []*corev1.Pod
pods, err = a.getStatefulSetPods(set)
if err != nil {
return
}
func (a *StatefulSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
return a.getStatefulSetPods(obj.(*appsv1.StatefulSet))
}
specReplicas = set.Spec.Replicas
func (a *StatefulSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
return obj.(*appsv1.StatefulSet).Spec.Replicas
}
func (a *StatefulSetAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 {
set := obj.(*appsv1.StatefulSet)
if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
revision := getRevision(&set.ObjectMeta)
specPartition = getCurrentPartition(pods, revision)
return getCurrentPartition(pods, revision)
} else if set.Spec.UpdateStrategy.RollingUpdate != nil &&
set.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
specPartition = set.Spec.UpdateStrategy.RollingUpdate.Partition
return set.Spec.UpdateStrategy.RollingUpdate.Partition
}
return nil
}
statusReplicas = set.Status.Replicas
statusReadyReplicas = set.Status.ReadyReplicas
statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
func (a *StatefulSetAdapter) GetStatusReplicas(obj metav1.Object) int32 {
return obj.(*appsv1.StatefulSet).Status.Replicas
}
return
func (a *StatefulSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
return obj.(*appsv1.StatefulSet).Status.ReadyReplicas
}
// GetSubsetFailure returns the failure information of the subset.
@ -232,22 +235,6 @@ func (a *StatefulSetAdapter) getStatefulSetPods(set *appsv1.StatefulSet) ([]*cor
return claimedPods, nil
}
func calculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (updatedReplicas, updatedReadyReplicas int32) {
for _, pod := range podList {
revision := getRevision(&pod.ObjectMeta)
// Only count pods that are updated and are not terminating
if revision == updatedRevision && pod.GetDeletionTimestamp() == nil {
updatedReplicas++
if podutil.IsPodReady(pod) {
updatedReadyReplicas++
}
}
}
return
}
// deleteStuckPods tries to work around the blocking issue https://github.com/kubernetes/kubernetes/issues/67250
func (a *StatefulSetAdapter) deleteStuckPods(set *appsv1.StatefulSet, revision string, partition int32) error {
pods, err := a.getStatefulSetPods(set)

View File

@ -18,6 +18,7 @@ package uniteddeployment
import (
"fmt"
"math"
"sort"
"strings"
@ -68,6 +69,30 @@ func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator {
return &specificAllocator{UnitedDeployment: ud}
}
// getSubsetRunningReplicas returns the running replicas of each subset, which refers to the number of Pods that an
// unschedulable subset can safely accommodate. Exceeding this number may lead to scheduling failures within that subset.
// This value is only effective in the Adaptive scheduling strategy.
func getSubsetRunningReplicas(nameToSubset *map[string]*Subset) map[string]int32 {
if nameToSubset == nil {
return nil
}
var result = make(map[string]int32)
for name, subset := range *nameToSubset {
result[name] = subset.Status.Replicas - subset.Status.UnschedulableStatus.PendingPods
}
return result
}
func isSubSetUnschedulable(name string, nameToSubset *map[string]*Subset) (unschedulable bool) {
if subsetObj, ok := (*nameToSubset)[name]; ok {
unschedulable = subsetObj.Status.UnschedulableStatus.Unschedulable
} else {
// newly created subsets are all schedulable
unschedulable = false
}
return
}
type specificAllocator struct {
*appsv1alpha1.UnitedDeployment
subsets *subsetInfos
@ -250,43 +275,58 @@ type elasticAllocator struct {
// maxReplicas: nil # will be satisfied with 4th priority
//
// the results of map will be: {"subset-a": 3, "subset-b": 2}
func (ac *elasticAllocator) Alloc(_ *map[string]*Subset) (*map[string]int32, error) {
func (ac *elasticAllocator) Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error) {
replicas := int32(1)
if ac.Spec.Replicas != nil {
replicas = *ac.Spec.Replicas
}
minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas)
minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas, nameToSubset)
if err != nil {
return nil, err
}
return ac.alloc(replicas, minReplicasMap, maxReplicasMap), nil
}
func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32) (map[string]int32, map[string]int32, error) {
totalMin, totalMax := int64(0), int64(0)
func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameToSubset *map[string]*Subset) (map[string]int32, map[string]int32, error) {
numSubset := len(ac.Spec.Topology.Subsets)
minReplicasMap := make(map[string]int32, numSubset)
maxReplicasMap := make(map[string]int32, numSubset)
runningReplicasMap := getSubsetRunningReplicas(nameToSubset)
for index, subset := range ac.Spec.Topology.Subsets {
minReplicas := int32(0)
maxReplicas := int32(math.MaxInt32)
if subset.MinReplicas != nil {
minReplicas, _ = ParseSubsetReplicas(replicas, *subset.MinReplicas)
}
totalMin += int64(minReplicas)
minReplicasMap[subset.Name] = minReplicas
maxReplicas := int32(1000000)
if subset.MaxReplicas != nil {
maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas)
}
totalMax += int64(maxReplicas)
if ac.Spec.Topology.ScheduleStrategy.IsAdaptive() {
unschedulable := isSubSetUnschedulable(subset.Name, nameToSubset)
// This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up.
if runningReplicas, ok := runningReplicasMap[subset.Name]; unschedulable && ok {
klog.InfoS("Assign min(runningReplicas, minReplicas/maxReplicas) for unschedulable subset",
"subset", subset.Name)
minReplicas = integer.Int32Min(runningReplicas, minReplicas)
maxReplicas = integer.Int32Min(runningReplicas, maxReplicas)
}
// To prevent healthy pods from being deleted
if runningReplicas := runningReplicasMap[subset.Name]; !unschedulable && runningReplicas > minReplicas {
klog.InfoS("Assign min(runningReplicas, maxReplicas) to minReplicas to avoid deleting running pods",
"subset", subset.Name, "minReplicas", minReplicas, "runningReplicas", runningReplicas, "maxReplicas", maxReplicas)
minReplicas = integer.Int32Min(runningReplicas, maxReplicas)
}
}
minReplicasMap[subset.Name] = minReplicas
maxReplicasMap[subset.Name] = maxReplicas
if minReplicas > maxReplicas {
return nil, nil, fmt.Errorf("subset[%d].maxReplicas must be more than or equal to minReplicas", index)
}
}
klog.InfoS("elastic allocate maps calculated", "minReplicasMap", minReplicasMap, "maxReplicasMap", maxReplicasMap)
return minReplicasMap, maxReplicasMap, nil
}
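As a concrete illustration of the adaptive adjustment above (the numbers mirror TestAdaptiveElasticAllocator below): a subset with minReplicas=2 and maxReplicas=4 that currently has 4 replicas, 3 of them pending, is marked unschedulable and reports runningReplicas = 4 - 3 = 1; both bounds are then clamped to min(1, 2) = 1 and min(1, 4) = 1, so the subset keeps at most its one running Pod and the remaining desired replicas are allocated to the other subsets.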

View File

@ -39,7 +39,7 @@ func TestScaleReplicas(t *testing.T) {
createSubset("t4", 2),
}
allocator := sortToAllocator(infos)
allocator.AllocateReplicas(5, &map[string]int32{})
_, _ = allocator.AllocateReplicas(5, &map[string]int32{})
if " t1 -> 1; t3 -> 1; t4 -> 1; t2 -> 2;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
@ -48,7 +48,7 @@ func TestScaleReplicas(t *testing.T) {
createSubset("t1", 2),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(0, &map[string]int32{})
_, _ = allocator.AllocateReplicas(0, &map[string]int32{})
if " t1 -> 0;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
@ -60,7 +60,7 @@ func TestScaleReplicas(t *testing.T) {
createSubset("t4", 2),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(17, &map[string]int32{})
_, _ = allocator.AllocateReplicas(17, &map[string]int32{})
if " t1 -> 4; t3 -> 4; t4 -> 4; t2 -> 5;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
@ -72,7 +72,7 @@ func TestScaleReplicas(t *testing.T) {
createSubset("t4", 2),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(9, &map[string]int32{})
_, _ = allocator.AllocateReplicas(9, &map[string]int32{})
if " t1 -> 2; t3 -> 2; t4 -> 2; t2 -> 3;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
@ -82,7 +82,7 @@ func TestScaleReplicas(t *testing.T) {
createSubset("t2", 10),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(19, &map[string]int32{})
_, _ = allocator.AllocateReplicas(19, &map[string]int32{})
if " t1 -> 9; t2 -> 10;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
@ -92,7 +92,7 @@ func TestScaleReplicas(t *testing.T) {
createSubset("t2", 10),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(21, &map[string]int32{})
_, _ = allocator.AllocateReplicas(21, &map[string]int32{})
if " t1 -> 10; t2 -> 11;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
@ -106,7 +106,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
createSubset("t4", 2),
}
allocator := sortToAllocator(infos)
allocator.AllocateReplicas(27, &map[string]int32{
_, _ = allocator.AllocateReplicas(27, &map[string]int32{
"t1": 4,
"t3": 4,
})
@ -121,7 +121,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
createSubset("t4", 2),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(8, &map[string]int32{
_, _ = allocator.AllocateReplicas(8, &map[string]int32{
"t1": 4,
"t3": 4,
})
@ -136,7 +136,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
createSubset("t4", 2),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(16, &map[string]int32{
_, _ = allocator.AllocateReplicas(16, &map[string]int32{
"t1": 4,
"t2": 4,
"t3": 4,
@ -153,7 +153,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
createSubset("t4", 2),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(10, &map[string]int32{
_, _ = allocator.AllocateReplicas(10, &map[string]int32{
"t1": 1,
"t2": 2,
"t3": 3,
@ -169,7 +169,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
createSubset("t4", 2),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(10, &map[string]int32{
_, _ = allocator.AllocateReplicas(10, &map[string]int32{
"t1": 1,
"t2": 2,
"t3": 3,
@ -186,7 +186,7 @@ func TestSpecifyValidReplicas(t *testing.T) {
createSubset("t4", 2),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(-1, &map[string]int32{
_, _ = allocator.AllocateReplicas(-1, &map[string]int32{
"t1": 1,
"t2": 2,
"t3": 3,
@ -203,7 +203,7 @@ func TestSpecifyInvalidReplicas(t *testing.T) {
createSubset("t2", 4),
}
allocator := sortToAllocator(infos)
allocator.AllocateReplicas(14, &map[string]int32{
_, _ = allocator.AllocateReplicas(14, &map[string]int32{
"t1": 6,
"t2": 6,
})
@ -216,7 +216,7 @@ func TestSpecifyInvalidReplicas(t *testing.T) {
createSubset("t2", 4),
}
allocator = sortToAllocator(infos)
allocator.AllocateReplicas(14, &map[string]int32{
_, _ = allocator.AllocateReplicas(14, &map[string]int32{
"t1": 10,
"t2": 11,
})
@ -280,16 +280,16 @@ func TestCapacityAllocator(t *testing.T) {
ud.Spec.Replicas = pointer.Int32(cs.replicas)
ud.Spec.Topology.Subsets = []appsv1alpha1.Subset{}
for index := range cs.minReplicas {
min := intstr.FromInt(int(cs.minReplicas[index]))
var max *intstr.IntOrString
minReplicas := intstr.FromInt32(cs.minReplicas[index])
var maxReplicas *intstr.IntOrString
if cs.maxReplicas[index] != -1 {
m := intstr.FromInt(int(cs.maxReplicas[index]))
max = &m
m := intstr.FromInt32(cs.maxReplicas[index])
maxReplicas = &m
}
ud.Spec.Topology.Subsets = append(ud.Spec.Topology.Subsets, appsv1alpha1.Subset{
Name: fmt.Sprintf("subset-%d", index),
MinReplicas: &min,
MaxReplicas: max,
MinReplicas: &minReplicas,
MaxReplicas: maxReplicas,
})
}
@ -307,6 +307,209 @@ func TestCapacityAllocator(t *testing.T) {
}
}
func TestAdaptiveElasticAllocator(t *testing.T) {
getUnitedDeploymentAndSubsets := func(totalReplicas, minReplicas, maxReplicas, failedPods int32) (
*appsv1alpha1.UnitedDeployment, map[string]*Subset) {
minR, maxR := intstr.FromInt32(minReplicas), intstr.FromInt32(maxReplicas)
return &appsv1alpha1.UnitedDeployment{
Spec: appsv1alpha1.UnitedDeploymentSpec{
Replicas: &totalReplicas,
Topology: appsv1alpha1.Topology{
Subsets: []appsv1alpha1.Subset{
{
Name: "subset-1",
MinReplicas: &minR,
MaxReplicas: &maxR,
},
{
Name: "subset-2",
},
},
ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{
Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType,
},
},
},
}, map[string]*Subset{
"subset-1": {
Status: SubsetStatus{
UnschedulableStatus: SubsetUnschedulableStatus{
Unschedulable: true,
PendingPods: failedPods,
},
Replicas: maxReplicas,
},
Spec: SubsetSpec{Replicas: minReplicas},
},
//"subset-2": {
// Status: SubsetStatus{},
//},
}
}
cases := []struct {
name string
totalReplicas, minReplicas, maxReplicas, pendingPods int32
subset1Replicas, subset2Replicas int32
}{
{
name: "5 pending pods > maxReplicas -> 0, 10",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 5,
subset1Replicas: 0, subset2Replicas: 10,
},
{
name: "4 pending pods = maxReplicas -> 0, 10",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 4,
subset1Replicas: 0, subset2Replicas: 10,
},
{
name: "3 pending pods < maxReplicas -> 1, 9",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 3,
subset1Replicas: 1, subset2Replicas: 9,
},
{
name: "2 pending pods = minReplicas -> 2, 8",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 2,
subset1Replicas: 2, subset2Replicas: 8,
},
{
name: "1 pending pods < minReplicas -> 3, 7",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 1,
subset1Replicas: 3, subset2Replicas: 7,
},
{
name: "no pending pods -> 2, 8",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 0,
subset1Replicas: 4, subset2Replicas: 6,
},
}
for _, testCase := range cases {
t.Run(testCase.name, func(t *testing.T) {
ud, subsets := getUnitedDeploymentAndSubsets(
testCase.totalReplicas, testCase.minReplicas, testCase.maxReplicas, testCase.pendingPods)
alloc, err := NewReplicaAllocator(ud).Alloc(&subsets)
if err != nil {
t.Fatalf("unexpected alloc error %v", err)
} else {
subset1Replicas, subset2Replicas := (*alloc)["subset-1"], (*alloc)["subset-2"]
if subset1Replicas != testCase.subset1Replicas || subset2Replicas != testCase.subset2Replicas {
t.Fatalf("subset1Replicas = %d, subset1Replicas = %d, test case is %+v",
subset1Replicas, subset2Replicas, testCase)
}
}
})
}
}
func TestProtectingRunningPodsAdaptive(t *testing.T) {
getUnitedDeploymentAndSubsets := func(subset1MinReplicas, subset1MaxReplicas, subset1RunningReplicas, subset2RunningReplicas int32) (
*appsv1alpha1.UnitedDeployment, map[string]*Subset) {
minR, maxR := intstr.FromInt32(subset1MinReplicas), intstr.FromInt32(subset1MaxReplicas)
totalReplicas := subset1RunningReplicas + subset2RunningReplicas
return &appsv1alpha1.UnitedDeployment{
Spec: appsv1alpha1.UnitedDeploymentSpec{
Replicas: &totalReplicas,
Topology: appsv1alpha1.Topology{
Subsets: []appsv1alpha1.Subset{
{
Name: "subset-1",
MinReplicas: &minR,
MaxReplicas: &maxR,
},
{
Name: "subset-2",
},
},
ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{
Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType,
},
},
},
}, map[string]*Subset{
"subset-1": {
Status: SubsetStatus{
Replicas: subset1RunningReplicas,
},
},
"subset-2": {
Status: SubsetStatus{
Replicas: subset2RunningReplicas,
},
},
}
}
cases := []struct {
name string
subset1MinReplicas, subset1MaxReplicas, subset1RunningReplicas, subset2RunningReplicas int32
subset1Replicas, subset2Replicas int32
}{
{
name: "subset1: [2,4], 1,1 -> 2,0",
subset1MinReplicas: 2,
subset1MaxReplicas: 4,
subset1RunningReplicas: 1,
subset2RunningReplicas: 1,
subset1Replicas: 2,
subset2Replicas: 0,
},
{
name: "subset1: [2,4], 2,1 -> 2,1",
subset1MinReplicas: 2,
subset1MaxReplicas: 4,
subset1RunningReplicas: 2,
subset2RunningReplicas: 1,
subset1Replicas: 2,
subset2Replicas: 1,
},
{
name: "subset1: [2,4], 1,2 -> 2,1",
subset1MinReplicas: 2,
subset1MaxReplicas: 4,
subset1RunningReplicas: 1,
subset2RunningReplicas: 2,
subset1Replicas: 2,
subset2Replicas: 1,
},
{
name: "subset1: [2,4], 0,4 -> 2,2",
subset1MinReplicas: 2,
subset1MaxReplicas: 4,
subset1RunningReplicas: 0,
subset2RunningReplicas: 4,
subset1Replicas: 2,
subset2Replicas: 2,
},
{
name: "subset1: [2,4], 3,1 -> 3,1",
subset1MinReplicas: 2,
subset1MaxReplicas: 4,
subset1RunningReplicas: 3,
subset2RunningReplicas: 1,
subset1Replicas: 3,
subset2Replicas: 1,
},
}
for _, c := range cases {
ud, nameToSubset := getUnitedDeploymentAndSubsets(c.subset1MinReplicas, c.subset1MaxReplicas, c.subset1RunningReplicas, c.subset2RunningReplicas)
alloc, err := NewReplicaAllocator(ud).Alloc(&nameToSubset)
if err != nil {
t.Fatalf("unexpected alloc error %v", err)
} else {
subset1Replicas, subset2Replicas := (*alloc)["subset-1"], (*alloc)["subset-2"]
if subset1Replicas != c.subset1Replicas || subset2Replicas != c.subset2Replicas {
t.Logf("subset1Replicas got %d, expect %d, subset1Replicas got %d, expect %d", subset1Replicas, c.subset1Replicas, subset2Replicas, c.subset2Replicas)
t.Fail()
}
}
}
// invalid inputs
ud, nameToSubset := getUnitedDeploymentAndSubsets(4, 2, 0, 0)
_, err := NewReplicaAllocator(ud).Alloc(&nameToSubset)
if err == nil {
t.Logf("expected error not happen")
t.Fail()
}
}
func createSubset(name string, replicas int32) *nameToReplicas {
return &nameToReplicas{
Replicas: replicas,

View File

@ -17,9 +17,9 @@ limitations under the License.
package uniteddeployment
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Subset stores the details of a subset resource owned by one UnitedDeployment.
@ -36,6 +36,7 @@ type SubsetSpec struct {
Replicas int32
UpdateStrategy SubsetUpdateStrategy
SubsetRef ResourceRef
SubsetPods []*corev1.Pod
}
// SubsetStatus stores the observed state of the Subset.
@ -45,6 +46,12 @@ type SubsetStatus struct {
ReadyReplicas int32
UpdatedReplicas int32
UpdatedReadyReplicas int32
UnschedulableStatus SubsetUnschedulableStatus
}
type SubsetUnschedulableStatus struct {
Unschedulable bool
PendingPods int32
}
// SubsetUpdateStrategy stores the strategy detail of the Subset.
@ -72,7 +79,7 @@ type ControlInterface interface {
CreateSubset(ud *appsv1alpha1.UnitedDeployment, unit string, revision string, replicas, partition int32) error
// UpdateSubset updates the target subset with the input information.
UpdateSubset(subSet *Subset, ud *appsv1alpha1.UnitedDeployment, revision string, replicas, partition int32) error
// UpdateSubset is used to delete the input subset.
// DeleteSubset is used to delete the input subset.
DeleteSubset(*Subset) error
// GetSubsetFailure extracts the subset failure message to expose on UnitedDeployment status.
GetSubsetFailure(*Subset) *string

View File

@ -39,7 +39,7 @@ type SubsetControl struct {
adapter adapter.Adapter
}
// GetAllSubsets returns all of subsets owned by the UnitedDeployment.
// GetAllSubsets returns all subsets owned by the UnitedDeployment.
func (m *SubsetControl) GetAllSubsets(ud *alpha1.UnitedDeployment, updatedRevision string) (subSets []*Subset, err error) {
selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector)
if err != nil {
@ -132,11 +132,6 @@ func (m *SubsetControl) IsExpected(subSet *Subset, revision string) bool {
}
func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision string) (*Subset, error) {
subSetName, err := getSubsetNameFrom(set)
if err != nil {
return nil, err
}
subset := &Subset{}
subset.ObjectMeta = metav1.ObjectMeta{
Name: set.GetName(),
@ -154,28 +149,32 @@ func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision strin
OwnerReferences: set.GetOwnerReferences(),
Finalizers: set.GetFinalizers(),
}
pods, err := m.adapter.GetSubsetPods(set)
if err != nil {
return nil, err
}
subset.Spec.SubsetPods = pods
subSetName, err := getSubsetNameFrom(set)
if err != nil {
return nil, err
}
subset.Spec.SubsetName = subSetName
specReplicas, specPartition, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, err := m.adapter.GetReplicaDetails(set, updatedRevision)
if err != nil {
return subset, err
}
if specReplicas != nil {
if specReplicas := m.adapter.GetSpecReplicas(set); specReplicas != nil {
subset.Spec.Replicas = *specReplicas
}
if specPartition != nil {
if specPartition := m.adapter.GetSpecPartition(set, pods); specPartition != nil {
subset.Spec.UpdateStrategy.Partition = *specPartition
}
subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)
subset.Status.ObservedGeneration = m.adapter.GetStatusObservedGeneration(set)
subset.Status.Replicas = statusReplicas
subset.Status.ReadyReplicas = statusReadyReplicas
subset.Status.UpdatedReplicas = statusUpdatedReplicas
subset.Status.UpdatedReadyReplicas = statusUpdatedReadyReplicas
subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)
subset.Status.Replicas = m.adapter.GetStatusReplicas(set)
subset.Status.ReadyReplicas = m.adapter.GetStatusReadyReplicas(set)
subset.Status.UpdatedReplicas, subset.Status.UpdatedReadyReplicas = adapter.CalculateUpdatedReplicas(pods, updatedRevision)
return subset, nil
}

View File

@ -0,0 +1,228 @@
package uniteddeployment
import (
"reflect"
"testing"
"github.com/alibaba/pouch/pkg/randomid"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/apis/apps/v1beta1"
"github.com/openkruise/kruise/pkg/controller/uniteddeployment/adapter"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
func TestSubsetControl_convertToSubset(t *testing.T) {
v1, v2 := "v1", "v2"
selectorLabels := map[string]string{
"foo": "bar",
}
subsetLabels := map[string]string{
appsv1alpha1.ControllerRevisionHashLabelKey: v2,
appsv1alpha1.SubSetNameLabelKey: "subset-1",
}
getPod := func(revision string) *corev1.Pod {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: randomid.Generate(),
Labels: selectorLabels,
},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
}
pod = pod.DeepCopy()
pod.Labels[appsv1alpha1.ControllerRevisionHashLabelKey] = revision
return pod
}
scheme := runtime.NewScheme()
_ = appsv1alpha1.AddToScheme(scheme)
_ = v1beta1.AddToScheme(scheme)
_ = appsv1.AddToScheme(scheme)
selector := &metav1.LabelSelector{
MatchLabels: selectorLabels,
}
asts := &v1beta1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "asts",
Labels: subsetLabels,
},
Spec: v1beta1.StatefulSetSpec{
Selector: selector,
Replicas: ptr.To(int32(2)),
UpdateStrategy: v1beta1.StatefulSetUpdateStrategy{
Type: appsv1.OnDeleteStatefulSetStrategyType,
},
},
Status: v1beta1.StatefulSetStatus{
ObservedGeneration: 1,
Replicas: 2,
ReadyReplicas: 1,
},
}
oneIntStr := intstr.FromInt32(int32(1))
cloneset := &appsv1alpha1.CloneSet{
ObjectMeta: metav1.ObjectMeta{
Name: "cloneset",
Labels: subsetLabels,
},
Spec: appsv1alpha1.CloneSetSpec{
Selector: selector,
Replicas: ptr.To(int32(2)),
UpdateStrategy: appsv1alpha1.CloneSetUpdateStrategy{
Partition: &oneIntStr,
},
},
Status: appsv1alpha1.CloneSetStatus{
ObservedGeneration: 1,
Replicas: 2,
ReadyReplicas: 1,
},
}
rs := &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "rs",
Labels: selectorLabels,
},
Spec: appsv1.ReplicaSetSpec{
Replicas: ptr.To(int32(2)),
Selector: selector,
},
}
deploy := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "deploy",
Labels: subsetLabels,
},
Spec: appsv1.DeploymentSpec{
Selector: selector,
Replicas: ptr.To(int32(2)),
},
Status: appsv1.DeploymentStatus{
ObservedGeneration: 1,
Replicas: 2,
ReadyReplicas: 1,
},
}
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "sts",
Labels: subsetLabels,
},
Spec: appsv1.StatefulSetSpec{
Selector: selector,
Replicas: ptr.To(int32(2)),
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.OnDeleteStatefulSetStrategyType,
},
},
Status: appsv1.StatefulSetStatus{
ObservedGeneration: 1,
Replicas: 2,
ReadyReplicas: 1,
},
}
pod1, pod2 := getPod(v1), getPod(v2)
fakeClient := fake.NewFakeClient(pod1, pod2, asts, cloneset, deploy, sts, rs)
want := func(partition int32) *Subset {
return &Subset{
Spec: SubsetSpec{
SubsetName: "subset-1",
Replicas: 2,
UpdateStrategy: SubsetUpdateStrategy{
Partition: partition,
},
},
Status: SubsetStatus{
ObservedGeneration: 1,
Replicas: 2,
ReadyReplicas: 1,
UpdatedReplicas: 1,
UpdatedReadyReplicas: 1,
},
}
}
type fields struct {
adapter adapter.Adapter
}
type args struct {
set metav1.Object
updatedRevision string
}
tests := []struct {
name string
fields fields
args args
want *Subset
wantErr bool
}{
{
name: "asts",
fields: fields{adapter: &adapter.AdvancedStatefulSetAdapter{
Scheme: scheme,
Client: fakeClient,
}},
args: args{asts, v2},
want: want(1),
}, {
name: "cloneset",
fields: fields{adapter: &adapter.CloneSetAdapter{
Client: fakeClient,
Scheme: scheme,
}},
args: args{cloneset, v2},
want: want(1),
}, {
name: "deploy",
fields: fields{adapter: &adapter.DeploymentAdapter{
Client: fakeClient,
Scheme: scheme,
}},
args: args{deploy, v2},
want: want(0),
}, {
name: "sts",
fields: fields{adapter: &adapter.StatefulSetAdapter{
Client: fakeClient,
Scheme: scheme,
}},
args: args{sts, v2},
want: want(1),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &SubsetControl{
adapter: tt.fields.adapter,
}
got, err := m.convertToSubset(tt.args.set, tt.args.updatedRevision)
if (err != nil) != tt.wantErr {
t.Errorf("convertToSubset() error = %v, wantErr %v", err, tt.wantErr)
return
}
got.ObjectMeta = metav1.ObjectMeta{}
if res := got.Spec.SubsetRef.Resources; len(res) != 1 || res[0].GetName() != tt.args.set.GetName() {
t.Errorf("convertToSubset() subsetRef.Resources = %+v", res)
}
got.Spec.SubsetRef.Resources = nil
if len(got.Spec.SubsetPods) != 2 {
t.Errorf("convertToSubset() SubsetPods got = %+v, want %+v", got, tt.want)
}
got.Spec.SubsetPods = nil
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("convertToSubset() got = %+v, want %+v", got, tt.want)
}
})
}
}

View File

@ -20,7 +20,9 @@ import (
"context"
"flag"
"fmt"
"math"
"reflect"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@ -38,10 +40,12 @@ import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
"github.com/openkruise/kruise/pkg/controller/uniteddeployment/adapter"
utilcontroller "github.com/openkruise/kruise/pkg/controller/util"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
"github.com/openkruise/kruise/pkg/util/requeueduration"
)
func init() {
@ -51,16 +55,17 @@ func init() {
var (
concurrentReconciles = 3
controllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("UnitedDeployment")
durationStore = requeueduration.DurationStore{}
)
const (
controllerName = "uniteddeployment-controller"
eventTypeRevisionProvision = "RevisionProvision"
eventTypeFindSubsets = "FindSubsets"
eventTypeDupSubsetsDelete = "DeleteDuplicatedSubsets"
eventTypeSubsetsUpdate = "UpdateSubset"
eventTypeSpecifySubbsetReplicas = "SpecifySubsetReplicas"
eventTypeRevisionProvision = "RevisionProvision"
eventTypeFindSubsets = "FindSubsets"
eventTypeDupSubsetsDelete = "DeleteDuplicatedSubsets"
eventTypeSubsetsUpdate = "UpdateSubset"
eventTypeSpecifySubsetReplicas = "SpecifySubsetReplicas"
slowStartInitialBatchSize = 1
)
@ -111,7 +116,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
// Watch for changes to UnitedDeployment
err = c.Watch(source.Kind(mgr.GetCache(), &appsv1alpha1.UnitedDeployment{}), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), &appsv1alpha1.UnitedDeployment{}), &eventHandler{})
if err != nil {
return err
}
@ -181,9 +186,16 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
if instance.DeletionTimestamp != nil {
return reconcile.Result{}, nil
}
oldStatus := instance.Status.DeepCopy()
currentRevision, updatedRevision, _, collisionCount, err := r.constructUnitedDeploymentRevisions(instance)
if satisfied, _ := ResourceVersionExpectation.IsSatisfied(instance); !satisfied {
klog.InfoS("resource version not up-to-date, requeue in 1s", "resourceVersion", instance.GetResourceVersion(), "unitedDeployment", request)
return reconcile.Result{RequeueAfter: time.Second}, nil
}
klog.InfoS("Updated Resource observed", "unitedDeployment", klog.KObj(instance), "ResourceVersion", instance.GetResourceVersion())
oldStatus := instance.Status.DeepCopy()
instance.InitSubsetStatuses()
currentRevision, updatedRevision, _, _, err := r.constructUnitedDeploymentRevisions(instance)
if err != nil {
klog.ErrorS(err, "Failed to construct controller revision of UnitedDeployment", "unitedDeployment", klog.KObj(instance))
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeRevisionProvision), err.Error())
@ -192,7 +204,7 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
control, subsetType := r.getSubsetControls(instance)
klog.V(4).InfoS("Got all subsets of UnitedDeployment", "unitedDeployment", request)
klog.V(4).InfoS("Got all subsets of UnitedDeployment", "unitedDeployment", klog.KObj(instance))
expectedRevision := currentRevision.Name
if updatedRevision != nil {
expectedRevision = updatedRevision.Name
@ -205,12 +217,18 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
return reconcile.Result{}, err
}
if instance.Spec.Topology.ScheduleStrategy.IsAdaptive() {
for name, subset := range *nameToSubset {
manageUnschedulableStatusForExistingSubset(name, subset, instance)
}
}
nextReplicas, err := NewReplicaAllocator(instance).Alloc(nameToSubset)
klog.V(4).InfoS("Got UnitedDeployment next replicas", "unitedDeployment", klog.KObj(instance), "nextReplicas", nextReplicas)
if err != nil {
klog.ErrorS(err, "UnitedDeployment specified subset replicas is ineffective", "unitedDeployment", klog.KObj(instance))
r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s",
eventTypeSpecifySubbsetReplicas), "Specified subset replicas is ineffective: %s", err.Error())
eventTypeSpecifySubsetReplicas), "Specified subset replicas is ineffective: %s", err.Error())
return reconcile.Result{}, err
}
@ -233,10 +251,17 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
}
newStatus.LabelSelector = selector.String()
return r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control)
requeueAfter := durationStore.Pop(getUnitedDeploymentKey(instance))
if requeueAfter > 0 {
klog.InfoS("Requeue needed", "afterSeconds", requeueAfter.Seconds())
}
newStatus = r.calculateStatus(newStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, control)
return reconcile.Result{RequeueAfter: requeueAfter}, r.updateStatus(instance, newStatus, oldStatus)
}
func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) (*map[string]*Subset, error) {
// getNameToSubset fetches all subset workloads in the cluster managed by this UnitedDeployment.
// If the adaptive scheduling strategy is used, the unschedulable status of existing subsets will be set true here (newly created subsets default to false).
func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) (name2Subset *map[string]*Subset, err error) {
subSets, err := control.GetAllSubsets(instance, expectedRevision)
if err != nil {
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeFindSubsets), err.Error())
@ -244,9 +269,9 @@ func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.Unite
}
klog.V(4).InfoS("Classify UnitedDeployment by subSet name", "unitedDeployment", klog.KObj(instance))
nameToSubsets := r.classifySubsetBySubsetName(instance, subSets)
nameToSubsets := r.classifySubsetBySubsetName(subSets)
nameToSubset, err := r.deleteDupSubset(instance, nameToSubsets, control)
nameToSubset, err := r.deleteDupSubset(nameToSubsets, control)
if err != nil {
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeDupSubsetsDelete), err.Error())
return nil, fmt.Errorf("fail to manage duplicate Subset of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)
@ -255,6 +280,58 @@ func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.Unite
return nameToSubset, nil
}
// manageUnschedulableStatusForExistingSubset manages the unschedulable status of an existing subset and stores it in the Subset.Status.UnschedulableStatus field.
func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud *appsv1alpha1.UnitedDeployment) {
now := time.Now()
unitedDeploymentKey := getUnitedDeploymentKey(ud)
status := ud.Status.GetSubsetStatus(name)
if status == nil {
klog.InfoS("SubsetStatus not found", "subset", name)
return
}
condition := status.GetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable)
// process the existing condition
if condition != nil && condition.Status == corev1.ConditionFalse {
// The unschedulable state of a subset lasts at least UnschedulableLastSeconds (5 minutes by default).
recoverTime := condition.LastTransitionTime.Add(ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration())
klog.InfoS("existing unschedulable subset found", "subset", name, "recoverTime", recoverTime, "unitedDeployment", klog.KObj(ud))
if now.Before(recoverTime) {
klog.InfoS("subset is still unschedulable", "subset", name, "unitedDeployment", klog.KObj(ud))
durationStore.Push(unitedDeploymentKey, recoverTime.Sub(now))
subset.Status.UnschedulableStatus.Unschedulable = true
} else {
klog.InfoS("unschedulable subset recovered", "subset", name, "unitedDeployment", klog.KObj(ud))
status.SetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable, corev1.ConditionTrue, "recover",
fmt.Sprintf("unschedulable subset recovered after %f seconds", ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration().Seconds()))
}
}
// There may be pending pods because the subset is unschedulable.
if subset.Status.ReadyReplicas < subset.Status.Replicas {
var requeueAfter time.Duration = math.MaxInt64
for _, pod := range subset.Spec.SubsetPods {
timeouted, checkAfter := utilcontroller.GetTimeBeforePendingTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration())
if timeouted {
subset.Status.UnschedulableStatus.PendingPods++
}
if checkAfter > 0 && checkAfter < requeueAfter {
requeueAfter = checkAfter
}
}
if requeueAfter < math.MaxInt64 {
durationStore.Push(unitedDeploymentKey, requeueAfter)
}
if subset.Status.UnschedulableStatus.PendingPods > 0 {
klog.InfoS("subset has pending pods", "subset", subset.Name,
"pendingPods", subset.Status.UnschedulableStatus.PendingPods, "unitedDeployment", klog.KObj(ud))
subset.Status.UnschedulableStatus.Unschedulable = true
status.SetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable, corev1.ConditionFalse, "reschedule",
"timeout pending pods found")
durationStore.Push(unitedDeploymentKey, ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration())
}
}
klog.InfoS("subset status", "status", status, "unitedDeployment", klog.KObj(ud))
}
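For orientation, here is a quick worked example of the timing constants involved, a sketch assuming the package defaults (both values are configurable through spec.topology.scheduleStrategy.adaptive):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults from the API package: 30s before a still-pending pod counts as
	// a scheduling failure, 300s before an unschedulable subset is retried.
	rescheduleCritical := 30 * time.Second
	unschedulableLast := 300 * time.Second

	// A pod that has been Pending and Unschedulable for 45s exceeded the 30s
	// budget 15s ago, so it is counted as a pending pod and its subset is
	// marked unschedulable.
	podCreated := time.Now().Add(-45 * time.Second)
	deadline := podCreated.Add(rescheduleCritical)
	fmt.Println(time.Now().After(deadline)) // true

	// The Schedulable=False condition then has to last for unschedulableLast
	// before the controller considers the subset recovered and allows new
	// replicas to be placed there again.
	markedAt := time.Now()
	recoverAt := markedAt.Add(unschedulableLast)
	fmt.Println(recoverAt.Sub(markedAt)) // 5m0s
}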
func calcNextPartitions(ud *appsv1alpha1.UnitedDeployment, nextReplicas *map[string]int32) *map[string]int32 {
partitions := map[string]int32{}
for _, subset := range ud.Spec.Topology.Subsets {
@ -288,7 +365,7 @@ func getNextUpdate(ud *appsv1alpha1.UnitedDeployment, nextReplicas *map[string]i
return next
}
func (r *ReconcileUnitedDeployment) deleteDupSubset(ud *appsv1alpha1.UnitedDeployment, nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) {
func (r *ReconcileUnitedDeployment) deleteDupSubset(nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) {
nameToSubset := map[string]*Subset{}
for name, subsets := range nameToSubsets {
if len(subsets) > 1 {
@ -333,7 +410,7 @@ func (r *ReconcileUnitedDeployment) getSubsetControls(instance *appsv1alpha1.Uni
return nil, statefulSetSubSetType
}
func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(ud *appsv1alpha1.UnitedDeployment, subsets []*Subset) map[string][]*Subset {
func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(subsets []*Subset) map[string][]*Subset {
mapping := map[string][]*Subset{}
for _, ss := range subsets {
@ -348,13 +425,16 @@ func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(ud *appsv1alpha1.
return mapping
}
func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) (reconcile.Result, error) {
newStatus = r.calculateStatus(newStatus, nameToSubset, nextReplicas, nextPartition, currentRevision, updatedRevision, collisionCount, control)
_, err := r.updateUnitedDeployment(instance, oldStatus, newStatus)
return reconcile.Result{}, err
func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus) error {
newObj, err := r.updateUnitedDeployment(instance, oldStatus, newStatus)
if err == nil && newObj != nil {
ResourceVersionExpectation.Expect(newObj)
klog.InfoS("new resource version expected", "UnitedDeployment", klog.KObj(newObj), "ResourceVersion", newObj.GetResourceVersion())
}
return err
}
func (r *ReconcileUnitedDeployment) calculateStatus(newStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) *appsv1alpha1.UnitedDeploymentStatus {
func (r *ReconcileUnitedDeployment) calculateStatus(newStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, control ControlInterface) *appsv1alpha1.UnitedDeploymentStatus {
expectedRevision := currentRevision.Name
if updatedRevision != nil {
expectedRevision = updatedRevision.Name
@ -431,7 +511,8 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit
ud.Generation == newStatus.ObservedGeneration &&
reflect.DeepEqual(oldStatus.SubsetReplicas, newStatus.SubsetReplicas) &&
reflect.DeepEqual(oldStatus.UpdateStatus, newStatus.UpdateStatus) &&
reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) {
reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) &&
reflect.DeepEqual(oldStatus.SubsetStatuses, newStatus.SubsetStatuses) {
return ud, nil
}
@ -439,13 +520,14 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit
var getErr, updateErr error
for i, obj := 0, ud; ; i++ {
klog.V(4).InfoS("Updating status",
klog.V(4).InfoS("updating UnitedDeployment status",
"updateCount", i, "unitedDeployment", klog.KObj(obj),
"replicasSpec", obj.Spec.Replicas, "oldReplicas", obj.Status.Replicas, "newReplicas", newStatus.Replicas,
"readyReplicasSpec", obj.Spec.Replicas, "oldReadyReplicas", obj.Status.ReadyReplicas, "newReadyReplicas", newStatus.ReadyReplicas,
"oldUpdatedReplicas", obj.Status.UpdatedReplicas, "newUpdatedReplicas", newStatus.UpdatedReplicas,
"oldUpdatedReadyReplicas", obj.Status.UpdatedReadyReplicas, "newUpdatedReadyReplicas", newStatus.UpdatedReadyReplicas,
"oldObservedGeneration", obj.Status.ObservedGeneration, "newObservedGeneration", newStatus.ObservedGeneration,
"SubsetStatuses", obj.Status.SubsetStatuses, "newSubsetStatuses", newStatus.SubsetStatuses,
)
obj.Status = *newStatus

View File

@ -18,6 +18,7 @@ package uniteddeployment
import (
"testing"
"time"
"github.com/onsi/gomega"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
@ -95,7 +96,7 @@ func TestReconcile(t *testing.T) {
},
}
// Setup the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a
// Set up the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a
// channel when it is finished.
mgr, err := manager.New(cfg, manager.Options{})
g.Expect(err).NotTo(gomega.HaveOccurred())
@ -124,3 +125,173 @@ func TestReconcile(t *testing.T) {
defer c.Delete(context.TODO(), instance)
g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest)))
}
func TestUnschedulableStatusManagement(t *testing.T) {
subsetName := "subset-1"
baseEnvFactory := func() (*corev1.Pod, *Subset, *appsv1alpha1.UnitedDeployment) {
pod := &corev1.Pod{
Status: corev1.PodStatus{
Phase: corev1.PodPending,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodScheduled,
Status: corev1.ConditionFalse,
Reason: corev1.PodReasonUnschedulable,
},
},
},
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.NewTime(time.Now().Add(-15 * time.Second)),
},
}
subset := &Subset{
ObjectMeta: metav1.ObjectMeta{
Name: subsetName,
},
Status: SubsetStatus{
ReadyReplicas: 0,
Replicas: 1,
},
Spec: SubsetSpec{
SubsetPods: []*corev1.Pod{pod},
},
}
return pod, subset, &appsv1alpha1.UnitedDeployment{
Status: appsv1alpha1.UnitedDeploymentStatus{
SubsetStatuses: []appsv1alpha1.UnitedDeploymentSubsetStatus{
{
Name: subsetName,
Conditions: []appsv1alpha1.UnitedDeploymentSubsetCondition{
{
Type: appsv1alpha1.UnitedDeploymentSubsetSchedulable,
Status: corev1.ConditionTrue,
},
},
},
},
},
Spec: appsv1alpha1.UnitedDeploymentSpec{
Topology: appsv1alpha1.Topology{
ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{
Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType,
},
},
},
}
}
cases := []struct {
name string
envFactory func() (*Subset, *appsv1alpha1.UnitedDeployment)
expectPendingPods int32
requeueUpperLimit time.Duration
requeueLowerLimit time.Duration
unschedulable bool
}{
{
name: "Not timed out yet",
envFactory: func() (*Subset, *appsv1alpha1.UnitedDeployment) {
_, subset, ud := baseEnvFactory()
return subset, ud
},
expectPendingPods: 0,
requeueUpperLimit: appsv1alpha1.DefaultRescheduleCriticalDuration - 15*time.Second + 100*time.Millisecond,
requeueLowerLimit: appsv1alpha1.DefaultRescheduleCriticalDuration - 15*time.Second - 100*time.Millisecond,
unschedulable: false,
},
{
name: "Timed out",
envFactory: func() (*Subset, *appsv1alpha1.UnitedDeployment) {
pod, subset, ud := baseEnvFactory()
pod.CreationTimestamp = metav1.NewTime(time.Now().Add(-31 * time.Second))
return subset, ud
},
expectPendingPods: 1,
requeueUpperLimit: appsv1alpha1.DefaultUnschedulableStatusLastDuration,
requeueLowerLimit: appsv1alpha1.DefaultUnschedulableStatusLastDuration,
unschedulable: true,
},
{
name: "During unschedulable status",
envFactory: func() (*Subset, *appsv1alpha1.UnitedDeployment) {
_, subset, ud := baseEnvFactory()
ud.Status.SubsetStatuses = []appsv1alpha1.UnitedDeploymentSubsetStatus{
{
Name: subset.Name,
Conditions: []appsv1alpha1.UnitedDeploymentSubsetCondition{
{
Type: appsv1alpha1.UnitedDeploymentSubsetSchedulable,
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: time.Now().Add(-time.Minute)},
},
},
},
}
subset.Status.ReadyReplicas = 1
subset.Status.UnschedulableStatus.PendingPods = 0
return subset, ud
},
expectPendingPods: 0,
requeueUpperLimit: appsv1alpha1.DefaultUnschedulableStatusLastDuration - time.Minute + time.Second,
requeueLowerLimit: appsv1alpha1.DefaultUnschedulableStatusLastDuration - time.Minute - time.Second,
unschedulable: true,
},
{
name: "After status reset",
envFactory: func() (*Subset, *appsv1alpha1.UnitedDeployment) {
pod, subset, ud := baseEnvFactory()
ud.Status.SubsetStatuses = []appsv1alpha1.UnitedDeploymentSubsetStatus{
{
Name: subset.Name,
Conditions: []appsv1alpha1.UnitedDeploymentSubsetCondition{
{
Type: appsv1alpha1.UnitedDeploymentSubsetSchedulable,
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: time.Now().Add(-time.Minute - appsv1alpha1.DefaultUnschedulableStatusLastDuration)},
},
},
},
}
pod.Status.Conditions = []corev1.PodCondition{
{
Type: corev1.PodScheduled,
Status: corev1.ConditionTrue,
},
}
return subset, ud
},
expectPendingPods: 0,
requeueUpperLimit: 0,
requeueLowerLimit: 0,
unschedulable: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
subset, ud := c.envFactory()
start := time.Now()
manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud)
cost := time.Since(start)
if subset.Status.UnschedulableStatus.PendingPods != c.expectPendingPods {
t.Logf("case %s failed: expect pending pods %d, but got %d", c.name, c.expectPendingPods, subset.Status.UnschedulableStatus.PendingPods)
t.Fail()
}
requeueAfter := durationStore.Pop(getUnitedDeploymentKey(ud))
if c.requeueUpperLimit != c.requeueLowerLimit {
// the expected requeue duration is not a constant, so slow test execution can skew the result; compensate with the measured cost.
requeueAfter += cost
} else {
cost = 0
}
t.Logf("got requeueAfter %f not in range [%f, %f] (cost fix %f)",
requeueAfter.Seconds(), c.requeueLowerLimit.Seconds(), c.requeueUpperLimit.Seconds(), cost.Seconds())
if requeueAfter > c.requeueUpperLimit || requeueAfter < c.requeueLowerLimit {
t.Fail()
}
if subset.Status.UnschedulableStatus.Unschedulable != c.unschedulable {
t.Logf("case %s failed: expect unschedulable %v, but got %v", c.name, c.unschedulable, subset.Status.UnschedulableStatus.Unschedulable)
t.Fail()
}
})
}
}

View File

@ -23,11 +23,11 @@ import (
"strconv"
"strings"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util/expectations"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)
const updateRetries = 5
@ -132,3 +132,9 @@ func filterOutCondition(conditions []appsv1alpha1.UnitedDeploymentCondition, con
}
return newConditions
}
func getUnitedDeploymentKey(ud *appsv1alpha1.UnitedDeployment) string {
return ud.GetNamespace() + "/" + ud.GetName()
}
var ResourceVersionExpectation = expectations.NewResourceVersionExpectation()

View File

@ -0,0 +1,42 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"context"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
)
type eventHandler struct {
handler.EnqueueRequestForObject
}
func (e *eventHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
klog.InfoS("cleaning up UnitedDeployment", "unitedDeployment", evt.Object)
ResourceVersionExpectation.Delete(evt.Object)
e.EnqueueRequestForObject.Delete(ctx, evt, q)
}
func (e *eventHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
// make sure latest version is observed
ResourceVersionExpectation.Observe(evt.ObjectNew)
e.EnqueueRequestForObject.Update(ctx, evt, q)
}
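The handler above pairs with the ResourceVersionExpectation used at the top of Reconcile: Expect is called after a successful status write, Observe when the informer delivers the updated object, and IsSatisfied gates reconciliation against a stale cache. The following is only a minimal standalone sketch of that handshake (treating resource versions as numeric is a simplification; this is not the shared Kruise expectations utility):

package main

import (
	"fmt"
	"strconv"
	"sync"
)

// rvExpectation remembers, per object key, the resource version produced by
// our own status writes (hypothetical minimal type, for illustration only).
type rvExpectation struct {
	mu       sync.Mutex
	expected map[string]uint64
}

func newRVExpectation() *rvExpectation {
	return &rvExpectation{expected: map[string]uint64{}}
}

func parseRV(rv string) uint64 {
	v, _ := strconv.ParseUint(rv, 10, 64)
	return v
}

// Expect records the resource version returned by a successful status update.
func (e *rvExpectation) Expect(key, rv string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if v := parseRV(rv); v > e.expected[key] {
		e.expected[key] = v
	}
}

// Observe clears the expectation once the informer has delivered an object
// at least as new as the one we wrote.
func (e *rvExpectation) Observe(key, rv string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if parseRV(rv) >= e.expected[key] {
		delete(e.expected, key)
	}
}

// IsSatisfied reports whether the cached object is safe to reconcile against.
func (e *rvExpectation) IsSatisfied(key, rv string) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	pending, ok := e.expected[key]
	return !ok || parseRV(rv) >= pending
}

func main() {
	exp := newRVExpectation()
	key := "default/sample-ud"
	exp.Expect(key, "42")                   // we just wrote status at RV 42
	fmt.Println(exp.IsSatisfied(key, "41")) // false: cache still stale, requeue
	exp.Observe(key, "42")                  // informer delivered the new version
	fmt.Println(exp.IsSatisfied(key, "42")) // true: safe to reconcile
}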

View File

@ -32,6 +32,7 @@ import (
func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextUpdate map[string]SubsetUpdate, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (newStatus *appsv1alpha1.UnitedDeploymentStatus, updateErr error) {
newStatus = ud.Status.DeepCopy()
exists, provisioned, err := r.manageSubsetProvision(ud, nameToSubset, nextUpdate, currentRevision, updatedRevision, subsetType)
if err != nil {
SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetProvisioned, corev1.ConditionFalse, "Error", err.Error()))
@ -79,6 +80,17 @@ func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeploym
if updateErr == nil {
SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionTrue, "", ""))
} else {
// When the Adaptive scheduling strategy is used and a subset is scaled out (creating new Pods),
// a requeue is pushed so that potential future scheduling failures can be detected and rescheduled.
var newPodCreated = false
for _, cell := range needUpdate {
subset := (*nameToSubset)[cell]
replicas := nextUpdate[cell].Replicas
newPodCreated = newPodCreated || subset.Spec.Replicas < replicas
}
if strategy := ud.Spec.Topology.ScheduleStrategy; strategy.IsAdaptive() && newPodCreated {
durationStore.Push(getUnitedDeploymentKey(ud), strategy.GetRescheduleCriticalDuration())
}
SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionFalse, "Error", updateErr.Error()))
}
return
@ -132,6 +144,11 @@ func (r *ReconcileUnitedDeployment) manageSubsetProvision(ud *appsv1alpha1.Unite
return nil
})
if createdErr == nil {
// When a new subset is created, regardless of whether it contains newly created Pods,
// a requeue is triggered to treat it as an existing subset and update its unschedulable information.
if strategy := ud.Spec.Topology.ScheduleStrategy; strategy.IsAdaptive() {
durationStore.Push(getUnitedDeploymentKey(ud), strategy.GetRescheduleCriticalDuration())
}
r.recorder.Eventf(ud.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful%s", eventTypeSubsetsUpdate), "Create %d Subset (%s)", createdNum, subsetType)
} else {
errs = append(errs, createdErr)

View File

@ -2,6 +2,7 @@ package util
import (
"encoding/json"
"time"
v1 "k8s.io/api/core/v1"
)
@ -22,3 +23,24 @@ func UpdateMessageKvCondition(kv map[string]interface{}, condition *v1.PodCondit
message, _ := json.Marshal(kv)
condition.Message = string(message)
}
// GetTimeBeforePendingTimeout returns timeouted=true when the Pod failed to schedule and has exceeded the given timeout.
// nextCheckAfter > 0 means the Pod failed to schedule but has not timed out yet.
func GetTimeBeforePendingTimeout(pod *v1.Pod, timeout time.Duration) (timeouted bool, nextCheckAfter time.Duration) {
if pod.DeletionTimestamp != nil || pod.Status.Phase != v1.PodPending || pod.Spec.NodeName != "" {
return false, -1
}
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodScheduled && condition.Status == v1.ConditionFalse &&
condition.Reason == v1.PodReasonUnschedulable {
currentTime := time.Now()
expectSchedule := pod.CreationTimestamp.Add(timeout)
// schedule timeout
if expectSchedule.Before(currentTime) {
return true, -1
}
return false, expectSchedule.Sub(currentTime)
}
}
return false, -1
}
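A minimal usage sketch of the helper above (the pod fixture is assumed for illustration): a Pod that has been Unschedulable for 10 seconds with a 30-second timeout is not timed out yet, and the returned interval says when to check again.

package main

import (
	"fmt"
	"time"

	"github.com/openkruise/kruise/pkg/controller/util"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Assumed fixture: a Pod created 10s ago that the scheduler marked Unschedulable.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			CreationTimestamp: metav1.NewTime(time.Now().Add(-10 * time.Second)),
		},
		Status: corev1.PodStatus{
			Phase: corev1.PodPending,
			Conditions: []corev1.PodCondition{{
				Type:   corev1.PodScheduled,
				Status: corev1.ConditionFalse,
				Reason: corev1.PodReasonUnschedulable,
			}},
		},
	}
	timedOut, nextCheckAfter := util.GetTimeBeforePendingTimeout(pod, 30*time.Second)
	fmt.Println(timedOut)       // false: the 30s budget is not exhausted yet
	fmt.Println(nextCheckAfter) // ~20s: requeue and check again around then
}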

View File

@ -20,6 +20,7 @@ import (
"context"
"time"
"github.com/openkruise/kruise/pkg/controller/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
@ -111,26 +112,9 @@ func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadS
// PodUnscheduledTimeout returns true when the Pod failed to schedule and has timed out.
func PodUnscheduledTimeout(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod) bool {
if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodPending || pod.Spec.NodeName != "" {
return false
timeouted, nextCheckAfter := util.GetTimeBeforePendingTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds))
if nextCheckAfter > 0 {
durationStore.Push(getWorkloadSpreadKey(ws), nextCheckAfter)
}
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodScheduled && condition.Status == corev1.ConditionFalse &&
condition.Reason == corev1.PodReasonUnschedulable {
currentTime := time.Now()
rescheduleCriticalSeconds := *ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds
expectSchedule := pod.CreationTimestamp.Add(time.Second * time.Duration(rescheduleCriticalSeconds))
// schedule timeout
if expectSchedule.Before(currentTime) {
return true
}
// no timeout, requeue key when expectSchedule is equal to time.Now()
durationStore.Push(getWorkloadSpreadKey(ws), expectSchedule.Sub(currentTime))
return false
}
}
return false
return timeouted
}

View File

@ -566,7 +566,7 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread,
}
// return two parameters
// 1. isRecord(bool) 2. SubsetStatus
// 1. isRecord(bool) 2. SubsetStatuses
func isPodRecordedInSubset(subsetStatuses []appsv1alpha1.WorkloadSpreadSubsetStatus, podName string) (bool, *appsv1alpha1.WorkloadSpreadSubsetStatus) {
for _, subset := range subsetStatuses {
if _, ok := subset.CreatingPods[podName]; ok {

View File

@ -1,13 +1,19 @@
package apps
import (
"context"
"fmt"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/test/e2e/framework"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/utils/ptr"
)
var _ = SIGDescribe("uniteddeployment", func() {
@ -77,4 +83,118 @@ var _ = SIGDescribe("uniteddeployment", func() {
udManager.Scale(1)
udManager.CheckSubsets(replicasMap([]int32{0, 0, 1}))
})
ginkgo.It("adaptive united deployment with elastic allocator", func() {
replicas := func(r int) *intstr.IntOrString { p := intstr.FromInt32(int32(r)); return &p }
replicasMap := func(replicas []int32) map[string]int32 {
replicaMap := make(map[string]int32)
for i, r := range replicas {
replicaMap[fmt.Sprintf("subset-%d", i)] = r
}
return replicaMap
}
unschedulableMap := func(unschedulables []bool) map[string]bool {
resultMap := make(map[string]bool)
for i, r := range unschedulables {
resultMap[fmt.Sprintf("subset-%d", i)] = r
}
return resultMap
}
udManager := tester.NewUnitedDeploymentManager("adaptive-ud-elastic-test")
// enable adaptive scheduling
udManager.UnitedDeployment.Spec.Topology.ScheduleStrategy = appsv1alpha1.UnitedDeploymentScheduleStrategy{
Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType,
Adaptive: &appsv1alpha1.AdaptiveUnitedDeploymentStrategy{
RescheduleCriticalSeconds: ptr.To(int32(20)),
UnschedulableLastSeconds: ptr.To(int32(15)),
},
}
udManager.AddSubset("subset-0", nil, nil, replicas(2))
udManager.AddSubset("subset-1", nil, nil, replicas(2))
udManager.AddSubset("subset-2", nil, nil, nil)
// make subset-1 unschedulable
nodeKey := "ud-e2e/to-make-a-bad-subset-elastic"
udManager.UnitedDeployment.Spec.Topology.Subsets[1].NodeSelectorTerm = corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: nodeKey,
Operator: corev1.NodeSelectorOpExists,
},
},
}
ginkgo.By("creating united deployment")
udManager.Spec.Replicas = ptr.To(int32(3))
_, err := f.KruiseClientSet.AppsV1alpha1().UnitedDeployments(udManager.Namespace).Create(context.Background(),
udManager.UnitedDeployment, metav1.CreateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
ginkgo.By("wait for rescheduling, will take long")
udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, true, false}))
udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1}))
fmt.Println()
ginkgo.By("scale up while unschedulable")
udManager.Scale(4)
udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 2}))
fmt.Println()
ginkgo.By("scale down while unschedulable")
udManager.Scale(3)
udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1}))
fmt.Println()
ginkgo.By("wait subset recovery, will take long")
udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false}))
fmt.Println()
ginkgo.By("scale up after recovery")
udManager.Scale(4)
udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 1}))
fmt.Println()
ginkgo.By("scale down after recovery")
udManager.Scale(3)
udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 0})) // even pods in subset-1 are not ready
fmt.Println()
ginkgo.By("create new subset")
udManager.AddSubset("subset-3", nil, replicas(2), nil)
udManager.Update()
fmt.Println()
ginkgo.By("waiting final status after scaling up to new subset, will take long")
udManager.Scale(6)
udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false, false}))
udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 2, 2}))
fmt.Println()
ginkgo.By("fix subset-1 and wait recover")
nodeList, err := f.ClientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
someNode := nodeList.Items[1]
someNode.Labels[nodeKey] = "haha"
_, err = f.ClientSet.CoreV1().Nodes().Update(context.Background(), &someNode, metav1.UpdateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false}))
ginkgo.By("waiting final status after deleting new subset")
udManager.Spec.Topology.Subsets = udManager.Spec.Topology.Subsets[:3]
udManager.Update()
udManager.CheckSubsetPods(replicasMap([]int32{2, 2, 2}))
fmt.Println()
ginkgo.By("scale down after fixed")
udManager.Scale(3)
udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false}))
udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 0}))
fmt.Println()
ginkgo.By("scale up after fixed")
udManager.Scale(5)
udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false}))
udManager.CheckSubsetPods(replicasMap([]int32{2, 2, 1}))
fmt.Println()
})
})

View File

@ -3,6 +3,9 @@ package framework
import (
"context"
"fmt"
"reflect"
"time"
"github.com/onsi/gomega"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
@ -13,8 +16,6 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
"reflect"
"time"
)
type UnitedDeploymentTester struct {
@ -31,6 +32,8 @@ func NewUnitedDeploymentTester(c clientset.Interface, kc kruiseclientset.Interfa
}
}
var zero = int64(0)
func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *UnitedDeploymentManager {
return &UnitedDeploymentManager{
UnitedDeployment: &appsv1alpha1.UnitedDeployment{
@ -64,6 +67,7 @@ func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *United
},
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: &zero,
Containers: []v1.Container{
{
Name: "busybox",
@ -81,12 +85,14 @@ func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *United
},
},
kc: t.kc,
c: t.c,
}
}
type UnitedDeploymentManager struct {
*appsv1alpha1.UnitedDeployment
kc kruiseclientset.Interface
c clientset.Interface
}
func (m *UnitedDeploymentManager) AddSubset(name string, replicas, minReplicas, maxReplicas *intstr.IntOrString) {
@ -120,7 +126,12 @@ func (m *UnitedDeploymentManager) Create(replicas int32) {
gomega.Eventually(func() bool {
ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return ud.Status.Replicas == replicas && ud.Generation == ud.Status.ObservedGeneration
ok := ud.Status.Replicas == replicas && ud.Generation == ud.Status.ObservedGeneration
if !ok {
fmt.Printf("UnitedDeploymentManager.Create failed\nud.Status.Replicas: %d, ud.Generation: %d, ud.Status.ObservedGeneration: %d\n",
ud.Status.Replicas, ud.Generation, ud.Status.ObservedGeneration)
}
return ok
}, time.Minute, time.Second).Should(gomega.BeTrue())
}
@ -128,6 +139,57 @@ func (m *UnitedDeploymentManager) CheckSubsets(replicas map[string]int32) {
gomega.Eventually(func() bool {
ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return ud.GetGeneration() == ud.Status.ObservedGeneration && *ud.Spec.Replicas == ud.Status.Replicas && reflect.DeepEqual(replicas, ud.Status.SubsetReplicas)
}, time.Minute, time.Second).Should(gomega.BeTrue())
ok := ud.GetGeneration() == ud.Status.ObservedGeneration && *ud.Spec.Replicas == ud.Status.Replicas && reflect.DeepEqual(replicas, ud.Status.SubsetReplicas)
if !ok {
fmt.Printf("UnitedDeploymentManager.CheckSubsets failed\nud.GetGeneration(): %d, ud.Status.ObservedGeneration: %d, *ud.Spec.Replicas: %d, ud.Status.Replicas: %d, ud.Status.SubsetReplicas: %v\n", ud.GetGeneration(),
ud.Status.ObservedGeneration, *ud.Spec.Replicas, ud.Status.Replicas, ud.Status.SubsetReplicas)
}
return ok
}, 3*time.Minute, time.Second).Should(gomega.BeTrue())
}
func (m *UnitedDeploymentManager) Update() {
gomega.Eventually(func(g gomega.Gomega) {
ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.Background(), m.Name, metav1.GetOptions{})
g.Expect(err).NotTo(gomega.HaveOccurred())
ud.Spec = m.UnitedDeployment.DeepCopy().Spec
_, err = m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Update(context.Background(), ud, metav1.UpdateOptions{})
g.Expect(err).NotTo(gomega.HaveOccurred())
}, time.Minute, time.Second).Should(gomega.Succeed())
}
func (m *UnitedDeploymentManager) CheckSubsetPods(expect map[string]int32) {
fmt.Print("CheckSubsetPods ")
ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Eventually(func(g gomega.Gomega) {
actual := map[string]int32{}
for _, subset := range ud.Spec.Topology.Subsets {
podList, err := m.c.CoreV1().Pods(m.Namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("apps.kruise.io/subset-name=%s", subset.Name),
})
g.Expect(err).NotTo(gomega.HaveOccurred())
actual[subset.Name] = int32(len(podList.Items))
}
g.Expect(expect).To(gomega.BeEquivalentTo(actual))
}, time.Minute, 500*time.Millisecond).Should(gomega.Succeed())
fmt.Println("pass")
}
func (m *UnitedDeploymentManager) CheckUnschedulableStatus(expect map[string]bool) {
fmt.Print("CheckUnschedulableStatus ")
gomega.Eventually(func(g gomega.Gomega) {
ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{})
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(ud.Status.SubsetStatuses != nil).To(gomega.BeTrue())
actual := map[string]bool{}
for name := range expect {
status := ud.Status.GetSubsetStatus(name)
g.Expect(status != nil).To(gomega.BeTrue())
condition := status.GetCondition(appsv1alpha1.UnitedDeploymentSubsetSchedulable)
actual[name] = condition != nil && condition.Status == v1.ConditionFalse
}
g.Expect(expect).To(gomega.BeEquivalentTo(actual))
}, time.Minute, 500*time.Millisecond).Should(gomega.Succeed())
fmt.Println("pass")
}