From cee7d749eff8eda7dea894a6ec2a62cf0b8aae24 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Sun, 15 Aug 2021 14:05:20 +0800 Subject: [PATCH] fix: cdn back source range size overflow (#550) Signed-off-by: Jim Ma --- cdnsystem/daemon/cdn/cache_writer.go | 51 +++++++++++++++------------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/cdnsystem/daemon/cdn/cache_writer.go b/cdnsystem/daemon/cdn/cache_writer.go index 6cbaf00c3..55104aee4 100644 --- a/cdnsystem/daemon/cdn/cache_writer.go +++ b/cdnsystem/daemon/cdn/cache_writer.go @@ -69,23 +69,23 @@ func (cw *cacheWriter) startWriter(reader io.Reader, task *types.SeedTask, detec curPieceNum := len(detectResult.pieceMetaRecords) routineCount := calculateRoutineCount(task.SourceFileLength-currentSourceFileLength, task.PieceSize) // start writer pool - backSourceFileLength, totalPieceCount, err := cw.doWrite(reader, task, routineCount, curPieceNum) + backSourceLength, totalPieceCount, err := cw.doWrite(reader, task, routineCount, curPieceNum) if err != nil { - return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("write data: %v", err) + return &downloadMetadata{backSourceLength: backSourceLength}, fmt.Errorf("write data: %v", err) } storageInfo, err := cw.cacheDataManager.statDownloadFile(task.TaskID) if err != nil { - return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("stat cdn download file: %v", err) + return &downloadMetadata{backSourceLength: backSourceLength}, fmt.Errorf("stat cdn download file: %v", err) } // TODO Try getting it from the ProgressManager first pieceMd5Sign, _, err := cw.cacheDataManager.getPieceMd5Sign(task.TaskID) if err != nil { - return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("get piece md5 sign: %v", err) + return &downloadMetadata{backSourceLength: backSourceLength}, fmt.Errorf("get piece md5 sign: %v", err) } return &downloadMetadata{ - backSourceLength: backSourceFileLength, + backSourceLength: backSourceLength, realCdnFileLength: storageInfo.Size, - realSourceFileLength: currentSourceFileLength + backSourceFileLength, + realSourceFileLength: currentSourceFileLength + backSourceLength, pieceTotalCount: int32(totalPieceCount), pieceMd5Sign: pieceMd5Sign, }, nil @@ -97,7 +97,7 @@ func (cw *cacheWriter) doWrite(reader io.Reader, task *types.SeedTask, routineCo return new(bytes.Buffer) }, } - var backSourceFileLength int64 + var backSourceLength int64 buf := make([]byte, 256*1024) jobCh := make(chan *piece) var wg = &sync.WaitGroup{} @@ -106,15 +106,15 @@ func (cw *cacheWriter) doWrite(reader io.Reader, task *types.SeedTask, routineCo var bb = bufPool.Get().(*bytes.Buffer) bb.Reset() limitReader := io.LimitReader(reader, int64(task.PieceSize)) - n, err := io.CopyBuffer(bb, limitReader, buf) + n, err = io.CopyBuffer(bb, limitReader, buf) if err != nil { close(jobCh) - return backSourceFileLength, 0, fmt.Errorf("read source taskID %s pieceNum %d piece: %v", task.TaskID, curPieceNum, err) + return backSourceLength, 0, fmt.Errorf("read source taskID %s pieceNum %d piece: %v", task.TaskID, curPieceNum, err) } if n == 0 { break } - backSourceFileLength = backSourceFileLength + n + backSourceLength += n jobCh <- &piece{ taskID: task.TaskID, @@ -129,7 +129,7 @@ func (cw *cacheWriter) doWrite(reader io.Reader, task *types.SeedTask, routineCo } close(jobCh) wg.Wait() - return backSourceFileLength, curPieceNum, nil + return backSourceLength, curPieceNum, nil } func (cw *cacheWriter) writerPool(wg *sync.WaitGroup, routineCount int, pieceCh chan *piece, bufPool *sync.Pool) { @@ -137,45 +137,48 @@ func (cw *cacheWriter) writerPool(wg *sync.WaitGroup, routineCount int, pieceCh for i := 0; i < routineCount; i++ { go func() { defer wg.Done() - for piece := range pieceCh { + for p := range pieceCh { // TODO Subsequent compression and other features are implemented through waitToWriteContent and pieceStyle - waitToWriteContent := piece.pieceContent + waitToWriteContent := p.pieceContent originPieceLen := waitToWriteContent.Len() // the length of the original data that has not been processed pieceLen := originPieceLen // the real length written to the storage medium after processing pieceStyle := types.PlainUnspecified pieceMd5 := md5.New() - err := cw.cacheDataManager.writeDownloadFile(piece.taskID, int64(piece.pieceNum)*int64(piece.pieceSize), int64(waitToWriteContent.Len()), - io.TeeReader(io.LimitReader(piece.pieceContent, int64(waitToWriteContent.Len())), pieceMd5)) + err := cw.cacheDataManager.writeDownloadFile( + p.taskID, int64(p.pieceNum)*int64(p.pieceSize), int64(waitToWriteContent.Len()), + io.TeeReader(io.LimitReader(p.pieceContent, int64(waitToWriteContent.Len())), pieceMd5)) // Recycle Buffer bufPool.Put(waitToWriteContent) if err != nil { - logger.Errorf("write taskID %s pieceNum %d file: %v", piece.taskID, piece.pieceNum, err) + logger.Errorf("write taskID %s pieceNum %d file: %v", p.taskID, p.pieceNum, err) continue } + start := uint64(p.pieceNum) * uint64(p.pieceSize) + end := start + uint64(pieceLen) - 1 pieceRecord := &storage.PieceMetaRecord{ - PieceNum: piece.pieceNum, + PieceNum: p.pieceNum, PieceLen: int32(pieceLen), Md5: digestutils.ToHashString(pieceMd5), Range: &rangeutils.Range{ - StartIndex: uint64(piece.pieceNum * piece.pieceSize), - EndIndex: uint64(piece.pieceNum*piece.pieceSize + int32(pieceLen) - 1), + StartIndex: start, + EndIndex: end, }, OriginRange: &rangeutils.Range{ - StartIndex: uint64(piece.pieceNum * piece.pieceSize), - EndIndex: uint64(piece.pieceNum*piece.pieceSize + int32(originPieceLen) - 1), + StartIndex: start, + EndIndex: end, }, PieceStyle: pieceStyle, } // write piece meta to storage - if err := cw.cacheDataManager.appendPieceMetaData(piece.taskID, pieceRecord); err != nil { + if err = cw.cacheDataManager.appendPieceMetaData(p.taskID, pieceRecord); err != nil { logger.Errorf("write piece meta file: %v", err) continue } if cw.cdnReporter != nil { - if err := cw.cdnReporter.reportPieceMetaRecord(piece.taskID, pieceRecord, DownloaderReport); err != nil { + if err = cw.cdnReporter.reportPieceMetaRecord(p.taskID, pieceRecord, DownloaderReport); err != nil { // NOTE: should we do this job again? - logger.Errorf("report piece status, pieceNum %d pieceMetaRecord %s: %v", piece.pieceNum, pieceRecord, err) + logger.Errorf("report piece status, pieceNum %d pieceMetaRecord %s: %v", p.pieceNum, pieceRecord, err) } } }