diff --git a/cmd/scheduler-estimator/app/scheduler-estimator.go b/cmd/scheduler-estimator/app/scheduler-estimator.go
index c8d7f5c8a..5a7154806 100644
--- a/cmd/scheduler-estimator/app/scheduler-estimator.go
+++ b/cmd/scheduler-estimator/app/scheduler-estimator.go
@@ -11,6 +11,8 @@ import (

 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/spf13/cobra"
+	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/klog/v2"
@@ -56,9 +58,11 @@ func run(ctx context.Context, opts *options.Options) error {
 	}
 	restConfig.QPS, restConfig.Burst = opts.ClusterAPIQPS, opts.ClusterAPIBurst

-	kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)
+	kubeClient := kubernetes.NewForConfigOrDie(restConfig)
+	dynamicClient := dynamic.NewForConfigOrDie(restConfig)
+	discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restConfig)

-	e := server.NewEstimatorServer(kubeClientSet, opts)
+	e := server.NewEstimatorServer(kubeClient, dynamicClient, discoveryClient, opts, ctx.Done())
 	if err = e.Start(ctx); err != nil {
 		klog.Errorf("estimator server exits unexpectedly: %v", err)
 		return err
diff --git a/pkg/estimator/server/metrics/metrics.go b/pkg/estimator/server/metrics/metrics.go
index af08ad85d..f1cb80153 100644
--- a/pkg/estimator/server/metrics/metrics.go
+++ b/pkg/estimator/server/metrics/metrics.go
@@ -15,6 +15,8 @@ const SchedulerEstimatorSubsystem = "karmada_scheduler_estimator"
 const (
 	// EstimatingTypeMaxAvailableReplicas - label of estimating type
 	EstimatingTypeMaxAvailableReplicas = "MaxAvailableReplicas"
+	// EstimatingTypeGetUnschedulableReplicas - label of estimating type
+	EstimatingTypeGetUnschedulableReplicas = "GetUnschedulableReplicas"
 )

 const (
@@ -22,6 +24,10 @@ const (
 	EstimatingStepListNodesByNodeClaim = "ListNodesByNodeClaim"
 	// EstimatingStepMaxAvailableReplicas - label of estimating step
 	EstimatingStepMaxAvailableReplicas = "MaxAvailableReplicas"
+	// EstimatingStepGetObjectFromCache - label of estimating step
+	EstimatingStepGetObjectFromCache = "GetObjectFromCache"
+	// EstimatingStepGetUnschedulablePodsOfWorkload - label of estimating step
+	EstimatingStepGetUnschedulablePodsOfWorkload = "GetWorkloadUnschedulablePods"
 	// EstimatingStepTotal - label of estimating step, total step
 	EstimatingStepTotal = "Total"
 )
diff --git a/pkg/estimator/server/replica/replica.go b/pkg/estimator/server/replica/replica.go
index 4f504f203..4e32c432d 100644
--- a/pkg/estimator/server/replica/replica.go
+++ b/pkg/estimator/server/replica/replica.go
@@ -1,9 +1,21 @@
 package replica

 import (
+	"fmt"
+	"time"
+
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/labels"
+	listappsv1 "k8s.io/client-go/listers/apps/v1"
+	listcorev1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/klog/v2"

 	"github.com/karmada-io/karmada/pkg/estimator/server/nodes"
+	"github.com/karmada-io/karmada/pkg/util"
+	"github.com/karmada-io/karmada/pkg/util/helper"
+	utilworkload "github.com/karmada-io/karmada/pkg/util/workload"
 )

 // NodeMaxAvailableReplica calculates max available replicas of a node, based on
@@ -15,3 +27,67 @@ func NodeMaxAvailableReplica(node *corev1.Node, pods []*corev1.Pod, request core
 	}
 	return int32(ni.MaxReplicaDivided(request)), nil
 }
+
+// ListerWrapper wraps the Pod lister and the ReplicaSet lister.
+type ListerWrapper struct {
+	listcorev1.PodLister
+	listappsv1.ReplicaSetLister
+}
+
+// GetUnschedulablePodsOfWorkload returns the number of unschedulable pods that belong to the given workload.
+func GetUnschedulablePodsOfWorkload(unstructObj *unstructured.Unstructured, threshold time.Duration, listers *ListerWrapper) (int32, error) {
+	if threshold < 0 {
+		threshold = 0
+	}
+	unschedulable := 0
+	// Workloads fall into two types: those that own Pods indirectly through a ReplicaSet
+	// and those that own Pods directly.
+	switch unstructObj.GetKind() {
+	case util.DeploymentKind:
+		deployment, err := helper.ConvertToDeployment(unstructObj)
+		if err != nil {
+			return 0, fmt.Errorf("failed to convert Deployment from unstructured object: %v", err)
+		}
+		pods, err := listDeploymentPods(deployment, listers)
+		if err != nil {
+			return 0, err
+		}
+		for _, pod := range pods {
+			if podUnschedulable(pod, threshold) {
+				unschedulable++
+			}
+		}
+	default:
+		// TODO(Garrybest): add abstract workload
+		return 0, fmt.Errorf("kind(%s) of workload(%s) is not supported", unstructObj.GetKind(), klog.KObj(unstructObj).String())
+	}
+	return int32(unschedulable), nil
+}
+
+func podUnschedulable(pod *corev1.Pod, threshold time.Duration) bool {
+	_, cond := helper.GetPodCondition(&pod.Status, corev1.PodScheduled)
+	return cond != nil && cond.Status == corev1.ConditionFalse && cond.Reason == corev1.PodReasonUnschedulable &&
+		cond.LastTransitionTime.Add(threshold).Before(time.Now())
+}
+
+func listDeploymentPods(deployment *appsv1.Deployment, listers *ListerWrapper) ([]*corev1.Pod, error) {
+	// Get the newest ReplicaSet owned by the Deployment.
+	rsListFunc := func(namespace string, selector labels.Selector) ([]*appsv1.ReplicaSet, error) {
+		return listers.ReplicaSetLister.ReplicaSets(namespace).List(selector)
+	}
+
+	rs, err := utilworkload.GetNewReplicaSet(deployment, rsListFunc)
+	if err != nil {
+		return nil, err
+	}
+
+	podListFunc := func(namespace string, selector labels.Selector) ([]*corev1.Pod, error) {
+		return listers.PodLister.Pods(namespace).List(selector)
+	}
+
+	pods, err := utilworkload.ListPodsByRS(deployment, []*appsv1.ReplicaSet{rs}, podListFunc)
+	if err != nil {
+		return nil, err
+	}
+	return pods, nil
+}
diff --git a/pkg/estimator/server/server.go b/pkg/estimator/server/server.go
index c9dc4aa7a..80c07d8ea 100644
--- a/pkg/estimator/server/server.go
+++ b/pkg/estimator/server/server.go
@@ -9,11 +9,18 @@ import (
 	"github.com/kr/pretty"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/discovery"
+	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/informers"
 	infov1 "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes"
 	listv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"

@@ -24,38 +31,62 @@ import (
 	"github.com/karmada-io/karmada/pkg/estimator/server/replica"
 	estimatorservice "github.com/karmada-io/karmada/pkg/estimator/service"
 	"github.com/karmada-io/karmada/pkg/util"
+	"github.com/karmada-io/karmada/pkg/util/helper"
+	"github.com/karmada-io/karmada/pkg/util/informermanager"
+	"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
 )

 const (
 	nodeNameKeyIndex = "spec.nodeName"
 )

+var (
+	// TODO(Garrybest): make it an option
+	supportedGVRs = []schema.GroupVersionResource{
+		appsv1.SchemeGroupVersion.WithResource("deployments"),
+	}
+)
+
 // AccurateSchedulerEstimatorServer is the gRPC server of a cluster accurate scheduler estimator.
 // Please see https://github.com/karmada-io/karmada/pull/580 (#580).
 type AccurateSchedulerEstimatorServer struct {
 	port            int
 	clusterName     string
 	kubeClient      kubernetes.Interface
+	restMapper      meta.RESTMapper
 	informerFactory informers.SharedInformerFactory
 	nodeInformer    infov1.NodeInformer
 	podInformer     infov1.PodInformer
 	nodeLister      listv1.NodeLister
-	podLister       listv1.PodLister
+	replicaLister   *replica.ListerWrapper
 	getPodFunc      func(nodeName string) ([]*corev1.Pod, error)
+	informerManager informermanager.SingleClusterInformerManager
 }

 // NewEstimatorServer creates an instance of AccurateSchedulerEstimatorServer.
-func NewEstimatorServer(kubeClient kubernetes.Interface, opts *options.Options) *AccurateSchedulerEstimatorServer {
+func NewEstimatorServer(
+	kubeClient kubernetes.Interface,
+	dynamicClient dynamic.Interface,
+	discoveryClient discovery.DiscoveryInterface,
+	opts *options.Options,
+	stopChan <-chan struct{},
+) *AccurateSchedulerEstimatorServer {
+	cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoClient)
 	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
 	es := &AccurateSchedulerEstimatorServer{
 		port:            opts.ServerPort,
 		clusterName:     opts.ClusterName,
 		kubeClient:      kubeClient,
+		restMapper:      restMapper,
 		informerFactory: informerFactory,
 		nodeInformer:    informerFactory.Core().V1().Nodes(),
 		podInformer:     informerFactory.Core().V1().Pods(),
 		nodeLister:      informerFactory.Core().V1().Nodes().Lister(),
-		podLister:       informerFactory.Core().V1().Pods().Lister(),
+		replicaLister: &replica.ListerWrapper{
+			PodLister:        informerFactory.Core().V1().Pods().Lister(),
+			ReplicaSetLister: informerFactory.Apps().V1().ReplicaSets().Lister(),
+		},
 	}

 	// Establish a connection between the pods and their assigned nodes.
@@ -93,20 +124,30 @@ func NewEstimatorServer(kubeClient kubernetes.Interface, opts *options.Options)
 		}
 		return pods, nil
 	}
+	es.informerManager = informermanager.NewSingleClusterInformerManager(dynamicClient, 0, stopChan)
+	for _, gvr := range supportedGVRs {
+		es.informerManager.Lister(gvr)
+	}
 	return es
 }

 // Start runs the accurate replica estimator server.
 func (es *AccurateSchedulerEstimatorServer) Start(ctx context.Context) error {
 	stopCh := ctx.Done()
-	klog.Infof("Starting karmada cluster(%s) accurate replica estimator", es.clusterName)
-	defer klog.Infof("Shutting down cluster(%s) accurate replica estimator", es.clusterName)
+	klog.Infof("Starting karmada cluster(%s) accurate scheduler estimator", es.clusterName)
+	defer klog.Infof("Shutting down cluster(%s) accurate scheduler estimator", es.clusterName)

 	es.informerFactory.Start(stopCh)
 	if !es.waitForCacheSync(stopCh) {
 		return fmt.Errorf("failed to wait for cache sync")
 	}
+	es.informerManager.Start()
+	synced := es.informerManager.WaitForCacheSync()
+	if synced == nil {
+		return fmt.Errorf("informer factory for cluster does not exist")
+	}
+
 	// Listen a port and register the gRPC server.
 	l, err := net.Listen("tcp", fmt.Sprintf(":%d", es.port))
 	if err != nil {
@@ -169,9 +210,46 @@ func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Con

 // GetUnschedulableReplicas is the implementation of gRPC interface. It will return the
 // unschedulable replicas of a workload.
-func (es *AccurateSchedulerEstimatorServer) GetUnschedulableReplicas(ctx context.Context, request *pb.UnschedulableReplicasRequest) (*pb.UnschedulableReplicasResponse, error) {
-	//TODO(Garrybest): implement me
-	return nil, nil
+func (es *AccurateSchedulerEstimatorServer) GetUnschedulableReplicas(ctx context.Context, request *pb.UnschedulableReplicasRequest) (response *pb.UnschedulableReplicasResponse, rerr error) {
+	md, ok := metadata.FromIncomingContext(ctx)
+	if !ok {
+		klog.Warningf("No metadata from context.")
+	}
+	var object string
+	if m := md.Get(string(util.ContextKeyObject)); len(m) != 0 {
+		object = m[0]
+	}
+
+	defer traceGetUnschedulableReplicas(object, time.Now(), request)(&response, &rerr)
+
+	if request.Cluster != es.clusterName {
+		return nil, fmt.Errorf("cluster name does not match, got: %s, desired: %s", request.Cluster, es.clusterName)
+	}
+
+	// Get the workload.
+	startTime := time.Now()
+	gvk := schema.FromAPIVersionAndKind(request.Resource.APIVersion, request.Resource.Kind)
+	unstructObj, err := helper.GetObjectFromSingleClusterCache(es.restMapper, es.informerManager, &keys.ClusterWideKey{
+		Group:     gvk.Group,
+		Version:   gvk.Version,
+		Kind:      gvk.Kind,
+		Namespace: request.Resource.Namespace,
+		Name:      request.Resource.Name,
+	})
+	metrics.UpdateEstimatingAlgorithmLatency(err, metrics.EstimatingTypeGetUnschedulableReplicas, metrics.EstimatingStepGetObjectFromCache, startTime)
+	if err != nil {
+		return nil, err
+	}
+
+	// List all unschedulable replicas.
+	startTime = time.Now()
+	unschedulables, err := replica.GetUnschedulablePodsOfWorkload(unstructObj, request.UnschedulableThreshold, es.replicaLister)
+	metrics.UpdateEstimatingAlgorithmLatency(err, metrics.EstimatingTypeGetUnschedulableReplicas, metrics.EstimatingStepGetUnschedulablePodsOfWorkload, startTime)
+	if err != nil {
+		return nil, err
+	}
+
+	return &pb.UnschedulableReplicasResponse{UnschedulableReplicas: unschedulables}, err
 }

 func (es *AccurateSchedulerEstimatorServer) maxAvailableReplicas(nodes []*corev1.Node, request corev1.ResourceList) int32 {
@@ -216,3 +294,16 @@ func traceMaxAvailableReplicas(object string, start time.Time, request *pb.MaxAv
 		klog.Infof("Finish calculating cluster available replicas of resource(%s), max replicas: %d, time elapsed: %s", object, (*response).MaxReplicas, time.Since(start))
 	}
 }
+
+func traceGetUnschedulableReplicas(object string, start time.Time, request *pb.UnschedulableReplicasRequest) func(response **pb.UnschedulableReplicasResponse, err *error) {
+	klog.V(4).Infof("Begin detecting cluster unschedulable replicas of resource(%s), request: %s", object, pretty.Sprint(*request))
+	return func(response **pb.UnschedulableReplicasResponse, err *error) {
+		metrics.CountRequests(*err, metrics.EstimatingTypeGetUnschedulableReplicas)
+		metrics.UpdateEstimatingAlgorithmLatency(*err, metrics.EstimatingTypeGetUnschedulableReplicas, metrics.EstimatingStepTotal, start)
+		if *err != nil {
+			klog.Errorf("Failed to detect cluster unschedulable replicas: %v", *err)
+			return
+		}
+		klog.Infof("Finish detecting cluster unschedulable replicas of resource(%s), unschedulable replicas: %d, time elapsed: %s", object, (*response).UnschedulableReplicas, time.Since(start))
+	}
+}
diff --git a/pkg/estimator/server/server_test.go b/pkg/estimator/server/server_test.go
index 9cf46ef4d..2a6a0a009 100644
--- a/pkg/estimator/server/server_test.go
+++ b/pkg/estimator/server/server_test.go
@@ -6,9 +6,15 @@ import (
 	"reflect"
 	"testing"

+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + discoveryfake "k8s.io/client-go/discovery/fake" + dynamicfake "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/fake" + coretesting "k8s.io/client-go/testing" "github.com/karmada-io/karmada/cmd/scheduler-estimator/app/options" "github.com/karmada-io/karmada/pkg/estimator/pb" @@ -205,11 +211,27 @@ func TestAccurateSchedulerEstimatorServer_MaxAvailableReplicas(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - es := NewEstimatorServer(fake.NewSimpleClientset(tt.objs...), opt) + gvrToListKind := map[schema.GroupVersionResource]string{ + {Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList", + } + dynamicClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), gvrToListKind) + discoveryClient := &discoveryfake.FakeDiscovery{ + Fake: &coretesting.Fake{}, + } + discoveryClient.Resources = []*metav1.APIResourceList{ + { + GroupVersion: appsv1.SchemeGroupVersion.String(), + APIResources: []metav1.APIResource{ + {Name: "deployments", Namespaced: true, Kind: "Deployment"}, + }, + }, + } + + es := NewEstimatorServer(fake.NewSimpleClientset(tt.objs...), dynamicClient, discoveryClient, opt, ctx.Done()) es.informerFactory.Start(ctx.Done()) if !es.waitForCacheSync(ctx.Done()) { - t.Errorf("MaxAvailableReplicas() error = %v, wantErr %v", fmt.Errorf("failed to wait for cache sync"), tt.wantErr) + t.Fatalf("MaxAvailableReplicas() error = %v, wantErr %v", fmt.Errorf("failed to wait for cache sync"), tt.wantErr) } gotResponse, err := es.MaxAvailableReplicas(ctx, tt.args.request) diff --git a/pkg/util/helper/pod.go b/pkg/util/helper/pod.go new file mode 100644 index 000000000..0b5a73229 --- /dev/null +++ b/pkg/util/helper/pod.go @@ -0,0 +1,28 @@ +package helper + +import ( + corev1 "k8s.io/api/core/v1" +) + +// GetPodCondition extracts the provided condition from the given status and returns that. +// Returns nil and -1 if the condition is not present, and the index of the located condition. +func GetPodCondition(status *corev1.PodStatus, conditionType corev1.PodConditionType) (int, *corev1.PodCondition) { + if status == nil { + return -1, nil + } + return GetPodConditionFromList(status.Conditions, conditionType) +} + +// GetPodConditionFromList extracts the provided condition from the given list of condition and +// returns the index of the condition and the condition. Returns -1 and nil if the condition is not present. +func GetPodConditionFromList(conditions []corev1.PodCondition, conditionType corev1.PodConditionType) (int, *corev1.PodCondition) { + if conditions == nil { + return -1, nil + } + for i := range conditions { + if conditions[i].Type == conditionType { + return i, &conditions[i] + } + } + return -1, nil +} diff --git a/pkg/util/workload/deployment.go b/pkg/util/workload/deployment.go new file mode 100644 index 000000000..327808581 --- /dev/null +++ b/pkg/util/workload/deployment.go @@ -0,0 +1,151 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// This code is lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
+// However, the code has been revised to use Listers instead of the API client interface.
+// For reference: https://github.com/kubernetes/kubernetes/blob/release-1.22/pkg/controller/deployment/util/deployment_util.go
+
+package workload
+
+import (
+	"sort"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+// PodListFunc returns the Pods in the given namespace that match the given selector.
+type PodListFunc func(string, labels.Selector) ([]*corev1.Pod, error)
+
+// ReplicaSetListFunc returns the ReplicaSets in the given namespace that match the given selector.
+type ReplicaSetListFunc func(string, labels.Selector) ([]*appsv1.ReplicaSet, error)
+
+// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker.
+type ReplicaSetsByCreationTimestamp []*appsv1.ReplicaSet
+
+func (o ReplicaSetsByCreationTimestamp) Len() int      { return len(o) }
+func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
+	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
+		return o[i].Name < o[j].Name
+	}
+	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
+}
+
+// ListReplicaSetsByDeployment returns a slice of RSes the given deployment targets.
+// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
+// because only the controller itself should do that.
+// However, it does filter out anything whose ControllerRef doesn't match.
+func ListReplicaSetsByDeployment(deployment *appsv1.Deployment, f ReplicaSetListFunc) ([]*appsv1.ReplicaSet, error) {
+	// TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
+	// should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830.
+	namespace := deployment.Namespace
+	selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
+	if err != nil {
+		return nil, err
+	}
+	all, err := f(namespace, selector)
+	if err != nil {
+		return nil, err
+	}
+	// Only include those whose ControllerRef matches the Deployment.
+	owned := make([]*appsv1.ReplicaSet, 0, len(all))
+	for _, rs := range all {
+		if metav1.IsControlledBy(rs, deployment) {
+			owned = append(owned, rs)
+		}
+	}
+	return owned, nil
+}
+
+// ListPodsByRS returns a list of pods the given deployment targets.
+// This needs a list of ReplicaSets for the Deployment,
+// which can be found with ListReplicaSetsByDeployment().
+// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
+// because only the controller itself should do that.
+// However, it does filter out anything whose ControllerRef doesn't match.
+func ListPodsByRS(deployment *appsv1.Deployment, rsList []*appsv1.ReplicaSet, f PodListFunc) ([]*corev1.Pod, error) {
+	namespace := deployment.Namespace
+	selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
+	if err != nil {
+		return nil, err
+	}
+	all, err := f(namespace, selector)
+	if err != nil {
+		return all, err
+	}
+	// Only include those whose ControllerRef points to a ReplicaSet that is in
+	// turn owned by this Deployment.
+	rsMap := make(map[types.UID]bool, len(rsList))
+	for _, rs := range rsList {
+		if rs != nil {
+			rsMap[rs.UID] = true
+		}
+	}
+	owned := make([]*corev1.Pod, 0, len(all))
+	for i := range all {
+		pod := all[i]
+		controllerRef := metav1.GetControllerOf(pod)
+		if controllerRef != nil && rsMap[controllerRef.UID] {
+			owned = append(owned, pod)
+		}
+	}
+	return owned, nil
+}
+
+// EqualIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash]
+// We ignore pod-template-hash because:
+// 1. The hash result would be different upon podTemplateSpec API changes
+// (e.g. the addition of a new field will cause the hash code to change)
+// 2. The deployment template won't have hash labels
+func EqualIgnoreHash(template1, template2 *corev1.PodTemplateSpec) bool {
+	t1Copy := template1.DeepCopy()
+	t2Copy := template2.DeepCopy()
+	// Remove hash labels from template.Labels before comparing
+	delete(t1Copy.Labels, appsv1.DefaultDeploymentUniqueLabelKey)
+	delete(t2Copy.Labels, appsv1.DefaultDeploymentUniqueLabelKey)
+	return equality.Semantic.DeepEqual(t1Copy, t2Copy)
+}
+
+// GetNewReplicaSet returns the ReplicaSet that matches the intent of the given deployment; the candidate ReplicaSets are fetched through the given ReplicaSetListFunc.
+// Returns nil if the new replica set doesn't exist yet.
+func GetNewReplicaSet(deployment *appsv1.Deployment, f ReplicaSetListFunc) (*appsv1.ReplicaSet, error) {
+	rsList, err := ListReplicaSetsByDeployment(deployment, f)
+	if err != nil {
+		return nil, err
+	}
+	return FindNewReplicaSet(deployment, rsList), nil
+}
+
+// FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template).
+func FindNewReplicaSet(deployment *appsv1.Deployment, rsList []*appsv1.ReplicaSet) *appsv1.ReplicaSet {
+	sort.Sort(ReplicaSetsByCreationTimestamp(rsList))
+	for i := range rsList {
+		if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) {
+			// In rare cases, such as after cluster upgrades, Deployment may end up with
+			// having more than one new ReplicaSets that have the same template as its template,
+			// see https://github.com/kubernetes/kubernetes/issues/40415
+			// We deterministically choose the oldest new ReplicaSet.
+			return rsList[i]
+		}
+	}
+	// new ReplicaSet does not exist.
+	return nil
+}
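// Reviewer note, not part of the patch: a minimal, self-contained sketch of the threshold
// semantics that podUnschedulable in replica.go builds on top of the new helper.GetPodCondition.
// The standalone main function, the isUnschedulable wrapper name and the 10-minute-old condition
// timestamp below are illustrative assumptions only.
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/karmada-io/karmada/pkg/util/helper"
)

// isUnschedulable mirrors the check added in replica.go: a pod counts as unschedulable once its
// PodScheduled condition has been False/Unschedulable for longer than the given threshold.
func isUnschedulable(pod *corev1.Pod, threshold time.Duration) bool {
	_, cond := helper.GetPodCondition(&pod.Status, corev1.PodScheduled)
	return cond != nil && cond.Status == corev1.ConditionFalse &&
		cond.Reason == corev1.PodReasonUnschedulable &&
		cond.LastTransitionTime.Add(threshold).Before(time.Now())
}

func main() {
	// A pod whose PodScheduled condition flipped to False/Unschedulable ten minutes ago.
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Conditions: []corev1.PodCondition{{
				Type:               corev1.PodScheduled,
				Status:             corev1.ConditionFalse,
				Reason:             corev1.PodReasonUnschedulable,
				LastTransitionTime: metav1.NewTime(time.Now().Add(-10 * time.Minute)),
			}},
		},
	}
	// A 5-minute threshold already counts this pod; a 30-minute threshold does not yet.
	fmt.Println(isUnschedulable(pod, 5*time.Minute))  // true
	fmt.Println(isUnschedulable(pod, 30*time.Minute)) // false
}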