add parallelism in estimator
Signed-off-by: Garrybest <garrybest@foxmail.com>
parent b86461e82f
commit 5e9d5d79bb
@@ -25,6 +25,8 @@ type Options struct {
 	ClusterAPIQPS float32
 	// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
 	ClusterAPIBurst int
+	// Parallelism defines the amount of parallelism in algorithms for estimating. Must be greater than 0. Defaults to 16.
+	Parallelism int
 }

 // NewOptions builds an empty options.
@@ -45,4 +47,5 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
 	fs.IntVar(&o.SecurePort, "secure-port", defaultHealthzPort, "The secure port on which to serve HTTPS.")
 	fs.Float32Var(&o.ClusterAPIQPS, "kube-api-qps", 20.0, "QPS to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
 	fs.IntVar(&o.ClusterAPIBurst, "kube-api-burst", 30, "Burst to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
+	fs.IntVar(&o.Parallelism, "parallelism", o.Parallelism, "Parallelism defines the amount of parallelism in algorithms for estimating. Must be greater than 0. Defaults to 16.")
 }
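The new --parallelism flag defaults to whatever o.Parallelism already holds, and the lifted parallelizer added later in this commit falls back to DefaultParallelism (16) whenever the configured value is not positive. A minimal standalone sketch, not part of the commit, that mirrors that fallback so the effective worker count is easy to see:

// mirror_parallelism.go: hypothetical illustration of the --parallelism fallback behavior.
package main

import "fmt"

const defaultParallelism = 16 // mirrors lifted.DefaultParallelism

// effectiveParallelism mirrors the guard in lifted.NewParallelizer: non-positive
// values fall back to the default rather than disabling parallelism.
func effectiveParallelism(p int) int {
	if p <= 0 {
		return defaultParallelism
	}
	return p
}

func main() {
	for _, p := range []int{0, -3, 8, 32} {
		fmt.Printf("flag=%d effective=%d\n", p, effectiveParallelism(p))
	}
}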
@@ -1,8 +1,6 @@
 package nodes

 import (
-	"fmt"
-
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	listv1 "k8s.io/client-go/listers/core/v1"
@@ -13,77 +11,71 @@ import (
 	"github.com/karmada-io/karmada/pkg/util/helper"
 )

-// ListNodesByNodeClaim returns all nodes that match the node claim.
-func ListNodesByNodeClaim(nodeLister listv1.NodeLister, claim *pb.NodeClaim) ([]*corev1.Node, error) {
-	nodeClaim := claim
-	if nodeClaim == nil {
-		nodeClaim = &pb.NodeClaim{}
-	}
-	nodes, err := ListNodesByLabelSelector(nodeLister, labels.SelectorFromSet(nodeClaim.NodeSelector))
-	if err != nil {
-		return nil, fmt.Errorf("cannot list nodes by label selector, %v", err)
-	}
-	nodes, err = FilterNodesByNodeAffinity(nodes, nodeClaim.NodeAffinity)
-	if err != nil {
-		return nil, fmt.Errorf("cannot filter nodes by node affinity, %v", err)
-	}
-	nodes, err = FilterSchedulableNodes(nodes, nodeClaim.Tolerations)
-	if err != nil {
-		return nil, fmt.Errorf("cannot filter nodes by tolerations, %v", err)
-	}
-	return nodes, err
-}
-
-// ListAllNodes returns all nodes.
-func ListAllNodes(nodeLister listv1.NodeLister) ([]*corev1.Node, error) {
-	return ListNodesByLabelSelector(nodeLister, labels.Everything())
-}
-
-// ListNodesByLabelSelector returns nodes that match the node selector.
-func ListNodesByLabelSelector(nodeLister listv1.NodeLister, selector labels.Selector) ([]*corev1.Node, error) {
-	nodes, err := nodeLister.List(selector)
-	if err != nil {
-		return nil, err
-	}
-	return nodes, nil
-}
-
-// FilterNodesByNodeAffinity returns nodes that match the node affinity.
-func FilterNodesByNodeAffinity(nodes []*corev1.Node, affinity *corev1.NodeSelector) ([]*corev1.Node, error) {
-	if affinity == nil {
-		return nodes, nil
-	}
-	matchedNodes := make([]*corev1.Node, 0)
-	selector, err := nodeaffinity.NewNodeSelector(affinity)
-	if err != nil {
-		return nil, err
-	}
-	for _, node := range nodes {
-		if selector.Match(node) {
-			matchedNodes = append(matchedNodes, node)
-		}
-	}
-	return matchedNodes, nil
-}
-
-// FilterSchedulableNodes filters schedulable nodes that match the given tolerations.
-func FilterSchedulableNodes(nodes []*corev1.Node, tolerations []corev1.Toleration) ([]*corev1.Node, error) {
-	filterPredicate := func(t *corev1.Taint) bool {
-		// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
-		return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute
-	}
-	matchedNodes := make([]*corev1.Node, 0)
-	for _, node := range nodes {
-		if node.Spec.Unschedulable {
-			continue
-		}
-		if !helper.NodeReady(node) {
-			continue
-		}
-		if _, isUntolerated := schedcorev1.FindMatchingUntoleratedTaint(node.Spec.Taints, tolerations, filterPredicate); isUntolerated {
-			continue
-		}
-		matchedNodes = append(matchedNodes, node)
-	}
-	return matchedNodes, nil
-}
+var (
+	tolerationFilterPredicate = func(t *corev1.Taint) bool {
+		// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
+		return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute
+	}
+)
+
+// NodeClaimWrapper is a wrapper that wraps the node claim.
+type NodeClaimWrapper struct {
+	nodeSelector         labels.Selector
+	tolerations          []corev1.Toleration
+	nodeAffinitySelector *nodeaffinity.NodeSelector
+}
+
+// NewNodeClaimWrapper returns a new NodeClaimWrapper.
+func NewNodeClaimWrapper(claim *pb.NodeClaim) (*NodeClaimWrapper, error) {
+	wrapper := &NodeClaimWrapper{}
+	if claim == nil {
+		wrapper.nodeSelector = labels.Everything()
+		return wrapper, nil
+	}
+	if claim.NodeAffinity != nil {
+		selector, err := nodeaffinity.NewNodeSelector(claim.NodeAffinity)
+		if err != nil {
+			return nil, err
+		}
+		wrapper.nodeAffinitySelector = selector
+	}
+	wrapper.nodeSelector = labels.SelectorFromSet(claim.NodeSelector)
+	wrapper.tolerations = claim.Tolerations
+	return wrapper, nil
+}
+
+// ListNodesByLabelSelector returns nodes that match the node selector.
+func (w *NodeClaimWrapper) ListNodesByLabelSelector(nodeLister listv1.NodeLister) ([]*corev1.Node, error) {
+	nodes, err := nodeLister.List(w.nodeSelector)
+	if err != nil {
+		return nil, err
+	}
+	return nodes, nil
+}
+
+// IsNodeMatched returns whether the node matches all conditions.
+func (w *NodeClaimWrapper) IsNodeMatched(node *corev1.Node) bool {
+	return w.IsNodeAffinityMatched(node) && w.IsNodeSchedulable(node)
+}
+
+// IsNodeAffinityMatched returns whether the node matches the node affinity.
+func (w *NodeClaimWrapper) IsNodeAffinityMatched(node *corev1.Node) bool {
+	if w.nodeAffinitySelector == nil {
+		return true
+	}
+	return w.nodeAffinitySelector.Match(node)
+}
+
+// IsNodeSchedulable returns whether the node matches the tolerations.
+func (w *NodeClaimWrapper) IsNodeSchedulable(node *corev1.Node) bool {
+	if node.Spec.Unschedulable {
+		return false
+	}
+	if !helper.NodeReady(node) {
+		return false
+	}
+	if _, isUntolerated := schedcorev1.FindMatchingUntoleratedTaint(node.Spec.Taints, w.tolerations, tolerationFilterPredicate); isUntolerated {
+		return false
+	}
+	return true
+}
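The eager list-and-filter helpers above are replaced by a NodeClaimWrapper that lists candidate nodes once by label selector and then answers per-node questions through IsNodeMatched, which is what lets the estimator apply affinity and taint checks inside a parallel loop. A hedged usage sketch of that API; the matchingNodes helper and the import paths for pb and the nodes package are assumptions for illustration, not part of the commit:

// matching_nodes_example.go: hypothetical consumer of the NodeClaimWrapper API.
package example

import (
	corev1 "k8s.io/api/core/v1"
	listv1 "k8s.io/client-go/listers/core/v1"

	"github.com/karmada-io/karmada/pkg/estimator/pb"
	nodeutil "github.com/karmada-io/karmada/pkg/estimator/server/nodes"
)

// matchingNodes lists candidates once by label selector, then applies the same
// per-node affinity and schedulability checks the estimator runs concurrently.
func matchingNodes(nodeLister listv1.NodeLister, claim *pb.NodeClaim) ([]*corev1.Node, error) {
	ncw, err := nodeutil.NewNodeClaimWrapper(claim)
	if err != nil {
		return nil, err
	}
	nodes, err := ncw.ListNodesByLabelSelector(nodeLister)
	if err != nil {
		return nil, err
	}
	matched := make([]*corev1.Node, 0, len(nodes))
	for _, node := range nodes {
		if ncw.IsNodeMatched(node) {
			matched = append(matched, node)
		}
	}
	return matched, nil
}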
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"sync/atomic"
 	"time"

 	"github.com/kr/pretty"
@@ -34,6 +35,7 @@ import (
 	"github.com/karmada-io/karmada/pkg/util/helper"
 	"github.com/karmada-io/karmada/pkg/util/informermanager"
 	"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
+	"github.com/karmada-io/karmada/pkg/util/lifted"
 )

 const (
@@ -61,6 +63,7 @@ type AccurateSchedulerEstimatorServer struct {
 	replicaLister   *replica.ListerWrapper
 	getPodFunc      func(nodeName string) ([]*corev1.Pod, error)
 	informerManager informermanager.SingleClusterInformerManager
+	parallelizer    lifted.Parallelizer
 }

 // NewEstimatorServer creates an instance of AccurateSchedulerEstimatorServer.
@@ -87,6 +90,7 @@ func NewEstimatorServer(
 			PodLister:        informerFactory.Core().V1().Pods().Lister(),
 			ReplicaSetLister: informerFactory.Apps().V1().ReplicaSets().Lister(),
 		},
+		parallelizer: lifted.NewParallelizer(opts.Parallelism),
 	}

 	// Establish a connection between the pods and their assigned nodes.
@@ -194,15 +198,19 @@ func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Con

 	// Step 1: Get all matched nodes by node claim
 	startTime := time.Now()
-	nodes, err := nodeutil.ListNodesByNodeClaim(es.nodeLister, request.ReplicaRequirements.NodeClaim)
-	metrics.UpdateEstimatingAlgorithmLatency(err, metrics.EstimatingTypeMaxAvailableReplicas, metrics.EstimatingStepListNodesByNodeClaim, startTime)
+	ncw, err := nodeutil.NewNodeClaimWrapper(request.ReplicaRequirements.NodeClaim)
 	if err != nil {
-		return nil, fmt.Errorf("failed to find matched nodes: %v", err)
+		return nil, fmt.Errorf("failed to new node claim wrapper: %v", err)
 	}
+	nodes, err := ncw.ListNodesByLabelSelector(es.nodeLister)
+	if err != nil {
+		return nil, fmt.Errorf("failed to list matched nodes by label selector: %v", err)
+	}
+	metrics.UpdateEstimatingAlgorithmLatency(err, metrics.EstimatingTypeMaxAvailableReplicas, metrics.EstimatingStepListNodesByNodeClaim, startTime)

-	// Step 2: Calculate cluster max available replicas by filtered nodes
+	// Step 2: Calculate cluster max available replicas by filtered nodes concurrently
 	startTime = time.Now()
-	maxReplicas := es.maxAvailableReplicas(nodes, request.ReplicaRequirements.ResourceRequest)
+	maxReplicas := es.maxAvailableReplicas(ctx, ncw, nodes, request.ReplicaRequirements.ResourceRequest)
 	metrics.UpdateEstimatingAlgorithmLatency(nil, metrics.EstimatingTypeMaxAvailableReplicas, metrics.EstimatingStepMaxAvailableReplicas, startTime)

 	return &pb.MaxAvailableReplicasResponse{MaxReplicas: maxReplicas}, nil
@@ -252,18 +260,30 @@ func (es *AccurateSchedulerEstimatorServer) GetUnschedulableReplicas(ctx context
 	return &pb.UnschedulableReplicasResponse{UnschedulableReplicas: unschedulables}, err
 }

-func (es *AccurateSchedulerEstimatorServer) maxAvailableReplicas(nodes []*corev1.Node, request corev1.ResourceList) int32 {
-	var maxReplicas int32
-	for _, node := range nodes {
-		maxReplica, err := es.nodeMaxAvailableReplica(node, request)
-		if err != nil {
-			klog.Errorf("Error: %v", err)
-			continue
-		}
-		klog.V(4).Infof("Node(%s) max available replica: %d", node.Name, maxReplica)
-		maxReplicas += maxReplica
-	}
-	return maxReplicas
-}
+func (es *AccurateSchedulerEstimatorServer) maxAvailableReplicas(
+	ctx context.Context,
+	ncw *nodeutil.NodeClaimWrapper,
+	nodes []*corev1.Node,
+	request corev1.ResourceList,
+) int32 {
+	var res int32
+	processNode := func(i int) {
+		node := nodes[i]
+		if node == nil {
+			return
+		}
+		if !ncw.IsNodeMatched(node) {
+			return
+		}
+		maxReplica, err := es.nodeMaxAvailableReplica(node, request)
+		if err != nil {
+			klog.Errorf("Error: %v", err)
+			return
+		}
+		klog.V(4).Infof("Node(%s) max available replica: %d", node.Name, maxReplica)
+		atomic.AddInt32(&res, maxReplica)
+	}
+	es.parallelizer.Until(ctx, len(nodes), processNode)
+	return res
+}

 func (es *AccurateSchedulerEstimatorServer) nodeMaxAvailableReplica(node *corev1.Node, request corev1.ResourceList) (int32, error) {
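maxAvailableReplicas now fans the per-node estimation out across goroutines via es.parallelizer.Until, so each worker adds its result with atomic.AddInt32 instead of the old serial maxReplicas += maxReplica. A self-contained sketch of that accumulation pattern using client-go's workqueue.ParallelizeUntil, the primitive the lifted Parallelizer wraps; the per-node values are made up for illustration:

// parallel_sum_example.go: illustrates the fan-out/atomic-accumulate pattern.
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	perNode := []int32{3, 0, 5, 2, 7} // pretend per-node max available replicas
	var total int32
	processNode := func(i int) {
		// Pieces may run on different goroutines, so accumulate atomically.
		atomic.AddInt32(&total, perNode[i])
	}
	// 4 workers process len(perNode) pieces; workers stop early if ctx is cancelled.
	workqueue.ParallelizeUntil(context.Background(), 4, len(perNode), processNode)
	fmt.Println("max available replicas:", total)
}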
@@ -0,0 +1,63 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// This code is directly lifted from the Kubernetes codebase.
+// For reference:
+// https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/scheduler/framework/parallelize/parallelism.go
+
+package lifted
+
+import (
+	"context"
+	"math"
+
+	"k8s.io/client-go/util/workqueue"
+)
+
+// DefaultParallelism is the default parallelism used in scheduler.
+const DefaultParallelism int = 16
+
+// Parallelizer holds the parallelism for scheduler.
+type Parallelizer struct {
+	parallelism int
+}
+
+// NewParallelizer returns an object holding the parallelism.
+func NewParallelizer(p int) Parallelizer {
+	if p <= 0 {
+		p = DefaultParallelism
+	}
+	return Parallelizer{parallelism: p}
+}
+
+// chunkSizeFor returns a chunk size for the given number of items to use for
+// parallel work. The size aims to produce good CPU utilization.
+// returns max(1, min(sqrt(n), n/Parallelism))
+func chunkSizeFor(n, parallelism int) int {
+	s := int(math.Sqrt(float64(n)))
+
+	if r := n/parallelism + 1; s > r {
+		s = r
+	} else if s < 1 {
+		s = 1
+	}
+	return s
+}
+
+// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
+func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) {
+	workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism)))
+}
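chunkSizeFor clamps the chunk size to max(1, min(sqrt(n), n/parallelism + 1)): chunks stay small enough that workers remain balanced, yet large enough that workqueue dispatch overhead stays low. For example, with 100 pieces and parallelism 16, sqrt(100) = 10 and 100/16 + 1 = 7, so chunks of 7 are used. A standalone sketch, mirroring the lifted function, that prints the chunk size for a few piece counts:

// chunk_size_example.go: mirrors chunkSizeFor to show the sizes it picks at parallelism 16.
package main

import (
	"fmt"
	"math"
)

func chunkSizeFor(n, parallelism int) int {
	s := int(math.Sqrt(float64(n)))
	if r := n/parallelism + 1; s > r {
		s = r
	} else if s < 1 {
		s = 1
	}
	return s
}

func main() {
	for _, n := range []int{9, 100, 400, 5000} {
		// e.g. n=100: min(sqrt(100)=10, 100/16+1=7) -> chunk size 7
		fmt.Printf("pieces=%d chunk=%d\n", n, chunkSizeFor(n, 16))
	}
}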