From f6506115a74b8bc01279e6a881c47b46ea99c6b6 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Thu, 4 Aug 2022 14:09:33 +0800 Subject: [PATCH] feat: support grpc recursive download (#1518) * feat: daemon grpc recursive download * fix: init storage error Signed-off-by: Jim Ma --- client/config/dfget.go | 2 +- client/daemon/metrics/metrics.go | 3 + client/daemon/peer/peertask_conductor.go | 22 +++ client/daemon/peer/peertask_manager.go | 2 +- .../daemon/rpcserver/mocks/rpcserver_mock.go | 38 ++++ client/daemon/rpcserver/rpcserver.go | 166 ++++++++++++++++-- pkg/rpc/dfdaemon/server/mocks/server_mock.go | 8 +- pkg/rpc/dfdaemon/server/server.go | 67 +------ 8 files changed, 227 insertions(+), 81 deletions(-) diff --git a/client/config/dfget.go b/client/config/dfget.go index a027b2f14..c567ef029 100644 --- a/client/config/dfget.go +++ b/client/config/dfget.go @@ -253,7 +253,7 @@ func (cfg *ClientOption) checkOutput() error { return nil } -// MkdirAll make directories recursive, and changes uid, gid to latest directory. +// MkdirAll make directories recursive, and changes uid, gid to the latest directory. // For example: the path /data/x exists, uid=1, gid=1 // when call MkdirAll("/data/x/y/z", 0755, 2, 2) // MkdirAll creates /data/x/y and change owner to 2:2, creates /data/x/y/z and change owner to 2:2 diff --git a/client/daemon/metrics/metrics.go b/client/daemon/metrics/metrics.go index a5f35ba4c..a9d399a58 100644 --- a/client/daemon/metrics/metrics.go +++ b/client/daemon/metrics/metrics.go @@ -33,6 +33,9 @@ const ( // Failed download task type is source FailTypeBackSource = "source" + // Failed download task type is init, indecates not yet register to scheduler + FailTypeInit = "init" + // SeedPeerDownload type is p2p SeedPeerDownloadTypeP2P = "p2p" diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index cd987f098..e05982211 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -458,6 +458,28 @@ func (pt *peerTaskConductor) cancel(code commonv1.Code, reason string) { }) } +func (pt *peerTaskConductor) cancelNotRegisterred(code commonv1.Code, reason string) { + pt.statusOnce.Do(func() { + pt.failedCode = code + pt.failedReason = reason + + metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeInit).Add(1) + + pt.peerTaskManager.PeerTaskDone(pt.taskID) + pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false)) + pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode))) + pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason)) + + close(pt.failCh) + pt.broker.Stop() + pt.span.End() + pt.pieceDownloadCancel() + if pt.pieceTaskSyncManager != nil { + pt.pieceTaskSyncManager.cancel() + } + }) +} + // only use when receive back source code from scheduler func (pt *peerTaskConductor) markBackSource() { pt.needBackSource.Store(true) diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index d5db51913..226211784 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -239,7 +239,7 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor( err := ptc.initStorage(desiredLocation) if err != nil { ptc.Errorf("init storage error: %s", err) - ptc.cancel(commonv1.Code_ClientError, err.Error()) + ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error()) return nil, false, err } return ptc, true, nil diff --git a/client/daemon/rpcserver/mocks/rpcserver_mock.go b/client/daemon/rpcserver/mocks/rpcserver_mock.go index 20b132d25..77f1e1921 100644 --- a/client/daemon/rpcserver/mocks/rpcserver_mock.go +++ b/client/daemon/rpcserver/mocks/rpcserver_mock.go @@ -9,6 +9,7 @@ import ( reflect "reflect" time "time" + v1 "d7y.io/api/pkg/apis/dfdaemon/v1" gomock "github.com/golang/mock/gomock" ) @@ -100,3 +101,40 @@ func (mr *MockServerMockRecorder) Stop() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockServer)(nil).Stop)) } + +// MockResultSender is a mock of ResultSender interface. +type MockResultSender struct { + ctrl *gomock.Controller + recorder *MockResultSenderMockRecorder +} + +// MockResultSenderMockRecorder is the mock recorder for MockResultSender. +type MockResultSenderMockRecorder struct { + mock *MockResultSender +} + +// NewMockResultSender creates a new mock instance. +func NewMockResultSender(ctrl *gomock.Controller) *MockResultSender { + mock := &MockResultSender{ctrl: ctrl} + mock.recorder = &MockResultSenderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockResultSender) EXPECT() *MockResultSenderMockRecorder { + return m.recorder +} + +// Send mocks base method. +func (m *MockResultSender) Send(arg0 *v1.DownResult) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockResultSenderMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockResultSender)(nil).Send), arg0) +} diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 80099448c..4bf8ca5b1 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -25,9 +25,16 @@ import ( "io" "math" "net" + "net/url" "os" + "path" + "path/filepath" + "strings" + "syscall" "time" + "github.com/gammazero/deque" + "github.com/google/uuid" "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -46,10 +53,12 @@ import ( "d7y.io/dragonfly/v2/client/util" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/basic" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/net/http" dfdaemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" "d7y.io/dragonfly/v2/pkg/safe" + "d7y.io/dragonfly/v2/pkg/source" "d7y.io/dragonfly/v2/scheduler/resource" ) @@ -314,14 +323,104 @@ func (s *server) CheckHealth(context.Context) error { return nil } -func (s *server) Download(ctx context.Context, - req *dfdaemonv1.DownRequest, results chan<- *dfdaemonv1.DownResult) error { +func (s *server) Download(req *dfdaemonv1.DownRequest, stream dfdaemonv1.Daemon_DownloadServer) error { s.Keep() - return s.doDownload(ctx, req, results, "") + ctx := stream.Context() + if req.Recursive { + return s.doRecursiveDownload(ctx, req, stream) + } + return s.doDownload(ctx, req, stream, "") } -func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest, - results chan<- *dfdaemonv1.DownResult, peerID string) error { +func (s *server) doRecursiveDownload(ctx context.Context, req *dfdaemonv1.DownRequest, stream dfdaemonv1.Daemon_DownloadServer) error { + if stat, err := os.Stat(req.Output); err != nil { + return err + } else if !stat.IsDir() { + return fmt.Errorf("target output %s must be directory", req.Output) + } + + var queue deque.Deque[*dfdaemonv1.DownRequest] + queue.PushBack(req) + downloadMap := map[url.URL]struct{}{} + for { + if queue.Len() == 0 { + break + } + + parentReq := queue.PopFront() + request, err := source.NewRequestWithContext(ctx, parentReq.Url, parentReq.UrlMeta.Header) + if err != nil { + return err + } + + // prevent loop downloading + if _, exist := downloadMap[*request.URL]; exist { + continue + } + downloadMap[*request.URL] = struct{}{} + + urlEntries, err := source.List(request) + if err != nil { + logger.Errorf("url [%v] source lister error: %v", request.URL, err) + return err + } + + for _, urlEntry := range urlEntries { + childReq := copyDownRequest(parentReq) //create new req + childReq.Output = path.Join(parentReq.Output, urlEntry.Name) + logger.Infof("target output: %s", strings.TrimPrefix(childReq.Output, req.Output)) + + u := urlEntry.URL + childReq.Url = u.String() + logger.Infof("download %s to %s", childReq.Url, childReq.Output) + + if urlEntry.IsDir { + queue.PushBack(childReq) + continue + } + + // validate new request + if err = childReq.Validate(); err != nil { + logger.Errorf("validate %#v failed: %s", childReq, err) + return err + } + + if err = checkOutput(childReq.Output); err != nil { + logger.Errorf("check output %#v failed: %s", childReq, err) + return err + } + + if err = s.doDownload(ctx, childReq, stream, ""); err != nil { + return err + } + } + } + return nil +} + +func copyDownRequest(req *dfdaemonv1.DownRequest) *dfdaemonv1.DownRequest { + return &dfdaemonv1.DownRequest{ + Uuid: uuid.New().String(), + Url: req.Url, + Output: req.Output, + Timeout: req.Timeout, + Limit: req.Limit, + DisableBackSource: req.DisableBackSource, + UrlMeta: req.UrlMeta, + Pattern: req.Pattern, + Callsystem: req.Callsystem, + Uid: req.Uid, + Gid: req.Gid, + KeepOriginalOffset: req.KeepOriginalOffset, + Recursive: false, // not used anymore + } +} + +type ResultSender interface { + Send(*dfdaemonv1.DownResult) error +} + +func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest, stream ResultSender, peerID string) error { if req.UrlMeta == nil { req.UrlMeta = &commonv1.UrlMeta{} } @@ -363,11 +462,16 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest, return dferrors.New(commonv1.Code_UnknownError, fmt.Sprintf("%s", err)) } if tiny != nil { - results <- &dfdaemonv1.DownResult{ + err = stream.Send(&dfdaemonv1.DownResult{ TaskId: tiny.TaskID, PeerId: tiny.PeerID, CompletedLength: uint64(len(tiny.Content)), Done: true, + Output: req.Output, + }) + if err != nil { + log.Infof("send download result error: %s", err.Error()) + return err } log.Infof("tiny file, wrote to output") if req.Uid != 0 && req.Gid != 0 { @@ -391,11 +495,16 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest, log.Errorf("task %s/%s failed: %d/%s", p.PeerID, p.TaskID, p.State.Code, p.State.Msg) return dferrors.New(p.State.Code, p.State.Msg) } - results <- &dfdaemonv1.DownResult{ + err = stream.Send(&dfdaemonv1.DownResult{ TaskId: p.TaskID, PeerId: p.PeerID, CompletedLength: uint64(p.CompletedLength), Done: p.PeerTaskDone, + Output: req.Output, + }) + if err != nil { + log.Infof("send download result error: %s", err.Error()) + return err } // peer task sets PeerTaskDone to true only once if p.PeerTaskDone { @@ -411,9 +520,14 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest, return nil } case <-ctx.Done(): - results <- &dfdaemonv1.DownResult{ + err = stream.Send(&dfdaemonv1.DownResult{ CompletedLength: 0, Done: true, + Output: req.Output, + }) + if err != nil { + log.Infof("send download result error: %s", err.Error()) + return err } log.Infof("context done due to %s", ctx.Err()) return status.Error(codes.Canceled, ctx.Err().Error()) @@ -596,7 +710,7 @@ func (s *server) exportFromPeers(ctx context.Context, log *logger.SugaredLoggerO Gid: req.Gid, } - go call(ctx, peerID, drc, s, downRequest, errChan) + go call(ctx, peerID, &simpleResultSender{drc}, s, downRequest, errChan) go func() { for result = range drc { if result.Done { @@ -619,9 +733,19 @@ func (s *server) exportFromPeers(ctx context.Context, log *logger.SugaredLoggerO return nil } -func call(ctx context.Context, peerID string, drc chan *dfdaemonv1.DownResult, s *server, req *dfdaemonv1.DownRequest, errChan chan error) { +// TODO remove this wrapper in exportFromPeers +type simpleResultSender struct { + drc chan *dfdaemonv1.DownResult +} + +func (s *simpleResultSender) Send(result *dfdaemonv1.DownResult) error { + s.drc <- result + return nil +} + +func call(ctx context.Context, peerID string, sender ResultSender, s *server, req *dfdaemonv1.DownRequest, errChan chan error) { err := safe.Call(func() { - if err := s.doDownload(ctx, req, drc, peerID); err != nil { + if err := s.doDownload(ctx, req, sender, peerID); err != nil { errChan <- err } }) @@ -655,3 +779,23 @@ func (s *server) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskReque } return nil } + +func checkOutput(output string) error { + if !filepath.IsAbs(output) { + return fmt.Errorf("path[%s] is not absolute path", output) + } + outputDir, _ := path.Split(output) + if err := config.MkdirAll(outputDir, 0777, basic.UserID, basic.UserGroup); err != nil { + return err + } + + // check permission + for dir := output; dir != ""; dir = filepath.Dir(dir) { + if err := syscall.Access(dir, syscall.O_RDWR); err == nil { + break + } else if os.IsPermission(err) || dir == "/" { + return fmt.Errorf("user[%s] path[%s] %v", basic.Username, output, err) + } + } + return nil +} diff --git a/pkg/rpc/dfdaemon/server/mocks/server_mock.go b/pkg/rpc/dfdaemon/server/mocks/server_mock.go index df9363d02..a18876dda 100644 --- a/pkg/rpc/dfdaemon/server/mocks/server_mock.go +++ b/pkg/rpc/dfdaemon/server/mocks/server_mock.go @@ -65,17 +65,17 @@ func (mr *MockDaemonServerMockRecorder) DeleteTask(arg0, arg1 interface{}) *gomo } // Download mocks base method. -func (m *MockDaemonServer) Download(arg0 context.Context, arg1 *v10.DownRequest, arg2 chan<- *v10.DownResult) error { +func (m *MockDaemonServer) Download(arg0 *v10.DownRequest, arg1 v10.Daemon_DownloadServer) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Download", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "Download", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // Download indicates an expected call of Download. -func (mr *MockDaemonServerMockRecorder) Download(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockDaemonServerMockRecorder) Download(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockDaemonServer)(nil).Download), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockDaemonServer)(nil).Download), arg0, arg1) } // ExportTask mocks base method. diff --git a/pkg/rpc/dfdaemon/server/server.go b/pkg/rpc/dfdaemon/server/server.go index cb0d98c02..ca942aa23 100644 --- a/pkg/rpc/dfdaemon/server/server.go +++ b/pkg/rpc/dfdaemon/server/server.go @@ -20,7 +20,6 @@ package server import ( "context" - "sync" "google.golang.org/grpc" "google.golang.org/grpc/peer" @@ -29,16 +28,14 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" - "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc" - "d7y.io/dragonfly/v2/pkg/safe" ) // DaemonServer refer to dfdaemonv1.DaemonServer type DaemonServer interface { // Download triggers client to download file - Download(context.Context, *dfdaemonv1.DownRequest, chan<- *dfdaemonv1.DownResult) error + Download(*dfdaemonv1.DownRequest, dfdaemonv1.Daemon_DownloadServer) error // GetPieceTasks get piece tasks from other peers GetPieceTasks(context.Context, *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) // SyncPieceTasks sync piece tasks info with other peers @@ -67,35 +64,12 @@ func New(daemonServer DaemonServer, opts ...grpc.ServerOption) *grpc.Server { } func (p *proxy) Download(req *dfdaemonv1.DownRequest, stream dfdaemonv1.Daemon_DownloadServer) (err error) { - ctx, cancel := context.WithCancel(stream.Context()) - defer cancel() - peerAddr := "unknown" - if pe, ok := peer.FromContext(ctx); ok { + if pe, ok := peer.FromContext(stream.Context()); ok { peerAddr = pe.Addr.String() } logger.Infof("trigger download for url: %s, from: %s, uuid: %s", req.Url, peerAddr, req.Uuid) - - errChan := make(chan error, 10) - drc := make(chan *dfdaemonv1.DownResult, 4) - - once := new(sync.Once) - closeDrc := func() { - once.Do(func() { - close(drc) - }) - } - defer closeDrc() - - go call(ctx, drc, p, req, errChan) - - go send(drc, closeDrc, stream, errChan) - - if err = <-errChan; dferrors.IsEndOfStream(err) { - err = nil - } - - return + return p.server.Download(req, stream) } func (p *proxy) GetPieceTasks(ctx context.Context, ptr *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { @@ -125,38 +99,3 @@ func (p *proxy) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskReques func (p *proxy) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskRequest) (*emptypb.Empty, error) { return new(emptypb.Empty), p.server.DeleteTask(ctx, req) } - -func send(drc chan *dfdaemonv1.DownResult, closeDrc func(), stream dfdaemonv1.Daemon_DownloadServer, errChan chan error) { - err := safe.Call(func() { - defer closeDrc() - - for v := range drc { - if err := stream.Send(v); err != nil { - errChan <- err - return - } - - if v.Done { - break - } - } - - errChan <- dferrors.ErrEndOfStream - }) - - if err != nil { - errChan <- err - } -} - -func call(ctx context.Context, drc chan *dfdaemonv1.DownResult, p *proxy, req *dfdaemonv1.DownRequest, errChan chan error) { - err := safe.Call(func() { - if err := p.server.Download(ctx, req, drc); err != nil { - errChan <- err - } - }) - - if err != nil { - errChan <- err - } -}