feat: support grpc recursive download (#1518)

* feat: daemon grpc recursive download
* fix: init storage error

Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
Jim Ma 2022-08-04 14:09:33 +08:00 committed by Gaius
parent 585884f01d
commit f6506115a7
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
8 changed files with 227 additions and 81 deletions

View File

@ -253,7 +253,7 @@ func (cfg *ClientOption) checkOutput() error {
return nil
}
// MkdirAll make directories recursive, and changes uid, gid to latest directory.
// MkdirAll make directories recursive, and changes uid, gid to the latest directory.
// For example: the path /data/x exists, uid=1, gid=1
// when call MkdirAll("/data/x/y/z", 0755, 2, 2)
// MkdirAll creates /data/x/y and change owner to 2:2, creates /data/x/y/z and change owner to 2:2

View File

@ -33,6 +33,9 @@ const (
// Failed download task type is source
FailTypeBackSource = "source"
// Failed download task type is init, indecates not yet register to scheduler
FailTypeInit = "init"
// SeedPeerDownload type is p2p
SeedPeerDownloadTypeP2P = "p2p"

View File

@ -458,6 +458,28 @@ func (pt *peerTaskConductor) cancel(code commonv1.Code, reason string) {
})
}
func (pt *peerTaskConductor) cancelNotRegisterred(code commonv1.Code, reason string) {
pt.statusOnce.Do(func() {
pt.failedCode = code
pt.failedReason = reason
metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeInit).Add(1)
pt.peerTaskManager.PeerTaskDone(pt.taskID)
pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason))
close(pt.failCh)
pt.broker.Stop()
pt.span.End()
pt.pieceDownloadCancel()
if pt.pieceTaskSyncManager != nil {
pt.pieceTaskSyncManager.cancel()
}
})
}
// only use when receive back source code from scheduler
func (pt *peerTaskConductor) markBackSource() {
pt.needBackSource.Store(true)

View File

@ -239,7 +239,7 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
err := ptc.initStorage(desiredLocation)
if err != nil {
ptc.Errorf("init storage error: %s", err)
ptc.cancel(commonv1.Code_ClientError, err.Error())
ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error())
return nil, false, err
}
return ptc, true, nil

View File

@ -9,6 +9,7 @@ import (
reflect "reflect"
time "time"
v1 "d7y.io/api/pkg/apis/dfdaemon/v1"
gomock "github.com/golang/mock/gomock"
)
@ -100,3 +101,40 @@ func (mr *MockServerMockRecorder) Stop() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockServer)(nil).Stop))
}
// MockResultSender is a mock of ResultSender interface.
type MockResultSender struct {
ctrl *gomock.Controller
recorder *MockResultSenderMockRecorder
}
// MockResultSenderMockRecorder is the mock recorder for MockResultSender.
type MockResultSenderMockRecorder struct {
mock *MockResultSender
}
// NewMockResultSender creates a new mock instance.
func NewMockResultSender(ctrl *gomock.Controller) *MockResultSender {
mock := &MockResultSender{ctrl: ctrl}
mock.recorder = &MockResultSenderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockResultSender) EXPECT() *MockResultSenderMockRecorder {
return m.recorder
}
// Send mocks base method.
func (m *MockResultSender) Send(arg0 *v1.DownResult) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Send", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Send indicates an expected call of Send.
func (mr *MockResultSenderMockRecorder) Send(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockResultSender)(nil).Send), arg0)
}

View File

@ -25,9 +25,16 @@ import (
"io"
"math"
"net"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/gammazero/deque"
"github.com/google/uuid"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -46,10 +53,12 @@ import (
"d7y.io/dragonfly/v2/client/util"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/basic"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/net/http"
dfdaemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
"d7y.io/dragonfly/v2/pkg/safe"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/scheduler/resource"
)
@ -314,14 +323,104 @@ func (s *server) CheckHealth(context.Context) error {
return nil
}
func (s *server) Download(ctx context.Context,
req *dfdaemonv1.DownRequest, results chan<- *dfdaemonv1.DownResult) error {
func (s *server) Download(req *dfdaemonv1.DownRequest, stream dfdaemonv1.Daemon_DownloadServer) error {
s.Keep()
return s.doDownload(ctx, req, results, "")
ctx := stream.Context()
if req.Recursive {
return s.doRecursiveDownload(ctx, req, stream)
}
return s.doDownload(ctx, req, stream, "")
}
func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest,
results chan<- *dfdaemonv1.DownResult, peerID string) error {
func (s *server) doRecursiveDownload(ctx context.Context, req *dfdaemonv1.DownRequest, stream dfdaemonv1.Daemon_DownloadServer) error {
if stat, err := os.Stat(req.Output); err != nil {
return err
} else if !stat.IsDir() {
return fmt.Errorf("target output %s must be directory", req.Output)
}
var queue deque.Deque[*dfdaemonv1.DownRequest]
queue.PushBack(req)
downloadMap := map[url.URL]struct{}{}
for {
if queue.Len() == 0 {
break
}
parentReq := queue.PopFront()
request, err := source.NewRequestWithContext(ctx, parentReq.Url, parentReq.UrlMeta.Header)
if err != nil {
return err
}
// prevent loop downloading
if _, exist := downloadMap[*request.URL]; exist {
continue
}
downloadMap[*request.URL] = struct{}{}
urlEntries, err := source.List(request)
if err != nil {
logger.Errorf("url [%v] source lister error: %v", request.URL, err)
return err
}
for _, urlEntry := range urlEntries {
childReq := copyDownRequest(parentReq) //create new req
childReq.Output = path.Join(parentReq.Output, urlEntry.Name)
logger.Infof("target output: %s", strings.TrimPrefix(childReq.Output, req.Output))
u := urlEntry.URL
childReq.Url = u.String()
logger.Infof("download %s to %s", childReq.Url, childReq.Output)
if urlEntry.IsDir {
queue.PushBack(childReq)
continue
}
// validate new request
if err = childReq.Validate(); err != nil {
logger.Errorf("validate %#v failed: %s", childReq, err)
return err
}
if err = checkOutput(childReq.Output); err != nil {
logger.Errorf("check output %#v failed: %s", childReq, err)
return err
}
if err = s.doDownload(ctx, childReq, stream, ""); err != nil {
return err
}
}
}
return nil
}
func copyDownRequest(req *dfdaemonv1.DownRequest) *dfdaemonv1.DownRequest {
return &dfdaemonv1.DownRequest{
Uuid: uuid.New().String(),
Url: req.Url,
Output: req.Output,
Timeout: req.Timeout,
Limit: req.Limit,
DisableBackSource: req.DisableBackSource,
UrlMeta: req.UrlMeta,
Pattern: req.Pattern,
Callsystem: req.Callsystem,
Uid: req.Uid,
Gid: req.Gid,
KeepOriginalOffset: req.KeepOriginalOffset,
Recursive: false, // not used anymore
}
}
type ResultSender interface {
Send(*dfdaemonv1.DownResult) error
}
func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest, stream ResultSender, peerID string) error {
if req.UrlMeta == nil {
req.UrlMeta = &commonv1.UrlMeta{}
}
@ -363,11 +462,16 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest,
return dferrors.New(commonv1.Code_UnknownError, fmt.Sprintf("%s", err))
}
if tiny != nil {
results <- &dfdaemonv1.DownResult{
err = stream.Send(&dfdaemonv1.DownResult{
TaskId: tiny.TaskID,
PeerId: tiny.PeerID,
CompletedLength: uint64(len(tiny.Content)),
Done: true,
Output: req.Output,
})
if err != nil {
log.Infof("send download result error: %s", err.Error())
return err
}
log.Infof("tiny file, wrote to output")
if req.Uid != 0 && req.Gid != 0 {
@ -391,11 +495,16 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest,
log.Errorf("task %s/%s failed: %d/%s", p.PeerID, p.TaskID, p.State.Code, p.State.Msg)
return dferrors.New(p.State.Code, p.State.Msg)
}
results <- &dfdaemonv1.DownResult{
err = stream.Send(&dfdaemonv1.DownResult{
TaskId: p.TaskID,
PeerId: p.PeerID,
CompletedLength: uint64(p.CompletedLength),
Done: p.PeerTaskDone,
Output: req.Output,
})
if err != nil {
log.Infof("send download result error: %s", err.Error())
return err
}
// peer task sets PeerTaskDone to true only once
if p.PeerTaskDone {
@ -411,9 +520,14 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest,
return nil
}
case <-ctx.Done():
results <- &dfdaemonv1.DownResult{
err = stream.Send(&dfdaemonv1.DownResult{
CompletedLength: 0,
Done: true,
Output: req.Output,
})
if err != nil {
log.Infof("send download result error: %s", err.Error())
return err
}
log.Infof("context done due to %s", ctx.Err())
return status.Error(codes.Canceled, ctx.Err().Error())
@ -596,7 +710,7 @@ func (s *server) exportFromPeers(ctx context.Context, log *logger.SugaredLoggerO
Gid: req.Gid,
}
go call(ctx, peerID, drc, s, downRequest, errChan)
go call(ctx, peerID, &simpleResultSender{drc}, s, downRequest, errChan)
go func() {
for result = range drc {
if result.Done {
@ -619,9 +733,19 @@ func (s *server) exportFromPeers(ctx context.Context, log *logger.SugaredLoggerO
return nil
}
func call(ctx context.Context, peerID string, drc chan *dfdaemonv1.DownResult, s *server, req *dfdaemonv1.DownRequest, errChan chan error) {
// TODO remove this wrapper in exportFromPeers
type simpleResultSender struct {
drc chan *dfdaemonv1.DownResult
}
func (s *simpleResultSender) Send(result *dfdaemonv1.DownResult) error {
s.drc <- result
return nil
}
func call(ctx context.Context, peerID string, sender ResultSender, s *server, req *dfdaemonv1.DownRequest, errChan chan error) {
err := safe.Call(func() {
if err := s.doDownload(ctx, req, drc, peerID); err != nil {
if err := s.doDownload(ctx, req, sender, peerID); err != nil {
errChan <- err
}
})
@ -655,3 +779,23 @@ func (s *server) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskReque
}
return nil
}
func checkOutput(output string) error {
if !filepath.IsAbs(output) {
return fmt.Errorf("path[%s] is not absolute path", output)
}
outputDir, _ := path.Split(output)
if err := config.MkdirAll(outputDir, 0777, basic.UserID, basic.UserGroup); err != nil {
return err
}
// check permission
for dir := output; dir != ""; dir = filepath.Dir(dir) {
if err := syscall.Access(dir, syscall.O_RDWR); err == nil {
break
} else if os.IsPermission(err) || dir == "/" {
return fmt.Errorf("user[%s] path[%s] %v", basic.Username, output, err)
}
}
return nil
}

View File

@ -65,17 +65,17 @@ func (mr *MockDaemonServerMockRecorder) DeleteTask(arg0, arg1 interface{}) *gomo
}
// Download mocks base method.
func (m *MockDaemonServer) Download(arg0 context.Context, arg1 *v10.DownRequest, arg2 chan<- *v10.DownResult) error {
func (m *MockDaemonServer) Download(arg0 *v10.DownRequest, arg1 v10.Daemon_DownloadServer) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Download", arg0, arg1, arg2)
ret := m.ctrl.Call(m, "Download", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// Download indicates an expected call of Download.
func (mr *MockDaemonServerMockRecorder) Download(arg0, arg1, arg2 interface{}) *gomock.Call {
func (mr *MockDaemonServerMockRecorder) Download(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockDaemonServer)(nil).Download), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockDaemonServer)(nil).Download), arg0, arg1)
}
// ExportTask mocks base method.

View File

@ -20,7 +20,6 @@ package server
import (
"context"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
@ -29,16 +28,14 @@ import (
commonv1 "d7y.io/api/pkg/apis/common/v1"
dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
"d7y.io/dragonfly/v2/pkg/safe"
)
// DaemonServer refer to dfdaemonv1.DaemonServer
type DaemonServer interface {
// Download triggers client to download file
Download(context.Context, *dfdaemonv1.DownRequest, chan<- *dfdaemonv1.DownResult) error
Download(*dfdaemonv1.DownRequest, dfdaemonv1.Daemon_DownloadServer) error
// GetPieceTasks get piece tasks from other peers
GetPieceTasks(context.Context, *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error)
// SyncPieceTasks sync piece tasks info with other peers
@ -67,35 +64,12 @@ func New(daemonServer DaemonServer, opts ...grpc.ServerOption) *grpc.Server {
}
func (p *proxy) Download(req *dfdaemonv1.DownRequest, stream dfdaemonv1.Daemon_DownloadServer) (err error) {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
peerAddr := "unknown"
if pe, ok := peer.FromContext(ctx); ok {
if pe, ok := peer.FromContext(stream.Context()); ok {
peerAddr = pe.Addr.String()
}
logger.Infof("trigger download for url: %s, from: %s, uuid: %s", req.Url, peerAddr, req.Uuid)
errChan := make(chan error, 10)
drc := make(chan *dfdaemonv1.DownResult, 4)
once := new(sync.Once)
closeDrc := func() {
once.Do(func() {
close(drc)
})
}
defer closeDrc()
go call(ctx, drc, p, req, errChan)
go send(drc, closeDrc, stream, errChan)
if err = <-errChan; dferrors.IsEndOfStream(err) {
err = nil
}
return
return p.server.Download(req, stream)
}
func (p *proxy) GetPieceTasks(ctx context.Context, ptr *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
@ -125,38 +99,3 @@ func (p *proxy) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskReques
func (p *proxy) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskRequest) (*emptypb.Empty, error) {
return new(emptypb.Empty), p.server.DeleteTask(ctx, req)
}
func send(drc chan *dfdaemonv1.DownResult, closeDrc func(), stream dfdaemonv1.Daemon_DownloadServer, errChan chan error) {
err := safe.Call(func() {
defer closeDrc()
for v := range drc {
if err := stream.Send(v); err != nil {
errChan <- err
return
}
if v.Done {
break
}
}
errChan <- dferrors.ErrEndOfStream
})
if err != nil {
errChan <- err
}
}
func call(ctx context.Context, drc chan *dfdaemonv1.DownResult, p *proxy, req *dfdaemonv1.DownRequest, errChan chan error) {
err := safe.Call(func() {
if err := p.server.Download(ctx, req, drc); err != nil {
errChan <- err
}
})
if err != nil {
errChan <- err
}
}