kruise/pkg/control/pubcontrol/pub_control.go

364 lines
13 KiB
Go

/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pubcontrol
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
kubecontroller "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/kubelet/types"
"sigs.k8s.io/controller-runtime/pkg/client"
appspub "github.com/openkruise/kruise/apis/apps/pub"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
)
// commonControl is the receiver of the PodUnavailableBudget helper methods
// defined in this file.
type commonControl struct {
// Embedded client; its Get and List methods are used by the methods below to
// read PodUnavailableBudget and Pod objects.
client.Client
// controllerFinder is used to look up the pods and expected replica count of
// a workload (see GetPodsForPub).
controllerFinder *controllerfinder.ControllerFinder
}
// IsPodReady reports whether the pod counts as available: it must satisfy
// util.IsRunningAndReady (phase Running with a true PodReady condition) and
// must not carry the unavailable label.
func (c *commonControl) IsPodReady(pod *corev1.Pod) bool {
	// The label check only runs for pods that are already running and ready;
	// an explicitly marked pod is never considered ready.
	return util.IsRunningAndReady(pod) && !appspub.HasUnavailableLabel(pod.Labels)
}
// IsPodUnavailableChanged reports whether the transition from oldPod to newPod
// may make the pod unavailable. That is the case when the pod spec differs or
// when the unavailable label has just been added; every other change is
// treated as harmless.
func (c *commonControl) IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool {
	switch {
	case !reflect.DeepEqual(oldPod.Spec, newPod.Spec):
		// Any spec difference is conservatively assumed to be disruptive.
		klog.V(3).InfoS("Pod specification changed, and maybe cause unavailability", "pod", klog.KObj(newPod))
		return true
	case !appspub.HasUnavailableLabel(oldPod.Labels) && appspub.HasUnavailableLabel(newPod.Labels):
		// The label was absent before and is present now.
		klog.V(3).InfoS("Pod add unavailable label, and maybe cause unavailability", "pod", klog.KObj(newPod))
		return true
	default:
		klog.V(3).InfoS("Pod other changes, and maybe not cause unavailability", "pod", klog.KObj(newPod))
		return false
	}
}
// CanResizeInplace returns true when the update from oldPod to newPod is a pure
// container resource resize that will not cause unavailability.
//
// It returns false when any of the following holds:
//   - the unavailable label is being added;
//   - the pod spec differs in anything other than container resources (or the
//     spec hash cannot be computed);
//   - either pod is a static pod;
//   - the pod QoS class changes;
//   - a changed resource cannot be resized in place or carries a resize policy
//     other than NotRequired/"" (see canInplaceUpdateResources).
func (c *commonControl) CanResizeInplace(oldPod, newPod *corev1.Pod) bool {
	labelAdded := !appspub.HasUnavailableLabel(oldPod.Labels) && appspub.HasUnavailableLabel(newPod.Labels)
	if labelAdded {
		klog.V(3).InfoS("Pod add unavailable label, can not resize inplace", "pod", klog.KObj(newPod))
		return false
	}

	// Compare the specs with container resources blanked out: equal hashes mean
	// that at most the container resources differ.
	oldHash, err := podSpecWithoutResourcesHash(oldPod)
	if err != nil {
		klog.ErrorS(err, "Failed to get pod hash", "pod", klog.KObj(oldPod))
		return false
	}
	newHash, err := podSpecWithoutResourcesHash(newPod)
	if err != nil {
		klog.ErrorS(err, "Failed to get pod hash", "pod", klog.KObj(newPod))
		return false
	}
	if newHash != oldHash {
		klog.V(3).InfoS("Pod specification without resources changed, and maybe cause unavailability", "pod", klog.KObj(newPod))
		return false
	}

	// From here on only container resources may have changed.
	switch {
	case types.IsStaticPod(newPod) || types.IsStaticPod(oldPod):
		klog.V(3).InfoS("Static pod resources changed, and maybe cause unavailability", "pod", klog.KObj(newPod))
		return false
	case v1qos.ComputePodQOS(oldPod) != v1qos.ComputePodQOS(newPod):
		klog.V(3).InfoS("Pod QoS changed, and maybe cause unavailability", "pod", klog.KObj(newPod))
		return false
	case !canInplaceUpdateResources(oldPod, newPod):
		// A changed resource is either not resizable in place or requires a
		// container restart.
		klog.V(3).InfoS("Pod container resources changed with restartContainer policy, and maybe cause unavailability", "pod", klog.KObj(newPod))
		return false
	}

	// TODO: check annotations and labels bound to env through the downward API,
	// taking the `InPlaceUpdateEnvFromMetadata` feature gate in kruise-daemon into account.
	return true
}
// canInplaceUpdateResources reports whether every container resource that
// differs between oldPod and newPod can be applied in place. A changed resource
// qualifies only if the native vertical update implementation allows resizing
// it in place and its resize restart policy is NotRequired, empty, or not
// declared at all. Containers are matched by name; a differing container count
// or an unknown container name yields false.
func canInplaceUpdateResources(oldPod, newPod *corev1.Pod) bool {
	oldContainers, newContainers := oldPod.Spec.Containers, newPod.Spec.Containers
	if len(oldContainers) != len(newContainers) {
		return false
	}

	verticalUpdate := inplaceupdate.GetNativeVerticalUpdateImpl()

	// Index the old containers by name for lookup.
	oldByName := make(map[string]corev1.Container, len(oldContainers))
	for i := range oldContainers {
		oldByName[oldContainers[i].Name] = oldContainers[i]
	}

	for i := range newContainers {
		newCtr := newContainers[i]
		oldCtr, found := oldByName[newCtr.Name]
		if !found {
			return false
		}

		// Every resource name mentioned on either side, in requests or limits.
		resourceNames := getResourcesNames(
			oldCtr.Resources.Requests, newCtr.Resources.Requests,
			oldCtr.Resources.Limits, newCtr.Resources.Limits,
		)
		policies := getContainerRestartPolicyMap(&newCtr)

		for resourceName := range resourceNames {
			if !isContainerResourcesChanged(oldCtr, newCtr, resourceName) {
				continue
			}
			if !verticalUpdate.CanResourcesResizeInPlace(string(resourceName)) {
				klog.V(3).InfoS("Pod container resources changed with not allowed resource, and maybe cause unavailability",
					"pod", klog.KObj(newPod), "container", newCtr.Name, "resourceName", resourceName)
				return false
			}
			// A declared policy other than NotRequired/"" means the container
			// would be restarted for this resource.
			policy, declared := policies[resourceName]
			if declared && policy != corev1.NotRequired && policy != "" {
				klog.V(3).InfoS("Pod container resources changed with restartContainer policy, and maybe cause unavailability",
					"pod", klog.KObj(newPod), "container", newCtr.Name, "restartPolicy", policy, "resourceName", resourceName)
				return false
			}
		}
	}
	return true
}
// getResourcesNames returns the union of the resource names used as keys in
// the given resource lists. Nil or empty lists contribute nothing.
func getResourcesNames(rsLists ...corev1.ResourceList) sets.Set[corev1.ResourceName] {
	union := sets.New[corev1.ResourceName]()
	for i := range rsLists {
		for resourceName := range rsLists[i] {
			union.Insert(resourceName)
		}
	}
	return union
}
// getContainerRestartPolicyMap indexes the container's ResizePolicy entries by
// resource name. If a resource name appears more than once, the last entry wins.
func getContainerRestartPolicyMap(ctr *corev1.Container) map[corev1.ResourceName]corev1.ResourceResizeRestartPolicy {
	policies := make(map[corev1.ResourceName]corev1.ResourceResizeRestartPolicy)
	for i := range ctr.ResizePolicy {
		entry := &ctr.ResizePolicy[i]
		policies[entry.ResourceName] = entry.RestartPolicy
	}
	return policies
}
// isContainerResourcesChanged reports whether the request or the limit of
// resourceName differs between oldCtr and newCtr.
func isContainerResourcesChanged(oldCtr, newCtr corev1.Container, resourceName corev1.ResourceName) bool {
	if isResourceChanged(oldCtr.Resources.Requests, newCtr.Resources.Requests, resourceName) {
		return true
	}
	return isResourceChanged(oldCtr.Resources.Limits, newCtr.Resources.Limits, resourceName)
}
// isResourceChanged reports whether the entry for resourceName differs between
// resource lists a and b.
//
// The comparison is deliberately strict: although the scheduler may reserve a
// default amount (such as 100m/200Mi) for an unset value, the kubelet is not
// guaranteed to do the same, so a value that is set on one side and missing on
// the other counts as a change.
func isResourceChanged(a, b corev1.ResourceList, resourceName corev1.ResourceName) bool {
	before, hadBefore := a[resourceName]
	after, hasAfter := b[resourceName]
	switch {
	case hadBefore != hasAfter:
		// Present on exactly one side.
		return true
	case !hadBefore:
		// Absent on both sides.
		return false
	default:
		// Present on both sides: compare the quantities.
		return !before.Equal(after)
	}
}
// podSpecWithoutResourcesHash returns a hash of the pod spec computed with the
// Resources of every regular container cleared, so that two pods which differ
// only in container resources produce the same value. The input pod is not
// modified; the work is done on a deep copy. An error is returned only when the
// spec cannot be marshalled to JSON.
func podSpecWithoutResourcesHash(podIn *corev1.Pod) (string, error) {
	stripped := podIn.DeepCopy()
	// Init containers are left untouched because they are not expected to change.
	containers := stripped.Spec.Containers
	for i := range containers {
		containers[i].Resources = corev1.ResourceRequirements{}
	}

	raw, err := json.Marshal(stripped.Spec)
	if err != nil {
		return "", err
	}
	digest := hash(string(raw))
	return rand.SafeEncodeString(digest), nil
}
// hash returns the SHA-256 digest of data as a lowercase hexadecimal string.
func hash(data string) string {
	sum := sha256.Sum256([]byte(data))
	return fmt.Sprintf("%x", sum[:])
}
// GetPodsForPub returns the Pods protected by the pub object.
//
// It returns:
//  1. the matched pods;
//  2. expectedCount, which defaults to the value derived from the workload
//     (workload.Replicas) and is replaced by the value of the
//     PubProtectTotalReplicasAnnotation annotation when that annotation holds a
//     valid 32-bit integer;
//  3. an error when the pods or the expected scale cannot be retrieved.
//
// spec.targetReference takes priority over spec.selector. When neither is set,
// or when the selector is invalid, the problem is logged and (nil, 0, nil) is
// returned.
func (c *commonControl) GetPodsForPub(pub *policyv1alpha1.PodUnavailableBudget) ([]*corev1.Pod, int32, error) {
	// If targetReference is set it takes effect and the selector is ignored.
	if ref := pub.Spec.TargetReference; ref != nil {
		matchedPods, expectedCount, err := c.controllerFinder.GetPodsForRef(ref.APIVersion, ref.Kind, pub.Namespace, ref.Name, true)
		if override, ok := protectTotalReplicasOverride(pub); ok {
			expectedCount = override
		}
		return matchedPods, expectedCount, err
	}
	if pub.Spec.Selector == nil {
		klog.InfoS("Pub spec.Selector could not be empty", "pub", klog.KObj(pub))
		return nil, 0, nil
	}

	// Get pods for the selector.
	labelSelector, err := util.ValidatedLabelSelectorAsSelector(pub.Spec.Selector)
	if err != nil {
		klog.InfoS("Pub ValidatedLabelSelectorAsSelector failed", "pub", klog.KObj(pub), "error", err)
		return nil, 0, nil
	}
	listOptions := &client.ListOptions{Namespace: pub.Namespace, LabelSelector: labelSelector}
	podList := &corev1.PodList{}
	// NOTE(review): utilclient.DisableDeepCopy presumably makes List return
	// objects shared with the cache, so the returned pods should be treated as
	// read-only — confirm against the utilclient package.
	if err = c.List(context.TODO(), podList, listOptions, utilclient.DisableDeepCopy); err != nil {
		return nil, 0, err
	}

	// Only active pods are protected.
	matchedPods := make([]*corev1.Pod, 0, len(podList.Items))
	for i := range podList.Items {
		pod := &podList.Items[i]
		if kubecontroller.IsPodActive(pod) {
			matchedPods = append(matchedPods, pod)
		}
	}

	if override, ok := protectTotalReplicasOverride(pub); ok {
		return matchedPods, override, nil
	}
	expectedCount, err := c.controllerFinder.GetExpectedScaleForPods(matchedPods)
	if err != nil {
		return nil, 0, err
	}
	return matchedPods, expectedCount, nil
}

// protectTotalReplicasOverride parses the PubProtectTotalReplicasAnnotation
// annotation of pub. It returns the parsed value and true when the annotation
// is present and is a valid 32-bit integer. A missing or empty annotation
// yields (0, false). A malformed or out-of-range value is logged and also
// yields (0, false), so that callers fall back to the workload-derived count
// instead of silently using the zero or clamped value strconv.ParseInt returns
// alongside its error.
func protectTotalReplicasOverride(pub *policyv1alpha1.PodUnavailableBudget) (int32, bool) {
	value := pub.Annotations[policyv1alpha1.PubProtectTotalReplicasAnnotation]
	if value == "" {
		return 0, false
	}
	count, err := strconv.ParseInt(value, 10, 32)
	if err != nil {
		klog.ErrorS(err, "Pub protect total replicas annotation is invalid and will be ignored",
			"pub", klog.KObj(pub), "value", value)
		return 0, false
	}
	return int32(count), true
}
// IsPodStateConsistent reports whether the pod's running state matches its spec.
//
// Containers whose image is given in digest form (for example
// docker.io/busybox@sha256:a9286defaba7b3a519d585ba0e37d0b2cbee74ebfe590960b0b1d6a5e97d1e1d)
// are verified through util.IsPodContainerDigestEqual. If every container uses
// a digest image, that check alone decides the result. Otherwise the injected
// sidecar containers and the in-place update state are checked as well.
func (c *commonControl) IsPodStateConsistent(pod *corev1.Pod) bool {
	hasNonDigestImage := false
	for i := range pod.Spec.Containers {
		ctr := &pod.Spec.Containers[i]
		if !util.IsImageDigest(ctr.Image) {
			// Not verifiable by digest; remember that the later checks are needed.
			hasNonDigestImage = true
			continue
		}
		if !util.IsPodContainerDigestEqual(sets.NewString(ctr.Name), pod) {
			klog.V(5).InfoS("Pod container image was inconsistent", "pod", klog.KObj(pod), "containerName", ctr.Name)
			return false
		}
	}
	// All images are digests and all of them matched: nothing else to check.
	if !hasNonDigestImage {
		return true
	}

	// Check whether the injected sidecar containers are consistent.
	sidecarSets, sidecars := getSidecarSetsInPod(pod)
	if sidecarSets.Len() > 0 && sidecars.Len() > 0 &&
		!sidecarcontrol.IsSidecarContainerUpdateCompleted(pod, sidecarSets, sidecars) {
		klog.V(5).InfoS("PodUnavailableBudget check pod was inconsistent", "pod", klog.KObj(pod))
		return false
	}

	// Check whether the remaining containers have completed their in-place update.
	if err := inplaceupdate.DefaultCheckInPlaceUpdateCompleted(pod); err != nil {
		klog.V(5).InfoS("Failed to check pod InPlaceUpdate", "pod", klog.KObj(pod), "error", err)
		return false
	}
	return true
}
// GetPubForPod returns the PodUnavailableBudget named by the pod's
// PodRelatedPubAnnotation annotation, looked up in the pod's namespace.
// It returns (nil, nil) when the annotation is missing or empty, or when the
// referenced object does not exist; any other lookup failure is returned as
// the error.
func (c *commonControl) GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error) {
	// Reading from a nil annotation map yields "", so no separate length check
	// is needed.
	pubName := pod.Annotations[PodRelatedPubAnnotation]
	if pubName == "" {
		return nil, nil
	}

	pub := &policyv1alpha1.PodUnavailableBudget{}
	key := client.ObjectKey{Namespace: pod.Namespace, Name: pubName}
	err := c.Get(context.TODO(), key, pub)
	switch {
	case err == nil:
		return pub, nil
	case errors.IsNotFound(err):
		// A dangling reference is not an error for the caller.
		klog.InfoS("Pod pub was NotFound", "pod", klog.KObj(pod), "pubName", pubName)
		return nil, nil
	default:
		return nil, err
	}
}
// GetPodControllerOf returns the result of metav1.GetControllerOf for the pod,
// i.e. the owner reference of the pod's controller.
// NOTE(review): presumably nil when the pod has no controller owner — confirm
// against the apimachinery documentation.
func (c *commonControl) GetPodControllerOf(pod *corev1.Pod) *metav1.OwnerReference {
return metav1.GetControllerOf(pod)
}
// getSidecarSetsInPod collects sidecar information from the pod.
//
// sidecarSets holds the comma-separated names found in the
// SidecarSetListAnnotation annotation (empty when the annotation is missing or
// empty). containers holds the names of the regular containers whose
// SidecarEnvKey environment variable equals "true".
func getSidecarSetsInPod(pod *corev1.Pod) (sidecarSets, containers sets.String) {
	sidecarSets, containers = sets.NewString(), sets.NewString()

	if setList := pod.Annotations[sidecarcontrol.SidecarSetListAnnotation]; setList != "" {
		sidecarSets.Insert(strings.Split(setList, ",")...)
	}

	for i := range pod.Spec.Containers {
		ctr := &pod.Spec.Containers[i]
		if util.GetContainerEnvValue(ctr, sidecarcontrol.SidecarEnvKey) == "true" {
			containers.Insert(ctr.Name)
		}
	}
	return sidecarSets, containers
}