init advanced deployment controller as native deployment controller (#104)
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
Co-authored-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
parent 973e39b0c8
commit b0c7b3b92a

go.mod | 1 +
@@ -10,6 +10,7 @@ require (
	github.com/openkruise/kruise-api v1.3.0
	github.com/spf13/pflag v1.0.5
	github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
	gopkg.in/yaml.v2 v2.4.0
	k8s.io/api v0.22.6
	k8s.io/apiextensions-apiserver v0.22.6
@@ -0,0 +1,204 @@
/*
Copyright 2019 The Kruise Authors.
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"context"
	"flag"
	"reflect"

	"github.com/openkruise/rollouts/pkg/feature"
	clientutil "github.com/openkruise/rollouts/pkg/util/client"
	utilfeature "github.com/openkruise/rollouts/pkg/util/feature"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	toolscache "k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func init() {
	flag.IntVar(&concurrentReconciles, "deployment-workers", concurrentReconciles, "Max concurrent workers for advanced deployment controller.")
}

var (
	concurrentReconciles = 3
)

// Add creates a new advanced Deployment Controller and adds it to the Manager with default RBAC.
// The Manager will set fields on the Controller and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
	if !utilfeature.DefaultFeatureGate.Enabled(feature.AdvancedDeploymentGate) {
		klog.Warningf("Advanced deployment controller is disabled")
		return nil
	}
	r, err := newReconciler(mgr)
	if err != nil {
		return err
	}
	return add(mgr, r)
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
	cacher := mgr.GetCache()
	podInformer, err := cacher.GetInformerForKind(context.TODO(), v1.SchemeGroupVersion.WithKind("Pod"))
	if err != nil {
		return nil, err
	}
	dInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("Deployment"))
	if err != nil {
		return nil, err
	}
	rsInformer, err := cacher.GetInformerForKind(context.TODO(), appsv1.SchemeGroupVersion.WithKind("ReplicaSet"))
	if err != nil {
		return nil, err
	}

	// Lister
	dLister := appslisters.NewDeploymentLister(dInformer.(toolscache.SharedIndexInformer).GetIndexer())
	rsLister := appslisters.NewReplicaSetLister(rsInformer.(toolscache.SharedIndexInformer).GetIndexer())
	podLister := corelisters.NewPodLister(podInformer.(toolscache.SharedIndexInformer).GetIndexer())

	// Client & Recorder
	genericClient := clientutil.GetGenericClientWithName("advanced-deployment-controller")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: genericClient.KubeClient.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "advanced-deployment-controller"})

	// Deployment controller factory
	factory := &controllerFactory{
		client:           genericClient.KubeClient,
		eventBroadcaster: eventBroadcaster,
		eventRecorder:    recorder,
		dLister:          dLister,
		rsLister:         rsLister,
		podLister:        podLister,
	}
	return &ReconcileDeployment{Client: mgr.GetClient(), controllerFactory: factory}, nil
}

var _ reconcile.Reconciler = &ReconcileDeployment{}

// ReconcileDeployment reconciles a Deployment object
type ReconcileDeployment struct {
	// client interface
	client.Client
	controllerFactory *controllerFactory
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
	// Create a new controller
	c, err := controller.New("advanced-deployment-controller", mgr, controller.Options{
		Reconciler: r, MaxConcurrentReconciles: concurrentReconciles})
	if err != nil {
		return err
	}

	// Watch ReplicaSets and enqueue their owning Deployment.
	if err = c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{
		IsController: true, OwnerType: &appsv1.Deployment{}}, predicate.Funcs{}); err != nil {
		return err
	}

	// TODO: handle deployment only when the deployment is under our control
	updateHandler := func(e event.UpdateEvent) bool {
		oldObject := e.ObjectOld.(*appsv1.Deployment)
		newObject := e.ObjectNew.(*appsv1.Deployment)
		if oldObject.Generation != newObject.Generation || newObject.DeletionTimestamp != nil {
			klog.V(3).Infof("Observed updated Spec for Deployment: %s/%s", newObject.Namespace, newObject.Name)
			return true
		}
		if len(oldObject.Annotations) != len(newObject.Annotations) || !reflect.DeepEqual(oldObject.Annotations, newObject.Annotations) {
			klog.V(3).Infof("Observed updated Annotation for Deployment: %s/%s", newObject.Namespace, newObject.Name)
			return true
		}
		return false
	}

	// Watch for changes to Deployment
	return c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{UpdateFunc: updateHandler})
}

// Reconcile reads the state of the cluster for a Deployment object and makes changes based on the state read
// and what is in the Deployment.Spec and Deployment.Annotations
// Automatically generate RBAC rules to allow the Controller to read and write ReplicaSets
func (r *ReconcileDeployment) Reconcile(_ context.Context, request reconcile.Request) (res reconcile.Result, retErr error) {
	deployment := new(appsv1.Deployment)
	err := r.Get(context.TODO(), request.NamespacedName, deployment)
	if err != nil {
		if errors.IsNotFound(err) {
			// Object not found, return. Created objects are automatically garbage collected.
			// For additional cleanup logic use finalizers.
			return ctrl.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return ctrl.Result{}, err
	}

	// TODO: create new controller only when deployment is under our control
	dc, err := r.controllerFactory.NewController(deployment)
	if err != nil {
		return reconcile.Result{}, err
	}

	err = dc.syncDeployment(context.Background(), request.NamespacedName.String())
	return ctrl.Result{}, err
}

type controllerFactory struct {
	client clientset.Interface

	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister
}

// NewController creates a new DeploymentController
// TODO: create new controller only when deployment is under our control
func (f *controllerFactory) NewController(_ *appsv1.Deployment) (*DeploymentController, error) {
	return &DeploymentController{
		client:           f.client,
		eventBroadcaster: f.eventBroadcaster,
		eventRecorder:    f.eventRecorder,
		dLister:          f.dLister,
		rsLister:         f.rsLister,
		podLister:        f.podLister,
	}, nil
}
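
An editor's sketch, not part of the commit: it shows how Add might be wired into a controller-runtime manager. The main package, the import path, and the manager options are assumptions. Add registers nothing and returns nil when the AdvancedDeploymentGate feature gate is disabled, and the worker count set in init() can be tuned with the --deployment-workers flag.

package main

import (
	deploymentcontroller "github.com/openkruise/rollouts/pkg/controller/deployment" // assumed package path
	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		panic(err)
	}
	// No-op unless the AdvancedDeploymentGate feature gate is enabled.
	if err := deploymentcontroller.Add(mgr); err != nil {
		panic(err)
	}
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}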
@@ -0,0 +1,280 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package deployment contains all the logic for handling Kubernetes Deployments.
// It implements a set of strategies (rolling, recreate) for deploying an application,
// the means to rollback to previous versions, proportional scaling for mitigating
// risk, cleanup policy, and other useful features of Deployments.
package deployment

import (
	"context"
	"fmt"
	"reflect"
	"time"

	"github.com/openkruise/rollouts/pkg/controller/deployment/util"
	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	clientset "k8s.io/client-go/kubernetes"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
)

const (
	// maxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
	// a deployment is going to be requeued:
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	maxRetries = 15
)
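
A small standalone sketch (editor's illustration, not in the commit) that reproduces the backoff sequence quoted above, assuming the default workqueue exponential rate limiter with a 5ms base delay that doubles per retry:

package main

import (
	"fmt"
	"time"
)

func main() {
	base := 5 * time.Millisecond
	for retry := 0; retry < 15; retry++ {
		// 5ms * 2^retry: 5ms, 10ms, 20ms, ..., ~41s, ~82s
		fmt.Println(base << retry)
	}
}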

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")

// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
	client clientset.Interface

	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister
}

// getDeploymentsForReplicaSet returns a list of Deployments that potentially
// match a ReplicaSet.
func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *apps.ReplicaSet) []*apps.Deployment {
	deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
	if err != nil || len(deployments) == 0 {
		return nil
	}
	// Because all ReplicaSets belonging to a deployment should have a unique label key,
	// there should never be more than one deployment returned by the above method.
	// If that happens we should probably dynamically repair the situation by ultimately
	// trying to clean up one of the controllers; for now we just return the older one.
	if len(deployments) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		klog.V(4).InfoS("user error! more than one deployment is selecting replica set",
			"replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0]))
	}
	return deployments
}

// getDeploymentForPod returns the deployment managing the given Pod.
func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *apps.Deployment {
	// Find the owning replica set
	var rs *apps.ReplicaSet
	var err error
	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller owns this Pod.
		return nil
	}
	if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
		// Not a pod owned by a replica set.
		return nil
	}
	rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
	if err != nil || rs.UID != controllerRef.UID {
		klog.V(4).InfoS("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err)
		return nil
	}

	// Now find the Deployment that owns that ReplicaSet.
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return nil
	}
	return dc.resolveControllerRef(rs.Namespace, controllerRef)
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if d.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return d
}
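
A hypothetical illustration (editor's sketch; the objects are made up) of why resolveControllerRef verifies the UID after the name lookup: a Deployment deleted and recreated under the same name gets a fresh UID, so a stale ControllerRef held by an orphaned ReplicaSet must not resolve to the new object.

// could live in a test file of this package; assumes fmt is imported
func ExampleStaleControllerRef() {
	// ControllerRef recorded before the owner was deleted and recreated.
	staleRef := metav1.OwnerReference{Kind: "Deployment", Name: "web", UID: "uid-old"}
	recreated := apps.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "web", UID: "uid-new"}}
	// The name lookup would succeed, but the UID check fails,
	// so resolveControllerRef returns nil for this pair.
	fmt.Println(recreated.UID == staleRef.UID)
	// Output: false
}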

// getReplicaSetsForDeployment returns the list of ReplicaSets that this Deployment should manage.
// Unlike the upstream controller, it does not adopt or orphan ReplicaSets through a
// ControllerRefManager; it simply lists the ReplicaSets in the Deployment's namespace
// that match its label selector.
func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, d *apps.Deployment) ([]*apps.ReplicaSet, error) {
	deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
	}
	// List all ReplicaSets matching the Deployment's selector from the lister cache.
	return dc.rsLister.ReplicaSets(d.Namespace).List(deploymentSelector)
}

// getPodMapForDeployment returns the Pods managed by a Deployment.
//
// It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
// according to the Pod's ControllerRef.
// NOTE: The pod pointers returned by this method point to the pod objects in the cache and thus
// shouldn't be modified in any way.
func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID][]*v1.Pod, error) {
	// Get all Pods that potentially belong to this Deployment.
	selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
	if err != nil {
		return nil, err
	}
	pods, err := dc.podLister.Pods(d.Namespace).List(selector)
	if err != nil {
		return nil, err
	}
	// Group Pods by their controller (if it's in rsList).
	podMap := make(map[types.UID][]*v1.Pod, len(rsList))
	for _, rs := range rsList {
		podMap[rs.UID] = []*v1.Pod{}
	}
	for _, pod := range pods {
		// Do not ignore inactive Pods because Recreate Deployments need to verify that no
		// Pods from older versions are running before spinning up new Pods.
		controllerRef := metav1.GetControllerOf(pod)
		if controllerRef == nil {
			continue
		}
		// Only append if we care about this UID.
		if _, ok := podMap[controllerRef.UID]; ok {
			podMap[controllerRef.UID] = append(podMap[controllerRef.UID], pod)
		}
	}
	return podMap, nil
}

// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
		return err
	}

	startTime := time.Now()
	klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
	defer func() {
		klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
	}()

	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		return nil
	}

	// List ReplicaSets matching this Deployment's selector
	// (no adoption/orphaning is performed here).
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(ctx, d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	if err = dc.checkPausedConditions(ctx, d); err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
@@ -0,0 +1,200 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"context"
	"fmt"
	"reflect"
	"time"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"

	"github.com/openkruise/rollouts/pkg/controller/deployment/util"
)

// syncRolloutStatus updates the status of a deployment during a rollout. There are
// cases where this helper runs that the scaling detection cannot prevent, for
// example a resync of the deployment after it was scaled up. In those cases,
// we shouldn't try to estimate any progress.
func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	newStatus := calculateStatus(allRSs, newRS, d)

	// If there is no progressDeadlineSeconds set, remove any Progressing condition.
	if !util.HasProgressDeadline(d) {
		util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
	}

	// If there is only one replica set that is active then that means we are not running
	// a new rollout and this is a resync where we don't need to estimate any progress.
	// In such a case, we should simply not estimate any progress for this deployment.
	currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
	isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
	// Check for progress only if there is a progress deadline set and the latest rollout
	// hasn't completed yet.
	if util.HasProgressDeadline(d) && !isCompleteDeployment {
		switch {
		case util.DeploymentComplete(d, &newStatus):
			// Update the deployment conditions with a message for the new replica set that
			// was successfully deployed. If the condition already exists, we ignore this update.
			msg := fmt.Sprintf("Deployment %q has successfully progressed.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
			util.SetDeploymentCondition(&newStatus, *condition)

		case util.DeploymentProgressing(d, &newStatus):
			// If there is any progress made, continue by not checking if the deployment failed. This
			// behavior emulates the rolling updater progressDeadline check.
			msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
			// Update the current Progressing condition or add a new one if it doesn't exist.
			// If a Progressing condition with status=true already exists, we should update
			// everything but lastTransitionTime. SetDeploymentCondition already does that but
			// it also is not updating conditions when the reason of the new condition is the
			// same as the old. The Progressing condition is a special case because we want to
			// update with the same reason and change just lastUpdateTime iff we notice any
			// progress. That's why we handle it here.
			if currentCond != nil {
				if currentCond.Status == v1.ConditionTrue {
					condition.LastTransitionTime = currentCond.LastTransitionTime
				}
				util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
			}
			util.SetDeploymentCondition(&newStatus, *condition)

		case util.DeploymentTimedOut(d, &newStatus):
			// Update the deployment with a timeout condition. If the condition already exists,
			// we ignore this update.
			msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
			util.SetDeploymentCondition(&newStatus, *condition)
		}
	}

	// Move failure conditions of all replica sets into deployment conditions. For now,
	// only one failure condition is returned from getReplicaFailures.
	if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 {
		// There will be only one ReplicaFailure condition on the replica set.
		util.SetDeploymentCondition(&newStatus, replicaFailureCond[0])
	} else {
		util.RemoveDeploymentCondition(&newStatus, apps.DeploymentReplicaFailure)
	}

	// Do not update if there is nothing new to add.
	if reflect.DeepEqual(d.Status, newStatus) {
		// Requeue the deployment if required.
		dc.requeueStuckDeployment(d, newStatus)
		return nil
	}

	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
	return err
}

// getReplicaFailures will convert replica failure conditions from replica sets
// to deployment conditions.
func (dc *DeploymentController) getReplicaFailures(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) []apps.DeploymentCondition {
	var conditions []apps.DeploymentCondition
	if newRS != nil {
		for _, c := range newRS.Status.Conditions {
			if c.Type != apps.ReplicaSetReplicaFailure {
				continue
			}
			conditions = append(conditions, util.ReplicaSetToDeploymentCondition(c))
		}
	}

	// Return failures for the new replica set over failures from old replica sets.
	if len(conditions) > 0 {
		return conditions
	}

	for i := range allRSs {
		rs := allRSs[i]
		if rs == nil {
			continue
		}

		for _, c := range rs.Status.Conditions {
			if c.Type != apps.ReplicaSetReplicaFailure {
				continue
			}
			conditions = append(conditions, util.ReplicaSetToDeploymentCondition(c))
		}
	}
	return conditions
}

// used for unit testing
var nowFn = func() time.Time { return time.Now() }

// requeueStuckDeployment checks whether the provided deployment needs to be synced for a progress
// check. It returns the time after which the deployment will be requeued for the progress check,
// 0 if it will be requeued now, or -1 if it does not need to be requeued.
func (dc *DeploymentController) requeueStuckDeployment(d *apps.Deployment, newStatus apps.DeploymentStatus) time.Duration {
	currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
	// Can't estimate progress if there is no deadline in the spec or progressing condition in the current status.
	if !util.HasProgressDeadline(d) || currentCond == nil {
		return time.Duration(-1)
	}
	// No need to estimate progress if the rollout is complete or already timed out.
	if util.DeploymentComplete(d, &newStatus) || currentCond.Reason == util.TimedOutReason {
		return time.Duration(-1)
	}
	// If there is no sign of progress at this point then there is a high chance that the
	// deployment is stuck. We should resync this deployment at some point in the future[1]
	// and check whether it has timed out. We definitely need this, otherwise we depend on the
	// controller resync interval. See https://github.com/kubernetes/kubernetes/issues/34458.
	//
	// [1] ProgressingCondition.LastUpdatedTime + progressDeadlineSeconds - time.Now()
	//
	// For example, if a Deployment updated its Progressing condition 3 minutes ago and has a
	// deadline of 10 minutes, it would need to be resynced for a progress check after 7 minutes.
	//
	// lastUpdated: 00:00:00
	// now: 00:03:00
	// progressDeadlineSeconds: 600 (10 minutes)
	//
	// lastUpdated + progressDeadlineSeconds - now => 00:00:00 + 00:10:00 - 00:03:00 => 07:00
	after := currentCond.LastUpdateTime.Time.Add(time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second).Sub(nowFn())
	// If the remaining time is less than a second, then requeue the deployment immediately.
	// Make it ratelimited so we stay on the safe side, eventually the Deployment should
	// transition either to a Complete or to a TimedOut condition.
	if after < time.Second {
		klog.V(4).Infof("Queueing up deployment %q for a progress check now", d.Name)
		// dc.enqueueRateLimited(d) requeue
		return time.Duration(0)
	}
	klog.V(4).Infof("Queueing up deployment %q for a progress check after %ds", d.Name, int(after.Seconds()))
	// Add a second to avoid milliseconds skew in AddAfter.
	// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
	// dc.enqueueAfter(d, after+time.Second) requeue
	return after
}
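
A hypothetical test snippet (editor's sketch, not in the commit) showing what the nowFn seam above enables: freezing "now" makes the requeue arithmetic deterministic, reproducing the 7-minute example from the comment.

// assumes the testing and time packages are imported in a package test file
func TestRequeueWindow(t *testing.T) {
	lastUpdated := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
	nowFn = func() time.Time { return lastUpdated.Add(3 * time.Minute) }
	defer func() { nowFn = func() time.Time { return time.Now() } }()

	// lastUpdated + progressDeadlineSeconds(600s) - now => 7 minutes.
	after := lastUpdated.Add(600 * time.Second).Sub(nowFn())
	if after != 7*time.Minute {
		t.Fatalf("expected 7m, got %v", after)
	}
}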
@@ -0,0 +1,132 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"context"

	"github.com/openkruise/rollouts/pkg/controller/deployment/util"
	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// rolloutRecreate implements the logic for recreating a replica set.
func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
	// Don't create a new RS if one doesn't already exist, so that we avoid scaling up before scaling down.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	activeOldRSs := util.FilterActiveReplicaSets(oldRSs)

	// scale down old replica sets.
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus.
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Do not process a deployment when it has old pods running.
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// If we need to create a new RS, create it now.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}

	// scale up new replica set.
	if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil {
		return err
	}

	if util.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status.
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

// scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate".
func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	scaled := false
	for i := range oldRSs {
		rs := oldRSs[i]
		// Scaling not required.
		if *(rs.Spec.Replicas) == 0 {
			continue
		}
		scaledRS, updatedRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, rs, 0, deployment)
		if err != nil {
			return false, err
		}
		if scaledRS {
			oldRSs[i] = updatedRS
			scaled = true
		}
	}
	return scaled, nil
}

// oldPodsRunning returns whether there are old pods running or any of the old ReplicaSets thinks that it runs pods.
func oldPodsRunning(newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) bool {
	if oldPods := util.GetActualReplicaCountForReplicaSets(oldRSs); oldPods > 0 {
		return true
	}
	for rsUID, podList := range podMap {
		// If the pods belong to the new ReplicaSet, ignore.
		if newRS != nil && newRS.UID == rsUID {
			continue
		}
		for _, pod := range podList {
			switch pod.Status.Phase {
			case v1.PodFailed, v1.PodSucceeded:
				// Don't count pods in terminal state.
				continue
			case v1.PodUnknown:
				// v1.PodUnknown is a deprecated status.
				// This logic is kept for backward compatibility.
				// This used to happen in situations like when the node is temporarily disconnected from the cluster.
				// If we can't be sure that the pod is not running, we have to count it.
				return true
			default:
				// Pod is not in terminal phase.
				return true
			}
		}
	}
	return false
}

// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate".
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(ctx context.Context, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
	return scaled, err
}
@@ -0,0 +1,149 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"context"
	"fmt"
	"strconv"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	extensions "k8s.io/api/extensions/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"

	deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util"
)

// rollback rolls the deployment back to the specified revision. In any case it cleans up the rollback spec.
func (dc *DeploymentController) rollback(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}

	allRSs := append(allOldRSs, newRS)
	rollbackTo := getRollbackTo(d)
	// If rollback revision is 0, rollback to the last revision
	if rollbackTo.Revision == 0 {
		if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
			// If we still can't find the last revision, give up the rollback
			dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
			// Give up rollback
			return dc.updateDeploymentAndClearRollbackTo(ctx, d)
		}
	}
	for _, rs := range allRSs {
		v, err := deploymentutil.Revision(rs)
		if err != nil {
			klog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err)
			continue
		}
		if v == rollbackTo.Revision {
			klog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v)
			// rollback by copying podTemplate.Spec from the replica set
			// revision number will be incremented during the next getAllReplicaSetsAndSyncRevision call
			// no-op if the spec matches current deployment's podTemplate.Spec
			performedRollback, err := dc.rollbackToTemplate(ctx, d, rs)
			if performedRollback && err == nil {
				dc.emitRollbackNormalEvent(d, fmt.Sprintf("Rolled back deployment %q to revision %d", d.Name, rollbackTo.Revision))
			}
			return err
		}
	}
	dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
	// Give up rollback
	return dc.updateDeploymentAndClearRollbackTo(ctx, d)
}

// rollbackToTemplate compares the templates of the provided deployment and replica set and
// updates the deployment with the replica set template in case they are different. It also
// cleans up the rollback spec so subsequent requeues of the deployment won't end up in here.
func (dc *DeploymentController) rollbackToTemplate(ctx context.Context, d *apps.Deployment, rs *apps.ReplicaSet) (bool, error) {
	performedRollback := false
	if !deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template) {
		klog.V(4).Infof("Rolling back deployment %q to template spec %+v", d.Name, rs.Spec.Template.Spec)
		deploymentutil.SetFromReplicaSetTemplate(d, rs.Spec.Template)
		// set RS (the old RS we're rolling back to) annotations back to the deployment;
		// otherwise, the deployment's current annotations (should be the same as current new RS) will be copied to the RS after the rollback.
		//
		// For example,
		// A Deployment has old RS1 with annotation {change-cause:create}, and new RS2 {change-cause:edit}.
		// Note that both annotations are copied from Deployment, and the Deployment should be annotated {change-cause:edit} as well.
		// Now, rollback Deployment to RS1, we should update Deployment's pod-template and also copy annotation from RS1.
		// Deployment is now annotated {change-cause:create}, and we have new RS1 {change-cause:create}, old RS2 {change-cause:edit}.
		//
		// If we don't copy the annotations back from RS to deployment on rollback, the Deployment will stay as {change-cause:edit},
		// and new RS1 becomes {change-cause:edit} (copied from deployment after rollback), old RS2 {change-cause:edit}, which is not correct.
		deploymentutil.SetDeploymentAnnotationsTo(d, rs)
		performedRollback = true
	} else {
		klog.V(4).Infof("Rolling back to a revision that contains the same template as current deployment %q, skipping rollback...", d.Name)
		eventMsg := fmt.Sprintf("The rollback revision contains the same template as current deployment %q", d.Name)
		dc.emitRollbackWarningEvent(d, deploymentutil.RollbackTemplateUnchanged, eventMsg)
	}

	return performedRollback, dc.updateDeploymentAndClearRollbackTo(ctx, d)
}

func (dc *DeploymentController) emitRollbackWarningEvent(d *apps.Deployment, reason, message string) {
	dc.eventRecorder.Eventf(d, v1.EventTypeWarning, reason, message)
}

func (dc *DeploymentController) emitRollbackNormalEvent(d *apps.Deployment, message string) {
	dc.eventRecorder.Eventf(d, v1.EventTypeNormal, deploymentutil.RollbackDone, message)
}

// updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and updates the input deployment.
// It is assumed that the caller will have updated the deployment template appropriately (in case
// we want to rollback).
func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(ctx context.Context, d *apps.Deployment) error {
	klog.V(4).Infof("Cleans up rollbackTo of deployment %q", d.Name)
	setRollbackTo(d, nil)
	_, err := dc.client.AppsV1().Deployments(d.Namespace).Update(ctx, d, metav1.UpdateOptions{})
	return err
}

// TODO: Remove this when extensions/v1beta1 and apps/v1beta1 Deployment are dropped.
func getRollbackTo(d *apps.Deployment) *extensions.RollbackConfig {
	// Extract the annotation used for round-tripping the deprecated RollbackTo field.
	revision := d.Annotations[apps.DeprecatedRollbackTo]
	if revision == "" {
		return nil
	}
	revision64, err := strconv.ParseInt(revision, 10, 64)
	if err != nil {
		// If it's invalid, ignore it.
		return nil
	}
	return &extensions.RollbackConfig{
		Revision: revision64,
	}
}

// TODO: Remove this when extensions/v1beta1 and apps/v1beta1 Deployment are dropped.
func setRollbackTo(d *apps.Deployment, rollbackTo *extensions.RollbackConfig) {
	if rollbackTo == nil {
		delete(d.Annotations, apps.DeprecatedRollbackTo)
		return
	}
	if d.Annotations == nil {
		d.Annotations = make(map[string]string)
	}
	d.Annotations[apps.DeprecatedRollbackTo] = strconv.FormatInt(rollbackTo.Revision, 10)
}
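
An illustrative round trip (editor's example; runnable only inside this package) of the deprecated annotation these two helpers translate to and from. apps.DeprecatedRollbackTo is the "deprecated.deployment.rollback.to" key defined in k8s.io/api/apps/v1.

// assumes fmt is imported in a package test file
func ExampleRollbackToRoundTrip() {
	d := &apps.Deployment{}
	setRollbackTo(d, &extensions.RollbackConfig{Revision: 2})
	fmt.Println(d.Annotations[apps.DeprecatedRollbackTo]) // "2"
	fmt.Println(getRollbackTo(d).Revision)                // 2

	setRollbackTo(d, nil) // clears the annotation
	fmt.Println(getRollbackTo(d) == nil)                  // true
}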
@@ -0,0 +1,234 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"context"
	"fmt"
	"sort"

	apps "k8s.io/api/apps/v1"
	"k8s.io/klog/v2"
	"k8s.io/utils/integer"

	deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util"
)

// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, deploymentutil.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		// Scaling not required.
		return false, nil
	}
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		// Scale down.
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	if err != nil {
		return false, err
	}
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
	return scaled, err
}

func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		// Can't scale down further
		return false, nil
	}

	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	klog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	// Check if we can scale down. We can scale down in the following 2 cases:
	// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
	//  increase unavailability.
	// * The new replica set has scaled up and its replicas become ready, then we can scale down old replica sets in a further step.
	//
	// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
	// take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
	// the newRS, so that the unavailable pods from the newRS would not make us scale down old replica sets in a further
	// step (that would increase unavailability).
	//
	// Concrete example:
	//
	// * 10 replicas
	// * 2 maxUnavailable (absolute number, not percent)
	// * 3 maxSurge (absolute number, not percent)
	//
	// case 1:
	// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
	// * The new replica set pods crashloop and never become available.
	// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
	// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
	// * The user notices the crashloop and does kubectl rollout undo to roll back.
	// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
	// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
	//
	// case 2:
	// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
	// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
	//  allow the new replica set to be scaled up by 5.
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
	if maxScaledDown <= 0 {
		return false, nil
	}

	// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
	// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
	if err != nil {
		return false, nil
	}
	klog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)

	// Scale down old replica sets; we need to check maxUnavailable to ensure we can scale down.
	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
	if err != nil {
		return false, nil
	}
	klog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount)

	totalScaledDown := cleanupCount + scaledDownCount
	return totalScaledDown > 0, nil
}
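
A standalone arithmetic check (editor's sketch; the numbers come from case 1 in the comment above) of maxScaledDown = allPodsCount - minAvailable - newRSUnavailablePodCount:

package main

import "fmt"

func main() {
	replicas, maxUnavailable := int32(10), int32(2)
	minAvailable := replicas - maxUnavailable // 8
	allPodsCount := int32(13)                 // 8 old + 5 new (maxSurge = 3)

	// While the new RS crashloops, all 5 of its pods are unavailable:
	fmt.Println(allPodsCount - minAvailable - 5) // 0 -> old RS cannot be scaled down
	// After rolling back to the healthy RS, only 1 pod is unavailable:
	fmt.Println(allPodsCount - minAvailable - 1) // 4 -> 4 crashlooping pods scaled down
}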

// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) {
	sort.Sort(deploymentutil.ReplicaSetsByCreationTimestamp(oldRSs))
	// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order
	// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
	// be deleted first and won't increase unavailability.
	totalScaledDown := int32(0)
	for i, targetRS := range oldRSs {
		if totalScaledDown >= maxCleanupCount {
			break
		}
		if *(targetRS.Spec.Replicas) == 0 {
			// cannot scale down this replica set.
			continue
		}
		klog.V(4).Infof("Found %d available pods in old RS %s/%s", targetRS.Status.AvailableReplicas, targetRS.Namespace, targetRS.Name)
		if *(targetRS.Spec.Replicas) == targetRS.Status.AvailableReplicas {
			// no unhealthy replicas found, no scaling required.
			continue
		}

		scaledDownCount := int32(integer.IntMin(int(maxCleanupCount-totalScaledDown), int(*(targetRS.Spec.Replicas)-targetRS.Status.AvailableReplicas)))
		newReplicasCount := *(targetRS.Spec.Replicas) - scaledDownCount
		if newReplicasCount > *(targetRS.Spec.Replicas) {
			return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
		}
		_, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
		if err != nil {
			return nil, totalScaledDown, err
		}
		totalScaledDown += scaledDownCount
		oldRSs[i] = updatedOldRS
	}
	return oldRSs, totalScaledDown, nil
}

// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate".
// It needs to check maxUnavailable to ensure availability.
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	// Check if we can scale down.
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	// Find the number of available pods.
	availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
	if availablePodCount <= minAvailable {
		// Cannot scale down.
		return 0, nil
	}
	klog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name)

	sort.Sort(deploymentutil.ReplicaSetsByCreationTimestamp(oldRSs))

	totalScaledDown := int32(0)
	totalScaleDownCount := availablePodCount - minAvailable
	for _, targetRS := range oldRSs {
		if totalScaledDown >= totalScaleDownCount {
			// No further scaling required.
			break
		}
		if *(targetRS.Spec.Replicas) == 0 {
			// cannot scale down this ReplicaSet.
			continue
		}
		// Scale down.
		scaleDownCount := int32(integer.IntMin(int(*(targetRS.Spec.Replicas)), int(totalScaleDownCount-totalScaledDown)))
		newReplicasCount := *(targetRS.Spec.Replicas) - scaleDownCount
		if newReplicasCount > *(targetRS.Spec.Replicas) {
			return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
		if err != nil {
			return totalScaledDown, err
		}

		totalScaledDown += scaleDownCount
	}

	return totalScaledDown, nil
}
@ -0,0 +1,546 @@
|
|||
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"strconv"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"

	deploymentutil "github.com/openkruise/rollouts/pkg/controller/deployment/util"
	"github.com/openkruise/rollouts/pkg/util"
	labelsutil "github.com/openkruise/rollouts/pkg/util/labels"
)

// syncStatusOnly only updates the Deployment's status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync.
		return err
	}

	// Clean up the deployment when it's paused and no rollback is in flight.
	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition.
// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
// that were paused for longer than progressDeadlineSeconds.
func (dc *DeploymentController) checkPausedConditions(ctx context.Context, d *apps.Deployment) error {
	if !deploymentutil.HasProgressDeadline(d) {
		return nil
	}
	cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
	if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
		// If we have reported lack of progress, do not overwrite it with a paused condition.
		return nil
	}
	pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason

	needsUpdate := false
	if d.Spec.Paused && !pausedCondExists {
		condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
		deploymentutil.SetDeploymentCondition(&d.Status, *condition)
		needsUpdate = true
	} else if !d.Spec.Paused && pausedCondExists {
		condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
		deploymentutil.SetDeploymentCondition(&d.Status, *condition)
		needsUpdate = true
	}

	if !needsUpdate {
		return nil
	}

	var err error
	_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
	return err
}

// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with the new RS's and the deployment's revision updated.
//
// rsList should come from getReplicaSetsForDeployment(d).
//
// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV).
// 2. Get the new RS this deployment targets (whose pod template matches the deployment's), and update the new RS's revision number to (maxOldV + 1),
//    only if its revision number is smaller than (maxOldV + 1). If this step fails, we'll update it in the next deployment sync loop.
// 3. Copy the new RS's revision number to the deployment (update the deployment's revision). If this step fails, we'll update it in the next deployment sync loop.
//
// Note that currently the deployment controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect deployment status.
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) {
	_, allOldRSs := deploymentutil.FindOldReplicaSets(d, rsList)

	// Get the new replica set with the updated revision number.
	newRS, err := dc.getNewReplicaSet(ctx, d, rsList, allOldRSs, createIfNotExisted)
	if err != nil {
		return nil, nil, err
	}

	return newRS, allOldRSs, nil
}
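
As a quick illustration of steps 1-3 above (revision values hypothetical, not from this commit), the new ReplicaSet's revision is simply one past the largest revision recorded on the old ReplicaSets:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Revision annotations read from three hypothetical old ReplicaSets.
	oldRevisions := []string{"1", "3", "2"}
	var maxOldRevision int64
	for _, r := range oldRevisions {
		if v, err := strconv.ParseInt(r, 10, 64); err == nil && v > maxOldRevision {
			maxOldRevision = v
		}
	}
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)
	fmt.Println(newRevision) // "4"
}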

const (
	// limit the revision history length to about 100 elements (~2000 chars)
	maxRevHistoryLengthInChars = 2000
)

// Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet.
// 1. Get the existing new RS (the RS that the given deployment targets, whose pod template is the same as the deployment's).
// 2. If there's an existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
// 3. If there's no existing new RS and createIfNotExisted is true, create one with the appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
	existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

	// Calculate the max revision number among all old RSes.
	maxOldRevision := deploymentutil.MaxRevision(oldRSs)
	// Calculate the revision number for this new replica set.
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)

	// Latest replica set exists. We need to sync its annotations (includes copying all but
	// annotationsToSkip from the parent deployment, and update revision, desiredReplicas,
	// and maxReplicas) and also update the revision annotation in the deployment with the
	// latest revision.
	if existingNewRS != nil {
		rsCopy := existingNewRS.DeepCopy()

		// Set the existing new replica set's annotations.
		annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, newRevision, true, maxRevHistoryLengthInChars)
		minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
		if annotationsUpdated || minReadySecondsNeedsUpdate {
			rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
			return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
		}

		// Should use the revision in existingNewRS's annotation, since it was set before.
		needsUpdate := deploymentutil.SetDeploymentRevision(d, rsCopy.Annotations[deploymentutil.RevisionAnnotation])
		// If no other Progressing condition has been recorded and we need to estimate the progress
		// of this deployment then it is likely that old users started caring about progress. In that
		// case we need to take into account the first time we noticed their new replica set.
		cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
		if deploymentutil.HasProgressDeadline(d) && cond == nil {
			msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name)
			condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.FoundNewRSReason, msg)
			deploymentutil.SetDeploymentCondition(&d.Status, *condition)
			needsUpdate = true
		}

		if needsUpdate {
			var err error
			if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil {
				return nil, err
			}
		}
		return rsCopy, nil
	}

	if !createIfNotExisted {
		return nil, nil
	}

	// The new ReplicaSet does not exist; create one.
	newRSTemplate := *d.Spec.Template.DeepCopy()
	podTemplateSpecHash := util.ComputeHash(&newRSTemplate, d.Status.CollisionCount)
	newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
	// Add the podTemplateHash label to the selector.
	newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

	// Create the new ReplicaSet.
	newRS := apps.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			// Make the name deterministic, to ensure idempotence.
			Name:            d.Name + "-" + podTemplateSpecHash,
			Namespace:       d.Namespace,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, controllerKind)},
			Labels:          newRSTemplate.Labels,
		},
		Spec: apps.ReplicaSetSpec{
			Replicas:        new(int32),
			MinReadySeconds: d.Spec.MinReadySeconds,
			Selector:        newRSSelector,
			Template:        newRSTemplate,
		},
	}
	allRSs := append(oldRSs, &newRS)
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS)
	if err != nil {
		return nil, err
	}

	*(newRS.Spec.Replicas) = newReplicasCount
	// Set the new replica set's annotations.
	deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, newRevision, false, maxRevHistoryLengthInChars)
	// Create the new ReplicaSet. If it already exists, then we need to check for possible
	// hash collisions. If there is any other error, we need to report it in the status of
	// the Deployment.
	alreadyExists := false
	createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{})
	switch {
	// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
	case errors.IsAlreadyExists(err):
		alreadyExists = true

		// Fetch a copy of the ReplicaSet.
		rs, rsErr := dc.rsLister.ReplicaSets(newRS.Namespace).Get(newRS.Name)
		if rsErr != nil {
			return nil, rsErr
		}

		// If the Deployment owns the ReplicaSet and the ReplicaSet's PodTemplateSpec is semantically
		// deep equal to the PodTemplateSpec of the Deployment, it's the Deployment's new ReplicaSet.
		// Otherwise, this is a hash collision and we need to increment the collisionCount field in
		// the status of the Deployment and requeue to try the creation in the next sync.
		controllerRef := metav1.GetControllerOf(rs)
		if controllerRef != nil && controllerRef.UID == d.UID && deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template) {
			createdRS = rs
			err = nil
			break
		}

		// The matching ReplicaSet is not equal - increment the collisionCount in the DeploymentStatus
		// and requeue the Deployment.
		if d.Status.CollisionCount == nil {
			d.Status.CollisionCount = new(int32)
		}
		preCollisionCount := *d.Status.CollisionCount
		*d.Status.CollisionCount++
		// Update the collisionCount for the Deployment and let it requeue by returning the original
		// error.
		_, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		if dErr == nil {
			klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
		}
		return nil, err
	case errors.HasStatusCause(err, v1.NamespaceTerminatingCause):
		// If the namespace is terminating, all subsequent creates will fail and we can safely do nothing.
		return nil, err
	case err != nil:
		msg := fmt.Sprintf("Failed to create new replica set %q: %v", newRS.Name, err)
		if deploymentutil.HasProgressDeadline(d) {
			cond := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, deploymentutil.FailedRSCreateReason, msg)
			deploymentutil.SetDeploymentCondition(&d.Status, *cond)
			// We don't really care about this error at this point, since we have a bigger issue to report.
			// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
			// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
			_, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
		return nil, err
	}
	if !alreadyExists && newReplicasCount > 0 {
		dc.eventRecorder.Eventf(d, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s to %d", createdRS.Name, newReplicasCount)
	}

	needsUpdate := deploymentutil.SetDeploymentRevision(d, newRevision)
	if !alreadyExists && deploymentutil.HasProgressDeadline(d) {
		msg := fmt.Sprintf("Created new replica set %q", createdRS.Name)
		condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.NewReplicaSetReason, msg)
		deploymentutil.SetDeploymentCondition(&d.Status, *condition)
		needsUpdate = true
	}
	if needsUpdate {
		_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
	}
	return createdRS, err
}
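
The deterministic name (deployment name plus pod-template hash) is what makes the Create idempotent and makes hash collisions detectable. A self-contained sketch of the idea, using hash/fnv as a stand-in for util.ComputeHash, whose exact encoding is not shown in this diff:

package main

import (
	"fmt"
	"hash/fnv"
)

// childName sketches the deterministic naming scheme: the same pod template
// (and collision count) always maps to the same ReplicaSet name, so a retried
// Create hits IsAlreadyExists instead of producing a duplicate.
func childName(deployName, templateJSON string, collisionCount int32) string {
	h := fnv.New32a()
	fmt.Fprintf(h, "%s/%d", templateJSON, collisionCount)
	return fmt.Sprintf("%s-%x", deployName, h.Sum32())
}

func main() {
	fmt.Println(childName("web", `{"containers":[...]}`, 0))
	// Bumping collisionCount yields a different name, which is how the
	// controller escapes a hash collision on the next sync.
	fmt.Println(childName("web", `{"containers":[...]}`, 1))
}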

// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
// replicas in the event of a problem with the rolled out template. Should run only on scaling events or
// when a deployment is paused and not during the normal rollout process.
func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
	// If there is only one active replica set then we should scale that up to the full count of the
	// deployment. If there is no active replica set, then we should scale up the newest replica set.
	if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
		if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
			return nil
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment)
		return err
	}

	// If the new replica set is saturated, old replica sets should be fully scaled down.
	// This case handles replica set adoption during a saturated new replica set.
	if deploymentutil.IsSaturated(deployment, newRS) {
		for _, old := range deploymentutil.FilterActiveReplicaSets(oldRSs) {
			if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0, deployment); err != nil {
				return err
			}
		}
		return nil
	}

	// There are old replica sets with pods and the new replica set is not saturated.
	// We need to proportionally scale all replica sets (new and old) in case of a
	// rolling deployment.
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := deploymentutil.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}

		// Number of additional replicas that can be either added or removed from the total
		// replicas count. These replicas should be distributed proportionally to the active
		// replica sets.
		deploymentReplicasToAdd := allowedSize - allRSsReplicas

		// The additional replicas should be distributed proportionally amongst the active
		// replica sets from the larger to the smaller in size replica set. Scaling direction
		// drives what happens in case we are trying to scale replica sets of the same size.
		// In such a case when scaling up, we should scale up newer replica sets first, and
		// when scaling down, we should scale down older replica sets first.
		var scalingOperation string
		switch {
		case deploymentReplicasToAdd > 0:
			sort.Sort(deploymentutil.ReplicaSetsBySizeNewer(allRSs))
			scalingOperation = "up"

		case deploymentReplicasToAdd < 0:
			sort.Sort(deploymentutil.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"
		}

		// Iterate over all active replica sets and estimate proportions for each of them.
		// The absolute value of deploymentReplicasAdded should never exceed the absolute
		// value of deploymentReplicasToAdd.
		deploymentReplicasAdded := int32(0)
		nameToSize := make(map[string]int32)
		for i := range allRSs {
			rs := allRSs[i]

			// Estimate proportions if we have replicas to add, otherwise simply populate
			// nameToSize with the current sizes for each replica set.
			if deploymentReplicasToAdd != 0 {
				proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
				deploymentReplicasAdded += proportion
			} else {
				nameToSize[rs.Name] = *(rs.Spec.Replicas)
			}
		}

		// Update all replica sets.
		for i := range allRSs {
			rs := allRSs[i]

			// Add/remove any leftovers to the largest replica set.
			if i == 0 && deploymentReplicasToAdd != 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
				if nameToSize[rs.Name] < 0 {
					nameToSize[rs.Name] = 0
				}
			}

			// TODO: Use transactions when we have them.
			if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
				// Return as soon as we fail, the deployment is requeued.
				return err
			}
		}
	}
	return nil
}
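
The proportional distribution is easiest to see with numbers. A simplified sketch (the real deploymentutil.GetProportion also respects annotations and rounding rules not shown in this diff): a deployment scaled from 10 to 15 replicas with maxSurge allowing 16 total, and two active ReplicaSets of size 8 and 3, must distribute 5 extra replicas weighted by current size:

package main

import "fmt"

// proportion is a simplified stand-in for deploymentutil.GetProportion:
// each ReplicaSet gets a share of replicasToAdd weighted by its current
// size, capped by whatever is still left to distribute.
func proportion(rsReplicas, totalReplicas, replicasToAdd, added int32) int32 {
	share := int32(int64(rsReplicas) * int64(replicasToAdd) / int64(totalReplicas))
	if rest := replicasToAdd - added; share > rest {
		share = rest
	}
	return share
}

func main() {
	sizes := []int32{8, 3}              // active ReplicaSets, largest first
	total, toAdd := int32(11), int32(5) // allowedSize(16) - current total(11)

	added := int32(0)
	shares := make([]int32, len(sizes))
	for i, s := range sizes {
		shares[i] = proportion(s, total, toAdd, added)
		added += shares[i]
	}
	shares[0] += toAdd - added // leftovers go to the largest ReplicaSet
	fmt.Println(shares)        // [4 1]: 8 -> 12 and 3 -> 4, totalling 16
}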

func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {
	// No need to scale.
	if *(rs.Spec.Replicas) == newScale {
		return false, rs, nil
	}
	var scalingOperation string
	if *(rs.Spec.Replicas) < newScale {
		scalingOperation = "up"
	} else {
		scalingOperation = "down"
	}
	scaled, newRS, err := dc.scaleReplicaSet(ctx, rs, newScale, deployment, scalingOperation)
	return scaled, newRS, err
}

func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
	sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale

	annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))

	scaled := false
	var err error
	if sizeNeedsUpdate || annotationsNeedUpdate {
		oldScale := *(rs.Spec.Replicas)
		rsCopy := rs.DeepCopy()
		*(rsCopy.Spec.Replicas) = newScale
		deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
		rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
		if err == nil && sizeNeedsUpdate {
			scaled = true
			dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d from %d", scalingOperation, rs.Name, newScale, oldScale)
		}
	}
	return scaled, rs, err
}

// cleanupDeployment is responsible for cleaning up a deployment, i.e. it retains all but the latest N old replica sets,
// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the pod template of a deployment kept
// around by default 1) for historical reasons and 2) for the ability to roll back a deployment.
func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
	if !deploymentutil.HasRevisionHistoryLimit(deployment) {
		return nil
	}

	// Avoid deleting replica sets with a deletion timestamp set.
	aliveFilter := func(rs *apps.ReplicaSet) bool {
		return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil
	}
	cleanableRSes := deploymentutil.FilterReplicaSets(oldRSs, aliveFilter)

	diff := int32(len(cleanableRSes)) - *deployment.Spec.RevisionHistoryLimit
	if diff <= 0 {
		return nil
	}

	sort.Sort(deploymentutil.ReplicaSetsByRevision(cleanableRSes))
	klog.V(4).Infof("Looking to cleanup old replica sets for deployment %q", deployment.Name)

	for i := int32(0); i < diff; i++ {
		rs := cleanableRSes[i]
		// Avoid deleting replica sets with non-zero replica counts.
		if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
			continue
		}
		klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name)
		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
			// Return the error instead of aggregating and continuing DELETEs on the theory
			// that we may be overloading the api server.
			return err
		}
	}

	return nil
}
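
A quick worked example of the retention rule (values hypothetical): with revisionHistoryLimit=2 and four old, fully scaled-down ReplicaSets, the two lowest-revision ones are deleted:

package main

import (
	"fmt"
	"sort"
)

func main() {
	limit := int32(2)              // d.Spec.RevisionHistoryLimit
	revisions := []int{7, 4, 6, 5} // cleanable old ReplicaSets, by revision

	diff := int32(len(revisions)) - limit
	if diff <= 0 {
		return // nothing to clean up
	}
	sort.Ints(revisions)                     // lowest revision first, mirroring ReplicaSetsByRevision
	fmt.Println("delete:", revisions[:diff]) // delete: [4 5]
	fmt.Println("keep:", revisions[diff:])   // keep: [6 7]
}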

// syncDeploymentStatus checks if the status is up-to-date and syncs it if necessary.
func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	newStatus := calculateStatus(allRSs, newRS, d)

	if reflect.DeepEqual(d.Status, newStatus) {
		return nil
	}

	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
	return err
}

// calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets.
func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) apps.DeploymentStatus {
	availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
	totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	unavailableReplicas := totalReplicas - availableReplicas
	// If unavailableReplicas is negative, then that means the Deployment has more available replicas running than
	// desired, e.g. whenever it scales down. In such a case we should simply default unavailableReplicas to zero.
	if unavailableReplicas < 0 {
		unavailableReplicas = 0
	}

	status := apps.DeploymentStatus{
		// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
		ObservedGeneration:  deployment.Generation,
		Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
		UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*apps.ReplicaSet{newRS}),
		ReadyReplicas:       deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
		AvailableReplicas:   availableReplicas,
		UnavailableReplicas: unavailableReplicas,
		CollisionCount:      deployment.Status.CollisionCount,
	}

	// Copy conditions one by one so we won't mutate the original object.
	conditions := deployment.Status.Conditions
	for i := range conditions {
		status.Conditions = append(status.Conditions, conditions[i])
	}

	if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
		minAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
		deploymentutil.SetDeploymentCondition(&status, *minAvailability)
	} else {
		noMinAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
		deploymentutil.SetDeploymentCondition(&status, *noMinAvailability)
	}

	return status
}
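
The DeploymentAvailable condition above reduces to a single comparison. A runnable sketch with hypothetical numbers:

package main

import "fmt"

func main() {
	specReplicas := int32(10)
	maxUnavailable := int32(2)
	availableReplicas := int32(8)

	// Mirrors the check in calculateStatus: minimum availability holds
	// when available >= desired - maxUnavailable.
	if availableReplicas >= specReplicas-maxUnavailable {
		fmt.Println("MinimumReplicasAvailable")
	} else {
		fmt.Println("MinimumReplicasUnavailable")
	}
}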

// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
//
// rsList should come from getReplicaSetsForDeployment(d).
func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return false, err
	}
	allRSs := append(oldRSs, newRS)
	for _, rs := range deploymentutil.FilterActiveReplicaSets(allRSs) {
		desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs)
		if !ok {
			continue
		}
		if desired != *(d.Spec.Replicas) {
			return true, nil
		}
	}
	return false, nil
}
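
isScalingEvent therefore boils down to comparing the recorded desired-replicas annotation on each active ReplicaSet against the deployment's current spec. A minimal sketch (annotation values hypothetical):

package main

import (
	"fmt"
	"strconv"
)

func main() {
	specReplicas := int32(15)
	// desired-replicas annotations recorded on two active ReplicaSets
	// during a previous sync (hypothetical values).
	annotations := []string{"10", "10"}

	for _, a := range annotations {
		desired, err := strconv.ParseInt(a, 10, 32)
		if err != nil {
			continue
		}
		if int32(desired) != specReplicas {
			fmt.Printf("scaling event: annotation %d != spec %d\n", desired, specReplicas)
			return
		}
	}
	fmt.Println("no scaling event")
}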
File diff suppressed because it is too large
File diff suppressed because it is too large

@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package feature

import (

@ -23,13 +24,15 @@ import (
)

const (
	// RolloutHistoryGate enables recording history for each rollout.
	RolloutHistoryGate featuregate.Feature = "RolloutHistory"
	// AdvancedDeploymentGate enables the advanced deployment controller.
	AdvancedDeploymentGate featuregate.Feature = "AdvancedDeployment"
)

var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
	RolloutHistoryGate:     {Default: false, PreRelease: featuregate.Alpha},
	AdvancedDeploymentGate: {Default: false, PreRelease: featuregate.Alpha},
}
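
Both gates default to off, so the controller added by this commit stays dormant unless the gate is switched on explicitly, e.g. (assuming the manager binary wires defaultFeatureGates into a standard --feature-gates flag):

	--feature-gates=AdvancedDeployment=true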

func init() {
@ -17,6 +17,8 @@ limitations under the License.
package client

import (
	"fmt"

	kruiseclientset "github.com/openkruise/kruise-api/client/clientset/versioned"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/discovery"

@ -75,3 +77,14 @@ func NewRegistry(c *rest.Config) error {
func GetGenericClient() *GenericClientset {
	return defaultGenericClient
}

// GetGenericClientWithName returns a clientset with the given name appended to the user-agent.
func GetGenericClientWithName(name string) *GenericClientset {
	if cfg == nil {
		return nil
	}
	newCfg := *cfg
	newCfg.UserAgent = fmt.Sprintf("%s/%s", cfg.UserAgent, name)
	clientset, _ := newForConfig(&newCfg)
	return clientset
}
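
Hypothetical caller (the name is for illustration only); the per-controller user-agent helps attribute requests in apiserver audit logs:

	// Inside some controller setup code (sketch, not part of this commit):
	c := client.GetGenericClientWithName("advanced-deployment-controller")
	if c == nil {
		return fmt.Errorf("client registry not initialized; call NewRegistry first")
	}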

@ -0,0 +1,124 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package labels

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// CloneAndAddLabel clones the given map and returns a new map with the given key and value added.
// Returns the given map, if labelKey is empty.
func CloneAndAddLabel(labels map[string]string, labelKey, labelValue string) map[string]string {
	if labelKey == "" {
		// Don't need to add a label.
		return labels
	}
	// Clone.
	newLabels := map[string]string{}
	for key, value := range labels {
		newLabels[key] = value
	}
	newLabels[labelKey] = labelValue
	return newLabels
}

// CloneAndRemoveLabel clones the given map and returns a new map with the given key removed.
// Returns the given map, if labelKey is empty.
func CloneAndRemoveLabel(labels map[string]string, labelKey string) map[string]string {
	if labelKey == "" {
		// Don't need to remove a label.
		return labels
	}
	// Clone.
	newLabels := map[string]string{}
	for key, value := range labels {
		newLabels[key] = value
	}
	delete(newLabels, labelKey)
	return newLabels
}

// AddLabel returns a map with the given key and value added to the given map.
func AddLabel(labels map[string]string, labelKey, labelValue string) map[string]string {
	if labelKey == "" {
		// Don't need to add a label.
		return labels
	}
	if labels == nil {
		labels = make(map[string]string)
	}
	labels[labelKey] = labelValue
	return labels
}

// CloneSelectorAndAddLabel clones the given selector and returns a new selector with the given key and value added.
// Returns the given selector, if labelKey is empty.
func CloneSelectorAndAddLabel(selector *metav1.LabelSelector, labelKey, labelValue string) *metav1.LabelSelector {
	if labelKey == "" {
		// Don't need to add a label.
		return selector
	}

	// Clone.
	newSelector := new(metav1.LabelSelector)

	// TODO(madhusudancs): Check if you can use deepCopy_extensions_LabelSelector here.
	newSelector.MatchLabels = make(map[string]string)
	if selector.MatchLabels != nil {
		for key, val := range selector.MatchLabels {
			newSelector.MatchLabels[key] = val
		}
	}
	newSelector.MatchLabels[labelKey] = labelValue

	if selector.MatchExpressions != nil {
		newMExps := make([]metav1.LabelSelectorRequirement, len(selector.MatchExpressions))
		for i, me := range selector.MatchExpressions {
			newMExps[i].Key = me.Key
			newMExps[i].Operator = me.Operator
			if me.Values != nil {
				newMExps[i].Values = make([]string, len(me.Values))
				copy(newMExps[i].Values, me.Values)
			} else {
				newMExps[i].Values = nil
			}
		}
		newSelector.MatchExpressions = newMExps
	} else {
		newSelector.MatchExpressions = nil
	}

	return newSelector
}

// AddLabelToSelector returns a selector with the given key and value added to the given selector's MatchLabels.
func AddLabelToSelector(selector *metav1.LabelSelector, labelKey, labelValue string) *metav1.LabelSelector {
	if labelKey == "" {
		// Don't need to add a label.
		return selector
	}
	if selector.MatchLabels == nil {
		selector.MatchLabels = make(map[string]string)
	}
	selector.MatchLabels[labelKey] = labelValue
	return selector
}

// SelectorHasLabel checks if the given selector contains the given label key in its MatchLabels.
func SelectorHasLabel(selector *metav1.LabelSelector, labelKey string) bool {
	return len(selector.MatchLabels[labelKey]) > 0
}
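
Taken together, these helpers are what getNewReplicaSet in sync.go uses to stamp the pod-template-hash onto the new ReplicaSet's labels and selector. An illustrative Example-style snippet (the hash value is hypothetical, and "fmt" is assumed to be imported):

	func ExamplePodTemplateHash() {
		hash := "5b8c7f9d44" // stand-in for util.ComputeHash output
		lbls := CloneAndAddLabel(map[string]string{"app": "web"}, "pod-template-hash", hash)
		sel := CloneSelectorAndAddLabel(&metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}}, "pod-template-hash", hash)
		fmt.Println(lbls["pod-template-hash"], SelectorHasLabel(sel, "pod-template-hash"))
		// Output: 5b8c7f9d44 true
	}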

@ -0,0 +1,211 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package labels

import (
	"reflect"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestCloneAndAddLabel(t *testing.T) {
	labels := map[string]string{
		"foo1": "bar1",
		"foo2": "bar2",
		"foo3": "bar3",
	}

	cases := []struct {
		labels     map[string]string
		labelKey   string
		labelValue string
		want       map[string]string
	}{
		{
			labels: labels,
			want:   labels,
		},
		{
			labels:     labels,
			labelKey:   "foo4",
			labelValue: "42",
			want: map[string]string{
				"foo1": "bar1",
				"foo2": "bar2",
				"foo3": "bar3",
				"foo4": "42",
			},
		},
	}

	for _, tc := range cases {
		got := CloneAndAddLabel(tc.labels, tc.labelKey, tc.labelValue)
		if !reflect.DeepEqual(got, tc.want) {
			t.Errorf("[Add] got %v, want %v", got, tc.want)
		}
		// Now test the inverse.
		got_rm := CloneAndRemoveLabel(got, tc.labelKey)
		if !reflect.DeepEqual(got_rm, tc.labels) {
			t.Errorf("[RM] got %v, want %v", got_rm, tc.labels)
		}
	}
}

func TestAddLabel(t *testing.T) {
	labels := map[string]string{
		"foo1": "bar1",
		"foo2": "bar2",
		"foo3": "bar3",
	}

	cases := []struct {
		labels     map[string]string
		labelKey   string
		labelValue string
		want       map[string]string
	}{
		{
			labels: labels,
			want:   labels,
		},
		{
			labels:     labels,
			labelKey:   "foo4",
			labelValue: "food",
			want: map[string]string{
				"foo1": "bar1",
				"foo2": "bar2",
				"foo3": "bar3",
				"foo4": "food",
			},
		},
		{
			labels:     nil,
			labelKey:   "foo4",
			labelValue: "food",
			want: map[string]string{
				"foo4": "food",
			},
		},
	}

	for _, tc := range cases {
		got := AddLabel(tc.labels, tc.labelKey, tc.labelValue)
		if !reflect.DeepEqual(got, tc.want) {
			t.Errorf("got %v, want %v", got, tc.want)
		}
	}
}

func TestCloneSelectorAndAddLabel(t *testing.T) {
	labels := map[string]string{
		"foo1": "bar1",
		"foo2": "bar2",
		"foo3": "bar3",
	}

	cases := []struct {
		labels     map[string]string
		labelKey   string
		labelValue string
		want       map[string]string
	}{
		{
			labels: labels,
			want:   labels,
		},
		{
			labels:     labels,
			labelKey:   "foo4",
			labelValue: "89",
			want: map[string]string{
				"foo1": "bar1",
				"foo2": "bar2",
				"foo3": "bar3",
				"foo4": "89",
			},
		},
		{
			labels:     nil,
			labelKey:   "foo4",
			labelValue: "12",
			want: map[string]string{
				"foo4": "12",
			},
		},
	}

	for _, tc := range cases {
		ls_in := metav1.LabelSelector{MatchLabels: tc.labels}
		ls_out := metav1.LabelSelector{MatchLabels: tc.want}

		got := CloneSelectorAndAddLabel(&ls_in, tc.labelKey, tc.labelValue)
		if !reflect.DeepEqual(got, &ls_out) {
			t.Errorf("got %v, want %v", got, tc.want)
		}
	}
}

func TestAddLabelToSelector(t *testing.T) {
	labels := map[string]string{
		"foo1": "bar1",
		"foo2": "bar2",
		"foo3": "bar3",
	}

	cases := []struct {
		labels     map[string]string
		labelKey   string
		labelValue string
		want       map[string]string
	}{
		{
			labels: labels,
			want:   labels,
		},
		{
			labels:     labels,
			labelKey:   "foo4",
			labelValue: "89",
			want: map[string]string{
				"foo1": "bar1",
				"foo2": "bar2",
				"foo3": "bar3",
				"foo4": "89",
			},
		},
		{
			labels:     nil,
			labelKey:   "foo4",
			labelValue: "12",
			want: map[string]string{
				"foo4": "12",
			},
		},
	}

	for _, tc := range cases {
		ls_in := metav1.LabelSelector{MatchLabels: tc.labels}
		ls_out := metav1.LabelSelector{MatchLabels: tc.want}

		got := AddLabelToSelector(&ls_in, tc.labelKey, tc.labelValue)
		if !reflect.DeepEqual(got, &ls_out) {
			t.Errorf("got %v, want %v", got, tc.want)
		}
	}
}

@ -0,0 +1,26 @@
package ratelimiter

import (
	"flag"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func init() {
	flag.DurationVar(&baseDelay, "rate-limiter-base-delay", time.Millisecond*5, "The base delay for the rate limiter. Defaults to 5ms.")
	flag.DurationVar(&maxDelay, "rate-limiter-max-delay", time.Second*1000, "The max delay for the rate limiter. Defaults to 1000s.")
	flag.IntVar(&qps, "rate-limiter-qps", 10, "The qps for the rate limiter. Defaults to 10.")
	flag.IntVar(&bucketSize, "rate-limiter-bucket-size", 100, "The bucket size for the rate limiter. Defaults to 100.")
}

var baseDelay, maxDelay time.Duration
var qps, bucketSize int

// DefaultControllerRateLimiter combines a per-item exponential backoff with an
// overall token-bucket limit, mirroring workqueue.DefaultControllerRateLimiter
// but with flag-configurable parameters.
func DefaultControllerRateLimiter() workqueue.RateLimiter {
	return workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)},
	)
}
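
A minimal usage sketch (not part of this commit): the limiter plugs straight into a client-go workqueue, so each requeued item backs off exponentially from the base delay while the queue as a whole stays under the configured qps:

	q := workqueue.NewRateLimitingQueue(ratelimiter.DefaultControllerRateLimiter())
	defer q.ShutDown()
	q.AddRateLimited("default/my-deployment") // re-added after the per-item backoff delay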