From fe7b5fe0b4071fa52eff713a534e8487d23f28dc Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 15 Apr 2022 14:50:05 +0800 Subject: [PATCH] fix: error message (#1255) * fix: error message Signed-off-by: Gaius --- scheduler/job/job.go | 24 +++++------ scheduler/resource/host.go | 4 +- scheduler/resource/peer_manager.go | 2 +- scheduler/resource/task.go | 4 +- scheduler/scheduler.go | 16 +++---- scheduler/scheduler/scheduler.go | 8 ++-- scheduler/service/service.go | 68 ++++++++++++++++-------------- 7 files changed, 66 insertions(+), 60 deletions(-) diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 4514927ae..4013d7034 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -57,27 +57,27 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) { globalJob, err := internaljob.New(redisConfig, internaljob.GlobalQueue) if err != nil { - logger.Errorf("create global job queue error: %v", err) + logger.Errorf("create global job queue error: %s", err.Error()) return nil, err } logger.Infof("create global job queue: %v", globalJob) schedulerJob, err := internaljob.New(redisConfig, internaljob.SchedulersQueue) if err != nil { - logger.Errorf("create scheduler job queue error: %v", err) + logger.Errorf("create scheduler job queue error: %s", err.Error()) return nil, err } logger.Infof("create scheduler job queue: %v", schedulerJob) localQueue, err := internaljob.GetSchedulerQueue(cfg.Manager.SchedulerClusterID, cfg.Server.Host) if err != nil { - logger.Errorf("get local job queue name error: %v", err) + logger.Errorf("get local job queue name error: %s", err.Error()) return nil, err } localJob, err := internaljob.New(redisConfig, localQueue) if err != nil { - logger.Errorf("create local job queue error: %v", err) + logger.Errorf("create local job queue error: %s", err.Error()) return nil, err } logger.Infof("create local job queue: %v", localQueue) @@ -95,7 +95,7 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) { } if err := localJob.RegisterJob(namedJobFuncs); err != nil { - logger.Errorf("register preheat job to local queue error: %v", err) + logger.Errorf("register preheat job to local queue error: %s", err.Error()) return nil, err } @@ -106,21 +106,21 @@ func (j *job) Serve() { go func() { logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum) if err := j.globalJob.LaunchWorker("global_worker", int(j.config.Job.GlobalWorkerNum)); err != nil { - logger.Fatalf("global queue worker error: %v", err) + logger.Fatalf("global queue worker error: %s", err.Error()) } }() go func() { logger.Infof("ready to launch %d worker(s) on scheduler queue", j.config.Job.SchedulerWorkerNum) if err := j.schedulerJob.LaunchWorker("scheduler_worker", int(j.config.Job.SchedulerWorkerNum)); err != nil { - logger.Fatalf("scheduler queue worker error: %v", err) + logger.Fatalf("scheduler queue worker error: %s", err.Error()) } }() go func() { logger.Infof("ready to launch %d worker(s) on local queue", j.config.Job.LocalWorkerNum) if err := j.localJob.LaunchWorker("local_worker", int(j.config.Job.LocalWorkerNum)); err != nil { - logger.Fatalf("scheduler queue worker error: %v", err) + logger.Fatalf("scheduler queue worker error: %s", err.Error()) } }() } @@ -138,12 +138,12 @@ func (j *job) preheat(ctx context.Context, req string) error { request := &internaljob.PreheatRequest{} if err := internaljob.UnmarshalRequest(req, request); err != nil { - logger.Errorf("unmarshal request err: %v, request body: %s", err, req) + logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), req) return err } if err := validator.New().Struct(request); err != nil { - logger.Errorf("url %s validate failed: %v", request.URL, err) + logger.Errorf("url %s validate failed: %s", request.URL, err.Error()) return err } @@ -172,14 +172,14 @@ func (j *job) preheat(ctx context.Context, req string) error { UrlMeta: urlMeta, }) if err != nil { - log.Errorf("preheat failed: %v", err) + log.Errorf("preheat failed: %s", err.Error()) return err } for { piece, err := stream.Recv() if err != nil { - log.Errorf("preheat recive piece failed: %v", err) + log.Errorf("preheat recive piece failed: %s", err.Error()) return err } diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index 6f9b581d6..62c78540a 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -170,12 +170,12 @@ func (h *Host) LeavePeers() { h.Peers.Range(func(_, value interface{}) bool { if peer, ok := value.(*Peer); ok { if err := peer.FSM.Event(PeerEventDownloadFailed); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return true } if err := peer.FSM.Event(PeerEventLeave); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return true } diff --git a/scheduler/resource/peer_manager.go b/scheduler/resource/peer_manager.go index cb157d4c1..ad8e3666b 100644 --- a/scheduler/resource/peer_manager.go +++ b/scheduler/resource/peer_manager.go @@ -141,7 +141,7 @@ func (p *peerManager) RunGC() error { // If the peer is not leave, // first change the state to PeerEventLeave if err := peer.FSM.Event(PeerEventLeave); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) } peer.Log.Info("gc causes the peer to leave") diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go index 24a9d1bff..0579ba3b9 100644 --- a/scheduler/resource/task.go +++ b/scheduler/resource/task.go @@ -337,13 +337,13 @@ func (t *Task) NotifyPeers(code base.Code, event string) { } if err := stream.Send(&rpcscheduler.PeerPacket{Code: code}); err != nil { - t.Log.Errorf("send packet to peer %s failed: %v", peer.ID, err) + t.Log.Errorf("send packet to peer %s failed: %s", peer.ID, err.Error()) return true } t.Log.Infof("task notify peer %s code %s", peer.ID, code) if err := peer.FSM.Event(event); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return true } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 0bf1b4b13..6d7f21b54 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -90,7 +90,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err Location: s.config.Host.Location, SchedulerClusterId: uint64(s.config.Manager.SchedulerClusterID), }); err != nil { - logger.Fatalf("register to manager failed %v", err) + logger.Fatalf("register to manager failed %s", err.Error()) } // Initialize dynconfig client @@ -165,7 +165,7 @@ func (s *Server) Serve() error { // Serve dynConfig go func() { if err := s.dynconfig.Serve(); err != nil { - logger.Fatalf("dynconfig start failed %v", err) + logger.Fatalf("dynconfig start failed %s", err.Error()) } logger.Info("dynconfig start successfully") }() @@ -188,7 +188,7 @@ func (s *Server) Serve() error { if err == http.ErrServerClosed { return } - logger.Fatalf("metrics server closed unexpect: %v", err) + logger.Fatalf("metrics server closed unexpect: %s", err.Error()) } }() } @@ -208,14 +208,14 @@ func (s *Server) Serve() error { // Generate GRPC limit listener listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.config.Server.Listen, s.config.Server.Port)) if err != nil { - logger.Fatalf("net listener failed to start: %v", err) + logger.Fatalf("net listener failed to start: %s", err.Error()) } defer listener.Close() // Started GRPC server logger.Infof("started grpc server at %s://%s", listener.Addr().Network(), listener.Addr().String()) if err := s.grpcServer.Serve(listener); err != nil { - logger.Errorf("stoped grpc server: %v", err) + logger.Errorf("stoped grpc server: %s", err.Error()) return err } @@ -225,14 +225,14 @@ func (s *Server) Serve() error { func (s *Server) Stop() { // Stop dynconfig server if err := s.dynconfig.Stop(); err != nil { - logger.Errorf("dynconfig client closed failed %v", err) + logger.Errorf("dynconfig client closed failed %s", err.Error()) } logger.Info("dynconfig client closed") // Stop manager client if s.managerClient != nil { if err := s.managerClient.Close(); err != nil { - logger.Errorf("manager client failed to stop: %v", err) + logger.Errorf("manager client failed to stop: %s", err.Error()) } logger.Info("manager client closed") } @@ -244,7 +244,7 @@ func (s *Server) Stop() { // Stop metrics server if s.metricsServer != nil { if err := s.metricsServer.Shutdown(context.Background()); err != nil { - logger.Errorf("metrics server failed to stop: %v", err) + logger.Errorf("metrics server failed to stop: %s", err.Error()) } logger.Info("metrics server closed under request") } diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go index 9f4328705..c4e4005a8 100644 --- a/scheduler/scheduler/scheduler.go +++ b/scheduler/scheduler/scheduler.go @@ -92,14 +92,14 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo // Notify peer back-to-source if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource}); err != nil { - peer.Log.Errorf("send packet failed: %v", err) + peer.Log.Errorf("send packet failed: %s", err.Error()) return } peer.Log.Infof("peer scheduling %d times, peer downloads back-to-source %d", n, base.Code_SchedNeedBackSource) if err := peer.FSM.Event(resource.PeerEventDownloadFromBackToSource); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return } @@ -107,7 +107,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo // peer back-to-source and reset task state to TaskStateRunning if peer.Task.FSM.Is(resource.TaskStateFailed) { if err := peer.Task.FSM.Event(resource.TaskEventDownload); err != nil { - peer.Task.Log.Errorf("task fsm event failed: %v", err) + peer.Task.Log.Errorf("task fsm event failed: %s", err.Error()) return } } @@ -125,7 +125,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo // Notify peer schedule failed if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedTaskStatusError}); err != nil { - peer.Log.Errorf("send packet failed: %v", err) + peer.Log.Errorf("send packet failed: %s", err.Error()) return } peer.Log.Errorf("peer scheduling exceeds the limit %d times and return code %d", s.config.RetryLimit, base.Code_SchedTaskStatusError) diff --git a/scheduler/service/service.go b/scheduler/service/service.go index e3780b5c4..62811c50b 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -79,7 +79,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa // Register task and trigger cdn download task task, err := s.registerTask(ctx, req) if err != nil { - msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) + msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) logger.Error(msg) return nil, dferrors.New(base.Code_SchedTaskStatusError, msg) } @@ -96,7 +96,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa peer.Log.Info("task size scope is tiny and return piece content directly") if len(task.DirectPiece) > 0 && int64(len(task.DirectPiece)) == task.ContentLength.Load() { if err := peer.FSM.Event(resource.PeerEventRegisterTiny); err != nil { - msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) + msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) peer.Log.Error(msg) return nil, dferrors.New(base.Code_SchedError, msg) } @@ -121,7 +121,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa if !ok { peer.Log.Warn("task size scope is small and it can not select parent") if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { - msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) + msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) peer.Log.Error(msg) return nil, dferrors.New(base.Code_SchedError, msg) } @@ -137,7 +137,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa if !parent.FSM.Is(resource.PeerStateSucceeded) { peer.Log.Infof("task size scope is small and download state %s is not PeerStateSucceeded", parent.FSM.Current()) if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { - msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) + msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) peer.Log.Error(msg) return nil, dferrors.New(base.Code_SchedError, msg) } @@ -152,7 +152,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa if !ok { peer.Log.Warn("task size scope is small and it can not get first piece") if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { - msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) + msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) peer.Log.Error(msg) return nil, dferrors.New(base.Code_SchedError, msg) } @@ -165,7 +165,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa peer.ReplaceParent(parent) if err := peer.FSM.Event(resource.PeerEventRegisterSmall); err != nil { - msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) + msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) peer.Log.Error(msg) return nil, dferrors.New(base.Code_SchedError, msg) } @@ -194,7 +194,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa default: peer.Log.Info("task size scope is normal and needs to be register") if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { - msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) + msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) peer.Log.Error(msg) return nil, dferrors.New(base.Code_SchedError, msg) } @@ -209,7 +209,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa // Task is unsuccessful peer.Log.Infof("task state is %s and needs to be register", task.FSM.Current()) if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { - msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) + msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) peer.Log.Error(msg) return nil, dferrors.New(base.Code_SchedError, msg) } @@ -242,7 +242,7 @@ func (s *Service) ReportPieceResult(stream rpcscheduler.Scheduler_ReportPieceRes if err == io.EOF { return nil } - logger.Errorf("receive piece %#v error: %v", piece, err) + logger.Errorf("receive piece %#v error: %s", piece, err.Error()) return err } @@ -406,7 +406,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa if !task.FSM.Is(resource.TaskStateSucceeded) { if task.FSM.Is(resource.TaskStatePending) { if err := task.FSM.Event(resource.TaskEventDownload); err != nil { - msg := fmt.Sprintf("task fsm event failed: %v", err) + msg := fmt.Sprintf("task fsm event failed: %s", err.Error()) task.Log.Error(msg) return dferrors.New(base.Code_SchedError, msg) } @@ -414,7 +414,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa if task.FSM.Is(resource.TaskStateFailed) { if err := task.FSM.Event(resource.TaskEventDownload); err != nil { - msg := fmt.Sprintf("task fsm event failed: %v", err) + msg := fmt.Sprintf("task fsm event failed: %s", err.Error()) task.Log.Error(msg) return dferrors.New(base.Code_SchedError, msg) } @@ -444,7 +444,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa if !peer.FSM.Is(resource.PeerStateSucceeded) { if peer.FSM.Is(resource.PeerStatePending) { if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { - msg := fmt.Sprintf("peer fsm event failed: %v", err) + msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) task.Log.Error(msg) return dferrors.New(base.Code_SchedError, msg) } @@ -454,7 +454,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa peer.FSM.Is(resource.PeerStateReceivedSmall) || peer.FSM.Is(resource.PeerStateReceivedNormal) { if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { - msg := fmt.Sprintf("peer fsm event failed: %v", err) + msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) task.Log.Error(msg) return dferrors.New(base.Code_SchedError, msg) } @@ -481,7 +481,7 @@ func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) e if err := peer.FSM.Event(resource.PeerEventLeave); err != nil { metrics.LeaveTaskFailureCount.WithLabelValues(peer.BizTag).Inc() - msg := fmt.Sprintf("peer fsm event failed: %v", err) + msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) peer.Log.Error(msg) return dferrors.New(base.Code_SchedTaskStatusError, msg) } @@ -567,7 +567,7 @@ func (s *Service) triggerCDNTask(ctx context.Context, task *resource.Task) { peer, endOfPiece, err := s.resource.CDN().TriggerTask( trace.ContextWithSpanContext(context.Background(), trace.SpanContextFromContext(ctx)), task) if err != nil { - task.Log.Errorf("trigger cdn download task failed: %v", err) + task.Log.Errorf("trigger cdn download task failed: %s", err.Error()) s.handleTaskFail(ctx, task) return } @@ -590,7 +590,7 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) { // the peer has already returned to piece data when registering peer.Log.Info("file type is tiny, peer has already returned to piece data when registering") if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return } case resource.PeerStateReceivedSmall: @@ -598,12 +598,12 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) { // the peer has already returned to the parent when registering peer.Log.Info("file type is small, peer has already returned to the parent when registering") if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return } case resource.PeerStateReceivedNormal: if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return } @@ -651,7 +651,7 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec switch piece.Code { case base.Code_PeerTaskNotFound, base.Code_CDNError, base.Code_CDNTaskDownloadFail: if err := parent.FSM.Event(resource.PeerEventDownloadFailed); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) break } case base.Code_ClientPieceNotFound: @@ -688,28 +688,34 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec // handlePeerSuccess handles successful peer func (s *Service) handlePeerSuccess(ctx context.Context, peer *resource.Peer) { + if err := peer.FSM.Event(resource.PeerEventDownloadSucceeded); err != nil { + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) + return + } + // If the peer type is tiny and back-to-source, // it need to directly download the tiny file and store the data in task DirectPiece if peer.Task.SizeScope() == base.SizeScope_TINY && len(peer.Task.DirectPiece) == 0 { data, err := peer.DownloadTinyFile() - if err == nil && len(data) == int(peer.Task.ContentLength.Load()) { - // Tiny file downloaded successfully - peer.Task.DirectPiece = data - } else { - peer.Log.Warnf("download tiny file length is %d, task content length is %d, downloading is failed: %v", len(data), peer.Task.ContentLength.Load(), err) + if err != nil { + peer.Log.Errorf("download tiny task failed: %s", err.Error()) + return } - } - if err := peer.FSM.Event(resource.PeerEventDownloadSucceeded); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) - return + if len(data) != int(peer.Task.ContentLength.Load()) { + peer.Log.Errorf("download tiny task length of data is %d, task content length is %d", len(data), peer.Task.ContentLength.Load()) + return + } + + // Tiny file downloaded successfully + peer.Task.DirectPiece = data } } // handlePeerFail handles failed peer func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) { if err := peer.FSM.Event(resource.PeerEventDownloadFailed); err != nil { - peer.Log.Errorf("peer fsm event failed: %v", err) + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return } @@ -736,7 +742,7 @@ func (s *Service) handleTaskSuccess(ctx context.Context, task *resource.Task, re } if err := task.FSM.Event(resource.TaskEventDownloadSucceeded); err != nil { - task.Log.Errorf("task fsm event failed: %v", err) + task.Log.Errorf("task fsm event failed: %s", err.Error()) return } @@ -761,7 +767,7 @@ func (s *Service) handleTaskFail(ctx context.Context, task *resource.Task) { } if err := task.FSM.Event(resource.TaskEventDownloadFailed); err != nil { - task.Log.Errorf("task fsm event failed: %v", err) + task.Log.Errorf("task fsm event failed: %s", err.Error()) return } }