From b86461e82fbadba43a44baa6ad12eb6b08994fc2 Mon Sep 17 00:00:00 2001 From: Garrybest Date: Wed, 30 Mar 2022 20:18:47 +0800 Subject: [PATCH 1/2] add benchmark in estimator Signed-off-by: Garrybest --- pkg/estimator/server/server.go | 4 +- pkg/estimator/server/server_test.go | 155 ++++++++++++++++++++++++++++ test/helper/resource.go | 24 +++++ 3 files changed, 181 insertions(+), 2 deletions(-) diff --git a/pkg/estimator/server/server.go b/pkg/estimator/server/server.go index f6008c661..7c4af96af 100644 --- a/pkg/estimator/server/server.go +++ b/pkg/estimator/server/server.go @@ -291,7 +291,7 @@ func traceMaxAvailableReplicas(object string, start time.Time, request *pb.MaxAv klog.Errorf("Failed to calculate cluster available replicas: %v", *err) return } - klog.Infof("Finish calculating cluster available replicas of resource(%s), max replicas: %d, time elapsed: %s", object, (*response).MaxReplicas, time.Since(start)) + klog.V(2).Infof("Finish calculating cluster available replicas of resource(%s), max replicas: %d, time elapsed: %s", object, (*response).MaxReplicas, time.Since(start)) } } @@ -304,6 +304,6 @@ func traceGetUnschedulableReplicas(object string, start time.Time, request *pb.U klog.Errorf("Failed to detect cluster unschedulable replicas: %v", *err) return } - klog.Infof("Finish detecting cluster unschedulable replicas of resource(%s), unschedulable replicas: %d, time elapsed: %s", object, (*response).UnschedulableReplicas, time.Since(start)) + klog.V(2).Infof("Finish detecting cluster unschedulable replicas of resource(%s), unschedulable replicas: %d, time elapsed: %s", object, (*response).UnschedulableReplicas, time.Since(start)) } } diff --git a/pkg/estimator/server/server_test.go b/pkg/estimator/server/server_test.go index 2a6a0a009..43874431d 100644 --- a/pkg/estimator/server/server_test.go +++ b/pkg/estimator/server/server_test.go @@ -6,6 +6,7 @@ import ( "reflect" "testing" + "google.golang.org/grpc/metadata" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,6 +19,7 @@ import ( "github.com/karmada-io/karmada/cmd/scheduler-estimator/app/options" "github.com/karmada-io/karmada/pkg/estimator/pb" + "github.com/karmada-io/karmada/pkg/util" testhelper "github.com/karmada-io/karmada/test/helper" ) @@ -245,3 +247,156 @@ func TestAccurateSchedulerEstimatorServer_MaxAvailableReplicas(t *testing.T) { }) } } + +func BenchmarkAccurateSchedulerEstimatorServer_MaxAvailableReplicas(b *testing.B) { + opt := &options.Options{ + ClusterName: "fake", + } + type args struct { + request *pb.MaxAvailableReplicasRequest + } + tests := []struct { + name string + allNodesNum int + allPodsNum int + nodeTemplate *corev1.Node + podTemplate *corev1.Pod + args args + }{ + { + name: "500 nodes and 10,000 pods without affinity and tolerations", + allNodesNum: 500, + allPodsNum: 10000, + nodeTemplate: testhelper.NewNode("", 100*testhelper.ResourceUnitCPU, 200*testhelper.ResourceUnitMem, 110*testhelper.ResourceUnitPod, 200*testhelper.ResourceUnitEphemeralStorage), + podTemplate: testhelper.NewPodWithRequest("", "", 2*testhelper.ResourceUnitCPU, 3*testhelper.ResourceUnitMem, 4*testhelper.ResourceUnitEphemeralStorage), + // request 1 cpu, 2 mem + args: args{ + request: &pb.MaxAvailableReplicasRequest{ + Cluster: "fake", + ReplicaRequirements: pb.ReplicaRequirements{ + ResourceRequest: testhelper.NewResourceList(1*testhelper.ResourceUnitCPU, 2*testhelper.ResourceUnitMem, testhelper.ResourceUnitZero), + }, + }, + }, + }, + { + name: "5000 nodes and 
100,000 pods without affinity and tolerations", + allNodesNum: 5000, + allPodsNum: 100000, + nodeTemplate: testhelper.NewNode("", 100*testhelper.ResourceUnitCPU, 200*testhelper.ResourceUnitMem, 110*testhelper.ResourceUnitPod, 200*testhelper.ResourceUnitEphemeralStorage), + podTemplate: testhelper.NewPodWithRequest("", "", 2*testhelper.ResourceUnitCPU, 3*testhelper.ResourceUnitMem, 4*testhelper.ResourceUnitEphemeralStorage), + // request 1 cpu, 2 mem + args: args{ + request: &pb.MaxAvailableReplicasRequest{ + Cluster: "fake", + ReplicaRequirements: pb.ReplicaRequirements{ + ResourceRequest: testhelper.NewResourceList(1*testhelper.ResourceUnitCPU, 2*testhelper.ResourceUnitMem, testhelper.ResourceUnitZero), + }, + }, + }, + }, + { + name: "5000 nodes and 100,000 pods with taint and tolerations", + allNodesNum: 5000, + allPodsNum: 100000, + nodeTemplate: testhelper.MakeNodeWithTaints("", 100*testhelper.ResourceUnitCPU, 200*testhelper.ResourceUnitMem, 110*testhelper.ResourceUnitPod, 200*testhelper.ResourceUnitEphemeralStorage, []corev1.Taint{{Key: "key1", Value: "value1", Effect: corev1.TaintEffectNoSchedule}}), + podTemplate: testhelper.NewPodWithRequest("", "", 2*testhelper.ResourceUnitCPU, 3*testhelper.ResourceUnitMem, 4*testhelper.ResourceUnitEphemeralStorage), + // request 1 cpu, 2 mem + args: args{ + request: &pb.MaxAvailableReplicasRequest{ + Cluster: "fake", + ReplicaRequirements: pb.ReplicaRequirements{ + NodeClaim: &pb.NodeClaim{ + Tolerations: []corev1.Toleration{ + {Key: "key1", Operator: corev1.TolerationOpEqual, Value: "value1"}, + }, + }, + ResourceRequest: testhelper.NewResourceList(1*testhelper.ResourceUnitCPU, 2*testhelper.ResourceUnitMem, testhelper.ResourceUnitZero), + }, + }, + }, + }, + { + name: "5000 nodes and 100,000 pods with node affinity and tolerations", + allNodesNum: 5000, + allPodsNum: 100000, + nodeTemplate: testhelper.MakeNodeWithLabels("", 100*testhelper.ResourceUnitCPU, 200*testhelper.ResourceUnitMem, 110*testhelper.ResourceUnitPod, 200*testhelper.ResourceUnitEphemeralStorage, map[string]string{"a": "1"}), + podTemplate: testhelper.NewPodWithRequest("", "", 2*testhelper.ResourceUnitCPU, 3*testhelper.ResourceUnitMem, 4*testhelper.ResourceUnitEphemeralStorage), + // request 1 cpu, 2 mem + args: args{ + request: &pb.MaxAvailableReplicasRequest{ + Cluster: "fake", + ReplicaRequirements: pb.ReplicaRequirements{ + NodeClaim: &pb.NodeClaim{ + NodeAffinity: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "a", + Operator: corev1.NodeSelectorOpGt, + Values: []string{"0"}, + }, + }, + }, + }, + }, + Tolerations: []corev1.Toleration{ + {Key: "key1", Operator: corev1.TolerationOpEqual, Value: "value1"}, + }, + }, + ResourceRequest: testhelper.NewResourceList(1*testhelper.ResourceUnitCPU, 2*testhelper.ResourceUnitMem, testhelper.ResourceUnitZero), + }, + }, + }, + }, + } + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = metadata.NewIncomingContext(ctx, metadata.Pairs(string(util.ContextKeyObject), "fake")) + + gvrToListKind := map[schema.GroupVersionResource]string{ + {Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList", + } + dynamicClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), gvrToListKind) + discoveryClient := &discoveryfake.FakeDiscovery{ + Fake: &coretesting.Fake{}, + } + discoveryClient.Resources = 
[]*metav1.APIResourceList{ + { + GroupVersion: appsv1.SchemeGroupVersion.String(), + APIResources: []metav1.APIResource{ + {Name: "deployments", Namespaced: true, Kind: "Deployment"}, + }, + }, + } + nodes, pods := testhelper.MakeNodesAndPods(tt.allNodesNum, tt.allPodsNum, tt.nodeTemplate, tt.podTemplate) + objs := make([]runtime.Object, 0, len(nodes)+len(pods)) + for _, node := range nodes { + objs = append(objs, node) + } + for _, pod := range pods { + objs = append(objs, pod) + } + + es := NewEstimatorServer(fake.NewSimpleClientset(objs...), dynamicClient, discoveryClient, opt, ctx.Done()) + + es.informerFactory.Start(ctx.Done()) + if !es.waitForCacheSync(ctx.Done()) { + b.Fatalf("MaxAvailableReplicas() error = %v", fmt.Errorf("failed to wait for cache sync")) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := es.MaxAvailableReplicas(ctx, tt.args.request) + if err != nil { + b.Fatalf("MaxAvailableReplicas() error = %v", err) + return + } + } + }) + } +} diff --git a/test/helper/resource.go b/test/helper/resource.go index 04b777dad..0d24c2125 100644 --- a/test/helper/resource.go +++ b/test/helper/resource.go @@ -321,6 +321,30 @@ func NewNode(node string, milliCPU, memory, pods, ephemeralStorage int64) *corev } } +// MakeNodesAndPods will make batch of nodes and pods based on template. +func MakeNodesAndPods(allNodesNum, allPodsNum int, nodeTemplate *corev1.Node, podTemplate *corev1.Pod) ([]*corev1.Node, []*corev1.Pod) { + nodes := make([]*corev1.Node, 0, allNodesNum) + pods := make([]*corev1.Pod, 0, allPodsNum) + + avg, residue := allPodsNum/allNodesNum, allPodsNum%allNodesNum + for i := 0; i < allNodesNum; i++ { + node := nodeTemplate.DeepCopy() + node.Name = fmt.Sprintf("node-%d", i) + nodes = append(nodes, node) + num := avg + if i < residue { + num++ + } + for j := 0; j < num; j++ { + pod := podTemplate.DeepCopy() + pod.Name = fmt.Sprintf("node-%d-%d", i, j) + pod.Spec.NodeName = node.Name + pods = append(pods, pod) + } + } + return nodes, pods +} + // MakeNodeWithLabels will build a ready node with resource and labels. func MakeNodeWithLabels(node string, milliCPU, memory, pods, ephemeralStorage int64, labels map[string]string) *corev1.Node { return &corev1.Node{ From 5e9d5d79bb16f667457083a7b88f3122cc37776b Mon Sep 17 00:00:00 2001 From: Garrybest Date: Wed, 30 Mar 2022 20:29:06 +0800 Subject: [PATCH 2/2] add parallelism in estimator Signed-off-by: Garrybest --- .../app/options/options.go | 3 + pkg/estimator/server/nodes/filter.go | 112 ++++++++---------- pkg/estimator/server/server.go | 44 +++++-- pkg/util/lifted/parallelism.go | 63 ++++++++++ 4 files changed, 150 insertions(+), 72 deletions(-) create mode 100644 pkg/util/lifted/parallelism.go diff --git a/cmd/scheduler-estimator/app/options/options.go b/cmd/scheduler-estimator/app/options/options.go index b33658d8e..4cfa84cda 100644 --- a/cmd/scheduler-estimator/app/options/options.go +++ b/cmd/scheduler-estimator/app/options/options.go @@ -25,6 +25,8 @@ type Options struct { ClusterAPIQPS float32 // ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver. ClusterAPIBurst int + // Parallelism defines the amount of parallelism in algorithms for estimating. Must be greater than 0. Defaults to 16. + Parallelism int } // NewOptions builds an empty options. 
@@ -45,4 +47,5 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&o.SecurePort, "secure-port", defaultHealthzPort, "The secure port on which to serve HTTPS.") fs.Float32Var(&o.ClusterAPIQPS, "kube-api-qps", 20.0, "QPS to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") fs.IntVar(&o.ClusterAPIBurst, "kube-api-burst", 30, "Burst to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + fs.IntVar(&o.Parallelism, "parallelism", o.Parallelism, "Parallelism defines the amount of parallelism in algorithms for estimating. Must be greater than 0. Defaults to 16.") } diff --git a/pkg/estimator/server/nodes/filter.go b/pkg/estimator/server/nodes/filter.go index 3c4a9fbf8..89b37e272 100644 --- a/pkg/estimator/server/nodes/filter.go +++ b/pkg/estimator/server/nodes/filter.go @@ -1,8 +1,6 @@ package nodes import ( - "fmt" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" listv1 "k8s.io/client-go/listers/core/v1" @@ -13,77 +11,71 @@ import ( "github.com/karmada-io/karmada/pkg/util/helper" ) -// ListNodesByNodeClaim returns all nodes that match the node claim. -func ListNodesByNodeClaim(nodeLister listv1.NodeLister, claim *pb.NodeClaim) ([]*corev1.Node, error) { - nodeClaim := claim - if nodeClaim == nil { - nodeClaim = &pb.NodeClaim{} +var ( + tolerationFilterPredicate = func(t *corev1.Taint) bool { + // PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints. + return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute } - nodes, err := ListNodesByLabelSelector(nodeLister, labels.SelectorFromSet(nodeClaim.NodeSelector)) - if err != nil { - return nil, fmt.Errorf("cannot list nodes by label selector, %v", err) - } - nodes, err = FilterNodesByNodeAffinity(nodes, nodeClaim.NodeAffinity) - if err != nil { - return nil, fmt.Errorf("cannot filter nodes by node affinity, %v", err) - } - nodes, err = FilterSchedulableNodes(nodes, nodeClaim.Tolerations) - if err != nil { - return nil, fmt.Errorf("cannot filter nodes by tolerations, %v", err) - } - return nodes, err +) + +// NodeClaimWrapper is a wrapper that wraps the node claim. +type NodeClaimWrapper struct { + nodeSelector labels.Selector + tolerations []corev1.Toleration + nodeAffinitySelector *nodeaffinity.NodeSelector } -// ListAllNodes returns all nodes. -func ListAllNodes(nodeLister listv1.NodeLister) ([]*corev1.Node, error) { - return ListNodesByLabelSelector(nodeLister, labels.Everything()) +// NewNodeClaimWrapper returns a new NodeClaimWrapper. +func NewNodeClaimWrapper(claim *pb.NodeClaim) (*NodeClaimWrapper, error) { + wrapper := &NodeClaimWrapper{} + if claim == nil { + wrapper.nodeSelector = labels.Everything() + return wrapper, nil + } + if claim.NodeAffinity != nil { + selector, err := nodeaffinity.NewNodeSelector(claim.NodeAffinity) + if err != nil { + return nil, err + } + wrapper.nodeAffinitySelector = selector + } + wrapper.nodeSelector = labels.SelectorFromSet(claim.NodeSelector) + wrapper.tolerations = claim.Tolerations + return wrapper, nil } // ListNodesByLabelSelector returns nodes that match the node selector. 
-func ListNodesByLabelSelector(nodeLister listv1.NodeLister, selector labels.Selector) ([]*corev1.Node, error) { - nodes, err := nodeLister.List(selector) +func (w *NodeClaimWrapper) ListNodesByLabelSelector(nodeLister listv1.NodeLister) ([]*corev1.Node, error) { + nodes, err := nodeLister.List(w.nodeSelector) if err != nil { return nil, err } return nodes, nil } -// FilterNodesByNodeAffinity returns nodes that match the node affinity. -func FilterNodesByNodeAffinity(nodes []*corev1.Node, affinity *corev1.NodeSelector) ([]*corev1.Node, error) { - if affinity == nil { - return nodes, nil - } - matchedNodes := make([]*corev1.Node, 0) - selector, err := nodeaffinity.NewNodeSelector(affinity) - if err != nil { - return nil, err - } - for _, node := range nodes { - if selector.Match(node) { - matchedNodes = append(matchedNodes, node) - } - } - return matchedNodes, nil +// IsNodeMatched returns whether the node matches all conditions. +func (w *NodeClaimWrapper) IsNodeMatched(node *corev1.Node) bool { + return w.IsNodeAffinityMatched(node) && w.IsNodeSchedulable(node) } -// FilterSchedulableNodes filters schedulable nodes that match the given tolerations. -func FilterSchedulableNodes(nodes []*corev1.Node, tolerations []corev1.Toleration) ([]*corev1.Node, error) { - filterPredicate := func(t *corev1.Taint) bool { - // PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints. - return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute +// IsNodeAffinityMatched returns whether the node matches the node affinity. +func (w *NodeClaimWrapper) IsNodeAffinityMatched(node *corev1.Node) bool { + if w.nodeAffinitySelector == nil { + return true } - matchedNodes := make([]*corev1.Node, 0) - for _, node := range nodes { - if node.Spec.Unschedulable { - continue - } - if !helper.NodeReady(node) { - continue - } - if _, isUntolerated := schedcorev1.FindMatchingUntoleratedTaint(node.Spec.Taints, tolerations, filterPredicate); isUntolerated { - continue - } - matchedNodes = append(matchedNodes, node) - } - return matchedNodes, nil + return w.nodeAffinitySelector.Match(node) +} + +// IsNodeSchedulable returns whether the node matches the tolerations. +func (w *NodeClaimWrapper) IsNodeSchedulable(node *corev1.Node) bool { + if node.Spec.Unschedulable { + return false + } + if !helper.NodeReady(node) { + return false + } + if _, isUntolerated := schedcorev1.FindMatchingUntoleratedTaint(node.Spec.Taints, w.tolerations, tolerationFilterPredicate); isUntolerated { + return false + } + return true } diff --git a/pkg/estimator/server/server.go b/pkg/estimator/server/server.go index 7c4af96af..289d97494 100644 --- a/pkg/estimator/server/server.go +++ b/pkg/estimator/server/server.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "sync/atomic" "time" "github.com/kr/pretty" @@ -34,6 +35,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/informermanager/keys" + "github.com/karmada-io/karmada/pkg/util/lifted" ) const ( @@ -61,6 +63,7 @@ type AccurateSchedulerEstimatorServer struct { replicaLister *replica.ListerWrapper getPodFunc func(nodeName string) ([]*corev1.Pod, error) informerManager informermanager.SingleClusterInformerManager + parallelizer lifted.Parallelizer } // NewEstimatorServer creates an instance of AccurateSchedulerEstimatorServer. 
@@ -87,6 +90,7 @@ func NewEstimatorServer( PodLister: informerFactory.Core().V1().Pods().Lister(), ReplicaSetLister: informerFactory.Apps().V1().ReplicaSets().Lister(), }, + parallelizer: lifted.NewParallelizer(opts.Parallelism), } // Establish a connection between the pods and their assigned nodes. @@ -194,15 +198,19 @@ func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Con // Step 1: Get all matched nodes by node claim startTime := time.Now() - nodes, err := nodeutil.ListNodesByNodeClaim(es.nodeLister, request.ReplicaRequirements.NodeClaim) - metrics.UpdateEstimatingAlgorithmLatency(err, metrics.EstimatingTypeMaxAvailableReplicas, metrics.EstimatingStepListNodesByNodeClaim, startTime) + ncw, err := nodeutil.NewNodeClaimWrapper(request.ReplicaRequirements.NodeClaim) if err != nil { - return nil, fmt.Errorf("failed to find matched nodes: %v", err) + return nil, fmt.Errorf("failed to new node claim wrapper: %v", err) } + nodes, err := ncw.ListNodesByLabelSelector(es.nodeLister) + if err != nil { + return nil, fmt.Errorf("failed to list matched nodes by label selector: %v", err) + } + metrics.UpdateEstimatingAlgorithmLatency(err, metrics.EstimatingTypeMaxAvailableReplicas, metrics.EstimatingStepListNodesByNodeClaim, startTime) - // Step 2: Calculate cluster max available replicas by filtered nodes + // Step 2: Calculate cluster max available replicas by filtered nodes concurrently startTime = time.Now() - maxReplicas := es.maxAvailableReplicas(nodes, request.ReplicaRequirements.ResourceRequest) + maxReplicas := es.maxAvailableReplicas(ctx, ncw, nodes, request.ReplicaRequirements.ResourceRequest) metrics.UpdateEstimatingAlgorithmLatency(nil, metrics.EstimatingTypeMaxAvailableReplicas, metrics.EstimatingStepMaxAvailableReplicas, startTime) return &pb.MaxAvailableReplicasResponse{MaxReplicas: maxReplicas}, nil @@ -252,18 +260,30 @@ func (es *AccurateSchedulerEstimatorServer) GetUnschedulableReplicas(ctx context return &pb.UnschedulableReplicasResponse{UnschedulableReplicas: unschedulables}, err } -func (es *AccurateSchedulerEstimatorServer) maxAvailableReplicas(nodes []*corev1.Node, request corev1.ResourceList) int32 { - var maxReplicas int32 - for _, node := range nodes { +func (es *AccurateSchedulerEstimatorServer) maxAvailableReplicas( + ctx context.Context, + ncw *nodeutil.NodeClaimWrapper, + nodes []*corev1.Node, + request corev1.ResourceList, +) int32 { + var res int32 + processNode := func(i int) { + node := nodes[i] + if node == nil { + return + } + if !ncw.IsNodeMatched(node) { + return + } maxReplica, err := es.nodeMaxAvailableReplica(node, request) if err != nil { klog.Errorf("Error: %v", err) - continue + return } - klog.V(4).Infof("Node(%s) max available replica: %d", node.Name, maxReplica) - maxReplicas += maxReplica + atomic.AddInt32(&res, maxReplica) } - return maxReplicas + es.parallelizer.Until(ctx, len(nodes), processNode) + return res } func (es *AccurateSchedulerEstimatorServer) nodeMaxAvailableReplica(node *corev1.Node, request corev1.ResourceList) (int32, error) { diff --git a/pkg/util/lifted/parallelism.go b/pkg/util/lifted/parallelism.go new file mode 100644 index 000000000..f0c930dd1 --- /dev/null +++ b/pkg/util/lifted/parallelism.go @@ -0,0 +1,63 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This code is directly lifted from the Kubernetes codebase. +// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/scheduler/framework/parallelize/parallelism.go + +package lifted + +import ( + "context" + "math" + + "k8s.io/client-go/util/workqueue" +) + +// DefaultParallelism is the default parallelism used in scheduler. +const DefaultParallelism int = 16 + +// Parallelizer holds the parallelism for scheduler. +type Parallelizer struct { + parallelism int +} + +// NewParallelizer returns an object holding the parallelism. +func NewParallelizer(p int) Parallelizer { + if p <= 0 { + p = DefaultParallelism + } + return Parallelizer{parallelism: p} +} + +// chunkSizeFor returns a chunk size for the given number of items to use for +// parallel work. The size aims to produce good CPU utilization. +// returns max(1, min(sqrt(n), n/Parallelism)) +func chunkSizeFor(n, parallelism int) int { + s := int(math.Sqrt(float64(n))) + + if r := n/parallelism + 1; s > r { + s = r + } else if s < 1 { + s = 1 + } + return s +} + +// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms. +func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) { + workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism))) +}
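
A quick usage sketch follows (not a diff hunk, and not part of the series above). It assumes the Parallelizer API exactly as added in pkg/util/lifted/parallelism.go by PATCH 2/2, and the per-node replica counts below are made-up numbers standing in for nodeMaxAvailableReplica results. It mirrors how maxAvailableReplicas fans node processing out across worker goroutines and accumulates the total with an atomic add. For scale: with 5,000 nodes and the default parallelism of 16, chunkSizeFor picks min(int(sqrt(5000)), 5000/16+1) = min(70, 313) = 70, so each worker claims nodes in chunks of 70.

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"github.com/karmada-io/karmada/pkg/util/lifted"
)

func main() {
	// Made-up per-node results standing in for what nodeMaxAvailableReplica would return.
	perNode := []int32{3, 5, 0, 7, 2}

	// 0 (or any non-positive value) falls back to DefaultParallelism (16),
	// which is also what an unset --parallelism flag ends up doing.
	p := lifted.NewParallelizer(0)

	var total int32
	p.Until(context.Background(), len(perNode), func(i int) {
		// Each piece runs on one of the worker goroutines, so the shared sum uses an atomic add,
		// just like the concurrent maxAvailableReplicas in this series.
		atomic.AddInt32(&total, perNode[i])
	})

	fmt.Println(total) // 17
}

Leaving Options.Parallelism defaulted to zero and letting NewParallelizer clamp it keeps the flag optional while still honoring the documented default of 16.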