rollouts/pkg/webhook/workload/mutating/workload_update_handler.go

464 lines
16 KiB
Go

/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mutating
import (
"context"
"encoding/json"
"math"
"net/http"
"strings"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
admissionv1 "k8s.io/api/admission/v1"
v1 "k8s.io/api/admissionregistration/v1"
apps "k8s.io/api/apps/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
labels2 "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
admission2 "k8s.io/apiserver/pkg/admission"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
appsv1beta1 "github.com/openkruise/rollouts/api/v1beta1"
"github.com/openkruise/rollouts/pkg/util"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
util2 "github.com/openkruise/rollouts/pkg/webhook/util"
"github.com/openkruise/rollouts/pkg/webhook/util/configuration"
)
// WorkloadHandler is the mutating admission handler for workload update
// requests. Its Handle method processes Kruise CloneSets, Kruise DaemonSets
// and native Deployments.
type WorkloadHandler struct {
	// Client is used to list Rollout objects in the workload's namespace and
	// to read the MutatingWebhookConfiguration when checking webhook rules.
	Client client.Client
	// Decoder decodes the objects carried by admission requests.
	Decoder *admission.Decoder
	// Finder is used to look up the ReplicaSets that belong to a Deployment.
	Finder *util.ControllerFinder
}

// Compile-time check that WorkloadHandler implements admission.Handler.
var _ admission.Handler = &WorkloadHandler{}
// Handle processes an admission request for a workload. Only plain UPDATE
// requests (no sub-resource) that match the registered webhook rules are
// inspected; everything else is allowed unchanged. When the per-kind handler
// reports a modification, the response carries a JSON patch from the received
// object to the mutated copy.
func (h *WorkloadHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
	// Sub-resource requests and non-update operations are ignored.
	if req.SubResource != "" || req.Operation != admissionv1.Update {
		return admission.Allowed("")
	}
	matched, err := h.checkWorkloadRules(ctx, req)
	switch {
	case err != nil:
		return admission.Errored(http.StatusBadRequest, err)
	case !matched:
		return admission.Allowed("")
	}

	// Because kruise Rollout is a bypassed approach, the webhook has to decide
	// whether the workload may enter the rollout progressing:
	// 1. Traffic Routing, all the following conditions must be met
	//    a. PodTemplateSpec is changed
	//    b. Workload must only contain one version of Pods
	// 2. No Traffic Routing, only release in batches
	//    a. No RolloutId
	//       - PodTemplateSpec is changed
	//    b. RolloutId configured
	//       - RolloutId and PodTemplateSpec change: enter the rollout progressing
	//       - RolloutId changes, PodTemplateSpec unchanged: enter the rollout progressing
	//       - RolloutId unchanged, PodTemplateSpec changes: do not enter the rollout progressing

	// previousReq presents the old object as the request payload so the same
	// decoder call can be used for it.
	previousReq := admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}}

	// patchFrom serializes both versions and builds the patch response.
	patchFrom := func(received, mutated client.Object) admission.Response {
		mutatedRaw, err := json.Marshal(mutated)
		if err != nil {
			return admission.Errored(http.StatusInternalServerError, err)
		}
		receivedRaw, err := json.Marshal(received)
		if err != nil {
			return admission.Errored(http.StatusInternalServerError, err)
		}
		return admission.PatchResponseFromRaw(receivedRaw, mutatedRaw)
	}

	switch req.Kind.Group {
	// kruise workloads
	case kruiseappsv1alpha1.GroupVersion.Group:
		switch req.Kind.Kind {
		case util.ControllerKruiseKindCS.Kind:
			// check cloneset
			received := &kruiseappsv1alpha1.CloneSet{}
			if err := h.Decoder.Decode(req, received); err != nil {
				return admission.Errored(http.StatusBadRequest, err)
			}
			previous := &kruiseappsv1alpha1.CloneSet{}
			if err := h.Decoder.Decode(previousReq, previous); err != nil {
				return admission.Errored(http.StatusBadRequest, err)
			}
			mutated := received.DeepCopy()
			changed, err := h.handleCloneSet(mutated, previous)
			if err != nil {
				return admission.Errored(http.StatusBadRequest, err)
			}
			if !changed {
				return admission.Allowed("")
			}
			return patchFrom(received, mutated)
		case util.ControllerKruiseKindDS.Kind:
			// check daemonset
			received := &kruiseappsv1alpha1.DaemonSet{}
			if err := h.Decoder.Decode(req, received); err != nil {
				return admission.Errored(http.StatusBadRequest, err)
			}
			previous := &kruiseappsv1alpha1.DaemonSet{}
			if err := h.Decoder.Decode(previousReq, previous); err != nil {
				return admission.Errored(http.StatusBadRequest, err)
			}
			mutated := received.DeepCopy()
			changed, err := h.handleDaemonSet(mutated, previous)
			if err != nil {
				return admission.Errored(http.StatusBadRequest, err)
			}
			if !changed {
				return admission.Allowed("")
			}
			return patchFrom(received, mutated)
		}
	// native k8s deployment
	case apps.SchemeGroupVersion.Group:
		switch req.Kind.Kind {
		case util.ControllerKindDep.Kind:
			// check deployment
			received := &apps.Deployment{}
			if err := h.Decoder.Decode(req, received); err != nil {
				return admission.Errored(http.StatusBadRequest, err)
			}
			previous := &apps.Deployment{}
			if err := h.Decoder.Decode(previousReq, previous); err != nil {
				return admission.Errored(http.StatusBadRequest, err)
			}
			mutated := received.DeepCopy()
			changed, err := h.handleDeployment(mutated, previous)
			if err != nil {
				return admission.Errored(http.StatusBadRequest, err)
			}
			if !changed {
				return admission.Allowed("")
			}
			return patchFrom(received, mutated)
		}
	}
	return admission.Allowed("")
}
// handleDeployment inspects an updated Deployment and mutates newObj in place
// when a matching Rollout requires it. oldObj is the previously stored
// version and is only read.
//
// It returns true when newObj was modified (the caller then sends a patch),
// false when the update can pass through untouched, and a non-nil error when
// the matching Rollout cannot be fetched or the rollout state cannot be
// serialized.
func (h *WorkloadHandler) handleDeployment(newObj, oldObj *apps.Deployment) (bool, error) {
	// make sure matched Rollout CR always exists
	rollout, err := h.fetchMatchedRollout(newObj)
	if err != nil {
		return false, err
	}
	if rollout == nil || rollout.Spec.Strategy.IsEmptyRelease() {
		return false, nil
	}

	// The deployment is already in rollout progressing: keep it in the shape
	// required by its rolling style instead of starting a new rollout.
	if newObj.Annotations[util.InRolloutProgressingAnnotation] != "" {
		modified := false
		strategy := util.GetDeploymentStrategy(newObj)
		switch {
		// partition style
		case strings.EqualFold(string(strategy.RollingStyle), string(appsv1alpha1.PartitionRollingStyle)):
			if !newObj.Spec.Paused {
				modified = true
				newObj.Spec.Paused = true
			}
			// Make sure it is always Recreate to disable native controller
			if newObj.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType {
				modified = true
				newObj.Spec.Strategy.Type = apps.RecreateDeploymentStrategyType
			}
			if newObj.Spec.Strategy.RollingUpdate != nil {
				modified = true
				// Allow to modify RollingUpdate config during rolling: the
				// config is moved into the strategy annotation.
				strategy.RollingUpdate = newObj.Spec.Strategy.RollingUpdate
				newObj.Spec.Strategy.RollingUpdate = nil
			}
			if isEffectiveDeploymentRevisionChange(oldObj, newObj) {
				modified = true
				strategy.Paused = true
			}
			appsv1alpha1.SetDefaultDeploymentStrategy(&strategy)
			setDeploymentStrategyAnnotation(strategy, newObj)
		// bluegreen style
		case len(newObj.GetAnnotations()[appsv1beta1.OriginalDeploymentStrategyAnnotation]) > 0:
			if isEffectiveDeploymentRevisionChange(oldObj, newObj) {
				newObj.Spec.Paused, modified = true, true
				// disallow continuous release, allow rollback
				klog.Warningf("rollback or continuous release detected in Deployment webhook, while only rollback is allowed for bluegreen release for now")
			}
			// not allow to modify Strategy.Type to Recreate
			if newObj.Spec.Strategy.Type != apps.RollingUpdateDeploymentStrategyType {
				modified = true
				newObj.Spec.Strategy.Type = oldObj.Spec.Strategy.Type
				klog.Warningf("Not allow to modify Strategy.Type to Recreate")
			}
		// default
		default:
			if !newObj.Spec.Paused {
				modified = true
				newObj.Spec.Paused = true
			}
			// Do not allow to modify strategy as Recreate during rolling
			if newObj.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
				modified = true
				newObj.Spec.Strategy = oldObj.Spec.Strategy
				klog.Warningf("Not allow to modify Strategy.Type to Recreate during rolling for deployment %s/%s, keep the original strategy",
					newObj.Namespace, newObj.Name)
			}
		}
		return modified, nil
	}

	// indicate whether the workload can enter the rollout process
	// replicas > 0
	if newObj.Spec.Replicas != nil && *newObj.Spec.Replicas == 0 {
		return false, nil
	}
	if !isEffectiveDeploymentRevisionChange(oldObj, newObj) {
		return false, nil
	}
	rss, err := h.Finder.GetReplicaSetsForDeployment(newObj)
	if err != nil {
		// Best effort: a lookup failure does not block the update, but it is
		// reported with its cause instead of being mistaken for "no replicaset".
		klog.Warningf("Failed to get replicasets for deployment %s/%s: %v, no need to rolling", newObj.Namespace, newObj.Name, err)
		return false, nil
	}
	if len(rss) == 0 {
		klog.Warningf("Cannot find any activate replicaset for deployment %s/%s, no need to rolling", newObj.Namespace, newObj.Name)
		return false, nil
	}
	// if traffic routing, workload must only be one version of Pods
	if rollout.Spec.Strategy.HasTrafficRoutings() && len(rss) != 1 {
		klog.Warningf("Because deployment(%s/%s) have multiple versions of Pods, so can not enter rollout progressing", newObj.Namespace, newObj.Name)
		return false, nil
	}

	// Serialize the rollout state before touching newObj so a failure leaves
	// the object unmodified.
	by, err := json.Marshal(&util.RolloutState{RolloutName: rollout.Name})
	if err != nil {
		return false, err
	}

	// label the stable version replicaset
	_, stableRS := util.FindCanaryAndStableReplicaSet(rss, newObj)
	if stableRS == nil {
		klog.Warningf("Cannot find any stable replicaset for deployment %s/%s", newObj.Namespace, newObj.Name)
	} else {
		if newObj.Labels == nil {
			newObj.Labels = map[string]string{}
		}
		// blueGreen also need the stable revision label
		newObj.Labels[appsv1alpha1.DeploymentStableRevisionLabel] = stableRS.Labels[apps.DefaultDeploymentUniqueLabelKey]
	}

	// need set workload paused = true
	newObj.Spec.Paused = true
	if newObj.Annotations == nil {
		newObj.Annotations = map[string]string{}
	}
	newObj.Annotations[util.InRolloutProgressingAnnotation] = string(by)
	klog.Infof("Deployment(%s/%s) will be released incrementally based on Rollout(%s)", newObj.Namespace, newObj.Name, rollout.Name)
	return true, nil
}
// handleCloneSet decides whether an updated CloneSet enters the rollout
// process and, if so, mutates newObj in place: the update partition is set to
// "100%" and the in-rollout-progressing annotation is written. It returns
// true when newObj was modified.
func (h *WorkloadHandler) handleCloneSet(newObj, oldObj *kruiseappsv1alpha1.CloneSet) (bool, error) {
	// A CloneSet scaled to zero has no pods, so there is nothing to roll out.
	if replicas := newObj.Spec.Replicas; replicas != nil && *replicas == 0 {
		return false, nil
	}

	// With a rollout id, only a changed id counts; without one, only a changed
	// pod template counts.
	rolloutID := newObj.Annotations[appsv1beta1.RolloutIDLabel]
	switch {
	case rolloutID != "" && oldObj.Annotations[appsv1beta1.RolloutIDLabel] == rolloutID:
		return false, nil
	case rolloutID == "" && util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template):
		return false, nil
	}

	matched, err := h.fetchMatchedRollout(newObj)
	if err != nil {
		return false, err
	}
	if matched == nil || matched.Spec.Strategy.IsEmptyRelease() {
		return false, nil
	}

	// if traffic routing, there must only be one version of Pods
	if matched.Spec.Strategy.HasTrafficRoutings() {
		if newObj.Status.Replicas != newObj.Status.UpdatedReplicas {
			klog.Warningf("Because cloneSet(%s/%s) have multiple versions of Pods, so can not enter rollout progressing", newObj.Namespace, newObj.Name)
			return false, nil
		}
	}

	// NOTE(review): a 100% partition presumably keeps every pod on the old
	// revision until the rollout controller lowers it; confirm against the
	// CloneSet partition semantics.
	fullPartition := intstr.IntOrString{Type: intstr.String, StrVal: "100%"}
	newObj.Spec.UpdateStrategy.Partition = &fullPartition

	if newObj.Annotations == nil {
		newObj.Annotations = map[string]string{}
	}
	// The marshal error is ignored here, as it has always been in this handler.
	stateJSON, _ := json.Marshal(&util.RolloutState{RolloutName: matched.Name})
	newObj.Annotations[util.InRolloutProgressingAnnotation] = string(stateJSON)
	klog.Infof("CloneSet(%s/%s) will be released incrementally based on Rollout(%s)", newObj.Namespace, newObj.Name, matched.Name)
	return true, nil
}
// handleDaemonSet decides whether an updated Kruise DaemonSet enters the
// rollout process and, if so, mutates newObj in place: the rolling-update
// partition is set to math.MaxInt16 and the in-rollout-progressing annotation
// is written. oldObj is only read.
//
// It returns true when newObj was modified, and a non-nil error when the
// matching Rollout cannot be fetched or the rollout state cannot be
// serialized.
func (h *WorkloadHandler) handleDaemonSet(newObj, oldObj *kruiseappsv1alpha1.DaemonSet) (bool, error) {
	// indicate whether the workload can enter the rollout process:
	// with a rollout id, only a changed id counts; without one, only a changed
	// pod template counts.
	rolloutID := newObj.Annotations[appsv1beta1.RolloutIDLabel]
	if rolloutID != "" && oldObj.Annotations[appsv1beta1.RolloutIDLabel] == rolloutID {
		return false, nil
	}
	if rolloutID == "" && util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template) {
		return false, nil
	}

	rollout, err := h.fetchMatchedRollout(newObj)
	if err != nil {
		return false, err
	}
	if rollout == nil || rollout.Spec.Strategy.IsEmptyRelease() {
		return false, nil
	}

	// Serialize the rollout state before touching newObj so a failure leaves
	// the object unmodified.
	by, err := json.Marshal(&util.RolloutState{RolloutName: rollout.Name})
	if err != nil {
		return false, err
	}

	// RollingUpdate is an optional pointer; allocate it when the incoming
	// object does not carry one, otherwise setting Partition would panic.
	if newObj.Spec.UpdateStrategy.RollingUpdate == nil {
		newObj.Spec.UpdateStrategy.RollingUpdate = &kruiseappsv1alpha1.RollingUpdateDaemonSet{}
	}
	// NOTE(review): the very large partition presumably keeps every pod on the
	// old revision until the rollout controller lowers it; confirm against the
	// Kruise DaemonSet partition semantics.
	newObj.Spec.UpdateStrategy.RollingUpdate.Partition = pointer.Int32(math.MaxInt16)

	if newObj.Annotations == nil {
		newObj.Annotations = map[string]string{}
	}
	newObj.Annotations[util.InRolloutProgressingAnnotation] = string(by)
	klog.Infof("DaemonSet(%s/%s) will be released incrementally based on Rollout(%s)", newObj.Namespace, newObj.Name, rollout.Name)
	return true, nil
}
// fetchMatchedRollout lists the Rollouts in obj's namespace and returns the
// first one whose workloadRef points at obj (same API group, kind and name).
// Rollouts that are being deleted, are in the Disabled phase, or carry an
// unparsable workloadRef apiVersion are skipped. It returns (nil, nil) when no
// Rollout matches and a non-nil error only when the list call fails.
//
// NOTE(review): the list is issued with utilclient.DisableDeepCopy, so the
// returned Rollout presumably shares memory with the client cache and should
// be treated as read-only; confirm.
func (h *WorkloadHandler) fetchMatchedRollout(obj client.Object) (*appsv1beta1.Rollout, error) {
	workloadGVK := obj.GetObjectKind().GroupVersionKind()
	candidates := &appsv1beta1.RolloutList{}
	inNamespace := &client.ListOptions{Namespace: obj.GetNamespace()}
	if err := h.Client.List(context.TODO(), candidates, utilclient.DisableDeepCopy, inNamespace); err != nil {
		klog.Errorf("WorkloadHandler List rollout failed: %s", err.Error())
		return nil, err
	}

	for idx := range candidates.Items {
		candidate := &candidates.Items[idx]
		if !candidate.DeletionTimestamp.IsZero() {
			continue
		}
		if candidate.Status.Phase == appsv1beta1.RolloutPhaseDisabled {
			klog.Infof("Disabled rollout(%s/%s) fetched when fetching matched rollout", candidate.Namespace, candidate.Name)
			continue
		}
		target := candidate.Spec.WorkloadRef
		targetGV, err := schema.ParseGroupVersion(target.APIVersion)
		if err != nil {
			klog.Warningf("ParseGroupVersion rollout(%s/%s) ref failed: %s", candidate.Namespace, candidate.Name, err.Error())
			continue
		}
		// Only group, kind and name are compared; the version is not.
		if workloadGVK.Group != targetGV.Group || workloadGVK.Kind != target.Kind || obj.GetName() != target.Name {
			continue
		}
		return candidate, nil
	}
	return nil, nil
}
// isEffectiveDeploymentRevisionChange reports whether the update from oldObj
// to newObj counts as a new revision for rollout purposes. When newObj carries
// a rollout id annotation, only a change of that id counts; otherwise a change
// of the pod template (compared with util.EqualIgnoreHash) counts.
func isEffectiveDeploymentRevisionChange(oldObj, newObj *apps.Deployment) bool {
	rolloutID := newObj.Annotations[appsv1beta1.RolloutIDLabel]
	if rolloutID != "" {
		return oldObj.Annotations[appsv1beta1.RolloutIDLabel] != rolloutID
	}
	return !util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template)
}
// setDeploymentStrategyAnnotation serializes strategy to JSON and stores it on
// d under appsv1alpha1.DeploymentStrategyAnnotation. The annotation map is
// created when d has none. If serialization fails, the failure is logged and
// the existing annotation value is left untouched rather than overwritten
// with an empty string.
func setDeploymentStrategyAnnotation(strategy appsv1alpha1.DeploymentStrategy, d *apps.Deployment) {
	strategyAnno, err := json.Marshal(&strategy)
	if err != nil {
		klog.Warningf("Failed to marshal deployment strategy for deployment %s/%s: %v", d.Namespace, d.Name, err)
		return
	}
	// Writing to a nil map panics, so make sure the map exists first.
	if d.Annotations == nil {
		d.Annotations = map[string]string{}
	}
	d.Annotations[appsv1alpha1.DeploymentStrategyAnnotation] = string(strategyAnno)
}
// checkWorkloadRules reports whether the request matches the rollout mutating
// webhook configuration: some rule of some webhook entry must match the
// request attributes, and that entry's object selector must match the labels
// of the incoming object.
//
// It returns an error when the webhook configuration cannot be fetched, or
// when the request object cannot be decoded. An object selector that cannot
// be converted is logged and treated as "no match" without an error.
func (h *WorkloadHandler) checkWorkloadRules(ctx context.Context, req admission.Request) (bool, error) {
	webhookConfig := &v1.MutatingWebhookConfiguration{}
	if err := h.Client.Get(ctx, types.NamespacedName{Name: configuration.MutatingWebhookConfigurationName}, webhookConfig); err != nil {
		return false, err
	}
	newObject := unstructured.Unstructured{}
	if err := h.Decoder.Decode(req, &newObject); err != nil {
		return false, err
	}
	objLabels := labels2.Set(newObject.GetLabels())
	attr, err := constructAttr(req)
	if err != nil {
		return false, err
	}

	for i := range webhookConfig.Webhooks {
		hook := &webhookConfig.Webhooks[i]
		for _, rule := range hook.Rules {
			m := util2.Matcher{Rule: rule, Attr: attr}
			if !m.Matches() {
				continue
			}
			selector, err := v12.LabelSelectorAsSelector(hook.ObjectSelector)
			if err != nil {
				// Kept best-effort (no error returned), but the cause is
				// logged instead of being dropped silently.
				klog.Warningf("WorkloadHandler convert objectSelector of webhook(%s) failed: %s", hook.Name, err.Error())
				return false, nil
			}
			if selector.Matches(objLabels) {
				return true, nil
			}
		}
	}
	return false, nil
}
// constructAttr builds the admission attributes record for req from the raw
// JSON of the incoming object. Kind, namespace and name are read from the
// decoded object; resource, sub-resource, operation and dry-run flag come
// from the request. It returns an error when the raw object is not valid
// JSON.
func constructAttr(req admission.Request) (admission2.Attributes, error) {
	obj := unstructured.Unstructured{}
	if err := json.Unmarshal(req.Object.Raw, &obj); err != nil {
		return nil, err
	}
	// DryRun is an optional pointer on the request; a missing value means the
	// request is not a dry run, and dereferencing it unguarded would panic.
	dryRun := req.DryRun != nil && *req.DryRun
	resource := schema.GroupVersionResource{
		Group:    req.Resource.Group,
		Version:  req.Resource.Version,
		Resource: req.Resource.Resource,
	}
	return admission2.NewAttributesRecord(&obj, nil, obj.GetObjectKind().GroupVersionKind(), obj.GetNamespace(), obj.GetName(),
		resource, req.SubResource, admission2.Operation(req.Operation), nil, dryRun, nil), nil
}