feat: optimize announcer in scheduler and client (#2445)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-06-09 15:46:56 +08:00
parent 33cc68159b
commit 146f82c453
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
2 changed files with 17 additions and 28 deletions

View File

@ -308,14 +308,12 @@ func (a *announcer) announceToManager() error {
} }
// Start keepalive to manager. // Start keepalive to manager.
go func() { go a.managerClient.KeepAlive(a.config.Scheduler.Manager.SeedPeer.KeepAlive.Interval, &managerv1.KeepAliveRequest{
a.managerClient.KeepAlive(a.config.Scheduler.Manager.SeedPeer.KeepAlive.Interval, &managerv1.KeepAliveRequest{
SourceType: managerv1.SourceType_SEED_PEER_SOURCE, SourceType: managerv1.SourceType_SEED_PEER_SOURCE,
Hostname: a.config.Host.Hostname, Hostname: a.config.Host.Hostname,
Ip: a.config.Host.AdvertiseIP.String(), Ip: a.config.Host.AdvertiseIP.String(),
ClusterId: uint64(a.config.Scheduler.Manager.SeedPeer.ClusterID), ClusterId: uint64(a.config.Scheduler.Manager.SeedPeer.ClusterID),
}, a.done) }, a.done)
}()
} }
return nil return nil

View File

@ -101,15 +101,11 @@ func New(cfg *config.Config, managerClient managerclient.V2, storage storage.Sto
// Started announcer server. // Started announcer server.
func (a *announcer) Serve() error { func (a *announcer) Serve() error {
logger.Info("announce scheduler to manager") logger.Info("announce scheduler to manager")
if err := a.announceToManager(); err != nil { go a.announceToManager()
return err
}
if a.trainerClient != nil { if a.trainerClient != nil {
logger.Info("announce scheduler to trainer") logger.Info("announce scheduler to trainer")
if err := a.announceToTrainer(); err != nil { a.announceToTrainer()
return err
}
} }
return nil return nil
@ -122,22 +118,17 @@ func (a *announcer) Stop() error {
} }
// announceSeedPeer announces peer information to manager. // announceSeedPeer announces peer information to manager.
func (a *announcer) announceToManager() error { func (a *announcer) announceToManager() {
// Start keepalive to manager.
go func() {
a.managerClient.KeepAlive(a.config.Manager.KeepAlive.Interval, &managerv2.KeepAliveRequest{ a.managerClient.KeepAlive(a.config.Manager.KeepAlive.Interval, &managerv2.KeepAliveRequest{
SourceType: managerv2.SourceType_SCHEDULER_SOURCE, SourceType: managerv2.SourceType_SCHEDULER_SOURCE,
Hostname: a.config.Server.Host, Hostname: a.config.Server.Host,
Ip: a.config.Server.AdvertiseIP.String(), Ip: a.config.Server.AdvertiseIP.String(),
ClusterId: uint64(a.config.Manager.SchedulerClusterID), ClusterId: uint64(a.config.Manager.SchedulerClusterID),
}, a.done) }, a.done)
}()
return nil
} }
// announceSeedPeer announces dataset to trainer. // announceSeedPeer announces dataset to trainer.
func (a *announcer) announceToTrainer() error { func (a *announcer) announceToTrainer() {
tick := time.NewTicker(a.config.Trainer.Interval) tick := time.NewTicker(a.config.Trainer.Interval)
for { for {
select { select {
@ -146,7 +137,7 @@ func (a *announcer) announceToTrainer() error {
logger.Error(err) logger.Error(err)
} }
case <-a.done: case <-a.done:
return nil return
} }
} }
} }