Feature: prefetch ranged requests (#1053)

1. Implement prefetching for ranged requests
2. Return the exact HTTP status code in the transport
3. Simplify the reuse peer task logic
4. Fix the size error when reusing peer tasks for ranged requests
5. Fix a data race in peer task storage

Signed-off-by: Jim Ma <majinjing3@gmail.com>
Jim Ma 2022-02-10 21:40:03 +08:00 committed by Gaius
parent 54a9ed79d8
commit f80c75efdb
GPG Key ID: 8B4E5D1290FA2FFB
21 changed files with 418 additions and 103 deletions

View File

@ -29,6 +29,10 @@ type Range struct {
Start, Length int64
}
func (r Range) String() string {
return fmt.Sprintf("bytes=%d-%d", r.Start, r.Start+r.Length-1)
}
// ErrNoOverlap is returned by ParseRange if first-byte-pos of
// all of the byte-range-spec values is greater than the content size.
var ErrNoOverlap = errors.New("invalid range: failed to overlap")
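A quick check of the new String method, with hypothetical values; the rendered header is inclusive on both ends, so a 4096-byte range starting at 0 ends at byte 4095:

```go
package main

import (
	"fmt"

	"d7y.io/dragonfly/v2/client/clientutil"
)

func main() {
	// Start=0, Length=4096 covers bytes 0 through 4095
	r := clientutil.Range{Start: 0, Length: 4096}
	fmt.Println(r.String()) // "bytes=0-4095"
}
```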

View File

@ -25,6 +25,7 @@ const (
AttributePeerID = attribute.Key("d7y.peer.id")
AttributeTargetPeerID = attribute.Key("d7y.peer.target.id")
AttributeReusePeerID = attribute.Key("d7y.peer.reuse.id")
AttributeReuseRange = attribute.Key("d7y.peer.reuse.range")
AttributeTargetPeerAddr = attribute.Key("d7y.peer.target.addr")
AttributeMainPeer = attribute.Key("d7y.peer.task.main_peer")
AttributePeerPacketCode = attribute.Key("d7y.peer.packet.code")

View File

@ -20,6 +20,7 @@ const (
HeaderDragonflyFilter = "X-Dragonfly-Filter"
HeaderDragonflyPeer = "X-Dragonfly-Peer"
HeaderDragonflyTask = "X-Dragonfly-Task"
HeaderDragonflyRange = "X-Dragonfly-Range"
HeaderDragonflyBiz = "X-Dragonfly-Biz"
// HeaderDragonflyRegistry is used for dynamic registry mirrors
HeaderDragonflyRegistry = "X-Dragonfly-Registry"

View File

@ -175,6 +175,7 @@ type DownloadOption struct {
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
TransportOption *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
Prefetch bool `mapstructure:"prefetch" yaml:"prefetch"`
}
type TransportOption struct {
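The yaml tags above suggest the new flag is read from the download section of the daemon config file. A minimal decoding sketch; the field subset and key names come only from the tags shown, and gopkg.in/yaml.v3 stands in for the daemon's actual config loader:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// subset of DownloadOption above, for illustration only
type downloadOption struct {
	CalculateDigest bool `yaml:"calculateDigest"`
	Prefetch        bool `yaml:"prefetch"`
}

func main() {
	var opt downloadOption
	data := []byte("calculateDigest: true\nprefetch: true\n")
	if err := yaml.Unmarshal(data, &opt); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", opt) // {CalculateDigest:true Prefetch:true}
}
```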

View File

@ -169,7 +169,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
return nil, err
}
peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler,
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.Prefetch, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
if err != nil {
return nil, err
}

View File

@ -414,7 +414,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
pt.SetTotalPieces(1)
ctx := pt.ctx
var err error
pt.storage, err = pt.peerTaskManager.storageManager.RegisterTask(ctx,
storageDriver, err := pt.peerTaskManager.storageManager.RegisterTask(ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.tinyData.PeerID,
@ -424,12 +424,13 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
TotalPieces: 1,
// TODO check digest
})
pt.storage = storageDriver
if err != nil {
logger.Errorf("register tiny data storage failed: %s", err)
pt.cancel(base.Code_ClientError, err.Error())
return
}
n, err := pt.storage.WritePiece(ctx,
n, err := pt.GetStorage().WritePiece(ctx,
&storage.WritePieceRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.tinyData.PeerID,
@ -653,7 +654,7 @@ func (pt *peerTaskConductor) pullSinglePiece() {
}
request := &DownloadPieceRequest{
storage: pt.storage,
storage: pt.GetStorage(),
piece: pt.singlePiece.PieceInfo,
log: pt.Log(),
TaskID: pt.GetTaskID(),
@ -892,7 +893,7 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP
pt.requestedPieces.Set(piece.PieceNum)
}
req := &DownloadPieceRequest{
storage: pt.storage,
storage: pt.GetStorage(),
piece: piece,
log: pt.Log(),
TaskID: pt.GetTaskID(),
@ -1106,6 +1107,12 @@ func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, res
}
func (pt *peerTaskConductor) InitStorage() (err error) {
pt.lock.Lock()
defer pt.lock.Unlock()
// check storage for partial back source cases.
if pt.storage != nil {
return nil
}
// prepare storage
pt.storage, err = pt.storageManager.RegisterTask(pt.ctx,
storage.RegisterTaskRequest{
@ -1125,7 +1132,7 @@ func (pt *peerTaskConductor) InitStorage() (err error) {
func (pt *peerTaskConductor) UpdateStorage() error {
// update storage
err := pt.storage.UpdateTask(pt.ctx,
err := pt.GetStorage().UpdateTask(pt.ctx,
&storage.UpdateTaskRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
@ -1278,7 +1285,7 @@ func (pt *peerTaskConductor) fail() {
// Validate stores metadata and validates digest
func (pt *peerTaskConductor) Validate() error {
err := pt.storage.Store(pt.ctx,
err := pt.GetStorage().Store(pt.ctx,
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.peerID,
@ -1295,7 +1302,7 @@ func (pt *peerTaskConductor) Validate() error {
if !pt.peerTaskManager.calculateDigest {
return nil
}
err = pt.storage.ValidateDigest(
err = pt.GetStorage().ValidateDigest(
&storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
TaskID: pt.GetTaskID(),
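Throughout this file, direct pt.storage reads become GetStorage() calls; together with the locking in InitStorage, this is the data race fix from the commit message. The accessor body is not part of this diff; a standalone sketch of the presumed pattern, with stand-in types:

```go
package main

import "sync"

// stand-in types for illustration; the real ones live in the daemon packages
type taskStorageDriver interface{}

type peerTaskConductor struct {
	lock    sync.Mutex
	storage taskStorageDriver
}

// hypothetical body: InitStorage writes pt.storage under pt.lock, so a
// locked accessor gives concurrent readers a race-free view of the field
func (pt *peerTaskConductor) GetStorage() taskStorageDriver {
	pt.lock.Lock()
	defer pt.lock.Unlock()
	return pt.storage
}

func main() {
	pt := &peerTaskConductor{}
	_ = pt.GetStorage()
}
```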

View File

@ -88,6 +88,10 @@ func (ptm *peerTaskManager) newFileTask(
if err != nil {
return nil, nil, err
}
// prefetch parent request
if ptm.enablePrefetch && request.UrlMeta.Range != "" {
go ptm.prefetch(&request.PeerTaskRequest)
}
ctx, span := tracer.Start(ctx, config.SpanFileTask, trace.WithSpanKind(trace.SpanKindClient))
pt := &fileTask{

View File

@ -21,6 +21,7 @@ import (
"io"
"sync"
"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"
@ -29,6 +30,8 @@ import (
"d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/storage"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)
@ -108,10 +111,10 @@ type peerTaskManager struct {
perPeerRateLimit rate.Limit
// enableMultiplex indicates reusing completed peer task storage
// currently, only check completed peer task after register to scheduler
// TODO multiplex the running peer task
// enableMultiplex indicates whether to reuse the data of completed peer tasks
enableMultiplex bool
// enablePrefetch indicates whether to prefetch the whole file for ranged requests
enablePrefetch bool
calculateDigest bool
@ -126,6 +129,7 @@ func NewPeerTaskManager(
schedulerOption config.SchedulerOption,
perPeerRateLimit rate.Limit,
multiplex bool,
prefetch bool,
calculateDigest bool,
getPiecesMaxRetry int) (TaskManager, error) {
@ -139,6 +143,7 @@ func NewPeerTaskManager(
schedulerOption: schedulerOption,
perPeerRateLimit: perPeerRateLimit,
enableMultiplex: multiplex,
enablePrefetch: prefetch,
calculateDigest: calculateDigest,
getPiecesMaxRetry: getPiecesMaxRetry,
}
@ -198,6 +203,41 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
return ptc, true, nil
}
func (ptm *peerTaskManager) prefetch(request *scheduler.PeerTaskRequest) {
req := &scheduler.PeerTaskRequest{
Url: request.Url,
PeerId: request.PeerId,
PeerHost: ptm.host,
HostLoad: request.HostLoad,
IsMigrating: request.IsMigrating,
UrlMeta: &base.UrlMeta{
Digest: request.UrlMeta.Digest,
Tag: request.UrlMeta.Tag,
Filter: request.UrlMeta.Filter,
Header: map[string]string{},
},
}
for k, v := range request.UrlMeta.Header {
if k == headers.Range {
continue
}
req.UrlMeta.Header[k] = v
}
taskID := idgen.TaskID(req.Url, req.UrlMeta)
req.PeerId = idgen.PeerID(req.PeerHost.Ip)
var limit = rate.Inf
if ptm.perPeerRateLimit > 0 {
limit = ptm.perPeerRateLimit
}
logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId)
if _, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit); err != nil {
logger.Errorf("prefetch peer task %s/%s error: %s", taskID, req.PeerId, err)
}
}
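Because the copied UrlMeta never sets Range and the Range header is dropped, the prefetched task covers the whole file, and its id is the "parent" id the reuse paths below fall back to. A condensed sketch of that construction; the URL, tag, and host IP are hypothetical:

```go
package main

import (
	"fmt"

	"github.com/go-http-utils/headers"

	"d7y.io/dragonfly/v2/pkg/idgen"
	"d7y.io/dragonfly/v2/pkg/rpc/base"
)

func main() {
	// hypothetical incoming ranged request meta
	meta := &base.UrlMeta{
		Tag:    "d7y-test",
		Range:  "0-4095",
		Header: map[string]string{headers.Range: "bytes=0-4095", "Accept": "*/*"},
	}

	// prefetch copies the meta without Range and drops the Range header
	prefetchMeta := &base.UrlMeta{Tag: meta.Tag, Header: map[string]string{}}
	for k, v := range meta.Header {
		if k == headers.Range {
			continue
		}
		prefetchMeta.Header[k] = v
	}

	url := "http://example.com/test/data"
	fmt.Println(idgen.TaskID(url, prefetchMeta)) // whole-file task id
	fmt.Println(idgen.PeerID("192.0.2.1"))       // fresh peer id for the prefetch
}
```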
func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) {
if ptm.enableMultiplex {
progress, ok := ptm.tryReuseFilePeerTask(ctx, req)
@ -235,7 +275,7 @@ func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask
}
if ptm.enableMultiplex {
r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, peerTaskRequest)
r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, req)
if ok {
metrics.PeerTaskCacheHitCount.Add(1)
return r, attr, nil

View File

@ -26,10 +26,12 @@ import (
"net/http/httptest"
"os"
"runtime"
"strings"
"sync"
"testing"
"time"
"github.com/go-http-utils/headers"
"github.com/golang/mock/gomock"
"github.com/phayes/freeport"
testifyassert "github.com/stretchr/testify/assert"
@ -281,6 +283,7 @@ type testSpec struct {
taskType int
name string
taskData []byte
httpRange *clientutil.Range // only used in back source cases
pieceParallelCount int32
pieceSize int
sizeScope base.SizeScope
@ -296,7 +299,7 @@ type testSpec struct {
backSource bool
mockPieceDownloader func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader
mockHTTPSourceClient func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient
mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient
cleanUp []func()
}
@ -367,7 +370,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
url: "http://localhost/test/data",
sizeScope: base.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient {
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
sourceClient := sourceMock.NewMockResourceClient(ctrl)
sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (int64, error) {
@ -380,6 +383,47 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
return sourceClient
},
},
{
name: "normal size scope - range - back source - content length",
taskData: testBytes[0:4096],
httpRange: &clientutil.Range{
Start: 0,
Length: 4096,
},
pieceParallelCount: 4,
pieceSize: 1024,
peerID: "normal-size-peer-range-back-source",
backSource: true,
url: "http://localhost/test/data",
sizeScope: base.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
sourceClient := sourceMock.NewMockResourceClient(ctrl)
sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (int64, error) {
assert := testifyassert.New(t)
if rg != nil {
rgs, err := clientutil.ParseRange(request.Header.Get(headers.Range), math.MaxInt)
assert.Nil(err)
assert.Equal(1, len(rgs))
assert.Equal(rg.String(), rgs[0].String())
}
return int64(len(taskData)), nil
})
sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (*source.Response, error) {
assert := testifyassert.New(t)
if rg != nil {
rgs, err := clientutil.ParseRange(request.Header.Get(headers.Range), math.MaxInt)
assert.Nil(err)
assert.Equal(1, len(rgs))
assert.Equal(rg.String(), rgs[0].String())
}
return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
})
return sourceClient
},
},
{
name: "normal size scope - back source - no content length",
taskData: testBytes,
@ -390,7 +434,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
url: "http://localhost/test/data",
sizeScope: base.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient {
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
sourceClient := sourceMock.NewMockResourceClient(ctrl)
sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (int64, error) {
@ -413,7 +457,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
url: "http://localhost/test/data",
sizeScope: base.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: func(ctrl *gomock.Controller, taskData []byte, url string) source.ResourceClient {
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *clientutil.Range, taskData []byte, url string) source.ResourceClient {
sourceClient := sourceMock.NewMockResourceClient(ctrl)
sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (int64, error) {
@ -469,11 +513,25 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
urlMeta := &base.UrlMeta{
Tag: "d7y-test",
}
if tc.httpRange != nil {
urlMeta.Range = strings.TrimLeft(tc.httpRange.String(), "bytes=")
}
if tc.urlGenerator != nil {
tc.url = tc.urlGenerator(&tc)
}
taskID := idgen.TaskID(tc.url, urlMeta)
var (
downloader PieceDownloader
sourceClient source.ResourceClient
)
if tc.mockPieceDownloader != nil {
downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize)
}
if tc.mockHTTPSourceClient != nil {
source.UnRegister("http")
defer func() {
@ -482,18 +540,8 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter))
}()
// replace source client
require.Nil(source.Register("http", tc.mockHTTPSourceClient(ctrl, tc.taskData, tc.url), httpprotocol.Adapter))
}
var (
downloader PieceDownloader
sourceClient source.ResourceClient
)
if tc.mockPieceDownloader != nil {
downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize)
}
if tc.mockHTTPSourceClient != nil {
sourceClient = tc.mockHTTPSourceClient(ctrl, tc.taskData, tc.url)
sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url)
require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter))
}
option := componentsOption{
@ -596,14 +644,14 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
assert.Nil(os.Remove(output))
}()
request := &scheduler.PeerTaskRequest{
peerTaskRequest := &scheduler.PeerTaskRequest{
Url: ts.url,
UrlMeta: urlMeta,
PeerId: ts.peerID,
PeerHost: &scheduler.PeerHost{},
}
ptc, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*4))
ptc, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, peerTaskRequest, rate.Limit(pieceSize*4))
assert.Nil(err, "load first peerTaskConductor")
assert.True(created, "should create a new peerTaskConductor")
@ -698,7 +746,12 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
assert.True(noRunningTask, "no running tasks")
// test reuse stream task
rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(), request)
rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(),
&StreamTaskRequest{
URL: ts.url,
URLMeta: urlMeta,
PeerID: ts.peerID,
})
assert.True(ok, "reuse stream task")
defer func() {
assert.Nil(rc.Close())

View File

@ -20,18 +20,21 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"time"
"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/storage"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"d7y.io/dragonfly/v2/pkg/util/rangeutils"
)
var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
@ -40,11 +43,41 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
request *FileTaskRequest) (chan *FileTaskProgress, bool) {
taskID := idgen.TaskID(request.Url, request.UrlMeta)
reuse := ptm.storageManager.FindCompletedTask(taskID)
var (
rg *clientutil.Range // the range of parent peer task data to read
log *logger.SugaredLoggerOnWith
length int64
err error
)
if reuse == nil {
return nil, false
taskID = idgen.ParentTaskID(request.Url, request.UrlMeta)
reuse = ptm.storageManager.FindCompletedTask(taskID)
if reuse == nil {
return nil, false
}
var r *rangeutils.Range
r, err = rangeutils.ParseRange(request.UrlMeta.Range, math.MaxInt)
if err != nil {
logger.Warnf("parse range %s error: %s", request.UrlMeta.Range, err)
return nil, false
}
rg = &clientutil.Range{
Start: int64(r.StartIndex),
Length: int64(r.EndIndex - r.StartIndex + 1),
}
}
log := logger.With("peer", request.PeerId, "task", taskID, "component", "reuseFilePeerTask")
if rg == nil {
log = logger.With("peer", request.PeerId, "task", taskID, "component", "reuseFilePeerTask")
log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
length = reuse.ContentLength
} else {
log = logger.With("peer", request.PeerId, "task", taskID, "range", request.UrlMeta.Range,
"component", "reuseRangeFilePeerTask")
log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range)
length = rg.Length
}
_, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient))
span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid))
@ -53,15 +86,17 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
span.SetAttributes(config.AttributePeerID.String(request.PeerId))
span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
if rg != nil {
span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range))
}
defer span.End()
log.Infof("reuse from peer task: %s, size: %d", reuse.PeerID, reuse.ContentLength)
log.Infof("reuse from peer task: %s, total size: %d, target size: %d", reuse.PeerID, reuse.ContentLength, length)
span.AddEvent("reuse peer task", trace.WithAttributes(config.AttributePeerID.String(reuse.PeerID)))
start := time.Now()
err := ptm.storageManager.Store(
context.Background(),
&storage.StoreRequest{
if rg == nil {
storeRequest := &storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: reuse.PeerID,
TaskID: taskID,
@ -70,13 +105,19 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
MetadataOnly: false,
StoreOnly: true,
TotalPieces: reuse.TotalPieces,
})
}
err = ptm.storageManager.Store(context.Background(), storeRequest)
} else {
err = ptm.storePartialFile(ctx, request, log, reuse, rg)
}
if err != nil {
log.Errorf("store error when reuse peer task: %s", err)
span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
span.RecordError(err)
return nil, false
}
var cost = time.Since(start).Milliseconds()
log.Infof("reuse file peer task done, cost: %dms", cost)
@ -88,8 +129,8 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
},
TaskID: taskID,
PeerID: request.PeerId,
ContentLength: reuse.ContentLength,
CompletedLength: reuse.ContentLength,
ContentLength: length,
CompletedLength: length,
PeerTaskDone: true,
DoneCallback: func() {},
}
@ -103,38 +144,93 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
return progressCh, true
}
func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileTaskRequest,
log *logger.SugaredLoggerOnWith, reuse *storage.ReusePeerTask, rg *clientutil.Range) error {
f, err := os.OpenFile(request.Output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
log.Errorf("open dest file error when reuse peer task: %s", err)
return err
}
defer f.Close()
rc, err := ptm.storageManager.ReadAllPieces(ctx,
&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg})
if err != nil {
log.Errorf("read pieces error when reuse peer task: %s", err)
return err
}
defer rc.Close()
n, err := io.Copy(f, rc)
if err != nil {
log.Errorf("copy data error when reuse peer task: %s", err)
return err
}
if n != rg.Length {
log.Errorf("copy data length not match when reuse peer task, actual: %d, desire: %d", n, rg.Length)
return io.ErrShortBuffer
}
return nil
}
func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context,
request *scheduler.PeerTaskRequest) (io.ReadCloser, map[string]string, bool) {
taskID := idgen.TaskID(request.Url, request.UrlMeta)
request *StreamTaskRequest) (io.ReadCloser, map[string]string, bool) {
taskID := idgen.TaskID(request.URL, request.URLMeta)
reuse := ptm.storageManager.FindCompletedTask(taskID)
var (
rg *clientutil.Range // the range of parent peer task data to read
log *logger.SugaredLoggerOnWith
)
if reuse == nil {
return nil, nil, false
// for ranged requests, check the parent task
if request.Range == nil {
return nil, nil, false
}
taskID = idgen.ParentTaskID(request.URL, request.URLMeta)
reuse = ptm.storageManager.FindCompletedTask(taskID)
if reuse == nil {
return nil, nil, false
}
rg = request.Range
}
log := logger.With("peer", request.PeerId, "task", taskID, "component", "reuseStreamPeerTask")
log.Infof("reuse from peer task: %s, size: %d", reuse.PeerID, reuse.ContentLength)
if rg == nil {
log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseStreamPeerTask")
log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
} else {
log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseRangeStreamPeerTask")
log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
reuse.PeerID, reuse.ContentLength, request.URLMeta.Range)
}
ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient))
span.SetAttributes(config.AttributePeerHost.String(ptm.host.Uuid))
span.SetAttributes(semconv.NetHostIPKey.String(ptm.host.Ip))
span.SetAttributes(config.AttributeTaskID.String(taskID))
span.SetAttributes(config.AttributePeerID.String(request.PeerId))
span.SetAttributes(config.AttributePeerID.String(request.PeerID))
span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
span.SetAttributes(semconv.HTTPURLKey.String(request.URL))
if rg != nil {
span.SetAttributes(config.AttributeReuseRange.String(request.URLMeta.Range))
}
defer span.End()
rc, err := ptm.storageManager.ReadAllPieces(ctx, &reuse.PeerTaskMetadata)
rc, err := ptm.storageManager.ReadAllPieces(ctx,
&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg})
if err != nil {
log.Errorf("read all pieces error when reuse peer task: %s", err)
log.Errorf("read pieces error when reuse peer task: %s", err)
span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
span.RecordError(err)
return nil, nil, false
}
attr := map[string]string{}
attr[headers.ContentLength] = fmt.Sprintf("%d", reuse.ContentLength)
attr[config.HeaderDragonflyTask] = taskID
attr[config.HeaderDragonflyPeer] = request.PeerId
attr[config.HeaderDragonflyPeer] = request.PeerID
if rg != nil {
attr[config.HeaderDragonflyRange] = request.URLMeta.Range
attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/%d", rg.Start, rg.Start+rg.Length-1, reuse.ContentLength)
attr[headers.ContentLength] = fmt.Sprintf("%d", rg.Length)
} else {
attr[headers.ContentLength] = fmt.Sprintf("%d", reuse.ContentLength)
}
// TODO record time when the file is closed; needs a type implementing Close and WriteTo
span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
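A worked example of the reuse-range attributes assembled above, assuming a hypothetical 10000-byte parent task and the range 0-4095:

```go
package main

import (
	"fmt"

	"github.com/go-http-utils/headers"
)

func main() {
	var (
		contentLength int64 = 10000 // hypothetical parent task size
		start         int64 = 0
		length        int64 = 4096
	)
	attr := map[string]string{
		headers.ContentRange:  fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, contentLength),
		headers.ContentLength: fmt.Sprintf("%d", length),
	}
	fmt.Println(attr[headers.ContentRange])  // "bytes 0-4095/10000"
	fmt.Println(attr[headers.ContentLength]) // "4096"
}
```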

View File

@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"
"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/storage"
@ -39,6 +40,8 @@ type StreamTaskRequest struct {
URL string
// url meta info
URLMeta *base.UrlMeta
// http range
Range *clientutil.Range
// peer's id and must be global uniqueness
PeerID string
}
@ -71,6 +74,12 @@ func (ptm *peerTaskManager) newStreamTask(
if err != nil {
return nil, err
}
// prefetch parent request
if ptm.enablePrefetch && request.UrlMeta.Range != "" {
go ptm.prefetch(request)
}
ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient))
pt := &streamTask{
SugaredLoggerOnWith: ptc.SugaredLoggerOnWith,
@ -101,10 +110,11 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
case <-s.peerTaskConductor.successCh:
rc, err := s.peerTaskConductor.peerTaskManager.storageManager.ReadAllPieces(
ctx,
&storage.PeerTaskMetadata{
PeerID: s.peerTaskConductor.peerID,
TaskID: s.peerTaskConductor.taskID,
})
&storage.ReadAllPiecesRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: s.peerTaskConductor.peerID,
TaskID: s.peerTaskConductor.taskID,
}})
return rc, attr, err
case first := <-s.pieceCh:
firstPiece = first
@ -124,7 +134,7 @@ func (s *streamTask) Start(ctx context.Context) (io.ReadCloser, map[string]strin
}
func (s *streamTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) {
pr, pc, err := s.peerTaskConductor.storage.ReadPiece(s.ctx, &storage.ReadPieceRequest{
pr, pc, err := s.peerTaskConductor.GetStorage().ReadPiece(s.ctx, &storage.ReadPieceRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: s.peerTaskConductor.peerID,
TaskID: s.peerTaskConductor.taskID,

View File

@ -248,7 +248,8 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc
request.UrlMeta.Header = map[string]string{}
}
if request.UrlMeta.Range != "" {
request.UrlMeta.Header["Range"] = request.UrlMeta.Range
// in the http source package, the adapter will set the real Range header; we inject "X-Dragonfly-Range" here
request.UrlMeta.Header[source.Range] = request.UrlMeta.Range
}
log := pt.Log()
log.Infof("start to download from source")
@ -282,6 +283,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc
return err
}
response, err := source.Download(downloadRequest)
// TODO update expire info
if err != nil {
return err
}

View File

@ -248,24 +248,33 @@ func (t *localTaskStore) ReadPiece(ctx context.Context, req *ReadPieceRequest) (
return io.LimitReader(file, req.Range.Length), file, nil
}
func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error) {
func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error) {
if t.invalid.Load() {
t.Errorf("invalid digest, refuse to read all pieces")
return nil, ErrInvalidDigest
}
t.touch()
// whoever calls ReadAllPieces must close the returned io.ReadCloser
file, err := os.Open(t.DataFilePath)
if err != nil {
return nil, err
}
if _, err = file.Seek(0, io.SeekStart); err != nil {
if req.Range == nil {
return file, nil
}
if _, err = file.Seek(req.Range.Start, io.SeekStart); err != nil {
file.Close()
t.Errorf("file seek failed: %v", err)
t.Errorf("file seek to %d failed: %v", req.Range.Start, err)
return nil, err
}
// who call ReadPiece, who close the io.ReadCloser
return file, nil
return &limitedReadFile{
reader: io.LimitReader(file, req.Range.Length),
closer: file,
}, nil
}
func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error {
@ -479,3 +488,24 @@ func (t *localTaskStore) saveMetadata() error {
}
return err
}
// limitedReadFile implements io.ReadCloser plus io.WriterTo for zero-copy writes
type limitedReadFile struct {
reader io.Reader
closer io.Closer
}
func (l *limitedReadFile) Read(p []byte) (n int, err error) {
return l.reader.Read(p)
}
func (l *limitedReadFile) Close() error {
return l.closer.Close()
}
func (l *limitedReadFile) WriteTo(w io.Writer) (n int64, err error) {
if r, ok := w.(io.ReaderFrom); ok {
return r.ReadFrom(l.reader)
}
return io.Copy(w, l.reader)
}
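WriteTo exists because io.Copy prefers the source's io.WriterTo, and handing the inner *io.LimitedReader to a connection's ReadFrom lets Go's net package take the sendfile path when the wrapped reader is an *os.File. A hypothetical in-package usage sketch (assumes the io, net, and os imports):

```go
// hypothetical usage of the limitedReadFile type above: serve a ranged
// read over a TCP connection via the zero-copy path
func serveRange(conn net.Conn, dataPath string, start, length int64) error {
	f, err := os.Open(dataPath)
	if err != nil {
		return err
	}
	if _, err := f.Seek(start, io.SeekStart); err != nil {
		f.Close()
		return err
	}
	rc := &limitedReadFile{reader: io.LimitReader(f, length), closer: f}
	defer rc.Close()
	// io.Copy calls rc.WriteTo; *net.TCPConn implements io.ReaderFrom and
	// can sendfile from an *io.LimitedReader wrapping an *os.File
	_, err = io.Copy(conn, rc)
	return err
}
```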

View File

@ -82,6 +82,10 @@ type ReadPieceRequest struct {
PeerTaskMetadata
PieceMetadata
}
type ReadAllPiecesRequest struct {
PeerTaskMetadata
Range *clientutil.Range
}
type UpdateTaskRequest struct {
PeerTaskMetadata

View File

@ -53,7 +53,7 @@ type TaskStorageDriver interface {
// If req.Num is equal to -1, range has a fixed value.
ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error)
ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error)
ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error)
GetPieces(ctx context.Context, req *base.PieceTaskRequest) (*base.PiecePacket, error)
@ -241,7 +241,7 @@ func (s *storageManager) ReadPiece(ctx context.Context, req *ReadPieceRequest) (
return t.(TaskStorageDriver).ReadPiece(ctx, req)
}
func (s *storageManager) ReadAllPieces(ctx context.Context, req *PeerTaskMetadata) (io.ReadCloser, error) {
func (s *storageManager) ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error) {
t, ok := s.LoadTask(
PeerTaskMetadata{
PeerID: req.PeerID,

View File

@ -69,7 +69,7 @@ func (mr *MockTaskStorageDriverMockRecorder) IsInvalid(req interface{}) *gomock.
}
// ReadAllPieces mocks base method.
func (m *MockTaskStorageDriver) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetadata) (io.ReadCloser, error) {
func (m *MockTaskStorageDriver) ReadAllPieces(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ReadAllPieces", ctx, req)
ret0, _ := ret[0].(io.ReadCloser)
@ -325,7 +325,7 @@ func (mr *MockManagerMockRecorder) Keep() *gomock.Call {
}
// ReadAllPieces mocks base method.
func (m *MockManager) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetadata) (io.ReadCloser, error) {
func (m *MockManager) ReadAllPieces(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ReadAllPieces", ctx, req)
ret0, _ := ret[0].(io.ReadCloser)

View File

@ -17,19 +17,24 @@
package transport
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"math"
"net"
"net/http"
"net/http/httputil"
"regexp"
"strconv"
"strings"
"time"
"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel/propagation"
"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/peer"
@ -201,11 +206,23 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res
// Init meta value
meta := &base.UrlMeta{Header: map[string]string{}}
var rg *clientutil.Range
// Set meta range's value
if rg := req.Header.Get("Range"); len(rg) > 0 {
meta.Digest = ""
meta.Range = rg
if rangeHeader := req.Header.Get("Range"); len(rangeHeader) > 0 {
rgs, err := clientutil.ParseRange(rangeHeader, math.MaxInt)
if err != nil {
return badRequest(req, err.Error())
}
if len(rgs) > 1 {
// TODO support multiple range request
return notImplemented(req, "multiple range is not supported")
} else if len(rgs) == 0 {
return requestedRangeNotSatisfiable(req, "zero range is not supported")
}
rg = &rgs[0]
// range in dragonfly is without "bytes="
meta.Range = strings.TrimLeft(rangeHeader, "bytes=")
}
// Pick header's parameters
@ -224,6 +241,7 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res
&peer.StreamTaskRequest{
URL: url,
URLMeta: meta,
Range: rg,
PeerID: peerID,
},
)
@ -247,8 +265,14 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res
}
}
var status int
if meta.Range == "" {
status = http.StatusOK
} else {
status = http.StatusPartialContent
}
resp := &http.Response{
StatusCode: http.StatusOK,
StatusCode: status,
Body: body,
Header: hdr,
ContentLength: contentLength,
@ -332,3 +356,28 @@ func delHopHeaders(header http.Header) {
header.Del(h)
}
}
func httpResponse(req *http.Request, status int, body string) (*http.Response, error) {
resp := &http.Response{
StatusCode: status,
Body: io.NopCloser(bytes.NewBufferString(body)),
ContentLength: int64(len(body)),
Proto: req.Proto,
ProtoMajor: req.ProtoMajor,
ProtoMinor: req.ProtoMinor,
}
return resp, nil
}
func badRequest(req *http.Request, body string) (*http.Response, error) {
return httpResponse(req, http.StatusBadRequest, body)
}
func notImplemented(req *http.Request, body string) (*http.Response, error) {
return httpResponse(req, http.StatusNotImplemented, body)
}
func requestedRangeNotSatisfiable(req *http.Request, body string) (*http.Response, error) {
return httpResponse(req, http.StatusRequestedRangeNotSatisfiable, body)
}
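The parsing above accepts exactly one range and maps every other shape to a distinct status. A small self-contained sketch that mirrors that decision table (header values are hypothetical):

```go
package main

import (
	"fmt"
	"math"
	"net/http"

	"d7y.io/dragonfly/v2/client/clientutil"
)

// classify mirrors the transport logic above: one range is proxied as 206,
// multiple ranges answer 501, and an empty set answers 416
func classify(rangeHeader string) int {
	if rangeHeader == "" {
		return http.StatusOK
	}
	rgs, err := clientutil.ParseRange(rangeHeader, math.MaxInt)
	switch {
	case err != nil:
		return http.StatusBadRequest
	case len(rgs) > 1:
		return http.StatusNotImplemented
	case len(rgs) == 0:
		return http.StatusRequestedRangeNotSatisfiable
	}
	return http.StatusPartialContent
}

func main() {
	fmt.Println(classify(""))              // 200
	fmt.Println(classify("bytes=0-1023"))  // 206
	fmt.Println(classify("bytes=0-1,4-5")) // 501
}
```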

View File

@ -28,11 +28,11 @@ import (
"strings"
"time"
"github.com/go-http-utils/headers"
"github.com/pkg/errors"
"github.com/schollz/progressbar/v3"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/internal/dfheaders"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/basic"
"d7y.io/dragonfly/v2/pkg/rpc/base"
@ -210,6 +210,10 @@ func parseHeader(s []string) map[string]string {
}
func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemon.DownRequest {
var rg string
if r, ok := hdr[headers.Range]; ok {
rg = strings.TrimLeft(r, "bytes=")
}
return &dfdaemon.DownRequest{
Url: cfg.URL,
Output: cfg.Output,
@ -219,7 +223,7 @@ func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemon.Do
UrlMeta: &base.UrlMeta{
Digest: cfg.Digest,
Tag: cfg.Tag,
Range: hdr[dfheaders.Range],
Range: rg,
Filter: cfg.Filter,
Header: hdr,
},

View File

@ -1,21 +0,0 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dfheaders
const (
Range = "Range"
)

View File

@ -24,9 +24,7 @@ import (
"d7y.io/dragonfly/v2/pkg/util/net/urlutils"
)
// TaskID generates a taskId.
// filter is separated by & character.
func TaskID(url string, meta *base.UrlMeta) string {
func taskID(url string, meta *base.UrlMeta, ignoreRange bool) string {
var filters []string
if meta != nil && meta.Filter != "" {
filters = strings.Split(meta.Filter, "&")
@ -39,7 +37,7 @@ func TaskID(url string, meta *base.UrlMeta) string {
data = append(data, meta.Digest)
}
if meta.Range != "" {
if !ignoreRange && meta.Range != "" {
data = append(data, meta.Range)
}
@ -50,3 +48,15 @@ func TaskID(url string, meta *base.UrlMeta) string {
return digestutils.Sha256(data...)
}
// TaskID generates a task id.
// filters are separated by the & character.
func TaskID(url string, meta *base.UrlMeta) string {
return taskID(url, meta, false)
}
// ParentTaskID generates a task id like TaskID, but without the range.
// It is used to look up the parent task for ranged requests.
func ParentTaskID(url string, meta *base.UrlMeta) string {
return taskID(url, meta, true)
}
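This split gives the invariant the prefetch path relies on: stripping the range from a meta yields the same id that ParentTaskID produces for the ranged meta. A quick check with hypothetical values:

```go
package main

import (
	"fmt"

	"d7y.io/dragonfly/v2/pkg/idgen"
	"d7y.io/dragonfly/v2/pkg/rpc/base"
)

func main() {
	url := "https://example.com"
	ranged := &base.UrlMeta{Range: "0-4095", Digest: "bar"}
	whole := &base.UrlMeta{Digest: "bar"} // same meta without the range

	fmt.Println(idgen.ParentTaskID(url, ranged) == idgen.TaskID(url, whole)) // true
}
```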

View File

@ -26,10 +26,11 @@ import (
func TestTaskID(t *testing.T) {
tests := []struct {
name string
url string
meta *base.UrlMeta
expect func(t *testing.T, d interface{})
name string
url string
meta *base.UrlMeta
ignoreRange bool
expect func(t *testing.T, d interface{})
}{
{
name: "generate taskID with url",
@ -53,6 +54,20 @@ func TestTaskID(t *testing.T) {
assert.Equal("aeee0e0a2a0c75130582641353c539aaf9011a0088b31347f7588e70e449a3e0", d)
},
},
{
name: "generate taskID with meta",
url: "https://example.com",
meta: &base.UrlMeta{
Range: "foo",
Digest: "bar",
Tag: "",
},
ignoreRange: true,
expect: func(t *testing.T, d interface{}) {
assert := assert.New(t)
assert.Equal("63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d", d)
},
},
{
name: "generate taskID with filter",
url: "https://example.com?foo=foo&bar=bar",
@ -80,7 +95,12 @@ func TestTaskID(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
data := TaskID(tc.url, tc.meta)
var data string
if tc.ignoreRange {
data = ParentTaskID(tc.url, tc.meta)
} else {
data = TaskID(tc.url, tc.meta)
}
tc.expect(t, data)
})
}