add more info for rollout status cloneset (#65)

* add more info for rollout status cloneset

Signed-off-by: hantmac <hantmac@outlook.com>

* add  flag for rollout status

Signed-off-by: hantmac <hantmac@outlook.com>
This commit is contained in:
Jeremy 2022-07-22 19:51:37 +08:00 committed by GitHub
parent 8012780546
commit e1b55f7d11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 254 additions and 14 deletions

View File

@ -35,6 +35,7 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
@ -77,10 +78,12 @@ type RolloutStatusOptions struct {
Watch bool
Revision int64
Timeout time.Duration
Detail bool
StatusViewerFn func(*meta.RESTMapping) (internalpolymorphichelpers.StatusViewer, error)
Builder func() *resource.Builder
DynamicClient dynamic.Interface
ClientSet kubernetes.Interface
FilenameOptions *resource.FilenameOptions
genericclioptions.IOStreams
@ -94,6 +97,7 @@ func NewRolloutStatusOptions(streams genericclioptions.IOStreams) *RolloutStatus
IOStreams: streams,
Watch: true,
Timeout: 0,
Detail: false,
}
}
@ -122,6 +126,7 @@ func NewCmdRolloutStatus(f cmdutil.Factory, streams genericclioptions.IOStreams)
cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "Watch the status of the rollout until it's done.")
cmd.Flags().Int64Var(&o.Revision, "revision", o.Revision, "Pin to a specific revision for showing its status. Defaults to 0 (last revision).")
cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).")
cmd.Flags().BoolVarP(&o.Detail, "detail", "d", o.Detail, "Show the detail status of the rollout.")
return cmd
}
@ -149,6 +154,11 @@ func (o *RolloutStatusOptions) Complete(f cmdutil.Factory, args []string) error
return err
}
o.ClientSet, err = kubernetes.NewForConfig(clientConfig)
if err != nil {
return err
}
return nil
}
@ -224,17 +234,23 @@ func (o *RolloutStatusOptions) Run() error {
// if the rollout isn't done yet, keep watching deployment status
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
intr := interrupt.New(nil, cancel)
var status string
var consideredDone bool
return intr.Run(func() error {
_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, func(e watch.Event) (bool, error) {
switch t := e.Type; t {
case watch.Added, watch.Modified:
status, done, err := statusViewer.Status(e.Object.(runtime.Unstructured), o.Revision)
if o.Detail {
status, consideredDone, err = statusViewer.DetailStatus(o.ClientSet, e.Object.(runtime.Unstructured), o.Detail, o.Revision)
} else {
status, consideredDone, err = statusViewer.Status(o.ClientSet, e.Object.(runtime.Unstructured), o.Revision)
}
if err != nil {
return false, err
}
fmt.Fprintf(o.Out, "%s", status)
// Quit waiting if the rollout is done
if done {
if consideredDone {
return true, nil
}

View File

@ -27,12 +27,14 @@ import (
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
deploymentutil "k8s.io/kubectl/pkg/util/deployment"
)
// StatusViewer provides an interface for resources that have rollout status.
type StatusViewer interface {
Status(obj runtime.Unstructured, revision int64) (string, bool, error)
Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error)
DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error)
}
// StatusViewerFor returns a StatusViewer for the resource specified by kind.
@ -71,7 +73,43 @@ type CloneSetStatusViewer struct{}
type AdvancedStatefulSetStatusViewer struct{}
// Status returns a message describing deployment status, and a bool value indicating if the status is considered done.
func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *DeploymentStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
deployment := &appsv1.Deployment{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), deployment)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, deployment, err)
}
if revision > 0 {
deploymentRev, err := deploymentutil.Revision(deployment)
if err != nil {
return "", false, fmt.Errorf("cannot get the revision of deployment %q: %v", deployment.Name, err)
}
if revision != deploymentRev {
return "", false, fmt.Errorf("desired revision (%d) is different from the running revision (%d)", revision, deploymentRev)
}
}
if deployment.Generation <= deployment.Status.ObservedGeneration {
cond := deploymentutil.GetDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing)
if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
return "", false, fmt.Errorf("deployment %q exceeded its progress deadline", deployment.Name)
}
if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), false, nil
}
if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), false, nil
}
if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), false, nil
}
return fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name), true, nil
}
return fmt.Sprintf("Waiting for deployment spec update to be observed...\n"), false, nil
}
// DetailStatus returns a message describing deployment detail status, and a bool value indicating if the status is considered done.
func (s *DeploymentStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
deployment := &appsv1.Deployment{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), deployment)
if err != nil {
@ -107,7 +145,32 @@ func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64
}
// Status returns a message describing daemon set status, and a bool value indicating if the status is considered done.
func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *DaemonSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
//ignoring revision as DaemonSets does not have history yet
daemon := &appsv1.DaemonSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), daemon)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, daemon, err)
}
if daemon.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType)
}
if daemon.Generation <= daemon.Status.ObservedGeneration {
if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled {
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d out of %d new pods have been updated...\n", daemon.Name, daemon.Status.UpdatedNumberScheduled, daemon.Status.DesiredNumberScheduled), false, nil
}
if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled {
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d of %d updated pods are available...\n", daemon.Name, daemon.Status.NumberAvailable, daemon.Status.DesiredNumberScheduled), false, nil
}
return fmt.Sprintf("daemon set %q successfully rolled out\n", daemon.Name), true, nil
}
return fmt.Sprintf("Waiting for daemon set spec update to be observed...\n"), false, nil
}
// DetailStatus returns a message describing daemon set status, and a bool value indicating if the status is considered done.
func (s *DaemonSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
//ignoring revision as DaemonSets does not have history yet
daemon := &appsv1.DaemonSet{}
@ -132,7 +195,7 @@ func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64)
}
// Status returns a message describing statefulset status, and a bool value indicating if the status is considered done.
func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *StatefulSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
sts := &appsv1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), sts)
if err != nil {
@ -163,11 +226,43 @@ func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int6
sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), false, nil
}
return fmt.Sprintf("statefulset rolling update complete %d pods at revision %s...\n", sts.Status.CurrentReplicas, sts.Status.CurrentRevision), true, nil
}
func (s *StatefulSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
sts := &appsv1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), sts)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, sts, err)
}
if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType)
}
if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration {
return "Waiting for statefulset spec update to be observed...\n", false, nil
}
if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *sts.Spec.Replicas-sts.Status.ReadyReplicas), false, nil
}
if sts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil {
if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) {
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
sts.Status.UpdatedReplicas, *sts.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil
}
}
return fmt.Sprintf("partitioned roll out complete: %d new pods have been updated...\n",
sts.Status.UpdatedReplicas), true, nil
}
if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
return fmt.Sprintf("waiting for statefulset rolling update to complete %d pods at revision %s...\n",
sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), false, nil
}
return fmt.Sprintf("statefulset rolling update complete %d pods at revision %s...\n", sts.Status.CurrentReplicas, sts.Status.CurrentRevision), true, nil
}
// Status returns a message describing cloneset status, and a bool value indicating if the status is considered done.
func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *CloneSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
cs := &kruiseappsv1alpha1.CloneSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), cs)
if err != nil {
@ -178,11 +273,8 @@ func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64)
if cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType ||
cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType {
if cs.Spec.Replicas != nil && cs.Spec.UpdateStrategy.Partition != nil {
if cs.Status.UpdatedReplicas < (*cs.Spec.Replicas - cs.Spec.UpdateStrategy.Partition.IntVal) {
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal), false, nil
}
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal), false, nil
}
}
@ -196,8 +288,67 @@ func (s *CloneSetStatusViewer) Status(obj runtime.Unstructured, revision int64)
return fmt.Sprintf("CloneSet rolling update complete %d pods at revision %s...\n", cs.Status.AvailableReplicas, cs.Status.UpdateRevision), true, nil
}
// DetailStatus returns a message describing cloneset detail status, and a bool value indicating if the status is considered done.
func (s *CloneSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
cs := &kruiseappsv1alpha1.CloneSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), cs)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, cs, err)
}
// check InPlaceOnly and InPlacePossible UpdateStrategy
if cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType ||
cs.Spec.UpdateStrategy.Type == kruiseappsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType {
if cs.Spec.Replicas != nil && cs.Spec.UpdateStrategy.Partition != nil {
if cs.Status.UpdatedReplicas < (*cs.Spec.Replicas - cs.Spec.UpdateStrategy.Partition.IntVal) {
return fmt.Sprintf("CloneSet %s Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n%s",
cs.Name, cs.Status.UpdatedReplicas, *cs.Spec.Replicas-cs.Spec.UpdateStrategy.Partition.IntVal, generatePodsInfoForCloneSet(c, cs)), false, nil
}
}
}
if cs.Status.ObservedGeneration == 0 || cs.Generation > cs.Status.ObservedGeneration {
return fmt.Sprintf("Waiting for CloneSet %s spec update to be observed...\n", cs.Name), false, nil
}
if cs.Spec.Replicas != nil && cs.Status.ReadyReplicas < *cs.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n%s", *cs.Spec.Replicas-cs.Status.ReadyReplicas,
generatePodsInfoForCloneSet(c, cs)), false, nil
}
return fmt.Sprintf("CloneSet %s rolling update complete %d pods at revision %s...\n%s",
cs.Name, cs.Status.AvailableReplicas, cs.Status.UpdateRevision, generatePodsInfoForCloneSet(c, cs)), true, nil
}
// Status returns a message describing advanced statefulset status, and a bool value indicating if the status is considered done.
func (s *AdvancedStatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
func (s *AdvancedStatefulSetStatusViewer) Status(c kubernetes.Interface, obj runtime.Unstructured, revision int64) (string, bool, error) {
asts := &kruiseappsv1beta1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), asts)
if err != nil {
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, asts, err)
}
// check InPlaceOnly and InPlacePossible UpdateStrategy
if asts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType {
if asts.Spec.Replicas != nil && asts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if asts.Status.UpdatedReplicas < (*asts.Spec.Replicas - *asts.Spec.UpdateStrategy.RollingUpdate.Partition) {
return fmt.Sprintf("Waiting for partitioned roll out to finish:%d out of %d new pods has been updated...\n",
asts.Status.UpdatedReplicas, *asts.Spec.Replicas-*asts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil
}
}
}
if asts.Status.ObservedGeneration == 0 || asts.Generation > asts.Status.ObservedGeneration {
return "Waiting for Advanced StatefulSet spec update to be observed...\n", false, nil
}
if asts.Spec.Replicas != nil && asts.Status.ReadyReplicas < *asts.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *asts.Spec.Replicas-asts.Status.ReadyReplicas), false, nil
}
return fmt.Sprintf("Advanced StatefulSet rolling update complete %d pods at revision %s...\n", asts.Status.AvailableReplicas, asts.Status.UpdateRevision), true, nil
}
// DetailStatus returns a message describing advanced statefulset status, and a bool value indicating if the status is considered done.
func (s *AdvancedStatefulSetStatusViewer) DetailStatus(c kubernetes.Interface, obj runtime.Unstructured, detail bool, revision int64) (string, bool, error) {
asts := &kruiseappsv1beta1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), asts)
if err != nil {
@ -222,5 +373,4 @@ func (s *AdvancedStatefulSetStatusViewer) Status(obj runtime.Unstructured, revis
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *asts.Spec.Replicas-asts.Status.ReadyReplicas), false, nil
}
return fmt.Sprintf("Advanced StatefulSet rolling update complete %d pods at revision %s...\n", asts.Status.AvailableReplicas, asts.Status.UpdateRevision), true, nil
}

View File

@ -0,0 +1,74 @@
package polymorphichelpers
import (
"context"
"fmt"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
)
func getPodsByLabelSelector(client kubernetes.Interface, ns string, labelSelector *metav1.LabelSelector) ([]*corev1.Pod, error) {
var podsList []*corev1.Pod
pods, err := client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()})
if err != nil {
return nil, err
}
for i := range pods.Items {
podsList = append(podsList, &pods.Items[i])
}
return podsList, nil
}
func filterOldNewReadyPodsFromCloneSet(client kubernetes.Interface, clone *kruiseappsv1alpha1.CloneSet) (oldPods []*corev1.Pod,
newNotReadyPods []*corev1.Pod, updatedReadyPods []*corev1.Pod, err error) {
pods, err := getPodsByLabelSelector(client, clone.Namespace, clone.Spec.Selector)
if err != nil {
return
}
for i := range pods {
if podRevision, ok := pods[i].GetLabels()["controller-revision-hash"]; ok {
if podRevision == clone.Status.UpdateRevision {
if podReady(pods[i]) {
updatedReadyPods = append(updatedReadyPods, pods[i])
} else {
newNotReadyPods = append(newNotReadyPods, pods[i])
}
} else {
oldPods = append(oldPods, pods[i])
}
}
}
return
}
func podReady(p *corev1.Pod) bool {
cs := p.Status.Conditions
for _, c := range cs {
if c.Type == corev1.PodReady {
return c.Status == corev1.ConditionTrue
}
}
return false
}
func generatePodsInfoForCloneSet(client kubernetes.Interface, clone *kruiseappsv1alpha1.CloneSet) string {
var notReadyPodsSlice, ReadyPodsSlice []string
_, notReadyNewPods, readyNewPods, err := filterOldNewReadyPodsFromCloneSet(client, clone)
if err != nil {
return ""
}
for i := range notReadyNewPods {
notReadyPodsSlice = append(notReadyPodsSlice, notReadyNewPods[i].Name)
}
for j := range readyNewPods {
ReadyPodsSlice = append(ReadyPodsSlice, readyNewPods[j].Name)
}
return fmt.Sprintf("Updated ready pods: %v\nUpdated not ready pods: %v\n", ReadyPodsSlice, notReadyPodsSlice)
}