diff --git a/client/clientutil/range.go b/client/clientutil/range.go
index 85e526551..8d1f32de3 100644
--- a/client/clientutil/range.go
+++ b/client/clientutil/range.go
@@ -29,6 +29,10 @@ type Range struct {
 	Start, Length int64
 }
 
+func (r Range) String() string {
+	return fmt.Sprintf("bytes=%d-%d", r.Start, r.Start+r.Length-1)
+}
+
 // ErrNoOverlap is returned by ParseRange if first-byte-pos of
 // all of the byte-range-spec values is greater than the content size.
 var ErrNoOverlap = errors.New("invalid range: failed to overlap")
diff --git a/client/config/constants_otel.go b/client/config/constants_otel.go
index 1b50ba5b0..97758e76b 100644
--- a/client/config/constants_otel.go
+++ b/client/config/constants_otel.go
@@ -25,6 +25,7 @@ const (
 	AttributePeerID         = attribute.Key("d7y.peer.id")
 	AttributeTargetPeerID   = attribute.Key("d7y.peer.target.id")
 	AttributeReusePeerID    = attribute.Key("d7y.peer.reuse.id")
+	AttributeReuseRange     = attribute.Key("d7y.peer.reuse.range")
 	AttributeTargetPeerAddr = attribute.Key("d7y.peer.target.addr")
 	AttributeMainPeer       = attribute.Key("d7y.peer.task.main_peer")
 	AttributePeerPacketCode = attribute.Key("d7y.peer.packet.code")
diff --git a/client/config/headers.go b/client/config/headers.go
index b4dcb083f..2f9873500 100644
--- a/client/config/headers.go
+++ b/client/config/headers.go
@@ -20,6 +20,7 @@ const (
 	HeaderDragonflyFilter = "X-Dragonfly-Filter"
 	HeaderDragonflyPeer   = "X-Dragonfly-Peer"
 	HeaderDragonflyTask   = "X-Dragonfly-Task"
+	HeaderDragonflyRange  = "X-Dragonfly-Range"
 	HeaderDragonflyBiz    = "X-Dragonfly-Biz"
 	// HeaderDragonflyRegistry is used for dynamic registry mirrors
 	HeaderDragonflyRegistry = "X-Dragonfly-Registry"
diff --git a/client/config/peerhost.go b/client/config/peerhost.go
index aa6b5c587..4076b7949 100644
--- a/client/config/peerhost.go
+++ b/client/config/peerhost.go
@@ -175,6 +175,7 @@ type DownloadOption struct {
 	CalculateDigest   bool             `mapstructure:"calculateDigest" yaml:"calculateDigest"`
 	TransportOption   *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
 	GetPiecesMaxRetry int              `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
+	Prefetch          bool             `mapstructure:"prefetch" yaml:"prefetch"`
 }
 
 type TransportOption struct {
diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go
index 0bcaaf764..df2e7603f 100644
--- a/client/daemon/daemon.go
+++ b/client/daemon/daemon.go
@@ -169,7 +169,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
 		return nil, err
 	}
 	peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler,
-		opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
+		opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.Prefetch, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
 	if err != nil {
 		return nil, err
 	}
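A note on the new `String()` helper: HTTP byte ranges are inclusive on both ends, so a range of `Length` bytes starting at `Start` ends at `Start+Length-1`. A minimal round-trip sketch (hypothetical usage, assuming `clientutil.ParseRange` follows RFC 7233 semantics like `net/http`'s parser):

```go
package main

import (
	"fmt"
	"math"

	"d7y.io/dragonfly/v2/client/clientutil"
)

func main() {
	r := clientutil.Range{Start: 0, Length: 4096}
	// inclusive end offset, hence Length-1
	fmt.Println(r.String()) // bytes=0-4095

	// parsing the formatted header yields the original range back
	rgs, err := clientutil.ParseRange(r.String(), math.MaxInt)
	if err != nil || len(rgs) != 1 {
		panic("unexpected parse result")
	}
	fmt.Println(rgs[0].Start, rgs[0].Length) // 0 4096
}
```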
diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go
index cf2800593..df642871f 100644
--- a/client/daemon/peer/peertask_conductor.go
+++ b/client/daemon/peer/peertask_conductor.go
@@ -414,7 +414,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
 	pt.SetTotalPieces(1)
 	ctx := pt.ctx
 	var err error
-	pt.storage, err = pt.peerTaskManager.storageManager.RegisterTask(ctx,
+	storageDriver, err := pt.peerTaskManager.storageManager.RegisterTask(ctx,
 		storage.RegisterTaskRequest{
 			CommonTaskRequest: storage.CommonTaskRequest{
 				PeerID: pt.tinyData.PeerID,
@@ -424,12 +424,13 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
 			TotalPieces: 1,
 			// TODO check digest
 		})
+	pt.storage = storageDriver
 	if err != nil {
 		logger.Errorf("register tiny data storage failed: %s", err)
 		pt.cancel(base.Code_ClientError, err.Error())
 		return
 	}
-	n, err := pt.storage.WritePiece(ctx,
+	n, err := pt.GetStorage().WritePiece(ctx,
 		&storage.WritePieceRequest{
 			PeerTaskMetadata: storage.PeerTaskMetadata{
 				PeerID: pt.tinyData.PeerID,
@@ -653,7 +654,7 @@ func (pt *peerTaskConductor) pullSinglePiece() {
 	}
 
 	request := &DownloadPieceRequest{
-		storage: pt.storage,
+		storage: pt.GetStorage(),
 		piece:   pt.singlePiece.PieceInfo,
 		log:     pt.Log(),
 		TaskID:  pt.GetTaskID(),
@@ -892,7 +893,7 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP
 		pt.requestedPieces.Set(piece.PieceNum)
 	}
 	req := &DownloadPieceRequest{
-		storage: pt.storage,
+		storage: pt.GetStorage(),
 		piece:   piece,
 		log:     pt.Log(),
 		TaskID:  pt.GetTaskID(),
@@ -1106,6 +1107,12 @@ func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, res
 }
 
 func (pt *peerTaskConductor) InitStorage() (err error) {
+	pt.lock.Lock()
+	defer pt.lock.Unlock()
+	// check storage for partial back source cases.
+	if pt.storage != nil {
+		return nil
+	}
 	// prepare storage
 	pt.storage, err = pt.storageManager.RegisterTask(pt.ctx,
 		storage.RegisterTaskRequest{
@@ -1125,7 +1132,7 @@ func (pt *peerTaskConductor) InitStorage() (err error) {
 
 func (pt *peerTaskConductor) UpdateStorage() error {
 	// update storage
-	err := pt.storage.UpdateTask(pt.ctx,
+	err := pt.GetStorage().UpdateTask(pt.ctx,
 		&storage.UpdateTaskRequest{
 			PeerTaskMetadata: storage.PeerTaskMetadata{
 				PeerID: pt.GetPeerID(),
@@ -1278,7 +1285,7 @@ func (pt *peerTaskConductor) fail() {
 
 // Validate stores metadata and validates digest
 func (pt *peerTaskConductor) Validate() error {
-	err := pt.storage.Store(pt.ctx,
+	err := pt.GetStorage().Store(pt.ctx,
 		&storage.StoreRequest{
 			CommonTaskRequest: storage.CommonTaskRequest{
 				PeerID: pt.peerID,
@@ -1295,7 +1302,7 @@ func (pt *peerTaskConductor) Validate() error {
 	if !pt.peerTaskManager.calculateDigest {
 		return nil
 	}
-	err = pt.storage.ValidateDigest(
+	err = pt.GetStorage().ValidateDigest(
 		&storage.PeerTaskMetadata{
 			PeerID: pt.GetPeerID(),
 			TaskID: pt.GetTaskID(),
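Why every direct `pt.storage` read above now goes through `GetStorage()`: with prefetch and partial back-source in play, `InitStorage` can race with readers, so the field needs a guarded accessor to pair with the guarded initialization. A minimal sketch of the pattern this implies — the real accessor lives elsewhere in `peertask_conductor.go`, and `pt.lock` guarding `pt.storage` is an assumption here:

```go
// Sketch only: lock-guarded accessor paired with the guarded InitStorage.
// Readers never observe a half-initialized driver, and a second
// InitStorage call (e.g. from a partial back-source path) is a no-op.
func (pt *peerTaskConductor) GetStorage() storage.TaskStorageDriver {
	pt.lock.Lock()
	defer pt.lock.Unlock()
	return pt.storage
}
```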
diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go
index 103c1da77..313702db7 100644
--- a/client/daemon/peer/peertask_file.go
+++ b/client/daemon/peer/peertask_file.go
@@ -88,6 +88,10 @@ func (ptm *peerTaskManager) newFileTask(
 	if err != nil {
 		return nil, nil, err
 	}
+	// prefetch parent request
+	if ptm.enablePrefetch && request.UrlMeta.Range != "" {
+		go ptm.prefetch(&request.PeerTaskRequest)
+	}
 
 	ctx, span := tracer.Start(ctx, config.SpanFileTask, trace.WithSpanKind(trace.SpanKindClient))
 	pt := &fileTask{
diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go
index bbccd6b19..767545c24 100644
--- a/client/daemon/peer/peertask_manager.go
+++ b/client/daemon/peer/peertask_manager.go
@@ -21,6 +21,7 @@ import (
 	"io"
 	"sync"
 
+	"github.com/go-http-utils/headers"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/time/rate"
@@ -29,6 +30,8 @@ import (
 	"d7y.io/dragonfly/v2/client/daemon/metrics"
 	"d7y.io/dragonfly/v2/client/daemon/storage"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
+	"d7y.io/dragonfly/v2/pkg/idgen"
+	"d7y.io/dragonfly/v2/pkg/rpc/base"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
 	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
 )
@@ -108,10 +111,10 @@ type peerTaskManager struct {
 	perPeerRateLimit rate.Limit
 
-	// enableMultiplex indicates reusing completed peer task storage
-	// currently, only check completed peer task after register to scheduler
-	// TODO multiplex the running peer task
+	// enableMultiplex indicates whether to reuse the data of completed peer tasks
 	enableMultiplex bool
+	// enablePrefetch indicates whether to prefetch the whole file for ranged requests
+	enablePrefetch bool
 
 	calculateDigest bool
 
@@ -126,6 +129,7 @@ func NewPeerTaskManager(
 	schedulerOption config.SchedulerOption,
 	perPeerRateLimit rate.Limit,
 	multiplex bool,
+	prefetch bool,
 	calculateDigest bool,
 	getPiecesMaxRetry int) (TaskManager, error) {
 
@@ -139,6 +143,7 @@ func NewPeerTaskManager(
 		schedulerOption:   schedulerOption,
 		perPeerRateLimit:  perPeerRateLimit,
 		enableMultiplex:   multiplex,
+		enablePrefetch:    prefetch,
 		calculateDigest:   calculateDigest,
 		getPiecesMaxRetry: getPiecesMaxRetry,
 	}
@@ -198,6 +203,41 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
 	return ptc, true, nil
 }
 
+func (ptm *peerTaskManager) prefetch(request *scheduler.PeerTaskRequest) {
+	req := &scheduler.PeerTaskRequest{
+		Url:         request.Url,
+		PeerId:      request.PeerId,
+		PeerHost:    ptm.host,
+		HostLoad:    request.HostLoad,
+		IsMigrating: request.IsMigrating,
+		UrlMeta: &base.UrlMeta{
+			Digest: request.UrlMeta.Digest,
+			Tag:    request.UrlMeta.Tag,
+			Filter: request.UrlMeta.Filter,
+			Header: map[string]string{},
+		},
+	}
+	for k, v := range request.UrlMeta.Header {
+		if k == headers.Range {
+			continue
+		}
+		req.UrlMeta.Header[k] = v
+	}
+	taskID := idgen.TaskID(req.Url, req.UrlMeta)
+	req.PeerId = idgen.PeerID(req.PeerHost.Ip)
+
+	var limit = rate.Inf
+	if ptm.perPeerRateLimit > 0 {
+		limit = ptm.perPeerRateLimit
+	}
+
+	logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId)
+	if _, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit); err != nil {
+		logger.Errorf("prefetch peer task %s/%s error: %s", taskID, req.PeerId, err)
+	}
+}
+
 func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) {
 	if ptm.enableMultiplex {
 		progress, ok := ptm.tryReuseFilePeerTask(ctx, req)
@@ -235,7 +275,7 @@ func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask
 	}
 
 	if ptm.enableMultiplex {
-		r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, peerTaskRequest)
+		r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, req)
 		if ok {
 			metrics.PeerTaskCacheHitCount.Add(1)
 			return r, attr, nil
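The prefetch request above copies everything except the range: the `Range` field of `UrlMeta` is left unset and the `Range` header is skipped, so the prefetch resolves to the whole-file task. That is exactly what makes later reuse possible, because the task ID of a meta without a range equals the `ParentTaskID` of the ranged meta. A quick property check (hypothetical values):

```go
package main

import (
	"fmt"

	"d7y.io/dragonfly/v2/pkg/idgen"
	"d7y.io/dragonfly/v2/pkg/rpc/base"
)

func main() {
	url := "http://localhost/test/data"
	ranged := &base.UrlMeta{Range: "0-1023", Tag: "d7y-test"}
	whole := &base.UrlMeta{Tag: "d7y-test"} // what prefetch effectively requests

	// the prefetch task ID matches the parent ID of the ranged request,
	// so tryReuse*PeerTask can find the prefetched data via ParentTaskID
	fmt.Println(idgen.TaskID(url, whole) == idgen.ParentTaskID(url, ranged)) // true
	// while the ranged request itself still gets its own task ID
	fmt.Println(idgen.TaskID(url, ranged) == idgen.ParentTaskID(url, ranged)) // false
}
```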
diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go
index 3bff820e8..9a77be2db 100644
--- a/client/daemon/peer/peertask_manager_test.go
+++ b/client/daemon/peer/peertask_manager_test.go
@@ -26,10 +26,12 @@ import (
 	"net/http/httptest"
 	"os"
 	"runtime"
+	"strings"
 	"sync"
 	"testing"
 	"time"
 
+	"github.com/go-http-utils/headers"
 	"github.com/golang/mock/gomock"
 	"github.com/phayes/freeport"
 	testifyassert "github.com/stretchr/testify/assert"
@@ -281,6 +283,7 @@ type testSpec struct {
 	taskType           int
 	name               string
 	taskData           []byte
+	httpRange          *clientutil.Range // only used in back source cases
 	pieceParallelCount int32
 	pieceSize          int
 	sizeScope          base.SizeScope
@@ -296,7 +299,7 @@ type testSpec struct {
 	backSource bool
 
 	mockPieceDownloader  func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader
-	mockHTTPSourceClient func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient
+	mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient
 
 	cleanUp []func()
}
@@ -367,7 +370,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			url:                  "http://localhost/test/data",
 			sizeScope:            base.SizeScope_NORMAL,
 			mockPieceDownloader:  nil,
-			mockHTTPSourceClient: func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient {
+			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
 				sourceClient := sourceMock.NewMockResourceClient(ctrl)
 				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
 					func(request *source.Request) (int64, error) {
@@ -380,6 +383,47 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 				return sourceClient
 			},
 		},
+		{
+			name:     "normal size scope - range - back source - content length",
+			taskData: testBytes[0:4096],
+			httpRange: &clientutil.Range{
+				Start:  0,
+				Length: 4096,
+			},
+			pieceParallelCount:  4,
+			pieceSize:           1024,
+			peerID:              "normal-size-peer-range-back-source",
+			backSource:          true,
+			url:                 "http://localhost/test/data",
+			sizeScope:           base.SizeScope_NORMAL,
+			mockPieceDownloader: nil,
+			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
+				sourceClient := sourceMock.NewMockResourceClient(ctrl)
+				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
+					func(request *source.Request) (int64, error) {
+						assert := testifyassert.New(t)
+						if rg != nil {
+							rgs, err := clientutil.ParseRange(request.Header.Get(headers.Range), math.MaxInt)
+							assert.Nil(err)
+							assert.Equal(1, len(rgs))
+							assert.Equal(rg.String(), rgs[0].String())
+						}
+						return int64(len(taskData)), nil
+					})
+				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
+					func(request *source.Request) (*source.Response, error) {
+						assert := testifyassert.New(t)
+						if rg != nil {
+							rgs, err := clientutil.ParseRange(request.Header.Get(headers.Range), math.MaxInt)
+							assert.Nil(err)
+							assert.Equal(1, len(rgs))
+							assert.Equal(rg.String(), rgs[0].String())
+						}
+						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
+					})
+				return sourceClient
+			},
+		},
 		{
 			name:     "normal size scope - back source - no content length",
 			taskData: testBytes,
@@ -390,7 +434,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_NORMAL,
 			mockPieceDownloader: nil,
-			mockHTTPSourceClient: func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient {
+			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
 				sourceClient := sourceMock.NewMockResourceClient(ctrl)
 				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
 					func(request *source.Request) (int64, error) {
@@ -413,7 +457,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_NORMAL,
 			mockPieceDownloader: nil,
-			mockHTTPSourceClient: func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient {
+			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
 				sourceClient := sourceMock.NewMockResourceClient(ctrl)
 				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
 					func(request *source.Request) (int64, error) {
@@ -469,11 +513,25 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			urlMeta := &base.UrlMeta{
 				Tag: "d7y-test",
 			}
+
+			if tc.httpRange != nil {
+				urlMeta.Range = strings.TrimPrefix(tc.httpRange.String(), "bytes=")
+			}
+
 			if tc.urlGenerator != nil {
 				tc.url = tc.urlGenerator(&tc)
 			}
 			taskID := idgen.TaskID(tc.url, urlMeta)
 
+			var (
+				downloader   PieceDownloader
+				sourceClient source.ResourceClient
+			)
+
+			if tc.mockPieceDownloader != nil {
+				downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize)
+			}
+
 			if tc.mockHTTPSourceClient != nil {
 				source.UnRegister("http")
 				defer func() {
@@ -482,18 +540,8 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 					require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter))
 				}()
 				// replace source client
-				require.Nil(source.Register("http", tc.mockHTTPSourceClient(ctrl, tc.taskData, tc.url), httpprotocol.Adapter))
-			}
-
-			var (
-				downloader   PieceDownloader
-				sourceClient source.ResourceClient
-			)
-			if tc.mockPieceDownloader != nil {
-				downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize)
-			}
-			if tc.mockHTTPSourceClient != nil {
-				sourceClient = tc.mockHTTPSourceClient(ctrl, tc.taskData, tc.url)
+				sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url)
+				require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter))
 			}
 
 			option := componentsOption{
@@ -596,14 +644,14 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 		assert.Nil(os.Remove(output))
 	}()
 
-	request := &scheduler.PeerTaskRequest{
+	peerTaskRequest := &scheduler.PeerTaskRequest{
 		Url:      ts.url,
 		UrlMeta:  urlMeta,
 		PeerId:   ts.peerID,
 		PeerHost: &scheduler.PeerHost{},
 	}
 
-	ptc, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*4))
+	ptc, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, peerTaskRequest, rate.Limit(pieceSize*4))
 	assert.Nil(err, "load first peerTaskConductor")
 	assert.True(created, "should create a new peerTaskConductor")
 
@@ -698,7 +746,12 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 	assert.True(noRunningTask, "no running tasks")
 	// test reuse stream task
-	rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(), request)
+	rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(),
+		&StreamTaskRequest{
+			URL:     ts.url,
+			URLMeta: urlMeta,
+			PeerID:  ts.peerID,
+		})
 	assert.True(ok, "reuse stream task")
 	defer func() {
 		assert.Nil(rc.Close())
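The two `DoAndReturn` closures in the new range test case repeat the same four assertions. A hypothetical helper — not part of this patch, shown only to illustrate how the duplication could be collapsed inside the test file, whose package already imports everything used here:

```go
// Hypothetical helper: verify that a back-source request carries exactly
// the expected Range header.
func assertRangeHeader(t *testing.T, request *source.Request, rg *clientutil.Range) {
	if rg == nil {
		return
	}
	assert := testifyassert.New(t)
	rgs, err := clientutil.ParseRange(request.Header.Get(headers.Range), math.MaxInt)
	assert.Nil(err)
	assert.Equal(1, len(rgs))
	assert.Equal(rg.String(), rgs[0].String())
}
```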
diff --git a/client/daemon/peer/peertask_reuse.go b/client/daemon/peer/peertask_reuse.go
index 5e8de526a..228c3f59f 100644
--- a/client/daemon/peer/peertask_reuse.go
+++ b/client/daemon/peer/peertask_reuse.go
@@ -20,18 +20,21 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"math"
+	"os"
 	"time"
 
 	"github.com/go-http-utils/headers"
 	"go.opentelemetry.io/otel/semconv"
 	"go.opentelemetry.io/otel/trace"
 
+	"d7y.io/dragonfly/v2/client/clientutil"
 	"d7y.io/dragonfly/v2/client/config"
 	"d7y.io/dragonfly/v2/client/daemon/storage"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/idgen"
 	"d7y.io/dragonfly/v2/pkg/rpc/base"
-	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
+	"d7y.io/dragonfly/v2/pkg/util/rangeutils"
 )
 
 var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
@@ -40,11 +43,41 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
 	request *FileTaskRequest) (chan *FileTaskProgress, bool) {
 	taskID := idgen.TaskID(request.Url, request.UrlMeta)
 	reuse := ptm.storageManager.FindCompletedTask(taskID)
+	var (
+		rg     *clientutil.Range // the range of parent peer task data to read
+		log    *logger.SugaredLoggerOnWith
+		length int64
+		err    error
+	)
 	if reuse == nil {
-		return nil, false
+		taskID = idgen.ParentTaskID(request.Url, request.UrlMeta)
+		reuse = ptm.storageManager.FindCompletedTask(taskID)
+		if reuse == nil {
+			return nil, false
+		}
+		var r *rangeutils.Range
+		r, err = rangeutils.ParseRange(request.UrlMeta.Range, math.MaxInt)
+		if err != nil {
+			logger.Warnf("parse range %s error: %s", request.UrlMeta.Range, err)
+			return nil, false
+		}
+		rg = &clientutil.Range{
+			Start:  int64(r.StartIndex),
+			Length: int64(r.EndIndex - r.StartIndex + 1),
+		}
 	}
 
-	log := logger.With("peer", request.PeerId, "task", taskID, "component", "reuseFilePeerTask")
+	if rg == nil {
+		log = logger.With("peer", request.PeerId, "task", taskID, "component", "reuseFilePeerTask")
+		log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
+		length = reuse.ContentLength
+	} else {
+		log = logger.With("peer", request.PeerId, "task", taskID, "range", request.UrlMeta.Range,
+			"component", "reuseRangeFilePeerTask")
+		log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
+			reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range)
+		length = rg.Length
+	}
 
 	_, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient))
 	span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid))
@@ -53,15 +86,17 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
 	span.SetAttributes(config.AttributePeerID.String(request.PeerId))
 	span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
 	span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
+	if rg != nil {
+		span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range))
+	}
 	defer span.End()
 
-	log.Infof("reuse from peer task: %s, size: %d", reuse.PeerID, reuse.ContentLength)
+	log.Infof("reuse from peer task: %s, total size: %d, target size: %d", reuse.PeerID, reuse.ContentLength, length)
 	span.AddEvent("reuse peer task", trace.WithAttributes(config.AttributePeerID.String(reuse.PeerID)))
 	start := time.Now()
 
-	err := ptm.storageManager.Store(
-		context.Background(),
-		&storage.StoreRequest{
+	if rg == nil {
+		storeRequest := &storage.StoreRequest{
 			CommonTaskRequest: storage.CommonTaskRequest{
 				PeerID: reuse.PeerID,
 				TaskID: taskID,
@@ -70,13 +105,19 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
 			MetadataOnly: false,
 			StoreOnly:    true,
 			TotalPieces:  reuse.TotalPieces,
-		})
+		}
+		err = ptm.storageManager.Store(context.Background(), storeRequest)
+	} else {
+		err = ptm.storePartialFile(ctx, request, log, reuse, rg)
+	}
+
 	if err != nil {
 		log.Errorf("store error when reuse peer task: %s", err)
 		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
 		span.RecordError(err)
 		return nil, false
 	}
+
 	var cost = time.Since(start).Milliseconds()
 	log.Infof("reuse file peer task done, cost: %dms", cost)
 
@@ -88,8 +129,8 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
 		},
 		TaskID:          taskID,
 		PeerID:          request.PeerId,
-		ContentLength:   reuse.ContentLength,
-		CompletedLength: reuse.ContentLength,
+		ContentLength:   length,
+		CompletedLength: length,
 		PeerTaskDone:    true,
 		DoneCallback:    func() {},
 	}
@@ -103,38 +144,93 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
 	return progressCh, true
 }
 
+func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileTaskRequest,
+	log *logger.SugaredLoggerOnWith,
+	reuse *storage.ReusePeerTask, rg *clientutil.Range) error {
+	f, err := os.OpenFile(request.Output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
+	if err != nil {
+		log.Errorf("open dest file error when reuse peer task: %s", err)
+		return err
+	}
+	defer f.Close()
+	rc, err := ptm.storageManager.ReadAllPieces(ctx,
+		&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg})
+	if err != nil {
+		log.Errorf("read pieces error when reuse peer task: %s", err)
+		return err
+	}
+	defer rc.Close()
+	n, err := io.Copy(f, rc)
+	if err != nil {
+		log.Errorf("copy data error when reuse peer task: %s", err)
+		return err
+	}
+	if n != rg.Length {
+		log.Errorf("copy data length not match when reuse peer task, actual: %d, desire: %d", n, rg.Length)
+		return io.ErrShortBuffer
+	}
+	return nil
+}
+
 func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context,
-	request *scheduler.PeerTaskRequest) (io.ReadCloser, map[string]string, bool) {
-	taskID := idgen.TaskID(request.Url, request.UrlMeta)
+	request *StreamTaskRequest) (io.ReadCloser, map[string]string, bool) {
+	taskID := idgen.TaskID(request.URL, request.URLMeta)
 	reuse := ptm.storageManager.FindCompletedTask(taskID)
+	var (
+		rg  *clientutil.Range // the range of parent peer task data to read
+		log *logger.SugaredLoggerOnWith
+	)
 	if reuse == nil {
-		return nil, nil, false
+		// for ranged request, check the parent task
+		if request.Range == nil {
+			return nil, nil, false
+		}
+		taskID = idgen.ParentTaskID(request.URL, request.URLMeta)
+		reuse = ptm.storageManager.FindCompletedTask(taskID)
+		if reuse == nil {
+			return nil, nil, false
+		}
+		rg = request.Range
 	}
 
-	log := logger.With("peer", request.PeerId, "task", taskID, "component", "reuseStreamPeerTask")
-	log.Infof("reuse from peer task: %s, size: %d", reuse.PeerID, reuse.ContentLength)
+	if rg == nil {
+		log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseStreamPeerTask")
+		log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
+	} else {
+		log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseRangeStreamPeerTask")
+		log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
+			reuse.PeerID, reuse.ContentLength, request.URLMeta.Range)
+	}
 
 	ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient))
 	span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid))
 	span.SetAttributes(semconv.NetHostIPKey.String(ptm.host.Ip))
 	span.SetAttributes(config.AttributeTaskID.String(taskID))
-	span.SetAttributes(config.AttributePeerID.String(request.PeerId))
+	span.SetAttributes(config.AttributePeerID.String(request.PeerID))
 	span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
-	span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
+	span.SetAttributes(semconv.HTTPURLKey.String(request.URL))
+	if rg != nil {
+		span.SetAttributes(config.AttributeReuseRange.String(request.URLMeta.Range))
+	}
 	defer span.End()
 
-	rc, err := ptm.storageManager.ReadAllPieces(ctx, &reuse.PeerTaskMetadata)
+	rc, err := ptm.storageManager.ReadAllPieces(ctx,
+		&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg})
 	if err != nil {
-		log.Errorf("read all pieces error when reuse peer task: %s", err)
+		log.Errorf("read pieces error when reuse peer task: %s", err)
 		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
 		span.RecordError(err)
 		return nil, nil, false
 	}
 
 	attr := map[string]string{}
-	attr[headers.ContentLength] = fmt.Sprintf("%d", reuse.ContentLength)
 	attr[config.HeaderDragonflyTask] = taskID
-	attr[config.HeaderDragonflyPeer] = request.PeerId
+	attr[config.HeaderDragonflyPeer] = request.PeerID
+	if rg != nil {
+		attr[config.HeaderDragonflyRange] = request.URLMeta.Range
+		attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/%d", rg.Start, rg.Start+rg.Length-1, reuse.ContentLength)
+		attr[headers.ContentLength] = fmt.Sprintf("%d", rg.Length)
+	} else {
+		attr[headers.ContentLength] = fmt.Sprintf("%d", reuse.ContentLength)
+	}
 
 	// TODO record time when file closed, need add a type to implement Close and WriteTo
 	span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
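A worked example of the partial-reuse response headers built above, assuming a 4096-byte parent task and a request for bytes 100–199: `Content-Range` carries the absolute offsets plus the parent's full size, while `Content-Length` carries only the slice length.

```go
package main

import (
	"fmt"

	"d7y.io/dragonfly/v2/client/clientutil"
)

func main() {
	rg := &clientutil.Range{Start: 100, Length: 100}
	total := int64(4096) // reuse.ContentLength of the parent task

	fmt.Printf("Content-Range: bytes %d-%d/%d\n", rg.Start, rg.Start+rg.Length-1, total)
	// Content-Range: bytes 100-199/4096
	fmt.Printf("Content-Length: %d\n", rg.Length)
	// Content-Length: 100
}
```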
diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go
index 50bf2b1c6..4456549b2 100644
--- a/client/daemon/peer/peertask_stream.go
+++ b/client/daemon/peer/peertask_stream.go
@@ -25,6 +25,7 @@ import (
 	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/time/rate"
 
+	"d7y.io/dragonfly/v2/client/clientutil"
 	"d7y.io/dragonfly/v2/client/config"
 	"d7y.io/dragonfly/v2/client/daemon/metrics"
 	"d7y.io/dragonfly/v2/client/daemon/storage"
@@ -39,6 +40,8 @@ type StreamTaskRequest struct {
 	URL string
 	// url meta info
 	URLMeta *base.UrlMeta
+	// http range
+	Range *clientutil.Range
 	// peer's id and must be global uniqueness
 	PeerID string
 }
@@ -71,6 +74,12 @@ func (ptm *peerTaskManager) newStreamTask(
 	if err != nil {
 		return nil, err
 	}
+
+	// prefetch parent request
+	if ptm.enablePrefetch && request.UrlMeta.Range != "" {
+		go ptm.prefetch(request)
+	}
+
 	ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient))
 	pt := &streamTask{
 		SugaredLoggerOnWith: ptc.SugaredLoggerOnWith,
@@ -101,10 +110,11 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
 	case <-s.peerTaskConductor.successCh:
 		rc, err := s.peerTaskConductor.peerTaskManager.storageManager.ReadAllPieces(
 			ctx,
-			&storage.PeerTaskMetadata{
-				PeerID: s.peerTaskConductor.peerID,
-				TaskID: s.peerTaskConductor.taskID,
-			})
+			&storage.ReadAllPiecesRequest{
+				PeerTaskMetadata: storage.PeerTaskMetadata{
+					PeerID: s.peerTaskConductor.peerID,
+					TaskID: s.peerTaskConductor.taskID,
+				}})
 		return rc, attr, err
 	case first := <-s.pieceCh:
 		firstPiece = first
@@ -124,7 +134,7 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
 }
 
 func (s *streamTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) {
-	pr, pc, err := s.peerTaskConductor.storage.ReadPiece(s.ctx, &storage.ReadPieceRequest{
+	pr, pc, err := s.peerTaskConductor.GetStorage().ReadPiece(s.ctx, &storage.ReadPieceRequest{
 		PeerTaskMetadata: storage.PeerTaskMetadata{
 			PeerID: s.peerTaskConductor.peerID,
 			TaskID: s.peerTaskConductor.taskID,
diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go
index 2224e1803..d8e8ac42d 100644
--- a/client/daemon/peer/piece_manager.go
+++ b/client/daemon/peer/piece_manager.go
@@ -248,7 +248,8 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc
 		request.UrlMeta.Header = map[string]string{}
 	}
 	if request.UrlMeta.Range != "" {
-		request.UrlMeta.Header["Range"] = request.UrlMeta.Range
+		// the http source adapter rewrites the real Range header, so we inject "X-Dragonfly-Range" here
+		request.UrlMeta.Header[source.Range] = request.UrlMeta.Range
 	}
 	log := pt.Log()
 	log.Infof("start to download from source")
@@ -282,6 +283,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc
 		return err
 	}
 	response, err := source.Download(downloadRequest)
+	// TODO update expire info
 	if err != nil {
 		return err
 	}
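Why `DownloadSource` no longer sets a literal `Range` header: the Dragonfly-style range (`100-199`, no `bytes=` prefix) now travels out of band, and the HTTP source adapter derives the real header from it. A sketch of that translation, assuming `source.Range` resolves to the `X-Dragonfly-Range` key added in `client/config/headers.go`:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// back-source request: the original range rides in X-Dragonfly-Range
	h := http.Header{}
	h.Set("X-Dragonfly-Range", "100-199") // dragonfly style, no "bytes=" prefix

	// adapter sketch: promote it to a real HTTP Range header for the origin
	if r := h.Get("X-Dragonfly-Range"); r != "" {
		h.Set("Range", "bytes="+r)
		h.Del("X-Dragonfly-Range")
	}
	fmt.Println(h.Get("Range")) // bytes=100-199
}
```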
diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go
index af8b220a8..9855067f7 100644
--- a/client/daemon/storage/local_storage.go
+++ b/client/daemon/storage/local_storage.go
@@ -248,24 +248,33 @@ func (t *localTaskStore) ReadPiece(ctx context.Context, req *ReadPieceRequest) (
 	return io.LimitReader(file, req.Range.Length), file, nil
 }
 
-func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error) {
+func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error) {
 	if t.invalid.Load() {
 		t.Errorf("invalid digest, refuse to read all pieces")
 		return nil, ErrInvalidDigest
 	}
 	t.touch()
+
+	// whoever calls ReadAllPieces is responsible for closing the returned io.ReadCloser
 	file, err := os.Open(t.DataFilePath)
 	if err != nil {
 		return nil, err
 	}
-	if _, err = file.Seek(0, io.SeekStart); err != nil {
+	if req.Range == nil {
+		return file, nil
+	}
+
+	if _, err = file.Seek(req.Range.Start, io.SeekStart); err != nil {
 		file.Close()
-		t.Errorf("file seek failed: %v", err)
+		t.Errorf("file seek to %d failed: %v", req.Range.Start, err)
 		return nil, err
 	}
-	// who call ReadPiece, who close the io.ReadCloser
-	return file, nil
+
+	return &limitedReadFile{
+		reader: io.LimitReader(file, req.Range.Length),
+		closer: file,
+	}, nil
 }
 
 func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error {
@@ -479,3 +488,24 @@ func (t *localTaskStore) saveMetadata() error {
 	}
 	return err
 }
+
+// limitedReadFile wraps a length-limited file reader and implements io.WriterTo for zero-copy transfer
+type limitedReadFile struct {
+	reader io.Reader
+	closer io.Closer
+}
+
+func (l *limitedReadFile) Read(p []byte) (n int, err error) {
+	return l.reader.Read(p)
+}
+
+func (l *limitedReadFile) Close() error {
+	return l.closer.Close()
+}
+
+func (l *limitedReadFile) WriteTo(w io.Writer) (n int64, err error) {
+	if r, ok := w.(io.ReaderFrom); ok {
+		return r.ReadFrom(l.reader)
+	}
+	return io.Copy(w, l.reader)
+}
diff --git a/client/daemon/storage/metadata.go b/client/daemon/storage/metadata.go
index a5ec98c58..28e644125 100644
--- a/client/daemon/storage/metadata.go
+++ b/client/daemon/storage/metadata.go
@@ -82,6 +82,10 @@ type ReadPieceRequest struct {
 	PeerTaskMetadata
 	PieceMetadata
 }
 
+type ReadAllPiecesRequest struct {
+	PeerTaskMetadata
+	Range *clientutil.Range
+}
+
 type UpdateTaskRequest struct {
 	PeerTaskMetadata
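`limitedReadFile` exists because the whole-file path above returns a bare `*os.File`, which already gets fast copies, while `io.LimitReader` alone would not: `io.Copy` consults `src.(io.WriterTo)` first, and forwarding to the destination's `ReadFrom` lets `*net.TCPConn` use sendfile/splice on a `*io.LimitedReader` wrapping a file. An illustrative, hypothetical use inside the storage package (where the unexported type is visible):

```go
// Illustrative only: serve a ranged read over a socket. io.Copy picks
// limitedReadFile.WriteTo, which forwards to conn's ReadFrom, so the
// kernel can copy file -> socket without passing through user space.
func serveRange(conn net.Conn, f *os.File, length int64) (int64, error) {
	rc := &limitedReadFile{
		reader: io.LimitReader(f, length), // *io.LimitedReader over *os.File
		closer: f,
	}
	defer rc.Close()
	return io.Copy(conn, rc) // dispatches to rc.WriteTo(conn)
}
```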
diff --git a/client/daemon/storage/storage_manager.go b/client/daemon/storage/storage_manager.go
index 74e966542..832d23bb2 100644
--- a/client/daemon/storage/storage_manager.go
+++ b/client/daemon/storage/storage_manager.go
@@ -53,7 +53,7 @@ type TaskStorageDriver interface {
 	// If req.Num is equal to -1, range has a fixed value.
 	ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error)
 
-	ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error)
+	ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error)
 
 	GetPieces(ctx context.Context, req *base.PieceTaskRequest) (*base.PiecePacket, error)
 
@@ -241,7 +241,7 @@ func (s *storageManager) ReadPiece(ctx context.Context, req *ReadPieceRequest) (
 	return t.(TaskStorageDriver).ReadPiece(ctx, req)
 }
 
-func (s *storageManager) ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error) {
+func (s *storageManager) ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error) {
 	t, ok := s.LoadTask(
 		PeerTaskMetadata{
 			PeerID: req.PeerID,
diff --git a/client/daemon/test/mock/storage/manager.go b/client/daemon/test/mock/storage/manager.go
index 22d441c54..aab197920 100644
--- a/client/daemon/test/mock/storage/manager.go
+++ b/client/daemon/test/mock/storage/manager.go
@@ -69,7 +69,7 @@ func (mr *MockTaskStorageDriverMockRecorder) IsInvalid(req interface{}) *gomock.
 }
 
 // ReadAllPieces mocks base method.
-func (m *MockTaskStorageDriver) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetadata) (io.ReadCloser, error) {
+func (m *MockTaskStorageDriver) ReadAllPieces(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "ReadAllPieces", ctx, req)
 	ret0, _ := ret[0].(io.ReadCloser)
@@ -325,7 +325,7 @@ func (mr *MockManagerMockRecorder) Keep() *gomock.Call {
 }
 
 // ReadAllPieces mocks base method.
-func (m *MockManager) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetadata) (io.ReadCloser, error) {
+func (m *MockManager) ReadAllPieces(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "ReadAllPieces", ctx, req)
 	ret0, _ := ret[0].(io.ReadCloser)
diff --git a/client/daemon/transport/transport.go b/client/daemon/transport/transport.go
index d4288ee3c..2cbfd9235 100644
--- a/client/daemon/transport/transport.go
+++ b/client/daemon/transport/transport.go
@@ -17,19 +17,24 @@ package transport
 
 import (
+	"bytes"
 	"context"
 	"crypto/tls"
 	"fmt"
+	"io"
+	"math"
 	"net"
 	"net/http"
 	"net/http/httputil"
 	"regexp"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/go-http-utils/headers"
 	"go.opentelemetry.io/otel/propagation"
 
+	"d7y.io/dragonfly/v2/client/clientutil"
 	"d7y.io/dragonfly/v2/client/config"
 	"d7y.io/dragonfly/v2/client/daemon/metrics"
 	"d7y.io/dragonfly/v2/client/daemon/peer"
@@ -201,11 +206,23 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res
 	// Init meta value
 	meta := &base.UrlMeta{Header: map[string]string{}}
+	var rg *clientutil.Range
 
 	// Set meta range's value
-	if rg := req.Header.Get("Range"); len(rg) > 0 {
-		meta.Digest = ""
-		meta.Range = rg
+	if rangeHeader := req.Header.Get("Range"); len(rangeHeader) > 0 {
+		rgs, err := clientutil.ParseRange(rangeHeader, math.MaxInt)
+		if err != nil {
+			return badRequest(req, err.Error())
+		}
+		if len(rgs) > 1 {
+			// TODO support multiple range request
+			return notImplemented(req, "multiple range is not supported")
+		} else if len(rgs) == 0 {
+			return requestedRangeNotSatisfiable(req, "zero range is not supported")
+		}
+		rg = &rgs[0]
+		// range in dragonfly is without "bytes="
+		meta.Range = strings.TrimPrefix(rangeHeader, "bytes=")
 	}
 
 	// Pick header's parameters
@@ -224,6 +241,7 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res
 		&peer.StreamTaskRequest{
 			URL:     url,
 			URLMeta: meta,
+			Range:   rg,
 			PeerID:  peerID,
 		},
 	)
@@ -247,8 +265,14 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res
 		}
 	}
 
+	var status int
+	if meta.Range == "" {
+		status = http.StatusOK
+	} else {
+		status = http.StatusPartialContent
+	}
 	resp := &http.Response{
-		StatusCode:    http.StatusOK,
+		StatusCode:    status,
 		Body:          body,
 		Header:        hdr,
 		ContentLength: contentLength,
@@ -332,3 +356,28 @@ func delHopHeaders(header http.Header) {
 		header.Del(h)
 	}
 }
+
+func httpResponse(req *http.Request, status int, body string) (*http.Response, error) {
+	resp := &http.Response{
+		StatusCode:    status,
+		Body:          io.NopCloser(bytes.NewBufferString(body)),
+		ContentLength: int64(len(body)),
+
+		Proto:      req.Proto,
+		ProtoMajor: req.ProtoMajor,
+		ProtoMinor: req.ProtoMinor,
+	}
+	return resp, nil
+}
+
+func badRequest(req *http.Request, body string) (*http.Response, error) {
+	return httpResponse(req, http.StatusBadRequest, body)
+}
+
+func notImplemented(req *http.Request, body string) (*http.Response, error) {
+	return httpResponse(req, http.StatusNotImplemented, body)
+}
+
+func requestedRangeNotSatisfiable(req *http.Request, body string) (*http.Response, error) {
+	return httpResponse(req, http.StatusRequestedRangeNotSatisfiable, body)
+}
diff --git a/client/dfget/dfget.go b/client/dfget/dfget.go
index b55575391..1c229b5aa 100644
--- a/client/dfget/dfget.go
+++ b/client/dfget/dfget.go
@@ -28,11 +28,11 @@ import (
 	"strings"
 	"time"
 
+	"github.com/go-http-utils/headers"
 	"github.com/pkg/errors"
 	"github.com/schollz/progressbar/v3"
 
 	"d7y.io/dragonfly/v2/client/config"
-	"d7y.io/dragonfly/v2/internal/dfheaders"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/basic"
 	"d7y.io/dragonfly/v2/pkg/rpc/base"
@@ -210,6 +210,10 @@ func parseHeader(s []string) map[string]string {
 }
 
 func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemon.DownRequest {
+	var rg string
+	if r, ok := hdr[headers.Range]; ok {
+		rg = strings.TrimPrefix(r, "bytes=")
+	}
 	return &dfdaemon.DownRequest{
 		Url:    cfg.URL,
 		Output: cfg.Output,
@@ -219,7 +223,7 @@ func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemon.Do
 		UrlMeta: &base.UrlMeta{
 			Digest: cfg.Digest,
 			Tag:    cfg.Tag,
-			Range:  hdr[dfheaders.Range],
+			Range:  rg,
 			Filter: cfg.Filter,
 			Header: hdr,
 		},
diff --git a/internal/dfheaders/headers.go b/internal/dfheaders/headers.go
deleted file mode 100644
index 970459d7e..000000000
--- a/internal/dfheaders/headers.go
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2020 The Dragonfly Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dfheaders
-
-const (
-	Range = "Range"
-)
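The proxy now vets the `Range` header before creating a task: an unparsable header gets 400, multiple ranges get 501 (only a single range is supported for now), and an empty range set gets 416. A small check of the branch conditions, assuming `clientutil.ParseRange` mirrors `net/http`'s parser:

```go
package main

import (
	"fmt"
	"math"

	"d7y.io/dragonfly/v2/client/clientutil"
)

func main() {
	// single range: accepted, download proceeds and is answered with 206
	rgs, _ := clientutil.ParseRange("bytes=0-99", math.MaxInt)
	fmt.Println(len(rgs)) // 1

	// multiple ranges: len(rgs) > 1 triggers the notImplemented (501) branch
	rgs, _ = clientutil.ParseRange("bytes=0-0,5-10", math.MaxInt)
	fmt.Println(len(rgs)) // 2
}
```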
diff --git a/pkg/idgen/task_id.go b/pkg/idgen/task_id.go
index 3287ec83b..0aa938c22 100644
--- a/pkg/idgen/task_id.go
+++ b/pkg/idgen/task_id.go
@@ -24,9 +24,7 @@ import (
 	"d7y.io/dragonfly/v2/pkg/util/net/urlutils"
 )
 
-// TaskID generates a taskId.
-// filter is separated by & character.
-func TaskID(url string, meta *base.UrlMeta) string {
+func taskID(url string, meta *base.UrlMeta, ignoreRange bool) string {
 	var filters []string
 	if meta != nil && meta.Filter != "" {
 		filters = strings.Split(meta.Filter, "&")
@@ -39,7 +37,7 @@ func TaskID(url string, meta *base.UrlMeta) string {
 		data = append(data, meta.Digest)
 	}
 
-	if meta.Range != "" {
+	if !ignoreRange && meta.Range != "" {
 		data = append(data, meta.Range)
 	}
 
@@ -50,3 +48,15 @@ func TaskID(url string, meta *base.UrlMeta) string {
 
 	return digestutils.Sha256(data...)
 }
+
+// TaskID generates a task id.
+// filter is separated by the & character.
+func TaskID(url string, meta *base.UrlMeta) string {
+	return taskID(url, meta, false)
+}
+
+// ParentTaskID generates a task id like TaskID, but ignores the range.
+// It is used to find the parent task for ranged requests.
+func ParentTaskID(url string, meta *base.UrlMeta) string {
+	return taskID(url, meta, true)
+}
diff --git a/pkg/idgen/task_id_test.go b/pkg/idgen/task_id_test.go
index cd44652d7..86e0f6835 100644
--- a/pkg/idgen/task_id_test.go
+++ b/pkg/idgen/task_id_test.go
@@ -26,10 +26,11 @@ import (
 
 func TestTaskID(t *testing.T) {
 	tests := []struct {
-		name   string
-		url    string
-		meta   *base.UrlMeta
-		expect func(t *testing.T, d interface{})
+		name        string
+		url         string
+		meta        *base.UrlMeta
+		ignoreRange bool
+		expect      func(t *testing.T, d interface{})
 	}{
 		{
 			name: "generate taskID with url",
@@ -53,6 +54,20 @@ func TestTaskID(t *testing.T) {
 				assert.Equal("aeee0e0a2a0c75130582641353c539aaf9011a0088b31347f7588e70e449a3e0", d)
 			},
 		},
+		{
+			name: "generate taskID with meta and ignored range",
+			url:  "https://example.com",
+			meta: &base.UrlMeta{
+				Range:  "foo",
+				Digest: "bar",
+				Tag:    "",
+			},
+			ignoreRange: true,
+			expect: func(t *testing.T, d interface{}) {
+				assert := assert.New(t)
+				assert.Equal("63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d", d)
+			},
+		},
 		{
 			name: "generate taskID with filter",
 			url:  "https://example.com?foo=foo&bar=bar",
@@ -80,7 +95,12 @@ func TestTaskID(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			data := TaskID(tc.url, tc.meta)
+			var data string
+			if tc.ignoreRange {
+				data = ParentTaskID(tc.url, tc.meta)
+			} else {
+				data = TaskID(tc.url, tc.meta)
+			}
 			tc.expect(t, data)
 		})
 	}
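One consequence worth spelling out: `ParentTaskID` strips only the range, so any two ranged requests for the same URL and meta share one parent task, while their own task IDs stay distinct. A quick property check (hypothetical values):

```go
package main

import (
	"fmt"

	"d7y.io/dragonfly/v2/pkg/idgen"
	"d7y.io/dragonfly/v2/pkg/rpc/base"
)

func main() {
	url := "https://example.com"
	m1 := &base.UrlMeta{Range: "0-99", Digest: "bar"}
	m2 := &base.UrlMeta{Range: "100-199", Digest: "bar"}

	// different slices of the same file share one parent task...
	fmt.Println(idgen.ParentTaskID(url, m1) == idgen.ParentTaskID(url, m2)) // true
	// ...but remain distinct tasks themselves
	fmt.Println(idgen.TaskID(url, m1) == idgen.TaskID(url, m2)) // false
}
```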