From 24b99d165fec8329f142faa5bbbf74393a60b1ab Mon Sep 17 00:00:00 2001
From: Giuseppe Scrivano
Date: Mon, 20 Dec 2021 17:55:34 +0100
Subject: [PATCH] chunked: support copy from uncompressed stream

Signed-off-by: Giuseppe Scrivano
---
 pkg/chunked/storage_linux.go | 49 ++++++++++++++++++++++++------------
 1 file changed, 33 insertions(+), 16 deletions(-)

diff --git a/pkg/chunked/storage_linux.go b/pkg/chunked/storage_linux.go
index 9f5a1b788..dd67d0a1f 100644
--- a/pkg/chunked/storage_linux.go
+++ b/pkg/chunked/storage_linux.go
@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"math"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -43,8 +42,9 @@ const (
 	containersOverrideXattr = "user.containers.override_stat"
 	bigDataKey              = "zstd-chunked-manifest"
 
-	fileTypeZstdChunked = iota
-	fileTypeEstargz     = iota
+	fileTypeZstdChunked   = iota
+	fileTypeEstargz       = iota
+	fileTypeNoCompression = iota
 )
 
 type compressedFileType int
@@ -783,8 +783,8 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.Fil
 	return nil, err
 }
 
-func (c *chunkedDiffer) appendCompressedStreamToFile(destFile *destinationFile, reader io.Reader, size int64) (err error) {
-	switch c.fileType {
+func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, destFile *destinationFile, reader io.Reader, size int64) (err error) {
+	switch compression {
 	case fileTypeZstdChunked:
 		z, err := zstd.NewReader(reader)
 		if err != nil {
@@ -818,6 +818,14 @@ func (c *chunkedDiffer) appendCompressedStreamToFile(destFile *destinationFile,
 		if _, err := io.Copy(ioutil.Discard, reader); err != nil {
 			return err
 		}
+	case fileTypeNoCompression:
+		_, err := io.Copy(destFile.to, io.LimitReader(reader, size))
+		if err != nil {
+			return err
+		}
+		if _, err := io.Copy(ioutil.Discard, reader); err != nil {
+			return err
+		}
 	default:
 		return fmt.Errorf("unknown file type %q", c.fileType)
 	}
@@ -869,6 +877,7 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 	var destFile *destinationFile
 	for _, missingPart := range missingParts {
 		var part io.ReadCloser
+		compression := c.fileType
 		switch {
 		case missingPart.SourceChunk != nil:
 			select {
@@ -886,6 +895,7 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 			if err != nil {
 				return err
 			}
+			compression = fileTypeNoCompression
 		default:
 			return errors.Errorf("internal error: missing part misses both local and remote data stream")
 		}
@@ -925,7 +935,7 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 
 			limitReader := io.LimitReader(part, mf.CompressedSize)
 
-			if err := c.appendCompressedStreamToFile(destFile, limitReader, mf.UncompressedSize); err != nil {
+			if err := c.appendCompressedStreamToFile(compression, destFile, limitReader, mf.UncompressedSize); err != nil {
 				part.Close()
 				return err
 			}
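For orientation, here is a minimal, self-contained sketch of the copy pattern the new fileTypeNoCompression case relies on: write exactly size bytes of an already-uncompressed stream to the destination, then drain whatever remains of the chunk so the enclosing stream is left positioned for the next part. It is not part of the patch; the helper name appendUncompressed and the bytes.Buffer destination are invented for illustration, and io.Discard stands in for the patch's ioutil.Discard.

// Standalone illustration only; the real code writes into destFile.to
// inside appendCompressedStreamToFile.
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// appendUncompressed copies exactly size bytes from reader to dst and then
// discards the rest of reader, mirroring the fileTypeNoCompression branch.
func appendUncompressed(dst io.Writer, reader io.Reader, size int64) error {
	if _, err := io.Copy(dst, io.LimitReader(reader, size)); err != nil {
		return err
	}
	// Drain any trailing bytes so the caller's stream is fully consumed,
	// just like the existing compressed cases do.
	_, err := io.Copy(io.Discard, reader)
	return err
}

func main() {
	chunk := strings.NewReader("file-content###trailing-gap###")
	var dst bytes.Buffer
	if err := appendUncompressed(&dst, chunk, int64(len("file-content"))); err != nil {
		panic(err)
	}
	fmt.Printf("wrote %q, %d bytes left unread\n", dst.String(), chunk.Len())
}

The drain step matters because storeMissingFiles hands the function an io.LimitReader over the part: whatever the file itself does not need must still be consumed before the next chunk is read.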
@@ -946,34 +956,41 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 	}
 
 	getGap := func(missingParts []missingPart, i int) int {
-		if missingParts[i-1].SourceChunk == nil || missingParts[i].SourceChunk == nil {
-			return math.MaxInt32
-		}
-
 		prev := missingParts[i-1].SourceChunk.Offset + missingParts[i-1].SourceChunk.Length
 		return int(missingParts[i].SourceChunk.Offset - prev)
 	}
+	getCost := func(missingParts []missingPart, i int) int {
+		cost := getGap(missingParts, i)
+		if missingParts[i-1].OriginFile != nil {
+			cost += int(missingParts[i-1].SourceChunk.Length)
+		}
+		if missingParts[i].OriginFile != nil {
+			cost += int(missingParts[i].SourceChunk.Length)
+		}
+		return cost
+	}
 
 	// this implementation doesn't account for duplicates, so it could merge
 	// more than necessary to reach the specified target.  Since target itself
 	// is a heuristic value, it doesn't matter.
-	var gaps []int
+	costs := make([]int, len(missingParts)-1)
 	for i := 1; i < len(missingParts); i++ {
-		gaps = append(gaps, getGap(missingParts, i))
+		costs[i-1] = getCost(missingParts, i)
 	}
-	sort.Ints(gaps)
+	sort.Ints(costs)
 
 	toShrink := len(missingParts) - target
-	targetValue := gaps[toShrink-1]
+	targetValue := costs[toShrink]
 
 	newMissingChunks := missingParts[0:1]
 	for i := 1; i < len(missingParts); i++ {
-		gap := getGap(missingParts, i)
-		if gap > targetValue {
+		if getCost(missingParts, i) > targetValue {
 			newMissingChunks = append(newMissingChunks, missingParts[i])
 		} else {
+			gap := getGap(missingParts, i)
 			prev := &newMissingChunks[len(newMissingChunks)-1]
 			prev.SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
+			prev.OriginFile = nil
 			if gap > 0 {
 				gapFile := missingFileChunk{
 					Gap: int64(gap),
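The hunk above replaces the pure gap-based merge heuristic with a cost-based one. The sketch below uses toy stand-ins for the patch's structs (a bool instead of the real OriginFile pointer, invented offsets and lengths) to show the idea: merging two adjacent missing parts costs the gap between them plus the length of any part that could otherwise have been copied from a local origin file, because merging clears prev.OriginFile and forces that data to be pulled from the remote stream instead.

// Standalone illustration of the getGap/getCost heuristic; not the real
// missingPart type from pkg/chunked.
package main

import "fmt"

type sourceChunk struct{ Offset, Length uint64 }

type missingPart struct {
	SourceChunk *sourceChunk
	OriginFile  bool // stand-in for "this chunk can be copied from a local file"
}

// gap is the number of bytes between the end of part i-1 and the start of part i.
func gap(parts []missingPart, i int) int {
	prev := parts[i-1].SourceChunk.Offset + parts[i-1].SourceChunk.Length
	return int(parts[i].SourceChunk.Offset - prev)
}

// cost adds to the gap the length of any neighbor that a merge would force
// to be re-downloaded instead of copied locally.
func cost(parts []missingPart, i int) int {
	c := gap(parts, i)
	if parts[i-1].OriginFile {
		c += int(parts[i-1].SourceChunk.Length)
	}
	if parts[i].OriginFile {
		c += int(parts[i].SourceChunk.Length)
	}
	return c
}

func main() {
	parts := []missingPart{
		{SourceChunk: &sourceChunk{Offset: 0, Length: 100}},
		{SourceChunk: &sourceChunk{Offset: 110, Length: 50}, OriginFile: true},
		{SourceChunk: &sourceChunk{Offset: 200, Length: 30}},
	}
	for i := 1; i < len(parts); i++ {
		fmt.Printf("merge %d<-%d: gap=%d cost=%d\n", i-1, i, gap(parts, i), cost(parts, i))
	}
}

With these numbers the second merge is more expensive (cost 90 vs 60) even though only the gaps (40 vs 10) would be compared by the old heuristic, so the sorted-cost cutoff now prefers merges that sacrifice the least locally available data.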