chunked: split appendCompressedStreamToFile

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
This commit is contained in:
Giuseppe Scrivano 2021-12-24 16:45:53 +01:00
parent 126a172519
commit 05c959406b
1 changed files with 34 additions and 21 deletions

View File

@ -60,6 +60,7 @@ type chunkedDiffer struct {
gzipReader *pgzip.Reader
zstdReader *zstd.Decoder
rawReader io.Reader
}
var xattrsToIgnore = map[string]interface{}{
@ -748,45 +749,58 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.Fil
return nil, err
}
func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, destFile *destinationFile, reader io.Reader, size int64) (err error) {
func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFileType, from io.Reader, mf *missingFileChunk) error {
switch compression {
case fileTypeZstdChunked:
c.rawReader = io.LimitReader(from, mf.CompressedSize)
if c.zstdReader == nil {
r, err := zstd.NewReader(reader)
r, err := zstd.NewReader(c.rawReader)
if err != nil {
return err
}
c.zstdReader = r
} else {
if err := c.zstdReader.Reset(reader); err != nil {
if err := c.zstdReader.Reset(c.rawReader); err != nil {
return err
}
}
defer c.zstdReader.Reset(nil)
if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
return err
}
case fileTypeEstargz:
c.rawReader = io.LimitReader(from, mf.CompressedSize)
if c.gzipReader == nil {
r, err := pgzip.NewReader(reader)
r, err := pgzip.NewReader(c.rawReader)
if err != nil {
return err
}
c.gzipReader = r
} else {
if err := c.gzipReader.Reset(reader); err != nil {
if err := c.gzipReader.Reset(c.rawReader); err != nil {
return err
}
}
defer c.gzipReader.Close()
if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
case fileTypeNoCompression:
c.rawReader = io.LimitReader(from, mf.UncompressedSize)
default:
return fmt.Errorf("unknown file type %q", c.fileType)
}
return nil
}
func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, to io.Writer, size int64) error {
switch compression {
case fileTypeZstdChunked:
defer c.zstdReader.Reset(nil)
if _, err := io.CopyBuffer(to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
return err
}
case fileTypeEstargz:
defer c.gzipReader.Close()
if _, err := io.CopyBuffer(to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
return err
}
case fileTypeNoCompression:
_, err := io.CopyBuffer(destFile.to, io.LimitReader(reader, size), c.copyBuffer)
if err != nil {
if _, err := io.CopyBuffer(to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil {
return err
}
default:
@ -899,6 +913,11 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
goto exit
}
if err := c.prepareCompressedStreamToFile(compression, part, &mf); err != nil {
Err = err
goto exit
}
// Open the new file if it is different that what is already
// opened
if destFile == nil || destFile.metadata.Name != mf.File.Name {
@ -925,17 +944,11 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
}
}
streamLength := mf.CompressedSize
if compression == fileTypeNoCompression {
streamLength = mf.UncompressedSize
}
limitReader := io.LimitReader(part, streamLength)
if err := c.appendCompressedStreamToFile(compression, destFile, limitReader, mf.UncompressedSize); err != nil {
if err := c.appendCompressedStreamToFile(compression, destFile.to, mf.UncompressedSize); err != nil {
Err = err
goto exit
}
if _, err := io.CopyBuffer(ioutil.Discard, limitReader, c.copyBuffer); err != nil {
if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil {
Err = err
goto exit
}