feat: back to source with piece group (multiple pieces) (#2826)

Signed-off-by: Jim Ma <majinjing3@gmail.com>
Jim Ma 2023-12-27 15:54:21 +08:00 committed by GitHub
parent 086cb62d01
commit e329bdbb39
6 changed files with 476 additions and 11 deletions

View File

@@ -31,6 +31,7 @@ import (
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/go-http-utils/headers"
"go.uber.org/atomic"
"golang.org/x/time/rate"
@@ -235,6 +236,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
return result, nil
}

// pieceOffset is the offset in the peer task, not the original range start from source
func (pm *pieceManager) processPieceFromSource(pt Task,
reader io.Reader, contentLength int64, pieceNum int32, pieceOffset uint64, pieceSize uint32,
isLastPiece func(n int64) (totalPieces int32, contentLength int64, ok bool)) (
@@ -796,26 +798,129 @@ func (pm *pieceManager) concurrentDownloadSource(ctx context.Context, pt Task, p
pt.SetContentLength(parsedRange.Length)
pt.SetTotalPieces(pieceCount)
pieceCountToDownload := pieceCount - continuePieceNum
con := pm.concurrentOption.GoroutineCount
// Fix int overflow
-if int(pieceCount) > 0 && int(pieceCount) < con {
-con = int(pieceCount)
if int(pieceCountToDownload) > 0 && int(pieceCountToDownload) < con {
con = int(pieceCountToDownload)
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
-err := pm.concurrentDownloadSourceByPiece(ctx, pt, peerTaskRequest, parsedRange, continuePieceNum, pieceCount, con, pieceSize, cancel)
-if err != nil {
-return err
return pm.concurrentDownloadSourceByPieceGroup(ctx, pt, peerTaskRequest, parsedRange, continuePieceNum, pieceCount, pieceCountToDownload, con, pieceSize, cancel)
}
func (pm *pieceManager) concurrentDownloadSourceByPieceGroup(
ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest,
parsedRange *nethttp.Range, startPieceNum int32, pieceCount int32, pieceCountToDownload int32,
con int, pieceSize uint32, cancel context.CancelFunc) error {
log := pt.Log()
log.Infof("start concurrentDownloadSourceByPieceGroup, startPieceNum: %d, pieceCount: %d, pieceCountToDownload: %d, con: %d, pieceSize: %d",
startPieceNum, pieceCount, pieceCountToDownload, con, pieceSize)
var downloadError atomic.Value
downloadedPieces := mapset.NewSet[int32]()
wg := sync.WaitGroup{}
wg.Add(con)
minPieceCountPerGroup := pieceCountToDownload / int32(con)
reminderPieces := pieceCountToDownload % int32(con)
// piece group e.g.:
// con = 4, pieces = 5:
// worker 0: 2
// worker 1: 1
// worker 2: 1
// worker 3: 1
for i := int32(0); i < int32(con); i++ {
go func(i int32) {
pg := newPieceGroup(i, reminderPieces, startPieceNum, minPieceCountPerGroup, pieceSize, parsedRange)
log.Infof("concurrent worker %d start to download piece %d-%d, byte %d-%d", i, pg.start, pg.end, pg.startByte, pg.endByte)
_, _, retryErr := retry.Run(ctx,
pm.concurrentOption.InitBackoff,
pm.concurrentOption.MaxBackoff,
pm.concurrentOption.MaxAttempts,
func() (data any, cancel bool, err error) {
err = pm.downloadPieceGroupFromSource(ctx, pt, log,
peerTaskRequest, pg, pieceCount, pieceCountToDownload, downloadedPieces)
return nil, errors.Is(err, context.Canceled), err
})
if retryErr != nil {
// download piece error after many retries, cancel task
cancel()
downloadError.Store(&backSourceError{err: retryErr})
log.Infof("concurrent worker %d failed to download piece group after %d retries, last error: %s",
i, pm.concurrentOption.MaxAttempts, retryErr.Error())
}
wg.Done()
}(i)
}
wg.Wait()
// check error
if downloadError.Load() != nil {
return downloadError.Load().(*backSourceError).err
}
return nil
}
type pieceGroup struct {
start, end int32
startByte, endByte int64
// store original task metadata
pieceSize uint32
parsedRange *nethttp.Range
}
func newPieceGroup(i int32, reminderPieces int32, startPieceNum int32, minPieceCountPerGroup int32, pieceSize uint32, parsedRange *nethttp.Range) *pieceGroup {
var (
start int32
end int32
)
if i < reminderPieces {
start = i*minPieceCountPerGroup + i
end = start + minPieceCountPerGroup
} else {
start = i*minPieceCountPerGroup + reminderPieces
end = start + minPieceCountPerGroup - 1
}
// adjust by startPieceNum
start += startPieceNum
end += startPieceNum
// calculate piece group first and last range byte with parsedRange.Start
startByte := int64(start) * int64(pieceSize)
endByte := int64(end+1)*int64(pieceSize) - 1
if endByte > parsedRange.Length-1 {
endByte = parsedRange.Length - 1
}
// adjust by range start
startByte += parsedRange.Start
endByte += parsedRange.Start
pg := &pieceGroup{
start: start,
end: end,
startByte: startByte,
endByte: endByte,
pieceSize: pieceSize,
parsedRange: parsedRange,
}
return pg
}
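
Worked example of the split above: with con = 4, pieceSize = 100, parsedRange {Start: 0, Length: 500} and startPieceNum 0 there are 5 pieces, so minPieceCountPerGroup = 5/4 = 1 and reminderPieces = 5%4 = 1. newPieceGroup then gives worker 0 pieces 0-1 (bytes 0-199) and workers 1-3 one piece each (bytes 200-299, 300-399 and 400-499); this is the "100-500-4" case exercised by TestPieceGroup in the test file below.
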
func (pm *pieceManager) concurrentDownloadSourceByPiece(
ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest,
-parsedRange *nethttp.Range, startPieceNum int32, pieceCount int32,
parsedRange *nethttp.Range, startPieceNum int32, pieceCount int32, pieceCountToDownload int32,
con int, pieceSize uint32, cancel context.CancelFunc) error {
log := pt.Log()
@@ -823,7 +928,7 @@ func (pm *pieceManager) concurrentDownloadSourceByPiece(
var pieceCh = make(chan int32, con)
wg := sync.WaitGroup{}
-wg.Add(int(pieceCount - startPieceNum))
wg.Add(int(pieceCountToDownload))
downloadedPieceCount := atomic.NewInt32(startPieceNum)
@@ -965,3 +1070,87 @@ func (pm *pieceManager) downloadPieceFromSource(ctx context.Context,
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
return nil
}
func (pm *pieceManager) downloadPieceGroupFromSource(ctx context.Context,
pt Task, log *logger.SugaredLoggerOnWith,
peerTaskRequest *schedulerv1.PeerTaskRequest,
pg *pieceGroup,
totalPieceCount int32,
totalPieceCountToDownload int32,
downloadedPieces mapset.Set[int32]) error {
backSourceRequest, err := source.NewRequestWithContext(ctx, peerTaskRequest.Url, peerTaskRequest.UrlMeta.Header)
if err != nil {
log.Errorf("build piece %d-%d back source request error: %s", pg.start, pg.end, err)
return err
}
pieceGroupRange := fmt.Sprintf("%d-%d", pg.startByte, pg.endByte)
// FIXME refactor source package, normal Range header is enough
backSourceRequest.Header.Set(source.Range, pieceGroupRange)
backSourceRequest.Header.Set(headers.Range, "bytes="+pieceGroupRange)
log.Debugf("piece %d-%d back source header: %#v", pg.start, pg.end, backSourceRequest.Header)
response, err := source.Download(backSourceRequest)
if err != nil {
log.Errorf("piece %d-%d back source response error: %s", pg.start, pg.end, err)
return err
}
defer response.Body.Close()
err = response.Validate()
if err != nil {
log.Errorf("piece %d-%d back source response validate error: %s", pg.start, pg.end, err)
return err
}
log.Debugf("piece %d-%d back source response ok", pg.start, pg.end)
for i := pg.start; i <= pg.end; i++ {
pieceNum := i
offset := uint64(pg.startByte) + uint64(i-pg.start)*uint64(pg.pieceSize)
size := pg.pieceSize
// update last piece size
if offset+uint64(size)-1 > uint64(pg.endByte) {
size = uint32(uint64(pg.endByte) + 1 - offset)
}
result, md5, err := pm.processPieceFromSource(
pt, response.Body, pg.parsedRange.Length, pieceNum, offset-uint64(pg.parsedRange.Start), size,
func(int64) (int32, int64, bool) {
downloadedPieces.Add(pieceNum)
return totalPieceCount, pg.parsedRange.Length, downloadedPieces.Cardinality() == int(totalPieceCountToDownload)
})
request := &DownloadPieceRequest{
TaskID: pt.GetTaskID(),
PeerID: pt.GetPeerID(),
piece: &commonv1.PieceInfo{
PieceNum: pieceNum,
RangeStart: offset,
RangeSize: uint32(result.Size),
PieceMd5: md5,
PieceOffset: offset,
PieceStyle: 0,
},
}
if err != nil {
log.Errorf("download piece %d error: %s", pieceNum, err)
pt.ReportPieceResult(request, result, detectBackSourceError(err))
return err
}
if result.Size != int64(size) {
log.Errorf("download piece %d size not match, desired: %d, actual: %d", pieceNum, size, result.Size)
pt.ReportPieceResult(request, result, detectBackSourceError(err))
return storage.ErrShortRead
}
pt.ReportPieceResult(request, result, nil)
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
log.Debugf("piece %d done", pieceNum)
}
return nil
}
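
The same group-to-request mapping can be sketched outside the daemon with nothing but net/http. The snippet below is a minimal standalone illustration, not part of this commit: the URL is a placeholder, error handling is reduced to the essentials, and io.Discard stands in for the storage writes. It issues one ranged request per piece group and carves the streamed body into piece-sized chunks, the way downloadPieceGroupFromSource feeds processPieceFromSource.

package main

import (
	"fmt"
	"io"
	"net/http"
)

// fetchPieceGroup downloads bytes [startByte, endByte] in a single request
// and reads the body piece by piece, mirroring downloadPieceGroupFromSource.
func fetchPieceGroup(url string, startByte, endByte, pieceSize int64) error {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	// One Range header covers the whole piece group.
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", startByte, endByte))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusPartialContent {
		return fmt.Errorf("expected 206 Partial Content, got %d", resp.StatusCode)
	}
	for offset := startByte; offset <= endByte; offset += pieceSize {
		size := pieceSize
		if offset+size-1 > endByte {
			// the last piece of the group may be shorter than pieceSize
			size = endByte + 1 - offset
		}
		// the daemon writes this chunk to storage and verifies its digest;
		// here it is simply discarded
		n, err := io.CopyN(io.Discard, resp.Body, size)
		if err != nil {
			return err
		}
		fmt.Printf("piece at offset %d: %d bytes\n", offset, n)
	}
	return nil
}

func main() {
	// placeholder URL; any source that honors Range requests works
	if err := fetchPieceGroup("http://example.com/some-file", 0, 499, 100); err != nil {
		fmt.Println("error:", err)
	}
}

Unlike this sketch, the daemon also reports every piece result and publishes the piece info (ReportPieceResult / PublishPieceInfo above) as each piece completes.
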

View File

@@ -23,6 +23,7 @@ import (
"encoding/hex"
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"os"
@@ -481,7 +482,9 @@ func TestPieceManager_DownloadSource(t *testing.T) {
outputBytes, err := os.ReadFile(output)
assert.Nil(err, "load output file")
-assert.Equal(testBytes, outputBytes, "output and desired output must match")
if string(testBytes) != string(outputBytes) {
assert.Equal(string(testBytes), string(outputBytes), "output and desired output must match")
}
})
}
}
@@ -541,3 +544,273 @@ func TestDetectBackSourceError(t *testing.T) {
})
}
}
func TestPieceGroup(t *testing.T) {
assert := testifyassert.New(t)
testCases := []struct {
name string
parsedRange *nethttp.Range
startPieceNum int32
pieceSize uint32
con int32
pieceGroups []pieceGroup
}{
{
name: "100-200-2",
pieceSize: 100,
parsedRange: &nethttp.Range{Start: 0, Length: 200},
con: 2,
pieceGroups: []pieceGroup{
{
start: 0,
end: 0,
startByte: 0,
endByte: 99,
},
{
start: 1,
end: 1,
startByte: 100,
endByte: 199,
},
},
},
{
name: "100-300-2",
pieceSize: 100,
parsedRange: &nethttp.Range{Start: 0, Length: 300},
con: 2,
pieceGroups: []pieceGroup{
{
start: 0,
end: 1,
startByte: 0,
endByte: 199,
},
{
start: 2,
end: 2,
startByte: 200,
endByte: 299,
},
},
},
{
name: "100-500-4",
pieceSize: 100,
parsedRange: &nethttp.Range{Start: 0, Length: 500},
con: 4,
pieceGroups: []pieceGroup{
{
start: 0,
end: 1,
startByte: 0,
endByte: 199,
},
{
start: 2,
end: 2,
startByte: 200,
endByte: 299,
},
{
start: 3,
end: 3,
startByte: 300,
endByte: 399,
},
{
start: 4,
end: 4,
startByte: 400,
endByte: 499,
},
},
},
{
name: "100-600-4",
pieceSize: 100,
parsedRange: &nethttp.Range{Start: 0, Length: 600},
con: 4,
pieceGroups: []pieceGroup{
{
start: 0,
end: 1,
startByte: 0,
endByte: 199,
},
{
start: 2,
end: 3,
startByte: 200,
endByte: 399,
},
{
start: 4,
end: 4,
startByte: 400,
endByte: 499,
},
{
start: 5,
end: 5,
startByte: 500,
endByte: 599,
},
},
},
{
name: "100-700-4",
pieceSize: 100,
parsedRange: &nethttp.Range{Start: 0, Length: 700},
con: 4,
pieceGroups: []pieceGroup{
{
start: 0,
end: 1,
startByte: 0,
endByte: 199,
},
{
start: 2,
end: 3,
startByte: 200,
endByte: 399,
},
{
start: 4,
end: 5,
startByte: 400,
endByte: 599,
},
{
start: 6,
end: 6,
startByte: 600,
endByte: 699,
},
},
},
{
name: "1-100-700-4",
pieceSize: 100,
startPieceNum: 1,
parsedRange: &nethttp.Range{Start: 90, Length: 707}, // last byte: 90 + 706 = 796
con: 4,
pieceGroups: []pieceGroup{
{
start: 1,
end: 2,
startByte: 190,
endByte: 389,
},
{
start: 3,
end: 4,
startByte: 390,
endByte: 589,
},
{
start: 5,
end: 6,
startByte: 590,
endByte: 789,
},
{
start: 7,
end: 7,
startByte: 790,
endByte: 796,
},
},
},
{
name: "100-1100-4",
pieceSize: 100,
parsedRange: &nethttp.Range{Start: 0, Length: 1100},
con: 4,
pieceGroups: []pieceGroup{
{
start: 0,
end: 2,
startByte: 0,
endByte: 299,
},
{
start: 3,
end: 5,
startByte: 300,
endByte: 599,
},
{
start: 6,
end: 8,
startByte: 600,
endByte: 899,
},
{
start: 9,
end: 10,
startByte: 900,
endByte: 1099,
},
},
},
{
name: "from-real-e2e-test",
pieceSize: 4194304,
startPieceNum: 1,
parsedRange: &nethttp.Range{Start: 984674, Length: 20638941},
con: 4,
pieceGroups: []pieceGroup{
{
start: 1,
end: 1,
startByte: 5178978,
endByte: 9373281,
},
{
start: 2,
end: 2,
startByte: 9373282,
endByte: 13567585,
},
{
start: 3,
end: 3,
startByte: 13567586,
endByte: 17761889,
},
{
start: 4,
end: 4,
startByte: 17761890,
endByte: 21623614,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
pieceCount := int32(math.Ceil(float64(tc.parsedRange.Length) / float64(tc.pieceSize)))
pieceCountToDownload := pieceCount - tc.startPieceNum
minPieceCountPerGroup := pieceCountToDownload / tc.con
reminderPieces := pieceCountToDownload % tc.con
for i := int32(0); i < tc.con; i++ {
tc.pieceGroups[i].pieceSize = tc.pieceSize
tc.pieceGroups[i].parsedRange = tc.parsedRange
}
var pieceGroups []pieceGroup
for i := int32(0); i < tc.con; i++ {
pg := newPieceGroup(i, reminderPieces, tc.startPieceNum, minPieceCountPerGroup, tc.pieceSize, tc.parsedRange)
pieceGroups = append(pieceGroups, *pg)
}
assert.Equal(tc.pieceGroups, pieceGroups, "piece group should equal")
})
}
}
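
The "from-real-e2e-test" expectations can be verified by hand: with pieceSize 4194304, Start 984674, Length 20638941 and startPieceNum 1, there are ceil(20638941/4194304) = 5 pieces and 4 left to download, so each of the 4 workers gets exactly one piece. Worker 0 takes piece 1: startByte = 1*4194304 + 984674 = 5178978 and endByte = 2*4194304 - 1 + 984674 = 9373281. The last group (piece 4) is capped at Length - 1 = 20638940, giving endByte = 20638940 + 984674 = 21623614.
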

View File

@@ -124,7 +124,7 @@ func (t *localSubTaskStore) WritePiece(ctx context.Context, req *WritePieceReque
}
t.Debugf("wrote %d bytes to file %s, piece %d, start %d, length: %d",
-n, t.DataFilePath, req.Num, req.Range.Start, req.Range.Length)
n, t.parent.DataFilePath, req.Num, req.Range.Start, req.Range.Length)
t.Lock()
defer t.Unlock()
// double check

go.mod
View File

@@ -118,6 +118,7 @@ require (
github.com/containerd/log v0.1.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.3.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect

go.sum
View File

@@ -370,6 +370,8 @@ github.com/daaku/go.zipexe v1.0.0/go.mod h1:z8IiR6TsVLEYKwXAoE/I+8ys/sDkgTzSL0CL
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set/v2 v2.3.1 h1:vjmkvJt/IV27WXPyYQpAh4bRyWJc5Y435D17XQ9QU5A=
github.com/deckarep/golang-set/v2 v2.3.1/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/denisenkom/go-mssqldb v0.11.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0=
github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=