From f80c75efdb12eec4cf93166ab3c5d4d662fb8a5a Mon Sep 17 00:00:00 2001
From: Jim Ma
Date: Thu, 10 Feb 2022 21:40:03 +0800
Subject: [PATCH] Feature: prefetch ranged requests (#1053)

1. implement prefetching for ranged requests
2. return exact HTTP status codes in transport
3. simplify the peer task reuse logic
4. reuse peer task for ranged request size errors
5. fix a data race in peer task storage

Signed-off-by: Jim Ma
---
 client/clientutil/range.go                  |   4 +
 client/config/constants_otel.go             |   1 +
 client/config/headers.go                    |   1 +
 client/config/peerhost.go                   |   1 +
 client/daemon/daemon.go                     |   2 +-
 client/daemon/peer/peertask_conductor.go    |  21 ++-
 client/daemon/peer/peertask_file.go         |   4 +
 client/daemon/peer/peertask_manager.go      |  48 ++++++-
 client/daemon/peer/peertask_manager_test.go |  91 ++++++++++---
 client/daemon/peer/peertask_reuse.go        | 138 +++++++++++++++++---
 client/daemon/peer/peertask_stream.go       |  20 ++-
 client/daemon/peer/piece_manager.go         |   4 +-
 client/daemon/storage/local_storage.go      |  40 +++++-
 client/daemon/storage/metadata.go           |   4 +
 client/daemon/storage/storage_manager.go    |   4 +-
 client/daemon/test/mock/storage/manager.go  |   4 +-
 client/daemon/transport/transport.go        |  57 +++++++-
 client/dfget/dfget.go                       |   8 +-
 internal/dfheaders/headers.go               |  21 ---
 pkg/idgen/task_id.go                        |  18 ++-
 pkg/idgen/task_id_test.go                   |  30 ++++-
 21 files changed, 418 insertions(+), 103 deletions(-)
 delete mode 100644 internal/dfheaders/headers.go

diff --git a/client/clientutil/range.go b/client/clientutil/range.go
index 85e526551..8d1f32de3 100644
--- a/client/clientutil/range.go
+++ b/client/clientutil/range.go
@@ -29,6 +29,10 @@ type Range struct {
 	Start, Length int64
 }
 
+func (r Range) String() string {
+	return fmt.Sprintf("bytes=%d-%d", r.Start, r.Start+r.Length-1)
+}
+
 // ErrNoOverlap is returned by ParseRange if first-byte-pos of
 // all of the byte-range-spec values is greater than the content size.
 var ErrNoOverlap = errors.New("invalid range: failed to overlap")

diff --git a/client/config/constants_otel.go b/client/config/constants_otel.go
index 1b50ba5b0..97758e76b 100644
--- a/client/config/constants_otel.go
+++ b/client/config/constants_otel.go
@@ -25,6 +25,7 @@ const (
 	AttributePeerID         = attribute.Key("d7y.peer.id")
 	AttributeTargetPeerID   = attribute.Key("d7y.peer.target.id")
 	AttributeReusePeerID    = attribute.Key("d7y.peer.reuse.id")
+	AttributeReuseRange     = attribute.Key("d7y.peer.reuse.range")
 	AttributeTargetPeerAddr = attribute.Key("d7y.peer.target.addr")
 	AttributeMainPeer       = attribute.Key("d7y.peer.task.main_peer")
 	AttributePeerPacketCode = attribute.Key("d7y.peer.packet.code")

diff --git a/client/config/headers.go b/client/config/headers.go
index b4dcb083f..2f9873500 100644
--- a/client/config/headers.go
+++ b/client/config/headers.go
@@ -20,6 +20,7 @@ const (
 	HeaderDragonflyFilter = "X-Dragonfly-Filter"
 	HeaderDragonflyPeer   = "X-Dragonfly-Peer"
 	HeaderDragonflyTask   = "X-Dragonfly-Task"
+	HeaderDragonflyRange  = "X-Dragonfly-Range"
 	HeaderDragonflyBiz    = "X-Dragonfly-Biz"
 	// HeaderDragonflyRegistry is used for dynamic registry mirrors
 	HeaderDragonflyRegistry = "X-Dragonfly-Registry"

diff --git a/client/config/peerhost.go b/client/config/peerhost.go
index aa6b5c587..4076b7949 100644
--- a/client/config/peerhost.go
+++ b/client/config/peerhost.go
@@ -175,6 +175,7 @@ type DownloadOption struct {
 	CalculateDigest   bool             `mapstructure:"calculateDigest" yaml:"calculateDigest"`
 	TransportOption   *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
 	GetPiecesMaxRetry int              `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
+	Prefetch          bool             `mapstructure:"prefetch" yaml:"prefetch"`
 }
 
 type TransportOption struct {

diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go
index 0bcaaf764..df2e7603f 100644
--- a/client/daemon/daemon.go
+++ b/client/daemon/daemon.go
@@ -169,7 +169,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
 		return nil, err
 	}
 	peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler,
-		opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
+		opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.Prefetch, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
 	if err != nil {
 		return nil, err
 	}

diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go
index cf2800593..df642871f 100644
--- a/client/daemon/peer/peertask_conductor.go
+++ b/client/daemon/peer/peertask_conductor.go
@@ -414,7 +414,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
 	pt.SetTotalPieces(1)
 	ctx := pt.ctx
 	var err error
-	pt.storage, err = pt.peerTaskManager.storageManager.RegisterTask(ctx,
+	storageDriver, err := pt.peerTaskManager.storageManager.RegisterTask(ctx,
 		storage.RegisterTaskRequest{
 			CommonTaskRequest: storage.CommonTaskRequest{
 				PeerID: pt.tinyData.PeerID,
@@ -424,12 +424,13 @@
 			TotalPieces: 1,
 			// TODO check digest
 		})
+	pt.storage = storageDriver
 	if err != nil {
 		logger.Errorf("register tiny data storage failed: %s", err)
 		pt.cancel(base.Code_ClientError, err.Error())
 		return
 	}
-	n, err := pt.storage.WritePiece(ctx,
+	n, err := pt.GetStorage().WritePiece(ctx,
 		&storage.WritePieceRequest{
 			PeerTaskMetadata: storage.PeerTaskMetadata{
 				PeerID: pt.tinyData.PeerID,
@@ -653,7 +654,7 @@ func (pt *peerTaskConductor) pullSinglePiece() {
 	}
 	request := &DownloadPieceRequest{
-		storage: pt.storage,
+		storage: pt.GetStorage(),
 		piece:   pt.singlePiece.PieceInfo,
 		log:     pt.Log(),
 		TaskID:  pt.GetTaskID(),
@@ -892,7 +893,7 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP
 		pt.requestedPieces.Set(piece.PieceNum)
 	}
 	req := &DownloadPieceRequest{
-		storage: pt.storage,
+		storage: pt.GetStorage(),
 		piece:   piece,
 		log:     pt.Log(),
 		TaskID:  pt.GetTaskID(),
@@ -1106,6 +1107,12 @@ func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, res
 }
 
 func (pt *peerTaskConductor) InitStorage() (err error) {
+	pt.lock.Lock()
+	defer pt.lock.Unlock()
+	// check storage for partial back source cases.
+	if pt.storage != nil {
+		return nil
+	}
 	// prepare storage
 	pt.storage, err = pt.storageManager.RegisterTask(pt.ctx,
 		storage.RegisterTaskRequest{
@@ -1125,7 +1132,7 @@ func (pt *peerTaskConductor) InitStorage() (err error) {
 
 func (pt *peerTaskConductor) UpdateStorage() error {
 	// update storage
-	err := pt.storage.UpdateTask(pt.ctx,
+	err := pt.GetStorage().UpdateTask(pt.ctx,
 		&storage.UpdateTaskRequest{
 			PeerTaskMetadata: storage.PeerTaskMetadata{
 				PeerID: pt.GetPeerID(),
@@ -1278,7 +1285,7 @@ func (pt *peerTaskConductor) fail() {
 
 // Validate stores metadata and validates digest
 func (pt *peerTaskConductor) Validate() error {
-	err := pt.storage.Store(pt.ctx,
+	err := pt.GetStorage().Store(pt.ctx,
 		&storage.StoreRequest{
 			CommonTaskRequest: storage.CommonTaskRequest{
 				PeerID: pt.peerID,
@@ -1295,7 +1302,7 @@ func (pt *peerTaskConductor) Validate() error {
 	if !pt.peerTaskManager.calculateDigest {
 		return nil
 	}
-	err = pt.storage.ValidateDigest(
+	err = pt.GetStorage().ValidateDigest(
 		&storage.PeerTaskMetadata{
 			PeerID: pt.GetPeerID(),
 			TaskID: pt.GetTaskID(),

diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go
index 103c1da77..313702db7 100644
--- a/client/daemon/peer/peertask_file.go
+++ b/client/daemon/peer/peertask_file.go
@@ -88,6 +88,10 @@ func (ptm *peerTaskManager) newFileTask(
 	if err != nil {
 		return nil, nil, err
 	}
+	// prefetch parent request
+	if ptm.enablePrefetch && request.UrlMeta.Range != "" {
+		go ptm.prefetch(&request.PeerTaskRequest)
+	}
 
 	ctx, span := tracer.Start(ctx, config.SpanFileTask, trace.WithSpanKind(trace.SpanKindClient))
 	pt := &fileTask{

diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go
index bbccd6b19..767545c24 100644
--- a/client/daemon/peer/peertask_manager.go
+++ b/client/daemon/peer/peertask_manager.go
@@ -21,6 +21,7 @@ import (
 	"io"
 	"sync"
 
+	"github.com/go-http-utils/headers"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/time/rate"
@@ -29,6 +30,8 @@ import (
 	"d7y.io/dragonfly/v2/client/daemon/metrics"
 	"d7y.io/dragonfly/v2/client/daemon/storage"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
+	"d7y.io/dragonfly/v2/pkg/idgen"
+	"d7y.io/dragonfly/v2/pkg/rpc/base"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
 	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
 )
@@ -108,10 +111,10 @@ type peerTaskManager struct {
 
 	perPeerRateLimit rate.Limit
 
-	// enableMultiplex indicates reusing completed peer task storage
-	// currently, only check completed peer task after register to scheduler
-	// TODO multiplex the running peer task
+	// enableMultiplex indicates whether to reuse the data of completed peer tasks
 	enableMultiplex bool
+	// enablePrefetch indicates whether to prefetch the whole file for ranged requests
+	enablePrefetch bool
 
 	calculateDigest bool
 
@@ -126,6 +129,7 @@ func NewPeerTaskManager(
 	schedulerOption config.SchedulerOption,
 	perPeerRateLimit rate.Limit,
 	multiplex bool,
+	prefetch bool,
 	calculateDigest bool,
 	getPiecesMaxRetry int) (TaskManager, error) {
 
@@ -139,6 +143,7 @@ func NewPeerTaskManager(
 		schedulerOption:   schedulerOption,
 		perPeerRateLimit:  perPeerRateLimit,
 		enableMultiplex:   multiplex,
+		enablePrefetch:    prefetch,
 		calculateDigest:   calculateDigest,
 		getPiecesMaxRetry: getPiecesMaxRetry,
 	}
@@ -198,6 +203,41 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
 	return ptc, true, nil
 }
 
+func (ptm *peerTaskManager) prefetch(request *scheduler.PeerTaskRequest) {
+	req := &scheduler.PeerTaskRequest{
+		Url:         request.Url,
+		PeerId:      request.PeerId,
+		PeerHost:    ptm.host,
+		HostLoad:    request.HostLoad,
+		IsMigrating: request.IsMigrating,
+		UrlMeta: &base.UrlMeta{
+			Digest: request.UrlMeta.Digest,
+			Tag:    request.UrlMeta.Tag,
+			Filter: request.UrlMeta.Filter,
+			Header: map[string]string{},
+		},
+	}
+	for k, v := range request.UrlMeta.Header {
+		if k == headers.Range {
+			continue
+		}
+		req.UrlMeta.Header[k] = v
+	}
+	taskID := idgen.TaskID(req.Url, req.UrlMeta)
+	req.PeerId = idgen.PeerID(req.PeerHost.Ip)
+
+	var limit = rate.Inf
+	if ptm.perPeerRateLimit > 0 {
+		limit = ptm.perPeerRateLimit
+	}
+
+	logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId)
+	_, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit)
+	if err != nil {
+		logger.Errorf("prefetch peer task %s/%s error: %s", taskID, req.PeerId, err)
+	}
+}
+
 func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) {
 	if ptm.enableMultiplex {
 		progress, ok := ptm.tryReuseFilePeerTask(ctx, req)
@@ -235,7 +275,7 @@ func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask
 	}
 
 	if ptm.enableMultiplex {
-		r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, peerTaskRequest)
+		r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, req)
 		if ok {
 			metrics.PeerTaskCacheHitCount.Add(1)
 			return r, attr, nil

diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go
index 3bff820e8..9a77be2db 100644
--- a/client/daemon/peer/peertask_manager_test.go
+++ b/client/daemon/peer/peertask_manager_test.go
@@ -26,10 +26,12 @@ import (
 	"net/http/httptest"
 	"os"
 	"runtime"
+	"strings"
 	"sync"
 	"testing"
 	"time"
 
+	"github.com/go-http-utils/headers"
 	"github.com/golang/mock/gomock"
 	"github.com/phayes/freeport"
 	testifyassert "github.com/stretchr/testify/assert"
@@ -281,6 +283,7 @@ type testSpec struct {
 	taskType           int
 	name               string
 	taskData           []byte
+	httpRange          *clientutil.Range // only used in back source cases
 	pieceParallelCount int32
 	pieceSize          int
 	sizeScope          base.SizeScope
@@ -296,7 +299,7 @@ type testSpec struct {
 	backSource bool
 
 	mockPieceDownloader  func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader
-	mockHTTPSourceClient func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient
+	mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient
 
 	cleanUp []func()
 }
@@ -367,7 +370,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_NORMAL,
 			mockPieceDownloader: nil,
-			mockHTTPSourceClient: func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient {
+			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
 				sourceClient := sourceMock.NewMockResourceClient(ctrl)
 				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
 					func(request *source.Request) (int64, error) {
@@ -380,6 +383,47 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 				return sourceClient
 			},
 		},
+		{
+			name:     "normal size scope - range - back source - content length",
+			taskData: testBytes[0:4096],
+			httpRange: &clientutil.Range{
+				Start:  0,
+				Length: 4096,
+			},
+			pieceParallelCount: 4,
+			pieceSize:          1024,
+			peerID:             "normal-size-peer-range-back-source",
+			backSource:         true,
+			url:                "http://localhost/test/data",
+			sizeScope:           base.SizeScope_NORMAL,
+			mockPieceDownloader: nil,
+			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
+				sourceClient := sourceMock.NewMockResourceClient(ctrl)
+				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
+					func(request *source.Request) (int64, error) {
+						assert := testifyassert.New(t)
+						if rg != nil {
+							rgs, err := clientutil.ParseRange(request.Header.Get(headers.Range), math.MaxInt)
+							assert.Nil(err)
+							assert.Equal(1, len(rgs))
+							assert.Equal(rg.String(), rgs[0].String())
+						}
+						return int64(len(taskData)), nil
+					})
+				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
+					func(request *source.Request) (*source.Response, error) {
+						assert := testifyassert.New(t)
+						if rg != nil {
+							rgs, err := clientutil.ParseRange(request.Header.Get(headers.Range), math.MaxInt)
+							assert.Nil(err)
+							assert.Equal(1, len(rgs))
+							assert.Equal(rg.String(), rgs[0].String())
+						}
+						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
+					})
+				return sourceClient
+			},
+		},
 		{
 			name:               "normal size scope - back source - no content length",
 			taskData:           testBytes,
@@ -390,7 +434,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_NORMAL,
 			mockPieceDownloader: nil,
-			mockHTTPSourceClient: func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient {
+			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
 				sourceClient := sourceMock.NewMockResourceClient(ctrl)
 				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
 					func(request *source.Request) (int64, error) {
@@ -413,7 +457,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_NORMAL,
 			mockPieceDownloader: nil,
-			mockHTTPSourceClient: func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient {
+			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
 				sourceClient := sourceMock.NewMockResourceClient(ctrl)
 				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
 					func(request *source.Request) (int64, error) {
@@ -469,11 +513,25 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			urlMeta := &base.UrlMeta{
 				Tag: "d7y-test",
 			}
+
+			if tc.httpRange != nil {
+				urlMeta.Range = strings.TrimPrefix(tc.httpRange.String(), "bytes=")
+			}
+
 			if tc.urlGenerator != nil {
 				tc.url = tc.urlGenerator(&tc)
 			}
 			taskID := idgen.TaskID(tc.url, urlMeta)
 
+			var (
+				downloader   PieceDownloader
+				sourceClient source.ResourceClient
+			)
+
+			if tc.mockPieceDownloader != nil {
+				downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize)
+			}
+
 			if tc.mockHTTPSourceClient != nil {
 				source.UnRegister("http")
 				defer func() {
@@ -482,18 +540,8 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 					require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter))
 				}()
 				// replace source client
-				require.Nil(source.Register("http", tc.mockHTTPSourceClient(ctrl, tc.taskData, tc.url), httpprotocol.Adapter))
-			}
-
-			var (
-				downloader   PieceDownloader
-				sourceClient source.ResourceClient
-			)
-			if tc.mockPieceDownloader != nil {
-				downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize)
-			}
-			if tc.mockHTTPSourceClient != nil {
-				sourceClient = tc.mockHTTPSourceClient(ctrl, tc.taskData, tc.url)
+				sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url)
+				require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter))
 			}
 
 			option := componentsOption{
@@ -596,14 +644,14 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 		assert.Nil(os.Remove(output))
 	}()
 
-	request := &scheduler.PeerTaskRequest{
+	peerTaskRequest := &scheduler.PeerTaskRequest{
 		Url:      ts.url,
 		UrlMeta:  urlMeta,
 		PeerId:   ts.peerID,
 		PeerHost: &scheduler.PeerHost{},
 	}
 
-	ptc, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*4))
+	ptc, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, peerTaskRequest, rate.Limit(pieceSize*4))
 	assert.Nil(err, "load first peerTaskConductor")
 	assert.True(created, "should create a new peerTaskConductor")
@@ -698,7 +746,12 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 	assert.True(noRunningTask, "no running tasks")
 
 	// test reuse stream task
-	rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(), request)
+	rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(),
+		&StreamTaskRequest{
+			URL:     ts.url,
+			URLMeta: urlMeta,
+			PeerID:  ts.peerID,
+		})
 	assert.True(ok, "reuse stream task")
 	defer func() {
 		assert.Nil(rc.Close())

diff --git a/client/daemon/peer/peertask_reuse.go b/client/daemon/peer/peertask_reuse.go
index 5e8de526a..228c3f59f 100644
--- a/client/daemon/peer/peertask_reuse.go
+++ b/client/daemon/peer/peertask_reuse.go
@@ -20,18 +20,21 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"math"
+	"os"
 	"time"
 
 	"github.com/go-http-utils/headers"
 	"go.opentelemetry.io/otel/semconv"
 	"go.opentelemetry.io/otel/trace"
 
+	"d7y.io/dragonfly/v2/client/clientutil"
 	"d7y.io/dragonfly/v2/client/config"
 	"d7y.io/dragonfly/v2/client/daemon/storage"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/idgen"
 	"d7y.io/dragonfly/v2/pkg/rpc/base"
-	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
+	"d7y.io/dragonfly/v2/pkg/util/rangeutils"
 )
 
 var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
@@ -40,11 +43,41 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
 	request *FileTaskRequest) (chan *FileTaskProgress, bool) {
 	taskID := idgen.TaskID(request.Url, request.UrlMeta)
 	reuse := ptm.storageManager.FindCompletedTask(taskID)
+	var (
+		rg     *clientutil.Range // the range of parent peer task data to read
+		log    *logger.SugaredLoggerOnWith
+		length int64
+		err    error
+	)
 	if reuse == nil {
-		return nil, false
+		taskID = idgen.ParentTaskID(request.Url, request.UrlMeta)
+		reuse = ptm.storageManager.FindCompletedTask(taskID)
+		if reuse == nil {
+			return nil, false
+		}
+		var r *rangeutils.Range
+		r, err = rangeutils.ParseRange(request.UrlMeta.Range, math.MaxInt)
+		if err != nil {
+			logger.Warnf("parse range %s error: %s", request.UrlMeta.Range, err)
+			return nil, false
+		}
+		rg = &clientutil.Range{
+			Start:  int64(r.StartIndex),
+			Length: int64(r.EndIndex - r.StartIndex + 1),
+		}
 	}
 
-	log := logger.With("peer", request.PeerId, "task", taskID, "component", "reuseFilePeerTask")
+	if rg == nil {
+		log = logger.With("peer", request.PeerId, "task", taskID, "component", "reuseFilePeerTask")
+		log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
+		length = reuse.ContentLength
+	} else {
+		log = logger.With("peer", request.PeerId, "task", taskID, "range", request.UrlMeta.Range,
+			"component", "reuseRangeFilePeerTask")
+		log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
+			reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range)
+		length = rg.Length
+	}
 
 	_, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient))
 	span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid))
@@ -53,15 +86,17 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
 	span.SetAttributes(config.AttributePeerID.String(request.PeerId))
 	span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
 	span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
+	if rg != nil {
+		span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range))
+	}
 	defer span.End()
 
-	log.Infof("reuse from peer task: %s, size: %d", reuse.PeerID, reuse.ContentLength)
+	log.Infof("reuse from peer task: %s, total size: %d, target size: %d", reuse.PeerID, reuse.ContentLength, length)
 	span.AddEvent("reuse peer task", trace.WithAttributes(config.AttributePeerID.String(reuse.PeerID)))
 	start := time.Now()
-	err := ptm.storageManager.Store(
-		context.Background(),
-		&storage.StoreRequest{
+	if rg == nil {
+		storeRequest := &storage.StoreRequest{
 			CommonTaskRequest: storage.CommonTaskRequest{
 				PeerID: reuse.PeerID,
 				TaskID: taskID,
@@ -70,13 +105,19 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
 			MetadataOnly: false,
 			StoreOnly:    true,
 			TotalPieces:  reuse.TotalPieces,
-		})
+		}
+		err = ptm.storageManager.Store(context.Background(), storeRequest)
+	} else {
+		err = ptm.storePartialFile(ctx, request, log, reuse, rg)
+	}
+
 	if err != nil {
 		log.Errorf("store error when reuse peer task: %s", err)
 		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
 		span.RecordError(err)
 		return nil, false
 	}
+
 	var cost = time.Now().Sub(start).Milliseconds()
 	log.Infof("reuse file peer task done, cost: %dms", cost)
@@ -88,8 +129,8 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
 		},
 		TaskID:          taskID,
 		PeerID:          request.PeerId,
-		ContentLength:   reuse.ContentLength,
-		CompletedLength: reuse.ContentLength,
+		ContentLength:   length,
+		CompletedLength: length,
 		PeerTaskDone:    true,
 		DoneCallback:    func() {},
 	}
@@ -103,38 +144,94 @@
 
 	return progressCh, true
 }
 
+func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileTaskRequest,
+	log *logger.SugaredLoggerOnWith, reuse *storage.ReusePeerTask, rg *clientutil.Range) error {
+	f, err := os.OpenFile(request.Output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
+	if err != nil {
+		log.Errorf("open dest file error when reuse peer task: %s", err)
+		return err
+	}
+	defer f.Close()
+	rc, err := ptm.storageManager.ReadAllPieces(ctx,
+		&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg})
+	if err != nil {
+		log.Errorf("read pieces error when reuse peer task: %s", err)
+		return err
+	}
+	defer rc.Close()
+	n, err := io.Copy(f, rc)
+	if err != nil {
+		log.Errorf("copy data error when reuse peer task: %s", err)
+		return err
+	}
+	if n != rg.Length {
+		log.Errorf("copy data length not match when reuse peer task, actual: %d, expected: %d", n, rg.Length)
+		return io.ErrShortBuffer
+	}
+	return nil
+}
+
 func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context,
-	request *scheduler.PeerTaskRequest) (io.ReadCloser, map[string]string, bool) {
-	taskID := idgen.TaskID(request.Url, request.UrlMeta)
+	request *StreamTaskRequest) (io.ReadCloser, map[string]string, bool) {
+	taskID := idgen.TaskID(request.URL, request.URLMeta)
 	reuse := ptm.storageManager.FindCompletedTask(taskID)
+	var (
+		rg  *clientutil.Range // the range of parent peer task data to read
+		log *logger.SugaredLoggerOnWith
+	)
 	if reuse == nil {
-		return nil, nil, false
+		// for ranged requests, check the parent task
+		if request.Range == nil {
+			return nil, nil, false
+		}
+		taskID = idgen.ParentTaskID(request.URL, request.URLMeta)
+		reuse = ptm.storageManager.FindCompletedTask(taskID)
+		if reuse == nil {
+			return nil, nil, false
+		}
+		rg = request.Range
 	}
 
-	log := logger.With("peer", request.PeerId, "task", taskID, "component", "reuseStreamPeerTask")
-	log.Infof("reuse from peer task: %s, size: %d", reuse.PeerID, reuse.ContentLength)
+	if rg == nil {
+		log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseStreamPeerTask")
+		log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
+	} else {
+		log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseRangeStreamPeerTask")
+		log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
+			reuse.PeerID, reuse.ContentLength, request.URLMeta.Range)
+	}
 
 	ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient))
 	span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid))
 	span.SetAttributes(semconv.NetHostIPKey.String(ptm.host.Ip))
 	span.SetAttributes(config.AttributeTaskID.String(taskID))
-	span.SetAttributes(config.AttributePeerID.String(request.PeerId))
+	span.SetAttributes(config.AttributePeerID.String(request.PeerID))
 	span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
-	span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
+	span.SetAttributes(semconv.HTTPURLKey.String(request.URL))
+	if rg != nil {
+		span.SetAttributes(config.AttributeReuseRange.String(request.URLMeta.Range))
+	}
 	defer span.End()
 
-	rc, err := ptm.storageManager.ReadAllPieces(ctx, &reuse.PeerTaskMetadata)
+	rc, err := ptm.storageManager.ReadAllPieces(ctx,
+		&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg})
 	if err != nil {
-		log.Errorf("read all pieces error when reuse peer task: %s", err)
+		log.Errorf("read pieces error when reuse peer task: %s", err)
 		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
 		span.RecordError(err)
 		return nil, nil, false
 	}
 
 	attr := map[string]string{}
-	attr[headers.ContentLength] = fmt.Sprintf("%d", reuse.ContentLength)
 	attr[config.HeaderDragonflyTask] = taskID
-	attr[config.HeaderDragonflyPeer] = request.PeerId
+	attr[config.HeaderDragonflyPeer] = request.PeerID
+	if rg != nil {
+		attr[config.HeaderDragonflyRange] = request.URLMeta.Range
+		attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/%d", rg.Start, rg.Start+rg.Length-1, reuse.ContentLength)
+		attr[headers.ContentLength] = fmt.Sprintf("%d", rg.Length)
+	} else {
+		attr[headers.ContentLength] = fmt.Sprintf("%d", reuse.ContentLength)
+	}
 
 	// TODO record time when file closed, need add a type to implement Close and WriteTo
 	span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))

diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go
index 50bf2b1c6..4456549b2 100644
--- a/client/daemon/peer/peertask_stream.go
+++ b/client/daemon/peer/peertask_stream.go
@@ -25,6 +25,7 @@ import (
 	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/time/rate"
 
+	"d7y.io/dragonfly/v2/client/clientutil"
 	"d7y.io/dragonfly/v2/client/config"
 	"d7y.io/dragonfly/v2/client/daemon/metrics"
 	"d7y.io/dragonfly/v2/client/daemon/storage"
@@ -39,6 +40,8 @@ type StreamTaskRequest struct {
 	URL string
 	// url meta info
 	URLMeta *base.UrlMeta
+	// http range
+	Range *clientutil.Range
 	// peer's id and must be global uniqueness
 	PeerID string
 }
@@ -71,6 +74,12 @@ func (ptm *peerTaskManager) newStreamTask(
 	if err != nil {
 		return nil, err
 	}
+
+	// prefetch parent request
+	if ptm.enablePrefetch && request.UrlMeta.Range != "" {
+		go ptm.prefetch(request)
+	}
+
 	ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient))
 	pt := &streamTask{
 		SugaredLoggerOnWith: ptc.SugaredLoggerOnWith,
@@ -101,10 +110,11 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
 	case <-s.peerTaskConductor.successCh:
 		rc, err := s.peerTaskConductor.peerTaskManager.storageManager.ReadAllPieces(
 			ctx,
-			&storage.PeerTaskMetadata{
-				PeerID: s.peerTaskConductor.peerID,
-				TaskID: s.peerTaskConductor.taskID,
-			})
+			&storage.ReadAllPiecesRequest{
+				PeerTaskMetadata: storage.PeerTaskMetadata{
+					PeerID: s.peerTaskConductor.peerID,
+					TaskID: s.peerTaskConductor.taskID,
+				}})
 		return rc, attr, err
 	case first := <-s.pieceCh:
 		firstPiece = first
@@ -124,7 +134,7 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
 }
 
 func (s *streamTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) {
-	pr, pc, err := s.peerTaskConductor.storage.ReadPiece(s.ctx, &storage.ReadPieceRequest{
+	pr, pc, err := s.peerTaskConductor.GetStorage().ReadPiece(s.ctx, &storage.ReadPieceRequest{
 		PeerTaskMetadata: storage.PeerTaskMetadata{
 			PeerID: s.peerTaskConductor.peerID,
 			TaskID: s.peerTaskConductor.taskID,

diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go
index 2224e1803..d8e8ac42d 100644
--- a/client/daemon/peer/piece_manager.go
+++ b/client/daemon/peer/piece_manager.go
@@ -248,7 +248,8 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc
 		request.UrlMeta.Header = map[string]string{}
 	}
 	if request.UrlMeta.Range != "" {
-		request.UrlMeta.Header["Range"] = request.UrlMeta.Range
+		// the adapter in the http source package computes the real range, so inject "X-Dragonfly-Range" here
+		request.UrlMeta.Header[source.Range] = request.UrlMeta.Range
 	}
 	log := pt.Log()
 	log.Infof("start to download from source")
@@ -282,6 +283,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc
 		return err
 	}
 	response, err := source.Download(downloadRequest)
+	// TODO update expire info
 	if err != nil {
 		return err
 	}

diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go
index af8b220a8..9855067f7 100644
--- a/client/daemon/storage/local_storage.go
+++ b/client/daemon/storage/local_storage.go
@@ -248,24 +248,33 @@ func (t *localTaskStore) ReadPiece(ctx context.Context, req *ReadPieceRequest) (
 	return io.LimitReader(file, req.Range.Length), file, nil
 }
 
-func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error) {
+func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error) {
 	if t.invalid.Load() {
 		t.Errorf("invalid digest, refuse to read all pieces")
 		return nil, ErrInvalidDigest
 	}
 	t.touch()
+
+	// whoever calls ReadAllPieces must close the returned io.ReadCloser
 	file, err := os.Open(t.DataFilePath)
 	if err != nil {
 		return nil, err
 	}
-	if _, err = file.Seek(0, io.SeekStart); err != nil {
+	if req.Range == nil {
+		return file, nil
+	}
+
+	if _, err = file.Seek(req.Range.Start, io.SeekStart); err != nil {
 		file.Close()
-		t.Errorf("file seek failed: %v", err)
+		t.Errorf("file seek to %d failed: %v", req.Range.Start, err)
 		return nil, err
 	}
-	// who call ReadPiece, who close the io.ReadCloser
-	return file, nil
+
+	return &limitedReadFile{
+		reader: io.LimitReader(file, req.Range.Length),
+		closer: file,
+	}, nil
 }
 
 func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error {
@@ -479,3 +488,24 @@ func (t *localTaskStore) saveMetadata() error {
 	}
 	return err
 }
+
+// limitedReadFile wraps a limited reader and its underlying file to support zero-copy WriteTo
+type limitedReadFile struct {
+	reader io.Reader
+	closer io.Closer
+}
+
+func (l *limitedReadFile) Read(p []byte) (n int, err error) {
+	return l.reader.Read(p)
+}
+
+func (l *limitedReadFile) Close() error {
+	return l.closer.Close()
+}
+
+func (l *limitedReadFile) WriteTo(w io.Writer) (n int64, err error) {
+	if r, ok := w.(io.ReaderFrom); ok {
+		return r.ReadFrom(l.reader)
+	}
+	return io.Copy(w, l.reader)
+}

diff --git a/client/daemon/storage/metadata.go b/client/daemon/storage/metadata.go
index a5ec98c58..28e644125 100644
--- a/client/daemon/storage/metadata.go
+++ b/client/daemon/storage/metadata.go
@@ -82,6 +82,10 @@ type ReadPieceRequest struct {
 	PeerTaskMetadata
 	PieceMetadata
 }
 
+type ReadAllPiecesRequest struct {
+	PeerTaskMetadata
+	Range *clientutil.Range
+}
 type UpdateTaskRequest struct {
 	PeerTaskMetadata

diff --git a/client/daemon/storage/storage_manager.go b/client/daemon/storage/storage_manager.go
index 74e966542..832d23bb2 100644
--- a/client/daemon/storage/storage_manager.go
+++ b/client/daemon/storage/storage_manager.go
@@ -53,7 +53,7 @@ type TaskStorageDriver interface {
 	// If req.Num is equal to -1, range has a fixed value.
 	ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error)
 
-	ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error)
+	ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error)
 
 	GetPieces(ctx context.Context, req *base.PieceTaskRequest) (*base.PiecePacket, error)
 
@@ -241,7 +241,7 @@ func (s *storageManager) ReadPiece(ctx context.Context, req *ReadPieceRequest) (
 	return t.(TaskStorageDriver).ReadPiece(ctx, req)
 }
 
-func (s *storageManager) ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error) {
+func (s *storageManager) ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error) {
 	t, ok := s.LoadTask(
 		PeerTaskMetadata{
 			PeerID: req.PeerID,

diff --git a/client/daemon/test/mock/storage/manager.go b/client/daemon/test/mock/storage/manager.go
index 22d441c54..aab197920 100644
--- a/client/daemon/test/mock/storage/manager.go
+++ b/client/daemon/test/mock/storage/manager.go
@@ -69,7 +69,7 @@ func (mr *MockTaskStorageDriverMockRecorder) IsInvalid(req interface{}) *gomock.
 }
 
 // ReadAllPieces mocks base method.
-func (m *MockTaskStorageDriver) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetadata) (io.ReadCloser, error) {
+func (m *MockTaskStorageDriver) ReadAllPieces(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "ReadAllPieces", ctx, req)
 	ret0, _ := ret[0].(io.ReadCloser)
@@ -325,7 +325,7 @@ func (mr *MockManagerMockRecorder) Keep() *gomock.Call {
 }
 
 // ReadAllPieces mocks base method.
-func (m *MockManager) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetadata) (io.ReadCloser, error) {
+func (m *MockManager) ReadAllPieces(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "ReadAllPieces", ctx, req)
 	ret0, _ := ret[0].(io.ReadCloser)

diff --git a/client/daemon/transport/transport.go b/client/daemon/transport/transport.go
index d4288ee3c..2cbfd9235 100644
--- a/client/daemon/transport/transport.go
+++ b/client/daemon/transport/transport.go
@@ -17,19 +17,24 @@
 package transport
 
 import (
+	"bytes"
 	"context"
 	"crypto/tls"
 	"fmt"
+	"io"
+	"math"
 	"net"
 	"net/http"
 	"net/http/httputil"
 	"regexp"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/go-http-utils/headers"
 	"go.opentelemetry.io/otel/propagation"
 
+	"d7y.io/dragonfly/v2/client/clientutil"
 	"d7y.io/dragonfly/v2/client/config"
 	"d7y.io/dragonfly/v2/client/daemon/metrics"
 	"d7y.io/dragonfly/v2/client/daemon/peer"
@@ -201,11 +206,23 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res
 
 	// Init meta value
 	meta := &base.UrlMeta{Header: map[string]string{}}
+	var rg *clientutil.Range
 
 	// Set meta range's value
-	if rg := req.Header.Get("Range"); len(rg) > 0 {
-		meta.Digest = ""
-		meta.Range = rg
+	if rangeHeader := req.Header.Get("Range"); len(rangeHeader) > 0 {
+		rgs, err := clientutil.ParseRange(rangeHeader, math.MaxInt)
+		if err != nil {
+			return badRequest(req, err.Error())
+		}
+		if len(rgs) > 1 {
+			// TODO support multiple range requests
+			return notImplemented(req, "multiple range is not supported")
+		} else if len(rgs) == 0 {
+			return requestedRangeNotSatisfiable(req, "zero range is not supported")
+		}
+		rg = &rgs[0]
+		// range in dragonfly is without the "bytes=" prefix
+		meta.Range = strings.TrimPrefix(rangeHeader, "bytes=")
 	}
 
 	// Pick header's parameters
@@ -224,6 +241,7 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res
 		&peer.StreamTaskRequest{
 			URL:     url,
 			URLMeta: meta,
+			Range:   rg,
 			PeerID:  peerID,
 		},
 	)
@@ -247,8 +265,14 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res
 		}
 	}
 
+	var status int
+	if meta.Range == "" {
+		status = http.StatusOK
+	} else {
+		status = http.StatusPartialContent
+	}
 	resp := &http.Response{
-		StatusCode:    http.StatusOK,
+		StatusCode:    status,
 		Body:          body,
 		Header:        hdr,
 		ContentLength: contentLength,
@@ -332,3 +356,28 @@ func delHopHeaders(header http.Header) {
 		header.Del(h)
 	}
 }
+
+func httpResponse(req *http.Request, status int, body string) (*http.Response, error) {
+	resp := &http.Response{
+		StatusCode:    status,
+		Body:          io.NopCloser(bytes.NewBufferString(body)),
+		ContentLength: int64(len(body)),
+
+		Proto:      req.Proto,
+		ProtoMajor: req.ProtoMajor,
+		ProtoMinor: req.ProtoMinor,
+	}
+	return resp, nil
+}
+
+func badRequest(req *http.Request, body string) (*http.Response, error) {
+	return httpResponse(req, http.StatusBadRequest, body)
+}
+
+func notImplemented(req *http.Request, body string) (*http.Response, error) {
+	return httpResponse(req, http.StatusNotImplemented, body)
+}
+
+func requestedRangeNotSatisfiable(req *http.Request, body string) (*http.Response, error) {
+	return httpResponse(req, http.StatusRequestedRangeNotSatisfiable, body)
+}

diff --git a/client/dfget/dfget.go b/client/dfget/dfget.go
index b55575391..1c229b5aa 100644
--- a/client/dfget/dfget.go
+++ b/client/dfget/dfget.go
@@ -28,11 +28,11 @@ import (
 	"strings"
 	"time"
 
+	"github.com/go-http-utils/headers"
 	"github.com/pkg/errors"
 	"github.com/schollz/progressbar/v3"
 
 	"d7y.io/dragonfly/v2/client/config"
-	"d7y.io/dragonfly/v2/internal/dfheaders"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/basic"
 	"d7y.io/dragonfly/v2/pkg/rpc/base"
@@ -210,6 +210,10 @@ func parseHeader(s []string) map[string]string {
 }
 
 func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemon.DownRequest {
+	var rg string
+	if r, ok := hdr[headers.Range]; ok {
+		rg = strings.TrimPrefix(r, "bytes=")
+	}
 	return &dfdaemon.DownRequest{
 		Url:    cfg.URL,
 		Output: cfg.Output,
@@ -219,7 +223,7 @@ func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemon.Do
 		UrlMeta: &base.UrlMeta{
 			Digest: cfg.Digest,
 			Tag:    cfg.Tag,
-			Range:  hdr[dfheaders.Range],
+			Range:  rg,
 			Filter: cfg.Filter,
 			Header: hdr,
 		},

diff --git a/internal/dfheaders/headers.go b/internal/dfheaders/headers.go
deleted file mode 100644
index 970459d7e..000000000
--- a/internal/dfheaders/headers.go
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2020 The Dragonfly Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dfheaders
-
-const (
-	Range = "Range"
-)

diff --git a/pkg/idgen/task_id.go b/pkg/idgen/task_id.go
index 3287ec83b..0aa938c22 100644
--- a/pkg/idgen/task_id.go
+++ b/pkg/idgen/task_id.go
@@ -24,9 +24,7 @@ import (
 	"d7y.io/dragonfly/v2/pkg/util/net/urlutils"
 )
 
-// TaskID generates a taskId.
-// filter is separated by & character.
-func TaskID(url string, meta *base.UrlMeta) string {
+func taskID(url string, meta *base.UrlMeta, ignoreRange bool) string {
 	var filters []string
 	if meta != nil && meta.Filter != "" {
 		filters = strings.Split(meta.Filter, "&")
@@ -39,7 +37,7 @@ func TaskID(url string, meta *base.UrlMeta) string {
 		data = append(data, meta.Digest)
 	}
 
-	if meta.Range != "" {
+	if !ignoreRange && meta.Range != "" {
 		data = append(data, meta.Range)
 	}
 
@@ -50,3 +48,15 @@ func TaskID(url string, meta *base.UrlMeta) string {
 
 	return digestutils.Sha256(data...)
 }
+
+// TaskID generates a task id.
+// filter is separated by the & character.
+func TaskID(url string, meta *base.UrlMeta) string {
+	return taskID(url, meta, false)
+}
+
+// ParentTaskID generates a task id like TaskID, but without the range.
+// It is used to find the parent task of a ranged request.
+func ParentTaskID(url string, meta *base.UrlMeta) string {
+	return taskID(url, meta, true)
+}

diff --git a/pkg/idgen/task_id_test.go b/pkg/idgen/task_id_test.go
index cd44652d7..86e0f6835 100644
--- a/pkg/idgen/task_id_test.go
+++ b/pkg/idgen/task_id_test.go
@@ -26,10 +26,11 @@ import (
 
 func TestTaskID(t *testing.T) {
 	tests := []struct {
-		name   string
-		url    string
-		meta   *base.UrlMeta
-		expect func(t *testing.T, d interface{})
+		name        string
+		url         string
+		meta        *base.UrlMeta
+		ignoreRange bool
+		expect      func(t *testing.T, d interface{})
 	}{
 		{
 			name: "generate taskID with url",
@@ -53,6 +54,20 @@ func TestTaskID(t *testing.T) {
 				assert.Equal("aeee0e0a2a0c75130582641353c539aaf9011a0088b31347f7588e70e449a3e0", d)
 			},
 		},
+		{
+			name: "generate parent taskID ignoring range",
+			url:  "https://example.com",
+			meta: &base.UrlMeta{
+				Range:  "foo",
+				Digest: "bar",
+				Tag:    "",
+			},
+			ignoreRange: true,
+			expect: func(t *testing.T, d interface{}) {
+				assert := assert.New(t)
+				assert.Equal("63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d", d)
+			},
+		},
 		{
 			name: "generate taskID with filter",
 			url:  "https://example.com?foo=foo&bar=bar",
@@ -80,7 +95,12 @@ func TestTaskID(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			data := TaskID(tc.url, tc.meta)
+			var data string
+			if tc.ignoreRange {
+				data = ParentTaskID(tc.url, tc.meta)
+			} else {
+				data = TaskID(tc.url, tc.meta)
+			}
 			tc.expect(t, data)
 		})
 	}
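
Note: below is a minimal sketch (not part of the patch) of how the new range
helpers are expected to fit together. It assumes the import paths used in this
repository; the URL and range values are illustrative only.

	package main

	import (
		"fmt"

		"d7y.io/dragonfly/v2/client/clientutil"
		"d7y.io/dragonfly/v2/pkg/idgen"
		"d7y.io/dragonfly/v2/pkg/rpc/base"
	)

	func main() {
		// meta.Range carries the value without the "bytes=" prefix,
		// matching the strings.TrimPrefix handling in transport.go and dfget.go.
		rg := clientutil.Range{Start: 0, Length: 4096}
		meta := &base.UrlMeta{Range: "0-4095"}

		// TaskID hashes the range, so every sub-range maps to its own task;
		// ParentTaskID ignores the range, so all sub-ranges of one URL share
		// a single parent task whose whole file can be prefetched and reused.
		fmt.Println(idgen.TaskID("http://example.com/data", meta))
		fmt.Println(idgen.ParentTaskID("http://example.com/data", meta))

		// Range.String renders the HTTP header form asserted in the tests.
		fmt.Println(rg.String()) // bytes=0-4095
	}

This is why tryReuseFilePeerTask and tryReuseStreamPeerTask fall back to
FindCompletedTask(idgen.ParentTaskID(...)) on a miss and serve the requested
sub-range out of the parent task's storage via ReadAllPiecesRequest.Range.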