UnitedDeployment: subset provision (#155)

* UnitedDeployment: subset definition

* UnitedDeployment: replicas allocator considers how pods spread

* UnitedDeployment: subset provision

* fix review issues

* add header

* fix review issue: change nameToReplicas.Limited to Specified

* fix review issues: mainly refactor allocator

* refactoring

* fix review issues
This commit is contained in:
Kan Wu 2019-11-13 09:25:11 +08:00 committed by Fei Guo
parent b2dea84a72
commit b00c632b16
27 changed files with 2457 additions and 231 deletions

View File

@ -481,6 +481,20 @@
}
}
},
"kruise.apps.v1alpha1.ManualUpdate": {
"description": "ManualUpdate is a update strategy which allow users to provide the partition of each subset.",
"type": "object",
"properties": {
"partitions": {
"description": "Indicates number of subset partition.",
"type": "object",
"additionalProperties": {
"type": "integer",
"format": "int32"
}
}
}
},
"kruise.apps.v1alpha1.RollingUpdateSidecarSet": {
"description": "RollingUpdateSidecarSet is used to communicate parameter",
"type": "object",
@ -838,7 +852,7 @@
],
"properties": {
"name": {
"description": "Indicates the name of this subset, which will be used to generate subset workload name in the format '\u003cdeployment-name\u003e-\u003csubset-name\u003e'.",
"description": "Indicates the name of this subset, which will be used to generate subset workload name prefix in the format '\u003cdeployment-name\u003e-\u003csubset-name\u003e-'.",
"type": "string"
},
"nodeSelector": {
@ -970,10 +984,6 @@
"description": "Selector is a label query over pods that should match the replica count. It must match the pod template's labels.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.LabelSelector"
},
"strategy": {
"description": "Strategy indicates the action of updating",
"$ref": "#/definitions/kruise.apps.v1alpha1.UnitedDeploymentUpdateStrategy"
},
"template": {
"description": "Template is the object that describes the subset that will be created.",
"$ref": "#/definitions/kruise.apps.v1alpha1.SubsetTemplate"
@ -981,6 +991,10 @@
"topology": {
"description": "Contains the information of subset topology.",
"$ref": "#/definitions/kruise.apps.v1alpha1.Topology"
},
"updateStrategy": {
"description": "Strategy indicates the action of updating",
"$ref": "#/definitions/kruise.apps.v1alpha1.UnitedDeploymentUpdateStrategy"
}
}
},
@ -1052,13 +1066,13 @@
"description": "UnitedDeploymentUpdateStrategy defines the update strategy of UnitedDeployment.",
"type": "object",
"properties": {
"partitions": {
"description": "Indicates the partition of each subset.",
"type": "object",
"additionalProperties": {
"type": "integer",
"format": "int32"
}
"manualUpdate": {
"description": "Indicate the partition of each subset.",
"$ref": "#/definitions/kruise.apps.v1alpha1.ManualUpdate"
},
"type": {
"description": "Type of UnitedDeployment update. Default is Manual.",
"type": "string"
}
}
},

View File

@ -74,13 +74,6 @@ spec:
description: Selector is a label query over pods that should match the
replica count. It must match the pod template's labels.
type: object
strategy:
description: Strategy indicates the action of updating
properties:
partitions:
description: Indicates the partition of each subset.
type: object
type: object
template:
description: Template is the object that describes the subset that will
be created.
@ -105,7 +98,8 @@ spec:
properties:
name:
description: Indicates the name of this subset, which will
be used to generate subset workload name in the format '<deployment-name>-<subset-name>'.
be used to generate subset workload name prefix in the format
'<deployment-name>-<subset-name>-'.
type: string
nodeSelector:
description: Indicates the node select strategy to form the
@ -123,6 +117,20 @@ spec:
type: object
type: array
type: object
updateStrategy:
description: Strategy indicates the action of updating
properties:
manualUpdate:
description: Indicate the partition of each subset.
properties:
partitions:
description: Indicates number of subset partition.
type: object
type: object
type:
description: Type of UnitedDeployment update. Default is Manual.
type: string
type: object
required:
- selector
type: object

View File

@ -5,7 +5,7 @@ metadata:
controller-tools.k8s.io: "1.0"
name: uniteddeployment-sample
spec:
replicas: 4
replicas: 6
revisionHistoryLimit: 10
selector:
matchLabels:
@ -26,22 +26,29 @@ spec:
image: nginx:1.0
topology:
subsets:
- name: subsetA
- name: subset-a
replicas: 1
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: node
operator: In
values:
- zoneA
- name: subsetB
- zone-a
- name: subset-b
replicas: 50%
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: node
operator: In
values:
- zoneB
- zone-b
- name: subset-c
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: node
operator: In
values:
- zone-c

View File

@ -169,6 +169,14 @@ func SetDefaults_UnitedDeployment(obj *UnitedDeployment) {
*obj.Spec.RevisionHistoryLimit = 10
}
if len(obj.Spec.UpdateStrategy.Type) == 0 {
obj.Spec.UpdateStrategy.Type = ManualUpdateStrategyType
}
if obj.Spec.UpdateStrategy.Type == ManualUpdateStrategyType && obj.Spec.UpdateStrategy.ManualUpdate == nil {
obj.Spec.UpdateStrategy.ManualUpdate = &ManualUpdate{}
}
if obj.Spec.Template.StatefulSetTemplate != nil {
utils.SetDefaultPodTemplate(&obj.Spec.Template.StatefulSetTemplate.Spec.Template.Spec)
for i := range obj.Spec.Template.StatefulSetTemplate.Spec.VolumeClaimTemplates {

View File

@ -46,6 +46,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
"github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.InPlaceUpdateContainerStatus": schema_pkg_apis_apps_v1alpha1_InPlaceUpdateContainerStatus(ref),
"github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.InPlaceUpdateState": schema_pkg_apis_apps_v1alpha1_InPlaceUpdateState(ref),
"github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.JobCondition": schema_pkg_apis_apps_v1alpha1_JobCondition(ref),
"github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.ManualUpdate": schema_pkg_apis_apps_v1alpha1_ManualUpdate(ref),
"github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.RollingUpdateSidecarSet": schema_pkg_apis_apps_v1alpha1_RollingUpdateSidecarSet(ref),
"github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.RollingUpdateStatefulSetStrategy": schema_pkg_apis_apps_v1alpha1_RollingUpdateStatefulSetStrategy(ref),
"github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.SidecarContainer": schema_pkg_apis_apps_v1alpha1_SidecarContainer(ref),
@ -885,6 +886,34 @@ func schema_pkg_apis_apps_v1alpha1_JobCondition(ref common.ReferenceCallback) co
}
}
func schema_pkg_apis_apps_v1alpha1_ManualUpdate(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
SchemaProps: spec.SchemaProps{
Description: "ManualUpdate is a update strategy which allow users to provide the partition of each subset.",
Type: []string{"object"},
Properties: map[string]spec.Schema{
"partitions": {
SchemaProps: spec.SchemaProps{
Description: "Indicates number of subset partition.",
Type: []string{"object"},
AdditionalProperties: &spec.SchemaOrBool{
Allows: true,
Schema: &spec.Schema{
SchemaProps: spec.SchemaProps{
Type: []string{"integer"},
Format: "int32",
},
},
},
},
},
},
},
},
}
}
func schema_pkg_apis_apps_v1alpha1_RollingUpdateSidecarSet(ref common.ReferenceCallback) common.OpenAPIDefinition {
return common.OpenAPIDefinition{
Schema: spec.Schema{
@ -1501,7 +1530,7 @@ func schema_pkg_apis_apps_v1alpha1_Subset(ref common.ReferenceCallback) common.O
Properties: map[string]spec.Schema{
"name": {
SchemaProps: spec.SchemaProps{
Description: "Indicates the name of this subset, which will be used to generate subset workload name in the format '<deployment-name>-<subset-name>'.",
Description: "Indicates the name of this subset, which will be used to generate subset workload name prefix in the format '<deployment-name>-<subset-name>-'.",
Type: []string{"string"},
Format: "",
},
@ -1748,7 +1777,7 @@ func schema_pkg_apis_apps_v1alpha1_UnitedDeploymentSpec(ref common.ReferenceCall
Ref: ref("github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.Topology"),
},
},
"strategy": {
"updateStrategy": {
SchemaProps: spec.SchemaProps{
Description: "Strategy indicates the action of updating",
Ref: ref("github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.UnitedDeploymentUpdateStrategy"),
@ -1876,24 +1905,24 @@ func schema_pkg_apis_apps_v1alpha1_UnitedDeploymentUpdateStrategy(ref common.Ref
Description: "UnitedDeploymentUpdateStrategy defines the update strategy of UnitedDeployment.",
Type: []string{"object"},
Properties: map[string]spec.Schema{
"partitions": {
"type": {
SchemaProps: spec.SchemaProps{
Description: "Indicates the partition of each subset.",
Type: []string{"object"},
AdditionalProperties: &spec.SchemaOrBool{
Allows: true,
Schema: &spec.Schema{
SchemaProps: spec.SchemaProps{
Type: []string{"integer"},
Format: "int32",
},
},
},
Description: "Type of UnitedDeployment update. Default is Manual.",
Type: []string{"string"},
Format: "",
},
},
"manualUpdate": {
SchemaProps: spec.SchemaProps{
Description: "Indicate the partition of each subset.",
Ref: ref("github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.ManualUpdate"),
},
},
},
},
},
Dependencies: []string{
"github.com/openkruise/kruise/pkg/apis/apps/v1alpha1.ManualUpdate"},
}
}

View File

@ -23,6 +23,15 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)
// UpdateStrategyType is a string enumeration type that enumerates
// all possible update strategies for the UnitedDeployment controller.
type UpdateStrategyType string
const (
// ManualUpdateStrategyType indicate the partition of each subset.
ManualUpdateStrategyType UpdateStrategyType = "Manual"
)
// UnitedDeploymentSpec defines the desired state of UnitedDeployment
type UnitedDeploymentSpec struct {
// Replicas is the totally desired number of replicas of all the owning workloads.
@ -44,7 +53,7 @@ type UnitedDeploymentSpec struct {
// Strategy indicates the action of updating
// +optional
Strategy UnitedDeploymentUpdateStrategy `json:"strategy,omitempty"`
UpdateStrategy UnitedDeploymentUpdateStrategy `json:"updateStrategy,omitempty"`
// Indicates the number of histories to be conserved.
// If unspecified, defaults to 10.
@ -67,9 +76,20 @@ type StatefulSetTemplateSpec struct {
// UnitedDeploymentUpdateStrategy defines the update strategy of UnitedDeployment.
type UnitedDeploymentUpdateStrategy struct {
// Indicates the partition of each subset.
// Type of UnitedDeployment update.
// Default is Manual.
// +optional
Partitions map[string]*int32 `json:"partitions,omitempty"`
Type UpdateStrategyType `json:"type,omitempty"`
// Indicate the partition of each subset.
// +optional
ManualUpdate *ManualUpdate `json:"manualUpdate,omitempty"`
}
// ManualUpdate is a update strategy which allow users to provide the partition of each subset.
type ManualUpdate struct {
// Indicates number of subset partition.
// +optional
Partitions map[string]int32 `json:"partitions,omitempty"`
}
// Topology defines the spread detail of each subset under UnitedDeployment.
@ -82,7 +102,7 @@ type Topology struct {
// Subset defines the detail of a subset.
type Subset struct {
// Indicates the name of this subset, which will be used to generate
// subset workload name in the format '<deployment-name>-<subset-name>'.
// subset workload name prefix in the format '<deployment-name>-<subset-name>-'.
Name string `json:"name"`
// Indicates the node select strategy to form the subset.

View File

@ -482,6 +482,29 @@ func (in *JobCondition) DeepCopy() *JobCondition {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ManualUpdate) DeepCopyInto(out *ManualUpdate) {
*out = *in
if in.Partitions != nil {
in, out := &in.Partitions, &out.Partitions
*out = make(map[string]int32, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ManualUpdate.
func (in *ManualUpdate) DeepCopy() *ManualUpdate {
if in == nil {
return nil
}
out := new(ManualUpdate)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RollingUpdateSidecarSet) DeepCopyInto(out *RollingUpdateSidecarSet) {
*out = *in
@ -1007,7 +1030,7 @@ func (in *UnitedDeploymentSpec) DeepCopyInto(out *UnitedDeploymentSpec) {
}
in.Template.DeepCopyInto(&out.Template)
in.Topology.DeepCopyInto(&out.Topology)
in.Strategy.DeepCopyInto(&out.Strategy)
in.UpdateStrategy.DeepCopyInto(&out.UpdateStrategy)
if in.RevisionHistoryLimit != nil {
in, out := &in.RevisionHistoryLimit, &out.RevisionHistoryLimit
*out = new(int32)
@ -1069,20 +1092,10 @@ func (in *UnitedDeploymentStatus) DeepCopy() *UnitedDeploymentStatus {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *UnitedDeploymentUpdateStrategy) DeepCopyInto(out *UnitedDeploymentUpdateStrategy) {
*out = *in
if in.Partitions != nil {
in, out := &in.Partitions, &out.Partitions
*out = make(map[string]*int32, len(*in))
for key, val := range *in {
var outVal *int32
if val == nil {
(*out)[key] = nil
} else {
in, out := &val, &outVal
*out = new(int32)
**out = **in
}
(*out)[key] = outVal
}
if in.ManualUpdate != nil {
in, out := &in.ManualUpdate, &out.ManualUpdate
*out = new(ManualUpdate)
(*in).DeepCopyInto(*out)
}
return
}

View File

@ -0,0 +1,285 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"fmt"
"sort"
"strings"
"k8s.io/klog"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
)
type nameToReplicas struct {
SubsetName string
Replicas int32
Specified bool
}
type subsetInfos []*nameToReplicas
func (n subsetInfos) Get(i int) *nameToReplicas {
return []*nameToReplicas(n)[i]
}
func (n subsetInfos) Len() int {
return len(n)
}
func (n subsetInfos) Less(i, j int) bool {
if n[i].Replicas != n[j].Replicas {
return n[i].Replicas < n[j].Replicas
}
return strings.Compare(n[i].SubsetName, n[j].SubsetName) < 0
}
func (n subsetInfos) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}
// GetAllocatedReplicas returns a mapping from subset to next replicas.
// Next replicas is allocated by replicasAllocator, which will consider the current replicas of each subset and
// new replicas indicated from UnitedDeployment.Spec.Topology.Subsets.
func GetAllocatedReplicas(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) (*map[string]int32, bool, string) {
subsetInfos := getSubsetInfos(nameToSubset, ud)
specifiedReplicas := getSpecifiedSubsetReplicas(ud)
// call SortToAllocator to sort all subset by subset.Replicas in order of increment
return subsetInfos.SortToAllocator().AllocateReplicas(*ud.Spec.Replicas, specifiedReplicas)
}
func (n subsetInfos) SortToAllocator() *replicasAllocator {
sort.Sort(n)
return &replicasAllocator{subsets: &n}
}
type replicasAllocator struct {
subsets *subsetInfos
}
func (s *replicasAllocator) effectiveReplicas(replicas int32, subsetReplicasLimits *map[string]int32) (bool, string) {
if subsetReplicasLimits == nil {
return true, ""
}
var specifiedReplicas int32
for _, replicas := range *subsetReplicasLimits {
specifiedReplicas += replicas
}
if specifiedReplicas > replicas {
return false, fmt.Sprintf("Specified subsets' replica (%d) is greater than UnitedDeployment replica (%d)", specifiedReplicas, replicas)
} else if specifiedReplicas < replicas {
specifiedCount := 0
for _, subset := range *s.subsets {
if _, exist := (*subsetReplicasLimits)[subset.SubsetName]; exist {
specifiedCount++
}
}
if specifiedCount == len(*s.subsets) {
return false, fmt.Sprintf("Specified subsets' replica (%d) is less than UnitedDeployment replica (%d)", specifiedReplicas, replicas)
}
}
return true, ""
}
func getSpecifiedSubsetReplicas(ud *appsv1alpha1.UnitedDeployment) *(map[string]int32) {
replicaLimits := map[string]int32{}
if ud.Spec.Topology.Subsets == nil {
return &replicaLimits
}
for _, subsetDef := range ud.Spec.Topology.Subsets {
if subsetDef.Replicas == nil {
continue
}
if specifiedReplicas, err := ParseSubsetReplicas(*ud.Spec.Replicas, *subsetDef.Replicas); err == nil {
replicaLimits[subsetDef.Name] = specifiedReplicas
} else {
klog.Warningf("Fail to consider the replicas of subset %s when parsing replicaLimits during managing replicas of UnitedDeployment %s/%s: %s",
subsetDef.Name, ud.Namespace, ud.Name, err)
}
}
return &replicaLimits
}
func getSubsetInfos(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) *subsetInfos {
infos := make(subsetInfos, len(ud.Spec.Topology.Subsets))
for idx, subsetDef := range ud.Spec.Topology.Subsets {
var replicas int32
if subset, exist := (*nameToSubset)[subsetDef.Name]; exist {
replicas = subset.Spec.Replicas
}
infos[idx] = &nameToReplicas{SubsetName: subsetDef.Name, Replicas: replicas}
}
return &infos
}
// AllocateReplicas will first try to check the specifiedSubsetReplicas is effective or not.
// If effective, normalAllocate will be called. It will apply these specified replicas, then average the rest replicas to left unspecified subsets.
// If not, it will incrementally allocate all of the replicas. The current replicas spread situation will be considered,
// in order to make the scaling smoothly
func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (*map[string]int32, bool, string) {
if effective, reason := s.effectiveReplicas(replicas, specifiedSubsetReplicas); !effective {
return s.incrementalAllocate(replicas), false, reason
}
return s.normalAllocate(replicas, specifiedSubsetReplicas), true, ""
}
func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {
var specifiedReplicas int32
specifiedSubsetCount := 0
// Step 1: apply replicas to specified subsets, and mark them as specified = true.
for _, subset := range *s.subsets {
if replicas, exist := (*specifiedSubsetReplicas)[subset.SubsetName]; exist {
specifiedReplicas += replicas
subset.Replicas = replicas
subset.Specified = true
specifiedSubsetCount++
}
}
// Step 2: averagely allocate the rest replicas to left unspecified subsets.
leftSubsetCount := len(*s.subsets) - specifiedSubsetCount
if leftSubsetCount != 0 {
allocatableReplicas := expectedReplicas - specifiedReplicas
average := int(allocatableReplicas) / leftSubsetCount
remainder := int(allocatableReplicas) % leftSubsetCount
for i := len(*s.subsets) - 1; i >= 0; i-- {
subset := (*s.subsets)[i]
if subset.Specified {
continue
}
if remainder > 0 {
subset.Replicas = int32(average + 1)
remainder--
} else {
subset.Replicas = int32(average)
}
leftSubsetCount--
if leftSubsetCount == 0 {
break
}
}
}
return s.toSubsetReplicaMap()
}
func (s *replicasAllocator) incrementalAllocate(expectedReplicas int32) *map[string]int32 {
var currentReplicas int32
for _, nts := range *s.subsets {
currentReplicas += nts.Replicas
}
consideredLen := len(*s.subsets)
diff := expectedReplicas - currentReplicas
var average int32
var reminder int32
var i int
var leftSubsetsCount int32
if diff > 0 {
// UnitedDeployment is supposed to scale out replicas.
// The policy here is try to allocate the new replicas as even as possible.
// But this policy is also try not to affect the subset which has the replicas more than the average.
// So it starts from the biggest index subset, which has the most replicas.
for i = consideredLen - 1; i >= 0; i-- {
// Consider the subsets from index 0 to i
leftSubsetsCount = int32(i) + 1
average = expectedReplicas / leftSubsetsCount
consideredAverage := average
reminder = expectedReplicas % leftSubsetsCount
if reminder > 0 {
consideredAverage++
}
// If the i th subset, which currently have the most replicas, has more replicas than the average, give up this try.
if consideredAverage < s.subsets.Get(i).Replicas {
expectedReplicas -= s.subsets.Get(i).Replicas
continue
}
break
}
for j := i; j > -1; j-- {
if reminder > 0 {
s.subsets.Get(j).Replicas = average + 1
reminder--
} else {
s.subsets.Get(j).Replicas = average
}
}
} else if diff < 0 {
// Right now, UnitedDeployment is scaling in.
// It is also considering to allocate the replicas as average as possible. But this time, it is scaling in,
// so the subsets which have the less replicas than the average replicas are not supposed to be bothered.
for i = 0; i < consideredLen; i++ {
leftSubsetsCount = int32(consideredLen - i)
average = expectedReplicas / leftSubsetsCount
reminder = expectedReplicas % leftSubsetsCount
if average > s.subsets.Get(i).Replicas {
expectedReplicas -= s.subsets.Get(i).Replicas
continue
}
break
}
for j := i; j < consideredLen; j++ {
if leftSubsetsCount <= reminder {
s.subsets.Get(j).Replicas = average + 1
} else {
s.subsets.Get(j).Replicas = average
leftSubsetsCount--
}
}
}
return s.toSubsetReplicaMap()
}
func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {
allocatedReplicas := map[string]int32{}
for _, subset := range *s.subsets {
allocatedReplicas[subset.SubsetName] = subset.Replicas
}
return &allocatedReplicas
}
func (s *replicasAllocator) String() string {
result := ""
sort.Sort(s.subsets)
for _, subset := range *s.subsets {
result = fmt.Sprintf("%s %s -> %d;", result, subset.SubsetName, subset.Replicas)
}
return result
}

View File

@ -0,0 +1,285 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"testing"
)
func TestScaleReplicas(t *testing.T) {
infos := subsetInfos{}
infos = append(infos,
createSubset("t1", 1),
createSubset("t2", 4),
createSubset("t3", 2),
createSubset("t4", 2))
allocator := infos.SortToAllocator()
allocator.AllocateReplicas(5, &map[string]int32{})
if " t1 -> 1; t3 -> 1; t4 -> 1; t2 -> 2;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 2))
infos = subsetInfos(infos)
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(0, &map[string]int32{})
if " t1 -> 0;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 1),
createSubset("t2", 4),
createSubset("t3", 2),
createSubset("t4", 2))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(17, &map[string]int32{})
if " t1 -> 4; t3 -> 4; t4 -> 4; t2 -> 5;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 2),
createSubset("t2", 4),
createSubset("t3", 2),
createSubset("t4", 2))
infos = subsetInfos(infos)
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(9, &map[string]int32{})
if " t1 -> 2; t3 -> 2; t4 -> 2; t2 -> 3;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 0),
createSubset("t2", 10))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(19, &map[string]int32{})
if " t1 -> 9; t2 -> 10;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 0),
createSubset("t2", 10))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(21, &map[string]int32{})
if " t1 -> 10; t2 -> 11;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
}
func TestSpecifyValidReplicas(t *testing.T) {
infos := subsetInfos{}
infos = append(infos,
createSubset("t1", 1),
createSubset("t2", 4),
createSubset("t3", 2),
createSubset("t4", 2))
allocator := infos.SortToAllocator()
allocator.AllocateReplicas(27, &map[string]int32{
"t1": 4,
"t3": 4,
})
if " t1 -> 4; t3 -> 4; t4 -> 9; t2 -> 10;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 2),
createSubset("t2", 4),
createSubset("t3", 2),
createSubset("t4", 2))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(8, &map[string]int32{
"t1": 4,
"t3": 4,
})
if " t2 -> 0; t4 -> 0; t1 -> 4; t3 -> 4;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 2),
createSubset("t2", 4),
createSubset("t3", 2),
createSubset("t4", 2))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(16, &map[string]int32{
"t1": 4,
"t2": 4,
"t3": 4,
"t4": 4,
})
if " t1 -> 4; t2 -> 4; t3 -> 4; t4 -> 4;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 4),
createSubset("t2", 2),
createSubset("t3", 2),
createSubset("t4", 2))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(10, &map[string]int32{
"t1": 1,
"t2": 2,
"t3": 3,
})
if " t1 -> 1; t2 -> 2; t3 -> 3; t4 -> 4;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 4),
createSubset("t2", 2),
createSubset("t3", 2),
createSubset("t4", 2))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(10, &map[string]int32{
"t1": 1,
"t2": 2,
"t3": 3,
"t4": 4,
})
if " t1 -> 1; t2 -> 2; t3 -> 3; t4 -> 4;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
}
func TestSpecifyInvalidReplicas(t *testing.T) {
// ignore specified replicas if invalid
infos := subsetInfos{}
infos = append(infos,
createSubset("t1", 10),
createSubset("t2", 4))
allocator := infos.SortToAllocator()
allocator.AllocateReplicas(17, &map[string]int32{
"t1": 6,
"t2": 6,
})
if " t2 -> 7; t1 -> 10;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 10),
createSubset("t2", 4))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(17, &map[string]int32{
"t1": 10,
"t2": 11,
})
if " t2 -> 7; t1 -> 10;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 10),
createSubset("t2", 4))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(14, &map[string]int32{
"t1": 6,
"t2": 6,
})
if " t2 -> 4; t1 -> 10;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 10),
createSubset("t2", 4))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(14, &map[string]int32{
"t1": 10,
"t2": 11,
})
if " t2 -> 4; t1 -> 10;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 10),
createSubset("t2", 4))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(10, &map[string]int32{
"t1": 6,
"t2": 6,
})
if " t2 -> 4; t1 -> 6;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 10),
createSubset("t2", 4))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(10, &map[string]int32{
"t1": 10,
"t2": 11,
})
if " t2 -> 4; t1 -> 6;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 10),
createSubset("t2", 4))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(4, &map[string]int32{
"t1": 6,
"t2": 6,
})
if " t1 -> 2; t2 -> 2;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
infos = subsetInfos{}
infos = append(infos,
createSubset("t1", 10),
createSubset("t2", 4))
allocator = infos.SortToAllocator()
allocator.AllocateReplicas(24, &map[string]int32{
"t1": 27,
})
if " t1 -> 12; t2 -> 12;" != allocator.String() {
t.Fatalf("unexpected %s", allocator)
}
}
func createSubset(name string, replicas int32) *nameToReplicas {
return &nameToReplicas{
Replicas: replicas,
SubsetName: name,
}
}

View File

@ -1,3 +1,20 @@
/*
Copyright 2019 The Kruise Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
@ -138,7 +155,17 @@ func (r *ReconcileUnitedDeployment) cleanExpiredRevision(ud *appsalphav1.UnitedD
return sortedRevisions, nil
}
live := map[string]bool{}
live[ud.Status.CurrentRevision] = true
if ud.Status.UpdateStatus != nil {
live[ud.Status.UpdateStatus.UpdatedRevision] = true
}
for i, revision := range *sortedRevisions {
if _, exist := live[revision.Name]; exist {
continue
}
if i >= exceedNum {
break
}

View File

@ -1,10 +1,23 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"fmt"
"sync"
"testing"
"time"
"github.com/onsi/gomega"
"golang.org/x/net/context"
@ -13,16 +26,10 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
)
var (
one int32 = 1
)
func TestRevisionManage(t *testing.T) {
g, requests, stopMgr, mgrStopped := setUp(t)
defer func() {
@ -112,7 +119,7 @@ func TestRevisionManage(t *testing.T) {
g.Expect(c.Get(context.TODO(), client.ObjectKey{Namespace: instance.Namespace, Name: instance.Name}, instance)).Should(gomega.BeNil())
instance.Spec.Template.StatefulSetTemplate.Labels["version"] = "v2"
g.Expect(c.Update(context.TODO(), instance)).Should(gomega.BeNil())
g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest)))
waitReconcilerProcessFinished(g, requests, 0)
revisionList = &appsv1.ControllerRevisionList{}
g.Expect(c.List(context.TODO(), &client.ListOptions{}, revisionList)).Should(gomega.BeNil())
@ -121,97 +128,9 @@ func TestRevisionManage(t *testing.T) {
g.Expect(c.Get(context.TODO(), client.ObjectKey{Namespace: instance.Namespace, Name: instance.Name}, instance)).Should(gomega.BeNil())
instance.Spec.Template.StatefulSetTemplate.Spec.Template.Spec.Containers[0].Image = "nginx:1.1"
g.Expect(c.Update(context.TODO(), instance)).Should(gomega.BeNil())
g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest)))
waitReconcilerProcessFinished(g, requests, 0)
revisionList = &appsv1.ControllerRevisionList{}
g.Expect(c.List(context.TODO(), &client.ListOptions{}, revisionList)).Should(gomega.BeNil())
g.Expect(len(revisionList.Items)).Should(gomega.BeEquivalentTo(2))
}
func setUp(t *testing.T) (*gomega.GomegaWithT, chan reconcile.Request, chan struct{}, *sync.WaitGroup) {
g := gomega.NewGomegaWithT(t)
// Setup the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a
// channel when it is finished.
mgr, err := manager.New(cfg, manager.Options{})
g.Expect(err).NotTo(gomega.HaveOccurred())
c = mgr.GetClient()
recFn, requests := SetupTestReconcile(newReconciler(mgr))
g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred())
stopMgr, mgrStopped := StartTestManager(mgr, g)
return g, requests, stopMgr, mgrStopped
}
func clean(g *gomega.GomegaWithT, c client.Client) {
udList := &appsv1alpha1.UnitedDeploymentList{}
if err := c.List(context.TODO(), &client.ListOptions{}, udList); err == nil {
for _, ud := range udList.Items {
c.Delete(context.TODO(), &ud)
}
}
g.Eventually(func() error {
if err := c.List(context.TODO(), &client.ListOptions{}, udList); err != nil {
return err
}
if len(udList.Items) != 0 {
return fmt.Errorf("expected %d sts, got %d", 0, len(udList.Items))
}
return nil
}, timeout, time.Second).Should(gomega.Succeed())
rList := &appsv1.ControllerRevisionList{}
if err := c.List(context.TODO(), &client.ListOptions{}, rList); err == nil {
for _, ud := range rList.Items {
c.Delete(context.TODO(), &ud)
}
}
g.Eventually(func() error {
if err := c.List(context.TODO(), &client.ListOptions{}, rList); err != nil {
return err
}
if len(rList.Items) != 0 {
return fmt.Errorf("expected %d sts, got %d", 0, len(rList.Items))
}
return nil
}, timeout, time.Second).Should(gomega.Succeed())
stsList := &appsv1.StatefulSetList{}
if err := c.List(context.TODO(), &client.ListOptions{}, stsList); err == nil {
for _, sts := range stsList.Items {
c.Delete(context.TODO(), &sts)
}
}
g.Eventually(func() error {
if err := c.List(context.TODO(), &client.ListOptions{}, stsList); err != nil {
return err
}
if len(stsList.Items) != 0 {
return fmt.Errorf("expected %d sts, got %d", 0, len(stsList.Items))
}
return nil
}, timeout, time.Second).Should(gomega.Succeed())
podList := &corev1.PodList{}
if err := c.List(context.TODO(), &client.ListOptions{}, podList); err == nil {
for _, pod := range podList.Items {
c.Delete(context.TODO(), &pod)
}
}
g.Eventually(func() error {
if err := c.List(context.TODO(), &client.ListOptions{}, podList); err != nil {
return err
}
if len(podList.Items) != 0 {
return fmt.Errorf("expected %d sts, got %d", 0, len(podList.Items))
}
return nil
}, timeout, time.Second).Should(gomega.Succeed())
}

View File

@ -0,0 +1,335 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"context"
"fmt"
"regexp"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util/refmanager"
)
var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")
// StatefulSetControl provides subset operations of StatefulSet.
type StatefulSetControl struct {
client.Client
scheme *runtime.Scheme
}
// GetAllSubsets returns all of subsets owned by the UnitedDeployment.
func (m *StatefulSetControl) GetAllSubsets(ud *alpha1.UnitedDeployment) (podSets []*Subset, err error) {
selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector)
if err != nil {
return nil, err
}
setList := &appsv1.StatefulSetList{}
err = m.Client.List(context.TODO(), &client.ListOptions{LabelSelector: selector}, setList)
if err != nil {
return nil, err
}
manager, err := refmanager.New(m.Client, ud.Spec.Selector, ud, m.scheme)
if err != nil {
return nil, err
}
selected := make([]metav1.Object, len(setList.Items))
for i, set := range setList.Items {
selected[i] = set.DeepCopy()
}
claimedSets, err := manager.ClaimOwnedObjects(selected)
if err != nil {
return nil, err
}
for _, claimedSet := range claimedSets {
podSet, err := m.convertToSubset(claimedSet.(*appsv1.StatefulSet))
if err != nil {
return nil, err
}
podSets = append(podSets, podSet)
}
return podSets, nil
}
// CreateSubset creates the StatefulSet depending on the inputs.
func (m *StatefulSetControl) CreateSubset(ud *alpha1.UnitedDeployment, subsetName string, revision string, replicas, partition int32) error {
set := &appsv1.StatefulSet{}
applyStatefulSetTemplate(ud, subsetName, revision, m.scheme, replicas, partition, set)
klog.V(4).Infof("Have %d replicas when creating StatefulSet %s/%s", *set.Spec.Replicas, set.Namespace, set.Name)
return m.Create(context.TODO(), set)
}
func applyStatefulSetTemplate(ud *alpha1.UnitedDeployment, subsetName string, revision string, scheme *runtime.Scheme, replicas, partition int32, set *appsv1.StatefulSet) error {
var subSetConfig *alpha1.Subset
for _, subset := range ud.Spec.Topology.Subsets {
if subset.Name == subsetName {
subSetConfig = &subset
break
}
}
if subSetConfig == nil {
return fmt.Errorf("fail to find subset config %s", subsetName)
}
set.Namespace = ud.Namespace
if set.Labels == nil {
set.Labels = map[string]string{}
}
for k, v := range ud.Spec.Template.StatefulSetTemplate.Labels {
set.Labels[k] = v
}
for k, v := range ud.Spec.Selector.MatchLabels {
set.Labels[k] = v
}
set.Labels[alpha1.ControllerRevisionHashLabelKey] = revision
// record the subset name as a label
set.Labels[alpha1.SubSetNameLabelKey] = subsetName
if set.Annotations == nil {
set.Annotations = map[string]string{}
}
for k, v := range ud.Spec.Template.StatefulSetTemplate.Annotations {
set.Annotations[k] = v
}
set.GenerateName = getSubsetPrefix(ud.Name, subsetName)
selectors := ud.Spec.Selector.DeepCopy()
selectors.MatchLabels[alpha1.SubSetNameLabelKey] = subsetName
if err := controllerutil.SetControllerReference(ud, set, scheme); err != nil {
return err
}
set.Spec.Selector = selectors
set.Spec.Replicas = &replicas
if set.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType {
if set.Spec.UpdateStrategy.RollingUpdate == nil {
set.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{}
}
set.Spec.UpdateStrategy.RollingUpdate.Partition = &partition
}
set.Spec.Template = *ud.Spec.Template.StatefulSetTemplate.Spec.Template.DeepCopy()
if set.Spec.Template.Labels == nil {
set.Spec.Template.Labels = map[string]string{}
}
set.Spec.Template.Labels[alpha1.SubSetNameLabelKey] = subsetName
set.Spec.Template.Labels[alpha1.ControllerRevisionHashLabelKey] = revision
attachNodeAffinity(&set.Spec.Template.Spec, subSetConfig)
return nil
}
// UpdateSubset is used to update the subset. The target StatefulSet can be found with the input subset.
func (m *StatefulSetControl) UpdateSubset(subset *Subset, ud *alpha1.UnitedDeployment, revision string, replicas, partition int32) error {
set := &appsv1.StatefulSet{}
var updateError error
for i := 0; i < updateRetries; i++ {
getError := m.Client.Get(context.TODO(), m.objectKey(&subset.ObjectMeta), set)
if getError != nil {
return getError
}
if err := applyStatefulSetTemplate(ud, subset.Spec.SubsetName, revision, m.scheme, replicas, partition, set); err != nil {
return err
}
updateError = m.Client.Update(context.TODO(), set)
if updateError == nil {
break
}
}
if updateError != nil {
return updateError
}
if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
return nil
}
// If RollingUpdate, work around for issue https://github.com/kubernetes/kubernetes/issues/67250
return m.deleteStuckPods(set, revision, partition)
}
// DeleteSubset is called to delete the subset. The target StatefulSet can be found with the input subset.
func (m *StatefulSetControl) DeleteSubset(podSet *Subset) error {
set := podSet.Spec.SubsetRef.Resources[0].(*appsv1.StatefulSet)
return m.Delete(context.TODO(), set, client.PropagationPolicy(metav1.DeletePropagationBackground))
}
func (m *StatefulSetControl) convertToSubset(set *appsv1.StatefulSet) (*Subset, error) {
subSetName, err := getSubsetNameFrom(set)
if err != nil {
return nil, err
}
subset := &Subset{}
subset.ObjectMeta = *set.ObjectMeta.DeepCopy()
subset.Spec.SubsetName = subSetName
if set.Spec.Replicas != nil {
subset.Spec.Replicas = *set.Spec.Replicas
}
pods, err := m.getStatefulSetPods(set)
if err != nil {
return subset, err
}
subset.Spec.UpdateStrategy.Partition = 0
if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
revision := getRevision(&set.ObjectMeta)
subset.Spec.UpdateStrategy.Partition = getCurrentPartition(pods, revision)
} else if set.Spec.UpdateStrategy.RollingUpdate != nil &&
set.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
subset.Spec.UpdateStrategy.Partition = *set.Spec.UpdateStrategy.RollingUpdate.Partition
}
subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)
subset.Status.ObservedGeneration = set.Status.ObservedGeneration
subset.Status.Replicas = set.Status.Replicas
subset.Status.ReadyReplicas = set.Status.ReadyReplicas
subset.Status.RevisionReplicas = calculateStatus(pods, set)
return subset, nil
}
// getCurrentPartition calculates current partition by counting the pods not having the updated revision
func getCurrentPartition(pods []*corev1.Pod, revision string) int32 {
var partition int32
for _, pod := range pods {
if getRevision(&pod.ObjectMeta) != revision {
partition++
}
}
return partition
}
// deleteStucckPods tries to work around the blocking issue https://github.com/kubernetes/kubernetes/issues/67250
func (m *StatefulSetControl) deleteStuckPods(set *appsv1.StatefulSet, revision string, partition int32) error {
pods, err := m.getStatefulSetPods(set)
if err != nil {
return err
}
for i := range pods {
pod := pods[i]
// If the pod is considered as stuck, delete it.
if isPodStuckForRollingUpdate(pod, revision, partition) {
klog.V(2).Infof("Delete pod %s/%s at stuck state", pod.Namespace, pod.Name)
err = m.deletePod(pod)
if err != nil {
return err
}
}
}
return nil
}
func (m *StatefulSetControl) deletePod(pod *corev1.Pod) error {
return m.Delete(context.TODO(), pod, client.PropagationPolicy(metav1.DeletePropagationBackground))
}
func (m *StatefulSetControl) getStatefulSetPods(set *appsv1.StatefulSet) ([]*corev1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
podList := &corev1.PodList{}
err = m.Client.List(context.TODO(), &client.ListOptions{LabelSelector: selector}, podList)
if err != nil {
return nil, err
}
manager, err := refmanager.New(m.Client, set.Spec.Selector, set, m.scheme)
if err != nil {
return nil, err
}
selected := make([]metav1.Object, len(podList.Items))
for i, pod := range podList.Items {
selected[i] = pod.DeepCopy()
}
claimed, err := manager.ClaimOwnedObjects(selected)
if err != nil {
return nil, err
}
claimedPods := make([]*corev1.Pod, len(claimed))
for i, pod := range claimed {
claimedPods[i] = pod.(*corev1.Pod)
}
return claimedPods, nil
}
func calculateStatus(podList []*corev1.Pod, set *appsv1.StatefulSet) (revisionReplicas map[string]*SubsetReplicaStatus) {
revisionReplicas = map[string]*SubsetReplicaStatus{}
for _, pod := range podList {
revision := getRevision(&pod.ObjectMeta)
status, exist := revisionReplicas[revision]
if !exist {
status = &SubsetReplicaStatus{}
revisionReplicas[revision] = status
}
status.Replicas++
if podutil.IsPodReady(pod) {
status.ReadyReplicas++
}
}
return
}
// isPodStuckForRollingUpdate checks whether the pod is stuck under strategy RollingUpdate.
// If a pod needs to upgrade (pod_ordinal >= partition && pod_revision != sts_revision)
// and its readiness is false, or worse status like Pending, ImagePullBackOff, it will be blocked.
func isPodStuckForRollingUpdate(pod *corev1.Pod, revision string, partition int32) bool {
if getOrdinal(pod) < partition {
return false
}
if getRevision(pod) == revision {
return false
}
return !podutil.IsPodReadyConditionTrue(pod.Status)
}
func (m *StatefulSetControl) objectKey(objMeta *metav1.ObjectMeta) client.ObjectKey {
return types.NamespacedName{
Namespace: objMeta.Namespace,
Name: objMeta.Name,
}
}

View File

@ -0,0 +1,87 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"fmt"
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
)
func TestGetCurrentPartitionForStrategyOnDelete(t *testing.T) {
currentPods := buildPodList([]int{0, 1, 2}, []string{"v1", "v2", "v2"}, t)
if partition := getCurrentPartition(currentPods, "v2"); partition != 1 {
t.Fatalf("expected partition 1, got %d", partition)
}
currentPods = buildPodList([]int{0, 1, 2}, []string{"v1", "v1", "v2"}, t)
if partition := getCurrentPartition(currentPods, "v2"); partition != 2 {
t.Fatalf("expected partition 2, got %d", partition)
}
currentPods = buildPodList([]int{0, 1, 2, 3}, []string{"v2", "v1", "v2", "v2"}, t)
if partition := getCurrentPartition(currentPods, "v2"); partition != 1 {
t.Fatalf("expected partition 1, got %d", partition)
}
currentPods = buildPodList([]int{1, 2, 3}, []string{"v1", "v2", "v2"}, t)
if partition := getCurrentPartition(currentPods, "v2"); partition != 1 {
t.Fatalf("expected partition 1, got %d", partition)
}
currentPods = buildPodList([]int{0, 1, 3}, []string{"v2", "v1", "v2"}, t)
if partition := getCurrentPartition(currentPods, "v2"); partition != 1 {
t.Fatalf("expected partition 1, got %d", partition)
}
currentPods = buildPodList([]int{0, 1, 2}, []string{"v1", "v1", "v1"}, t)
if partition := getCurrentPartition(currentPods, "v2"); partition != 3 {
t.Fatalf("expected partition 3, got %d", partition)
}
currentPods = buildPodList([]int{0, 1, 2, 4}, []string{"v1", "", "v2", "v3"}, t)
if partition := getCurrentPartition(currentPods, "v2"); partition != 3 {
t.Fatalf("expected partition 3, got %d", partition)
}
}
func buildPodList(ordinals []int, revisions []string, t *testing.T) []*corev1.Pod {
if len(ordinals) != len(revisions) {
t.Fatalf("ordinals count should equals to revision count")
}
pods := []*corev1.Pod{}
for i, ordinal := range ordinals {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: fmt.Sprintf("pod-%d", ordinal),
},
}
if revisions[i] != "" {
pod.Labels = map[string]string{
appsv1alpha1.ControllerRevisionHashLabelKey: revisions[i],
}
}
pods = append(pods, pod)
}
return pods
}

View File

@ -0,0 +1,43 @@
/*
Copyright 2019 The Kruise Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"strconv"
corev1 "k8s.io/api/core/v1"
)
func getOrdinal(pod *corev1.Pod) int32 {
_, ordinal := getParentNameAndOrdinal(pod)
return ordinal
}
func getParentNameAndOrdinal(pod *corev1.Pod) (string, int32) {
parent := ""
var ordinal int32 = -1
subMatches := statefulPodRegex.FindStringSubmatch(pod.Name)
if len(subMatches) < 3 {
return parent, ordinal
}
parent = subMatches[1]
if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil {
ordinal = int32(i)
}
return parent, ordinal
}

View File

@ -0,0 +1,75 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
)
// Subset stores the details of a subset resource owned by one UnitedDeployment.
type Subset struct {
metav1.ObjectMeta
Spec SubsetSpec
Status SubsetStatus
}
// SubsetSpec stores the spec details of the Subset
type SubsetSpec struct {
SubsetName string
Replicas int32
UpdateStrategy SubsetUpdateStrategy
SubsetRef ResourceRef
}
// SubsetStatus stores the observed state of the Subset.
type SubsetStatus struct {
ObservedGeneration int64
Replicas int32
ReadyReplicas int32
RevisionReplicas map[string]*SubsetReplicaStatus
}
// SubsetReplicaStatus store the replicas of pods under corresponding revision
type SubsetReplicaStatus struct {
Replicas int32
ReadyReplicas int32
}
// SubsetUpdateStrategy stores the strategy detail of the Subset.
type SubsetUpdateStrategy struct {
Partition int32
}
// ResourceRef stores the Subset resource it represents.
type ResourceRef struct {
Resources []metav1.Object
}
// ControlInterface defines the interface that UnitedDeployment uses to list, create, update, and delete Subsets.
type ControlInterface interface {
// GetAllSubsets returns the subsets which are managed by the UnitedDeployment
GetAllSubsets(ud *appsv1alpha1.UnitedDeployment) ([]*Subset, error)
// // CreateSubset creates the subset depending on the inputs.
CreateSubset(ud *appsv1alpha1.UnitedDeployment, unit string, revision string, replicas, partition int32) error
// UpdateSubset updates the target subset with the input information.
UpdateSubset(subSet *Subset, ud *appsv1alpha1.UnitedDeployment, revision string, replicas, partition int32) error
// UpdateSubset is used to delete the input subset.
DeleteSubset(*Subset) error
}

View File

@ -18,28 +18,41 @@ package uniteddeployment
import (
"context"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/source"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
)
var log = logf.Log.WithName("controller")
const (
controllerName = "uniteddeployment-controller"
/**
* USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
* business logic. Delete these comments after modifying this file.*
*/
eventTypeRevisionProvision = "RevisionProvision"
eventTypeFindSubsets = "FindSubsets"
eventTypeDupSubsetsDelete = "DeleteDuplicatedSubsets"
eventTypeSubsetsUpdate = "UpdateSubset"
eventTypeSpecifySubbsetReplicas = "SpecifySubsetReplicas"
slowStartInitialBatchSize = 1
)
type subSetType string
const (
statefulSetSubSetType subSetType = "StatefulSet"
)
// Add creates a new UnitedDeployment Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
@ -49,13 +62,21 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileUnitedDeployment{Client: mgr.GetClient(), scheme: mgr.GetScheme()}
return &ReconcileUnitedDeployment{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
recorder: mgr.GetRecorder(controllerName),
subSetControls: map[subSetType]ControlInterface{
statefulSetSubSetType: &StatefulSetControl{Client: mgr.GetClient(), scheme: mgr.GetScheme()},
},
}
}
// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New("uniteddeployment-controller", mgr, controller.Options{Reconciler: r})
c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
@ -66,9 +87,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}
// TODO(user): Modify this to be the types you create
// Uncomment watch a Deployment created by UnitedDeployment - change this for objects you create
err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
err = c.Watch(&source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &appsv1alpha1.UnitedDeployment{},
})
@ -85,34 +104,156 @@ var _ reconcile.Reconciler = &ReconcileUnitedDeployment{}
type ReconcileUnitedDeployment struct {
client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
subSetControls map[subSetType]ControlInterface
}
// Reconcile reads that state of the cluster for a UnitedDeployment object and makes changes based on the state read
// and what is in the UnitedDeployment.Spec
// TODO(user): Modify this Reconcile function to implement your Controller logic. The scaffolding writes
// a Deployment as an example
// Automatically generate RBAC rules to allow the Controller to read and write Deployments
// +kubebuilder:rbac:groups=apps.kruise.io,resources=uniteddeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.kruise.io,resources=uniteddeployments/status,verbs=get;update;patch
func (r *ReconcileUnitedDeployment) Reconcile(request reconcile.Request) (reconcile.Result, error) {
klog.V(4).Infof("Reconcile UnitedDeployment %s/%s", request.Namespace, request.Name)
// Fetch the UnitedDeployment instance
instance := &appsv1alpha1.UnitedDeployment{}
err := r.Get(context.TODO(), request.NamespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
return reconcile.Result{}, err
}
_, _, _, _, err = r.constructUnitedDeploymentRevisions(instance)
if instance.DeletionTimestamp != nil {
return reconcile.Result{}, nil
}
currentRevision, updatedRevision, _, _, err := r.constructUnitedDeploymentRevisions(instance)
if err != nil {
klog.Errorf("Fail to construct controller revision of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeRevisionProvision), err.Error())
return reconcile.Result{}, err
}
control, subsetType := r.getSubsetControls(instance)
klog.V(4).Infof("Get UnitedDeployment %s/%s all subsets", request.Namespace, request.Name)
nameToSubset, err := r.getNameToSubset(instance, control)
if err != nil {
klog.Errorf("Fail to get Subsets of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeFindSubsets), err.Error())
return reconcile.Result{}, nil
}
nextReplicas, effectiveSpecifiedReplicas, ineffectiveReason := GetAllocatedReplicas(nameToSubset, instance)
klog.V(4).Infof("Get UnitedDeployment %s/%s next replicas %v", instance.Namespace, instance.Name, nextReplicas)
if !effectiveSpecifiedReplicas {
r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeSpecifySubbsetReplicas), "Specified subset replicas is ineffective: %s", ineffectiveReason)
}
nextPartitions := calcNextPartitions(instance, nextReplicas)
klog.V(4).Infof("Get UnitedDeployment %s/%s next partition %v", instance.Namespace, instance.Name, nextPartitions)
if _, err := r.manageSubsetProvision(instance, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, subsetType); err != nil {
klog.Errorf("Fail to update UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeSubsetsUpdate), err.Error())
return reconcile.Result{}, nil
}
return reconcile.Result{}, nil
}
func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface) (*map[string]*Subset, error) {
subSets, err := control.GetAllSubsets(instance)
if err != nil {
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeFindSubsets), err.Error())
return nil, fmt.Errorf("fail to get all Subsets for UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)
}
klog.V(4).Infof("Classify UnitedDeployment %s/%s by subSet name", instance.Namespace, instance.Name)
nameToSubsets := r.classifySubsetBySubsetName(instance, subSets)
nameToSubset, err := r.deleteDupSubset(instance, nameToSubsets, control)
if err != nil {
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeDupSubsetsDelete), err.Error())
return nil, fmt.Errorf("fail to manage duplicate Subset of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)
}
return nameToSubset, nil
}
func calcNextPartitions(ud *appsv1alpha1.UnitedDeployment, nextReplicas *map[string]int32) *map[string]int32 {
partitions := map[string]int32{}
for _, subset := range ud.Spec.Topology.Subsets {
var subsetPartition int32
if ud.Spec.UpdateStrategy.Type == appsv1alpha1.ManualUpdateStrategyType && ud.Spec.UpdateStrategy.ManualUpdate != nil && ud.Spec.UpdateStrategy.ManualUpdate.Partitions != nil {
if partition, exist := ud.Spec.UpdateStrategy.ManualUpdate.Partitions[subset.Name]; exist {
subsetPartition = partition
}
}
if subsetReplicas, exist := (*nextReplicas)[subset.Name]; exist && subsetPartition > subsetReplicas {
subsetPartition = subsetReplicas
}
partitions[subset.Name] = subsetPartition
}
return &partitions
}
var subsetReplicasFn = subSetReplicas
func subSetReplicas(subset *Subset) int32 {
return subset.Status.Replicas
}
func (r *ReconcileUnitedDeployment) deleteDupSubset(ud *appsv1alpha1.UnitedDeployment, nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) {
nameToSubset := map[string]*Subset{}
for name, subsets := range nameToSubsets {
if len(subsets) > 1 {
for _, subset := range subsets[1:] {
klog.V(0).Infof("Delete duplicated Subset %s/%s for subset name %s", subset.Namespace, subset.Name, name)
if err := control.DeleteSubset(subset); err != nil {
if errors.IsNotFound(err) {
continue
}
return &nameToSubset, err
}
}
}
if len(subsets) > 0 {
nameToSubset[name] = subsets[0]
}
}
return &nameToSubset, nil
}
func (r *ReconcileUnitedDeployment) getSubsetControls(instance *appsv1alpha1.UnitedDeployment) (ControlInterface, subSetType) {
if instance.Spec.Template.StatefulSetTemplate != nil {
return r.subSetControls[statefulSetSubSetType], statefulSetSubSetType
}
// unexpected
return nil, statefulSetSubSetType
}
func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(ud *appsv1alpha1.UnitedDeployment, subsets []*Subset) map[string][]*Subset {
mapping := map[string][]*Subset{}
for _, ss := range subsets {
subSetName, err := getSubsetNameFrom(ss)
if err != nil {
// filter out Subset without correct Subset name
continue
}
mapping[subSetName] = append(mapping[subSetName], ss)
}
return mapping
}

View File

@ -0,0 +1,508 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"fmt"
"sync"
"testing"
"time"
"github.com/onsi/gomega"
"golang.org/x/net/context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
)
var c client.Client
var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: "foo", Namespace: "default"}}
var deploy = client.ObjectKey{Namespace: "default", Name: "foo"}
const timeout = time.Second * 2
var (
one int32 = 1
two int32 = 2
ten int32 = 10
)
func TestStsReconcile(t *testing.T) {
g, requests, stopMgr, mgrStopped := setUp(t)
defer func() {
clean(g, c)
close(stopMgr)
mgrStopped.Wait()
}()
instance := &appsv1alpha1.UnitedDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: appsv1alpha1.UnitedDeploymentSpec{
Replicas: &one,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"name": "foo",
},
},
Template: appsv1alpha1.SubsetTemplate{
StatefulSetTemplate: &appsv1alpha1.StatefulSetTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"name": "foo",
},
},
Spec: appsv1.StatefulSetSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"name": "foo",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "container-a",
Image: "nginx:1.0",
},
},
},
},
},
},
},
Topology: appsv1alpha1.Topology{
Subsets: []appsv1alpha1.Subset{
{
Name: "subset-a",
NodeSelector: corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "node-name",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"node-a"},
},
},
},
},
},
},
},
},
RevisionHistoryLimit: &ten,
},
}
// Create the UnitedDeployment object and expect the Reconcile and Deployment to be created
err := c.Create(context.TODO(), instance)
// The instance object may not be a valid object because it might be missing some required fields.
// Please modify the instance object by adding required fields and then remove the following if statement.
if apierrors.IsInvalid(err) {
t.Logf("failed to create object, got an invalid object error: %v", err)
return
}
g.Expect(err).NotTo(gomega.HaveOccurred())
defer c.Delete(context.TODO(), instance)
waitReconcilerProcessFinished(g, requests, 3)
expectedStsCount(g, 1)
}
func TestStsSubsetProvision(t *testing.T) {
g, requests, stopMgr, mgrStopped := setUp(t)
defer func() {
clean(g, c)
close(stopMgr)
mgrStopped.Wait()
}()
instance := &appsv1alpha1.UnitedDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: appsv1alpha1.UnitedDeploymentSpec{
Replicas: &one,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"name": "foo",
},
},
Template: appsv1alpha1.SubsetTemplate{
StatefulSetTemplate: &appsv1alpha1.StatefulSetTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"name": "foo",
},
},
Spec: appsv1.StatefulSetSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"name": "foo",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "container-a",
Image: "nginx:1.0",
},
},
},
},
},
},
},
Topology: appsv1alpha1.Topology{
Subsets: []appsv1alpha1.Subset{
{
Name: "subset-a",
NodeSelector: corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "node-name",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"node-a"},
},
},
},
},
},
},
},
},
RevisionHistoryLimit: &ten,
},
}
// Create the UnitedDeployment object and expect the Reconcile and Deployment to be created
err := c.Create(context.TODO(), instance)
// The instance object may not be a valid object because it might be missing some required fields.
// Please modify the instance object by adding required fields and then remove the following if statement.
if apierrors.IsInvalid(err) {
t.Logf("failed to create object, got an invalid object error: %v", err)
return
}
g.Expect(err).NotTo(gomega.HaveOccurred())
defer c.Delete(context.TODO(), instance)
waitReconcilerProcessFinished(g, requests, 3)
stsList := expectedStsCount(g, 1)
sts := &stsList.Items[0]
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ShouldNot(gomega.BeNil())
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).Should(gomega.BeEquivalentTo(1))
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions)).Should(gomega.BeEquivalentTo(1))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key).Should(gomega.BeEquivalentTo("node-name"))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Operator).Should(gomega.BeEquivalentTo(corev1.NodeSelectorOpIn))
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values)).Should(gomega.BeEquivalentTo(1))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values[0]).Should(gomega.BeEquivalentTo("node-a"))
g.Expect(c.Get(context.TODO(), client.ObjectKey{Namespace: instance.Namespace, Name: instance.Name}, instance)).Should(gomega.BeNil())
instance.Spec.Topology.Subsets = append(instance.Spec.Topology.Subsets, appsv1alpha1.Subset{
Name: "subset-b",
NodeSelector: corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "node-name",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"node-b"},
},
},
},
},
},
})
g.Expect(c.Update(context.TODO(), instance)).Should(gomega.BeNil())
waitReconcilerProcessFinished(g, requests, 2)
stsList = expectedStsCount(g, 2)
sts = getSubsetByName(stsList, "subset-a")
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ShouldNot(gomega.BeNil())
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).Should(gomega.BeEquivalentTo(1))
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions)).Should(gomega.BeEquivalentTo(1))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key).Should(gomega.BeEquivalentTo("node-name"))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Operator).Should(gomega.BeEquivalentTo(corev1.NodeSelectorOpIn))
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values)).Should(gomega.BeEquivalentTo(1))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values[0]).Should(gomega.BeEquivalentTo("node-a"))
sts = getSubsetByName(stsList, "subset-b")
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ShouldNot(gomega.BeNil())
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).Should(gomega.BeEquivalentTo(1))
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions)).Should(gomega.BeEquivalentTo(1))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key).Should(gomega.BeEquivalentTo("node-name"))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Operator).Should(gomega.BeEquivalentTo(corev1.NodeSelectorOpIn))
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values)).Should(gomega.BeEquivalentTo(1))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values[0]).Should(gomega.BeEquivalentTo("node-b"))
g.Expect(c.Get(context.TODO(), client.ObjectKey{Namespace: instance.Namespace, Name: instance.Name}, instance)).Should(gomega.BeNil())
instance.Spec.Topology.Subsets = instance.Spec.Topology.Subsets[1:]
g.Expect(c.Update(context.TODO(), instance)).Should(gomega.BeNil())
waitReconcilerProcessFinished(g, requests, 2)
stsList = expectedStsCount(g, 1)
sts = &stsList.Items[0]
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ShouldNot(gomega.BeNil())
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).Should(gomega.BeEquivalentTo(1))
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions)).Should(gomega.BeEquivalentTo(1))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key).Should(gomega.BeEquivalentTo("node-name"))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Operator).Should(gomega.BeEquivalentTo(corev1.NodeSelectorOpIn))
g.Expect(len(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values)).Should(gomega.BeEquivalentTo(1))
g.Expect(sts.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values[0]).Should(gomega.BeEquivalentTo("node-b"))
}
func TestStsDupSubset(t *testing.T) {
g, requests, stopMgr, mgrStopped := setUp(t)
defer func() {
clean(g, c)
close(stopMgr)
mgrStopped.Wait()
}()
instance := &appsv1alpha1.UnitedDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: appsv1alpha1.UnitedDeploymentSpec{
Replicas: &one,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"name": "foo",
},
},
Template: appsv1alpha1.SubsetTemplate{
StatefulSetTemplate: &appsv1alpha1.StatefulSetTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"name": "foo",
},
},
Spec: appsv1.StatefulSetSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"name": "foo",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "container-a",
Image: "nginx:1.0",
},
},
},
},
},
},
},
Topology: appsv1alpha1.Topology{
Subsets: []appsv1alpha1.Subset{
{
Name: "subset-a",
NodeSelector: corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "node-name",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"node-a"},
},
},
},
},
},
},
},
},
RevisionHistoryLimit: &ten,
},
}
// Create the UnitedDeployment object and expect the Reconcile and Deployment to be created
err := c.Create(context.TODO(), instance)
// The instance object may not be a valid object because it might be missing some required fields.
// Please modify the instance object by adding required fields and then remove the following if statement.
if apierrors.IsInvalid(err) {
t.Logf("failed to create object, got an invalid object error: %v", err)
return
}
g.Expect(err).NotTo(gomega.HaveOccurred())
defer c.Delete(context.TODO(), instance)
waitReconcilerProcessFinished(g, requests, 3)
stsList := expectedStsCount(g, 1)
subsetA := stsList.Items[0]
dupSts := subsetA.DeepCopy()
dupSts.Name = "dup-subset-a"
dupSts.ResourceVersion = ""
g.Expect(c.Create(context.TODO(), dupSts)).Should(gomega.BeNil())
waitReconcilerProcessFinished(g, requests, 3)
expectedStsCount(g, 1)
}
func waitReconcilerProcessFinished(g *gomega.GomegaWithT, requests chan reconcile.Request, minCount int) {
timeout := time.After(timeout)
for {
minCount--
select {
case <-requests:
continue
case <-timeout:
if minCount <= 0 {
return
}
}
}
}
func getSubsetByName(stsList *appsv1.StatefulSetList, name string) *appsv1.StatefulSet {
for _, sts := range stsList.Items {
if sts.Labels[appsv1alpha1.SubSetNameLabelKey] == name {
return &sts
}
}
return nil
}
func expectedStsCount(g *gomega.GomegaWithT, count int) *appsv1.StatefulSetList {
stsList := &appsv1.StatefulSetList{}
g.Eventually(func() error {
if err := c.List(context.TODO(), &client.ListOptions{}, stsList); err != nil {
return err
}
if len(stsList.Items) != count {
return fmt.Errorf("expected %d sts, got %d", count, len(stsList.Items))
}
return nil
}, timeout).Should(gomega.Succeed())
return stsList
}
func setUp(t *testing.T) (*gomega.GomegaWithT, chan reconcile.Request, chan struct{}, *sync.WaitGroup) {
g := gomega.NewGomegaWithT(t)
// Setup the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a
// channel when it is finished.
mgr, err := manager.New(cfg, manager.Options{})
g.Expect(err).NotTo(gomega.HaveOccurred())
c = mgr.GetClient()
recFn, requests := SetupTestReconcile(newReconciler(mgr))
g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred())
stopMgr, mgrStopped := StartTestManager(mgr, g)
subsetReplicasFn = func(subset *Subset) int32 {
return subset.Spec.Replicas
}
return g, requests, stopMgr, mgrStopped
}
func clean(g *gomega.GomegaWithT, c client.Client) {
udList := &appsv1alpha1.UnitedDeploymentList{}
if err := c.List(context.TODO(), &client.ListOptions{}, udList); err == nil {
for _, ud := range udList.Items {
c.Delete(context.TODO(), &ud)
}
}
g.Eventually(func() error {
if err := c.List(context.TODO(), &client.ListOptions{}, udList); err != nil {
return err
}
if len(udList.Items) != 0 {
return fmt.Errorf("expected %d sts, got %d", 0, len(udList.Items))
}
return nil
}, timeout, time.Second).Should(gomega.Succeed())
rList := &appsv1.ControllerRevisionList{}
if err := c.List(context.TODO(), &client.ListOptions{}, rList); err == nil {
for _, ud := range rList.Items {
c.Delete(context.TODO(), &ud)
}
}
g.Eventually(func() error {
if err := c.List(context.TODO(), &client.ListOptions{}, rList); err != nil {
return err
}
if len(rList.Items) != 0 {
return fmt.Errorf("expected %d sts, got %d", 0, len(rList.Items))
}
return nil
}, timeout, time.Second).Should(gomega.Succeed())
stsList := &appsv1.StatefulSetList{}
if err := c.List(context.TODO(), &client.ListOptions{}, stsList); err == nil {
for _, sts := range stsList.Items {
c.Delete(context.TODO(), &sts)
}
}
g.Eventually(func() error {
if err := c.List(context.TODO(), &client.ListOptions{}, stsList); err != nil {
return err
}
if len(stsList.Items) != 0 {
return fmt.Errorf("expected %d sts, got %d", 0, len(stsList.Items))
}
return nil
}, timeout, time.Second).Should(gomega.Succeed())
podList := &corev1.PodList{}
if err := c.List(context.TODO(), &client.ListOptions{}, podList); err == nil {
for _, pod := range podList.Items {
c.Delete(context.TODO(), &pod)
}
}
g.Eventually(func() error {
if err := c.List(context.TODO(), &client.ListOptions{}, podList); err != nil {
return err
}
if len(podList.Items) != 0 {
return fmt.Errorf("expected %d sts, got %d", 0, len(podList.Items))
}
return nil
}, timeout, time.Second).Should(gomega.Succeed())
}

View File

@ -24,12 +24,13 @@ import (
"testing"
"github.com/onsi/gomega"
"github.com/openkruise/kruise/pkg/apis"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/openkruise/kruise/pkg/apis"
)
var cfg *rest.Config

View File

@ -18,7 +18,6 @@ package uniteddeployment
import (
"testing"
"time"
"github.com/onsi/gomega"
"golang.org/x/net/context"
@ -26,21 +25,11 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
)
var c client.Client
var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: "foo", Namespace: "default"}}
var depKey = types.NamespacedName{Name: "foo-deployment", Namespace: "default"}
const timeout = time.Second * 5
func TestReconcile(t *testing.T) {
g := gomega.NewGomegaWithT(t)
instance := &appsv1alpha1.UnitedDeployment{

View File

@ -0,0 +1,114 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"fmt"
"math"
"strconv"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
)
const updateRetries = 5
// ParseSubsetReplicas parses the subsetReplicas, and returns the replicas number depending on the sum replicas.
func ParseSubsetReplicas(udReplicas int32, subsetReplicas intstr.IntOrString) (int32, error) {
if subsetReplicas.Type == intstr.Int {
if subsetReplicas.IntVal < 0 {
return 0, fmt.Errorf("subset replicas (%d) should not be less than 0", subsetReplicas.IntVal)
}
return subsetReplicas.IntVal, nil
}
strVal := subsetReplicas.StrVal
if !strings.HasSuffix(strVal, "%") {
return 0, fmt.Errorf("subset replicas (%s) only support integer value or percentage value with a suffix '%%'", strVal)
}
intPart := strVal[:len(strVal)-1]
percent64, err := strconv.ParseInt(intPart, 10, 32)
if err != nil {
return 0, fmt.Errorf("subset replicas (%s) should be correct percentage integer: %s", strVal, err)
}
if percent64 > int64(100) || percent64 < int64(0) {
return 0, fmt.Errorf("subset replicas (%s) should be in range [0, 100]", strVal)
}
return int32(round(float64(udReplicas) * float64(percent64) / 100)), nil
}
func round(x float64) int {
return int(math.Floor(x + 0.5))
}
func getSubsetPrefix(controllerName, subsetName string) string {
prefix := fmt.Sprintf("%s-%s-", controllerName, subsetName)
if len(validation.NameIsDNSSubdomain(prefix, true)) != 0 {
prefix = fmt.Sprintf("%s-", controllerName)
}
return prefix
}
func attachNodeAffinity(podSpec *corev1.PodSpec, subsetConfig *appsv1alpha1.Subset) {
if podSpec.Affinity == nil {
podSpec.Affinity = &corev1.Affinity{}
}
if podSpec.Affinity.NodeAffinity == nil {
podSpec.Affinity.NodeAffinity = &corev1.NodeAffinity{}
}
if podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &corev1.NodeSelector{}
}
if podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms == nil {
podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []corev1.NodeSelectorTerm{}
}
for _, term := range subsetConfig.NodeSelector.NodeSelectorTerms {
podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = append(podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, term)
}
}
func getSubsetNameFrom(metaObj metav1.Object) (string, error) {
name, exist := metaObj.GetLabels()[appsv1alpha1.SubSetNameLabelKey]
if !exist {
return "", fmt.Errorf("fail to get subSet name from label of subset %s/%s: no label %s found", metaObj.GetNamespace(), metaObj.GetName(), appsv1alpha1.SubSetNameLabelKey)
}
if len(name) == 0 {
return "", fmt.Errorf("fail to get subSet name from label of subset %s/%s: label %s has an empty value", metaObj.GetNamespace(), metaObj.GetName(), appsv1alpha1.SubSetNameLabelKey)
}
return name, nil
}
func getRevision(objMeta metav1.Object) string {
if objMeta.GetLabels() == nil {
return ""
}
return objMeta.GetLabels()[appsv1alpha1.ControllerRevisionHashLabelKey]
}

View File

@ -0,0 +1,141 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uniteddeployment
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util"
)
func (r *ReconcileUnitedDeployment) manageSubsetProvision(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextReplicas, nextPartitions *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (sets.String, error) {
expectedSubsets := sets.String{}
gotSubsets := sets.String{}
for _, subset := range ud.Spec.Topology.Subsets {
expectedSubsets.Insert(subset.Name)
}
for subsetName := range *nameToSubset {
gotSubsets.Insert(subsetName)
}
klog.V(4).Infof("UnitedDeployment %s/%s has subsets %v, expects subsets %v", ud.Namespace, ud.Name, gotSubsets.List(), expectedSubsets.List())
var creates []string
for _, expectSubset := range expectedSubsets.List() {
if gotSubsets.Has(expectSubset) {
continue
}
creates = append(creates, expectSubset)
}
var deletes []string
for _, gotSubset := range gotSubsets.List() {
if expectedSubsets.Has(gotSubset) {
continue
}
deletes = append(deletes, gotSubset)
}
var errs []error
// manage creating
if len(creates) > 0 {
// do not consider deletion
klog.V(0).Infof("UnitedDeployment %s/%s needs creating subset (%s) with name: %v", ud.Namespace, ud.Name, subsetType, creates)
createdSubsets := make([]string, len(creates))
for i, subset := range creates {
createdSubsets[i] = subset
}
revision := currentRevision.Name
if updatedRevision != nil {
revision = updatedRevision.Name
}
var createdNum int
var createdErr error
createdNum, createdErr = util.SlowStartBatch(len(creates), slowStartInitialBatchSize, func(idx int) error {
subsetName := createdSubsets[idx]
replicas := (*nextReplicas)[subsetName]
partition := (*nextPartitions)[subsetName]
err := r.subSetControls[subsetType].CreateSubset(ud, subsetName, revision, replicas, partition)
if err != nil {
if !errors.IsTimeout(err) {
return fmt.Errorf("fail to create Subset (%s) %s: %s", subsetType, subsetName, err.Error())
}
}
return nil
})
if createdErr == nil {
r.recorder.Eventf(ud.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful%s", eventTypeSubsetsUpdate), "Create %d Subset (%s)", createdNum, subsetType)
} else {
errs = append(errs, createdErr)
}
}
// manage deleting
if len(deletes) > 0 {
klog.V(0).Infof("UnitedDeployment %s/%s needs deleting subset (%s) with name: [%v]", ud.Namespace, ud.Name, subsetType, deletes)
var deleteErrs []error
for _, subsetName := range deletes {
subset := (*nameToSubset)[subsetName]
if err := r.subSetControls[subsetType].DeleteSubset(subset); err != nil {
deleteErrs = append(deleteErrs, fmt.Errorf("fail to delete Subset (%s) %s/%s for %s: %s", subsetType, subset.Namespace, subset.Name, subsetName, err))
}
}
if len(deleteErrs) > 0 {
errs = append(errs, deleteErrs...)
} else {
r.recorder.Eventf(ud.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful%s", eventTypeSubsetsUpdate), "Delete %d Subset (%s)", len(deletes), subsetType)
}
}
// clean the other kind of subsets
for t, control := range r.subSetControls {
if t == subsetType {
continue
}
subsets, err := control.GetAllSubsets(ud)
if err != nil {
errs = append(errs, fmt.Errorf("fail to list Subset of other type %s for UnitedDeployment %s/%s: %s", t, ud.Namespace, ud.Name, err))
continue
}
for _, subset := range subsets {
if err := control.DeleteSubset(subset); err != nil {
errs = append(errs, fmt.Errorf("fail to delete Subset %s of other type %s for UnitedDeployment %s/%s: %s", subset.Name, t, ud.Namespace, ud.Name, err))
continue
}
}
}
return expectedSubsets.Intersection(gotSubsets), utilerrors.NewAggregate(errs)
}

View File

@ -1,3 +1,20 @@
/*
Copyright 2019 The Kruise Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package refmanager
import (

View File

@ -1,3 +1,19 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package refmanager
import (

63
pkg/util/tools.go Normal file
View File

@ -0,0 +1,63 @@
/*
Copyright 2019 The Kruise Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"sync"
"k8s.io/client-go/util/integer"
)
// SlowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently with its index.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func SlowStartBatch(count int, initialBatchSize int, fn func(index int) error) (int, error) {
remaining := count
successes := 0
index := 0
for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
errCh := make(chan error, batchSize)
var wg sync.WaitGroup
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func(idx int) {
defer wg.Done()
if err := fn(idx); err != nil {
errCh <- err
}
}(index)
index++
}
wg.Wait()
curSuccesses := batchSize - len(errCh)
successes += curSuccesses
if len(errCh) > 0 {
return successes, <-errCh
}
remaining -= batchSize
}
return successes, nil
}

94
pkg/util/tools_test.go Normal file
View File

@ -0,0 +1,94 @@
/*
Copyright 2019 The Kruise Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"sync"
"testing"
)
func TestSlowStartBatch(t *testing.T) {
fakeErr := fmt.Errorf("fake error")
callCnt := 0
callLimit := 0
var lock sync.Mutex
fn := func(idx int) error {
lock.Lock()
defer lock.Unlock()
callCnt++
if callCnt > callLimit {
return fakeErr
}
return nil
}
tests := []struct {
name string
count int
callLimit int
fn func(int) error
expectedSuccesses int
expectedErr error
expectedCallCnt int
}{
{
name: "callLimit = 0 (all fail)",
count: 10,
callLimit: 0,
fn: fn,
expectedSuccesses: 0,
expectedErr: fakeErr,
expectedCallCnt: 1, // 1(first batch): function will be called at least once
},
{
name: "callLimit = count (all succeed)",
count: 10,
callLimit: 10,
fn: fn,
expectedSuccesses: 10,
expectedErr: nil,
expectedCallCnt: 10, // 1(first batch) + 2(2nd batch) + 4(3rd batch) + 3(4th batch) = 10
},
{
name: "callLimit < count (some succeed)",
count: 10,
callLimit: 5,
fn: fn,
expectedSuccesses: 5,
expectedErr: fakeErr,
expectedCallCnt: 7, // 1(first batch) + 2(2nd batch) + 4(3rd batch) = 7
},
}
for _, test := range tests {
callCnt = 0
callLimit = test.callLimit
successes, err := SlowStartBatch(test.count, 1, test.fn)
if successes != test.expectedSuccesses {
t.Errorf("%s: unexpected processed batch size, expected %d, got %d", test.name, test.expectedSuccesses, successes)
}
if err != test.expectedErr {
t.Errorf("%s: unexpected processed batch size, expected %v, got %v", test.name, test.expectedErr, err)
}
// verify that slowStartBatch stops trying more calls after a batch fails
if callCnt != test.expectedCallCnt {
t.Errorf("%s: slowStartBatch() still tries calls after a batch fails, expected %d calls, got %d", test.name, test.expectedCallCnt, callCnt)
}
}
}

View File

@ -1,10 +1,24 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validating
import (
"fmt"
"math"
"regexp"
"strconv"
"strings"
appsv1 "k8s.io/api/apps/v1"
@ -14,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
unversionedvalidation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
appsvalidation "k8s.io/kubernetes/pkg/apis/apps/validation"
@ -23,6 +36,7 @@ import (
apivalidation "k8s.io/kubernetes/pkg/apis/core/validation"
appsv1alpha1 "github.com/openkruise/kruise/pkg/apis/apps/v1alpha1"
udctrl "github.com/openkruise/kruise/pkg/controller/uniteddeployment"
)
var inPlaceUpdateTemplateSpecPatchRexp = regexp.MustCompile("/containers/([0-9]+)/image")
@ -78,7 +92,7 @@ func validateUnitedDeploymentSpec(spec *appsv1alpha1.UnitedDeploymentSpec, fldPa
continue
}
replicas, err := parseSubsetReplicas(expectedReplicas, *subset.Replicas)
replicas, err := udctrl.ParseSubsetReplicas(expectedReplicas, *subset.Replicas)
if err != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subset", "replicas"), subset.Replicas, fmt.Sprintf("invalid replicas %s", subset.Replicas.String())))
} else {
@ -96,9 +110,11 @@ func validateUnitedDeploymentSpec(spec *appsv1alpha1.UnitedDeploymentSpec, fldPa
allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subset"), sumReplicas, fmt.Sprintf("if replicas of all subsets are provided, the sum of indicated subset replicas %d should equal UnitedDeployment replicas %d", sumReplicas, expectedReplicas)))
}
for subset := range spec.Strategy.Partitions {
if !subSetNames.Has(subset) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("strategy", "partitions"), spec.Strategy.Partitions, fmt.Sprintf("subset %s does not exist", subset)))
if spec.UpdateStrategy.ManualUpdate != nil {
for subset := range spec.UpdateStrategy.ManualUpdate.Partitions {
if !subSetNames.Has(subset) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("updateStrategy", "partitions"), spec.UpdateStrategy.ManualUpdate.Partitions, fmt.Sprintf("subset %s does not exist", subset)))
}
}
}
@ -195,33 +211,3 @@ func validateStatefulSetUpdate(statefulSet, oldStatefulSet *appsv1alpha1.Statefu
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*statefulSet.Spec.Replicas), fldPath.Child("spec", "replicas"))...)
return allErrs
}
func parseSubsetReplicas(replicas int32, subsetReplicas intstr.IntOrString) (int32, error) {
if subsetReplicas.Type == intstr.Int {
if subsetReplicas.IntVal < 0 {
return 0, fmt.Errorf("unitReplicas (%d) should not be less than 0", subsetReplicas.IntVal)
}
return subsetReplicas.IntVal, nil
}
strVal := subsetReplicas.StrVal
if !strings.HasSuffix(strVal, "%") {
return 0, fmt.Errorf("unitReplicas (%s) only support int value or percentage value with a suffix '%%'", strVal)
}
intPart := strVal[:len(strVal)-1]
percent64, err := strconv.ParseInt(intPart, 10, 32)
if err != nil {
return 0, fmt.Errorf("unitReplicas (%s) should be correct percentage value", strVal)
}
if percent64 > int64(100) || percent64 < int64(0) {
return 0, fmt.Errorf("unitReplicas (%s) should be in range (0, 100]", strVal)
}
return int32(round(float64(replicas) * float64(percent64) / 100)), nil
}
func round(x float64) int {
return int(math.Floor(x + 0.5))
}

View File

@ -187,7 +187,6 @@ func TestValidateUnitedDeployment(t *testing.T) {
})
}
var val1 int32 = 1
errorCases := map[string]appsv1alpha1.UnitedDeployment{
"no pod template label": {
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
@ -338,9 +337,11 @@ func TestValidateUnitedDeployment(t *testing.T) {
},
},
},
Strategy: appsv1alpha1.UnitedDeploymentUpdateStrategy{
Partitions: map[string]*int32{
"notExist": &val1,
UpdateStrategy: appsv1alpha1.UnitedDeploymentUpdateStrategy{
ManualUpdate: &appsv1alpha1.ManualUpdate{
Partitions: map[string]int32{
"notExist": 1,
},
},
},
Topology: appsv1alpha1.Topology{
@ -373,7 +374,7 @@ func TestValidateUnitedDeployment(t *testing.T) {
field != "spec.selector" &&
field != "spec.topology.subset" &&
field != "spec.topology.subset.name" &&
field != "spec.strategy.partitions" {
field != "spec.updateStrategy.partitions" {
t.Errorf("%s: missing prefix for: %v", k, errs[i])
}
}