From e329bdbb3911cf2ca2342325294176d87f08a056 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Wed, 27 Dec 2023 15:54:21 +0800 Subject: [PATCH] feat: back to source with piece group(multiple pieces) (#2826) Signed-off-by: Jim Ma --- client/daemon/peer/piece_manager.go | 203 ++++++++++++- client/daemon/peer/piece_manager_test.go | 275 +++++++++++++++++- client/daemon/storage/local_storage.go | 2 +- .../daemon/storage/local_storage_subtask.go | 4 +- go.mod | 1 + go.sum | 2 + 6 files changed, 476 insertions(+), 11 deletions(-) diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index 6c2e378ab..c240ad02b 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -31,6 +31,7 @@ import ( "sync" "time" + mapset "github.com/deckarep/golang-set/v2" "github.com/go-http-utils/headers" "go.uber.org/atomic" "golang.org/x/time/rate" @@ -235,6 +236,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec return result, nil } +// pieceOffset is the offset in the peer task, not the original range start from source func (pm *pieceManager) processPieceFromSource(pt Task, reader io.Reader, contentLength int64, pieceNum int32, pieceOffset uint64, pieceSize uint32, isLastPiece func(n int64) (totalPieces int32, contentLength int64, ok bool)) ( @@ -796,26 +798,129 @@ func (pm *pieceManager) concurrentDownloadSource(ctx context.Context, pt Task, p pt.SetContentLength(parsedRange.Length) pt.SetTotalPieces(pieceCount) + pieceCountToDownload := pieceCount - continuePieceNum + con := pm.concurrentOption.GoroutineCount // Fix int overflow - if int(pieceCount) > 0 && int(pieceCount) < con { - con = int(pieceCount) + if int(pieceCountToDownload) > 0 && int(pieceCountToDownload) < con { + con = int(pieceCountToDownload) } ctx, cancel := context.WithCancel(ctx) defer cancel() - err := pm.concurrentDownloadSourceByPiece(ctx, pt, peerTaskRequest, parsedRange, continuePieceNum, pieceCount, con, pieceSize, cancel) - if err != nil { - return err + return pm.concurrentDownloadSourceByPieceGroup(ctx, pt, peerTaskRequest, parsedRange, continuePieceNum, pieceCount, pieceCountToDownload, con, pieceSize, cancel) +} + +func (pm *pieceManager) concurrentDownloadSourceByPieceGroup( + ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest, + parsedRange *nethttp.Range, startPieceNum int32, pieceCount int32, pieceCountToDownload int32, + con int, pieceSize uint32, cancel context.CancelFunc) error { + log := pt.Log() + log.Infof("start concurrentDownloadSourceByPieceGroup, startPieceNum: %d, pieceCount: %d, pieceCountToDownload: %d, con: %d, pieceSize: %d", + startPieceNum, pieceCount, pieceCountToDownload, con, pieceSize) + + var downloadError atomic.Value + downloadedPieces := mapset.NewSet[int32]() + + wg := sync.WaitGroup{} + wg.Add(con) + + minPieceCountPerGroup := pieceCountToDownload / int32(con) + reminderPieces := pieceCountToDownload % int32(con) + + // piece group eg: + // con = 4, piece = 5: + // worker 0: 2 + // worker 1: 1 + // worker 2: 1 + // worker 3: 1 + // worker 4: 1 + for i := int32(0); i < int32(con); i++ { + go func(i int32) { + pg := newPieceGroup(i, reminderPieces, startPieceNum, minPieceCountPerGroup, pieceSize, parsedRange) + log.Infof("concurrent worker %d start to download piece %d-%d, byte %d-%d", i, pg.start, pg.end, pg.startByte, pg.endByte) + _, _, retryErr := retry.Run(ctx, + pm.concurrentOption.InitBackoff, + pm.concurrentOption.MaxBackoff, + pm.concurrentOption.MaxAttempts, + func() (data 
any, cancel bool, err error) { + err = pm.downloadPieceGroupFromSource(ctx, pt, log, + peerTaskRequest, pg, pieceCount, pieceCountToDownload, downloadedPieces) + return nil, errors.Is(err, context.Canceled), err + }) + if retryErr != nil { + // download piece error after many retry, cancel task + cancel() + downloadError.Store(&backSourceError{err: retryErr}) + log.Infof("concurrent worker %d failed to download piece group after %d retries, last error: %s", + i, pm.concurrentOption.MaxAttempts, retryErr.Error()) + } + wg.Done() + }(i) + } + + wg.Wait() + + // check error + if downloadError.Load() != nil { + return downloadError.Load().(*backSourceError).err } return nil } +type pieceGroup struct { + start, end int32 + startByte, endByte int64 + // store original task metadata + pieceSize uint32 + parsedRange *nethttp.Range +} + +func newPieceGroup(i int32, reminderPieces int32, startPieceNum int32, minPieceCountPerGroup int32, pieceSize uint32, parsedRange *nethttp.Range) *pieceGroup { + var ( + start int32 + end int32 + ) + + if i < reminderPieces { + start = i*minPieceCountPerGroup + i + end = start + minPieceCountPerGroup + } else { + start = i*minPieceCountPerGroup + reminderPieces + end = start + minPieceCountPerGroup - 1 + } + + // adjust by startPieceNum + start += startPieceNum + end += startPieceNum + + // calculate piece group first and last range byte with parsedRange.Start + startByte := int64(start) * int64(pieceSize) + endByte := int64(end+1)*int64(pieceSize) - 1 + if endByte > parsedRange.Length-1 { + endByte = parsedRange.Length - 1 + } + + // adjust by range start + startByte += parsedRange.Start + endByte += parsedRange.Start + + pg := &pieceGroup{ + start: start, + end: end, + startByte: startByte, + endByte: endByte, + pieceSize: pieceSize, + parsedRange: parsedRange, + } + return pg +} + func (pm *pieceManager) concurrentDownloadSourceByPiece( ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest, - parsedRange *nethttp.Range, startPieceNum int32, pieceCount int32, + parsedRange *nethttp.Range, startPieceNum int32, pieceCount int32, pieceCountToDownload int32, con int, pieceSize uint32, cancel context.CancelFunc) error { log := pt.Log() @@ -823,7 +928,7 @@ func (pm *pieceManager) concurrentDownloadSourceByPiece( var pieceCh = make(chan int32, con) wg := sync.WaitGroup{} - wg.Add(int(pieceCount - startPieceNum)) + wg.Add(int(pieceCountToDownload)) downloadedPieceCount := atomic.NewInt32(startPieceNum) @@ -965,3 +1070,87 @@ func (pm *pieceManager) downloadPieceFromSource(ctx context.Context, pt.PublishPieceInfo(pieceNum, uint32(result.Size)) return nil } + +func (pm *pieceManager) downloadPieceGroupFromSource(ctx context.Context, + pt Task, log *logger.SugaredLoggerOnWith, + peerTaskRequest *schedulerv1.PeerTaskRequest, + pg *pieceGroup, + totalPieceCount int32, + totalPieceCountToDownload int32, + downloadedPieces mapset.Set[int32]) error { + + backSourceRequest, err := source.NewRequestWithContext(ctx, peerTaskRequest.Url, peerTaskRequest.UrlMeta.Header) + if err != nil { + log.Errorf("build piece %d-%d back source request error: %s", pg.start, pg.end, err) + return err + } + + pieceGroupRange := fmt.Sprintf("%d-%d", pg.startByte, pg.endByte) + // FIXME refactor source package, normal Range header is enough + backSourceRequest.Header.Set(source.Range, pieceGroupRange) + backSourceRequest.Header.Set(headers.Range, "bytes="+pieceGroupRange) + + log.Debugf("piece %d-%d back source header: %#v", pg.start, pg.end, backSourceRequest.Header) + + response, 
err := source.Download(backSourceRequest) + if err != nil { + log.Errorf("piece %d-%d back source response error: %s", pg.start, pg.end, err) + return err + } + defer response.Body.Close() + + err = response.Validate() + if err != nil { + log.Errorf("piece %d-%d back source response validate error: %s", pg.start, pg.end, err) + return err + } + + log.Debugf("piece %d-%d back source response ok", pg.start, pg.end) + + for i := pg.start; i <= pg.end; i++ { + pieceNum := i + offset := uint64(pg.startByte) + uint64(i-pg.start)*uint64(pg.pieceSize) + size := pg.pieceSize + // update last piece size + if offset+uint64(size)-1 > uint64(pg.endByte) { + size = uint32(uint64(pg.endByte) + 1 - offset) + } + + result, md5, err := pm.processPieceFromSource( + pt, response.Body, pg.parsedRange.Length, pieceNum, offset-uint64(pg.parsedRange.Start), size, + func(int64) (int32, int64, bool) { + downloadedPieces.Add(pieceNum) + return totalPieceCount, pg.parsedRange.Length, downloadedPieces.Cardinality() == int(totalPieceCountToDownload) + }) + request := &DownloadPieceRequest{ + TaskID: pt.GetTaskID(), + PeerID: pt.GetPeerID(), + piece: &commonv1.PieceInfo{ + PieceNum: pieceNum, + RangeStart: offset, + RangeSize: uint32(result.Size), + PieceMd5: md5, + PieceOffset: offset, + PieceStyle: 0, + }, + } + + if err != nil { + log.Errorf("download piece %d error: %s", pieceNum, err) + pt.ReportPieceResult(request, result, detectBackSourceError(err)) + return err + } + + if result.Size != int64(size) { + log.Errorf("download piece %d size not match, desired: %d, actual: %d", pieceNum, size, result.Size) + pt.ReportPieceResult(request, result, detectBackSourceError(err)) + return storage.ErrShortRead + } + + pt.ReportPieceResult(request, result, nil) + pt.PublishPieceInfo(pieceNum, uint32(result.Size)) + + log.Debugf("piece %d done", pieceNum) + } + return nil +} diff --git a/client/daemon/peer/piece_manager_test.go b/client/daemon/peer/piece_manager_test.go index 493aebdf6..2c1022495 100644 --- a/client/daemon/peer/piece_manager_test.go +++ b/client/daemon/peer/piece_manager_test.go @@ -23,6 +23,7 @@ import ( "encoding/hex" "fmt" "io" + "math" "net/http" "net/http/httptest" "os" @@ -481,7 +482,9 @@ func TestPieceManager_DownloadSource(t *testing.T) { outputBytes, err := os.ReadFile(output) assert.Nil(err, "load output file") - assert.Equal(testBytes, outputBytes, "output and desired output must match") + if string(testBytes) != string(outputBytes) { + assert.Equal(string(testBytes), string(outputBytes), "output and desired output must match") + } }) } } @@ -541,3 +544,273 @@ func TestDetectBackSourceError(t *testing.T) { }) } } + +func TestPieceGroup(t *testing.T) { + assert := testifyassert.New(t) + testCases := []struct { + name string + parsedRange *nethttp.Range + startPieceNum int32 + pieceSize uint32 + con int32 + pieceGroups []pieceGroup + }{ + { + name: "100-200-2", + pieceSize: 100, + parsedRange: &nethttp.Range{Start: 0, Length: 200}, + con: 2, + pieceGroups: []pieceGroup{ + { + start: 0, + end: 0, + startByte: 0, + endByte: 99, + }, + { + start: 1, + end: 1, + startByte: 100, + endByte: 199, + }, + }, + }, + { + name: "100-300-2", + pieceSize: 100, + parsedRange: &nethttp.Range{Start: 0, Length: 300}, + con: 2, + pieceGroups: []pieceGroup{ + { + start: 0, + end: 1, + startByte: 0, + endByte: 199, + }, + { + start: 2, + end: 2, + startByte: 200, + endByte: 299, + }, + }, + }, + { + name: "100-500-4", + pieceSize: 100, + parsedRange: &nethttp.Range{Start: 0, Length: 500}, + con: 4, + pieceGroups: 
[]pieceGroup{ + { + start: 0, + end: 1, + startByte: 0, + endByte: 199, + }, + { + start: 2, + end: 2, + startByte: 200, + endByte: 299, + }, + { + start: 3, + end: 3, + startByte: 300, + endByte: 399, + }, + { + start: 4, + end: 4, + startByte: 400, + endByte: 499, + }, + }, + }, + { + name: "100-600-4", + pieceSize: 100, + parsedRange: &nethttp.Range{Start: 0, Length: 600}, + con: 4, + pieceGroups: []pieceGroup{ + { + start: 0, + end: 1, + startByte: 0, + endByte: 199, + }, + { + start: 2, + end: 3, + startByte: 200, + endByte: 399, + }, + { + start: 4, + end: 4, + startByte: 400, + endByte: 499, + }, + { + start: 5, + end: 5, + startByte: 500, + endByte: 599, + }, + }, + }, + { + name: "100-700-4", + pieceSize: 100, + parsedRange: &nethttp.Range{Start: 0, Length: 700}, + con: 4, + pieceGroups: []pieceGroup{ + { + start: 0, + end: 1, + startByte: 0, + endByte: 199, + }, + { + start: 2, + end: 3, + startByte: 200, + endByte: 399, + }, + { + start: 4, + end: 5, + startByte: 400, + endByte: 599, + }, + { + start: 6, + end: 6, + startByte: 600, + endByte: 699, + }, + }, + }, + { + name: "1-100-700-4", + pieceSize: 100, + startPieceNum: 1, + parsedRange: &nethttp.Range{Start: 90, Length: 707}, // last byte: 90 + 706 = 796 + con: 4, + pieceGroups: []pieceGroup{ + { + start: 1, + end: 2, + startByte: 190, + endByte: 389, + }, + { + start: 3, + end: 4, + startByte: 390, + endByte: 589, + }, + { + start: 5, + end: 6, + startByte: 590, + endByte: 789, + }, + { + start: 7, + end: 7, + startByte: 790, + endByte: 796, + }, + }, + }, + { + name: "100-1100-4", + pieceSize: 100, + parsedRange: &nethttp.Range{Start: 0, Length: 1100}, + con: 4, + pieceGroups: []pieceGroup{ + { + start: 0, + end: 2, + startByte: 0, + endByte: 299, + }, + { + start: 3, + end: 5, + startByte: 300, + endByte: 599, + }, + { + start: 6, + end: 8, + startByte: 600, + endByte: 899, + }, + { + start: 9, + end: 10, + startByte: 900, + endByte: 1099, + }, + }, + }, + { + name: "from-real-e2e-test", + pieceSize: 4194304, + startPieceNum: 1, + parsedRange: &nethttp.Range{Start: 984674, Length: 20638941}, + con: 4, + pieceGroups: []pieceGroup{ + { + start: 1, + end: 1, + startByte: 5178978, + endByte: 9373281, + }, + { + start: 2, + end: 2, + startByte: 9373282, + endByte: 13567585, + }, + { + start: 3, + end: 3, + startByte: 13567586, + endByte: 17761889, + }, + { + start: 4, + end: 4, + startByte: 17761890, + endByte: 21623614, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pieceCount := int32(math.Ceil(float64(tc.parsedRange.Length) / float64(tc.pieceSize))) + pieceCountToDownload := pieceCount - tc.startPieceNum + + minPieceCountPerGroup := pieceCountToDownload / tc.con + reminderPieces := pieceCountToDownload % tc.con + + for i := int32(0); i < tc.con; i++ { + tc.pieceGroups[i].pieceSize = tc.pieceSize + tc.pieceGroups[i].parsedRange = tc.parsedRange + } + + var pieceGroups []pieceGroup + for i := int32(0); i < tc.con; i++ { + pg := newPieceGroup(i, reminderPieces, tc.startPieceNum, minPieceCountPerGroup, tc.pieceSize, tc.parsedRange) + pieceGroups = append(pieceGroups, *pg) + } + + assert.Equal(tc.pieceGroups, pieceGroups, "piece group should equal") + }) + } +} diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go index 3c29999f7..67656244b 100644 --- a/client/daemon/storage/local_storage.go +++ b/client/daemon/storage/local_storage.go @@ -106,7 +106,7 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) t.RLock() 
if piece, ok := t.Pieces[req.Num]; ok { t.RUnlock() - t.Debugf("piece %d already exist,ignore writing piece", req.Num) + t.Debugf("piece %d already exist, ignore writing piece", req.Num) // discard already downloaded data for back source n, err = io.CopyN(io.Discard, req.Reader, piece.Range.Length) if err != nil && err != io.EOF { diff --git a/client/daemon/storage/local_storage_subtask.go b/client/daemon/storage/local_storage_subtask.go index 705497dd7..5673b4083 100644 --- a/client/daemon/storage/local_storage_subtask.go +++ b/client/daemon/storage/local_storage_subtask.go @@ -50,7 +50,7 @@ func (t *localSubTaskStore) WritePiece(ctx context.Context, req *WritePieceReque t.RLock() if piece, ok := t.Pieces[req.Num]; ok { t.RUnlock() - t.Debugf("piece %d already exist,ignore writing piece", req.Num) + t.Debugf("piece %d already exist, ignore writing piece", req.Num) // discard already downloaded data for back source n, err = io.CopyN(io.Discard, req.Reader, piece.Range.Length) if err != nil && err != io.EOF { @@ -124,7 +124,7 @@ func (t *localSubTaskStore) WritePiece(ctx context.Context, req *WritePieceReque } t.Debugf("wrote %d bytes to file %s, piece %d, start %d, length: %d", - n, t.DataFilePath, req.Num, req.Range.Start, req.Range.Length) + n, t.parent.DataFilePath, req.Num, req.Range.Start, req.Range.Length) t.Lock() defer t.Unlock() // double check diff --git a/go.mod b/go.mod index ee9e7d182..9ace1d744 100644 --- a/go.mod +++ b/go.mod @@ -118,6 +118,7 @@ require ( github.com/containerd/log v0.1.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/deckarep/golang-set/v2 v2.3.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/go-metrics v0.0.1 // indirect diff --git a/go.sum b/go.sum index 207c84859..e14266c8a 100644 --- a/go.sum +++ b/go.sum @@ -370,6 +370,8 @@ github.com/daaku/go.zipexe v1.0.0/go.mod h1:z8IiR6TsVLEYKwXAoE/I+8ys/sDkgTzSL0CL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.3.1 h1:vjmkvJt/IV27WXPyYQpAh4bRyWJc5Y435D17XQ9QU5A= +github.com/deckarep/golang-set/v2 v2.3.1/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/denisenkom/go-mssqldb v0.11.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0= github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
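
For reference, below is a minimal standalone sketch of the piece-group partitioning this patch introduces in newPieceGroup: the pieces left to download are split across con workers, the first (pieceCountToDownload % con) workers take one extra piece, and each group's byte range is clamped to the parsed range length and then shifted by the range start. The helper name splitPieceGroups and the group struct are illustrative only and not part of the patch; the arithmetic mirrors newPieceGroup and the "100-700-4" case from TestPieceGroup.

package main

import "fmt"

type group struct {
	start, end         int32 // first and last piece number in the group (inclusive)
	startByte, endByte int64 // byte range in the source, relative to the original request
}

// splitPieceGroups divides pieceCountToDownload pieces among con workers.
// The first (pieceCountToDownload % con) workers receive one extra piece.
func splitPieceGroups(con, startPieceNum, pieceCountToDownload int32, pieceSize uint32, rangeStart, rangeLength int64) []group {
	minPieces := pieceCountToDownload / con
	remainder := pieceCountToDownload % con

	groups := make([]group, 0, con)
	for i := int32(0); i < con; i++ {
		var start, end int32
		if i < remainder {
			start = i*minPieces + i
			end = start + minPieces // this worker gets minPieces+1 pieces (end is inclusive)
		} else {
			start = i*minPieces + remainder
			end = start + minPieces - 1
		}
		// shift piece numbers by the first piece still to download
		start += startPieceNum
		end += startPieceNum

		// byte range covered by the group, clamped to the task length
		startByte := int64(start) * int64(pieceSize)
		endByte := int64(end+1)*int64(pieceSize) - 1
		if endByte > rangeLength-1 {
			endByte = rangeLength - 1 // last group only covers the tail of the range
		}
		// shift into the original request's byte space
		groups = append(groups, group{start, end, startByte + rangeStart, endByte + rangeStart})
	}
	return groups
}

func main() {
	// Mirrors the "100-700-4" test case: 7 pieces of 100 bytes split across 4 workers.
	for _, g := range splitPieceGroups(4, 0, 7, 100, 0, 700) {
		fmt.Printf("pieces %d-%d, bytes %d-%d\n", g.start, g.end, g.startByte, g.endByte)
	}
	// Output:
	// pieces 0-1, bytes 0-199
	// pieces 2-3, bytes 200-399
	// pieces 4-5, bytes 400-599
	// pieces 6-6, bytes 600-699
}

Each worker then issues one ranged back-source request per group (downloadPieceGroupFromSource) and slices the response body into pieces, with the last piece's size clamped to the group's endByte, which is why a group of N pieces costs a single origin round trip instead of N.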