Merge pull request #2527 from carlory/fix-02

scheduler & descheduler introduce scheduler-estimator-service-prefix flag
This commit is contained in:
karmada-bot 2022-10-15 14:37:10 +08:00 committed by GitHub
commit 78fb5dbb3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 44 additions and 17 deletions

View File

@ -42,6 +42,8 @@ type Options struct {
// SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
SchedulerEstimatorTimeout metav1.Duration
// SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name.
SchedulerEstimatorServicePrefix string
// SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
SchedulerEstimatorPort int
// DeschedulingInterval specifies time interval for descheduler to run.
@ -81,6 +83,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.")
fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.")
fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name")
fs.DurationVar(&o.DeschedulingInterval.Duration, "descheduling-interval", defaultDeschedulingInterval, "Time interval between two consecutive descheduler executions. Setting this value instructs the descheduler to run in a continuous loop at the interval specified.")
fs.DurationVar(&o.UnschedulableThreshold.Duration, "unschedulable-threshold", defaultUnschedulableThreshold, "The period of pod unschedulable condition. This value is considered as a classification standard of unschedulable replicas.")
o.ProfileOpts.AddFlags(fs)

View File

@ -49,6 +49,8 @@ type Options struct {
DisableSchedulerEstimatorInPullMode bool
// SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
SchedulerEstimatorTimeout metav1.Duration
// SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name.
SchedulerEstimatorServicePrefix string
// SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
SchedulerEstimatorPort int
@ -95,6 +97,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.EnableSchedulerEstimator, "enable-scheduler-estimator", false, "Enable calling cluster scheduler estimator for adjusting replicas.")
fs.BoolVar(&o.DisableSchedulerEstimatorInPullMode, "disable-scheduler-estimator-in-pull-mode", false, "Disable the scheduler estimator for clusters in pull mode, which takes effect only when enable-scheduler-estimator is true.")
fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.")
fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name")
fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.")
fs.BoolVar(&o.EnableEmptyWorkloadPropagation, "enable-empty-workload-propagation", false, "Enable workload with replicas 0 to be propagated to member clusters.")
fs.StringSliceVar(&o.Plugins, "plugins", []string{"*"},

View File

@ -136,6 +136,7 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt
scheduler.WithOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithEnableSchedulerEstimator(opts.EnableSchedulerEstimator),
scheduler.WithDisableSchedulerEstimatorInPullMode(opts.DisableSchedulerEstimatorInPullMode),
scheduler.WithSchedulerEstimatorServicePrefix(opts.SchedulerEstimatorServicePrefix),
scheduler.WithSchedulerEstimatorPort(opts.SchedulerEstimatorPort),
scheduler.WithSchedulerEstimatorTimeout(opts.SchedulerEstimatorTimeout),
scheduler.WithEnableEmptyWorkloadPropagation(opts.EnableEmptyWorkloadPropagation),

View File

@ -46,9 +46,10 @@ type Descheduler struct {
eventRecorder record.EventRecorder
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorPort int
schedulerEstimatorWorker util.AsyncWorker
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorServicePrefix string
schedulerEstimatorPort int
schedulerEstimatorWorker util.AsyncWorker
unschedulableThreshold time.Duration
deschedulingInterval time.Duration
@ -250,7 +251,7 @@ func (d *Descheduler) establishEstimatorConnections() {
return
}
for i := range clusterList.Items {
if err = estimatorclient.EstablishConnection(d.KubeClient, clusterList.Items[i].Name, d.schedulerEstimatorCache, d.schedulerEstimatorPort); err != nil {
if err = estimatorclient.EstablishConnection(d.KubeClient, clusterList.Items[i].Name, d.schedulerEstimatorCache, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorPort); err != nil {
klog.Error(err)
}
}
@ -270,7 +271,7 @@ func (d *Descheduler) reconcileEstimatorConnection(key util.QueueKey) error {
}
return err
}
return estimatorclient.EstablishConnection(d.KubeClient, name, d.schedulerEstimatorCache, d.schedulerEstimatorPort)
return estimatorclient.EstablishConnection(d.KubeClient, name, d.schedulerEstimatorCache, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorPort)
}
func (d *Descheduler) recordDescheduleResultEventForResourceBinding(rb *workv1alpha2.ResourceBinding, message string, err error) {

View File

@ -80,13 +80,13 @@ func (c *SchedulerEstimatorCache) GetClient(name string) (estimatorservice.Estim
}
// EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator.
func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, port int) error {
func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServicePrefix string, port int) error {
if estimatorCache.IsEstimatorExist(name) {
return nil
}
serverAddr, err := resolveCluster(kubeClient, util.NamespaceKarmadaSystem,
names.GenerateEstimatorServiceName(name), int32(port))
names.GenerateEstimatorServiceName(estimatorServicePrefix, name), int32(port))
if err != nil {
return err
}

View File

@ -80,6 +80,7 @@ type Scheduler struct {
enableSchedulerEstimator bool
disableSchedulerEstimatorInPullMode bool
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
schedulerEstimatorServicePrefix string
schedulerEstimatorPort int
schedulerEstimatorWorker util.AsyncWorker
@ -93,6 +94,8 @@ type schedulerOptions struct {
disableSchedulerEstimatorInPullMode bool
// schedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service.
schedulerEstimatorTimeout metav1.Duration
// SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name.
schedulerEstimatorServicePrefix string
// schedulerEstimatorPort is the port that the accurate scheduler estimator server serves at.
schedulerEstimatorPort int
//enableEmptyWorkloadPropagation represents whether allow workload with replicas 0 propagated to member clusters should be enabled
@ -127,6 +130,13 @@ func WithSchedulerEstimatorTimeout(schedulerEstimatorTimeout metav1.Duration) Op
}
}
// WithSchedulerEstimatorServicePrefix sets the schedulerEstimatorServicePrefix for scheduler
func WithSchedulerEstimatorServicePrefix(schedulerEstimatorServicePrefix string) Option {
return func(o *schedulerOptions) {
o.schedulerEstimatorServicePrefix = schedulerEstimatorServicePrefix
}
}
// WithSchedulerEstimatorPort sets the schedulerEstimatorPort for scheduler
func WithSchedulerEstimatorPort(schedulerEstimatorPort int) Option {
return func(o *schedulerOptions) {
@ -200,6 +210,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
if options.enableSchedulerEstimator {
sched.enableSchedulerEstimator = options.enableSchedulerEstimator
sched.disableSchedulerEstimatorInPullMode = options.disableSchedulerEstimatorInPullMode
sched.schedulerEstimatorServicePrefix = options.schedulerEstimatorServicePrefix
sched.schedulerEstimatorPort = options.schedulerEstimatorPort
sched.schedulerEstimatorCache = estimatorclient.NewSchedulerEstimatorCache()
schedulerEstimatorWorkerOptions := util.Options{
@ -577,7 +588,7 @@ func (s *Scheduler) reconcileEstimatorConnection(key util.QueueKey) error {
return nil
}
return estimatorclient.EstablishConnection(s.KubeClient, name, s.schedulerEstimatorCache, s.schedulerEstimatorPort)
return estimatorclient.EstablishConnection(s.KubeClient, name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorPort)
}
func (s *Scheduler) establishEstimatorConnections() {
@ -590,7 +601,7 @@ func (s *Scheduler) establishEstimatorConnections() {
if clusterList.Items[i].Spec.SyncMode == clusterv1alpha1.Pull && s.disableSchedulerEstimatorInPullMode {
continue
}
if err = estimatorclient.EstablishConnection(s.KubeClient, clusterList.Items[i].Name, s.schedulerEstimatorCache, s.schedulerEstimatorPort); err != nil {
if err = estimatorclient.EstablishConnection(s.KubeClient, clusterList.Items[i].Name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorPort); err != nil {
klog.Error(err)
}
}

View File

@ -127,7 +127,7 @@ func GenerateDerivedServiceName(serviceName string) string {
}
// GenerateEstimatorServiceName generates the gRPC scheduler estimator service name which belongs to a cluster.
func GenerateEstimatorServiceName(clusterName string) string {
func GenerateEstimatorServiceName(estimatorServicePrefix, clusterName string) string {
return fmt.Sprintf("%s-%s", estimatorServicePrefix, clusterName)
}

View File

@ -312,18 +312,26 @@ func TestGenerateEstimatorDeploymentName(t *testing.T) {
func TestGenerateEstimatorServiceName(t *testing.T) {
tests := []struct {
name string
clusterName string
expected string
name string
clusterName string
estimatorServicePrefix string
expected string
}{
{
name: "",
clusterName: "cluster",
expected: "karmada-scheduler-estimator-cluster",
name: "",
clusterName: "cluster",
estimatorServicePrefix: "karmada-scheduler-estimator",
expected: "karmada-scheduler-estimator-cluster",
},
{
name: "",
clusterName: "cluster",
estimatorServicePrefix: "demo-karmada-scheduler-estimator",
expected: "demo-karmada-scheduler-estimator-cluster",
},
}
for _, test := range tests {
got := GenerateEstimatorServiceName(test.clusterName)
got := GenerateEstimatorServiceName(test.estimatorServicePrefix, test.clusterName)
if got != test.expected {
t.Errorf("Test %s failed: expected %v, but got %v", test.name, test.expected, got)
}