From 146f82c453d1380c2daa1ce1ff88344fc9fed4bf Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 9 Jun 2023 15:46:56 +0800 Subject: [PATCH] feat: optimize announcer in scheduler and client (#2445) Signed-off-by: Gaius --- client/daemon/announcer/announcer.go | 14 ++++++------- scheduler/announcer/announcer.go | 31 ++++++++++------------------ 2 files changed, 17 insertions(+), 28 deletions(-) diff --git a/client/daemon/announcer/announcer.go b/client/daemon/announcer/announcer.go index c254d6a0e..f69a0ec93 100644 --- a/client/daemon/announcer/announcer.go +++ b/client/daemon/announcer/announcer.go @@ -308,14 +308,12 @@ func (a *announcer) announceToManager() error { } // Start keepalive to manager. - go func() { - a.managerClient.KeepAlive(a.config.Scheduler.Manager.SeedPeer.KeepAlive.Interval, &managerv1.KeepAliveRequest{ - SourceType: managerv1.SourceType_SEED_PEER_SOURCE, - Hostname: a.config.Host.Hostname, - Ip: a.config.Host.AdvertiseIP.String(), - ClusterId: uint64(a.config.Scheduler.Manager.SeedPeer.ClusterID), - }, a.done) - }() + go a.managerClient.KeepAlive(a.config.Scheduler.Manager.SeedPeer.KeepAlive.Interval, &managerv1.KeepAliveRequest{ + SourceType: managerv1.SourceType_SEED_PEER_SOURCE, + Hostname: a.config.Host.Hostname, + Ip: a.config.Host.AdvertiseIP.String(), + ClusterId: uint64(a.config.Scheduler.Manager.SeedPeer.ClusterID), + }, a.done) } return nil diff --git a/scheduler/announcer/announcer.go b/scheduler/announcer/announcer.go index 6c98d75df..d1d4a4b49 100644 --- a/scheduler/announcer/announcer.go +++ b/scheduler/announcer/announcer.go @@ -101,15 +101,11 @@ func New(cfg *config.Config, managerClient managerclient.V2, storage storage.Sto // Started announcer server. func (a *announcer) Serve() error { logger.Info("announce scheduler to manager") - if err := a.announceToManager(); err != nil { - return err - } + go a.announceToManager() if a.trainerClient != nil { logger.Info("announce scheduler to trainer") - if err := a.announceToTrainer(); err != nil { - return err - } + a.announceToTrainer() } return nil @@ -122,22 +118,17 @@ func (a *announcer) Stop() error { } // announceSeedPeer announces peer information to manager. -func (a *announcer) announceToManager() error { - // Start keepalive to manager. - go func() { - a.managerClient.KeepAlive(a.config.Manager.KeepAlive.Interval, &managerv2.KeepAliveRequest{ - SourceType: managerv2.SourceType_SCHEDULER_SOURCE, - Hostname: a.config.Server.Host, - Ip: a.config.Server.AdvertiseIP.String(), - ClusterId: uint64(a.config.Manager.SchedulerClusterID), - }, a.done) - }() - - return nil +func (a *announcer) announceToManager() { + a.managerClient.KeepAlive(a.config.Manager.KeepAlive.Interval, &managerv2.KeepAliveRequest{ + SourceType: managerv2.SourceType_SCHEDULER_SOURCE, + Hostname: a.config.Server.Host, + Ip: a.config.Server.AdvertiseIP.String(), + ClusterId: uint64(a.config.Manager.SchedulerClusterID), + }, a.done) } // announceSeedPeer announces dataset to trainer. -func (a *announcer) announceToTrainer() error { +func (a *announcer) announceToTrainer() { tick := time.NewTicker(a.config.Trainer.Interval) for { select { @@ -146,7 +137,7 @@ func (a *announcer) announceToTrainer() error { logger.Error(err) } case <-a.done: - return nil + return } } }