From b7e67a9110ec82dd7d00658e00938a4e23163157 Mon Sep 17 00:00:00 2001 From: chlins Date: Wed, 16 Jul 2025 16:38:07 +0800 Subject: [PATCH] feat: add last heartbeat tracking for scheduler service Signed-off-by: chlins --- manager/models/scheduler.go | 3 +++ manager/rpcserver/manager_server_v2.go | 20 ++++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/manager/models/scheduler.go b/manager/models/scheduler.go index f3eb819a9..5537b620c 100644 --- a/manager/models/scheduler.go +++ b/manager/models/scheduler.go @@ -16,6 +16,8 @@ package models +import "time" + const ( // SchedulerStateActive represents the scheduler whose state is active. SchedulerStateActive = "active" @@ -34,6 +36,7 @@ type Scheduler struct { Port int32 `gorm:"column:port;not null;comment:grpc service listening port" json:"port"` State string `gorm:"column:state;type:varchar(256);default:'inactive';comment:service state" json:"state"` Features Array `gorm:"column:features;comment:feature flags" json:"features"` + LastKeepAliveAt time.Time `gorm:"column:last_keep_alive_at;comment:last keep alive time" json:"last_keep_alive_at"` SchedulerClusterID uint `gorm:"index:uk_scheduler,unique;not null;comment:scheduler cluster id" json:"scheduler_cluster_id"` SchedulerCluster SchedulerCluster `json:"scheduler_cluster"` } diff --git a/manager/rpcserver/manager_server_v2.go b/manager/rpcserver/manager_server_v2.go index 9de5ce882..ace31a431 100644 --- a/manager/rpcserver/manager_server_v2.go +++ b/manager/rpcserver/manager_server_v2.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "io" + "time" cachev9 "github.com/go-redis/cache/v9" "github.com/redis/go-redis/v9" @@ -476,6 +477,7 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up Port: req.GetPort(), SchedulerClusterID: uint(req.GetSchedulerClusterId()), Features: schedulerFeatures, + LastKeepAliveAt: time.Now(), }).Error; err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -522,6 +524,7 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up Port: req.GetPort(), Features: schedulerFeatures, SchedulerClusterID: uint(req.GetSchedulerClusterId()), + LastKeepAliveAt: time.Now(), } if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil { @@ -816,8 +819,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er } for { - _, err := stream.Recv() - if err != nil { + if _, err := stream.Recv(); err != nil { // Inactive scheduler. if sourceType == managerv2.SourceType_SCHEDULER_SOURCE { scheduler := models.Scheduler{} @@ -868,5 +870,19 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er log.Errorf("keepalive failed: %s", err.Error()) return status.Error(codes.Unknown, err.Error()) } + + // Keepalive successful, update last heartbeat time. + if sourceType == managerv2.SourceType_SCHEDULER_SOURCE { + scheduler := models.Scheduler{} + if err := s.db.First(&scheduler, models.Scheduler{ + Hostname: hostname, + IP: ip, + SchedulerClusterID: clusterID, + }).Updates(models.Scheduler{ + LastKeepAliveAt: time.Now(), + }).Error; err != nil { + log.Errorf("update scheduler last heartbeat failed: %s", err.Error()) + } + } } }