Merge pull request #88057 from julianvmodesto/remove-deprecated-rolling-update

Remove deprecated rolling-update command

Kubernetes-commit: 23045f9247ab9295c79661832b44efa45e351209
This commit is contained in:
Kubernetes Publisher 2020-02-21 18:31:30 -08:00
commit f5fd42308c
7 changed files with 12 additions and 3293 deletions

2
Godeps/Godeps.json generated
View File

@ -568,7 +568,7 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "026463abc787"
"Rev": "2db522d7d8a2"
},
{
"ImportPath": "k8s.io/apimachinery",

4
go.mod
View File

@ -36,7 +36,7 @@ require (
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7
gopkg.in/yaml.v2 v2.2.8
gotest.tools v2.2.0+incompatible // indirect
k8s.io/api v0.0.0-20200214081624-026463abc787
k8s.io/api v0.0.0-20200221201225-2db522d7d8a2
k8s.io/apimachinery v0.0.0-20200214081019-2373d029717c
k8s.io/cli-runtime v0.0.0-20200221172330-03707b9714f9
k8s.io/client-go v0.0.0-20200221163115-5e1786105b6f
@ -53,7 +53,7 @@ require (
replace (
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
k8s.io/api => k8s.io/api v0.0.0-20200214081624-026463abc787
k8s.io/api => k8s.io/api v0.0.0-20200221201225-2db522d7d8a2
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200214081019-2373d029717c
k8s.io/cli-runtime => k8s.io/cli-runtime v0.0.0-20200221172330-03707b9714f9
k8s.io/client-go => k8s.io/client-go v0.0.0-20200221163115-5e1786105b6f

10
go.sum
View File

@ -122,6 +122,7 @@ github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:Fecb
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@ -151,6 +152,7 @@ github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhn
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffktY=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
@ -161,6 +163,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-wordwrap v1.0.0 h1:6GlHJ/LTGMrIJbwgdqdl2eEH8o+Exx/0m8ir9Gns0u4=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
@ -181,6 +184,7 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
@ -203,15 +207,19 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -298,7 +306,7 @@ gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20200214081624-026463abc787/go.mod h1:brPp6rLV9ZWi2IgXmvCsY7TKw2l27eF4rfCHlyW88ys=
k8s.io/api v0.0.0-20200221201225-2db522d7d8a2/go.mod h1:brPp6rLV9ZWi2IgXmvCsY7TKw2l27eF4rfCHlyW88ys=
k8s.io/apimachinery v0.0.0-20200214081019-2373d029717c/go.mod h1:5X8oEhnd931nEg6/Nkumo00nT6ZsCLp2h7Xwd7Ym6P4=
k8s.io/cli-runtime v0.0.0-20200221172330-03707b9714f9/go.mod h1:jFVnu9CV5hIeRRksJ7Tt7Uo+k7nXrLmHlLDSZT+lSsw=
k8s.io/client-go v0.0.0-20200221163115-5e1786105b6f/go.mod h1:UmUmKsm/QPFM6PX++FkaGSN2sbquEn4rxsBMAvnX+58=

View File

@ -1,865 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rollingupdate
import (
"context"
"fmt"
"io"
"strconv"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/util/retry"
"k8s.io/kubectl/pkg/scale"
"k8s.io/kubectl/pkg/util"
deploymentutil "k8s.io/kubectl/pkg/util/deployment"
"k8s.io/kubectl/pkg/util/podutils"
"k8s.io/utils/integer"
utilpointer "k8s.io/utils/pointer"
)
func valOrZero(val *int32) int32 {
if val == nil {
return int32(0)
}
return *val
}
const (
kubectlAnnotationPrefix = "kubectl.kubernetes.io/"
sourceIDAnnotation = kubectlAnnotationPrefix + "update-source-id"
desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas"
originalReplicasAnnotation = kubectlAnnotationPrefix + "original-replicas"
nextControllerAnnotation = kubectlAnnotationPrefix + "next-controller-id"
)
// RollingUpdaterConfig is the configuration for a rolling deployment process.
type RollingUpdaterConfig struct {
// Out is a writer for progress output.
Out io.Writer
// OldRC is an existing controller to be replaced.
OldRc *corev1.ReplicationController
// NewRc is a controller that will take ownership of updated pods (will be
// created if needed).
NewRc *corev1.ReplicationController
// UpdatePeriod is the time to wait between individual pod updates.
UpdatePeriod time.Duration
// Interval is the time to wait between polling controller status after
// update.
Interval time.Duration
// Timeout is the time to wait for controller updates before giving up.
Timeout time.Duration
// MinReadySeconds is the number of seconds to wait after the pods are ready
MinReadySeconds int32
// CleanupPolicy defines the cleanup action to take after the deployment is
// complete.
CleanupPolicy RollingUpdaterCleanupPolicy
// MaxUnavailable is the maximum number of pods that can be unavailable during the update.
// Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%).
// Absolute number is calculated from percentage by rounding up.
// This can not be 0 if MaxSurge is 0.
// By default, a fixed value of 1 is used.
// Example: when this is set to 30%, the old RC can be scaled down to 70% of desired pods
// immediately when the rolling update starts. Once new pods are ready, old RC
// can be scaled down further, followed by scaling up the new RC, ensuring
// that the total number of pods available at all times during the update is at
// least 70% of desired pods.
MaxUnavailable intstr.IntOrString
// MaxSurge is the maximum number of pods that can be scheduled above the desired number of pods.
// Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%).
// This can not be 0 if MaxUnavailable is 0.
// Absolute number is calculated from percentage by rounding up.
// By default, a value of 1 is used.
// Example: when this is set to 30%, the new RC can be scaled up immediately
// when the rolling update starts, such that the total number of old and new pods do not exceed
// 130% of desired pods. Once old pods have been killed, new RC can be scaled up
// further, ensuring that total number of pods running at any time during
// the update is at most 130% of desired pods.
MaxSurge intstr.IntOrString
// OnProgress is invoked if set during each scale cycle, to allow the caller to perform additional logic or
// abort the scale. If an error is returned the cleanup method will not be invoked. The percentage value
// is a synthetic "progress" calculation that represents the approximate percentage completion.
OnProgress func(oldRc, newRc *corev1.ReplicationController, percentage int) error
}
// RollingUpdaterCleanupPolicy is a cleanup action to take after the
// deployment is complete.
type RollingUpdaterCleanupPolicy string
const (
// DeleteRollingUpdateCleanupPolicy means delete the old controller.
DeleteRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Delete"
// PreserveRollingUpdateCleanupPolicy means keep the old controller.
PreserveRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Preserve"
// RenameRollingUpdateCleanupPolicy means delete the old controller, and rename
// the new controller to the name of the old controller.
RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename"
)
// RollingUpdater provides methods for updating replicated pods in a predictable,
// fault-tolerant way.
type RollingUpdater struct {
rcClient corev1client.ReplicationControllersGetter
podClient corev1client.PodsGetter
scaleClient scaleclient.ScalesGetter
// Namespace for resources
ns string
// scaleAndWait scales a controller and returns its updated state.
scaleAndWait func(rc *corev1.ReplicationController, retry *scale.RetryParams, wait *scale.RetryParams) (*corev1.ReplicationController, error)
//getOrCreateTargetController gets and validates an existing controller or
//makes a new one.
getOrCreateTargetController func(controller *corev1.ReplicationController, sourceID string) (*corev1.ReplicationController, bool, error)
// cleanup performs post deployment cleanup tasks for newRc and oldRc.
cleanup func(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error
// getReadyPods returns the amount of old and new ready pods.
getReadyPods func(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error)
// nowFn returns the current time used to calculate the minReadySeconds
nowFn func() metav1.Time
}
// NewRollingUpdater creates a RollingUpdater from a client.
func NewRollingUpdater(namespace string, rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, sc scaleclient.ScalesGetter) *RollingUpdater {
updater := &RollingUpdater{
rcClient: rcClient,
podClient: podClient,
scaleClient: sc,
ns: namespace,
}
// Inject real implementations.
updater.scaleAndWait = updater.scaleAndWaitWithScaler
updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient
updater.getReadyPods = updater.readyPods
updater.cleanup = updater.cleanupWithClients
updater.nowFn = func() metav1.Time { return metav1.Now() }
return updater
}
// Update all pods for a ReplicationController (oldRc) by creating a new
// controller (newRc) with 0 replicas, and synchronously scaling oldRc and
// newRc until oldRc has 0 replicas and newRc has the original # of desired
// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy.
//
// Each interval, the updater will attempt to make progress however it can
// without violating any availability constraints defined by the config. This
// means the amount scaled up or down each interval will vary based on the
// timeliness of readiness and the updater will always try to make progress,
// even slowly.
//
// If an update from newRc to oldRc is already in progress, we attempt to
// drive it to completion. If an error occurs at any step of the update, the
// error will be returned.
//
// A scaling event (either up or down) is considered progress; if no progress
// is made within the config.Timeout, an error is returned.
//
// TODO: make this handle performing a rollback of a partially completed
// rollout.
func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
out := config.Out
oldRc := config.OldRc
scaleRetryParams := scale.NewRetryParams(config.Interval, config.Timeout)
// Find an existing controller (for continuing an interrupted update) or
// create a new one if necessary.
sourceID := fmt.Sprintf("%s:%s", oldRc.Name, oldRc.UID)
newRc, existed, err := r.getOrCreateTargetController(config.NewRc, sourceID)
if err != nil {
return err
}
if existed {
fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newRc.Name)
} else {
fmt.Fprintf(out, "Created %s\n", newRc.Name)
}
// Extract the desired replica count from the controller.
desiredAnnotation, err := strconv.Atoi(newRc.Annotations[desiredReplicasAnnotation])
if err != nil {
return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
newRc.Name, desiredReplicasAnnotation, newRc.Annotations[desiredReplicasAnnotation])
}
desired := int32(desiredAnnotation)
// Extract the original replica count from the old controller, adding the
// annotation if it doesn't yet exist.
_, hasOriginalAnnotation := oldRc.Annotations[originalReplicasAnnotation]
if !hasOriginalAnnotation {
existing, err := r.rcClient.ReplicationControllers(oldRc.Namespace).Get(context.TODO(), oldRc.Name, metav1.GetOptions{})
if err != nil {
return err
}
originReplicas := strconv.Itoa(int(valOrZero(existing.Spec.Replicas)))
applyUpdate := func(rc *corev1.ReplicationController) {
if rc.Annotations == nil {
rc.Annotations = map[string]string{}
}
rc.Annotations[originalReplicasAnnotation] = originReplicas
}
if oldRc, err = updateRcWithRetries(r.rcClient, existing.Namespace, existing, applyUpdate); err != nil {
return err
}
}
// maxSurge is the maximum scaling increment and maxUnavailable are the maximum pods
// that can be unavailable during a rollout.
maxSurge, maxUnavailable, err := deploymentutil.ResolveFenceposts(&config.MaxSurge, &config.MaxUnavailable, desired)
if err != nil {
return err
}
// Validate maximums.
if desired > 0 && maxUnavailable == 0 && maxSurge == 0 {
return fmt.Errorf("one of maxSurge or maxUnavailable must be specified")
}
// The minimum pods which must remain available throughout the update
// calculated for internal convenience.
minAvailable := int32(integer.IntMax(0, int(desired-maxUnavailable)))
// If the desired new scale is 0, then the max unavailable is necessarily
// the effective scale of the old RC regardless of the configuration
// (equivalent to 100% maxUnavailable).
if desired == 0 {
maxUnavailable = valOrZero(oldRc.Spec.Replicas)
minAvailable = 0
}
fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (keep %d pods available, don't exceed %d pods)\n",
newRc.Name, valOrZero(newRc.Spec.Replicas), desired, oldRc.Name, valOrZero(oldRc.Spec.Replicas), minAvailable, desired+maxSurge)
// give a caller incremental notification and allow them to exit early
goal := desired - valOrZero(newRc.Spec.Replicas)
if goal < 0 {
goal = -goal
}
progress := func(complete bool) error {
if config.OnProgress == nil {
return nil
}
progress := desired - valOrZero(newRc.Spec.Replicas)
if progress < 0 {
progress = -progress
}
percentage := 100
if !complete && goal > 0 {
percentage = int((goal - progress) * 100 / goal)
}
return config.OnProgress(oldRc, newRc, percentage)
}
// Scale newRc and oldRc until newRc has the desired number of replicas and
// oldRc has 0 replicas.
progressDeadline := time.Now().UnixNano() + config.Timeout.Nanoseconds()
for valOrZero(newRc.Spec.Replicas) != desired || valOrZero(oldRc.Spec.Replicas) != 0 {
// Store the existing replica counts for progress timeout tracking.
newReplicas := valOrZero(newRc.Spec.Replicas)
oldReplicas := valOrZero(oldRc.Spec.Replicas)
// Scale up as much as possible.
scaledRc, err := r.scaleUp(newRc, oldRc, desired, maxSurge, maxUnavailable, scaleRetryParams, config)
if err != nil {
return err
}
newRc = scaledRc
// notify the caller if necessary
if err := progress(false); err != nil {
return err
}
// Wait between scaling operations for things to settle.
time.Sleep(config.UpdatePeriod)
// Scale down as much as possible.
scaledRc, err = r.scaleDown(newRc, oldRc, desired, minAvailable, maxUnavailable, maxSurge, config)
if err != nil {
return err
}
oldRc = scaledRc
// notify the caller if necessary
if err := progress(false); err != nil {
return err
}
// If we are making progress, continue to advance the progress deadline.
// Otherwise, time out with an error.
progressMade := (valOrZero(newRc.Spec.Replicas) != newReplicas) || (valOrZero(oldRc.Spec.Replicas) != oldReplicas)
if progressMade {
progressDeadline = time.Now().UnixNano() + config.Timeout.Nanoseconds()
} else if time.Now().UnixNano() > progressDeadline {
return fmt.Errorf("timed out waiting for any update progress to be made")
}
}
// notify the caller if necessary
if err := progress(true); err != nil {
return err
}
// Housekeeping and cleanup policy execution.
return r.cleanup(oldRc, newRc, config)
}
// scaleUp scales up newRc to desired by whatever increment is possible given
// the configured surge threshold. scaleUp will safely no-op as necessary when
// it detects redundancy or other relevant conditions.
func (r *RollingUpdater) scaleUp(newRc, oldRc *corev1.ReplicationController, desired, maxSurge, maxUnavailable int32, scaleRetryParams *scale.RetryParams, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
// If we're already at the desired, do nothing.
if valOrZero(newRc.Spec.Replicas) == desired {
return newRc, nil
}
// Scale up as far as we can based on the surge limit.
increment := (desired + maxSurge) - (valOrZero(oldRc.Spec.Replicas) + valOrZero(newRc.Spec.Replicas))
// If the old is already scaled down, go ahead and scale all the way up.
if valOrZero(oldRc.Spec.Replicas) == 0 {
increment = desired - valOrZero(newRc.Spec.Replicas)
}
// We can't scale up without violating the surge limit, so do nothing.
if increment <= 0 {
return newRc, nil
}
// Increase the replica count, and deal with fenceposts.
nextVal := valOrZero(newRc.Spec.Replicas) + increment
newRc.Spec.Replicas = &nextVal
if valOrZero(newRc.Spec.Replicas) > desired {
newRc.Spec.Replicas = &desired
}
// Perform the scale-up.
fmt.Fprintf(config.Out, "Scaling %s up to %d\n", newRc.Name, valOrZero(newRc.Spec.Replicas))
scaledRc, err := r.scaleAndWait(newRc, scaleRetryParams, scaleRetryParams)
if err != nil {
return nil, err
}
return scaledRc, nil
}
// scaleDown scales down oldRc to 0 at whatever decrement possible given the
// thresholds defined on the config. scaleDown will safely no-op as necessary
// when it detects redundancy or other relevant conditions.
func (r *RollingUpdater) scaleDown(newRc, oldRc *corev1.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int32, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
// Already scaled down; do nothing.
if valOrZero(oldRc.Spec.Replicas) == 0 {
return oldRc, nil
}
// Get ready pods. We shouldn't block, otherwise in case both old and new
// pods are unavailable then the rolling update process blocks.
// Timeout-wise we are already covered by the progress check.
_, newAvailable, err := r.getReadyPods(oldRc, newRc, config.MinReadySeconds)
if err != nil {
return nil, err
}
// The old controller is considered as part of the total because we want to
// maintain minimum availability even with a volatile old controller.
// Scale down as much as possible while maintaining minimum availability
allPods := valOrZero(oldRc.Spec.Replicas) + valOrZero(newRc.Spec.Replicas)
newUnavailable := valOrZero(newRc.Spec.Replicas) - newAvailable
decrement := allPods - minAvailable - newUnavailable
// The decrement normally shouldn't drop below 0 because the available count
// always starts below the old replica count, but the old replica count can
// decrement due to externalities like pods death in the replica set. This
// will be considered a transient condition; do nothing and try again later
// with new readiness values.
//
// If the most we can scale is 0, it means we can't scale down without
// violating the minimum. Do nothing and try again later when conditions may
// have changed.
if decrement <= 0 {
return oldRc, nil
}
// Reduce the replica count, and deal with fenceposts.
nextOldVal := valOrZero(oldRc.Spec.Replicas) - decrement
oldRc.Spec.Replicas = &nextOldVal
if valOrZero(oldRc.Spec.Replicas) < 0 {
oldRc.Spec.Replicas = utilpointer.Int32Ptr(0)
}
// If the new is already fully scaled and available up to the desired size, go
// ahead and scale old all the way down.
if valOrZero(newRc.Spec.Replicas) == desired && newAvailable == desired {
oldRc.Spec.Replicas = utilpointer.Int32Ptr(0)
}
// Perform the scale-down.
fmt.Fprintf(config.Out, "Scaling %s down to %d\n", oldRc.Name, valOrZero(oldRc.Spec.Replicas))
retryWait := &scale.RetryParams{
Interval: config.Interval,
Timeout: config.Timeout,
}
scaledRc, err := r.scaleAndWait(oldRc, retryWait, retryWait)
if err != nil {
return nil, err
}
return scaledRc, nil
}
// scalerScaleAndWait scales a controller using a Scaler and a real client.
func (r *RollingUpdater) scaleAndWaitWithScaler(rc *corev1.ReplicationController, retry *scale.RetryParams, wait *scale.RetryParams) (*corev1.ReplicationController, error) {
scaler := scale.NewScaler(r.scaleClient)
if err := scaler.Scale(rc.Namespace, rc.Name, uint(valOrZero(rc.Spec.Replicas)), &scale.ScalePrecondition{Size: -1, ResourceVersion: ""}, retry, wait, corev1.SchemeGroupVersion.WithResource("replicationcontrollers")); err != nil {
return nil, err
}
return r.rcClient.ReplicationControllers(rc.Namespace).Get(context.TODO(), rc.Name, metav1.GetOptions{})
}
// readyPods returns the old and new ready counts for their pods.
// If a pod is observed as being ready, it's considered ready even
// if it later becomes notReady.
func (r *RollingUpdater) readyPods(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error) {
controllers := []*corev1.ReplicationController{oldRc, newRc}
oldReady := int32(0)
newReady := int32(0)
if r.nowFn == nil {
r.nowFn = func() metav1.Time { return metav1.Now() }
}
for i := range controllers {
controller := controllers[i]
selector := labels.Set(controller.Spec.Selector).AsSelector()
options := metav1.ListOptions{LabelSelector: selector.String()}
pods, err := r.podClient.Pods(controller.Namespace).List(context.TODO(), options)
if err != nil {
return 0, 0, err
}
for _, v1Pod := range pods.Items {
// Do not count deleted pods as ready
if v1Pod.DeletionTimestamp != nil {
continue
}
if !podutils.IsPodAvailable(&v1Pod, minReadySeconds, r.nowFn()) {
continue
}
switch controller.Name {
case oldRc.Name:
oldReady++
case newRc.Name:
newReady++
}
}
}
return oldReady, newReady, nil
}
// getOrCreateTargetControllerWithClient looks for an existing controller with
// sourceID. If found, the existing controller is returned with true
// indicating that the controller already exists. If the controller isn't
// found, a new one is created and returned along with false indicating the
// controller was created.
//
// Existing controllers are validated to ensure their sourceIDAnnotation
// matches sourceID; if there's a mismatch, an error is returned.
func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *corev1.ReplicationController, sourceID string) (*corev1.ReplicationController, bool, error) {
existingRc, err := r.existingController(controller)
if err != nil {
if !errors.IsNotFound(err) {
// There was an error trying to find the controller; don't assume we
// should create it.
return nil, false, err
}
if valOrZero(controller.Spec.Replicas) <= 0 {
return nil, false, fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d", controller.Name, valOrZero(controller.Spec.Replicas))
}
// The controller wasn't found, so create it.
if controller.Annotations == nil {
controller.Annotations = map[string]string{}
}
controller.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", valOrZero(controller.Spec.Replicas))
controller.Annotations[sourceIDAnnotation] = sourceID
controller.Spec.Replicas = utilpointer.Int32Ptr(0)
newRc, err := r.rcClient.ReplicationControllers(r.ns).Create(context.TODO(), controller, metav1.CreateOptions{})
return newRc, false, err
}
// Validate and use the existing controller.
annotations := existingRc.Annotations
source := annotations[sourceIDAnnotation]
_, ok := annotations[desiredReplicasAnnotation]
if source != sourceID || !ok {
return nil, false, fmt.Errorf("Missing/unexpected annotations for controller %s, expected %s : %s", controller.Name, sourceID, annotations)
}
return existingRc, true, nil
}
// existingController verifies if the controller already exists
func (r *RollingUpdater) existingController(controller *corev1.ReplicationController) (*corev1.ReplicationController, error) {
// without rc name but generate name, there's no existing rc
if len(controller.Name) == 0 && len(controller.GenerateName) > 0 {
return nil, errors.NewNotFound(corev1.Resource("replicationcontrollers"), controller.Name)
}
// controller name is required to get rc back
return r.rcClient.ReplicationControllers(controller.Namespace).Get(context.TODO(), controller.Name, metav1.GetOptions{})
}
// cleanupWithClients performs cleanup tasks after the rolling update. Update
// process related annotations are removed from oldRc and newRc. The
// CleanupPolicy on config is executed.
func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error {
// Clean up annotations
var err error
newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(context.TODO(), newRc.Name, metav1.GetOptions{})
if err != nil {
return err
}
applyUpdate := func(rc *corev1.ReplicationController) {
delete(rc.Annotations, sourceIDAnnotation)
delete(rc.Annotations, desiredReplicasAnnotation)
}
if newRc, err = updateRcWithRetries(r.rcClient, r.ns, newRc, applyUpdate); err != nil {
return err
}
if err = wait.Poll(config.Interval, config.Timeout, controllerHasDesiredReplicas(r.rcClient, newRc)); err != nil {
return err
}
newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(context.TODO(), newRc.Name, metav1.GetOptions{})
if err != nil {
return err
}
switch config.CleanupPolicy {
case DeleteRollingUpdateCleanupPolicy:
// delete old rc
fmt.Fprintf(config.Out, "Update succeeded. Deleting %s\n", oldRc.Name)
return r.rcClient.ReplicationControllers(r.ns).Delete(context.TODO(), oldRc.Name, nil)
case RenameRollingUpdateCleanupPolicy:
// delete old rc
fmt.Fprintf(config.Out, "Update succeeded. Deleting old controller: %s\n", oldRc.Name)
if err := r.rcClient.ReplicationControllers(r.ns).Delete(context.TODO(), oldRc.Name, nil); err != nil {
return err
}
fmt.Fprintf(config.Out, "Renaming %s to %s\n", newRc.Name, oldRc.Name)
return Rename(r.rcClient, newRc, oldRc.Name)
case PreserveRollingUpdateCleanupPolicy:
return nil
default:
return nil
}
}
func Rename(c corev1client.ReplicationControllersGetter, rc *corev1.ReplicationController, newName string) error {
oldName := rc.Name
rc.Name = newName
rc.ResourceVersion = ""
// First delete the oldName RC and orphan its pods.
policy := metav1.DeletePropagationOrphan
err := c.ReplicationControllers(rc.Namespace).Delete(context.TODO(), oldName, &metav1.DeleteOptions{PropagationPolicy: &policy})
if err != nil && !errors.IsNotFound(err) {
return err
}
err = wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
_, err := c.ReplicationControllers(rc.Namespace).Get(context.TODO(), oldName, metav1.GetOptions{})
if err == nil {
return false, nil
} else if errors.IsNotFound(err) {
return true, nil
} else {
return false, err
}
})
if err != nil {
return err
}
// Then create the same RC with the new name.
_, err = c.ReplicationControllers(rc.Namespace).Create(context.TODO(), rc, metav1.CreateOptions{})
return err
}
func LoadExistingNextReplicationController(c corev1client.ReplicationControllersGetter, namespace, newName string) (*corev1.ReplicationController, error) {
if len(newName) == 0 {
return nil, nil
}
newRc, err := c.ReplicationControllers(namespace).Get(context.TODO(), newName, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
return nil, nil
}
return newRc, err
}
type NewControllerConfig struct {
Namespace string
OldName, NewName string
Image string
Container string
DeploymentKey string
PullPolicy corev1.PullPolicy
}
func CreateNewControllerFromCurrentController(rcClient corev1client.ReplicationControllersGetter, codec runtime.Codec, cfg *NewControllerConfig) (*corev1.ReplicationController, error) {
containerIndex := 0
// load the old RC into the "new" RC
newRc, err := rcClient.ReplicationControllers(cfg.Namespace).Get(context.TODO(), cfg.OldName, metav1.GetOptions{})
if err != nil {
return nil, err
}
if len(cfg.Container) != 0 {
containerFound := false
for i, c := range newRc.Spec.Template.Spec.Containers {
if c.Name == cfg.Container {
containerIndex = i
containerFound = true
break
}
}
if !containerFound {
return nil, fmt.Errorf("container %s not found in pod", cfg.Container)
}
}
if len(newRc.Spec.Template.Spec.Containers) > 1 && len(cfg.Container) == 0 {
return nil, fmt.Errorf("must specify container to update when updating a multi-container pod")
}
if len(newRc.Spec.Template.Spec.Containers) == 0 {
return nil, fmt.Errorf("pod has no containers! (%v)", newRc)
}
newRc.Spec.Template.Spec.Containers[containerIndex].Image = cfg.Image
if len(cfg.PullPolicy) != 0 {
newRc.Spec.Template.Spec.Containers[containerIndex].ImagePullPolicy = cfg.PullPolicy
}
newHash, err := util.HashObject(newRc, codec)
if err != nil {
return nil, err
}
if len(cfg.NewName) == 0 {
cfg.NewName = fmt.Sprintf("%s-%s", newRc.Name, newHash)
}
newRc.Name = cfg.NewName
newRc.Spec.Selector[cfg.DeploymentKey] = newHash
newRc.Spec.Template.Labels[cfg.DeploymentKey] = newHash
// Clear resource version after hashing so that identical updates get different hashes.
newRc.ResourceVersion = ""
return newRc, nil
}
func AbortRollingUpdate(c *RollingUpdaterConfig) error {
// Swap the controllers
tmp := c.OldRc
c.OldRc = c.NewRc
c.NewRc = tmp
if c.NewRc.Annotations == nil {
c.NewRc.Annotations = map[string]string{}
}
c.NewRc.Annotations[sourceIDAnnotation] = fmt.Sprintf("%s:%s", c.OldRc.Name, c.OldRc.UID)
// Use the original value since the replica count change from old to new
// could be asymmetric. If we don't know the original count, we can't safely
// roll back to a known good size.
originalSize, foundOriginal := tmp.Annotations[originalReplicasAnnotation]
if !foundOriginal {
return fmt.Errorf("couldn't find original replica count of %q", tmp.Name)
}
fmt.Fprintf(c.Out, "Setting %q replicas to %s\n", c.NewRc.Name, originalSize)
c.NewRc.Annotations[desiredReplicasAnnotation] = originalSize
c.CleanupPolicy = DeleteRollingUpdateCleanupPolicy
return nil
}
func GetNextControllerAnnotation(rc *corev1.ReplicationController) (string, bool) {
res, found := rc.Annotations[nextControllerAnnotation]
return res, found
}
func SetNextControllerAnnotation(rc *corev1.ReplicationController, name string) {
if rc.Annotations == nil {
rc.Annotations = map[string]string{}
}
rc.Annotations[nextControllerAnnotation] = name
}
func UpdateExistingReplicationController(rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, oldRc *corev1.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*corev1.ReplicationController, error) {
if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
SetNextControllerAnnotation(oldRc, newName)
return AddDeploymentKeyToReplicationController(oldRc, rcClient, podClient, deploymentKey, deploymentValue, namespace, out)
}
// If we didn't need to update the controller for the deployment key, we still need to write
// the "next" controller.
applyUpdate := func(rc *corev1.ReplicationController) {
SetNextControllerAnnotation(rc, newName)
}
return updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate)
}
func AddDeploymentKeyToReplicationController(oldRc *corev1.ReplicationController, rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, deploymentKey, deploymentValue, namespace string, out io.Writer) (*corev1.ReplicationController, error) {
var err error
// First, update the template label. This ensures that any newly created pods will have the new label
applyUpdate := func(rc *corev1.ReplicationController) {
if rc.Spec.Template.Labels == nil {
rc.Spec.Template.Labels = map[string]string{}
}
rc.Spec.Template.Labels[deploymentKey] = deploymentValue
}
if oldRc, err = updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate); err != nil {
return nil, err
}
// Update all pods managed by the rc to have the new hash label, so they are correctly adopted
// TODO: extract the code from the label command and re-use it here.
selector := labels.SelectorFromSet(oldRc.Spec.Selector)
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := podClient.Pods(namespace).List(context.TODO(), options)
if err != nil {
return nil, err
}
for ix := range podList.Items {
pod := &podList.Items[ix]
applyUpdate := func(p *corev1.Pod) {
if p.Labels == nil {
p.Labels = map[string]string{
deploymentKey: deploymentValue,
}
} else {
p.Labels[deploymentKey] = deploymentValue
}
}
if pod, err = updatePodWithRetries(podClient, namespace, pod, applyUpdate); err != nil {
return nil, err
}
}
if oldRc.Spec.Selector == nil {
oldRc.Spec.Selector = map[string]string{}
}
applyUpdate = func(rc *corev1.ReplicationController) {
rc.Spec.Selector[deploymentKey] = deploymentValue
}
// Update the selector of the rc so it manages all the pods we updated above
if oldRc, err = updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate); err != nil {
return nil, err
}
// Clean up any orphaned pods that don't have the new label, this can happen if the rc manager
// doesn't see the update to its pod template and creates a new pod with the old labels after
// we've finished re-adopting existing pods to the rc.
selector = labels.SelectorFromSet(oldRc.Spec.Selector)
options = metav1.ListOptions{LabelSelector: selector.String()}
if podList, err = podClient.Pods(namespace).List(context.TODO(), options); err != nil {
return nil, err
}
for ix := range podList.Items {
pod := &podList.Items[ix]
if value, found := pod.Labels[deploymentKey]; !found || value != deploymentValue {
if err := podClient.Pods(namespace).Delete(context.TODO(), pod.Name, nil); err != nil {
return nil, err
}
}
}
return oldRc, nil
}
type updateRcFunc func(controller *corev1.ReplicationController)
// updateRcWithRetries retries updating the given rc on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updateRcWithRetries(rcClient corev1client.ReplicationControllersGetter, namespace string, rc *corev1.ReplicationController, applyUpdate updateRcFunc) (*corev1.ReplicationController, error) {
// Deep copy the rc in case we failed on Get during retry loop
oldRc := rc.DeepCopy()
err := retry.RetryOnConflict(retry.DefaultBackoff, func() (e error) {
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(rc)
if rc, e = rcClient.ReplicationControllers(namespace).Update(context.TODO(), rc, metav1.UpdateOptions{}); e == nil {
// rc contains the latest controller post update
return
}
updateErr := e
// Update the controller with the latest resource version, if the update failed we
// can't trust rc so use oldRc.Name.
if rc, e = rcClient.ReplicationControllers(namespace).Get(context.TODO(), oldRc.Name, metav1.GetOptions{}); e != nil {
// The Get failed: Value in rc cannot be trusted.
rc = oldRc
}
// Only return the error from update
return updateErr
})
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
// controller contains the applied update.
return rc, err
}
type updatePodFunc func(controller *corev1.Pod)
// updatePodWithRetries retries updating the given pod on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updatePodWithRetries(podClient corev1client.PodsGetter, namespace string, pod *corev1.Pod, applyUpdate updatePodFunc) (*corev1.Pod, error) {
// Deep copy the pod in case we failed on Get during retry loop
oldPod := pod.DeepCopy()
err := retry.RetryOnConflict(retry.DefaultBackoff, func() (e error) {
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(pod)
if pod, e = podClient.Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}); e == nil {
return
}
updateErr := e
if pod, e = podClient.Pods(namespace).Get(context.TODO(), oldPod.Name, metav1.GetOptions{}); e != nil {
pod = oldPod
}
// Only return the error from update
return updateErr
})
// If the error is non-nil the returned pod cannot be trusted, if it is nil, the returned
// controller contains the applied update.
return pod, err
}
func FindSourceController(r corev1client.ReplicationControllersGetter, namespace, name string) (*corev1.ReplicationController, error) {
list, err := r.ReplicationControllers(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
for ix := range list.Items {
rc := &list.Items[ix]
if rc.Annotations != nil && strings.HasPrefix(rc.Annotations[sourceIDAnnotation], name) {
return rc, nil
}
}
return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name)
}
// controllerHasDesiredReplicas returns a condition that will be true if and only if
// the desired replica count for a controller's ReplicaSelector equals the Replicas count.
func controllerHasDesiredReplicas(rcClient corev1client.ReplicationControllersGetter, controller *corev1.ReplicationController) wait.ConditionFunc {
// If we're given a controller where the status lags the spec, it either means that the controller is stale,
// or that the rc manager hasn't noticed the update yet. Polling status.Replicas is not safe in the latter case.
desiredGeneration := controller.Generation
return func() (bool, error) {
ctrl, err := rcClient.ReplicationControllers(controller.Namespace).Get(context.TODO(), controller.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// There's a chance a concurrent update modifies the Spec.Replicas causing this check to pass,
// or, after this check has passed, a modification causes the rc manager to create more pods.
// This will not be an issue once we've implemented graceful delete for rcs, but till then
// concurrent stop operations on the same rc might have unintended side effects.
return ctrl.Status.ObservedGeneration >= desiredGeneration && ctrl.Status.Replicas == valOrZero(ctrl.Spec.Replicas), nil
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,480 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rollingupdate
import (
"bytes"
"context"
"fmt"
"time"
"github.com/spf13/cobra"
"k8s.io/klog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubectl/pkg/util"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/templates"
"k8s.io/kubectl/pkg/validation"
)
var (
rollingUpdateLong = templates.LongDesc(i18n.T(`
Perform a rolling update of the given ReplicationController.
Replaces the specified replication controller with a new replication controller by updating one pod at a time to use the
new PodTemplate. The new-controller.json must specify the same namespace as the
existing replication controller and overwrite at least one (common) label in its replicaSelector.
![Workflow](http://kubernetes.io/images/docs/kubectl_rollingupdate.svg)`))
rollingUpdateExample = templates.Examples(i18n.T(`
# Update pods of frontend-v1 using new replication controller data in frontend-v2.json.
kubectl rolling-update frontend-v1 -f frontend-v2.json
# Update pods of frontend-v1 using JSON data passed into stdin.
cat frontend-v2.json | kubectl rolling-update frontend-v1 -f -
# Update the pods of frontend-v1 to frontend-v2 by just changing the image, and switching the
# name of the replication controller.
kubectl rolling-update frontend-v1 frontend-v2 --image=image:v2
# Update the pods of frontend by just changing the image, and keeping the old name.
kubectl rolling-update frontend --image=image:v2
# Abort and reverse an existing rollout in progress (from frontend-v1 to frontend-v2).
kubectl rolling-update frontend-v1 frontend-v2 --rollback`))
)
const (
updatePeriod = 1 * time.Minute
timeout = 5 * time.Minute
pollInterval = 3 * time.Second
)
type RollingUpdateOptions struct {
FilenameOptions *resource.FilenameOptions
OldName string
KeepOldName bool
DeploymentKey string
Image string
Container string
PullPolicy string
Rollback bool
Period time.Duration
Timeout time.Duration
Interval time.Duration
DryRun bool
OutputFormat string
Namespace string
EnforceNamespace bool
ScaleClient scaleclient.ScalesGetter
ClientSet kubernetes.Interface
Builder *resource.Builder
ShouldValidate bool
Validator func(bool) (validation.Schema, error)
FindNewName func(*corev1.ReplicationController) string
PrintFlags *genericclioptions.PrintFlags
ToPrinter func(string) (printers.ResourcePrinter, error)
genericclioptions.IOStreams
}
func NewRollingUpdateOptions(streams genericclioptions.IOStreams) *RollingUpdateOptions {
return &RollingUpdateOptions{
PrintFlags: genericclioptions.NewPrintFlags("rolling updated").WithTypeSetter(scheme.Scheme),
FilenameOptions: &resource.FilenameOptions{},
DeploymentKey: "deployment",
Timeout: timeout,
Interval: pollInterval,
Period: updatePeriod,
IOStreams: streams,
}
}
func NewCmdRollingUpdate(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
o := NewRollingUpdateOptions(ioStreams)
cmd := &cobra.Command{
Use: "rolling-update OLD_CONTROLLER_NAME ([NEW_CONTROLLER_NAME] --image=NEW_CONTAINER_IMAGE | -f NEW_CONTROLLER_SPEC)",
DisableFlagsInUseLine: true,
Short: "Perform a rolling update. This command is deprecated, use rollout instead.",
Long: rollingUpdateLong,
Example: rollingUpdateExample,
Deprecated: `use "rollout" instead`,
Hidden: true,
Run: func(cmd *cobra.Command, args []string) {
cmdutil.CheckErr(o.Complete(f, cmd, args))
cmdutil.CheckErr(o.Validate(cmd, args))
cmdutil.CheckErr(o.Run())
},
}
o.PrintFlags.AddFlags(cmd)
cmd.Flags().DurationVar(&o.Period, "update-period", o.Period, `Time to wait between updating pods. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
cmd.Flags().DurationVar(&o.Interval, "poll-interval", o.Interval, `Time delay between polling for replication controller status after the update. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, `Max time to wait for a replication controller to update before giving up. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
usage := "Filename or URL to file to use to create the new replication controller."
cmdutil.AddJsonFilenameFlag(cmd.Flags(), &o.FilenameOptions.Filenames, usage)
cmd.Flags().StringVar(&o.Image, "image", o.Image, i18n.T("Image to use for upgrading the replication controller. Must be distinct from the existing image (either new image or new image tag). Can not be used with --filename/-f"))
cmd.Flags().StringVar(&o.DeploymentKey, "deployment-label-key", o.DeploymentKey, i18n.T("The key to use to differentiate between two different controllers, default 'deployment'. Only relevant when --image is specified, ignored otherwise"))
cmd.Flags().StringVar(&o.Container, "container", o.Container, i18n.T("Container name which will have its image upgraded. Only relevant when --image is specified, ignored otherwise. Required when using --image on a multi-container pod"))
cmd.Flags().StringVar(&o.PullPolicy, "image-pull-policy", o.PullPolicy, i18n.T("Explicit policy for when to pull container images. Required when --image is same as existing image, ignored otherwise."))
cmd.Flags().BoolVar(&o.Rollback, "rollback", o.Rollback, "If true, this is a request to abort an existing rollout that is partially rolled out. It effectively reverses current and next and runs a rollout")
cmdutil.AddDryRunFlag(cmd)
cmdutil.AddValidateFlags(cmd)
return cmd
}
func validateArguments(cmd *cobra.Command, filenames, args []string) error {
deploymentKey := cmdutil.GetFlagString(cmd, "deployment-label-key")
image := cmdutil.GetFlagString(cmd, "image")
rollback := cmdutil.GetFlagBool(cmd, "rollback")
errors := []error{}
if len(deploymentKey) == 0 {
errors = append(errors, cmdutil.UsageErrorf(cmd, "--deployment-label-key can not be empty"))
}
if len(filenames) > 1 {
errors = append(errors, cmdutil.UsageErrorf(cmd, "May only specify a single filename for new controller"))
}
if !rollback {
if len(filenames) == 0 && len(image) == 0 {
errors = append(errors, cmdutil.UsageErrorf(cmd, "Must specify --filename or --image for new controller"))
} else if len(filenames) != 0 && len(image) != 0 {
errors = append(errors, cmdutil.UsageErrorf(cmd, "--filename and --image can not both be specified"))
}
} else {
if len(filenames) != 0 || len(image) != 0 {
errors = append(errors, cmdutil.UsageErrorf(cmd, "Don't specify --filename or --image on rollback"))
}
}
if len(args) < 1 {
errors = append(errors, cmdutil.UsageErrorf(cmd, "Must specify the controller to update"))
}
return utilerrors.NewAggregate(errors)
}
func (o *RollingUpdateOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
if len(args) > 0 {
o.OldName = args[0]
}
o.DryRun = getClientSideDryRun(cmd)
o.OutputFormat = cmdutil.GetFlagString(cmd, "output")
o.KeepOldName = len(args) == 1
o.ShouldValidate = cmdutil.GetFlagBool(cmd, "validate")
o.Validator = f.Validator
o.FindNewName = func(obj *corev1.ReplicationController) string {
return findNewName(args, obj)
}
var err error
o.Namespace, o.EnforceNamespace, err = f.ToRawKubeConfigLoader().Namespace()
if err != nil {
return err
}
o.ScaleClient, err = cmdutil.ScaleClientFn(f)
if err != nil {
return err
}
o.ClientSet, err = f.KubernetesClientSet()
if err != nil {
return err
}
o.Builder = f.NewBuilder()
o.ToPrinter = func(operation string) (printers.ResourcePrinter, error) {
o.PrintFlags.NamePrintFlags.Operation = operation
if o.DryRun {
o.PrintFlags.Complete("%s (dry run)")
}
return o.PrintFlags.ToPrinter()
}
return nil
}
func (o *RollingUpdateOptions) Validate(cmd *cobra.Command, args []string) error {
return validateArguments(cmd, o.FilenameOptions.Filenames, args)
}
func (o *RollingUpdateOptions) Run() error {
filename := ""
if len(o.FilenameOptions.Filenames) > 0 {
filename = o.FilenameOptions.Filenames[0]
}
coreClient := o.ClientSet.CoreV1()
var newRc *corev1.ReplicationController
// fetch rc
oldRc, err := coreClient.ReplicationControllers(o.Namespace).Get(context.TODO(), o.OldName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) || len(o.Image) == 0 || !o.KeepOldName {
return err
}
// We're in the middle of a rename, look for an RC with a source annotation of oldName
newRc, err := FindSourceController(coreClient, o.Namespace, o.OldName)
if err != nil {
return err
}
return Rename(coreClient, newRc, o.OldName)
}
var replicasDefaulted bool
if len(filename) != 0 {
schema, err := o.Validator(o.ShouldValidate)
if err != nil {
return err
}
request := o.Builder.
Unstructured().
Schema(schema).
NamespaceParam(o.Namespace).DefaultNamespace().
FilenameParam(o.EnforceNamespace, &resource.FilenameOptions{Recursive: false, Filenames: []string{filename}}).
Flatten().
Do()
infos, err := request.Infos()
if err != nil {
return err
}
// Handle filename input from stdin.
if len(infos) > 1 {
return fmt.Errorf("%s specifies multiple items", filename)
}
if len(infos) == 0 {
return fmt.Errorf("please make sure %s exists and is not empty", filename)
}
uncastVersionedObj, err := scheme.Scheme.ConvertToVersion(infos[0].Object, corev1.SchemeGroupVersion)
if err != nil {
klog.V(4).Infof("Object %T is not a ReplicationController", infos[0].Object)
return fmt.Errorf("%s contains a %v not a ReplicationController", filename, infos[0].Object.GetObjectKind().GroupVersionKind())
}
switch t := uncastVersionedObj.(type) {
case *corev1.ReplicationController:
replicasDefaulted = t.Spec.Replicas == nil
newRc = t
}
if newRc == nil {
klog.V(4).Infof("Object %T is not a ReplicationController", infos[0].Object)
return fmt.Errorf("%s contains a %v not a ReplicationController", filename, infos[0].Object.GetObjectKind().GroupVersionKind())
}
}
// If the --image option is specified, we need to create a new rc with at least one different selector
// than the old rc. This selector is the hash of the rc, with a suffix to provide uniqueness for
// same-image updates.
if len(o.Image) != 0 {
codec := scheme.Codecs.LegacyCodec(corev1.SchemeGroupVersion)
newName := o.FindNewName(oldRc)
if newRc, err = LoadExistingNextReplicationController(coreClient, o.Namespace, newName); err != nil {
return err
}
if newRc != nil {
if inProgressImage := newRc.Spec.Template.Spec.Containers[0].Image; inProgressImage != o.Image {
return fmt.Errorf("Found existing in-progress update to image (%s).\nEither continue in-progress update with --image=%s or rollback with --rollback", inProgressImage, inProgressImage)
}
fmt.Fprintf(o.Out, "Found existing update in progress (%s), resuming.\n", newRc.Name)
} else {
config := &NewControllerConfig{
Namespace: o.Namespace,
OldName: o.OldName,
NewName: newName,
Image: o.Image,
Container: o.Container,
DeploymentKey: o.DeploymentKey,
}
if oldRc.Spec.Template.Spec.Containers[0].Image == o.Image {
if len(o.PullPolicy) == 0 {
return fmt.Errorf("--image-pull-policy (Always|Never|IfNotPresent) must be provided when --image is the same as existing container image")
}
config.PullPolicy = corev1.PullPolicy(o.PullPolicy)
}
newRc, err = CreateNewControllerFromCurrentController(coreClient, codec, config)
if err != nil {
return err
}
}
// Update the existing replication controller with pointers to the 'next' controller
// and adding the <deploymentKey> label if necessary to distinguish it from the 'next' controller.
oldHash, err := util.HashObject(oldRc, codec)
if err != nil {
return err
}
// If new image is same as old, the hash may not be distinct, so add a suffix.
oldHash += "-orig"
oldRc, err = UpdateExistingReplicationController(coreClient, coreClient, oldRc, o.Namespace, newRc.Name, o.DeploymentKey, oldHash, o.Out)
if err != nil {
return err
}
}
if o.Rollback {
newName := o.FindNewName(oldRc)
if newRc, err = LoadExistingNextReplicationController(coreClient, o.Namespace, newName); err != nil {
return err
}
if newRc == nil {
return fmt.Errorf("Could not find %s to rollback.\n", newName)
}
}
if o.OldName == newRc.Name {
return fmt.Errorf("%s cannot have the same name as the existing ReplicationController %s",
filename, o.OldName)
}
updater := NewRollingUpdater(newRc.Namespace, coreClient, coreClient, o.ScaleClient)
// To successfully pull off a rolling update the new and old rc have to differ
// by at least one selector. Every new pod should have the selector and every
// old pod should not have the selector.
var hasLabel bool
for key, oldValue := range oldRc.Spec.Selector {
if newValue, ok := newRc.Spec.Selector[key]; ok && newValue != oldValue {
hasLabel = true
break
}
}
if !hasLabel {
return fmt.Errorf("%s must specify a matching key with non-equal value in Selector for %s",
filename, o.OldName)
}
// TODO: handle scales during rolling update
if replicasDefaulted {
t := *oldRc.Spec.Replicas
newRc.Spec.Replicas = &t
}
if o.DryRun {
oldRcData := &bytes.Buffer{}
newRcData := &bytes.Buffer{}
if o.OutputFormat == "" {
oldRcData.WriteString(oldRc.Name)
newRcData.WriteString(newRc.Name)
} else {
printer, err := o.ToPrinter("rolling updated")
if err != nil {
return err
}
if err := printer.PrintObj(oldRc, oldRcData); err != nil {
return err
}
if err := printer.PrintObj(newRc, newRcData); err != nil {
return err
}
}
fmt.Fprintf(o.Out, "Rolling from:\n%s\nTo:\n%s\n", string(oldRcData.Bytes()), string(newRcData.Bytes()))
return nil
}
updateCleanupPolicy := DeleteRollingUpdateCleanupPolicy
if o.KeepOldName {
updateCleanupPolicy = RenameRollingUpdateCleanupPolicy
}
config := &RollingUpdaterConfig{
Out: o.Out,
OldRc: oldRc,
NewRc: newRc,
UpdatePeriod: o.Period,
Interval: o.Interval,
Timeout: timeout,
CleanupPolicy: updateCleanupPolicy,
MaxUnavailable: intstr.FromInt(0),
MaxSurge: intstr.FromInt(1),
}
if o.Rollback {
err = AbortRollingUpdate(config)
if err != nil {
return err
}
coreClient.ReplicationControllers(config.NewRc.Namespace).Update(context.TODO(), config.NewRc, metav1.UpdateOptions{})
}
err = updater.Update(config)
if err != nil {
return err
}
message := "rolling updated"
if o.KeepOldName {
newRc.Name = o.OldName
} else {
message = fmt.Sprintf("rolling updated to %q", newRc.Name)
}
newRc, err = coreClient.ReplicationControllers(o.Namespace).Get(context.TODO(), newRc.Name, metav1.GetOptions{})
if err != nil {
return err
}
printer, err := o.ToPrinter(message)
if err != nil {
return err
}
return printer.PrintObj(newRc, o.Out)
}
func findNewName(args []string, oldRc *corev1.ReplicationController) string {
if len(args) >= 2 {
return args[1]
}
if oldRc != nil {
newName, _ := GetNextControllerAnnotation(oldRc)
return newName
}
return ""
}
func getClientSideDryRun(cmd *cobra.Command) bool {
dryRunStrategy, err := cmdutil.GetDryRunStrategy(cmd)
if err != nil {
klog.Fatalf("error accessing --dry-run flag for command %s: %v", cmd.Name(), err)
}
if dryRunStrategy == cmdutil.DryRunServer {
klog.Fatalf("--dry-run=server for command %s is not supported yet", cmd.Name())
}
return dryRunStrategy == cmdutil.DryRunClient
}

View File

@ -1,92 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rollingupdate
import (
"testing"
"k8s.io/cli-runtime/pkg/genericclioptions"
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
)
func TestValidateArgs(t *testing.T) {
f := cmdtesting.NewTestFactory()
defer f.Cleanup()
tests := []struct {
testName string
flags map[string]string
filenames []string
args []string
expectErr bool
}{
{
testName: "nothing",
expectErr: true,
},
{
testName: "no file, no image",
flags: map[string]string{},
args: []string{"foo"},
expectErr: true,
},
{
testName: "valid file example",
filenames: []string{"bar.yaml"},
args: []string{"foo"},
},
{
testName: "missing second image name",
flags: map[string]string{
"image": "foo:v2",
},
args: []string{"foo"},
},
{
testName: "valid image example",
flags: map[string]string{
"image": "foo:v2",
},
args: []string{"foo", "foo-v2"},
},
{
testName: "both filename and image example",
flags: map[string]string{
"image": "foo:v2",
},
filenames: []string{"bar.yaml"},
args: []string{"foo", "foo-v2"},
expectErr: true,
},
}
for _, test := range tests {
cmd := NewCmdRollingUpdate(f, genericclioptions.NewTestIOStreamsDiscard())
if test.flags != nil {
for key, val := range test.flags {
cmd.Flags().Set(key, val)
}
}
err := validateArguments(cmd, test.filenames, test.args)
if err != nil && !test.expectErr {
t.Errorf("%s: unexpected error: %v", test.testName, err)
}
if err == nil && test.expectErr {
t.Errorf("%s: unexpected non-error", test.testName)
}
}
}