fix: error message (#1255)

* fix: error message

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-04-15 14:50:05 +08:00
parent 9c539fd425
commit fe7b5fe0b4
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
7 changed files with 66 additions and 60 deletions

View File

@ -57,27 +57,27 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
globalJob, err := internaljob.New(redisConfig, internaljob.GlobalQueue) globalJob, err := internaljob.New(redisConfig, internaljob.GlobalQueue)
if err != nil { if err != nil {
logger.Errorf("create global job queue error: %v", err) logger.Errorf("create global job queue error: %s", err.Error())
return nil, err return nil, err
} }
logger.Infof("create global job queue: %v", globalJob) logger.Infof("create global job queue: %v", globalJob)
schedulerJob, err := internaljob.New(redisConfig, internaljob.SchedulersQueue) schedulerJob, err := internaljob.New(redisConfig, internaljob.SchedulersQueue)
if err != nil { if err != nil {
logger.Errorf("create scheduler job queue error: %v", err) logger.Errorf("create scheduler job queue error: %s", err.Error())
return nil, err return nil, err
} }
logger.Infof("create scheduler job queue: %v", schedulerJob) logger.Infof("create scheduler job queue: %v", schedulerJob)
localQueue, err := internaljob.GetSchedulerQueue(cfg.Manager.SchedulerClusterID, cfg.Server.Host) localQueue, err := internaljob.GetSchedulerQueue(cfg.Manager.SchedulerClusterID, cfg.Server.Host)
if err != nil { if err != nil {
logger.Errorf("get local job queue name error: %v", err) logger.Errorf("get local job queue name error: %s", err.Error())
return nil, err return nil, err
} }
localJob, err := internaljob.New(redisConfig, localQueue) localJob, err := internaljob.New(redisConfig, localQueue)
if err != nil { if err != nil {
logger.Errorf("create local job queue error: %v", err) logger.Errorf("create local job queue error: %s", err.Error())
return nil, err return nil, err
} }
logger.Infof("create local job queue: %v", localQueue) logger.Infof("create local job queue: %v", localQueue)
@ -95,7 +95,7 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
} }
if err := localJob.RegisterJob(namedJobFuncs); err != nil { if err := localJob.RegisterJob(namedJobFuncs); err != nil {
logger.Errorf("register preheat job to local queue error: %v", err) logger.Errorf("register preheat job to local queue error: %s", err.Error())
return nil, err return nil, err
} }
@ -106,21 +106,21 @@ func (j *job) Serve() {
go func() { go func() {
logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum) logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum)
if err := j.globalJob.LaunchWorker("global_worker", int(j.config.Job.GlobalWorkerNum)); err != nil { if err := j.globalJob.LaunchWorker("global_worker", int(j.config.Job.GlobalWorkerNum)); err != nil {
logger.Fatalf("global queue worker error: %v", err) logger.Fatalf("global queue worker error: %s", err.Error())
} }
}() }()
go func() { go func() {
logger.Infof("ready to launch %d worker(s) on scheduler queue", j.config.Job.SchedulerWorkerNum) logger.Infof("ready to launch %d worker(s) on scheduler queue", j.config.Job.SchedulerWorkerNum)
if err := j.schedulerJob.LaunchWorker("scheduler_worker", int(j.config.Job.SchedulerWorkerNum)); err != nil { if err := j.schedulerJob.LaunchWorker("scheduler_worker", int(j.config.Job.SchedulerWorkerNum)); err != nil {
logger.Fatalf("scheduler queue worker error: %v", err) logger.Fatalf("scheduler queue worker error: %s", err.Error())
} }
}() }()
go func() { go func() {
logger.Infof("ready to launch %d worker(s) on local queue", j.config.Job.LocalWorkerNum) logger.Infof("ready to launch %d worker(s) on local queue", j.config.Job.LocalWorkerNum)
if err := j.localJob.LaunchWorker("local_worker", int(j.config.Job.LocalWorkerNum)); err != nil { if err := j.localJob.LaunchWorker("local_worker", int(j.config.Job.LocalWorkerNum)); err != nil {
logger.Fatalf("scheduler queue worker error: %v", err) logger.Fatalf("scheduler queue worker error: %s", err.Error())
} }
}() }()
} }
@ -138,12 +138,12 @@ func (j *job) preheat(ctx context.Context, req string) error {
request := &internaljob.PreheatRequest{} request := &internaljob.PreheatRequest{}
if err := internaljob.UnmarshalRequest(req, request); err != nil { if err := internaljob.UnmarshalRequest(req, request); err != nil {
logger.Errorf("unmarshal request err: %v, request body: %s", err, req) logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), req)
return err return err
} }
if err := validator.New().Struct(request); err != nil { if err := validator.New().Struct(request); err != nil {
logger.Errorf("url %s validate failed: %v", request.URL, err) logger.Errorf("url %s validate failed: %s", request.URL, err.Error())
return err return err
} }
@ -172,14 +172,14 @@ func (j *job) preheat(ctx context.Context, req string) error {
UrlMeta: urlMeta, UrlMeta: urlMeta,
}) })
if err != nil { if err != nil {
log.Errorf("preheat failed: %v", err) log.Errorf("preheat failed: %s", err.Error())
return err return err
} }
for { for {
piece, err := stream.Recv() piece, err := stream.Recv()
if err != nil { if err != nil {
log.Errorf("preheat recive piece failed: %v", err) log.Errorf("preheat recive piece failed: %s", err.Error())
return err return err
} }

View File

@ -170,12 +170,12 @@ func (h *Host) LeavePeers() {
h.Peers.Range(func(_, value interface{}) bool { h.Peers.Range(func(_, value interface{}) bool {
if peer, ok := value.(*Peer); ok { if peer, ok := value.(*Peer); ok {
if err := peer.FSM.Event(PeerEventDownloadFailed); err != nil { if err := peer.FSM.Event(PeerEventDownloadFailed); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return true return true
} }
if err := peer.FSM.Event(PeerEventLeave); err != nil { if err := peer.FSM.Event(PeerEventLeave); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return true return true
} }

View File

@ -141,7 +141,7 @@ func (p *peerManager) RunGC() error {
// If the peer is not leave, // If the peer is not leave,
// first change the state to PeerEventLeave // first change the state to PeerEventLeave
if err := peer.FSM.Event(PeerEventLeave); err != nil { if err := peer.FSM.Event(PeerEventLeave); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
} }
peer.Log.Info("gc causes the peer to leave") peer.Log.Info("gc causes the peer to leave")

View File

@ -337,13 +337,13 @@ func (t *Task) NotifyPeers(code base.Code, event string) {
} }
if err := stream.Send(&rpcscheduler.PeerPacket{Code: code}); err != nil { if err := stream.Send(&rpcscheduler.PeerPacket{Code: code}); err != nil {
t.Log.Errorf("send packet to peer %s failed: %v", peer.ID, err) t.Log.Errorf("send packet to peer %s failed: %s", peer.ID, err.Error())
return true return true
} }
t.Log.Infof("task notify peer %s code %s", peer.ID, code) t.Log.Infof("task notify peer %s code %s", peer.ID, code)
if err := peer.FSM.Event(event); err != nil { if err := peer.FSM.Event(event); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return true return true
} }
} }

View File

@ -90,7 +90,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
Location: s.config.Host.Location, Location: s.config.Host.Location,
SchedulerClusterId: uint64(s.config.Manager.SchedulerClusterID), SchedulerClusterId: uint64(s.config.Manager.SchedulerClusterID),
}); err != nil { }); err != nil {
logger.Fatalf("register to manager failed %v", err) logger.Fatalf("register to manager failed %s", err.Error())
} }
// Initialize dynconfig client // Initialize dynconfig client
@ -165,7 +165,7 @@ func (s *Server) Serve() error {
// Serve dynConfig // Serve dynConfig
go func() { go func() {
if err := s.dynconfig.Serve(); err != nil { if err := s.dynconfig.Serve(); err != nil {
logger.Fatalf("dynconfig start failed %v", err) logger.Fatalf("dynconfig start failed %s", err.Error())
} }
logger.Info("dynconfig start successfully") logger.Info("dynconfig start successfully")
}() }()
@ -188,7 +188,7 @@ func (s *Server) Serve() error {
if err == http.ErrServerClosed { if err == http.ErrServerClosed {
return return
} }
logger.Fatalf("metrics server closed unexpect: %v", err) logger.Fatalf("metrics server closed unexpect: %s", err.Error())
} }
}() }()
} }
@ -208,14 +208,14 @@ func (s *Server) Serve() error {
// Generate GRPC limit listener // Generate GRPC limit listener
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.config.Server.Listen, s.config.Server.Port)) listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.config.Server.Listen, s.config.Server.Port))
if err != nil { if err != nil {
logger.Fatalf("net listener failed to start: %v", err) logger.Fatalf("net listener failed to start: %s", err.Error())
} }
defer listener.Close() defer listener.Close()
// Started GRPC server // Started GRPC server
logger.Infof("started grpc server at %s://%s", listener.Addr().Network(), listener.Addr().String()) logger.Infof("started grpc server at %s://%s", listener.Addr().Network(), listener.Addr().String())
if err := s.grpcServer.Serve(listener); err != nil { if err := s.grpcServer.Serve(listener); err != nil {
logger.Errorf("stoped grpc server: %v", err) logger.Errorf("stoped grpc server: %s", err.Error())
return err return err
} }
@ -225,14 +225,14 @@ func (s *Server) Serve() error {
func (s *Server) Stop() { func (s *Server) Stop() {
// Stop dynconfig server // Stop dynconfig server
if err := s.dynconfig.Stop(); err != nil { if err := s.dynconfig.Stop(); err != nil {
logger.Errorf("dynconfig client closed failed %v", err) logger.Errorf("dynconfig client closed failed %s", err.Error())
} }
logger.Info("dynconfig client closed") logger.Info("dynconfig client closed")
// Stop manager client // Stop manager client
if s.managerClient != nil { if s.managerClient != nil {
if err := s.managerClient.Close(); err != nil { if err := s.managerClient.Close(); err != nil {
logger.Errorf("manager client failed to stop: %v", err) logger.Errorf("manager client failed to stop: %s", err.Error())
} }
logger.Info("manager client closed") logger.Info("manager client closed")
} }
@ -244,7 +244,7 @@ func (s *Server) Stop() {
// Stop metrics server // Stop metrics server
if s.metricsServer != nil { if s.metricsServer != nil {
if err := s.metricsServer.Shutdown(context.Background()); err != nil { if err := s.metricsServer.Shutdown(context.Background()); err != nil {
logger.Errorf("metrics server failed to stop: %v", err) logger.Errorf("metrics server failed to stop: %s", err.Error())
} }
logger.Info("metrics server closed under request") logger.Info("metrics server closed under request")
} }

View File

@ -92,14 +92,14 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
// Notify peer back-to-source // Notify peer back-to-source
if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource}); err != nil { if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource}); err != nil {
peer.Log.Errorf("send packet failed: %v", err) peer.Log.Errorf("send packet failed: %s", err.Error())
return return
} }
peer.Log.Infof("peer scheduling %d times, peer downloads back-to-source %d", peer.Log.Infof("peer scheduling %d times, peer downloads back-to-source %d",
n, base.Code_SchedNeedBackSource) n, base.Code_SchedNeedBackSource)
if err := peer.FSM.Event(resource.PeerEventDownloadFromBackToSource); err != nil { if err := peer.FSM.Event(resource.PeerEventDownloadFromBackToSource); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return return
} }
@ -107,7 +107,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
// peer back-to-source and reset task state to TaskStateRunning // peer back-to-source and reset task state to TaskStateRunning
if peer.Task.FSM.Is(resource.TaskStateFailed) { if peer.Task.FSM.Is(resource.TaskStateFailed) {
if err := peer.Task.FSM.Event(resource.TaskEventDownload); err != nil { if err := peer.Task.FSM.Event(resource.TaskEventDownload); err != nil {
peer.Task.Log.Errorf("task fsm event failed: %v", err) peer.Task.Log.Errorf("task fsm event failed: %s", err.Error())
return return
} }
} }
@ -125,7 +125,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
// Notify peer schedule failed // Notify peer schedule failed
if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedTaskStatusError}); err != nil { if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedTaskStatusError}); err != nil {
peer.Log.Errorf("send packet failed: %v", err) peer.Log.Errorf("send packet failed: %s", err.Error())
return return
} }
peer.Log.Errorf("peer scheduling exceeds the limit %d times and return code %d", s.config.RetryLimit, base.Code_SchedTaskStatusError) peer.Log.Errorf("peer scheduling exceeds the limit %d times and return code %d", s.config.RetryLimit, base.Code_SchedTaskStatusError)

View File

@ -79,7 +79,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
// Register task and trigger cdn download task // Register task and trigger cdn download task
task, err := s.registerTask(ctx, req) task, err := s.registerTask(ctx, req)
if err != nil { if err != nil {
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
logger.Error(msg) logger.Error(msg)
return nil, dferrors.New(base.Code_SchedTaskStatusError, msg) return nil, dferrors.New(base.Code_SchedTaskStatusError, msg)
} }
@ -96,7 +96,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
peer.Log.Info("task size scope is tiny and return piece content directly") peer.Log.Info("task size scope is tiny and return piece content directly")
if len(task.DirectPiece) > 0 && int64(len(task.DirectPiece)) == task.ContentLength.Load() { if len(task.DirectPiece) > 0 && int64(len(task.DirectPiece)) == task.ContentLength.Load() {
if err := peer.FSM.Event(resource.PeerEventRegisterTiny); err != nil { if err := peer.FSM.Event(resource.PeerEventRegisterTiny); err != nil {
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
peer.Log.Error(msg) peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg) return nil, dferrors.New(base.Code_SchedError, msg)
} }
@ -121,7 +121,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
if !ok { if !ok {
peer.Log.Warn("task size scope is small and it can not select parent") peer.Log.Warn("task size scope is small and it can not select parent")
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
peer.Log.Error(msg) peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg) return nil, dferrors.New(base.Code_SchedError, msg)
} }
@ -137,7 +137,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
if !parent.FSM.Is(resource.PeerStateSucceeded) { if !parent.FSM.Is(resource.PeerStateSucceeded) {
peer.Log.Infof("task size scope is small and download state %s is not PeerStateSucceeded", parent.FSM.Current()) peer.Log.Infof("task size scope is small and download state %s is not PeerStateSucceeded", parent.FSM.Current())
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
peer.Log.Error(msg) peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg) return nil, dferrors.New(base.Code_SchedError, msg)
} }
@ -152,7 +152,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
if !ok { if !ok {
peer.Log.Warn("task size scope is small and it can not get first piece") peer.Log.Warn("task size scope is small and it can not get first piece")
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
peer.Log.Error(msg) peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg) return nil, dferrors.New(base.Code_SchedError, msg)
} }
@ -165,7 +165,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
peer.ReplaceParent(parent) peer.ReplaceParent(parent)
if err := peer.FSM.Event(resource.PeerEventRegisterSmall); err != nil { if err := peer.FSM.Event(resource.PeerEventRegisterSmall); err != nil {
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
peer.Log.Error(msg) peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg) return nil, dferrors.New(base.Code_SchedError, msg)
} }
@ -194,7 +194,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
default: default:
peer.Log.Info("task size scope is normal and needs to be register") peer.Log.Info("task size scope is normal and needs to be register")
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
peer.Log.Error(msg) peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg) return nil, dferrors.New(base.Code_SchedError, msg)
} }
@ -209,7 +209,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
// Task is unsuccessful // Task is unsuccessful
peer.Log.Infof("task state is %s and needs to be register", task.FSM.Current()) peer.Log.Infof("task state is %s and needs to be register", task.FSM.Current())
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err) msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
peer.Log.Error(msg) peer.Log.Error(msg)
return nil, dferrors.New(base.Code_SchedError, msg) return nil, dferrors.New(base.Code_SchedError, msg)
} }
@ -242,7 +242,7 @@ func (s *Service) ReportPieceResult(stream rpcscheduler.Scheduler_ReportPieceRes
if err == io.EOF { if err == io.EOF {
return nil return nil
} }
logger.Errorf("receive piece %#v error: %v", piece, err) logger.Errorf("receive piece %#v error: %s", piece, err.Error())
return err return err
} }
@ -406,7 +406,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
if !task.FSM.Is(resource.TaskStateSucceeded) { if !task.FSM.Is(resource.TaskStateSucceeded) {
if task.FSM.Is(resource.TaskStatePending) { if task.FSM.Is(resource.TaskStatePending) {
if err := task.FSM.Event(resource.TaskEventDownload); err != nil { if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
msg := fmt.Sprintf("task fsm event failed: %v", err) msg := fmt.Sprintf("task fsm event failed: %s", err.Error())
task.Log.Error(msg) task.Log.Error(msg)
return dferrors.New(base.Code_SchedError, msg) return dferrors.New(base.Code_SchedError, msg)
} }
@ -414,7 +414,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
if task.FSM.Is(resource.TaskStateFailed) { if task.FSM.Is(resource.TaskStateFailed) {
if err := task.FSM.Event(resource.TaskEventDownload); err != nil { if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
msg := fmt.Sprintf("task fsm event failed: %v", err) msg := fmt.Sprintf("task fsm event failed: %s", err.Error())
task.Log.Error(msg) task.Log.Error(msg)
return dferrors.New(base.Code_SchedError, msg) return dferrors.New(base.Code_SchedError, msg)
} }
@ -444,7 +444,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
if !peer.FSM.Is(resource.PeerStateSucceeded) { if !peer.FSM.Is(resource.PeerStateSucceeded) {
if peer.FSM.Is(resource.PeerStatePending) { if peer.FSM.Is(resource.PeerStatePending) {
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
msg := fmt.Sprintf("peer fsm event failed: %v", err) msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
task.Log.Error(msg) task.Log.Error(msg)
return dferrors.New(base.Code_SchedError, msg) return dferrors.New(base.Code_SchedError, msg)
} }
@ -454,7 +454,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
peer.FSM.Is(resource.PeerStateReceivedSmall) || peer.FSM.Is(resource.PeerStateReceivedSmall) ||
peer.FSM.Is(resource.PeerStateReceivedNormal) { peer.FSM.Is(resource.PeerStateReceivedNormal) {
if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
msg := fmt.Sprintf("peer fsm event failed: %v", err) msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
task.Log.Error(msg) task.Log.Error(msg)
return dferrors.New(base.Code_SchedError, msg) return dferrors.New(base.Code_SchedError, msg)
} }
@ -481,7 +481,7 @@ func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) e
if err := peer.FSM.Event(resource.PeerEventLeave); err != nil { if err := peer.FSM.Event(resource.PeerEventLeave); err != nil {
metrics.LeaveTaskFailureCount.WithLabelValues(peer.BizTag).Inc() metrics.LeaveTaskFailureCount.WithLabelValues(peer.BizTag).Inc()
msg := fmt.Sprintf("peer fsm event failed: %v", err) msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
peer.Log.Error(msg) peer.Log.Error(msg)
return dferrors.New(base.Code_SchedTaskStatusError, msg) return dferrors.New(base.Code_SchedTaskStatusError, msg)
} }
@ -567,7 +567,7 @@ func (s *Service) triggerCDNTask(ctx context.Context, task *resource.Task) {
peer, endOfPiece, err := s.resource.CDN().TriggerTask( peer, endOfPiece, err := s.resource.CDN().TriggerTask(
trace.ContextWithSpanContext(context.Background(), trace.SpanContextFromContext(ctx)), task) trace.ContextWithSpanContext(context.Background(), trace.SpanContextFromContext(ctx)), task)
if err != nil { if err != nil {
task.Log.Errorf("trigger cdn download task failed: %v", err) task.Log.Errorf("trigger cdn download task failed: %s", err.Error())
s.handleTaskFail(ctx, task) s.handleTaskFail(ctx, task)
return return
} }
@ -590,7 +590,7 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
// the peer has already returned to piece data when registering // the peer has already returned to piece data when registering
peer.Log.Info("file type is tiny, peer has already returned to piece data when registering") peer.Log.Info("file type is tiny, peer has already returned to piece data when registering")
if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return return
} }
case resource.PeerStateReceivedSmall: case resource.PeerStateReceivedSmall:
@ -598,12 +598,12 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
// the peer has already returned to the parent when registering // the peer has already returned to the parent when registering
peer.Log.Info("file type is small, peer has already returned to the parent when registering") peer.Log.Info("file type is small, peer has already returned to the parent when registering")
if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return return
} }
case resource.PeerStateReceivedNormal: case resource.PeerStateReceivedNormal:
if err := peer.FSM.Event(resource.PeerEventDownload); err != nil { if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return return
} }
@ -651,7 +651,7 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
switch piece.Code { switch piece.Code {
case base.Code_PeerTaskNotFound, base.Code_CDNError, base.Code_CDNTaskDownloadFail: case base.Code_PeerTaskNotFound, base.Code_CDNError, base.Code_CDNTaskDownloadFail:
if err := parent.FSM.Event(resource.PeerEventDownloadFailed); err != nil { if err := parent.FSM.Event(resource.PeerEventDownloadFailed); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
break break
} }
case base.Code_ClientPieceNotFound: case base.Code_ClientPieceNotFound:
@ -688,28 +688,34 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
// handlePeerSuccess handles successful peer // handlePeerSuccess handles successful peer
func (s *Service) handlePeerSuccess(ctx context.Context, peer *resource.Peer) { func (s *Service) handlePeerSuccess(ctx context.Context, peer *resource.Peer) {
if err := peer.FSM.Event(resource.PeerEventDownloadSucceeded); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return
}
// If the peer type is tiny and back-to-source, // If the peer type is tiny and back-to-source,
// it need to directly download the tiny file and store the data in task DirectPiece // it need to directly download the tiny file and store the data in task DirectPiece
if peer.Task.SizeScope() == base.SizeScope_TINY && len(peer.Task.DirectPiece) == 0 { if peer.Task.SizeScope() == base.SizeScope_TINY && len(peer.Task.DirectPiece) == 0 {
data, err := peer.DownloadTinyFile() data, err := peer.DownloadTinyFile()
if err == nil && len(data) == int(peer.Task.ContentLength.Load()) { if err != nil {
// Tiny file downloaded successfully peer.Log.Errorf("download tiny task failed: %s", err.Error())
peer.Task.DirectPiece = data return
} else {
peer.Log.Warnf("download tiny file length is %d, task content length is %d, downloading is failed: %v", len(data), peer.Task.ContentLength.Load(), err)
} }
}
if err := peer.FSM.Event(resource.PeerEventDownloadSucceeded); err != nil { if len(data) != int(peer.Task.ContentLength.Load()) {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("download tiny task length of data is %d, task content length is %d", len(data), peer.Task.ContentLength.Load())
return return
}
// Tiny file downloaded successfully
peer.Task.DirectPiece = data
} }
} }
// handlePeerFail handles failed peer // handlePeerFail handles failed peer
func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) { func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) {
if err := peer.FSM.Event(resource.PeerEventDownloadFailed); err != nil { if err := peer.FSM.Event(resource.PeerEventDownloadFailed); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err) peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return return
} }
@ -736,7 +742,7 @@ func (s *Service) handleTaskSuccess(ctx context.Context, task *resource.Task, re
} }
if err := task.FSM.Event(resource.TaskEventDownloadSucceeded); err != nil { if err := task.FSM.Event(resource.TaskEventDownloadSucceeded); err != nil {
task.Log.Errorf("task fsm event failed: %v", err) task.Log.Errorf("task fsm event failed: %s", err.Error())
return return
} }
@ -761,7 +767,7 @@ func (s *Service) handleTaskFail(ctx context.Context, task *resource.Task) {
} }
if err := task.FSM.Event(resource.TaskEventDownloadFailed); err != nil { if err := task.FSM.Event(resource.TaskEventDownloadFailed); err != nil {
task.Log.Errorf("task fsm event failed: %v", err) task.Log.Errorf("task fsm event failed: %s", err.Error())
return return
} }
} }