Merge pull request #731 from dddddai/sceduler-cache

Use cluster lister instead of map to implement scheduler cache
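In short: the scheduler previously mirrored cluster objects into its own mutex-guarded map and kept that map current through AddCluster/UpdateCluster/DeleteCluster; after this change it reads straight from the informer-backed ClusterLister, which client-go keeps in sync with the API server and which is safe for concurrent readers. A minimal sketch of the wiring, assuming the generated karmada clientset/informer packages used in this diff (the kubeconfig path is a placeholder):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/tools/clientcmd"

	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
	informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
)

func main() {
	// Hypothetical kubeconfig path; adjust for your environment.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/karmada/karmada.config")
	if err != nil {
		panic(err)
	}
	karmadaClient := karmadaclientset.NewForConfigOrDie(cfg)

	// The factory owns the watch/relist machinery; the lister is a read-only,
	// concurrency-safe view over its indexer, so the scheduler no longer needs
	// its own locked map of clusters.
	factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
	clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	clusters, err := clusterLister.List(labels.Everything())
	if err != nil {
		panic(err)
	}
	fmt.Printf("observed %d member clusters\n", len(clusters))
}

Note that the lister only returns meaningful results after WaitForCacheSync; the scheduler relies on the same guarantee when it snapshots below.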
commit fd89085dac
Authored by karmada-bot on 2021-09-18 17:20:20 +08:00; committed by GitHub.
2 changed files with 17 additions and 24 deletions

File: pkg/scheduler/cache/cache.go

@@ -1,9 +1,11 @@
 package cache
 
 import (
-	"sync"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/klog/v2"
 
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+	clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework"
 )
@@ -17,43 +19,38 @@ type Cache interface {
 }
 
 type schedulerCache struct {
-	mutex    sync.RWMutex
-	clusters map[string]*clusterv1alpha1.Cluster
+	clusterLister clusterlister.ClusterLister
 }
 
 // NewCache instantiates a cache used only by scheduler.
-func NewCache() Cache {
+func NewCache(clusterLister clusterlister.ClusterLister) Cache {
 	return &schedulerCache{
-		clusters: make(map[string]*clusterv1alpha1.Cluster),
+		clusterLister: clusterLister,
 	}
 }
 
+// AddCluster does nothing since clusterLister would synchronize automatically
 func (c *schedulerCache) AddCluster(cluster *clusterv1alpha1.Cluster) {
-	c.mutex.Lock()
-	defer c.mutex.Unlock()
-	c.clusters[cluster.Name] = cluster
 }
 
+// UpdateCluster does nothing since clusterLister would synchronize automatically
func (c *schedulerCache) UpdateCluster(cluster *clusterv1alpha1.Cluster) {
-	c.mutex.Lock()
-	defer c.mutex.Unlock()
-	c.clusters[cluster.Name] = cluster
 }
 
+// DeleteCluster does nothing since clusterLister would synchronize automatically
 func (c *schedulerCache) DeleteCluster(cluster *clusterv1alpha1.Cluster) {
-	c.mutex.Lock()
-	defer c.mutex.Unlock()
-	delete(c.clusters, cluster.Name)
 }
 
 // TODO: need optimization, only clone when necessary
 func (c *schedulerCache) Snapshot() *Snapshot {
-	c.mutex.RLock()
-	defer c.mutex.RUnlock()
+	clusters, err := c.clusterLister.List(labels.Everything())
+	if err != nil {
+		klog.Errorf("Failed to list clusters: %v", err)
+		return nil
+	}
 	out := NewEmptySnapshot()
-	out.clusterInfoList = make([]*framework.ClusterInfo, 0, len(c.clusters))
-	for _, cluster := range c.clusters {
+	out.clusterInfoList = make([]*framework.ClusterInfo, 0, len(clusters))
+	for _, cluster := range clusters {
 		cloned := cluster.DeepCopy()
 		out.clusterInfoList = append(out.clusterInfoList, framework.NewClusterInfo(cloned))
 	}
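One behavioral note on the new Snapshot: a lister failure now yields a nil snapshot rather than an empty one, and the DeepCopy is load-bearing because lister results are shared with the informer's cache and must not be mutated. A hedged sketch of a caller guarding against the nil case (scheduleOnce is a hypothetical helper, not code from this PR):

package example

import (
	"errors"

	schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
)

// scheduleOnce illustrates the defensive nil check the new error path calls for.
func scheduleOnce(c schedulercache.Cache) error {
	snapshot := c.Snapshot()
	if snapshot == nil {
		// Snapshot already logged the lister failure; skip this cycle and retry.
		return errors.New("scheduler cache snapshot unavailable")
	}
	// ...run the scheduling algorithm against the cloned snapshot...
	return nil
}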

File: pkg/scheduler/scheduler.go

@@ -105,7 +105,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
 	clusterPolicyLister := factory.Policy().V1alpha1().ClusterPropagationPolicies().Lister()
 	clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
 	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
-	schedulerCache := schedulercache.NewCache()
+	schedulerCache := schedulercache.NewCache(clusterLister)
 	// TODO: make plugins as a flag
 	algorithm := core.NewGenericScheduler(schedulerCache, policyLister, []string{clusteraffinity.Name, tainttoleration.Name, apiinstalled.Name})
 	sched := &Scheduler{
@@ -506,8 +506,6 @@ func (s *Scheduler) addCluster(obj interface{}) {
 	}
 	klog.V(3).Infof("add event for cluster %s", cluster.Name)
-	s.schedulerCache.AddCluster(cluster)
 	if s.enableSchedulerEstimator {
 		s.schedulerEstimatorWorker.AddRateLimited(cluster.Name)
 	}
@@ -520,7 +518,6 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
 		return
 	}
 	klog.V(3).Infof("update event for cluster %s", newCluster.Name)
-	s.schedulerCache.UpdateCluster(newCluster)
 	if s.enableSchedulerEstimator {
 		s.schedulerEstimatorWorker.AddRateLimited(newCluster.Name)
@@ -555,7 +552,6 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
 		return
 	}
 	klog.V(3).Infof("delete event for cluster %s", cluster.Name)
-	s.schedulerCache.DeleteCluster(cluster)
 	if s.enableSchedulerEstimator {
 		s.schedulerEstimatorWorker.AddRateLimited(cluster.Name)
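After this change the addCluster/updateCluster/deleteCluster hooks survive only to feed the scheduler-estimator worker; the informer, not the scheduler, owns the authoritative cluster state. For context, a hedged sketch of the usual client-go pattern for attaching such hooks to the cluster informer (registerClusterHooks is hypothetical and the registration site is outside this diff; cache.ResourceEventHandlerFuncs is from k8s.io/client-go/tools/cache):

package scheduler // hypothetical placement alongside the unexported handlers

import (
	clientgocache "k8s.io/client-go/tools/cache"

	informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
)

// registerClusterHooks attaches the scheduler's callbacks to the shared
// cluster informer; each callback merely enqueues work for the estimator.
func registerClusterHooks(factory informerfactory.SharedInformerFactory, sched *Scheduler) {
	factory.Cluster().V1alpha1().Clusters().Informer().AddEventHandler(
		clientgocache.ResourceEventHandlerFuncs{
			AddFunc:    sched.addCluster,    // enqueues the cluster for the estimator worker
			UpdateFunc: sched.updateCluster, // (_, newObj) matches UpdateFunc's shape
			DeleteFunc: sched.deleteCluster, // likewise only feeds the estimator now
		},
	)
}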