chunked: support copy from uncompressed stream
Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
commit 24b99d165f
parent 22ba9b01ef
@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"math"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -43,8 +42,9 @@ const (
 	containersOverrideXattr = "user.containers.override_stat"
 	bigDataKey = "zstd-chunked-manifest"
 
 	fileTypeZstdChunked = iota
 	fileTypeEstargz = iota
+	fileTypeNoCompression = iota
 )
 
 type compressedFileType int
@@ -783,8 +783,8 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.Fil
 	return nil, err
 }
 
-func (c *chunkedDiffer) appendCompressedStreamToFile(destFile *destinationFile, reader io.Reader, size int64) (err error) {
-	switch c.fileType {
+func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, destFile *destinationFile, reader io.Reader, size int64) (err error) {
+	switch compression {
 	case fileTypeZstdChunked:
 		z, err := zstd.NewReader(reader)
 		if err != nil {
@@ -818,6 +818,14 @@ func (c *chunkedDiffer) appendCompressedStreamToFile(destFile *destinationFile,
 		if _, err := io.Copy(ioutil.Discard, reader); err != nil {
 			return err
 		}
+	case fileTypeNoCompression:
+		_, err := io.Copy(destFile.to, io.LimitReader(reader, size))
+		if err != nil {
+			return err
+		}
+		if _, err := io.Copy(ioutil.Discard, reader); err != nil {
+			return err
+		}
 	default:
 		return fmt.Errorf("unknown file type %q", c.fileType)
 	}
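The new fileTypeNoCompression branch above copies exactly size bytes of an already-uncompressed stream into the destination and then drains whatever is left of the part, mirroring what the zstd and estargz branches do after decompression. A minimal, self-contained sketch of that copy pattern (the copyUncompressed name and the example input are illustrative, not part of the patch):

package main

import (
	"io"
	"io/ioutil"
	"os"
	"strings"
)

// copyUncompressed mimics the fileTypeNoCompression case: write exactly size
// bytes of the already-uncompressed part to the destination, then drain any
// remaining bytes of the part so the caller can move on to the next chunk.
func copyUncompressed(dst io.Writer, part io.Reader, size int64) error {
	if _, err := io.Copy(dst, io.LimitReader(part, size)); err != nil {
		return err
	}
	if _, err := io.Copy(ioutil.Discard, part); err != nil {
		return err
	}
	return nil
}

func main() {
	// "TRAILER" stands in for bytes of the part that belong to the stream but
	// not to the file being written.
	part := strings.NewReader("file contentTRAILER")
	if err := copyUncompressed(os.Stdout, part, int64(len("file content"))); err != nil {
		panic(err)
	}
}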
@@ -869,6 +877,7 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 	var destFile *destinationFile
 	for _, missingPart := range missingParts {
 		var part io.ReadCloser
+		compression := c.fileType
 		switch {
 		case missingPart.SourceChunk != nil:
 			select {
@@ -886,6 +895,7 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 			if err != nil {
 				return err
 			}
+			compression = fileTypeNoCompression
 		default:
 			return errors.Errorf("internal error: missing part misses both local and remote data stream")
 		}
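Together with the previous hunk, this makes the decompression mode a per-part decision: parts fetched from the remote blob keep the layer's own format (c.fileType), while parts recovered from a local origin file are treated as plain bytes. A small sketch of that decision, under the assumption stated in the comments (chooseCompression is an illustrative helper, not a function in this package):

package main

import "fmt"

type compressedFileType int

const (
	fileTypeZstdChunked compressedFileType = iota
	fileTypeEstargz
	fileTypeNoCompression
)

// chooseCompression is an illustrative helper: data streamed from the remote
// blob keeps the layer's compression, while data recovered from a file that
// already exists locally is read as uncompressed bytes.
func chooseCompression(layerType compressedFileType, fromLocalFile bool) compressedFileType {
	if fromLocalFile {
		return fileTypeNoCompression
	}
	return layerType
}

func main() {
	fmt.Println(chooseCompression(fileTypeZstdChunked, false)) // 0: use the layer decoder
	fmt.Println(chooseCompression(fileTypeZstdChunked, true))  // 2: copy as-is
}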
@@ -925,7 +935,7 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 
 			limitReader := io.LimitReader(part, mf.CompressedSize)
 
-			if err := c.appendCompressedStreamToFile(destFile, limitReader, mf.UncompressedSize); err != nil {
+			if err := c.appendCompressedStreamToFile(compression, destFile, limitReader, mf.UncompressedSize); err != nil {
 				part.Close()
 				return err
 			}
@@ -946,34 +956,41 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 	}
 
 	getGap := func(missingParts []missingPart, i int) int {
-		if missingParts[i-1].SourceChunk == nil || missingParts[i].SourceChunk == nil {
-			return math.MaxInt32
-		}
-
 		prev := missingParts[i-1].SourceChunk.Offset + missingParts[i-1].SourceChunk.Length
 		return int(missingParts[i].SourceChunk.Offset - prev)
 	}
+	getCost := func(missingParts []missingPart, i int) int {
+		cost := getGap(missingParts, i)
+		if missingParts[i-1].OriginFile != nil {
+			cost += int(missingParts[i-1].SourceChunk.Length)
+		}
+		if missingParts[i].OriginFile != nil {
+			cost += int(missingParts[i].SourceChunk.Length)
+		}
+		return cost
+	}
 
 	// this implementation doesn't account for duplicates, so it could merge
 	// more than necessary to reach the specified target. Since target itself
 	// is a heuristic value, it doesn't matter.
-	var gaps []int
+	costs := make([]int, len(missingParts)-1)
 	for i := 1; i < len(missingParts); i++ {
-		gaps = append(gaps, getGap(missingParts, i))
+		costs[i-1] = getCost(missingParts, i)
 	}
-	sort.Ints(gaps)
+	sort.Ints(costs)
 
 	toShrink := len(missingParts) - target
-	targetValue := gaps[toShrink-1]
+	targetValue := costs[toShrink]
 
 	newMissingChunks := missingParts[0:1]
 	for i := 1; i < len(missingParts); i++ {
-		gap := getGap(missingParts, i)
-		if gap > targetValue {
+		if getCost(missingParts, i) > targetValue {
 			newMissingChunks = append(newMissingChunks, missingParts[i])
 		} else {
+			gap := getGap(missingParts, i)
 			prev := &newMissingChunks[len(newMissingChunks)-1]
 			prev.SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
+			prev.OriginFile = nil
 			if gap > 0 {
 				gapFile := missingFileChunk{
 					Gap: int64(gap),
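The merge heuristic previously ranked adjacent missing parts only by the gap between them; it now uses a cost that also adds the length of any part that could have been satisfied from a local file, since merging such a part into one remote request gives up the local copy (hence the new prev.OriginFile = nil). A self-contained sketch of the same idea, using simplified types and illustrative names (part, cost, merge are not identifiers from the package):

package main

import (
	"fmt"
	"sort"
)

// part is a simplified stand-in for missingPart: a byte range in the blob plus
// whether the same data could instead be recovered from a local file.
type part struct {
	offset, length int
	local          bool
}

// cost follows the patch's getCost: the hole between two consecutive parts,
// plus the length of any part that could have been served locally, because
// merging it into one remote request throws that local copy away.
func cost(parts []part, i int) int {
	c := parts[i].offset - (parts[i-1].offset + parts[i-1].length)
	if parts[i-1].local {
		c += parts[i-1].length
	}
	if parts[i].local {
		c += parts[i].length
	}
	return c
}

// merge reduces the number of parts toward target by merging adjacent parts
// whose cost is at or below a threshold, as in mergeMissingChunks. The sketch
// assumes 2 <= target < len(parts).
func merge(parts []part, target int) []part {
	costs := make([]int, len(parts)-1)
	for i := 1; i < len(parts); i++ {
		costs[i-1] = cost(parts, i)
	}
	sort.Ints(costs)
	threshold := costs[len(parts)-target]

	// Work on a copy so the input slice is left untouched.
	out := []part{parts[0]}
	for i := 1; i < len(parts); i++ {
		if cost(parts, i) > threshold {
			out = append(out, parts[i])
			continue
		}
		prev := &out[len(out)-1]
		gap := parts[i].offset - (prev.offset + prev.length)
		prev.length += gap + parts[i].length
		prev.local = false // the merged range must now come from the remote stream
	}
	return out
}

func main() {
	parts := []part{{0, 10, false}, {12, 5, true}, {100, 20, false}, {125, 8, false}}
	// Merges the two cheapest boundaries and keeps the expensive one intact:
	// [{0 17 false} {100 33 false}]
	fmt.Println(merge(parts, 3))
}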