From 05c959406bb9db8cdce2b280ca6bc7b3357a0ebe Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Fri, 24 Dec 2021 16:45:53 +0100 Subject: [PATCH] chunked: split appendCompressedStreamToFile Signed-off-by: Giuseppe Scrivano --- storage/pkg/chunked/storage_linux.go | 55 +++++++++++++++++----------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/storage/pkg/chunked/storage_linux.go b/storage/pkg/chunked/storage_linux.go index 83538f8aa6..875455dd33 100644 --- a/storage/pkg/chunked/storage_linux.go +++ b/storage/pkg/chunked/storage_linux.go @@ -60,6 +60,7 @@ type chunkedDiffer struct { gzipReader *pgzip.Reader zstdReader *zstd.Decoder + rawReader io.Reader } var xattrsToIgnore = map[string]interface{}{ @@ -748,45 +749,58 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.Fil return nil, err } -func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, destFile *destinationFile, reader io.Reader, size int64) (err error) { +func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFileType, from io.Reader, mf *missingFileChunk) error { switch compression { case fileTypeZstdChunked: + c.rawReader = io.LimitReader(from, mf.CompressedSize) if c.zstdReader == nil { - r, err := zstd.NewReader(reader) + r, err := zstd.NewReader(c.rawReader) if err != nil { return err } c.zstdReader = r } else { - if err := c.zstdReader.Reset(reader); err != nil { + if err := c.zstdReader.Reset(c.rawReader); err != nil { return err } } - defer c.zstdReader.Reset(nil) - if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil { - return err - } case fileTypeEstargz: + c.rawReader = io.LimitReader(from, mf.CompressedSize) if c.gzipReader == nil { - r, err := pgzip.NewReader(reader) + r, err := pgzip.NewReader(c.rawReader) if err != nil { return err } c.gzipReader = r } else { - if err := c.gzipReader.Reset(reader); err != nil { + if err := c.gzipReader.Reset(c.rawReader); err != nil { return err } } - defer c.gzipReader.Close() - if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil { + case fileTypeNoCompression: + c.rawReader = io.LimitReader(from, mf.UncompressedSize) + default: + return fmt.Errorf("unknown file type %q", c.fileType) + } + return nil +} + +func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, to io.Writer, size int64) error { + switch compression { + case fileTypeZstdChunked: + defer c.zstdReader.Reset(nil) + if _, err := io.CopyBuffer(to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil { + return err + } + case fileTypeEstargz: + defer c.gzipReader.Close() + if _, err := io.CopyBuffer(to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil { return err } case fileTypeNoCompression: - _, err := io.CopyBuffer(destFile.to, io.LimitReader(reader, size), c.copyBuffer) - if err != nil { + if _, err := io.CopyBuffer(to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil { return err } default: @@ -899,6 +913,11 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan goto exit } + if err := c.prepareCompressedStreamToFile(compression, part, &mf); err != nil { + Err = err + goto exit + } + // Open the new file if it is different that what is already // opened if destFile == nil || destFile.metadata.Name != mf.File.Name { @@ -925,17 +944,11 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan } } - streamLength := mf.CompressedSize - if compression == fileTypeNoCompression { - streamLength = mf.UncompressedSize - } - limitReader := io.LimitReader(part, streamLength) - - if err := c.appendCompressedStreamToFile(compression, destFile, limitReader, mf.UncompressedSize); err != nil { + if err := c.appendCompressedStreamToFile(compression, destFile.to, mf.UncompressedSize); err != nil { Err = err goto exit } - if _, err := io.CopyBuffer(ioutil.Discard, limitReader, c.copyBuffer); err != nil { + if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil { Err = err goto exit }