implement GetUnschedulableReplicas for estimator

Signed-off-by: Garrybest <garrybest@foxmail.com>
Garrybest 2022-02-23 14:19:51 +08:00
parent 0777cf064c
commit 90900309ca
7 changed files with 390 additions and 12 deletions


@@ -11,6 +11,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
@@ -56,9 +58,11 @@ func run(ctx context.Context, opts *options.Options) error {
}
restConfig.QPS, restConfig.Burst = opts.ClusterAPIQPS, opts.ClusterAPIBurst
kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
dynamicClient := dynamic.NewForConfigOrDie(restConfig)
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
e := server.NewEstimatorServer(kubeClientSet, opts)
e := server.NewEstimatorServer(kubeClient, dynamicClient, discoveryClient, opts, ctx.Done())
if err = e.Start(ctx); err != nil {
klog.Errorf("estimator server exits unexpectedly: %v", err)
return err


@@ -15,6 +15,8 @@ const SchedulerEstimatorSubsystem = "karmada_scheduler_estimator"
const (
// EstimatingTypeMaxAvailableReplicas - label of estimating type
EstimatingTypeMaxAvailableReplicas = "MaxAvailableReplicas"
// EstimatingTypeGetUnschedulableReplicas - label of estimating type
EstimatingTypeGetUnschedulableReplicas = "GetUnschedulableReplicas"
)
const (
@@ -22,6 +24,10 @@ const (
EstimatingStepListNodesByNodeClaim = "ListNodesByNodeClaim"
// EstimatingStepMaxAvailableReplicas - label of estimating step
EstimatingStepMaxAvailableReplicas = "MaxAvailableReplicas"
// EstimatingStepGetObjectFromCache - label of estimating step
EstimatingStepGetObjectFromCache = "GetObjectFromCache"
// EstimatingStepGetUnschedulablePodsOfWorkload - label of estimating step
EstimatingStepGetUnschedulablePodsOfWorkload = "GetWorkloadUnschedulablePods"
// EstimatingStepTotal - label of estimating step, total step
EstimatingStepTotal = "Total"
)


@@ -1,9 +1,21 @@
package replica
import (
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
listappsv1 "k8s.io/client-go/listers/apps/v1"
listcorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/pkg/estimator/server/nodes"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
utilworkload "github.com/karmada-io/karmada/pkg/util/workload"
)
// NodeMaxAvailableReplica calculates max available replicas of a node, based on
@ -15,3 +27,67 @@ func NodeMaxAvailableReplica(node *corev1.Node, pods []*corev1.Pod, request core
}
return int32(ni.MaxReplicaDivided(request)), nil
}
// ListerWrapper wraps a pod lister and a ReplicaSet lister.
type ListerWrapper struct {
listcorev1.PodLister
listappsv1.ReplicaSetLister
}
// GetUnschedulablePodsOfWorkload returns the number of unschedulable pods that the given workload derives.
func GetUnschedulablePodsOfWorkload(unstructObj *unstructured.Unstructured, threshold time.Duration, listers *ListerWrapper) (int32, error) {
if threshold < 0 {
threshold = 0
}
unschedulable := 0
// Workloads fall into two types: those that own Pods through a ReplicaSet
// and those that own Pods directly.
switch unstructObj.GetKind() {
case util.DeploymentKind:
deployment, err := helper.ConvertToDeployment(unstructObj)
if err != nil {
return 0, fmt.Errorf("failed to convert ReplicaSet from unstructured object: %v", err)
}
pods, err := listDeploymentPods(deployment, listers)
if err != nil {
return 0, err
}
for _, pod := range pods {
if podUnschedulable(pod, threshold) {
unschedulable++
}
}
default:
// TODO(Garrybest): add abstract workload
return 0, fmt.Errorf("kind(%s) of workload(%s) is not supported", unstructObj.GetKind(), klog.KObj(unstructObj).String())
}
return int32(unschedulable), nil
}
func podUnschedulable(pod *corev1.Pod, threshold time.Duration) bool {
_, cond := helper.GetPodCondition(&pod.Status, corev1.PodScheduled)
return cond != nil && cond.Status == corev1.ConditionFalse && cond.Reason == corev1.PodReasonUnschedulable &&
cond.LastTransitionTime.Add(threshold).Before(time.Now())
}
func listDeploymentPods(deployment *appsv1.Deployment, listers *ListerWrapper) ([]*corev1.Pod, error) {
// Get ReplicaSet
rsListFunc := func(namespace string, selector labels.Selector) ([]*appsv1.ReplicaSet, error) {
return listers.ReplicaSetLister.ReplicaSets(namespace).List(selector)
}
rs, err := utilworkload.GetNewReplicaSet(deployment, rsListFunc)
if err != nil {
return nil, err
}
podListFunc := func(namespace string, selector labels.Selector) ([]*corev1.Pod, error) {
return listers.PodLister.Pods(namespace).List(selector)
}
pods, err := utilworkload.ListPodsByRS(deployment, []*appsv1.ReplicaSet{rs}, podListFunc)
if err != nil {
return nil, err
}
return pods, nil
}
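
For context, here is a minimal usage sketch of the new helper. It is not part of this commit: the countUnschedulable name, the 30-second threshold, and the explicit kind population are illustrative assumptions; the only facts taken from the code above are the ListerWrapper fields and the GetUnschedulablePodsOfWorkload signature.

package sketch

import (
	"time"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"

	"github.com/karmada-io/karmada/pkg/estimator/server/replica"
)

// countUnschedulable is a hypothetical helper: it wires the two listers that
// ListerWrapper expects from a shared informer factory and then asks how many
// pods of the given Deployment have been unschedulable for at least 30 seconds.
func countUnschedulable(client kubernetes.Interface, deploy *appsv1.Deployment, stopCh <-chan struct{}) (int32, error) {
	factory := informers.NewSharedInformerFactory(client, 0)
	// Creating the listers before Start() registers the informers with the factory.
	listers := &replica.ListerWrapper{
		PodLister:        factory.Core().V1().Pods().Lister(),
		ReplicaSetLister: factory.Apps().V1().ReplicaSets().Lister(),
	}
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	// GetUnschedulablePodsOfWorkload dispatches on the unstructured kind, so make
	// sure apiVersion/kind are populated after the conversion.
	raw, err := runtime.DefaultUnstructuredConverter.ToUnstructured(deploy)
	if err != nil {
		return 0, err
	}
	obj := &unstructured.Unstructured{Object: raw}
	obj.SetAPIVersion("apps/v1")
	obj.SetKind("Deployment")

	return replica.GetUnschedulablePodsOfWorkload(obj, 30*time.Second, listers)
}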


@@ -9,11 +9,18 @@ import (
"github.com/kr/pretty"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
infov1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@@ -24,38 +31,62 @@ import (
"github.com/karmada-io/karmada/pkg/estimator/server/replica"
estimatorservice "github.com/karmada-io/karmada/pkg/estimator/service"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
)
const (
nodeNameKeyIndex = "spec.nodeName"
)
var (
// TODO(Garrybest): make this an option
supportedGVRs = []schema.GroupVersionResource{
appsv1.SchemeGroupVersion.WithResource("deployments"),
}
)
// AccurateSchedulerEstimatorServer is the gRPC server of a cluster accurate scheduler estimator.
// Please see https://github.com/karmada-io/karmada/pull/580 (#580).
type AccurateSchedulerEstimatorServer struct {
port int
clusterName string
kubeClient kubernetes.Interface
restMapper meta.RESTMapper
informerFactory informers.SharedInformerFactory
nodeInformer infov1.NodeInformer
podInformer infov1.PodInformer
nodeLister listv1.NodeLister
podLister listv1.PodLister
replicaLister *replica.ListerWrapper
getPodFunc func(nodeName string) ([]*corev1.Pod, error)
informerManager informermanager.SingleClusterInformerManager
}
// NewEstimatorServer creates an instance of AccurateSchedulerEstimatorServer.
func NewEstimatorServer(kubeClient kubernetes.Interface, opts *options.Options) *AccurateSchedulerEstimatorServer {
func NewEstimatorServer(
kubeClient kubernetes.Interface,
dynamicClient dynamic.Interface,
discoveryClient discovery.DiscoveryInterface,
opts *options.Options,
stopChan <-chan struct{},
) *AccurateSchedulerEstimatorServer {
cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoClient)
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
es := &AccurateSchedulerEstimatorServer{
port: opts.ServerPort,
clusterName: opts.ClusterName,
kubeClient: kubeClient,
restMapper: restMapper,
informerFactory: informerFactory,
nodeInformer: informerFactory.Core().V1().Nodes(),
podInformer: informerFactory.Core().V1().Pods(),
nodeLister: informerFactory.Core().V1().Nodes().Lister(),
podLister: informerFactory.Core().V1().Pods().Lister(),
replicaLister: &replica.ListerWrapper{
PodLister: informerFactory.Core().V1().Pods().Lister(),
ReplicaSetLister: informerFactory.Apps().V1().ReplicaSets().Lister(),
},
}
// Establish a connection between the pods and their assigned nodes.
@@ -93,20 +124,30 @@ func NewEstimatorServer(kubeClient kubernetes.Interface, opts *options.Options)
}
return pods, nil
}
es.informerManager = informermanager.NewSingleClusterInformerManager(dynamicClient, 0, stopChan)
for _, gvr := range supportedGVRs {
es.informerManager.Lister(gvr)
}
return es
}
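
The hunk above shows only the tail of getPodFunc; its body falls outside the diff. For orientation, the sketch below shows the conventional client-go pattern such a function is usually built on: register a spec.nodeName indexer on the pod informer and query it per node. This is an assumption about the approach, not the commit's actual code, and the newGetPodFunc name is invented.

package sketch

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

const nodeNameKeyIndex = "spec.nodeName"

// newGetPodFunc registers an index that maps a node name to the pods scheduled
// onto it and returns a lookup function. Indexers must be added before the
// factory is started; the caller then starts the factory and waits for sync.
func newGetPodFunc(factory informers.SharedInformerFactory) (func(nodeName string) ([]*corev1.Pod, error), error) {
	podInformer := factory.Core().V1().Pods().Informer()
	if err := podInformer.AddIndexers(cache.Indexers{
		nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*corev1.Pod)
			if !ok || len(pod.Spec.NodeName) == 0 {
				return nil, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	}); err != nil {
		return nil, err
	}

	return func(nodeName string) ([]*corev1.Pod, error) {
		objs, err := podInformer.GetIndexer().ByIndex(nodeNameKeyIndex, nodeName)
		if err != nil {
			return nil, err
		}
		pods := make([]*corev1.Pod, 0, len(objs))
		for _, o := range objs {
			pod, ok := o.(*corev1.Pod)
			if !ok {
				return nil, fmt.Errorf("unexpected object type %T", o)
			}
			pods = append(pods, pod)
		}
		return pods, nil
	}, nil
}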
// Start runs the accurate replica estimator server.
func (es *AccurateSchedulerEstimatorServer) Start(ctx context.Context) error {
stopCh := ctx.Done()
klog.Infof("Starting karmada cluster(%s) accurate replica estimator", es.clusterName)
defer klog.Infof("Shutting down cluster(%s) accurate replica estimator", es.clusterName)
klog.Infof("Starting karmada cluster(%s) accurate scheduler estimator", es.clusterName)
defer klog.Infof("Shutting down cluster(%s) accurate scheduler estimator", es.clusterName)
es.informerFactory.Start(stopCh)
if !es.waitForCacheSync(stopCh) {
return fmt.Errorf("failed to wait for cache sync")
}
es.informerManager.Start()
synced := es.informerManager.WaitForCacheSync()
if synced == nil {
return fmt.Errorf("informer factory for cluster does not exist")
}
// Listen on a port and register the gRPC server.
l, err := net.Listen("tcp", fmt.Sprintf(":%d", es.port))
if err != nil {
@@ -169,9 +210,46 @@ func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Con
// GetUnschedulableReplicas is the implementation of the gRPC interface. It returns the
// number of unschedulable replicas of a workload.
func (es *AccurateSchedulerEstimatorServer) GetUnschedulableReplicas(ctx context.Context, request *pb.UnschedulableReplicasRequest) (*pb.UnschedulableReplicasResponse, error) {
//TODO(Garrybest): implement me
return nil, nil
func (es *AccurateSchedulerEstimatorServer) GetUnschedulableReplicas(ctx context.Context, request *pb.UnschedulableReplicasRequest) (response *pb.UnschedulableReplicasResponse, rerr error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
klog.Warningf("No metadata from context.")
}
var object string
if m := md.Get(string(util.ContextKeyObject)); len(m) != 0 {
object = m[0]
}
defer traceGetUnschedulableReplicas(object, time.Now(), request)(&response, &rerr)
if request.Cluster != es.clusterName {
return nil, fmt.Errorf("cluster name does not match, got: %s, desire: %s", request.Cluster, es.clusterName)
}
// Get the workload.
startTime := time.Now()
gvk := schema.FromAPIVersionAndKind(request.Resource.APIVersion, request.Resource.Kind)
unstructObj, err := helper.GetObjectFromSingleClusterCache(es.restMapper, es.informerManager, &keys.ClusterWideKey{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
Namespace: request.Resource.Namespace,
Name: request.Resource.Name,
})
metrics.UpdateEstimatingAlgorithmLatency(err, metrics.EstimatingTypeGetUnschedulableReplicas, metrics.EstimatingStepGetObjectFromCache, startTime)
if err != nil {
return nil, err
}
// List all unschedulable replicas.
startTime = time.Now()
unschedulables, err := replica.GetUnschedulablePodsOfWorkload(unstructObj, request.UnschedulableThreshold, es.replicaLister)
metrics.UpdateEstimatingAlgorithmLatency(err, metrics.EstimatingTypeGetUnschedulableReplicas, metrics.EstimatingStepGetUnschedulablePodsOfWorkload, startTime)
if err != nil {
return nil, err
}
return &pb.UnschedulableReplicasResponse{UnschedulableReplicas: unschedulables}, err
}
func (es *AccurateSchedulerEstimatorServer) maxAvailableReplicas(nodes []*corev1.Node, request corev1.ResourceList) int32 {
@@ -216,3 +294,16 @@ func traceMaxAvailableReplicas(object string, start time.Time, request *pb.MaxAv
klog.Infof("Finish calculating cluster available replicas of resource(%s), max replicas: %d, time elapsed: %s", object, (*response).MaxReplicas, time.Since(start))
}
}
func traceGetUnschedulableReplicas(object string, start time.Time, request *pb.UnschedulableReplicasRequest) func(response **pb.UnschedulableReplicasResponse, err *error) {
klog.V(4).Infof("Begin detecting cluster unscheduable replicas of resource(%s), request: %s", object, pretty.Sprint(*request))
return func(response **pb.UnschedulableReplicasResponse, err *error) {
metrics.CountRequests(*err, metrics.EstimatingTypeGetUnschedulableReplicas)
metrics.UpdateEstimatingAlgorithmLatency(*err, metrics.EstimatingTypeGetUnschedulableReplicas, metrics.EstimatingStepTotal, start)
if *err != nil {
klog.Errorf("Failed to detect cluster unscheduable replicas: %v", *err)
return
}
klog.Infof("Finish detecting cluster unscheduable replicas of resource(%s), unschedulable replicas: %d, time elapsed: %s", object, (*response).UnschedulableReplicas, time.Since(start))
}
}
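
The deferred call defer traceGetUnschedulableReplicas(object, time.Now(), request)(&response, &rerr) uses a Go idiom that is easy to misread: the outer call runs immediately and logs the start, while only the returned closure is deferred and observes the final named results through pointers. A stripped-down, self-contained illustration with invented names:

package main

import (
	"fmt"
	"time"
)

// trace runs at call time; the closure it returns is what defer executes,
// reading the caller's named results through pointers.
func trace(op string, start time.Time) func(result *int32, err *error) {
	fmt.Printf("begin %s\n", op)
	return func(result *int32, err *error) {
		if *err != nil {
			fmt.Printf("%s failed after %s: %v\n", op, time.Since(start), *err)
			return
		}
		fmt.Printf("%s finished after %s, result: %d\n", op, time.Since(start), *result)
	}
}

func compute() (result int32, err error) {
	// trace(...) executes now; the returned closure runs when compute returns.
	defer trace("compute", time.Now())(&result, &err)

	result = 42
	return result, nil
}

func main() {
	if _, err := compute(); err != nil {
		fmt.Println(err)
	}
}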


@@ -6,9 +6,15 @@ import (
"reflect"
"testing"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
discoveryfake "k8s.io/client-go/discovery/fake"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/fake"
coretesting "k8s.io/client-go/testing"
"github.com/karmada-io/karmada/cmd/scheduler-estimator/app/options"
"github.com/karmada-io/karmada/pkg/estimator/pb"
@@ -205,11 +211,27 @@ func TestAccurateSchedulerEstimatorServer_MaxAvailableReplicas(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
es := NewEstimatorServer(fake.NewSimpleClientset(tt.objs...), opt)
gvrToListKind := map[schema.GroupVersionResource]string{
{Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList",
}
dynamicClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), gvrToListKind)
discoveryClient := &discoveryfake.FakeDiscovery{
Fake: &coretesting.Fake{},
}
discoveryClient.Resources = []*metav1.APIResourceList{
{
GroupVersion: appsv1.SchemeGroupVersion.String(),
APIResources: []metav1.APIResource{
{Name: "deployments", Namespaced: true, Kind: "Deployment"},
},
},
}
es := NewEstimatorServer(fake.NewSimpleClientset(tt.objs...), dynamicClient, discoveryClient, opt, ctx.Done())
es.informerFactory.Start(ctx.Done())
if !es.waitForCacheSync(ctx.Done()) {
t.Errorf("MaxAvailableReplicas() error = %v, wantErr %v", fmt.Errorf("failed to wait for cache sync"), tt.wantErr)
t.Fatalf("MaxAvailableReplicas() error = %v, wantErr %v", fmt.Errorf("failed to wait for cache sync"), tt.wantErr)
}
gotResponse, err := es.MaxAvailableReplicas(ctx, tt.args.request)

pkg/util/helper/pod.go (new file, 28 additions)

@@ -0,0 +1,28 @@
package helper
import (
corev1 "k8s.io/api/core/v1"
)
// GetPodCondition extracts the provided condition from the given status and returns that.
// Returns the index of the located condition and the condition itself, or -1 and nil if the condition is not present.
func GetPodCondition(status *corev1.PodStatus, conditionType corev1.PodConditionType) (int, *corev1.PodCondition) {
if status == nil {
return -1, nil
}
return GetPodConditionFromList(status.Conditions, conditionType)
}
// GetPodConditionFromList extracts the provided condition from the given list of conditions and
// returns the index of the condition and the condition itself. Returns -1 and nil if the condition is not present.
func GetPodConditionFromList(conditions []corev1.PodCondition, conditionType corev1.PodConditionType) (int, *corev1.PodCondition) {
if conditions == nil {
return -1, nil
}
for i := range conditions {
if conditions[i].Type == conditionType {
return i, &conditions[i]
}
}
return -1, nil
}
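
A small, self-contained example of how the new helper is typically consumed, mirroring the podUnschedulable check in the replica package; the pod literal is made up for illustration:

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/karmada-io/karmada/pkg/util/helper"
)

func main() {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Conditions: []corev1.PodCondition{
				{
					Type:               corev1.PodScheduled,
					Status:             corev1.ConditionFalse,
					Reason:             corev1.PodReasonUnschedulable,
					LastTransitionTime: metav1.NewTime(time.Now().Add(-time.Minute)),
				},
			},
		},
	}

	// Index -1 and a nil condition would mean the condition is absent.
	idx, cond := helper.GetPodCondition(&pod.Status, corev1.PodScheduled)
	fmt.Println(idx, cond != nil && cond.Status == corev1.ConditionFalse) // 0 true
}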


@@ -0,0 +1,151 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This code is lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// However, the code has been revised to use listers instead of the API client interface.
// For reference: https://github.com/kubernetes/kubernetes/blob/release-1.22/pkg/controller/deployment/util/deployment_util.go
package workload
import (
"sort"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
)
// PodListFunc lists the Pods in a given namespace that match the given selector.
type PodListFunc func(string, labels.Selector) ([]*corev1.Pod, error)
// ReplicaSetListFunc lists the ReplicaSets in a given namespace that match the given selector.
type ReplicaSetListFunc func(string, labels.Selector) ([]*appsv1.ReplicaSet, error)
// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker.
type ReplicaSetsByCreationTimestamp []*appsv1.ReplicaSet
func (o ReplicaSetsByCreationTimestamp) Len() int { return len(o) }
func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}
// ListReplicaSetsByDeployment returns a slice of RSes the given deployment targets.
// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
// because only the controller itself should do that.
// However, it does filter out anything whose ControllerRef doesn't match.
func ListReplicaSetsByDeployment(deployment *appsv1.Deployment, f ReplicaSetListFunc) ([]*appsv1.ReplicaSet, error) {
// TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
// should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830.
namespace := deployment.Namespace
selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return nil, err
}
all, err := f(namespace, selector)
if err != nil {
return nil, err
}
// Only include those whose ControllerRef matches the Deployment.
owned := make([]*appsv1.ReplicaSet, 0, len(all))
for _, rs := range all {
if metav1.IsControlledBy(rs, deployment) {
owned = append(owned, rs)
}
}
return owned, nil
}
// ListPodsByRS returns a list of pods the given deployment targets.
// This needs a list of ReplicaSets for the Deployment,
// which can be obtained with ListReplicaSetsByDeployment().
// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
// because only the controller itself should do that.
// However, it does filter out anything whose ControllerRef doesn't match.
func ListPodsByRS(deployment *appsv1.Deployment, rsList []*appsv1.ReplicaSet, f PodListFunc) ([]*corev1.Pod, error) {
namespace := deployment.Namespace
selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return nil, err
}
all, err := f(namespace, selector)
if err != nil {
return all, err
}
// Only include those whose ControllerRef points to a ReplicaSet that is in
// turn owned by this Deployment.
rsMap := make(map[types.UID]bool, len(rsList))
for _, rs := range rsList {
if rs != nil {
rsMap[rs.UID] = true
}
}
owned := make([]*corev1.Pod, 0, len(all))
for i := range all {
pod := all[i]
controllerRef := metav1.GetControllerOf(pod)
if controllerRef != nil && rsMap[controllerRef.UID] {
owned = append(owned, pod)
}
}
return owned, nil
}
// EqualIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash]
// We ignore pod-template-hash because:
// 1. The hash result would be different upon podTemplateSpec API changes
// (e.g. the addition of a new field will cause the hash code to change)
// 2. The deployment template won't have hash labels
func EqualIgnoreHash(template1, template2 *corev1.PodTemplateSpec) bool {
t1Copy := template1.DeepCopy()
t2Copy := template2.DeepCopy()
// Remove hash labels from template.Labels before comparing
delete(t1Copy.Labels, appsv1.DefaultDeploymentUniqueLabelKey)
delete(t2Copy.Labels, appsv1.DefaultDeploymentUniqueLabelKey)
return equality.Semantic.DeepEqual(t1Copy, t2Copy)
}
// GetNewReplicaSet returns the ReplicaSet that matches the intent of the given Deployment, listing ReplicaSets through the given ReplicaSetListFunc.
// Returns nil if the new replica set doesn't exist yet.
func GetNewReplicaSet(deployment *appsv1.Deployment, f ReplicaSetListFunc) (*appsv1.ReplicaSet, error) {
rsList, err := ListReplicaSetsByDeployment(deployment, f)
if err != nil {
return nil, err
}
return FindNewReplicaSet(deployment, rsList), nil
}
// FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template).
func FindNewReplicaSet(deployment *appsv1.Deployment, rsList []*appsv1.ReplicaSet) *appsv1.ReplicaSet {
sort.Sort(ReplicaSetsByCreationTimestamp(rsList))
for i := range rsList {
if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) {
// In rare cases, such as after cluster upgrades, Deployment may end up with
// having more than one new ReplicaSets that have the same template as its template,
// see https://github.com/kubernetes/kubernetes/issues/40415
// We deterministically choose the oldest new ReplicaSet.
return rsList[i]
}
}
// new ReplicaSet does not exist.
return nil
}
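
To tie the lifted helpers together, here is a hedged sketch that drives GetNewReplicaSet and ListPodsByRS with list functions backed by plain slices instead of listers; the podsOfNewReplicaSet name and the slice-backed inputs are assumptions made for the illustration.

package sketch

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"

	utilworkload "github.com/karmada-io/karmada/pkg/util/workload"
)

// podsOfNewReplicaSet returns the pods controlled by the ReplicaSet whose pod
// template matches the Deployment (ignoring the pod-template-hash label).
func podsOfNewReplicaSet(deployment *appsv1.Deployment, allRS []*appsv1.ReplicaSet, allPods []*corev1.Pod) ([]*corev1.Pod, error) {
	rsListFunc := func(namespace string, selector labels.Selector) ([]*appsv1.ReplicaSet, error) {
		var out []*appsv1.ReplicaSet
		for _, rs := range allRS {
			if rs.Namespace == namespace && selector.Matches(labels.Set(rs.Labels)) {
				out = append(out, rs)
			}
		}
		return out, nil
	}
	podListFunc := func(namespace string, selector labels.Selector) ([]*corev1.Pod, error) {
		var out []*corev1.Pod
		for _, pod := range allPods {
			if pod.Namespace == namespace && selector.Matches(labels.Set(pod.Labels)) {
				out = append(out, pod)
			}
		}
		return out, nil
	}

	newRS, err := utilworkload.GetNewReplicaSet(deployment, rsListFunc)
	if err != nil {
		return nil, err
	}
	if newRS == nil {
		return nil, fmt.Errorf("no new ReplicaSet found for deployment %s/%s", deployment.Namespace, deployment.Name)
	}
	// ListPodsByRS keeps only pods whose controller reference points at newRS.
	return utilworkload.ListPodsByRS(deployment, []*appsv1.ReplicaSet{newRS}, podListFunc)
}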