add karmada-scheduler-estimator

Signed-off-by: Garrybest <garrybest@foxmail.com>
Garrybest 2021-09-10 16:13:56 +08:00
parent f22219c583
commit 5411290137
8 changed files with 680 additions and 4 deletions

View File

@@ -0,0 +1,48 @@
package options
import (
"github.com/spf13/pflag"
)
const (
defaultBindAddress = "0.0.0.0"
defaultServerPort = 10352
defaultHealthzPort = 10351
)
// Options contains everything necessary to create and run scheduler-estimator.
type Options struct {
KubeConfig string
Master string
ClusterName string
// BindAddress is the IP address on which to listen for the --secure-port port.
BindAddress string
// SecurePort is the port that the health check endpoint (/healthz) serves at.
SecurePort int
// ServerPort is the port that the gRPC server serves at.
ServerPort int
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.
ClusterAPIQPS float32
// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
ClusterAPIBurst int
}
// NewOptions builds an empty options.
func NewOptions() *Options {
return &Options{}
}
// AddFlags adds the flags of the scheduler-estimator to the specified FlagSet.
func (o *Options) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
fs.StringVar(&o.KubeConfig, "kubeconfig", o.KubeConfig, "Path to a KubeConfig. Only required if out-of-cluster.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the member Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.")
fs.StringVar(&o.ClusterName, "cluster-name", o.ClusterName, "Name of the member cluster that the estimator serves for.")
fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.")
fs.IntVar(&o.ServerPort, "server-port", defaultServerPort, "The port on which to serve gRPC.")
fs.IntVar(&o.SecurePort, "secure-port", defaultHealthzPort, "The port on which to serve the health check endpoint.")
fs.Float32Var(&o.ClusterAPIQPS, "kube-api-qps", 20.0, "QPS to use while talking with the member cluster kube-apiserver. Doesn't cover events and node heartbeat apis whose rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.ClusterAPIBurst, "kube-api-burst", 30, "Burst to use while talking with the member cluster kube-apiserver. Doesn't cover events and node heartbeat apis whose rate limiting is controlled by a different set of flags.")
}
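
For illustration only (not part of the commit), the options above can be exercised with a standalone pflag FlagSet; the flag names and defaults come from AddFlags, while the sample values are made up:

package main

import (
	"fmt"

	"github.com/spf13/pflag"

	"github.com/karmada-io/karmada/cmd/scheduler-estimator/app/options"
)

func main() {
	opts := options.NewOptions()
	fs := pflag.NewFlagSet("scheduler-estimator", pflag.ExitOnError)
	opts.AddFlags(fs)

	// Hypothetical command line; --cluster-name is the only flag without a usable default.
	_ = fs.Parse([]string{"--cluster-name=member1", "--kube-api-qps=40"})

	// Prints: member1 10352 10351 40
	fmt.Println(opts.ClusterName, opts.ServerPort, opts.SecurePort, opts.ClusterAPIQPS)
}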

View File

@@ -0,0 +1,67 @@
package app
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/cmd/scheduler-estimator/app/options"
"github.com/karmada-io/karmada/pkg/estimator/server"
)
// NewSchedulerEstimatorCommand creates a *cobra.Command object with default parameters
func NewSchedulerEstimatorCommand(ctx context.Context) *cobra.Command {
opts := options.NewOptions()
cmd := &cobra.Command{
Use: "scheduler-estimator",
Long: `The scheduler-estimator runs an accurate replica estimator for a member cluster.`,
Run: func(cmd *cobra.Command, args []string) {
if err := run(ctx, opts); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
opts.AddFlags(cmd.Flags())
cmd.Flags().AddGoFlagSet(flag.CommandLine)
return cmd
}
func run(ctx context.Context, opts *options.Options) error {
go serveHealthz(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort))
restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
restConfig.QPS, restConfig.Burst = opts.ClusterAPIQPS, opts.ClusterAPIBurst
kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)
e := server.NewEstimatorServer(kubeClientSet, opts)
if err = e.Start(ctx); err != nil {
klog.Errorf("estimator server exits unexpectedly: %v", err)
return err
}
// never reach here
return nil
}
func serveHealthz(address string) {
http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})
klog.Fatal(http.ListenAndServe(address, nil))
}
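
A quick sketch (not part of the commit) of probing the health endpoint exposed by serveHealthz above; with the default --bind-address and --secure-port values it listens on 0.0.0.0:10351:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical local probe against the estimator's health check port.
	resp, err := http.Get("http://127.0.0.1:10351/healthz")
	if err != nil {
		fmt.Println("estimator not reachable:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("healthz status:", resp.StatusCode) // expect 200
}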

View File

@@ -0,0 +1,23 @@
package main
import (
"fmt"
"os"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/component-base/logs"
"github.com/karmada-io/karmada/cmd/scheduler-estimator/app"
)
func main() {
logs.InitLogs()
defer logs.FlushLogs()
ctx := apiserver.SetupSignalContext()
if err := app.NewSchedulerEstimatorCommand(ctx).Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}

View File

@@ -0,0 +1,98 @@
package nodes
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
listv1 "k8s.io/client-go/listers/core/v1"
schedcorev1 "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"github.com/karmada-io/karmada/pkg/estimator/pb"
)
// ListNodesByNodeClaim returns all nodes that match the node claim.
func ListNodesByNodeClaim(nodeLister listv1.NodeLister, claim *pb.NodeClaim) ([]*corev1.Node, error) {
nodeClaim := claim
if nodeClaim == nil {
nodeClaim = &pb.NodeClaim{}
}
nodes, err := ListNodesByLabelSelector(nodeLister, labels.SelectorFromSet(nodeClaim.NodeSelector))
if err != nil {
return nil, fmt.Errorf("cannot list nodes by label selector, %v", err)
}
nodes, err = FilterNodesByNodeAffinity(nodes, nodeClaim.NodeAffinity)
if err != nil {
return nil, fmt.Errorf("cannot filter nodes by node affinity, %v", err)
}
nodes, err = FilterSchedulableNodes(nodes, nodeClaim.Tolerations)
if err != nil {
return nil, fmt.Errorf("cannot filter nodes by tolerations, %v", err)
}
return nodes, err
}
// ListAllNodes returns all nodes.
func ListAllNodes(nodeLister listv1.NodeLister) ([]*corev1.Node, error) {
return ListNodesByLabelSelector(nodeLister, labels.Everything())
}
// ListNodesByLabelSelector returns nodes that match the node selector.
func ListNodesByLabelSelector(nodeLister listv1.NodeLister, selector labels.Selector) ([]*corev1.Node, error) {
nodes, err := nodeLister.List(selector)
if err != nil {
return nil, err
}
return nodes, nil
}
// FilterNodesByNodeAffinity returns nodes that match the node affinity.
func FilterNodesByNodeAffinity(nodes []*corev1.Node, affinity *corev1.NodeSelector) ([]*corev1.Node, error) {
if affinity == nil {
return nodes, nil
}
matchedNodes := make([]*corev1.Node, 0)
selector, err := nodeaffinity.NewNodeSelector(affinity)
if err != nil {
return nil, err
}
for _, node := range nodes {
if selector.Match(node) {
matchedNodes = append(matchedNodes, node)
}
}
return matchedNodes, nil
}
// FilterSchedulableNodes returns the nodes that are schedulable and ready, and whose taints are tolerated by the given tolerations.
func FilterSchedulableNodes(nodes []*corev1.Node, tolerations []corev1.Toleration) ([]*corev1.Node, error) {
filterPredicate := func(t *corev1.Taint) bool {
// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute
}
matchedNodes := make([]*corev1.Node, 0)
for _, node := range nodes {
if node.Spec.Unschedulable {
continue
}
if !IsNodeReady(node.Status.Conditions) {
continue
}
if _, isUntolerated := schedcorev1.FindMatchingUntoleratedTaint(node.Spec.Taints, tolerations, filterPredicate); isUntolerated {
continue
}
matchedNodes = append(matchedNodes, node)
}
return matchedNodes, nil
}
// IsNodeReady checks whether the node condition is ready.
func IsNodeReady(nodeStatus []corev1.NodeCondition) bool {
for i := range nodeStatus {
if nodeStatus[i].Type == corev1.NodeReady {
return nodeStatus[i].Status == corev1.ConditionTrue
}
}
return false
}
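
A usage sketch, not part of the commit: composing a pb.NodeClaim and passing it through ListNodesByNodeClaim. The claim fields (NodeSelector, NodeAffinity, Tolerations) are the ones referenced above; the map type of NodeSelector, the sample label and the toleration are assumptions, and the lister would normally come from a synced shared informer factory.

package nodes // sketch: assumed to sit alongside the helpers above

import (
	corev1 "k8s.io/api/core/v1"
	listv1 "k8s.io/client-go/listers/core/v1"

	"github.com/karmada-io/karmada/pkg/estimator/pb"
)

// matchingNodeNames is a hypothetical helper that lists the names of nodes
// satisfying a claim: label selector first, then affinity, then tolerations.
func matchingNodeNames(nodeLister listv1.NodeLister) ([]string, error) {
	claim := &pb.NodeClaim{
		// Assumed map form of the node selector; the label value is made up.
		NodeSelector: map[string]string{"kubernetes.io/os": "linux"},
		Tolerations: []corev1.Toleration{
			{Key: "dedicated", Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoSchedule},
		},
	}
	matched, err := ListNodesByNodeClaim(nodeLister, claim)
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(matched))
	for _, node := range matched {
		names = append(names, node.Name)
	}
	return names, nil
}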

View File

@@ -0,0 +1,54 @@
package nodes
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"github.com/karmada-io/karmada/pkg/util"
)
// NodeInfo is the wrapper of a node and its resource.
type NodeInfo struct {
Node *corev1.Node
AllocatableResource *util.Resource
IdleResource *util.Resource
}
// NewNodeInfo returns an instance of NodeInfo. The initial IdleResource is equal to the AllocatableResource.
func NewNodeInfo(node *corev1.Node) *NodeInfo {
allocatableResource := util.NewResource(node.Status.Allocatable)
return &NodeInfo{
Node: node,
AllocatableResource: allocatableResource,
// Use a separate copy so that subtracting from IdleResource does not mutate AllocatableResource.
IdleResource: util.NewResource(node.Status.Allocatable),
}
}
// AssignedPodRequest counts the effective resource requests of the pods assigned to the node.
// The total pod requests will be subtracted from IdleResource when this function is called.
func (ni *NodeInfo) AssignedPodRequest(pods []*corev1.Pod) error {
occupiedResource := util.EmptyResource()
for _, pod := range pods {
occupiedResource.AddPodRequest(&pod.Spec)
}
// A pod's resource requests do not include the pod count itself, so we add it manually.
occupiedResource.AddResourcePods(int64(len(pods)))
// The occupied resource must be less than or equal to the node's idle resource.
if !occupiedResource.LessEqual(ni.IdleResource) {
return fmt.Errorf("node %s does not have enough idle resource to accommodate %d pods", ni.Node.Name, len(pods))
}
// Subtract the occupied resource from the node's idle resource.
if err := ni.IdleResource.Sub(occupiedResource.ResourceList()); err != nil {
return err
}
return nil
}
// MaxReplicaDivided returns how many replicas of the given resource request the node can accommodate.
func (ni *NodeInfo) MaxReplicaDivided(rl corev1.ResourceList) int64 {
return ni.IdleResource.MaxDivided(rl)
}
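
A worked sketch, not part of the commit, of the accounting above: with 4 CPU / 8Gi allocatable, two assigned pods requesting 500m / 1Gi each leave 3 CPU, 6Gi and 108 pod slots idle, so a replica requesting 1 CPU / 2Gi fits three times. All object values below are hypothetical.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/karmada-io/karmada/pkg/estimator/server/nodes"
)

func main() {
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
		Status: corev1.NodeStatus{
			Allocatable: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("4"),
				corev1.ResourceMemory: resource.MustParse("8Gi"),
				corev1.ResourcePods:   resource.MustParse("110"),
			},
		},
	}

	// Two pods already assigned to the node, each requesting 500m CPU and 1Gi memory.
	podRequest := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("500m"),
		corev1.ResourceMemory: resource.MustParse("1Gi"),
	}
	pod := corev1.Pod{Spec: corev1.PodSpec{Containers: []corev1.Container{
		{Resources: corev1.ResourceRequirements{Requests: podRequest}},
	}}}
	pods := []*corev1.Pod{pod.DeepCopy(), pod.DeepCopy()}

	ni := nodes.NewNodeInfo(node)
	if err := ni.AssignedPodRequest(pods); err != nil {
		panic(err)
	}

	// Idle after accounting: 3 CPU, 6Gi, 108 pod slots -> 3 replicas of 1 CPU / 2Gi.
	replicas := ni.MaxReplicaDivided(corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("1"),
		corev1.ResourceMemory: resource.MustParse("2Gi"),
	})
	fmt.Println(replicas) // 3
}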

View File

@@ -0,0 +1,17 @@
package replica
import (
corev1 "k8s.io/api/core/v1"
"github.com/karmada-io/karmada/pkg/estimator/server/nodes"
)
// NodeMaxAvailableReplica calculates max available replicas of a node, based on
// the pods assigned to the node and the request resource of the replica.
func NodeMaxAvailableReplica(node *corev1.Node, pods []*corev1.Pod, request corev1.ResourceList) (int32, error) {
ni := nodes.NewNodeInfo(node)
if err := ni.AssignedPodRequest(pods); err != nil {
return 0, err
}
return int32(ni.MaxReplicaDivided(request)), nil
}

View File

@@ -0,0 +1,205 @@
package server
import (
"context"
"fmt"
"net"
"net/http"
"time"
"github.com/kr/pretty"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
infov1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/cmd/scheduler-estimator/app/options"
"github.com/karmada-io/karmada/pkg/estimator/pb"
nodeutil "github.com/karmada-io/karmada/pkg/estimator/server/nodes"
"github.com/karmada-io/karmada/pkg/estimator/server/replica"
)
const (
nodeNameKeyIndex = "spec.nodeName"
)
// AccurateSchedulerEstimatorServer is the gRPC server of a cluster accurate scheduler estimator.
// Please see https://github.com/karmada-io/karmada/pull/580 (#580).
type AccurateSchedulerEstimatorServer struct {
port int
clusterName string
kubeClient kubernetes.Interface
informerFactory informers.SharedInformerFactory
nodeInformer infov1.NodeInformer
podInformer infov1.PodInformer
nodeLister listv1.NodeLister
podLister listv1.PodLister
getPodFunc func(nodeName string) ([]*corev1.Pod, error)
httpServer *http.Server
}
// NewEstimatorServer creates an instance of AccurateSchedulerEstimatorServer.
func NewEstimatorServer(kubeClient kubernetes.Interface, opts *options.Options) *AccurateSchedulerEstimatorServer {
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
es := &AccurateSchedulerEstimatorServer{
port: opts.ServerPort,
clusterName: opts.ClusterName,
kubeClient: kubeClient,
informerFactory: informerFactory,
nodeInformer: informerFactory.Core().V1().Nodes(),
podInformer: informerFactory.Core().V1().Pods(),
nodeLister: informerFactory.Core().V1().Nodes().Lister(),
podLister: informerFactory.Core().V1().Pods().Lister(),
}
// Index the pods by the name of the node they are assigned to.
_ = es.podInformer.Informer().AddIndexers(cache.Indexers{
nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
},
})
// The indexer helps us get all the pods that are assigned to a node.
podIndexer := es.podInformer.Informer().GetIndexer()
es.getPodFunc = func(nodeName string) ([]*corev1.Pod, error) {
objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
if err != nil {
return nil, err
}
pods := make([]*corev1.Pod, 0, len(objs))
for _, obj := range objs {
pod, ok := obj.(*corev1.Pod)
if !ok {
continue
}
// Succeeded and failed pods are not considered because they don't occupy any resource.
// See https://github.com/kubernetes/kubernetes/blob/f61ed439882e34d9dad28b602afdc852feb2337a/pkg/scheduler/scheduler.go#L756-L763
if pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed {
pods = append(pods, pod)
}
}
return pods, nil
}
return es
}
// Start runs the accurate replica estimator server.
func (es *AccurateSchedulerEstimatorServer) Start(ctx context.Context) error {
stopCh := ctx.Done()
klog.Infof("Starting karmada cluster(%s) accurate replica estimator", es.clusterName)
defer klog.Infof("Shutting down cluster(%s) accurate replica estimator", es.clusterName)
es.informerFactory.Start(stopCh)
if !es.waitForCacheSync(stopCh) {
return fmt.Errorf("failed to wait for cache sync")
}
// Listen on the configured port and register the gRPC server.
l, err := net.Listen("tcp", fmt.Sprintf(":%d", es.port))
if err != nil {
return fmt.Errorf("failed to listen on port %d: %v", es.port, err)
}
klog.Infof("Listening on port: %d", es.port)
defer l.Close()
s := grpc.NewServer()
pb.RegisterEstimatorServer(s, es)
// Graceful stop when the context is cancelled.
go func() {
<-stopCh
s.GracefulStop()
// Guard against an uninitialized HTTP server.
if es.httpServer != nil {
if err := es.httpServer.Shutdown(context.Background()); err != nil {
klog.Fatalf("server shutdown failed, err: %v", err)
}
}
}()
// Start the gRPC server.
if err := s.Serve(l); err != nil {
return err
}
// Should never reach here.
return nil
}
// MaxAvailableReplicas is the implementation of the gRPC interface. It returns the
// max available replicas that a cluster could accommodate based on the given replica requirements.
func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Context, request *pb.MaxAvailableReplicasRequest) (response *pb.MaxAvailableReplicasResponse, rerr error) {
defer traceMaxAvailableReplicas(time.Now(), request)(&response, &rerr)
if request.Cluster != es.clusterName {
return nil, fmt.Errorf("cluster name does not match, got: %s, desire: %s", request.Cluster, es.clusterName)
}
// Step 1: Get all matched nodes by node claim
nodes, err := nodeutil.ListNodesByNodeClaim(es.nodeLister, request.ReplicaRequirements.NodeClaim)
if err != nil {
return nil, fmt.Errorf("failed to find matched nodes: %v", err)
}
// Step 2: Calculate cluster max available replicas by filtered nodes
maxReplicas := es.maxAvailableReplicas(nodes, request.ReplicaRequirements.ResourceRequest)
return &pb.MaxAvailableReplicasResponse{MaxReplicas: maxReplicas}, nil
}
func (es *AccurateSchedulerEstimatorServer) maxAvailableReplicas(nodes []*corev1.Node, request corev1.ResourceList) int32 {
var maxReplicas int32
for _, node := range nodes {
maxReplica, err := es.nodeMaxAvailableReplica(node, request)
if err != nil {
klog.Errorf("Error: %v", err)
continue
}
klog.V(4).Infof("Node(%s) max available replica: %d", node.Name, maxReplica)
maxReplicas += maxReplica
}
return maxReplicas
}
func (es *AccurateSchedulerEstimatorServer) nodeMaxAvailableReplica(node *corev1.Node, request corev1.ResourceList) (int32, error) {
pods, err := es.getPodFunc(node.Name)
if err != nil {
return 0, fmt.Errorf("failed to get pods that assigned to node %s, err: %v", node.Name, err)
}
maxReplica, err := replica.NodeMaxAvailableReplica(node, pods, request)
if err != nil {
return 0, fmt.Errorf("failed to calculating max replica of node %s, err: %v", node.Name, err)
}
return maxReplica, nil
}
func (es *AccurateSchedulerEstimatorServer) waitForCacheSync(stopCh <-chan struct{}) bool {
return cache.WaitForCacheSync(stopCh,
func() []cache.InformerSynced {
informerSynced := []cache.InformerSynced{
es.podInformer.Informer().HasSynced,
es.nodeInformer.Informer().HasSynced,
}
return informerSynced
}()...,
)
}
func traceMaxAvailableReplicas(start time.Time, request *pb.MaxAvailableReplicasRequest) func(response **pb.MaxAvailableReplicasResponse, err *error) {
klog.V(4).Infof("Begin calculating cluster available replicas, request: %s", pretty.Sprint(*request))
return func(response **pb.MaxAvailableReplicasResponse, err *error) {
if *err != nil {
klog.Errorf("Failed to calculate cluster available replicas: %v", *err)
return
}
klog.Infof("Finish calculating cluster available replicas, max replicas: %d, time elapsed: %s", (*response).MaxReplicas, time.Since(start))
}
}
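
For reference, not part of the commit, a caller on the karmada control-plane side might invoke this server roughly as follows. The generated pb.NewEstimatorClient constructor and the exact shape of pb.ReplicaRequirements are assumptions based on the gRPC code registered above; the address and resource values are made up.

package main

import (
	"context"

	"google.golang.org/grpc"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/klog/v2"

	"github.com/karmada-io/karmada/pkg/estimator/pb"
)

func main() {
	// Hypothetical estimator address; --server-port defaults to 10352.
	conn, err := grpc.Dial("estimator.example.com:10352", grpc.WithInsecure())
	if err != nil {
		klog.Fatal(err)
	}
	defer conn.Close()

	client := pb.NewEstimatorClient(conn) // assumed generated client constructor
	resp, err := client.MaxAvailableReplicas(context.TODO(), &pb.MaxAvailableReplicasRequest{
		Cluster: "member1", // must match the estimator's --cluster-name
		// ReplicaRequirements is assumed to be a value type with a ResourceRequest field.
		ReplicaRequirements: pb.ReplicaRequirements{
			ResourceRequest: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("500m"),
				corev1.ResourceMemory: resource.MustParse("512Mi"),
			},
		},
	})
	if err != nil {
		klog.Fatal(err)
	}
	klog.Infof("cluster member1 can accommodate %d more replicas", resp.MaxReplicas)
}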

View File

@@ -1,6 +1,9 @@
package util
import (
"fmt"
"math"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -9,8 +12,10 @@ import (
// Resource is a collection of compute resource.
type Resource struct {
MilliCPU int64
Memory int64
EphemeralStorage int64
AllowedPodNumber int64
// ScalarResources
ScalarResources map[corev1.ResourceName]int64
@@ -21,6 +26,28 @@
return &Resource{}
}
// NewResource creates a new resource object from resource list.
func NewResource(rl corev1.ResourceList) *Resource {
r := &Resource{}
for rName, rQuant := range rl {
switch rName {
case corev1.ResourceCPU:
r.MilliCPU += rQuant.MilliValue()
case corev1.ResourceMemory:
r.Memory += rQuant.Value()
case corev1.ResourcePods:
r.AllowedPodNumber += rQuant.Value()
case corev1.ResourceEphemeralStorage:
r.EphemeralStorage += rQuant.Value()
default:
if schedutil.IsScalarResourceName(rName) {
r.AddScalar(rName, rQuant.Value())
}
}
}
return r
}
// Add is used to add two resources.
func (r *Resource) Add(rl corev1.ResourceList) {
if r == nil {
@@ -33,6 +60,10 @@ func (r *Resource) Add(rl corev1.ResourceList) {
r.MilliCPU += rQuant.MilliValue()
case corev1.ResourceMemory:
r.Memory += rQuant.Value()
case corev1.ResourcePods:
r.AllowedPodNumber += rQuant.Value()
case corev1.ResourceEphemeralStorage:
r.EphemeralStorage += rQuant.Value()
default:
if schedutil.IsScalarResourceName(rName) {
r.AddScalar(rName, rQuant.Value())
@@ -41,6 +72,52 @@
}
}
// Sub is used to subtract two resources.
// It returns an error when the minuend is less than the subtrahend.
func (r *Resource) Sub(rl corev1.ResourceList) error {
for rName, rQuant := range rl {
switch rName {
case corev1.ResourceCPU:
cpu := rQuant.MilliValue()
if r.MilliCPU < cpu {
return fmt.Errorf("cpu difference is less than 0, remain %d, got %d", r.MilliCPU, cpu)
}
r.MilliCPU -= cpu
case corev1.ResourceMemory:
mem := rQuant.Value()
if r.Memory < mem {
return fmt.Errorf("memory difference is less than 0, remain %d, got %d", r.Memory, mem)
}
r.Memory -= mem
case corev1.ResourcePods:
pods := rQuant.Value()
if r.AllowedPodNumber < pods {
return fmt.Errorf("allowed pod difference is less than 0, remain %d, got %d", r.AllowedPodNumber, pods)
}
r.AllowedPodNumber -= pods
case corev1.ResourceEphemeralStorage:
ephemeralStorage := rQuant.Value()
if r.EphemeralStorage < ephemeralStorage {
return fmt.Errorf("allowed storage number difference is less than 0, remain %d, got %d", r.EphemeralStorage, ephemeralStorage)
}
r.EphemeralStorage -= ephemeralStorage
default:
if schedutil.IsScalarResourceName(rName) {
rScalar, ok := r.ScalarResources[rName]
scalar := rQuant.Value()
if !ok {
return fmt.Errorf("scalar resources %s does not exist, got %d", rName, scalar)
}
if rScalar < scalar {
return fmt.Errorf("scalar resources %s difference is less than 0, remain %d, got %d", rName, rScalar, scalar)
}
r.ScalarResources[rName] = rScalar - scalar
}
}
}
return nil
}
// SetMaxResource compares with ResourceList and takes max value for each Resource.
func (r *Resource) SetMaxResource(rl corev1.ResourceList) {
if r == nil {
@@ -57,6 +134,14 @@ func (r *Resource) SetMaxResource(rl corev1.ResourceList) {
if mem := rQuant.Value(); mem > r.Memory {
r.Memory = mem
}
case corev1.ResourceEphemeralStorage:
if ephemeralStorage := rQuant.Value(); ephemeralStorage > r.EphemeralStorage {
r.EphemeralStorage = ephemeralStorage
}
case corev1.ResourcePods:
if pods := rQuant.Value(); pods > r.AllowedPodNumber {
r.AllowedPodNumber = pods
}
default:
if schedutil.IsScalarResourceName(rName) {
if value := rQuant.Value(); value > r.ScalarResources[rName] {
@@ -84,8 +169,10 @@ func (r *Resource) SetScalar(name corev1.ResourceName, quantity int64) {
// ResourceList returns a resource list of this resource.
func (r *Resource) ResourceList() corev1.ResourceList {
result := corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(r.MilliCPU, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(r.Memory, resource.BinarySI),
corev1.ResourceEphemeralStorage: *resource.NewQuantity(r.EphemeralStorage, resource.BinarySI),
corev1.ResourcePods: *resource.NewQuantity(r.AllowedPodNumber, resource.DecimalSI),
}
for rName, rQuant := range r.ScalarResources {
if v1helper.IsHugePageResourceName(rName) {
@@ -97,6 +184,66 @@ func (r *Resource) ResourceList() corev1.ResourceList {
return result
}
// MaxDivided returns how many replicas of the given resource list this resource can be divided into.
func (r *Resource) MaxDivided(rl corev1.ResourceList) int64 {
res := int64(math.MaxInt64)
for rName, rQuant := range rl {
switch rName {
case corev1.ResourceCPU:
if cpu := rQuant.MilliValue(); cpu > 0 {
res = MinInt64(res, r.MilliCPU/cpu)
}
case corev1.ResourceMemory:
if mem := rQuant.Value(); mem > 0 {
res = MinInt64(res, r.Memory/mem)
}
case corev1.ResourceEphemeralStorage:
if ephemeralStorage := rQuant.Value(); ephemeralStorage > 0 {
res = MinInt64(res, r.EphemeralStorage/ephemeralStorage)
}
default:
if schedutil.IsScalarResourceName(rName) {
rScalar, ok := r.ScalarResources[rName]
if !ok {
return 0
}
if scalar := rQuant.Value(); scalar > 0 {
res = MinInt64(res, rScalar/scalar)
}
}
}
}
res = MinInt64(res, r.AllowedPodNumber)
return res
}
// LessEqual returns whether all dimensions of resources in r are less than or equal to those of rr.
func (r *Resource) LessEqual(rr *Resource) bool {
lessEqualFunc := func(l, r int64) bool {
return l <= r
}
if !lessEqualFunc(r.MilliCPU, rr.MilliCPU) {
return false
}
if !lessEqualFunc(r.Memory, rr.Memory) {
return false
}
if !lessEqualFunc(r.EphemeralStorage, rr.EphemeralStorage) {
return false
}
if !lessEqualFunc(r.AllowedPodNumber, rr.AllowedPodNumber) {
return false
}
for rrName, rrQuant := range rr.ScalarResources {
rQuant := r.ScalarResources[rrName]
if !lessEqualFunc(rQuant, rrQuant) {
return false
}
}
return true
}
// AddPodRequest adds the effective request resource of a pod to the original resource.
// The Pod's effective request is the higher of:
// - the sum of all app containers' (spec.Containers) requests for a resource.
@@ -111,3 +258,20 @@ func (r *Resource) AddPodRequest(podSpec *corev1.PodSpec) *Resource {
}
return r
}
// AddResourcePods adds the pod count resource into the Resource.
// Note that a pod's resource request list does not contain a request for the pods resource itself;
// this function helps to add it.
func (r *Resource) AddResourcePods(pods int64) {
r.Add(corev1.ResourceList{
corev1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
})
}
// MinInt64 returns the smaller of two int64 numbers.
func MinInt64(a, b int64) int64 {
if a <= b {
return a
}
return b
}
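
As a worked example, not part of the commit, of the MaxDivided arithmetic above, with hypothetical quantities:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/karmada-io/karmada/pkg/util"
)

func main() {
	// Hypothetical idle capacity: 2 CPU, 8Gi memory, room for 110 pods.
	idle := util.NewResource(corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("2"),
		corev1.ResourceMemory: resource.MustParse("8Gi"),
		corev1.ResourcePods:   resource.MustParse("110"),
	})

	// Each replica asks for 500m CPU and 3Gi memory.
	request := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("500m"),
		corev1.ResourceMemory: resource.MustParse("3Gi"),
	}

	// min(2000m/500m, 8Gi/3Gi, 110) = min(4, 2, 110) = 2
	fmt.Println(idle.MaxDivided(request)) // 2
}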