feat: add last heartbeat tracking for scheduler service
Signed-off-by: chlins <chlins.zhang@gmail.com>
This commit is contained in:
parent
88cc9b622d
commit
b7e67a9110
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
package models
|
||||
|
||||
import "time"
|
||||
|
||||
const (
|
||||
// SchedulerStateActive represents the scheduler whose state is active.
|
||||
SchedulerStateActive = "active"
|
||||
|
|
@ -34,6 +36,7 @@ type Scheduler struct {
|
|||
Port int32 `gorm:"column:port;not null;comment:grpc service listening port" json:"port"`
|
||||
State string `gorm:"column:state;type:varchar(256);default:'inactive';comment:service state" json:"state"`
|
||||
Features Array `gorm:"column:features;comment:feature flags" json:"features"`
|
||||
LastKeepAliveAt time.Time `gorm:"column:last_keep_alive_at;comment:last keep alive time" json:"last_keep_alive_at"`
|
||||
SchedulerClusterID uint `gorm:"index:uk_scheduler,unique;not null;comment:scheduler cluster id" json:"scheduler_cluster_id"`
|
||||
SchedulerCluster SchedulerCluster `json:"scheduler_cluster"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
cachev9 "github.com/go-redis/cache/v9"
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
|
@ -476,6 +477,7 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up
|
|||
Port: req.GetPort(),
|
||||
SchedulerClusterID: uint(req.GetSchedulerClusterId()),
|
||||
Features: schedulerFeatures,
|
||||
LastKeepAliveAt: time.Now(),
|
||||
}).Error; err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
|
@ -522,6 +524,7 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up
|
|||
Port: req.GetPort(),
|
||||
Features: schedulerFeatures,
|
||||
SchedulerClusterID: uint(req.GetSchedulerClusterId()),
|
||||
LastKeepAliveAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil {
|
||||
|
|
@ -816,8 +819,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er
|
|||
}
|
||||
|
||||
for {
|
||||
_, err := stream.Recv()
|
||||
if err != nil {
|
||||
if _, err := stream.Recv(); err != nil {
|
||||
// Inactive scheduler.
|
||||
if sourceType == managerv2.SourceType_SCHEDULER_SOURCE {
|
||||
scheduler := models.Scheduler{}
|
||||
|
|
@ -868,5 +870,19 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er
|
|||
log.Errorf("keepalive failed: %s", err.Error())
|
||||
return status.Error(codes.Unknown, err.Error())
|
||||
}
|
||||
|
||||
// Keepalive successful, update last heartbeat time.
|
||||
if sourceType == managerv2.SourceType_SCHEDULER_SOURCE {
|
||||
scheduler := models.Scheduler{}
|
||||
if err := s.db.First(&scheduler, models.Scheduler{
|
||||
Hostname: hostname,
|
||||
IP: ip,
|
||||
SchedulerClusterID: clusterID,
|
||||
}).Updates(models.Scheduler{
|
||||
LastKeepAliveAt: time.Now(),
|
||||
}).Error; err != nil {
|
||||
log.Errorf("update scheduler last heartbeat failed: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue