diff --git a/internal/dflog/logger.go b/internal/dflog/logger.go index 7576e1a58..ea0326c68 100644 --- a/internal/dflog/logger.go +++ b/internal/dflog/logger.go @@ -123,9 +123,21 @@ func With(args ...any) *SugaredLoggerOnWith { } } -func WithHostID(hostID string) *SugaredLoggerOnWith { +func WithPeer(hostID, taskID, peerID string) *SugaredLoggerOnWith { return &SugaredLoggerOnWith{ - withArgs: []any{"hostID", hostID}, + withArgs: []any{"hostID", hostID, "taskID", taskID, "peerID", peerID}, + } +} + +func WithTask(taskID, url string) *SugaredLoggerOnWith { + return &SugaredLoggerOnWith{ + withArgs: []any{"taskID", taskID, "url", url}, + } +} + +func WithHost(hostID, hostname, ip string) *SugaredLoggerOnWith { + return &SugaredLoggerOnWith{ + withArgs: []any{"hostID", hostID, "hostname", hostname, "ip", ip}, } } @@ -141,12 +153,6 @@ func WithTaskAndPeerID(taskID, peerID string) *SugaredLoggerOnWith { } } -func WithTaskIDAndURL(taskID, url string) *SugaredLoggerOnWith { - return &SugaredLoggerOnWith{ - withArgs: []any{"taskID", taskID, "url", url}, - } -} - func WithHostnameAndIP(hostname, ip string) *SugaredLoggerOnWith { return &SugaredLoggerOnWith{ withArgs: []any{"hostname", hostname, "ip", ip}, diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 7c795cefc..9552400dd 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -171,7 +171,7 @@ func (j *job) preheat(ctx context.Context, req string) error { // Trigger seed peer download seeds. taskID := idgen.TaskID(preheat.URL, urlMeta) - log := logger.WithTaskIDAndURL(taskID, preheat.URL) + log := logger.WithTask(taskID, preheat.URL) log.Infof("preheat %s headers: %#v, tag: %s, range: %s, filter: %s, digest: %s", preheat.URL, urlMeta.Header, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest) stream, err := j.resource.SeedPeer().Client().ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{ diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index 76c923aaa..16e438c43 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -145,7 +145,7 @@ func NewHost(rawHost *schedulerv1.PeerHost, options ...HostOption) *Host { PeerCount: atomic.NewInt32(0), CreateAt: atomic.NewTime(time.Now()), UpdateAt: atomic.NewTime(time.Now()), - Log: logger.WithHostID(rawHost.Id), + Log: logger.WithHost(rawHost.Id, rawHost.HostName, rawHost.Ip), } for _, opt := range options { @@ -187,12 +187,12 @@ func (h *Host) LeavePeers() { return true } + peer.Log.Info("host leaves peers, causing the peer to leave") if err := peer.FSM.Event(PeerEventLeave); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return true } - h.Log.Infof("peer %s has been left", peer.ID) return true }) } diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index a11f31e16..8b37ce8a5 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -202,7 +202,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { IsBackToSource: atomic.NewBool(false), CreateAt: atomic.NewTime(time.Now()), UpdateAt: atomic.NewTime(time.Now()), - Log: logger.WithTaskAndPeerID(task.ID, id), + Log: logger.WithPeer(host.ID, task.ID, id), } // Initialize state machine. diff --git a/scheduler/resource/peer_manager.go b/scheduler/resource/peer_manager.go index 7ecc466c1..a2df33d2a 100644 --- a/scheduler/resource/peer_manager.go +++ b/scheduler/resource/peer_manager.go @@ -141,24 +141,23 @@ func (p *peerManager) RunGC() error { if elapsed > p.ttl { // If the peer is not leave, // first change the state to PeerEventLeave. + peer.Log.Info("peer elapsed exceeds the ttl, causing the peer to leave") if err := peer.FSM.Event(PeerEventLeave); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return true } - peer.Log.Info("peer elapsed exceeds the ttl, causing the peer to leave") return true } // If the peer's state is PeerStateFailed, // first set the peer state to PeerStateLeave and then delete peer. if peer.FSM.Is(PeerStateFailed) { + peer.Log.Info("peer state is PeerStateFailed, causing the peer to leave") if err := peer.FSM.Event(PeerEventLeave); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return true } - - peer.Log.Info("peer state is PeerStateFailed, causing the peer to leave") } // If no peer exists in the dag of the task, @@ -175,12 +174,12 @@ func (p *peerManager) RunGC() error { // PeerStateSucceeded, and degree is zero. if peer.Task.PeerCount() > PeerCountLimitForTask && peer.FSM.Is(PeerStateSucceeded) && degree == 0 { + peer.Log.Info("task dag size exceeds the limit, causing the peer to leave") if err := peer.FSM.Event(PeerEventLeave); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return true } - peer.Log.Info("task dag size exceeds the limit, causing the peer to leave") return true } diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go index 9b69b8473..9383891f7 100644 --- a/scheduler/resource/task.go +++ b/scheduler/resource/task.go @@ -158,7 +158,7 @@ func NewTask(id, url string, taskType commonv1.TaskType, meta *commonv1.UrlMeta, PeerFailedCount: atomic.NewInt32(0), CreateAt: atomic.NewTime(time.Now()), UpdateAt: atomic.NewTime(time.Now()), - Log: logger.WithTaskIDAndURL(id, url), + Log: logger.WithTask(id, url), } // Initialize state machine. diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 93492b4b5..a7cbacd8b 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -80,7 +80,7 @@ func New( // RegisterPeerTask registers peer and triggers seed peer download task. func (s *Service) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) { - logger.WithTaskAndPeerID(req.TaskId, req.PeerId).Infof("register peer task request: %#v %#v %#v", + logger.WithPeer(req.PeerHost.Id, req.TaskId, req.PeerId).Infof("register peer task request: %#v %#v %#v", req, req.UrlMeta, req.HostLoad) // Register task and trigger seed peer download task. task, needBackToSource := s.registerTask(ctx, req) @@ -402,7 +402,7 @@ func (s *Service) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) er } metrics.LeaveTaskCount.WithLabelValues(peer.Tag, peer.Application).Inc() - peer.Log.Infof("leave peer: %#v", req) + peer.Log.Infof("client releases peer, causing the peer to leave: %#v", req) if err := peer.FSM.Event(resource.PeerEventLeave); err != nil { metrics.LeaveTaskFailureCount.WithLabelValues(peer.Tag, peer.Application).Inc() @@ -812,6 +812,7 @@ func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) { // handleLegacySeedPeer handles seed server's task has left, // but did not notify the scheduler to leave the task. func (s *Service) handleLegacySeedPeer(ctx context.Context, peer *resource.Peer) { + peer.Log.Info("peer is legacy seed peer, causing the peer to leave") if err := peer.FSM.Event(resource.PeerEventLeave); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return @@ -822,8 +823,6 @@ func (s *Service) handleLegacySeedPeer(ctx context.Context, peer *resource.Peer) child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID) s.scheduler.ScheduleParent(ctx, child, child.BlockParents) } - - s.resource.PeerManager().Delete(peer.ID) } // Conditions for the task to switch to the TaskStateSucceeded are: