diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 9bb787a85..33b9cd118 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -168,9 +168,9 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { PeerId: request.PeerID, }) if er != nil { - logger.Errorf("step 4:leave task %s/%s, error: %v", request.TaskID, request.PeerID, er) + logger.Errorf("step 4: leave task %s/%s, error: %v", request.TaskID, request.PeerID, er) } else { - logger.Infof("step 4:leave task %s/%s state ok", request.TaskID, request.PeerID) + logger.Infof("step 4: leave task %s/%s state ok", request.TaskID, request.PeerID) } } storageManager, err := storage.NewStorageManager(opt.Storage.StoreStrategy, &opt.Storage, diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index bdf728f10..80099448c 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -124,13 +124,22 @@ func (s *server) GetPieceTasks(ctx context.Context, request *commonv1.PieceTaskR err, request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit) return nil, dferrors.New(code, err.Error()) } - // dst peer is not running + + // check whether dst peer is not running task, ok := s.peerTaskManager.IsPeerTaskRunning(request.TaskId) if !ok { + // 1. no running task code = commonv1.Code_PeerTaskNotFound logger.Errorf("get piece tasks error: target peer task not found, task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d", request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit) return nil, dferrors.New(code, err.Error()) + } else if task.GetPeerID() == request.GetSrcPid() { + // 2. scheduler schedules same peer host, and the running peer is the source peer (schedulers misses the old peer leaved event) + code = commonv1.Code_PeerTaskNotFound + logger.Errorf("get piece tasks error: target peer task not found, task id: %s, the src peer is same with running peer,"+ + " src peer: %s, dst peer: %s, piece num: %d, limit: %d", + request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit) + return nil, dferrors.New(code, err.Error()) } if task.GetPeerID() != request.GetDstPid() {