podUnavailableBudget webhook validate pod (#650)

Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
This commit is contained in:
berg 2021-07-08 15:42:53 +08:00 committed by GitHub
parent dee6b813f4
commit dc553e62b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 993 additions and 20 deletions

View File

@ -0,0 +1,43 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pubcontrol
import (
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
corev1 "k8s.io/api/core/v1"
)
type PubControl interface {
// Common
// get PodUnavailableBudget
GetPodUnavailableBudget() *policyv1alpha1.PodUnavailableBudget
// IsPodReady indicates whether pod is fully ready
// 1. pod.Status.Phase == v1.PodRunning
// 2. pod.condition PodReady == true
IsPodReady(pod *corev1.Pod) bool
// IsPodStateConsistent indicates whether pod.spec and pod.status are consistent after updating containers
IsPodStateConsistent(pod *corev1.Pod) bool
// webhook
// determine if this change to pod might cause unavailability
IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool
}
func NewPubControl(pub *policyv1alpha1.PodUnavailableBudget) PubControl {
return &commonControl{PodUnavailableBudget: pub}
}

View File

@ -0,0 +1,112 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pubcontrol
import (
"reflect"
"strings"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog"
)
type commonControl struct {
*policyv1alpha1.PodUnavailableBudget
}
func (c *commonControl) GetPodUnavailableBudget() *policyv1alpha1.PodUnavailableBudget {
return c.PodUnavailableBudget
}
func (c *commonControl) IsPodReady(pod *corev1.Pod) bool {
// 1. pod.Status.Phase == v1.PodRunning
// 2. pod.condition PodReady == true
return util.IsRunningAndReady(pod)
}
func (c *commonControl) IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool {
// If pod.spec changed, pod will be in unavailable condition
if !reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
klog.V(3).Infof("pod(%s.%s) specification changed, and maybe cause unavailability", newPod.Namespace, newPod.Name)
return true
}
// pod other changes will not cause unavailability situation, then return false
return false
}
func (c *commonControl) IsPodStateConsistent(pod *corev1.Pod) bool {
// if all container image is digest format
// by comparing status.containers[x].ImageID with spec.container[x].Image can determine whether pod is consistent
allDigestImage := true
for _, container := range pod.Spec.Containers {
//whether image is digest format,
//for example: docker.io/busybox@sha256:a9286defaba7b3a519d585ba0e37d0b2cbee74ebfe590960b0b1d6a5e97d1e1d
if !util.IsImageDigest(container.Image) {
allDigestImage = false
continue
}
if !util.IsPodContainerDigestEqual(sets.NewString(container.Name), pod) {
klog.V(5).Infof("pod(%s.%s) container(%s) image is inconsistent", pod.Namespace, pod.Name, container.Name)
return false
}
}
// If all spec.container[x].image is digest format, only check digest imageId
if allDigestImage {
return true
}
// check whether injected sidecar container is consistent
sidecarSets, sidecars := getSidecarSetsInPod(pod)
if sidecarSets.Len() > 0 && sidecars.Len() > 0 {
if !sidecarcontrol.IsSidecarContainerUpdateCompleted(pod, sidecarSets, sidecars) {
return false
}
}
// whether other containers is consistent
if err := inplaceupdate.DefaultCheckInPlaceUpdateCompleted(pod); err != nil {
klog.V(5).Infof("check pod(%s.%s) InPlaceUpdate failed: %s", pod.Namespace, pod.Name, err.Error())
return false
}
return true
}
func getSidecarSetsInPod(pod *corev1.Pod) (sidecarSets, containers sets.String) {
containers = sets.NewString()
sidecarSets = sets.NewString()
if setList, ok := pod.Annotations[sidecarcontrol.SidecarSetListAnnotation]; ok && len(setList) > 0 {
for _, sidecarSetName := range strings.Split(setList, ",") {
sidecarSets.Insert(sidecarSetName)
}
}
for _, container := range pod.Spec.Containers {
val := util.GetContainerEnvValue(&container, sidecarcontrol.SidecarEnvKey)
if val == "true" {
containers.Insert(container.Name)
}
}
return sidecarSets, containers
}

View File

@ -0,0 +1,106 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pubcontrol
import (
"context"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client"
)
func GetPodUnavailableBudgetForPod(kClient client.Client, finders *controllerfinder.ControllerFinder, pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error) {
var err error
if len(pod.Labels) == 0 {
return nil, nil
}
pubList := &policyv1alpha1.PodUnavailableBudgetList{}
if err = kClient.List(context.TODO(), pubList, &client.ListOptions{Namespace: pod.Namespace}); err != nil {
return nil, err
}
var matchedPubs []policyv1alpha1.PodUnavailableBudget
for _, pub := range pubList.Items {
// if targetReference isn't nil, priority to take effect
if pub.Spec.TargetReference != nil {
targetRef := pub.Spec.TargetReference
// check whether APIVersion, Kind, Name is equal
ref := metav1.GetControllerOf(pod)
if ref == nil {
continue
}
// recursive fetch pod reference, e.g. ref.Kind=Replicas, return podRef.Kind=Deployment
podRef, err := finders.GetScaleAndSelectorForRef(ref.APIVersion, ref.Kind, pod.Namespace, ref.Name, ref.UID)
if err != nil {
return nil, err
}
pubRef, err := finders.GetScaleAndSelectorForRef(targetRef.APIVersion, targetRef.Kind, pub.Namespace, targetRef.Name, "")
if err != nil {
return nil, err
}
// belongs the same workload
if isReferenceEqual(podRef, pubRef) {
matchedPubs = append(matchedPubs, pub)
}
} else {
// This error is irreversible, so continue
labelSelector, err := util.GetFastLabelSelector(pub.Spec.Selector)
if err != nil {
continue
}
// If a PUB with a nil or empty selector creeps in, it should match nothing, not everything.
if labelSelector.Empty() || !labelSelector.Matches(labels.Set(pod.Labels)) {
continue
}
matchedPubs = append(matchedPubs, pub)
}
}
if len(matchedPubs) == 0 {
klog.V(6).Infof("could not find PodUnavailableBudget for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
return nil, nil
}
if len(matchedPubs) > 1 {
klog.Warningf("Pod %q/%q matches multiple PodUnavailableBudgets. Choose %q arbitrarily.", pod.Namespace, pod.Name, matchedPubs[0].Name)
}
return &matchedPubs[0], nil
}
// check APIVersion, Kind, Name
func isReferenceEqual(ref1, ref2 *controllerfinder.ScaleAndSelector) bool {
gv1, err := schema.ParseGroupVersion(ref1.APIVersion)
if err != nil {
return false
}
gv2, err := schema.ParseGroupVersion(ref2.APIVersion)
if err != nil {
return false
}
return gv1.Group == gv2.Group && ref1.Kind == ref2.Kind &&
ref1.Name == ref2.Name && ref1.UID == ref2.UID
}

View File

@ -0,0 +1,154 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pubcontrol
import (
"context"
"fmt"
"time"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
// MaxUnavailablePodSize is the max size of PUB.DisruptedPods + PUB.UnavailablePods.
MaxUnavailablePodSize = 2000
)
var ConflictRetry = wait.Backoff{
Steps: 4,
Duration: 500 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}
type Operation string
const (
UpdateOperation = "UPDATE"
DeleteOperation = "DELETE"
)
func PodUnavailableBudgetValidatePod(client client.Client, pod *corev1.Pod, control PubControl,
operation Operation, dryRun bool) (allowed bool, reason string, err error) {
// If the pod is not ready, it doesn't count towards healthy and we should not decrement
if !control.IsPodReady(pod) {
klog.V(3).Infof("pod(%s.%s) is not ready, then don't need check pub", pod.Namespace, pod.Name)
return true, "", nil
}
pub := control.GetPodUnavailableBudget()
// pod is in pub.Status.DisruptedPods or pub.Status.UnavailablePods, then don't need check it
if isPodRecordedInPub(pod.Name, pub) {
klog.V(5).Infof("pod(%s.%s) already is recorded in pub(%s.%s)", pod.Namespace, pod.Name, pub.Namespace, pub.Name)
return true, "", nil
}
pubClone := pub.DeepCopy()
refresh := false
err = retry.RetryOnConflict(ConflictRetry, func() error {
if refresh {
key := types.NamespacedName{
Name: pubClone.Name,
Namespace: pubClone.Namespace,
}
if err := client.Get(context.TODO(), key, pubClone); err != nil {
klog.Errorf("Get PodUnavailableBudget(%s) failed: %s", key.String(), err.Error())
return err
}
}
// Try to verify-and-decrement
// If it was false already, or if it becomes false during the course of our retries,
err := checkAndDecrement(pod.Name, pubClone, operation)
if err != nil {
return err
}
// If this is a dry-run, we don't need to go any further than that.
if dryRun {
klog.V(5).Infof("pod(%s) operation for pub(%s.%s) is a dry run", pod.Name, pubClone.Namespace, pubClone.Name)
return nil
}
klog.V(3).Infof("pub(%s.%s) update status(disruptedPods:%d, unavailablePods:%d, expectedCount:%d, desiredAvailable:%d, currentAvailable:%d, unavailableAllowed:%d)",
pubClone.Namespace, pubClone.Name, len(pubClone.Status.DisruptedPods), len(pubClone.Status.UnavailablePods),
pubClone.Status.TotalReplicas, pubClone.Status.DesiredAvailable, pubClone.Status.CurrentAvailable, pubClone.Status.UnavailableAllowed)
if err = client.Status().Update(context.TODO(), pubClone); err == nil {
return nil
}
// if conflict, then retry
refresh = true
return err
})
if err != nil && err != wait.ErrWaitTimeout {
klog.Errorf("pod(%s.%s) operation(%s) for pub(%s.%s) failed: %s", pod.Namespace, pod.Name, operation, pub.Namespace, pub.Name, err.Error())
return false, err.Error(), nil
} else if err == wait.ErrWaitTimeout {
err = errors.NewTimeoutError(fmt.Sprintf("couldn't update PodUnavailableBudget %s due to conflicts", pub.Name), 10)
klog.Errorf("pod(%s.%s) operation(%s) failed: %s", pod.Namespace, pod.Name, operation, err.Error())
return false, err.Error(), nil
}
klog.V(3).Infof("admit pod(%s.%s) operation(%s) for pub(%s.%s)", pod.Namespace, pod.Name, operation, pub.Namespace, pub.Name)
return true, "", nil
}
func checkAndDecrement(podName string, pub *policyv1alpha1.PodUnavailableBudget, operation Operation) error {
if pub.Status.UnavailableAllowed <= 0 {
return errors.NewForbidden(policyv1alpha1.Resource("podunavailablebudget"), pub.Name, fmt.Errorf("pub unavailable allowed is negative"))
}
if len(pub.Status.DisruptedPods)+len(pub.Status.UnavailablePods) > MaxUnavailablePodSize {
return errors.NewForbidden(policyv1alpha1.Resource("podunavailablebudget"), pub.Name, fmt.Errorf("DisruptedPods and UnavailablePods map too big - too many unavailable not confirmed by PUB controller"))
}
pub.Status.UnavailableAllowed--
if pub.Status.DisruptedPods == nil {
pub.Status.DisruptedPods = make(map[string]metav1.Time)
}
if pub.Status.UnavailablePods == nil {
pub.Status.UnavailablePods = make(map[string]metav1.Time)
}
if operation == UpdateOperation {
pub.Status.UnavailablePods[podName] = metav1.Time{Time: time.Now()}
klog.V(3).Infof("pod(%s) is recorded in pub(%s.%s) UnavailablePods", podName, pub.Namespace, pub.Name)
} else {
pub.Status.DisruptedPods[podName] = metav1.Time{Time: time.Now()}
klog.V(3).Infof("pod(%s) is recorded in pub(%s.%s) DisruptedPods", podName, pub.Namespace, pub.Name)
}
return nil
}
func isPodRecordedInPub(podName string, pub *policyv1alpha1.PodUnavailableBudget) bool {
if _, ok := pub.Status.UnavailablePods[podName]; ok {
return true
}
if _, ok := pub.Status.DisruptedPods[podName]; ok {
return true
}
return false
}

View File

@ -178,7 +178,7 @@ func (c *commonControl) IsPodStateConsistent(pod *v1.Pod, sidecarContainers sets
}
// check container InpalceUpdate status
return isSidecarContainerUpdateCompleted(pod, sidecarset.Name, sidecarContainers)
return IsSidecarContainerUpdateCompleted(pod, sets.NewString(sidecarset.Name), sidecarContainers)
}
// k8s only allow modify pod.spec.container[x].image,
@ -213,7 +213,7 @@ func (c *commonControl) IsPodAvailabilityChanged(pod, oldPod *v1.Pod) bool {
// isContainerInplaceUpdateCompleted checks whether imageID in container status has been changed since in-place update.
// If the imageID in containerStatuses has not been changed, we assume that kubelet has not updated containers in Pod.
func isSidecarContainerUpdateCompleted(pod *v1.Pod, sidecarSetName string, containers sets.String) bool {
func IsSidecarContainerUpdateCompleted(pod *v1.Pod, sidecarSets, containers sets.String) bool {
//format: sidecarset.name -> appsv1alpha1.InPlaceUpdateState
sidecarUpdateStates := make(map[string]*pub.InPlaceUpdateState)
// when the pod annotation not found, indicates the pod only injected sidecar container, and never inplace update
@ -227,11 +227,16 @@ func isSidecarContainerUpdateCompleted(pod *v1.Pod, sidecarSetName string, conta
return false
}
// when the sidecarset InPlaceUpdateState not found, indicates the pod only injected sidecar container, and never inplace update
// then always think it update complete
inPlaceUpdateState, ok := sidecarUpdateStates[sidecarSetName]
if !ok {
return true
// The container imageId recorded before the in-place sidecar upgrade
// when the container imageId not found, indicates the pod only injected the sidecar container,
// and never in-place update sidecar, then always think it update complete
lastContainerStatus := make(map[string]pub.InPlaceUpdateContainerStatus)
for _, sidecarSetName := range sidecarSets.List() {
if inPlaceUpdateState, ok := sidecarUpdateStates[sidecarSetName]; ok {
for name, status := range inPlaceUpdateState.LastContainerStatuses {
lastContainerStatus[name] = status
}
}
}
containerImages := make(map[string]string, len(pod.Spec.Containers))
@ -245,7 +250,7 @@ func isSidecarContainerUpdateCompleted(pod *v1.Pod, sidecarSetName string, conta
if !containers.Has(cs.Name) {
continue
}
if oldStatus, ok := inPlaceUpdateState.LastContainerStatuses[cs.Name]; ok {
if oldStatus, ok := lastContainerStatus[cs.Name]; ok {
// we assume that users should not update workload template with new image
// which actually has the same imageID as the old image
if oldStatus.ImageID == cs.ImageID && containerImages[cs.Name] != cs.Image {

View File

@ -24,9 +24,20 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
)
var (
// image.Name -> image.Id
ImageIds = map[string]string{
"main:v1": "4120593193b4",
"cold-sidecar:v1": "docker-pullable://cold-sidecar@sha256:9ead06a1362e",
"cold-sidecar:v2": "docker-pullable://cold-sidecar@sha256:7223aa0f3a7a",
"hot-sidecar:v1": "docker-pullable://hot-sidecar@sha256:86618128c92e",
"hot-sidecar:v2": "docker-pullable://hot-sidecar@sha256:74abd85af1e9",
"hotupgrade:empty": "docker-pullable://hotupgrade@sha256:0e9daf5c02e7",
}
podDemo = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
@ -35,6 +46,61 @@ var (
Labels: map[string]string{"app": "nginx"},
ResourceVersion: "495711227",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "main",
Image: "main:v1",
},
{
Name: "cold-sidecar",
Image: "cold-sidecar:v1",
},
{
Name: "hot-sidecar-1",
Image: "hot-sidecar:v1",
},
{
Name: "hot-sidecar-2",
Image: "hotupgrade:empty",
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "main",
Image: "main:v1",
ImageID: ImageIds["main:v1"],
Ready: true,
},
{
Name: "cold-sidecar",
Image: "cold-sidecar:v1",
ImageID: ImageIds["cold-sidecar:v1"],
Ready: true,
},
{
Name: "hot-sidecar-1",
Image: "hot-sidecar:v1",
ImageID: ImageIds["hot-sidecar:v1"],
Ready: true,
},
{
Name: "hot-sidecar-2",
Image: "hotupgrade:empty",
ImageID: ImageIds["hotupgrade:empty"],
Ready: true,
},
},
},
}
sidecarSetDemo = &appsv1alpha1.SidecarSet{
@ -47,9 +113,155 @@ var (
Name: "test-sidecarset",
Labels: map[string]string{},
},
Spec: appsv1alpha1.SidecarSetSpec{
Containers: []appsv1alpha1.SidecarContainer{
{
Container: corev1.Container{
Name: "cold-sidecar",
Image: "cold-image:v1",
},
UpgradeStrategy: appsv1alpha1.SidecarContainerUpgradeStrategy{
UpgradeType: appsv1alpha1.SidecarContainerColdUpgrade,
},
},
{
Container: corev1.Container{
Name: "hot-sidecar",
Image: "hot-image:v1",
},
UpgradeStrategy: appsv1alpha1.SidecarContainerUpgradeStrategy{
UpgradeType: appsv1alpha1.SidecarContainerHotUpgrade,
HotUpgradeEmptyImage: "hotupgrade:empty",
},
},
},
},
}
)
func TestIsSidecarContainerUpdateCompleted(t *testing.T) {
cases := []struct {
name string
getPod func() *corev1.Pod
upgradeSidecars func() (sets.String, sets.String)
expectedCompleted bool
}{
{
name: "only inject sidecar, not upgrade",
getPod: func() *corev1.Pod {
return podDemo.DeepCopy()
},
upgradeSidecars: func() (sets.String, sets.String) {
return sets.NewString(sidecarSetDemo.Name), sets.NewString("cold-sidecar", "hot-sidecar-1", "hot-sidecar-2")
},
expectedCompleted: true,
},
{
name: "upgrade cold sidecar, upgrade not completed",
getPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
control := New(sidecarSetDemo.DeepCopy())
pod.Spec.Containers[1].Image = "cold-sidecar:v2"
control.UpdatePodAnnotationsInUpgrade([]string{"cold-sidecar"}, pod)
return pod
},
upgradeSidecars: func() (sets.String, sets.String) {
return sets.NewString(sidecarSetDemo.Name), sets.NewString("cold-sidecar", "hot-sidecar-1", "hot-sidecar-2")
},
expectedCompleted: false,
},
{
name: "upgrade cold sidecar, upgrade completed",
getPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
control := New(sidecarSetDemo.DeepCopy())
pod.Spec.Containers[1].Image = "cold-sidecar:v2"
control.UpdatePodAnnotationsInUpgrade([]string{"cold-sidecar"}, pod)
pod.Status.ContainerStatuses[1].ImageID = ImageIds["cold-sidecar:v2"]
return pod
},
upgradeSidecars: func() (sets.String, sets.String) {
return sets.NewString(sidecarSetDemo.Name), sets.NewString("cold-sidecar", "hot-sidecar-1", "hot-sidecar-2")
},
expectedCompleted: true,
},
{
name: "upgrade hot sidecar, upgrade hot-sidecar-2 not completed",
getPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
control := New(sidecarSetDemo.DeepCopy())
// upgrade cold sidecar completed
pod.Spec.Containers[1].Image = "cold-sidecar:v2"
control.UpdatePodAnnotationsInUpgrade([]string{"cold-sidecar"}, pod)
pod.Status.ContainerStatuses[1].ImageID = ImageIds["cold-sidecar:v2"]
// start upgrading hot sidecar
pod.Spec.Containers[3].Image = "hot-sidecar:v2"
control.UpdatePodAnnotationsInUpgrade([]string{"hot-sidecar-2"}, pod)
return pod
},
upgradeSidecars: func() (sets.String, sets.String) {
return sets.NewString(sidecarSetDemo.Name), sets.NewString("cold-sidecar", "hot-sidecar-1", "hot-sidecar-2")
},
expectedCompleted: false,
},
{
name: "upgrade hot sidecar, upgrade hot-sidecar-1 not completed",
getPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
control := New(sidecarSetDemo.DeepCopy())
// upgrade cold sidecar completed
pod.Spec.Containers[1].Image = "cold-sidecar:v2"
control.UpdatePodAnnotationsInUpgrade([]string{"cold-sidecar"}, pod)
pod.Status.ContainerStatuses[1].ImageID = ImageIds["cold-sidecar:v2"]
// start upgrading hot sidecar
pod.Spec.Containers[3].Image = "hot-sidecar:v2"
control.UpdatePodAnnotationsInUpgrade([]string{"hot-sidecar-2"}, pod)
pod.Status.ContainerStatuses[3].ImageID = ImageIds["hot-sidecar:v2"]
pod.Spec.Containers[2].Image = "hotupgrade:empty"
control.UpdatePodAnnotationsInUpgrade([]string{"hot-sidecar-1"}, pod)
return pod
},
upgradeSidecars: func() (sets.String, sets.String) {
return sets.NewString(sidecarSetDemo.Name), sets.NewString("cold-sidecar", "hot-sidecar-1", "hot-sidecar-2")
},
expectedCompleted: false,
},
{
name: "upgrade hot sidecar, upgrade hot-sidecar completed",
getPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
control := New(sidecarSetDemo.DeepCopy())
// upgrade cold sidecar completed
pod.Spec.Containers[1].Image = "cold-sidecar:v2"
control.UpdatePodAnnotationsInUpgrade([]string{"cold-sidecar"}, pod)
pod.Status.ContainerStatuses[1].ImageID = ImageIds["cold-sidecar:v2"]
// start upgrading hot sidecar
pod.Spec.Containers[3].Image = "hot-sidecar:v2"
control.UpdatePodAnnotationsInUpgrade([]string{"hot-sidecar-2"}, pod)
pod.Status.ContainerStatuses[3].ImageID = ImageIds["hot-sidecar:v2"]
pod.Spec.Containers[2].Image = "hotupgrade:empty"
control.UpdatePodAnnotationsInUpgrade([]string{"hot-sidecar-1"}, pod)
pod.Status.ContainerStatuses[2].ImageID = ImageIds["hotupgrade:empty"]
return pod
},
upgradeSidecars: func() (sets.String, sets.String) {
return sets.NewString(sidecarSetDemo.Name), sets.NewString("cold-sidecar", "hot-sidecar-1", "hot-sidecar-2")
},
expectedCompleted: true,
},
}
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
pod := cs.getPod()
sidecarSets, containers := cs.upgradeSidecars()
if IsSidecarContainerUpdateCompleted(pod, sidecarSets, containers) != cs.expectedCompleted {
t.Fatalf("IsSidecarContainerUpdateCompleted failed: %s", cs.name)
}
})
}
}
func TestGetPodSidecarSetRevision(t *testing.T) {
cases := []struct {
name string

View File

@ -0,0 +1,323 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllerfinder
import (
"context"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// ScaleAndSelector is used to return (controller, scale, selector) fields from the
// controller finder functions.
type ScaleAndSelector struct {
ControllerReference
// controller.spec.Replicas
Scale int32
// controller.spec.Selector
Selector *metav1.LabelSelector
}
type ControllerReference struct {
// API version of the referent.
APIVersion string `json:"apiVersion" protobuf:"bytes,5,opt,name=apiVersion"`
// Kind of the referent.
Kind string `json:"kind" protobuf:"bytes,1,opt,name=kind"`
// Name of the referent.
Name string `json:"name" protobuf:"bytes,3,opt,name=name"`
// UID of the referent.
UID types.UID `json:"uid" protobuf:"bytes,4,opt,name=uid,casttype=k8s.io/apimachinery/pkg/types.UID"`
}
// PodControllerFinder is a function type that maps a pod to a list of
// controllers and their scale.
type PodControllerFinder func(ref ControllerReference, namespace string) (*ScaleAndSelector, error)
type ControllerFinder struct {
client.Client
}
func NewControllerFinder(c client.Client) *ControllerFinder {
return &ControllerFinder{
Client: c,
}
}
func (r *ControllerFinder) GetScaleAndSelectorForRef(apiVersion, kind, ns, name string, uid types.UID) (*ScaleAndSelector, error) {
var scaleNSelector *ScaleAndSelector
var err error
targetRef := ControllerReference{
APIVersion: apiVersion,
Kind: kind,
Name: name,
UID: uid,
}
for _, finder := range r.Finders() {
scaleNSelector, err = finder(targetRef, ns)
if err != nil {
return nil, err
}
if scaleNSelector != nil {
break
}
}
return scaleNSelector, nil
}
func (r *ControllerFinder) Finders() []PodControllerFinder {
return []PodControllerFinder{r.getPodReplicationController, r.getPodDeployment, r.getPodReplicaSet,
r.getPodStatefulSet, r.getPodKruiseCloneSet, r.getPodKruiseStatefulSet}
}
var (
// kubernetes
controllerKindRS = apps.SchemeGroupVersion.WithKind("ReplicaSet")
controllerKindSS = apps.SchemeGroupVersion.WithKind("StatefulSet")
controllerKindRC = corev1.SchemeGroupVersion.WithKind("ReplicationController")
controllerKindDep = apps.SchemeGroupVersion.WithKind("Deployment")
// kruise
controllerKruiseKindCS = appsv1alpha1.SchemeGroupVersion.WithKind("CloneSet")
controllerKruiseKindSS = appsv1beta1.SchemeGroupVersion.WithKind("StatefulSet")
)
// getPodReplicaSet finds a replicaset which has no matching deployments.
func (r *ControllerFinder) getPodReplicaSet(ref ControllerReference, namespace string) (*ScaleAndSelector, error) {
// This error is irreversible, so there is no need to return error
ok, _ := verifyGroupKind(ref, controllerKindRS.Kind, []string{controllerKindRS.Group})
if !ok {
return nil, nil
}
replicaSet := &apps.ReplicaSet{}
err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, replicaSet)
if err != nil {
// when error is NotFound, it is ok here.
if errors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
if ref.UID != "" && replicaSet.UID != ref.UID {
return nil, nil
}
controllerRef := metav1.GetControllerOf(replicaSet)
if controllerRef != nil && controllerRef.Kind == controllerKindDep.Kind {
refSs := ControllerReference{
APIVersion: controllerRef.APIVersion,
Kind: controllerRef.Kind,
Name: controllerRef.Name,
UID: controllerRef.UID,
}
return r.getPodDeployment(refSs, namespace)
}
return &ScaleAndSelector{
Scale: *(replicaSet.Spec.Replicas),
Selector: replicaSet.Spec.Selector,
ControllerReference: ControllerReference{
APIVersion: replicaSet.APIVersion,
Kind: replicaSet.Kind,
Name: replicaSet.Name,
UID: replicaSet.UID,
},
}, nil
}
// getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
func (r *ControllerFinder) getPodStatefulSet(ref ControllerReference, namespace string) (*ScaleAndSelector, error) {
// This error is irreversible, so there is no need to return error
ok, _ := verifyGroupKind(ref, controllerKindSS.Kind, []string{controllerKindSS.Group})
if !ok {
return nil, nil
}
statefulSet := &apps.StatefulSet{}
err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, statefulSet)
if err != nil {
// when error is NotFound, it is ok here.
if errors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
if ref.UID != "" && statefulSet.UID != ref.UID {
return nil, nil
}
return &ScaleAndSelector{
Scale: *(statefulSet.Spec.Replicas),
Selector: statefulSet.Spec.Selector,
ControllerReference: ControllerReference{
APIVersion: statefulSet.APIVersion,
Kind: statefulSet.Kind,
Name: statefulSet.Name,
UID: statefulSet.UID,
},
}, nil
}
// getPodDeployments finds deployments for any replicasets which are being managed by deployments.
func (r *ControllerFinder) getPodDeployment(ref ControllerReference, namespace string) (*ScaleAndSelector, error) {
// This error is irreversible, so there is no need to return error
ok, _ := verifyGroupKind(ref, controllerKindDep.Kind, []string{controllerKindDep.Group})
if !ok {
return nil, nil
}
deployment := &apps.Deployment{}
err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, deployment)
if err != nil {
// when error is NotFound, it is ok here.
if errors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
if ref.UID != "" && deployment.UID != ref.UID {
return nil, nil
}
return &ScaleAndSelector{
Scale: *(deployment.Spec.Replicas),
Selector: deployment.Spec.Selector,
ControllerReference: ControllerReference{
APIVersion: deployment.APIVersion,
Kind: deployment.Kind,
Name: deployment.Name,
UID: deployment.UID,
},
}, nil
}
func (r *ControllerFinder) getPodReplicationController(ref ControllerReference, namespace string) (*ScaleAndSelector, error) {
// This error is irreversible, so there is no need to return error
ok, _ := verifyGroupKind(ref, controllerKindRC.Kind, []string{controllerKindRC.Group})
if !ok {
return nil, nil
}
rc := &corev1.ReplicationController{}
err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, rc)
if err != nil {
// when error is NotFound, it is ok here.
if errors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
if ref.UID != "" && rc.UID != ref.UID {
return nil, nil
}
return &ScaleAndSelector{
Scale: *(rc.Spec.Replicas),
Selector: &metav1.LabelSelector{MatchLabels: rc.Spec.Selector},
ControllerReference: ControllerReference{
APIVersion: rc.APIVersion,
Kind: rc.Kind,
Name: rc.Name,
UID: rc.UID,
},
}, nil
}
// getPodStatefulSet returns the kruise cloneSet referenced by the provided controllerRef.
func (r *ControllerFinder) getPodKruiseCloneSet(ref ControllerReference, namespace string) (*ScaleAndSelector, error) {
// This error is irreversible, so there is no need to return error
ok, _ := verifyGroupKind(ref, controllerKruiseKindCS.Kind, []string{controllerKruiseKindCS.Group})
if !ok {
return nil, nil
}
cloneSet := &appsv1alpha1.CloneSet{}
err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, cloneSet)
if err != nil {
// when error is NotFound, it is ok here.
if errors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
if ref.UID != "" && cloneSet.UID != ref.UID {
return nil, nil
}
return &ScaleAndSelector{
Scale: *(cloneSet.Spec.Replicas),
Selector: cloneSet.Spec.Selector,
ControllerReference: ControllerReference{
APIVersion: cloneSet.APIVersion,
Kind: cloneSet.Kind,
Name: cloneSet.Name,
UID: cloneSet.UID,
},
}, nil
}
// getPodStatefulSet returns the kruise statefulset referenced by the provided controllerRef.
func (r *ControllerFinder) getPodKruiseStatefulSet(ref ControllerReference, namespace string) (*ScaleAndSelector, error) {
// This error is irreversible, so there is no need to return error
ok, _ := verifyGroupKind(ref, controllerKruiseKindSS.Kind, []string{controllerKruiseKindSS.Group})
if !ok {
return nil, nil
}
ss := &appsv1beta1.StatefulSet{}
err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, ss)
if err != nil {
// when error is NotFound, it is ok here.
if errors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
if ref.UID != "" && ss.UID != ref.UID {
return nil, nil
}
return &ScaleAndSelector{
Scale: *(ss.Spec.Replicas),
Selector: ss.Spec.Selector,
ControllerReference: ControllerReference{
APIVersion: ss.APIVersion,
Kind: ss.Kind,
Name: ss.Name,
UID: ss.UID,
},
}, nil
}
func verifyGroupKind(ref ControllerReference, expectedKind string, expectedGroups []string) (bool, error) {
gv, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil {
return false, err
}
if ref.Kind != expectedKind {
return false, nil
}
for _, group := range expectedGroups {
if group == gv.Group {
return true, nil
}
}
return false, nil
}

View File

@ -212,12 +212,12 @@ func TestCheckInPlaceUpdateCompleted(t *testing.T) {
}
for _, p := range succeedPods {
if err := defaultCheckInPlaceUpdateCompleted(p); err != nil {
if err := DefaultCheckInPlaceUpdateCompleted(p); err != nil {
t.Errorf("pod %s expected check success, got %v", p.Name, err)
}
}
for _, p := range failPods {
if err := defaultCheckInPlaceUpdateCompleted(p); err == nil {
if err := DefaultCheckInPlaceUpdateCompleted(p); err == nil {
t.Errorf("pod %s expected check failure, got no error", p.Name)
}
}

View File

@ -53,7 +53,7 @@ func SetOptionsDefaults(opts *UpdateOptions) *UpdateOptions {
}
if opts.CheckUpdateCompleted == nil {
opts.CheckUpdateCompleted = defaultCheckInPlaceUpdateCompleted
opts.CheckUpdateCompleted = DefaultCheckInPlaceUpdateCompleted
}
return opts
@ -145,10 +145,10 @@ func defaultCalculateInPlaceUpdateSpec(oldRevision, newRevision *apps.Controller
return updateSpec
}
// defaultCheckInPlaceUpdateCompleted checks whether imageID in pod status has been changed since in-place update.
// DefaultCheckInPlaceUpdateCompleted checks whether imageID in pod status has been changed since in-place update.
// If the imageID in containerStatuses has not been changed, we assume that kubelet has not updated
// containers in Pod.
func defaultCheckInPlaceUpdateCompleted(pod *v1.Pod) error {
func DefaultCheckInPlaceUpdateCompleted(pod *v1.Pod) error {
inPlaceUpdateState := appspub.InPlaceUpdateState{}
if stateStr, ok := appspub.GetInPlaceUpdateState(pod); !ok {
return nil

View File

@ -22,6 +22,7 @@ import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/test/e2e/framework"
@ -626,6 +627,14 @@ var _ = SIGDescribe("sidecarset", func() {
ginkgo.By(fmt.Sprintf("Creating Deployment(%s.%s)", deploymentIn.Namespace, deploymentIn.Name))
tester.CreateDeployment(deploymentIn)
except := &appsv1alpha1.SidecarSetStatus{
MatchedPods: 2,
UpdatedPods: 2,
UpdatedReadyPods: 2,
ReadyPods: 2,
}
tester.WaitForSidecarSetUpgradeComplete(sidecarSetIn, except)
// update sidecarSet sidecar container
sidecarSetIn.Spec.Containers[0].Image = "busybox:latest"
// update sidecarSet selector
@ -634,13 +643,13 @@ var _ = SIGDescribe("sidecarset", func() {
StrVal: "50%",
}
tester.UpdateSidecarSet(sidecarSetIn)
except := &appsv1alpha1.SidecarSetStatus{
except = &appsv1alpha1.SidecarSetStatus{
MatchedPods: 2,
UpdatedPods: 1,
UpdatedReadyPods: 1,
ReadyPods: 2,
}
time.Sleep(time.Minute)
time.Sleep(time.Second * 10)
tester.WaitForSidecarSetUpgradeComplete(sidecarSetIn, except)
// update sidecarSet partition, update all pods
@ -652,7 +661,7 @@ var _ = SIGDescribe("sidecarset", func() {
UpdatedReadyPods: 2,
ReadyPods: 2,
}
time.Sleep(time.Minute)
time.Sleep(time.Second * 10)
tester.WaitForSidecarSetUpgradeComplete(sidecarSetIn, except)
ginkgo.By(fmt.Sprintf("sidecarSet upgrade cold sidecar container image, and partition done"))
@ -723,17 +732,26 @@ var _ = SIGDescribe("sidecarset", func() {
ginkgo.By(fmt.Sprintf("Creating Deployment(%s.%s)", deploymentIn.Namespace, deploymentIn.Name))
tester.CreateDeployment(deploymentIn)
// update sidecarSet sidecar container
sidecarSetIn.Spec.InitContainers[0].Image = "busybox:failed"
tester.UpdateSidecarSet(sidecarSetIn)
// check sidecarSet
except := &appsv1alpha1.SidecarSetStatus{
MatchedPods: 1,
UpdatedPods: 1,
UpdatedReadyPods: 1,
ReadyPods: 1,
}
time.Sleep(time.Minute)
tester.WaitForSidecarSetUpgradeComplete(sidecarSetIn, except)
sidecarSetIn, _ = kc.AppsV1alpha1().SidecarSets().Get(sidecarSetIn.Name, metav1.GetOptions{})
hash1 := sidecarSetIn.Annotations[sidecarcontrol.SidecarSetHashAnnotation]
// update sidecarSet sidecar container
sidecarSetIn.Spec.InitContainers[0].Image = "busybox:failed"
tester.UpdateSidecarSet(sidecarSetIn)
ginkgo.By(fmt.Sprintf("update sidecarset init container image, and sidecarSet hash not changed"))
time.Sleep(time.Second * 5)
sidecarSetIn, _ = kc.AppsV1alpha1().SidecarSets().Get(sidecarSetIn.Name, metav1.GetOptions{})
hash2 := sidecarSetIn.Annotations[sidecarcontrol.SidecarSetHashAnnotation]
// hash not changed
gomega.Expect(hash1).To(gomega.Equal(hash2))
ginkgo.By(fmt.Sprintf("sidecarSet upgrade init sidecar container, and don't upgrade done"))
})
})