fix: cdn back source range size overflow (#550)

Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
Jim Ma 2021-08-15 14:05:20 +08:00 committed by Gaius
parent 962a171bcd
commit cee7d749ef
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
1 changed files with 27 additions and 24 deletions

View File

@ -69,23 +69,23 @@ func (cw *cacheWriter) startWriter(reader io.Reader, task *types.SeedTask, detec
curPieceNum := len(detectResult.pieceMetaRecords) curPieceNum := len(detectResult.pieceMetaRecords)
routineCount := calculateRoutineCount(task.SourceFileLength-currentSourceFileLength, task.PieceSize) routineCount := calculateRoutineCount(task.SourceFileLength-currentSourceFileLength, task.PieceSize)
// start writer pool // start writer pool
backSourceFileLength, totalPieceCount, err := cw.doWrite(reader, task, routineCount, curPieceNum) backSourceLength, totalPieceCount, err := cw.doWrite(reader, task, routineCount, curPieceNum)
if err != nil { if err != nil {
return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("write data: %v", err) return &downloadMetadata{backSourceLength: backSourceLength}, fmt.Errorf("write data: %v", err)
} }
storageInfo, err := cw.cacheDataManager.statDownloadFile(task.TaskID) storageInfo, err := cw.cacheDataManager.statDownloadFile(task.TaskID)
if err != nil { if err != nil {
return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("stat cdn download file: %v", err) return &downloadMetadata{backSourceLength: backSourceLength}, fmt.Errorf("stat cdn download file: %v", err)
} }
// TODO Try getting it from the ProgressManager first // TODO Try getting it from the ProgressManager first
pieceMd5Sign, _, err := cw.cacheDataManager.getPieceMd5Sign(task.TaskID) pieceMd5Sign, _, err := cw.cacheDataManager.getPieceMd5Sign(task.TaskID)
if err != nil { if err != nil {
return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("get piece md5 sign: %v", err) return &downloadMetadata{backSourceLength: backSourceLength}, fmt.Errorf("get piece md5 sign: %v", err)
} }
return &downloadMetadata{ return &downloadMetadata{
backSourceLength: backSourceFileLength, backSourceLength: backSourceLength,
realCdnFileLength: storageInfo.Size, realCdnFileLength: storageInfo.Size,
realSourceFileLength: currentSourceFileLength + backSourceFileLength, realSourceFileLength: currentSourceFileLength + backSourceLength,
pieceTotalCount: int32(totalPieceCount), pieceTotalCount: int32(totalPieceCount),
pieceMd5Sign: pieceMd5Sign, pieceMd5Sign: pieceMd5Sign,
}, nil }, nil
@ -97,7 +97,7 @@ func (cw *cacheWriter) doWrite(reader io.Reader, task *types.SeedTask, routineCo
return new(bytes.Buffer) return new(bytes.Buffer)
}, },
} }
var backSourceFileLength int64 var backSourceLength int64
buf := make([]byte, 256*1024) buf := make([]byte, 256*1024)
jobCh := make(chan *piece) jobCh := make(chan *piece)
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
@ -106,15 +106,15 @@ func (cw *cacheWriter) doWrite(reader io.Reader, task *types.SeedTask, routineCo
var bb = bufPool.Get().(*bytes.Buffer) var bb = bufPool.Get().(*bytes.Buffer)
bb.Reset() bb.Reset()
limitReader := io.LimitReader(reader, int64(task.PieceSize)) limitReader := io.LimitReader(reader, int64(task.PieceSize))
n, err := io.CopyBuffer(bb, limitReader, buf) n, err = io.CopyBuffer(bb, limitReader, buf)
if err != nil { if err != nil {
close(jobCh) close(jobCh)
return backSourceFileLength, 0, fmt.Errorf("read source taskID %s pieceNum %d piece: %v", task.TaskID, curPieceNum, err) return backSourceLength, 0, fmt.Errorf("read source taskID %s pieceNum %d piece: %v", task.TaskID, curPieceNum, err)
} }
if n == 0 { if n == 0 {
break break
} }
backSourceFileLength = backSourceFileLength + n backSourceLength += n
jobCh <- &piece{ jobCh <- &piece{
taskID: task.TaskID, taskID: task.TaskID,
@ -129,7 +129,7 @@ func (cw *cacheWriter) doWrite(reader io.Reader, task *types.SeedTask, routineCo
} }
close(jobCh) close(jobCh)
wg.Wait() wg.Wait()
return backSourceFileLength, curPieceNum, nil return backSourceLength, curPieceNum, nil
} }
func (cw *cacheWriter) writerPool(wg *sync.WaitGroup, routineCount int, pieceCh chan *piece, bufPool *sync.Pool) { func (cw *cacheWriter) writerPool(wg *sync.WaitGroup, routineCount int, pieceCh chan *piece, bufPool *sync.Pool) {
@ -137,45 +137,48 @@ func (cw *cacheWriter) writerPool(wg *sync.WaitGroup, routineCount int, pieceCh
for i := 0; i < routineCount; i++ { for i := 0; i < routineCount; i++ {
go func() { go func() {
defer wg.Done() defer wg.Done()
for piece := range pieceCh { for p := range pieceCh {
// TODO Subsequent compression and other features are implemented through waitToWriteContent and pieceStyle // TODO Subsequent compression and other features are implemented through waitToWriteContent and pieceStyle
waitToWriteContent := piece.pieceContent waitToWriteContent := p.pieceContent
originPieceLen := waitToWriteContent.Len() // the length of the original data that has not been processed originPieceLen := waitToWriteContent.Len() // the length of the original data that has not been processed
pieceLen := originPieceLen // the real length written to the storage medium after processing pieceLen := originPieceLen // the real length written to the storage medium after processing
pieceStyle := types.PlainUnspecified pieceStyle := types.PlainUnspecified
pieceMd5 := md5.New() pieceMd5 := md5.New()
err := cw.cacheDataManager.writeDownloadFile(piece.taskID, int64(piece.pieceNum)*int64(piece.pieceSize), int64(waitToWriteContent.Len()), err := cw.cacheDataManager.writeDownloadFile(
io.TeeReader(io.LimitReader(piece.pieceContent, int64(waitToWriteContent.Len())), pieceMd5)) p.taskID, int64(p.pieceNum)*int64(p.pieceSize), int64(waitToWriteContent.Len()),
io.TeeReader(io.LimitReader(p.pieceContent, int64(waitToWriteContent.Len())), pieceMd5))
// Recycle Buffer // Recycle Buffer
bufPool.Put(waitToWriteContent) bufPool.Put(waitToWriteContent)
if err != nil { if err != nil {
logger.Errorf("write taskID %s pieceNum %d file: %v", piece.taskID, piece.pieceNum, err) logger.Errorf("write taskID %s pieceNum %d file: %v", p.taskID, p.pieceNum, err)
continue continue
} }
start := uint64(p.pieceNum) * uint64(p.pieceSize)
end := start + uint64(pieceLen) - 1
pieceRecord := &storage.PieceMetaRecord{ pieceRecord := &storage.PieceMetaRecord{
PieceNum: piece.pieceNum, PieceNum: p.pieceNum,
PieceLen: int32(pieceLen), PieceLen: int32(pieceLen),
Md5: digestutils.ToHashString(pieceMd5), Md5: digestutils.ToHashString(pieceMd5),
Range: &rangeutils.Range{ Range: &rangeutils.Range{
StartIndex: uint64(piece.pieceNum * piece.pieceSize), StartIndex: start,
EndIndex: uint64(piece.pieceNum*piece.pieceSize + int32(pieceLen) - 1), EndIndex: end,
}, },
OriginRange: &rangeutils.Range{ OriginRange: &rangeutils.Range{
StartIndex: uint64(piece.pieceNum * piece.pieceSize), StartIndex: start,
EndIndex: uint64(piece.pieceNum*piece.pieceSize + int32(originPieceLen) - 1), EndIndex: end,
}, },
PieceStyle: pieceStyle, PieceStyle: pieceStyle,
} }
// write piece meta to storage // write piece meta to storage
if err := cw.cacheDataManager.appendPieceMetaData(piece.taskID, pieceRecord); err != nil { if err = cw.cacheDataManager.appendPieceMetaData(p.taskID, pieceRecord); err != nil {
logger.Errorf("write piece meta file: %v", err) logger.Errorf("write piece meta file: %v", err)
continue continue
} }
if cw.cdnReporter != nil { if cw.cdnReporter != nil {
if err := cw.cdnReporter.reportPieceMetaRecord(piece.taskID, pieceRecord, DownloaderReport); err != nil { if err = cw.cdnReporter.reportPieceMetaRecord(p.taskID, pieceRecord, DownloaderReport); err != nil {
// NOTE: should we do this job again? // NOTE: should we do this job again?
logger.Errorf("report piece status, pieceNum %d pieceMetaRecord %s: %v", piece.pieceNum, pieceRecord, err) logger.Errorf("report piece status, pieceNum %d pieceMetaRecord %s: %v", p.pieceNum, pieceRecord, err)
} }
} }
} }