feat: optimize peer log (#1828)
Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
ba58047871
commit
42223d994f
|
|
@ -123,9 +123,21 @@ func With(args ...any) *SugaredLoggerOnWith {
|
|||
}
|
||||
}
|
||||
|
||||
func WithHostID(hostID string) *SugaredLoggerOnWith {
|
||||
func WithPeer(hostID, taskID, peerID string) *SugaredLoggerOnWith {
|
||||
return &SugaredLoggerOnWith{
|
||||
withArgs: []any{"hostID", hostID},
|
||||
withArgs: []any{"hostID", hostID, "taskID", taskID, "peerID", peerID},
|
||||
}
|
||||
}
|
||||
|
||||
func WithTask(taskID, url string) *SugaredLoggerOnWith {
|
||||
return &SugaredLoggerOnWith{
|
||||
withArgs: []any{"taskID", taskID, "url", url},
|
||||
}
|
||||
}
|
||||
|
||||
func WithHost(hostID, hostname, ip string) *SugaredLoggerOnWith {
|
||||
return &SugaredLoggerOnWith{
|
||||
withArgs: []any{"hostID", hostID, "hostname", hostname, "ip", ip},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -141,12 +153,6 @@ func WithTaskAndPeerID(taskID, peerID string) *SugaredLoggerOnWith {
|
|||
}
|
||||
}
|
||||
|
||||
func WithTaskIDAndURL(taskID, url string) *SugaredLoggerOnWith {
|
||||
return &SugaredLoggerOnWith{
|
||||
withArgs: []any{"taskID", taskID, "url", url},
|
||||
}
|
||||
}
|
||||
|
||||
func WithHostnameAndIP(hostname, ip string) *SugaredLoggerOnWith {
|
||||
return &SugaredLoggerOnWith{
|
||||
withArgs: []any{"hostname", hostname, "ip", ip},
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ func (j *job) preheat(ctx context.Context, req string) error {
|
|||
|
||||
// Trigger seed peer download seeds.
|
||||
taskID := idgen.TaskID(preheat.URL, urlMeta)
|
||||
log := logger.WithTaskIDAndURL(taskID, preheat.URL)
|
||||
log := logger.WithTask(taskID, preheat.URL)
|
||||
log.Infof("preheat %s headers: %#v, tag: %s, range: %s, filter: %s, digest: %s",
|
||||
preheat.URL, urlMeta.Header, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest)
|
||||
stream, err := j.resource.SeedPeer().Client().ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
|
||||
|
|
|
|||
|
|
@ -145,7 +145,7 @@ func NewHost(rawHost *schedulerv1.PeerHost, options ...HostOption) *Host {
|
|||
PeerCount: atomic.NewInt32(0),
|
||||
CreateAt: atomic.NewTime(time.Now()),
|
||||
UpdateAt: atomic.NewTime(time.Now()),
|
||||
Log: logger.WithHostID(rawHost.Id),
|
||||
Log: logger.WithHost(rawHost.Id, rawHost.HostName, rawHost.Ip),
|
||||
}
|
||||
|
||||
for _, opt := range options {
|
||||
|
|
@ -187,12 +187,12 @@ func (h *Host) LeavePeers() {
|
|||
return true
|
||||
}
|
||||
|
||||
peer.Log.Info("host leaves peers, causing the peer to leave")
|
||||
if err := peer.FSM.Event(PeerEventLeave); err != nil {
|
||||
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
|
||||
return true
|
||||
}
|
||||
|
||||
h.Log.Infof("peer %s has been left", peer.ID)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -202,7 +202,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
|
|||
IsBackToSource: atomic.NewBool(false),
|
||||
CreateAt: atomic.NewTime(time.Now()),
|
||||
UpdateAt: atomic.NewTime(time.Now()),
|
||||
Log: logger.WithTaskAndPeerID(task.ID, id),
|
||||
Log: logger.WithPeer(host.ID, task.ID, id),
|
||||
}
|
||||
|
||||
// Initialize state machine.
|
||||
|
|
|
|||
|
|
@ -141,24 +141,23 @@ func (p *peerManager) RunGC() error {
|
|||
if elapsed > p.ttl {
|
||||
// If the peer is not leave,
|
||||
// first change the state to PeerEventLeave.
|
||||
peer.Log.Info("peer elapsed exceeds the ttl, causing the peer to leave")
|
||||
if err := peer.FSM.Event(PeerEventLeave); err != nil {
|
||||
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
|
||||
return true
|
||||
}
|
||||
|
||||
peer.Log.Info("peer elapsed exceeds the ttl, causing the peer to leave")
|
||||
return true
|
||||
}
|
||||
|
||||
// If the peer's state is PeerStateFailed,
|
||||
// first set the peer state to PeerStateLeave and then delete peer.
|
||||
if peer.FSM.Is(PeerStateFailed) {
|
||||
peer.Log.Info("peer state is PeerStateFailed, causing the peer to leave")
|
||||
if err := peer.FSM.Event(PeerEventLeave); err != nil {
|
||||
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
|
||||
return true
|
||||
}
|
||||
|
||||
peer.Log.Info("peer state is PeerStateFailed, causing the peer to leave")
|
||||
}
|
||||
|
||||
// If no peer exists in the dag of the task,
|
||||
|
|
@ -175,12 +174,12 @@ func (p *peerManager) RunGC() error {
|
|||
// PeerStateSucceeded, and degree is zero.
|
||||
if peer.Task.PeerCount() > PeerCountLimitForTask &&
|
||||
peer.FSM.Is(PeerStateSucceeded) && degree == 0 {
|
||||
peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
|
||||
if err := peer.FSM.Event(PeerEventLeave); err != nil {
|
||||
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
|
||||
return true
|
||||
}
|
||||
|
||||
peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -158,7 +158,7 @@ func NewTask(id, url string, taskType commonv1.TaskType, meta *commonv1.UrlMeta,
|
|||
PeerFailedCount: atomic.NewInt32(0),
|
||||
CreateAt: atomic.NewTime(time.Now()),
|
||||
UpdateAt: atomic.NewTime(time.Now()),
|
||||
Log: logger.WithTaskIDAndURL(id, url),
|
||||
Log: logger.WithTask(id, url),
|
||||
}
|
||||
|
||||
// Initialize state machine.
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ func New(
|
|||
|
||||
// RegisterPeerTask registers peer and triggers seed peer download task.
|
||||
func (s *Service) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) {
|
||||
logger.WithTaskAndPeerID(req.TaskId, req.PeerId).Infof("register peer task request: %#v %#v %#v",
|
||||
logger.WithPeer(req.PeerHost.Id, req.TaskId, req.PeerId).Infof("register peer task request: %#v %#v %#v",
|
||||
req, req.UrlMeta, req.HostLoad)
|
||||
// Register task and trigger seed peer download task.
|
||||
task, needBackToSource := s.registerTask(ctx, req)
|
||||
|
|
@ -402,7 +402,7 @@ func (s *Service) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) er
|
|||
}
|
||||
metrics.LeaveTaskCount.WithLabelValues(peer.Tag, peer.Application).Inc()
|
||||
|
||||
peer.Log.Infof("leave peer: %#v", req)
|
||||
peer.Log.Infof("client releases peer, causing the peer to leave: %#v", req)
|
||||
if err := peer.FSM.Event(resource.PeerEventLeave); err != nil {
|
||||
metrics.LeaveTaskFailureCount.WithLabelValues(peer.Tag, peer.Application).Inc()
|
||||
|
||||
|
|
@ -812,6 +812,7 @@ func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) {
|
|||
// handleLegacySeedPeer handles seed server's task has left,
|
||||
// but did not notify the scheduler to leave the task.
|
||||
func (s *Service) handleLegacySeedPeer(ctx context.Context, peer *resource.Peer) {
|
||||
peer.Log.Info("peer is legacy seed peer, causing the peer to leave")
|
||||
if err := peer.FSM.Event(resource.PeerEventLeave); err != nil {
|
||||
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
|
||||
return
|
||||
|
|
@ -822,8 +823,6 @@ func (s *Service) handleLegacySeedPeer(ctx context.Context, peer *resource.Peer)
|
|||
child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID)
|
||||
s.scheduler.ScheduleParent(ctx, child, child.BlockParents)
|
||||
}
|
||||
|
||||
s.resource.PeerManager().Delete(peer.ID)
|
||||
}
|
||||
|
||||
// Conditions for the task to switch to the TaskStateSucceeded are:
|
||||
|
|
|
|||
Loading…
Reference in New Issue