From 94a02a2005d03a7ebafcbb4c7bca38ff613383b8 Mon Sep 17 00:00:00 2001 From: Chlins Zhang Date: Fri, 18 Jul 2025 10:42:44 +0800 Subject: [PATCH] feat: support mark inactive schedulers in the scheduler gc (#4215) Signed-off-by: chlins --- manager/gc/audit.go | 2 +- manager/gc/scheduler.go | 80 +++++++++++++++++++++++++++++++++++++++-- manager/gc/seed_peer.go | 2 +- 3 files changed, 80 insertions(+), 4 deletions(-) diff --git a/manager/gc/audit.go b/manager/gc/audit.go index 39f9d5b44..a007b4b10 100644 --- a/manager/gc/audit.go +++ b/manager/gc/audit.go @@ -107,7 +107,7 @@ func (a *audit) RunGC(ctx context.Context) error { } gcResult.Purged += result.RowsAffected - logger.Infof("gc audit deleted %d audits", result.RowsAffected) + logger.Infof("audit GC deleted %d audits", result.RowsAffected) } return nil diff --git a/manager/gc/scheduler.go b/manager/gc/scheduler.go index 0cc2d305f..6c46fb7a9 100644 --- a/manager/gc/scheduler.go +++ b/manager/gc/scheduler.go @@ -62,6 +62,77 @@ type scheduler struct { // RunGC implements the gc Runner interface. func (s *scheduler) RunGC(ctx context.Context) error { + if err := s.mark(ctx); err != nil { + // Only log error if failed in mark phase, + // because this can wait to be executed in the next gc window. + logger.Errorf("failed to mark inactive schedulers: %v", err) + } + + return s.sweep(ctx) +} + +// mark running the mark operation for marking inactive schedulers. +func (s *scheduler) mark(_ context.Context) error { + logger.Info("running scheduler GC mark") + + var schedulers []*models.Scheduler + for { + if err := s.db.Model(&models.Scheduler{}). + Where("state = ?", models.SchedulerStateActive). + // As there is no configuration for the old scheduler, so exclude these schedulers. + Where("config IS NOT NULL AND config != ''"). + Limit(DefaultSchedulerGCBatchSize). + Find(&schedulers).Error; err != nil { + return err + } + if len(schedulers) == 0 { + break + } + + now := time.Now() + schedulerIDs := make([]uint, 0, len(schedulers)) + for _, scheduler := range schedulers { + if scheduler.Config == nil { + continue + } + + // Retrieve the keep alive interval from the scheduler's configuration. + keepAliveInterval, ok := scheduler.Config["manager_keep_alive_interval"].(float64) + if !ok { + continue + } + + // Check whether the last keep alive time is greater than 3x keep alive interval, + // indicating that the scheduler is inactive. + if now.Sub(scheduler.LastKeepAliveAt) > time.Duration(keepAliveInterval)*3 { + schedulerIDs = append(schedulerIDs, scheduler.ID) + } + } + + if len(schedulerIDs) > 0 { + if err := s.db.Model(&models.Scheduler{}). + Where("id IN (?)", schedulerIDs). + Update("state", models.SchedulerStateInactive). + Error; err != nil { + return err + } + } + + logger.Infof("scheduler GC marks %d schedulers to inactive", len(schedulerIDs)) + + // If this batch is not full, break the loop as it indicates that this is the last page. + if len(schedulers) < DefaultSchedulerGCBatchSize { + break + } + } + + return nil +} + +// sweep running the sweep operation for cleaning up inactive schedulers. +func (s *scheduler) sweep(ctx context.Context) error { + logger.Info("running scheduler GC sweep") + args := models.JSONMap{ "type": SchedulerGCTaskID, "ttl": DefaultSchedulerGCTTL, @@ -93,7 +164,12 @@ func (s *scheduler) RunGC(ctx context.Context) error { }() for { - result := s.db.Where("updated_at < ?", time.Now().Add(-DefaultSchedulerGCTTL)).Where("state = ?", models.SchedulerStateInactive).Limit(DefaultSchedulerGCBatchSize).Unscoped().Delete(&models.Scheduler{}) + result := s.db.Where("updated_at < ?", time.Now(). + Add(-DefaultSchedulerGCTTL)). + Where("state = ?", models.SchedulerStateInactive). + Limit(DefaultSchedulerGCBatchSize). + Unscoped(). + Delete(&models.Scheduler{}) if result.Error != nil { gcResult.Error = result.Error return result.Error @@ -104,7 +180,7 @@ func (s *scheduler) RunGC(ctx context.Context) error { } gcResult.Purged += result.RowsAffected - logger.Infof("gc scheduler deleted %d inactive schedulers", result.RowsAffected) + logger.Infof("scheduler GC deleted %d inactive schedulers", result.RowsAffected) } return nil diff --git a/manager/gc/seed_peer.go b/manager/gc/seed_peer.go index 80f848cd0..925ce83fb 100644 --- a/manager/gc/seed_peer.go +++ b/manager/gc/seed_peer.go @@ -104,7 +104,7 @@ func (s *seedPeer) RunGC(ctx context.Context) error { } gcResult.Purged += result.RowsAffected - logger.Infof("gc seed peer deleted %d inactive seed peers", result.RowsAffected) + logger.Infof("seed peer GC deleted %d inactive seed peers", result.RowsAffected) } return nil