Added scheduler framework and basic functionality (#108)

Signed-off-by: xuzhonghu <xuzhonghu@huawei.com>
Zhonghu Xu 2021-01-07 06:39:38 -06:00 committed by GitHub
parent 0cfdbcd2ae
commit c59afde0e6
21 changed files with 1038 additions and 40 deletions


@ -27,7 +27,7 @@ ifeq ($(VERSION), "")
endif
endif
all: karmada-controller-manager karmadactl
all: karmada-controller-manager karmada-scheduler karmadactl
karmada-controller-manager: $(SOURCES)
CGO_ENABLED=0 GOOS=$(GOOS) go build \
@ -35,6 +35,12 @@ karmada-controller-manager: $(SOURCES)
-o karmada-controller-manager \
cmd/controller-manager/controller-manager.go
karmada-scheduler: $(SOURCES)
CGO_ENABLED=0 GOOS=$(GOOS) go build \
-ldflags $(LDFLAGS) \
-o karmada-scheduler \
cmd/scheduler/main.go
karmadactl: $(SOURCES)
CGO_ENABLED=0 GOOS=$(GOOS) go build \
-ldflags $(LDFLAGS) \
@ -42,19 +48,24 @@ karmadactl: $(SOURCES)
cmd/karmadactl/karmadactl.go
clean:
rm -rf karmada-controller-manager
rm -rf karmada-controller-manager karmada-scheduler karmadactl
.PHONY: test
test:
go test --race --v ./pkg/...
images: image-karmada-controller-manager
images: image-karmada-controller-manager image-karmada-scheduler
image-karmada-controller-manager: karmada-controller-manager
cp karmada-controller-manager cluster/images/karmada-controller-manager && \
docker build -t $(REGISTRY)/karmada-controller-manager:$(VERSION) cluster/images/karmada-controller-manager && \
rm cluster/images/karmada-controller-manager/karmada-controller-manager
image-karmada-scheduler: karmada-scheduler
cp karmada-scheduler cluster/images/karmada-scheduler && \
docker build -t $(REGISTRY)/karmada-scheduler:$(VERSION) cluster/images/karmada-scheduler && \
rm cluster/images/karmada-scheduler/karmada-scheduler
upload-images: images
@echo "push images to $(REGISTRY)"
ifneq ($(REGISTRY_USER_NAME), "")


@ -0,0 +1,36 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: karmada-scheduler
namespace: karmada-system
labels:
app: karmada-scheduler
spec:
replicas: 1
selector:
matchLabels:
app: karmada-scheduler
template:
metadata:
labels:
app: karmada-scheduler
spec:
serviceAccountName: karmada-scheduler
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
containers:
- name: karmada-scheduler
image: swr.ap-southeast-1.myhuaweicloud.com/karmada/karmada-scheduler:latest
imagePullPolicy: IfNotPresent
command:
- /bin/karmada-scheduler
- --kubeconfig=/etc/kubeconfig
volumeMounts:
- name: kubeconfig
subPath: kubeconfig
mountPath: /etc/kubeconfig
volumes:
- name: kubeconfig
secret:
secretName: kubeconfig


@ -3,3 +3,9 @@ kind: ServiceAccount
metadata:
name: karmada-controller-manager
namespace: karmada-system
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: karmada-scheduler
namespace: karmada-system


@ -0,0 +1,7 @@
FROM alpine:3.7
RUN apk add --no-cache ca-certificates
ADD karmada-scheduler /bin/
CMD ["/bin/karmada-scheduler"]


@ -0,0 +1,48 @@
package options
import (
"time"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"
)
var (
defaultElectionLeaseDuration = metav1.Duration{Duration: 15 * time.Second}
defaultElectionRenewDeadline = metav1.Duration{Duration: 10 * time.Second}
defaultElectionRetryPeriod = metav1.Duration{Duration: 2 * time.Second}
)
// Options contains everything necessary to create and run the scheduler.
type Options struct {
LeaderElection componentbaseconfig.LeaderElectionConfiguration
KubeConfig string
Master string
}
// NewOptions builds a default scheduler Options.
func NewOptions() *Options {
return &Options{
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: false,
ResourceLock: resourcelock.LeasesResourceLock,
LeaseDuration: defaultElectionLeaseDuration,
RenewDeadline: defaultElectionRenewDeadline,
RetryPeriod: defaultElectionRetryPeriod,
},
}
}
// AddFlags adds the flags of the scheduler to the specified FlagSet.
func (o *Options) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
fs.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", false, "Enable leader election, which must be true when running multiple instances.")
fs.StringVar(&o.LeaderElection.ResourceNamespace, "lock-namespace", "", "Define the namespace of the lock object.")
fs.StringVar(&o.KubeConfig, "kubeconfig", o.KubeConfig, "Path to a KubeConfig. Only required if out-of-cluster.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.")
}
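
Illustrative usage (not part of this commit): a minimal sketch of how these options are consumed. NewSchedulerCommand registers them on the cobra command's flag set; doing the same on a bare pflag.FlagSet makes the effect on the struct visible. The flag values below are hypothetical.

package main

import (
    "fmt"

    "github.com/spf13/pflag"

    "github.com/karmada-io/karmada/cmd/scheduler/app/options"
)

func main() {
    // Build the default options and register their flags, as the scheduler command does.
    opts := options.NewOptions()
    fs := pflag.NewFlagSet("karmada-scheduler", pflag.ExitOnError)
    opts.AddFlags(fs)

    // Hypothetical command line; the deployment manifest above passes only --kubeconfig.
    _ = fs.Parse([]string{"--kubeconfig=/etc/kubeconfig", "--leader-elect=true", "--lock-namespace=karmada-system"})

    fmt.Printf("leader-elect=%v lock-namespace=%q kubeconfig=%q\n",
        opts.LeaderElection.LeaderElect, opts.LeaderElection.ResourceNamespace, opts.KubeConfig)
}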


@ -0,0 +1,103 @@
package app
import (
"context"
"flag"
"fmt"
"os"
"github.com/google/uuid"
"github.com/spf13/cobra"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/cmd/scheduler/app/options"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
"github.com/karmada-io/karmada/pkg/scheduler"
)
// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand(stopChan <-chan struct{}) *cobra.Command {
opts := options.NewOptions()
cmd := &cobra.Command{
Use: "scheduler",
Long: `The karmada scheduler binds resources to the clusters it manages.`,
Run: func(cmd *cobra.Command, args []string) {
if err := run(opts, stopChan); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
opts.AddFlags(cmd.Flags())
cmd.Flags().AddGoFlagSet(flag.CommandLine)
return cmd
}
func run(opts *options.Options, stopChan <-chan struct{}) error {
restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-stopChan
cancel()
}()
sched := scheduler.NewScheduler(dynamicClientSet, karmadaClient, kubeClientSet)
if !opts.LeaderElection.LeaderElect {
sched.Run(ctx)
return fmt.Errorf("scheduler exited")
}
leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(restConfig, "leader-election"))
if err != nil {
return err
}
hostname, err := os.Hostname()
if err != nil {
return fmt.Errorf("unable to get hostname: %v", err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id := hostname + "_" + uuid.New().String()
rl, err := resourcelock.New(opts.LeaderElection.ResourceLock,
opts.LeaderElection.ResourceNamespace,
"karmada-scheduler",
leaderElectionClient.CoreV1(),
leaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
})
if err != nil {
return fmt.Errorf("couldn't create resource lock: %v", err)
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: opts.LeaderElection.LeaseDuration.Duration,
RenewDeadline: opts.LeaderElection.RenewDeadline.Duration,
RetryPeriod: opts.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: sched.Run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
},
})
return nil
}

cmd/scheduler/main.go

@ -0,0 +1,21 @@
package main
import (
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/component-base/logs"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/cmd/scheduler/app"
)
func main() {
logs.InitLogs()
defer logs.FlushLogs()
// TODO: remove dependency on apiserver
stopChan := apiserver.SetupSignalHandler()
if err := app.NewSchedulerCommand(stopChan).Execute(); err != nil {
klog.Fatal(err.Error())
}
}

go.mod

@ -4,6 +4,7 @@ go 1.14
require (
github.com/go-logr/logr v0.3.0 // indirect
github.com/google/uuid v1.1.1
github.com/onsi/ginkgo v1.12.1
github.com/onsi/gomega v1.10.1
github.com/spf13/cobra v1.1.1


@ -82,6 +82,8 @@ waitPodReady $controller_pod_label "karmada-system"
export KUBECONFIG=${KARMADA_APISERVER_CONFIG}
installCRDs
# deploy controller-manager on host cluster
export KUBECONFIG=${HOST_CLUSTER_KUBECONFIG}
# deploy controller-manager on host cluster
kubectl apply -f "${SCRIPT_ROOT}/artifacts/deploy/controller-manager.yaml"
# deploy scheduler on host cluster
kubectl apply -f "${SCRIPT_ROOT}/artifacts/deploy/karmada-scheduler.yaml"


@ -33,6 +33,9 @@ make images
# load controller-manager image
kind load docker-image "${REGISTRY}/karmada-controller-manager:${VERSION}" --name="${HOST_CLUSTER_NAME}"
# load scheduler image
kind load docker-image "${REGISTRY}/karmada-scheduler:${VERSION}" --name="${HOST_CLUSTER_NAME}"
# deploy karmada control plane
"${SCRIPT_ROOT}"/hack/deploy-karmada.sh


@ -16,7 +16,6 @@ import (
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
@ -157,8 +156,6 @@ func (c *PropagationPolicyController) calculatePropagationBindings(policy *v1alp
// buildPropagationBinding will build propagationBinding by matched resources.
func (c *PropagationPolicyController) buildPropagationBinding(policy *v1alpha1.PropagationPolicy, policyReferenceWorkloads []*unstructured.Unstructured) (controllerruntime.Result, error) {
targetCluster := c.getTargetClusters(policy.Spec.Placement)
orphanBindings, workloads, err := c.calculatePropagationBindings(policy, policyReferenceWorkloads)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
@ -175,7 +172,7 @@ func (c *PropagationPolicyController) buildPropagationBinding(policy *v1alpha1.P
// If binding already exist, update if changed.
// If binding not exist, create it.
for _, workload := range workloads {
err := c.ensurePropagationBinding(policy, workload, targetCluster)
err := c.ensurePropagationBinding(policy, workload)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}
@ -287,22 +284,8 @@ func (c *PropagationPolicyController) fetchWorkload(resourceSelector v1alpha1.Re
return workload, nil
}
// getTargetClusters get targetClusters by placement.
// TODO(RainbowMango): This is a dummy function and will be removed once scheduler on board.
func (c *PropagationPolicyController) getTargetClusters(placement v1alpha1.Placement) []v1alpha1.TargetCluster {
matchClusterNames := util.GetDifferenceSet(placement.ClusterAffinity.ClusterNames, placement.ClusterAffinity.ExcludeClusters)
// TODO: cluster labelSelector, fieldSelector, clusterTolerations
// TODO: calc spread contraints. such as maximum, minimum
var targetClusters []v1alpha1.TargetCluster
for _, matchClusterName := range matchClusterNames {
targetClusters = append(targetClusters, v1alpha1.TargetCluster{Name: matchClusterName})
}
return targetClusters
}
// ensurePropagationBinding will ensure propagationBinding are created or updated.
func (c *PropagationPolicyController) ensurePropagationBinding(policy *v1alpha1.PropagationPolicy, workload *unstructured.Unstructured, clusterNames []v1alpha1.TargetCluster) error {
func (c *PropagationPolicyController) ensurePropagationBinding(policy *v1alpha1.PropagationPolicy, workload *unstructured.Unstructured) error {
bindingName := names.GenerateBindingName(workload.GetNamespace(), workload.GetKind(), workload.GetName())
propagationBinding := &v1alpha1.PropagationBinding{
ObjectMeta: metav1.ObjectMeta{
@ -321,28 +304,20 @@ func (c *PropagationPolicyController) ensurePropagationBinding(policy *v1alpha1.
Name: workload.GetName(),
ResourceVersion: workload.GetResourceVersion(),
},
Clusters: clusterNames,
},
}
runtimeObject := propagationBinding.DeepCopy()
operationResult, err := controllerutil.CreateOrUpdate(context.TODO(), c.Client, runtimeObject, func() error {
runtimeObject.Spec = propagationBinding.Spec
return nil
})
if err != nil {
klog.Errorf("Failed to create/update propagationBinding %s/%s. Error: %v", propagationBinding.GetNamespace(), propagationBinding.GetName(), err)
return err
}
if operationResult == controllerutil.OperationResultCreated {
_, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(propagationBinding.Namespace).Create(context.TODO(), propagationBinding, metav1.CreateOptions{})
if err == nil {
klog.Infof("Create propagationBinding %s/%s successfully.", propagationBinding.GetNamespace(), propagationBinding.GetName())
} else if operationResult == controllerutil.OperationResultUpdated {
klog.Infof("Update propagationBinding %s/%s successfully.", propagationBinding.GetNamespace(), propagationBinding.GetName())
} else {
klog.V(2).Infof("PropagationBinding %s/%s is up to date.", propagationBinding.GetNamespace(), propagationBinding.GetName())
return nil
}
return nil
if apierrors.IsAlreadyExists(err) {
klog.V(2).Infof("PropagationBinding %s/%s is up to date.", propagationBinding.GetNamespace(), propagationBinding.GetName())
return nil
}
klog.Errorf("Failed to create propagationBinding %s/%s. Error: %v", propagationBinding.GetNamespace(), propagationBinding.GetName(), err)
return err
}
// SetupWithManager creates a controller and register to controller manager.

pkg/scheduler/cache/cache.go

@ -0,0 +1,62 @@
package cache
import (
"sync"
"github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
)
// Cache is an interface for scheduler internal cache.
type Cache interface {
AddCluster(cluster *v1alpha1.MemberCluster)
UpdateCluster(cluster *v1alpha1.MemberCluster)
DeleteCluster(cluster *v1alpha1.MemberCluster)
// Snapshot returns a snapshot of the current clusters info
Snapshot() *Snapshot
}
type schedulerCache struct {
mutex sync.RWMutex
clusters map[string]*v1alpha1.MemberCluster
}
// NewCache instantiates a cache used only by scheduler.
func NewCache() Cache {
return &schedulerCache{
clusters: make(map[string]*v1alpha1.MemberCluster),
}
}
func (c *schedulerCache) AddCluster(cluster *v1alpha1.MemberCluster) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.clusters[cluster.Name] = cluster
}
func (c *schedulerCache) UpdateCluster(cluster *v1alpha1.MemberCluster) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.clusters[cluster.Name] = cluster
}
func (c *schedulerCache) DeleteCluster(cluster *v1alpha1.MemberCluster) {
c.mutex.Lock()
defer c.mutex.Unlock()
delete(c.clusters, cluster.Name)
}
// TODO: need optimization, only clone when necessary
func (c *schedulerCache) Snapshot() *Snapshot {
c.mutex.RLock()
defer c.mutex.RUnlock()
out := NewEmptySnapshot()
out.clusterInfoList = make([]*framework.ClusterInfo, 0, len(c.clusters))
for _, cluster := range c.clusters {
cloned := cluster.DeepCopy()
out.clusterInfoList = append(out.clusterInfoList, framework.NewClusterInfo(cloned))
}
return out
}

pkg/scheduler/cache/snapshot.go

@ -0,0 +1,25 @@
package cache
import "github.com/karmada-io/karmada/pkg/scheduler/framework"
// Snapshot is a snapshot of cache ClusterInfo. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its operations in that cycle.
type Snapshot struct {
// clusterInfoList is the list of member clusters captured from the scheduler cache when the snapshot was taken.
clusterInfoList []*framework.ClusterInfo
}
// NewEmptySnapshot initializes a Snapshot struct and returns it.
func NewEmptySnapshot() *Snapshot {
return &Snapshot{}
}
// NumOfClusters returns the number of memberClusters.
func (s *Snapshot) NumOfClusters() int {
return len(s.clusterInfoList)
}
// GetClusters returns all the clusters.
func (s *Snapshot) GetClusters() []*framework.ClusterInfo {
return s.clusterInfoList
}
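
Illustrative usage (not part of this commit): a sketch of how the cache and its snapshot interact during a scheduling cycle. It assumes MemberCluster embeds metav1.ObjectMeta in the usual CRD fashion.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
    "github.com/karmada-io/karmada/pkg/scheduler/cache"
)

func main() {
    c := cache.NewCache()
    c.AddCluster(&v1alpha1.MemberCluster{ObjectMeta: metav1.ObjectMeta{Name: "member1"}})

    // Snapshot deep-copies every cluster, so the scheduling cycle works on a stable view.
    snap := c.Snapshot()

    // Later cache mutations do not affect the snapshot taken above.
    c.DeleteCluster(&v1alpha1.MemberCluster{ObjectMeta: metav1.ObjectMeta{Name: "member1"}})
    fmt.Println(snap.NumOfClusters()) // still 1
}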


@ -0,0 +1,140 @@
package core
import (
"context"
"fmt"
"k8s.io/klog/v2"
memberclusterapi "github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
lister "github.com/karmada-io/karmada/pkg/generated/listers/propagationstrategy/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
)
// ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters.
type ScheduleAlgorithm interface {
Schedule(context.Context, *v1alpha1.PropagationBinding) (scheduleResult ScheduleResult, err error)
}
// ScheduleResult includes the clusters selected.
type ScheduleResult struct {
SuggestedClusters []string
}
type genericScheduler struct {
schedulerCache cache.Cache
// TODO: move it into schedulerCache
policyLister lister.PropagationPolicyLister
scheduleFramework framework.Framework
}
// NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler(
schedCache cache.Cache,
policyLister lister.PropagationPolicyLister,
plugins []string,
) ScheduleAlgorithm {
return &genericScheduler{
schedulerCache: schedCache,
policyLister: policyLister,
scheduleFramework: runtime.NewFramework(plugins),
}
}
func (g *genericScheduler) Schedule(ctx context.Context, binding *v1alpha1.PropagationBinding) (result ScheduleResult, err error) {
klog.V(4).Infof("Scheduling %s/%s", binding.Namespace, binding.Name)
clusterInfoSnapshot := g.schedulerCache.Snapshot()
if clusterInfoSnapshot.NumOfClusters() == 0 {
return result, fmt.Errorf("no clusters available to schedule")
}
var policyName string
if len(binding.OwnerReferences) > 0 {
owner := binding.OwnerReferences[0]
if owner.APIVersion == v1alpha1.SchemeGroupVersion.String() && owner.Kind == "PropagationPolicy" {
policyName = owner.Name
}
}
policy, err := g.policyLister.PropagationPolicies(binding.Namespace).Get(policyName)
if err != nil {
return result, fmt.Errorf("no propagation policy found for <%s/%s>: %v", binding.Namespace, binding.Name, err)
}
feasibleClusters, err := g.findClustersThatFit(ctx, g.scheduleFramework, &policy.Spec.Placement, clusterInfoSnapshot)
if err != nil {
return result, fmt.Errorf("failed findClustersThatFit for <%s/%s>: %v", binding.Namespace, binding.Name, err)
}
if len(feasibleClusters) == 0 {
return result, fmt.Errorf("no clusters fit")
}
klog.V(4).Infof("feasible clusters found for <%s/%s>: %v", binding.Namespace, binding.Name, feasibleClusters)
clustersScore, err := g.prioritizeClusters(ctx, g.scheduleFramework, &policy.Spec.Placement, feasibleClusters)
if err != nil {
return result, fmt.Errorf("failed prioritizeClusters for <%s/%s>: %v", binding.Namespace, binding.Name, err)
}
klog.V(4).Infof("feasible clusters scores for <%s/%s>: %v", binding.Namespace, binding.Name, clustersScore)
clusters := g.selectClusters(clustersScore)
result.SuggestedClusters = clusters
return result, nil
}
// findClustersThatFit finds the clusters that are fit for the placement based on running the filter plugins.
func (g *genericScheduler) findClustersThatFit(
ctx context.Context,
fwk framework.Framework,
placement *v1alpha1.Placement,
clusterInfo *cache.Snapshot) ([]*memberclusterapi.MemberCluster, error) {
var out []*memberclusterapi.MemberCluster
clusters := clusterInfo.GetClusters()
for _, c := range clusters {
resMap := fwk.RunFilterPlugins(ctx, placement, c.Cluster())
res := resMap.Merge()
if !res.IsSuccess() {
klog.V(4).Infof("cluster %q is not fit", c.Cluster().Name)
} else {
out = append(out, c.Cluster())
}
}
return out, nil
}
// prioritizeClusters prioritizes the clusters by running the score plugins.
func (g *genericScheduler) prioritizeClusters(
ctx context.Context,
fwk framework.Framework,
placement *v1alpha1.Placement,
clusters []*memberclusterapi.MemberCluster) (result framework.ClusterScoreList, err error) {
scoresMap, err := fwk.RunScorePlugins(ctx, placement, clusters)
if err != nil {
return result, err
}
result = make(framework.ClusterScoreList, len(clusters))
for i := range clusters {
result[i] = framework.ClusterScore{Name: clusters[i].Name, Score: 0}
for j := range scoresMap {
result[i].Score += scoresMap[j][i].Score
}
}
return result, nil
}
// TODO: update the algorithms
func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList) []string {
out := make([]string, len(clustersScore))
for i := range clustersScore {
out[i] = clustersScore[i].Name
}
return out
}


@ -0,0 +1,140 @@
package framework
import (
"context"
"errors"
"strings"
membercluster "github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
)
// Framework manages the set of plugins in use by the scheduling framework.
// Configured plugins are called at specified points in a scheduling context.
type Framework interface {
// RunFilterPlugins runs the set of configured Filter plugins for resources on
// the given cluster.
RunFilterPlugins(ctx context.Context, placement *v1alpha1.Placement, cluster *membercluster.MemberCluster) PluginToResult
// RunScorePlugins runs the set of configured Score plugins and returns a map of plugin name to scores.
RunScorePlugins(ctx context.Context, placement *v1alpha1.Placement, clusters []*membercluster.MemberCluster) (PluginToClusterScores, error)
}
// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
Name() string
}
// FilterPlugin is an interface for filter plugins. These filters are used to filter out clusters
// that are not fit for the resource.
type FilterPlugin interface {
Plugin
// Filter is called by the scheduling framework.
Filter(ctx context.Context, placement *v1alpha1.Placement, cluster *membercluster.MemberCluster) *Result
}
// Result indicates the result of running a plugin. It consists of a code, a
// message and (optionally) an error. When the status code is not `Success`,
// the reasons should explain why.
type Result struct {
code Code
reasons []string
err error
}
// Code is the Status code/type which is returned from plugins.
type Code int
// These are predefined codes used in a Status.
const (
// Success means that the plugin ran correctly and found the resource schedulable.
// NOTE: A nil status is also considered as "Success".
Success Code = iota
// Unschedulable is used when a plugin finds the resource unschedulable.
// The accompanying status message should explain why it is unschedulable.
Unschedulable
// Error is used for internal plugin errors, unexpected input, etc.
Error
)
// NewResult makes a result out of the given arguments and returns its pointer.
func NewResult(code Code, reasons ...string) *Result {
s := &Result{
code: code,
reasons: reasons,
}
if code == Error {
s.err = errors.New(strings.Join(reasons, ","))
}
return s
}
// PluginToResult maps plugin name to Result.
type PluginToResult map[string]*Result
// Merge merges the statuses in the map into one. The resulting status code has the following
// precedence: Error, Unschedulable.
func (p PluginToResult) Merge() *Result {
if len(p) == 0 {
return nil
}
finalStatus := NewResult(Success)
var hasUnschedulable bool
for _, s := range p {
// A nil Result counts as Success (see the NOTE above) and adds nothing to the merged result.
if s == nil {
continue
}
if s.code == Error {
finalStatus.err = s.err
} else if s.code == Unschedulable {
hasUnschedulable = true
}
finalStatus.code = s.code
finalStatus.reasons = append(finalStatus.reasons, s.reasons...)
}
if finalStatus.err != nil {
finalStatus.code = Error
} else if hasUnschedulable {
finalStatus.code = Unschedulable
}
return finalStatus
}
// IsSuccess returns true if and only if "Result" is nil or Code is "Success".
func (s *Result) IsSuccess() bool {
return s == nil || s.code == Success
}
// AsError returns nil if the Result is a success; otherwise returns an "error" object
// with a concatenated message on reasons of the Result.
func (s *Result) AsError() error {
if s.IsSuccess() {
return nil
}
if s.err != nil {
return s.err
}
return errors.New(strings.Join(s.reasons, ", "))
}
// ScorePlugin is an interface that must be implemented by "Score" plugins to rank
// clusters that passed the filtering phase.
type ScorePlugin interface {
Plugin
// Score is called on each filtered cluster. It must return success and an integer
// indicating the rank of the cluster. All scoring plugins must return success or
// the resource will be rejected.
Score(ctx context.Context, placement *v1alpha1.Placement, cluster *membercluster.MemberCluster) (float64, *Result)
}
// ClusterScore represents the score of one cluster.
type ClusterScore struct {
Name string
Score float64
}
// ClusterScoreList declares a list of clusters and their scores.
type ClusterScoreList []ClusterScore
// PluginToClusterScores declares a map from plugin name to its ClusterScoreList.
type PluginToClusterScores map[string]ClusterScoreList
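
Illustrative usage (not part of this commit): how the Merge precedence plays out. A single Unschedulable result makes the merged result non-successful even when other plugins succeed; "FakePlugin" is a hypothetical plugin name.

package main

import (
    "fmt"

    "github.com/karmada-io/karmada/pkg/scheduler/framework"
)

func main() {
    // Results reported by two plugins for the same cluster.
    results := framework.PluginToResult{
        "ClusterAffinity": framework.NewResult(framework.Success),
        "FakePlugin":      framework.NewResult(framework.Unschedulable, "cluster is excluded"),
    }

    merged := results.Merge()
    fmt.Println(merged.IsSuccess()) // false: Unschedulable takes precedence over Success
    fmt.Println(merged.AsError())   // error built from the collected reasons
}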


@ -0,0 +1,59 @@
package clusteraffinity
import (
"context"
membercluster "github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
)
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = "ClusterAffinity"
)
// ClusterAffinity is a plugin that filters clusters according to the placement's cluster affinity (cluster names and exclusions).
type ClusterAffinity struct{}
var _ framework.FilterPlugin = &ClusterAffinity{}
var _ framework.ScorePlugin = &ClusterAffinity{}
// New instantiates the clusteraffinity plugin.
func New() framework.Plugin {
return &ClusterAffinity{}
}
// Name returns the plugin name.
func (p *ClusterAffinity) Name() string {
return Name
}
// Filter checks whether the given cluster matches the placement's cluster affinity constraint.
func (p *ClusterAffinity) Filter(ctx context.Context, placement *v1alpha1.Placement, cluster *membercluster.MemberCluster) *framework.Result {
affinity := placement.ClusterAffinity
if affinity != nil {
for _, clusterName := range affinity.ExcludeClusters {
if clusterName == cluster.Name {
return framework.NewResult(framework.Unschedulable, "cluster is excluded")
}
}
if len(affinity.ClusterNames) > 0 {
for _, clusterName := range affinity.ClusterNames {
if clusterName == cluster.Name {
return framework.NewResult(framework.Success)
}
}
return framework.NewResult(framework.Unschedulable, "cluster is not specified")
}
}
// If no cluster names are specified and the cluster is not excluded, treat it as a match.
return nil
}
// Score calculates the score on the candidate cluster.
func (p *ClusterAffinity) Score(ctx context.Context, placement *v1alpha1.Placement, cluster *membercluster.MemberCluster) (float64, *framework.Result) {
return 0, nil
}
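
Illustrative usage (not part of this commit): the filter decisions this plugin makes for an excluded cluster and an explicitly listed one. The ClusterAffinity struct name and the pointer field in Placement are inferred from the field accesses elsewhere in this commit.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    membercluster "github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
    "github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
    "github.com/karmada-io/karmada/pkg/scheduler/framework"
    "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
)

func main() {
    plugin := clusteraffinity.New().(framework.FilterPlugin)

    // Placement/ClusterAffinity field names follow the accesses shown in the diff above.
    placement := &v1alpha1.Placement{
        ClusterAffinity: &v1alpha1.ClusterAffinity{
            ClusterNames:    []string{"member1"},
            ExcludeClusters: []string{"member2"},
        },
    }

    excluded := &membercluster.MemberCluster{ObjectMeta: metav1.ObjectMeta{Name: "member2"}}
    fmt.Println(plugin.Filter(context.TODO(), placement, excluded).IsSuccess()) // false: excluded

    listed := &membercluster.MemberCluster{ObjectMeta: metav1.ObjectMeta{Name: "member1"}}
    fmt.Println(plugin.Filter(context.TODO(), placement, listed).IsSuccess()) // true: listed by name
}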


@ -0,0 +1,13 @@
package plugins
import (
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
)
// NewPlugins builds all the scheduling plugins.
func NewPlugins() map[string]framework.Plugin {
return map[string]framework.Plugin{
clusteraffinity.Name: clusteraffinity.New(),
}
}


@ -0,0 +1,83 @@
package runtime
import (
"context"
"fmt"
"reflect"
"k8s.io/klog/v2"
membercluster "github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
plugins2 "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
)
// frameworkImpl implements the Framework interface and is responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
filterPlugins []framework.FilterPlugin
scorePlugins []framework.ScorePlugin
}
var _ framework.Framework = &frameworkImpl{}
// NewFramework creates a scheduling framework.
func NewFramework(plugins []string) framework.Framework {
pluginsMap := plugins2.NewPlugins()
out := &frameworkImpl{}
filterPluginsList := reflect.ValueOf(&out.filterPlugins).Elem()
scorePluginsList := reflect.ValueOf(&out.scorePlugins).Elem()
filterType := filterPluginsList.Type().Elem()
scoreType := scorePluginsList.Type().Elem()
for _, p := range plugins {
plugin := pluginsMap[p]
if plugin == nil {
klog.Warningf("scheduling plugin %s not exists", p)
continue
}
if reflect.TypeOf(plugin).Implements(filterType) {
newPlugins := reflect.Append(filterPluginsList, reflect.ValueOf(plugin))
filterPluginsList.Set(newPlugins)
}
// A plugin may implement both interfaces, so check the score interface independently.
if reflect.TypeOf(plugin).Implements(scoreType) {
newPlugins := reflect.Append(scorePluginsList, reflect.ValueOf(plugin))
scorePluginsList.Set(newPlugins)
}
}
return out
}
// RunFilterPlugins runs the set of configured Filter plugins for resources on the cluster.
// If any of the results is not Success, the cluster is not suited for the resource.
func (frw *frameworkImpl) RunFilterPlugins(ctx context.Context, placement *v1alpha1.Placement, cluster *membercluster.MemberCluster) framework.PluginToResult {
result := make(framework.PluginToResult, len(frw.filterPlugins))
for _, p := range frw.filterPlugins {
pluginResult := p.Filter(ctx, placement, cluster)
result[p.Name()] = pluginResult
}
return result
}
// RunScorePlugins runs the set of configured Score plugins for the given clusters.
// It returns a map from plugin name to the scores that plugin assigned to each cluster.
func (frw *frameworkImpl) RunScorePlugins(ctx context.Context, placement *v1alpha1.Placement, clusters []*membercluster.MemberCluster) (framework.PluginToClusterScores, error) {
result := make(framework.PluginToClusterScores, len(frw.scorePlugins))
for _, p := range frw.scorePlugins {
scoreList := make(framework.ClusterScoreList, len(clusters))
for i, cluster := range clusters {
score, res := p.Score(ctx, placement, cluster)
if !res.IsSuccess() {
return nil, fmt.Errorf("plugin %q failed with: %w", p.Name(), res.AsError())
}
scoreList[i] = framework.ClusterScore{
Name: cluster.Name,
Score: score,
}
}
result[p.Name()] = scoreList
}
return result, nil
}
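
Illustrative usage (not part of this commit): building a framework from plugin names. Unknown names are skipped with a warning, and a plugin such as ClusterAffinity that implements both interfaces is wired into both the filter and score lists. "NoSuchPlugin" is hypothetical.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    membercluster "github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
    "github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
    "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
    "github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
)

func main() {
    // "NoSuchPlugin" is not registered, so NewFramework logs a warning and ignores it.
    fwk := runtime.NewFramework([]string{clusteraffinity.Name, "NoSuchPlugin"})

    cluster := &membercluster.MemberCluster{ObjectMeta: metav1.ObjectMeta{Name: "member1"}}

    // An empty placement has no cluster affinity, so the ClusterAffinity filter lets it pass.
    results := fwk.RunFilterPlugins(context.TODO(), &v1alpha1.Placement{}, cluster)
    fmt.Println(results.Merge().IsSuccess()) // true
}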


@ -0,0 +1,26 @@
package framework
import (
"github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
)
// ClusterInfo is cluster level aggregated information.
type ClusterInfo struct {
// Overall cluster information.
cluster *v1alpha1.MemberCluster
}
// NewClusterInfo creates a ClusterInfo object.
func NewClusterInfo(cluster *v1alpha1.MemberCluster) *ClusterInfo {
return &ClusterInfo{
cluster: cluster,
}
}
// Cluster returns overall information about this cluster.
func (n *ClusterInfo) Cluster() *v1alpha1.MemberCluster {
if n == nil {
return nil
}
return n.cluster
}

pkg/scheduler/scheduler.go

@ -0,0 +1,236 @@
package scheduler
import (
"context"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
memclusterapi "github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
lister "github.com/karmada-io/karmada/pkg/generated/listers/propagationstrategy/v1alpha1"
schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/core"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
)
const (
// maxRetries is the number of times a propagationbinding will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
// sequence of delays between successive queuings of a propagationbinding.
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)
// Scheduler is the karmada scheduler. It watches PropagationBindings and selects the target member clusters for each of them.
type Scheduler struct {
DynamicClient dynamic.Interface
KarmadaClient karmadaclientset.Interface
KubeClient kubernetes.Interface
bindingInformer cache.SharedIndexInformer
bindingLister lister.PropagationBindingLister
policyInformer cache.SharedIndexInformer
policyLister lister.PropagationPolicyLister
informerFactory informerfactory.SharedInformerFactory
// TODO: implement a priority scheduling queue
queue workqueue.RateLimitingInterface
Algorithm core.ScheduleAlgorithm
schedulerCache schedulercache.Cache
}
// NewScheduler instantiates a scheduler
func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientset.Interface, kubeClient kubernetes.Interface) *Scheduler {
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
bindingInformer := factory.Propagationstrategy().V1alpha1().PropagationBindings().Informer()
bindingLister := factory.Propagationstrategy().V1alpha1().PropagationBindings().Lister()
policyInformer := factory.Propagationstrategy().V1alpha1().PropagationPolicies().Informer()
policyLister := factory.Propagationstrategy().V1alpha1().PropagationPolicies().Lister()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
schedulerCache := schedulercache.NewCache()
// TODO: make plugins as a flag
algorithm := core.NewGenericScheduler(schedulerCache, policyLister, []string{clusteraffinity.Name})
sched := &Scheduler{
DynamicClient: dynamicClient,
KarmadaClient: karmadaClient,
KubeClient: kubeClient,
bindingInformer: bindingInformer,
bindingLister: bindingLister,
policyInformer: policyInformer,
policyLister: policyLister,
informerFactory: factory,
queue: queue,
Algorithm: algorithm,
schedulerCache: schedulerCache,
}
bindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sched.onPropagationBindingAdd,
UpdateFunc: sched.onPropagationBindingUpdate,
})
memclusterInformer := factory.Membercluster().V1alpha1().MemberClusters().Informer()
memclusterInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.addCluster,
UpdateFunc: sched.updateCluster,
DeleteFunc: sched.deleteCluster,
},
)
return sched
}
// Run runs the scheduler
func (s *Scheduler) Run(ctx context.Context) {
stopCh := ctx.Done()
klog.Infof("Starting karmada scheduler")
defer klog.Infof("Shutting down karmada scheduler")
s.informerFactory.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, s.bindingInformer.HasSynced) {
return
}
go wait.Until(s.worker, time.Second, stopCh)
<-stopCh
}
func (s *Scheduler) onPropagationBindingAdd(obj interface{}) {
propagationBinding := obj.(*v1alpha1.PropagationBinding)
if len(propagationBinding.Spec.Clusters) > 0 {
return
}
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
klog.Errorf("couldn't get key for object %#v: %v", obj, err)
return
}
s.queue.Add(key)
}
func (s *Scheduler) onPropagationBindingUpdate(old, cur interface{}) {
s.onPropagationBindingAdd(cur)
}
func (s *Scheduler) worker() {
for s.scheduleNext() {
}
}
func (s *Scheduler) scheduleNext() bool {
key, shutdown := s.queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
return false
}
defer s.queue.Done(key)
err := s.scheduleOne(key.(string))
s.handleErr(err, key)
return true
}
func (s *Scheduler) scheduleOne(key string) (err error) {
klog.V(4).Infof("begin scheduling PropagationBinding %s", key)
defer func() {
klog.V(4).Infof("end scheduling PropagationBinding %s: %v", key, err)
}()
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
propagationBinding, err := s.bindingLister.PropagationBindings(ns).Get(name)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), propagationBinding)
if err != nil {
klog.V(2).Infof("failed scheduling PropagationBinding %s: %v", key, err)
return err
}
klog.V(4).Infof("PropagationBinding %s scheduled to clusters %v", key, scheduleResult.SuggestedClusters)
binding := propagationBinding.DeepCopy()
targetClusters := make([]v1alpha1.TargetCluster, len(scheduleResult.SuggestedClusters))
for i, cluster := range scheduleResult.SuggestedClusters {
targetClusters[i] = v1alpha1.TargetCluster{Name: cluster}
}
binding.Spec.Clusters = targetClusters
_, err = s.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(ns).Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
func (s *Scheduler) handleErr(err error, key interface{}) {
if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
s.queue.Forget(key)
return
}
if s.queue.NumRequeues(key) < maxRetries {
s.queue.AddRateLimited(key)
return
}
utilruntime.HandleError(err)
klog.V(2).Infof("Dropping propagationbinding %q out of the queue: %v", key, err)
s.queue.Forget(key)
}
func (s *Scheduler) addCluster(obj interface{}) {
membercluster, ok := obj.(*memclusterapi.MemberCluster)
if !ok {
klog.Errorf("cannot convert to MemberCluster: %v", obj)
return
}
klog.V(3).Infof("add event for membercluster %s", membercluster.Name)
s.schedulerCache.AddCluster(membercluster)
}
func (s *Scheduler) updateCluster(_, newObj interface{}) {
newCluster, ok := newObj.(*memclusterapi.MemberCluster)
if !ok {
klog.Errorf("cannot convert newObj to MemberCluster: %v", newObj)
return
}
klog.V(3).Infof("update event for membercluster %s", newCluster.Name)
s.schedulerCache.UpdateCluster(newCluster)
}
func (s *Scheduler) deleteCluster(obj interface{}) {
var cluster *memclusterapi.MemberCluster
switch t := obj.(type) {
case *memclusterapi.MemberCluster:
cluster = t
case cache.DeletedFinalStateUnknown:
var ok bool
cluster, ok = t.Obj.(*memclusterapi.MemberCluster)
if !ok {
klog.Errorf("cannot convert to memclusterapi.MemberCluster: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to memclusterapi.MemberCluster: %v", t)
return
}
klog.V(3).Infof("delete event for membercluster %s", cluster.Name)
s.schedulerCache.DeleteCluster(cluster)
}
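
Illustrative note (not part of this commit): the delays quoted in the maxRetries comment come from client-go's workqueue.DefaultControllerRateLimiter, which the scheduler's queue is built with above. A standalone sketch of the first few per-key delays:

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // The default controller rate limiter combines a per-item exponential backoff starting
    // at 5ms with an overall token bucket; for a single failing key the exponential part wins.
    rl := workqueue.DefaultControllerRateLimiter()
    key := "karmada-system/sample-binding" // hypothetical queue key
    for i := 0; i < 5; i++ {
        fmt.Println(rl.When(key)) // 5ms, 10ms, 20ms, 40ms, 80ms
    }
}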

vendor/modules.txt

@ -59,6 +59,7 @@ github.com/google/go-cmp/cmp/internal/value
# github.com/google/gofuzz v1.1.0
github.com/google/gofuzz
# github.com/google/uuid v1.1.1
## explicit
github.com/google/uuid
# github.com/googleapis/gnostic v0.4.1
github.com/googleapis/gnostic/compiler