chunked: allow streaming to the same file

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
Author: Giuseppe Scrivano, 2021-12-20 15:25:14 +01:00
parent 20282b354b
commit 8e67467c2f
GPG Key ID: 67E38F7A8BA21772
1 changed file with 88 additions and 35 deletions

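In the old code, createFileFromCompressedStream opened the destination file, wrote exactly one compressed chunk, verified the checksum, and closed the file again, so every chunk of a partially retrieved file paid the full open/verify/close cost. The patch below splits that into openDestinationFile, appendCompressedStreamToFile, and destinationFile.Close, so consecutive chunks belonging to the same file stream into one open handle and the digest is verified once at the end. A minimal, self-contained sketch of that pattern (standard library only; destFile, openDest, and the sample chunks are illustrative stand-ins, not the patched API):

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"os"
	"strings"
)

// destFile is a stand-in for the patch's destinationFile: one open file
// plus a running digest, shared by every chunk streamed into that file.
type destFile struct {
	name string
	f    *os.File
	h    hash.Hash
	w    io.Writer // MultiWriter(file, digest)
}

func openDest(name string) (*destFile, error) {
	f, err := os.Create(name)
	if err != nil {
		return nil, err
	}
	h := sha256.New()
	return &destFile{name: name, f: f, h: h, w: io.MultiWriter(f, h)}, nil
}

// close finalizes the file and returns the digest of everything streamed
// into it, mirroring the single verification in destinationFile.Close.
func (d *destFile) close() (string, error) {
	if err := d.f.Close(); err != nil {
		return "", err
	}
	return hex.EncodeToString(d.h.Sum(nil)), nil
}

func main() {
	chunks := []struct{ name, data string }{
		{"a.txt", "hello "},
		{"a.txt", "world\n"}, // same file: reuse the open handle
		{"b.txt", "bye\n"},   // new name: close a.txt first, then open b.txt
	}

	var dest *destFile
	flush := func() {
		if dest == nil {
			return
		}
		sum, err := dest.close()
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s sha256:%s\n", dest.name, sum)
	}

	for _, c := range chunks {
		// Open a new destination only when the file name changes.
		if dest == nil || dest.name != c.name {
			flush()
			d, err := openDest(c.name)
			if err != nil {
				panic(err)
			}
			dest = d
		}
		if _, err := io.Copy(dest.w, strings.NewReader(c.data)); err != nil {
			panic(err)
		}
	}
	flush()
}

Running it writes a.txt and b.txt and prints one digest per file, computed across all chunks of that file rather than per chunk.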

@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"math"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -498,15 +499,16 @@ func maybeDoIDRemap(manifest []internal.FileMetadata, options *archive.TarOptions) error {
 }
 
 type missingFileChunk struct {
-	File   *internal.FileMetadata
-	Gap    int64
-	Offset int64
-	Size   int64
+	Gap int64
+
+	File *internal.FileMetadata
+
+	CompressedSize   int64
+	UncompressedSize int64
 }
 
 type missingPart struct {
-	SourceChunk ImageSourceChunk
+	SourceChunk *ImageSourceChunk
 	Chunks      []missingFileChunk
 }
@@ -754,23 +756,7 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.File, error) {
 	return nil, err
 }
 
-func (c *chunkedDiffer) createFileFromCompressedStream(dest string, dirfd int, reader io.Reader, metadata *internal.FileMetadata, options *archive.TarOptions) (err error) {
-	mode := os.FileMode(metadata.Mode)
-
-	file, err := openFileUnderRoot(metadata.Name, dirfd, newFileFlags, 0)
-	if err != nil {
-		return err
-	}
-	defer func() {
-		err2 := file.Close()
-		if err == nil {
-			err = err2
-		}
-	}()
-
-	digester := digest.Canonical.Digester()
-	checksum := digester.Hash()
-	to := io.MultiWriter(file, checksum)
+func (c *chunkedDiffer) appendCompressedStreamToFile(destFile *destinationFile, reader io.Reader, size int64) (err error) {
 	switch c.fileType {
 	case fileTypeZstdChunked:
 		z, err := zstd.NewReader(reader)
@@ -779,7 +765,7 @@ func (c *chunkedDiffer) createFileFromCompressedStream(dest string, dirfd int, reader io.Reader, metadata *internal.FileMetadata, options *archive.TarOptions) (err error) {
 		}
 		defer z.Close()
 
-		if _, err := io.Copy(to, io.LimitReader(z, metadata.Size)); err != nil {
+		if _, err := io.Copy(destFile.to, io.LimitReader(z, size)); err != nil {
 			return err
 		}
 		if _, err := io.Copy(ioutil.Discard, reader); err != nil {
@@ -799,7 +785,7 @@ func (c *chunkedDiffer) createFileFromCompressedStream(dest string, dirfd int, reader io.Reader, metadata *internal.FileMetadata, options *archive.TarOptions) (err error) {
 		}
 		defer c.gzipReader.Close()
 
-		if _, err := io.Copy(to, io.LimitReader(c.gzipReader, metadata.Size)); err != nil {
+		if _, err := io.Copy(destFile.to, io.LimitReader(c.gzipReader, size)); err != nil {
 			return err
 		}
 		if _, err := io.Copy(ioutil.Discard, reader); err != nil {
@@ -808,18 +794,52 @@ func (c *chunkedDiffer) createFileFromCompressedStream(dest string, dirfd int, reader io.Reader, metadata *internal.FileMetadata, options *archive.TarOptions) (err error) {
 	default:
 		return fmt.Errorf("unknown file type %q", c.fileType)
 	}
+	return nil
+}
 
-	manifestChecksum, err := digest.Parse(metadata.Digest)
+type destinationFile struct {
+	dirfd    int
+	file     *os.File
+	digester digest.Digester
+	to       io.Writer
+	metadata *internal.FileMetadata
+	options  *archive.TarOptions
+}
+
+func openDestinationFile(dirfd int, metadata *internal.FileMetadata, options *archive.TarOptions) (*destinationFile, error) {
+	file, err := openFileUnderRoot(metadata.Name, dirfd, newFileFlags, 0)
+	if err != nil {
+		return nil, err
+	}
+
+	digester := digest.Canonical.Digester()
+	to := io.MultiWriter(file, digester.Hash())
+
+	return &destinationFile{
+		file:     file,
+		digester: digester,
+		to:       to,
+		metadata: metadata,
+		options:  options,
+		dirfd:    dirfd,
+	}, nil
+}
+
+func (d *destinationFile) Close() error {
+	manifestChecksum, err := digest.Parse(d.metadata.Digest)
 	if err != nil {
 		return err
 	}
-	if digester.Digest() != manifestChecksum {
-		return fmt.Errorf("checksum mismatch for %q", dest)
+	if d.digester.Digest() != manifestChecksum {
+		return fmt.Errorf("checksum mismatch for %q (got %q instead of %q)", d.file.Name(), d.digester.Digest(), manifestChecksum)
 	}
-	return setFileAttrs(dirfd, file, mode, metadata, options, false)
+	return setFileAttrs(d.dirfd, d.file, os.FileMode(d.metadata.Mode), d.metadata, d.options, false)
 }
 
 func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan error, dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error {
+	var destFile *destinationFile
+
 	for mc := 0; ; mc++ {
 		var part io.ReadCloser
 		select {
@@ -850,15 +870,42 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan error, dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error {
 				continue
 			}
 
-			limitReader := io.LimitReader(part, mf.Size)
-
 			if mf.File.Name == "" {
 				part.Close()
 				return errors.Errorf("file name empty")
 			}
-			if err := c.createFileFromCompressedStream(dest, dirfd, limitReader, mf.File, options); err != nil {
+
+			// Open a new file only if it is different from the one that
+			// is already open.
+			if destFile == nil || destFile.metadata.Name != mf.File.Name {
+				if destFile != nil {
+					if err := destFile.Close(); err != nil {
+						part.Close()
+						return err
+					}
+				}
+
+				var err error
+				destFile, err = openDestinationFile(dirfd, mf.File, options)
+				if err != nil {
+					part.Close()
+					return err
+				}
+			}
+
+			limitReader := io.LimitReader(part, mf.CompressedSize)
+
+			if err := c.appendCompressedStreamToFile(destFile, limitReader, mf.UncompressedSize); err != nil {
 				part.Close()
 				return err
 			}
 		}
 
 		part.Close()
 	}
+
+	if destFile != nil {
+		return destFile.Close()
+	}
+
 	return nil
 }
@@ -868,6 +915,10 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 	}
 
 	getGap := func(missingParts []missingPart, i int) int {
+		if missingParts[i-1].SourceChunk == nil || missingParts[i].SourceChunk == nil {
+			return math.MaxInt32
+		}
+
 		prev := missingParts[i-1].SourceChunk.Offset + missingParts[i-1].SourceChunk.Length
 		return int(missingParts[i].SourceChunk.Offset - prev)
 	}
@@ -907,7 +958,9 @@
 func (c *chunkedDiffer) retrieveMissingFiles(dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error {
 	var chunksToRequest []ImageSourceChunk
 	for _, c := range missingParts {
-		chunksToRequest = append(chunksToRequest, c.SourceChunk)
+		if c.SourceChunk != nil {
+			chunksToRequest = append(chunksToRequest, *c.SourceChunk)
+		}
 	}
 
 	// There are some missing files. Prepare a multirange request for the missing chunks.
@@ -1325,13 +1378,13 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (graphdriver.DriverWithDifferOutput, error) {
 		}
 
 		file := missingFileChunk{
-			File:   &mergedEntries[i],
-			Offset: 0,
-			Size:   mergedEntries[i].EndOffset - mergedEntries[i].Offset,
+			File: &mergedEntries[i],
+
+			CompressedSize:   mergedEntries[i].EndOffset - mergedEntries[i].Offset,
+			UncompressedSize: r.Size,
 		}
 		missingParts = append(missingParts, missingPart{
-			SourceChunk: rawChunk,
+			SourceChunk: &rawChunk,
 			Chunks: []missingFileChunk{
 				file,
 			},
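Note the two sizes now recorded per chunk: CompressedSize bounds how many bytes appendCompressedStreamToFile may consume from the multirange response body, while UncompressedSize bounds how many bytes the decompressor may emit into the destination file. A runnable sketch of that double LimitReader pattern, assuming the zstd import is github.com/klauspost/compress/zstd (the package that provides the zstd.NewReader seen in the hunks above):

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Build a compressed payload so both sizes are known up front, the
	// way the manifest records them for each chunk.
	var buf bytes.Buffer
	enc, err := zstd.NewWriter(&buf)
	if err != nil {
		panic(err)
	}
	payload := []byte("streamed into the same destination file")
	if _, err := enc.Write(payload); err != nil {
		panic(err)
	}
	if err := enc.Close(); err != nil {
		panic(err)
	}

	compressedSize := int64(buf.Len())
	uncompressedSize := int64(len(payload))

	// CompressedSize caps what we consume from the fetched byte range...
	limited := io.LimitReader(&buf, compressedSize)
	z, err := zstd.NewReader(limited)
	if err != nil {
		panic(err)
	}
	defer z.Close()

	// ...and UncompressedSize caps what the decoder writes out.
	var out bytes.Buffer
	if _, err := io.Copy(&out, io.LimitReader(z, uncompressedSize)); err != nil {
		panic(err)
	}
	fmt.Println(out.String())
}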