From c5f4ca50fa94e7412c888f6f2997bf98f1e5ac98 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Thu, 22 Sep 2022 10:38:53 +0800 Subject: [PATCH] fix: grpc download tidy file error (#1697) Signed-off-by: Jim Ma --- client/daemon/peer/peertask_manager.go | 12 +++++------ client/daemon/peer/peertask_manager_mock.go | 7 +++---- client/daemon/peer/peertask_manager_test.go | 2 +- client/daemon/rpcserver/rpcserver.go | 23 +-------------------- 4 files changed, 11 insertions(+), 33 deletions(-) diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 26ac9d92f..30ed89c1e 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -48,7 +48,7 @@ type TaskManager interface { // return a progress channel for request download progress // tiny stands task file is tiny and task is done StartFileTask(ctx context.Context, req *FileTaskRequest) ( - progress chan *FileTaskProgress, tiny *TinyData, err error) + progress chan *FileTaskProgress, err error) // StartStreamTask starts a peer task with stream io StartStreamTask(ctx context.Context, req *StreamTaskRequest) ( readCloser io.ReadCloser, attribute map[string]string, err error) @@ -260,15 +260,15 @@ func (ptm *peerTaskManager) prefetchParentTask(request *schedulerv1.PeerTaskRequ return prefetch } -func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) { +func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, error) { if req.KeepOriginalOffset && !ptm.Prefetch { - return nil, nil, fmt.Errorf("please enable prefetch when use original offset feature") + return nil, fmt.Errorf("please enable prefetch when use original offset feature") } if ptm.Multiplex { progress, ok := ptm.tryReuseFilePeerTask(ctx, req) if ok { metrics.PeerTaskCacheHitCount.Add(1) - return progress, nil, nil + return progress, nil } } // TODO ensure scheduler is ok first @@ -281,12 +281,12 @@ func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequ } ctx, pt, err := ptm.newFileTask(ctx, req, limit) if err != nil { - return nil, nil, err + return nil, err } // FIXME when failed due to SchedulerClient error, relocate SchedulerClient and retry progress, err := pt.Start(ctx) - return progress, pt.peerTaskConductor.tinyData, err + return progress, err } func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTaskRequest) (io.ReadCloser, map[string]string, error) { diff --git a/client/daemon/peer/peertask_manager_mock.go b/client/daemon/peer/peertask_manager_mock.go index ae9ab12ea..5ad97121a 100644 --- a/client/daemon/peer/peertask_manager_mock.go +++ b/client/daemon/peer/peertask_manager_mock.go @@ -84,13 +84,12 @@ func (mr *MockTaskManagerMockRecorder) IsPeerTaskRunning(taskID interface{}) *go } // StartFileTask mocks base method. -func (m *MockTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) { +func (m *MockTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StartFileTask", ctx, req) ret0, _ := ret[0].(chan *FileTaskProgress) - ret1, _ := ret[1].(*TinyData) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret1, _ := ret[1].(error) + return ret0, ret1 } // StartFileTask indicates an expected call of StartFileTask. diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index e7af51212..659a69f3c 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -699,7 +699,7 @@ func (ts *testSpec) runFileTaskTest(assert *testifyassert.Assertions, require *t defer func() { assert.Nil(os.Remove(output)) }() - progress, _, err := mm.peerTaskManager.StartFileTask( + progress, err := mm.peerTaskManager.StartFileTask( context.Background(), &FileTaskRequest{ PeerTaskRequest: schedulerv1.PeerTaskRequest{ diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 414028074..89d3fa99e 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -485,32 +485,11 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest, st } log := logger.With("peer", peerTask.PeerId, "component", "downloadService") - peerTaskProgress, tiny, err := s.peerTaskManager.StartFileTask(ctx, peerTask) + peerTaskProgress, err := s.peerTaskManager.StartFileTask(ctx, peerTask) if err != nil { return dferrors.New(commonv1.Code_UnknownError, fmt.Sprintf("%s", err)) } - if tiny != nil { - err = stream.Send(&dfdaemonv1.DownResult{ - TaskId: tiny.TaskID, - PeerId: tiny.PeerID, - CompletedLength: uint64(len(tiny.Content)), - Done: true, - Output: req.Output, - }) - if err != nil { - log.Infof("send download result error: %s", err.Error()) - return err - } - log.Infof("tiny file, wrote to output") - if req.Uid != 0 && req.Gid != 0 { - if err = os.Chown(req.Output, int(req.Uid), int(req.Gid)); err != nil { - log.Errorf("change own failed: %s", err) - return err - } - } - return nil - } for { select { case p, ok := <-peerTaskProgress: