mirror of https://github.com/openkruise/kruise.git
364 lines
13 KiB
Go
364 lines
13 KiB
Go
/*
|
|
Copyright 2021 The Kruise Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package pubcontrol
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"fmt"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/rand"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/klog/v2"
|
|
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
|
|
kubecontroller "k8s.io/kubernetes/pkg/controller"
|
|
"k8s.io/kubernetes/pkg/kubelet/types"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
|
|
appspub "github.com/openkruise/kruise/apis/apps/pub"
|
|
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
|
|
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
|
|
"github.com/openkruise/kruise/pkg/util"
|
|
utilclient "github.com/openkruise/kruise/pkg/util/client"
|
|
"github.com/openkruise/kruise/pkg/util/controllerfinder"
|
|
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
|
|
)
|
|
|
|
// commonControl carries the dependencies used by the PodUnavailableBudget helper
// methods defined in this file (IsPodReady, CanResizeInplace, GetPodsForPub,
// GetPubForPod, ...).
type commonControl struct {
	// Client is embedded so the methods can call c.List / c.Get directly
	// (used by GetPodsForPub and GetPubForPod).
	client.Client

	// controllerFinder resolves a PUB's targetReference (GetPodsForRef) and the
	// expected replica count of selector-matched pods (GetExpectedScaleForPods).
	controllerFinder *controllerfinder.ControllerFinder
}
|
|
|
|
func (c *commonControl) IsPodReady(pod *corev1.Pod) bool {
|
|
// 1. pod.Status.Phase == v1.PodRunning
|
|
// 2. pod.condition PodReady == true
|
|
if !util.IsRunningAndReady(pod) {
|
|
return false
|
|
}
|
|
|
|
// unavailable label
|
|
return !appspub.HasUnavailableLabel(pod.Labels)
|
|
}
|
|
|
|
func (c *commonControl) IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool {
|
|
// If pod.spec changed, pod may be in unavailable condition
|
|
if !reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
|
|
klog.V(3).InfoS("Pod specification changed, and maybe cause unavailability", "pod", klog.KObj(newPod))
|
|
return true
|
|
}
|
|
// pod add unavailable label
|
|
if !appspub.HasUnavailableLabel(oldPod.Labels) && appspub.HasUnavailableLabel(newPod.Labels) {
|
|
klog.V(3).InfoS("Pod add unavailable label, and maybe cause unavailability", "pod", klog.KObj(newPod))
|
|
return true
|
|
}
|
|
// pod other changes will not cause unavailability situation, then return false
|
|
|
|
klog.V(3).InfoS("Pod other changes, and maybe not cause unavailability", "pod", klog.KObj(newPod))
|
|
return false
|
|
}
|
|
|
|
// return true if this action won't cause unavailability
|
|
func (c *commonControl) CanResizeInplace(oldPod, newPod *corev1.Pod) bool {
|
|
if !appspub.HasUnavailableLabel(oldPod.Labels) && appspub.HasUnavailableLabel(newPod.Labels) {
|
|
klog.V(3).InfoS("Pod add unavailable label, can not resize inplace", "pod", klog.KObj(newPod))
|
|
return false
|
|
}
|
|
// now check if only container resources changed
|
|
oldPodSpecWithoutResourcesHash, err := podSpecWithoutResourcesHash(oldPod)
|
|
if err != nil {
|
|
klog.ErrorS(err, "Failed to get pod hash", "pod", klog.KObj(oldPod))
|
|
return false
|
|
}
|
|
|
|
newPodSpecWithoutResourcesHash, err := podSpecWithoutResourcesHash(newPod)
|
|
if err != nil {
|
|
klog.ErrorS(err, "Failed to get pod hash", "pod", klog.KObj(newPod))
|
|
return false
|
|
}
|
|
|
|
if oldPodSpecWithoutResourcesHash != newPodSpecWithoutResourcesHash {
|
|
klog.V(3).InfoS("Pod specification without resources changed, and maybe cause unavailability", "pod", klog.KObj(newPod))
|
|
return false
|
|
}
|
|
|
|
// now only containerResources changed.
|
|
// check if static pod
|
|
if types.IsStaticPod(newPod) || types.IsStaticPod(oldPod) {
|
|
klog.V(3).InfoS("Static pod resources changed, and maybe cause unavailability", "pod", klog.KObj(newPod))
|
|
return false
|
|
}
|
|
|
|
// check if QoS changed
|
|
if v1qos.ComputePodQOS(oldPod) != v1qos.ComputePodQOS(newPod) {
|
|
klog.V(3).InfoS("Pod QoS changed, and maybe cause unavailability", "pod", klog.KObj(newPod))
|
|
return false
|
|
}
|
|
|
|
// only containerResources changed, check if container resource resizePolicy is not NotRequired but
|
|
// resource changed, then return true
|
|
if !canInplaceUpdateResources(oldPod, newPod) {
|
|
klog.V(3).InfoS("Pod container resources changed with restartContainer policy, and maybe cause unavailability", "pod", klog.KObj(newPod))
|
|
return false
|
|
}
|
|
|
|
// TODO: Check annotations、labels bind with env using downwardAPI and considering `InPlaceUpdateEnvFromMetadata` featureGate in kruise-daemon
|
|
return true
|
|
}
|
|
|
|
// only allowedResizeResourceKey with NotRequired or "" restartPolicy can do inplace update
|
|
func canInplaceUpdateResources(oldPod, newPod *corev1.Pod) bool {
|
|
if len(oldPod.Spec.Containers) != len(newPod.Spec.Containers) {
|
|
return false
|
|
}
|
|
|
|
nativeInplaceUpdateImpl := inplaceupdate.GetNativeVerticalUpdateImpl()
|
|
|
|
oldCtrMap := make(map[string]corev1.Container)
|
|
for _, ctr := range oldPod.Spec.Containers {
|
|
oldCtrMap[ctr.Name] = ctr
|
|
}
|
|
|
|
for _, ctr := range newPod.Spec.Containers {
|
|
oldCtr, ok := oldCtrMap[ctr.Name]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
// check whether oldCtr to newCtr resource changed
|
|
rsNames := getResourcesNames(oldCtr.Resources.Requests, ctr.Resources.Requests,
|
|
oldCtr.Resources.Limits, ctr.Resources.Limits)
|
|
rsPolicyMap := getContainerRestartPolicyMap(&ctr)
|
|
for rsName := range rsNames {
|
|
if isContainerResourcesChanged(oldCtr, ctr, rsName) {
|
|
if !nativeInplaceUpdateImpl.CanResourcesResizeInPlace(string(rsName)) {
|
|
klog.V(3).InfoS("Pod container resources changed with not allowed resource, and maybe cause unavailability",
|
|
"pod", klog.KObj(newPod), "container", ctr.Name, "resourceName", rsName)
|
|
return false
|
|
}
|
|
|
|
// check restartPolicy
|
|
policy, ok := rsPolicyMap[rsName]
|
|
if !ok || policy == corev1.NotRequired || policy == "" {
|
|
continue
|
|
}
|
|
|
|
klog.V(3).InfoS("Pod container resources changed with restartContainer policy, and maybe cause unavailability",
|
|
"pod", klog.KObj(newPod), "container", ctr.Name, "restartPolicy", policy, "resourceName", rsName)
|
|
return false
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func getResourcesNames(rsLists ...corev1.ResourceList) sets.Set[corev1.ResourceName] {
|
|
names := sets.New[corev1.ResourceName]()
|
|
for _, list := range rsLists {
|
|
for name := range list {
|
|
names.Insert(name)
|
|
}
|
|
}
|
|
return names
|
|
}
|
|
|
|
func getContainerRestartPolicyMap(ctr *corev1.Container) map[corev1.ResourceName]corev1.ResourceResizeRestartPolicy {
|
|
policyMap := make(map[corev1.ResourceName]corev1.ResourceResizeRestartPolicy)
|
|
for _, policy := range ctr.ResizePolicy {
|
|
policyMap[policy.ResourceName] = policy.RestartPolicy
|
|
}
|
|
return policyMap
|
|
}
|
|
|
|
func isContainerResourcesChanged(oldCtr, newCtr corev1.Container, resourceName corev1.ResourceName) bool {
|
|
requestsChanged := isResourceChanged(oldCtr.Resources.Requests, newCtr.Resources.Requests, resourceName)
|
|
limitsChanged := isResourceChanged(oldCtr.Resources.Limits, newCtr.Resources.Limits, resourceName)
|
|
return requestsChanged || limitsChanged
|
|
}
|
|
|
|
func isResourceChanged(a, b corev1.ResourceList, resourceName corev1.ResourceName) bool {
|
|
// Although the scheduler allocates a certain amount of space, such as 100m/200Mi,
|
|
// for null values in advance, this does not mean that the kubelet will do the same.
|
|
// Therefore, our current implementation imposes a strict constraint—requiring the
|
|
// resource values to be explicitly equal.
|
|
oldResource, oldOk := a[resourceName]
|
|
newResource, newOk := b[resourceName]
|
|
if !oldOk && !newOk {
|
|
return false
|
|
}
|
|
|
|
if oldOk && !newOk || !oldOk && newOk {
|
|
return true
|
|
}
|
|
|
|
// both exist
|
|
return !oldResource.Equal(newResource)
|
|
}
|
|
|
|
func podSpecWithoutResourcesHash(podIn *corev1.Pod) (string, error) {
|
|
pod := podIn.DeepCopy()
|
|
// do not need to process init-containers because they wont change
|
|
for i := range pod.Spec.Containers {
|
|
pod.Spec.Containers[i].Resources = corev1.ResourceRequirements{}
|
|
}
|
|
|
|
data, err := json.Marshal(pod.Spec)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return rand.SafeEncodeString(hash(string(data))), nil
|
|
}
|
|
|
|
// hash returns the lowercase hexadecimal SHA-256 digest of data.
func hash(data string) string {
	digest := sha256.Sum256([]byte(data))
	return fmt.Sprintf("%x", digest[:])
}
|
|
|
|
// GetPodsForPub returns Pods protected by the pub object.
|
|
// return two parameters
|
|
// 1. podList
|
|
// 2. expectedCount, the default is workload.Replicas
|
|
func (c *commonControl) GetPodsForPub(pub *policyv1alpha1.PodUnavailableBudget) ([]*corev1.Pod, int32, error) {
|
|
// if targetReference isn't nil, priority to take effect
|
|
var listOptions *client.ListOptions
|
|
if pub.Spec.TargetReference != nil {
|
|
ref := pub.Spec.TargetReference
|
|
matchedPods, expectedCount, err := c.controllerFinder.GetPodsForRef(ref.APIVersion, ref.Kind, pub.Namespace, ref.Name, true)
|
|
if value, _ := pub.Annotations[policyv1alpha1.PubProtectTotalReplicasAnnotation]; value != "" {
|
|
count, _ := strconv.ParseInt(value, 10, 32)
|
|
expectedCount = int32(count)
|
|
}
|
|
return matchedPods, expectedCount, err
|
|
} else if pub.Spec.Selector == nil {
|
|
klog.InfoS("Pub spec.Selector could not be empty", "pub", klog.KObj(pub))
|
|
return nil, 0, nil
|
|
}
|
|
// get pods for selector
|
|
labelSelector, err := util.ValidatedLabelSelectorAsSelector(pub.Spec.Selector)
|
|
if err != nil {
|
|
klog.InfoS("Pub ValidatedLabelSelectorAsSelector failed", "pub", klog.KObj(pub), "error", err)
|
|
return nil, 0, nil
|
|
}
|
|
listOptions = &client.ListOptions{Namespace: pub.Namespace, LabelSelector: labelSelector}
|
|
podList := &corev1.PodList{}
|
|
if err = c.List(context.TODO(), podList, listOptions, utilclient.DisableDeepCopy); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
matchedPods := make([]*corev1.Pod, 0, len(podList.Items))
|
|
for i := range podList.Items {
|
|
pod := &podList.Items[i]
|
|
if kubecontroller.IsPodActive(pod) {
|
|
matchedPods = append(matchedPods, pod)
|
|
}
|
|
}
|
|
if value, _ := pub.Annotations[policyv1alpha1.PubProtectTotalReplicasAnnotation]; value != "" {
|
|
expectedCount, _ := strconv.ParseInt(value, 10, 32)
|
|
return matchedPods, int32(expectedCount), nil
|
|
}
|
|
expectedCount, err := c.controllerFinder.GetExpectedScaleForPods(matchedPods)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
return matchedPods, expectedCount, nil
|
|
}
|
|
|
|
func (c *commonControl) IsPodStateConsistent(pod *corev1.Pod) bool {
|
|
// if all container image is digest format
|
|
// by comparing status.containers[x].ImageID with spec.container[x].Image can determine whether pod is consistent
|
|
allDigestImage := true
|
|
for _, container := range pod.Spec.Containers {
|
|
//whether image is digest format,
|
|
//for example: docker.io/busybox@sha256:a9286defaba7b3a519d585ba0e37d0b2cbee74ebfe590960b0b1d6a5e97d1e1d
|
|
if !util.IsImageDigest(container.Image) {
|
|
allDigestImage = false
|
|
continue
|
|
}
|
|
|
|
if !util.IsPodContainerDigestEqual(sets.NewString(container.Name), pod) {
|
|
klog.V(5).InfoS("Pod container image was inconsistent", "pod", klog.KObj(pod), "containerName", container.Name)
|
|
return false
|
|
}
|
|
}
|
|
// If all spec.container[x].image is digest format, only check digest imageId
|
|
if allDigestImage {
|
|
return true
|
|
}
|
|
|
|
// check whether injected sidecar container is consistent
|
|
sidecarSets, sidecars := getSidecarSetsInPod(pod)
|
|
if sidecarSets.Len() > 0 && sidecars.Len() > 0 {
|
|
if !sidecarcontrol.IsSidecarContainerUpdateCompleted(pod, sidecarSets, sidecars) {
|
|
klog.V(5).InfoS("PodUnavailableBudget check pod was inconsistent", "pod", klog.KObj(pod))
|
|
return false
|
|
}
|
|
}
|
|
|
|
// whether other containers is consistent
|
|
if err := inplaceupdate.DefaultCheckInPlaceUpdateCompleted(pod); err != nil {
|
|
klog.V(5).InfoS("Failed to check pod InPlaceUpdate", "pod", klog.KObj(pod), "error", err)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (c *commonControl) GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error) {
|
|
if len(pod.Annotations) == 0 || pod.Annotations[PodRelatedPubAnnotation] == "" {
|
|
return nil, nil
|
|
}
|
|
pubName := pod.Annotations[PodRelatedPubAnnotation]
|
|
pub := &policyv1alpha1.PodUnavailableBudget{}
|
|
err := c.Get(context.TODO(), client.ObjectKey{Namespace: pod.Namespace, Name: pubName}, pub)
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
klog.InfoS("Pod pub was NotFound", "pod", klog.KObj(pod), "pubName", pubName)
|
|
return nil, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
return pub, nil
|
|
}
|
|
|
|
func (c *commonControl) GetPodControllerOf(pod *corev1.Pod) *metav1.OwnerReference {
|
|
return metav1.GetControllerOf(pod)
|
|
}
|
|
|
|
func getSidecarSetsInPod(pod *corev1.Pod) (sidecarSets, containers sets.String) {
|
|
containers = sets.NewString()
|
|
sidecarSets = sets.NewString()
|
|
if setList, ok := pod.Annotations[sidecarcontrol.SidecarSetListAnnotation]; ok && len(setList) > 0 {
|
|
for _, sidecarSetName := range strings.Split(setList, ",") {
|
|
sidecarSets.Insert(sidecarSetName)
|
|
}
|
|
}
|
|
for _, container := range pod.Spec.Containers {
|
|
val := util.GetContainerEnvValue(&container, sidecarcontrol.SidecarEnvKey)
|
|
if val == "true" {
|
|
containers.Insert(container.Name)
|
|
}
|
|
}
|
|
|
|
return sidecarSets, containers
|
|
}
|