Merge pull request #2312 from giuseppe/chunked-read-tarsplit-sequentially
chunked: use temporary file for tar-split data
This commit is contained in:
commit
ff9e524dc0
|
|
@ -216,7 +216,7 @@ type DriverWithDifferOutput struct {
|
|||
CompressedDigest digest.Digest
|
||||
Metadata string
|
||||
BigData map[string][]byte
|
||||
TarSplit []byte // nil if not available
|
||||
TarSplit *os.File // nil if not available
|
||||
TOCDigest digest.Digest
|
||||
// RootDirMode is the mode of the root directory of the layer, if specified.
|
||||
RootDirMode *os.FileMode
|
||||
|
|
@ -267,6 +267,7 @@ type DifferOptions struct {
|
|||
// This API is experimental and can be changed without bumping the major version number.
|
||||
type Differ interface {
|
||||
// ApplyDiff takes a destination path, tar options and differ options and returns
// a DriverWithDifferOutput describing the result, or an error.
// NOTE(review): presumably dest is the staging directory the layer is written to — confirm.
ApplyDiff(dest string, options *archive.TarOptions, differOpts *DifferOptions) (DriverWithDifferOutput, error)
|
||||
// Close releases any resources held by the Differ.
// NOTE(review): the chunked implementation closes its tar-split temporary file here,
// so output referring to that file is presumably unusable afterwards — confirm.
Close() error
|
||||
}
|
||||
|
||||
// DriverWithDiffer is the interface for direct diff access.
|
||||
|
|
|
|||
|
|
@ -2550,10 +2550,14 @@ func (r *layerStore) applyDiffFromStagingDirectory(id string, diffOutput *driver
|
|||
if err != nil {
|
||||
compressor = pgzip.NewWriter(&tsdata)
|
||||
}
|
||||
if _, err := diffOutput.TarSplit.Seek(0, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := compressor.SetConcurrency(1024*1024, 1); err != nil { // 1024*1024 is the hard-coded default; we're not changing that
|
||||
logrus.Infof("setting compression concurrency threads to 1: %v; ignoring", err)
|
||||
}
|
||||
if _, err := compressor.Write(diffOutput.TarSplit); err != nil {
|
||||
if _, err := diffOutput.TarSplit.WriteTo(compressor); err != nil {
|
||||
compressor.Close()
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"maps"
|
||||
"os"
|
||||
"slices"
|
||||
"strconv"
|
||||
"time"
|
||||
|
|
@ -18,6 +19,7 @@ import (
|
|||
"github.com/vbatts/tar-split/archive/tar"
|
||||
"github.com/vbatts/tar-split/tar/asm"
|
||||
"github.com/vbatts/tar-split/tar/storage"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -157,10 +159,33 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
|
|||
return manifestUncompressed, tocOffset, nil
|
||||
}
|
||||
|
||||
func openTmpFile(tmpDir string) (*os.File, error) {
|
||||
file, err := os.OpenFile(tmpDir, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC|unix.O_EXCL, 0o600)
|
||||
if err == nil {
|
||||
return file, nil
|
||||
}
|
||||
return openTmpFileNoTmpFile(tmpDir)
|
||||
}
|
||||
|
||||
// openTmpFileNoTmpFile is a fallback used by openTmpFile when the underlying file system does not
// support O_TMPFILE.
//
// It creates a named temporary file in tmpDir and unlinks it right away, so that
// the returned *os.File is the only remaining reference to the data.
//
// If the unlink fails, the file is closed and the error is returned: callers rely
// on the file having no name, and a leftover named file would otherwise leak in tmpDir.
// A "does not exist" error from the unlink is tolerated, since the file is already
// nameless in that case.
func openTmpFileNoTmpFile(tmpDir string) (*os.File, error) {
	file, err := os.CreateTemp(tmpDir, ".tmpfile")
	if err != nil {
		return nil, err
	}
	// Unlink the file immediately so that only the open fd refers to it.
	if err := os.Remove(file.Name()); err != nil && !os.IsNotExist(err) {
		// The unlink error is the one worth reporting; a Close error adds nothing here.
		_ = file.Close()
		return nil, err
	}
	return file, nil
}
|
||||
|
||||
// readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream.
|
||||
// Returns (manifest blob, parsed manifest, tar-split blob or nil, manifest offset).
|
||||
// tmpDir is a directory where the tar-split temporary file is written to. The file is opened with
|
||||
// O_TMPFILE so that it is automatically removed when it is closed.
|
||||
// Returns (manifest blob, parsed manifest, tar-split file or nil, manifest offset). The opened tar-split file
|
||||
// points to the end of the file (equivalent to Seek(0, 2)).
|
||||
// It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert.
|
||||
func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ []byte, _ int64, retErr error) {
|
||||
func readZstdChunkedManifest(tmpDir string, blobStream ImageSourceSeekable, tocDigest digest.Digest, annotations map[string]string) (_ []byte, _ *minimal.TOC, _ *os.File, _ int64, retErr error) {
|
||||
offsetMetadata := annotations[minimal.ManifestInfoKey]
|
||||
if offsetMetadata == "" {
|
||||
return nil, nil, nil, 0, fmt.Errorf("%q annotation missing", minimal.ManifestInfoKey)
|
||||
|
|
@ -245,7 +270,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
|
|||
return nil, nil, nil, 0, fmt.Errorf("unmarshaling TOC: %w", err)
|
||||
}
|
||||
|
||||
var decodedTarSplit []byte = nil
|
||||
var decodedTarSplit *os.File
|
||||
if toc.TarSplitDigest != "" {
|
||||
if tarSplitChunk.Offset <= 0 {
|
||||
return nil, nil, nil, 0, fmt.Errorf("TOC requires a tar-split, but the %s annotation does not describe a position", minimal.TarSplitInfoKey)
|
||||
|
|
@ -254,14 +279,19 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
|
|||
if err != nil {
|
||||
return nil, nil, nil, 0, err
|
||||
}
|
||||
decodedTarSplit, err = decodeAndValidateBlob(tarSplit, tarSplitLengthUncompressed, toc.TarSplitDigest.String())
|
||||
decodedTarSplit, err = openTmpFile(tmpDir)
|
||||
if err != nil {
|
||||
return nil, nil, nil, 0, err
|
||||
}
|
||||
if err := decodeAndValidateBlobToStream(tarSplit, decodedTarSplit, toc.TarSplitDigest.String()); err != nil {
|
||||
decodedTarSplit.Close()
|
||||
return nil, nil, nil, 0, fmt.Errorf("validating and decompressing tar-split: %w", err)
|
||||
}
|
||||
// We use the TOC for creating on-disk files, but the tar-split for creating metadata
|
||||
// when exporting the layer contents. Ensure the two match, otherwise local inspection of a container
|
||||
// might be misleading about the exported contents.
|
||||
if err := ensureTOCMatchesTarSplit(toc, decodedTarSplit); err != nil {
|
||||
decodedTarSplit.Close()
|
||||
return nil, nil, nil, 0, fmt.Errorf("tar-split and TOC data is inconsistent: %w", err)
|
||||
}
|
||||
} else if tarSplitChunk.Offset > 0 {
|
||||
|
|
@ -278,7 +308,7 @@ func readZstdChunkedManifest(blobStream ImageSourceSeekable, tocDigest digest.Di
|
|||
}
|
||||
|
||||
// ensureTOCMatchesTarSplit validates that toc and tarSplit contain _exactly_ the same entries.
|
||||
func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
|
||||
func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit *os.File) error {
|
||||
pendingFiles := map[string]*minimal.FileMetadata{} // Name -> an entry in toc.Entries
|
||||
for i := range toc.Entries {
|
||||
e := &toc.Entries[i]
|
||||
|
|
@ -290,7 +320,11 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
|
|||
}
|
||||
}
|
||||
|
||||
unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit))
|
||||
if _, err := tarSplit.Seek(0, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
unpacker := storage.NewJSONUnpacker(tarSplit)
|
||||
if err := asm.IterateHeaders(unpacker, func(hdr *tar.Header) error {
|
||||
e, ok := pendingFiles[hdr.Name]
|
||||
if !ok {
|
||||
|
|
@ -320,10 +354,10 @@ func ensureTOCMatchesTarSplit(toc *minimal.TOC, tarSplit []byte) error {
|
|||
}
|
||||
|
||||
// tarSizeFromTarSplit computes the total tarball size, using only the tarSplit metadata
|
||||
func tarSizeFromTarSplit(tarSplit []byte) (int64, error) {
|
||||
func tarSizeFromTarSplit(tarSplit io.Reader) (int64, error) {
|
||||
var res int64 = 0
|
||||
|
||||
unpacker := storage.NewJSONUnpacker(bytes.NewReader(tarSplit))
|
||||
unpacker := storage.NewJSONUnpacker(tarSplit)
|
||||
for {
|
||||
entry, err := unpacker.Next()
|
||||
if err != nil {
|
||||
|
|
@ -433,22 +467,29 @@ func ensureFileMetadataAttributesMatch(a, b *minimal.FileMetadata) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompressedChecksum string) ([]byte, error) {
|
||||
func validateBlob(blob []byte, expectedCompressedChecksum string) error {
|
||||
d, err := digest.Parse(expectedCompressedChecksum)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid digest %q: %w", expectedCompressedChecksum, err)
|
||||
return fmt.Errorf("invalid digest %q: %w", expectedCompressedChecksum, err)
|
||||
}
|
||||
|
||||
blobDigester := d.Algorithm().Digester()
|
||||
blobChecksum := blobDigester.Hash()
|
||||
if _, err := blobChecksum.Write(blob); err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
if blobDigester.Digest() != d {
|
||||
return nil, fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest())
|
||||
return fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompressedChecksum string) ([]byte, error) {
|
||||
if err := validateBlob(blob, expectedCompressedChecksum); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
decoder, err := zstd.NewReader(nil) //nolint:contextcheck
|
||||
decoder, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -457,3 +498,18 @@ func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedCompr
|
|||
b := make([]byte, 0, lengthUncompressed)
|
||||
return decoder.DecodeAll(blob, b)
|
||||
}
|
||||
|
||||
func decodeAndValidateBlobToStream(blob []byte, w *os.File, expectedCompressedChecksum string) error {
|
||||
if err := validateBlob(blob, expectedCompressedChecksum); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
decoder, err := zstd.NewReader(bytes.NewReader(blob)) //nolint:contextcheck
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer decoder.Close()
|
||||
|
||||
_, err = decoder.WriteTo(w)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,9 @@ package chunked
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
|
@ -39,7 +41,28 @@ func TestTarSizeFromTarSplit(t *testing.T) {
|
|||
_, err = io.Copy(io.Discard, tsReader)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := tarSizeFromTarSplit(tarSplit.Bytes())
|
||||
res, err := tarSizeFromTarSplit(&tarSplit)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedTarSize, res)
|
||||
}
|
||||
|
||||
func TestOpenTmpFile(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
for range 1000 {
|
||||
// scope for cleanup
|
||||
f := func(fn func(tmpDir string) (*os.File, error)) {
|
||||
file, err := fn(tmpDir)
|
||||
assert.NoError(t, err)
|
||||
defer file.Close()
|
||||
|
||||
path, err := os.Readlink(fmt.Sprintf("/proc/self/fd/%d", file.Fd()))
|
||||
assert.NoError(t, err)
|
||||
|
||||
// the path under /proc/self/fd/$FD has the prefix "(deleted)" when the file
|
||||
// is unlinked
|
||||
assert.Contains(t, path, "(deleted)")
|
||||
}
|
||||
f(openTmpFile)
|
||||
f(openTmpFileNoTmpFile)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package chunked
|
|||
|
||||
import (
|
||||
archivetar "archive/tar"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
|
|
@ -89,7 +88,7 @@ type chunkedDiffer struct {
|
|||
tocOffset int64
|
||||
manifest []byte
|
||||
toc *minimal.TOC // The parsed contents of manifest, or nil if not yet available
|
||||
tarSplit []byte
|
||||
tarSplit *os.File
|
||||
uncompressedTarSize int64 // -1 if unknown
|
||||
// skipValidation is set to true if the individual files in
|
||||
// the layer are trusted and should not be validated.
|
||||
|
|
@ -164,13 +163,11 @@ func (c *chunkedDiffer) convertTarToZstdChunked(destDirectory string, payload *o
|
|||
|
||||
defer diff.Close()
|
||||
|
||||
fd, err := unix.Open(destDirectory, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC, 0o600)
|
||||
f, err := openTmpFile(destDirectory)
|
||||
if err != nil {
|
||||
return 0, nil, "", nil, &fs.PathError{Op: "open", Path: destDirectory, Err: err}
|
||||
return 0, nil, "", nil, err
|
||||
}
|
||||
|
||||
f := os.NewFile(uintptr(fd), destDirectory)
|
||||
|
||||
newAnnotations := make(map[string]string)
|
||||
level := 1
|
||||
chunked, err := compressor.ZstdCompressor(f, newAnnotations, &level)
|
||||
|
|
@ -193,6 +190,15 @@ func (c *chunkedDiffer) convertTarToZstdChunked(destDirectory string, payload *o
|
|||
return copied, newSeekableFile(f), convertedOutputDigester.Digest(), newAnnotations, nil
|
||||
}
|
||||
|
||||
func (c *chunkedDiffer) Close() error {
|
||||
if c.tarSplit != nil {
|
||||
err := c.tarSplit.Close()
|
||||
c.tarSplit = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDiffer returns a differ that can be used with ApplyDiffWithDiffer.
|
||||
// If it returns an error that matches ErrFallbackToOrdinaryLayerDownload, the caller can
|
||||
// retry the operation with a different method.
|
||||
|
|
@ -333,13 +339,16 @@ func makeConvertFromRawDiffer(store storage.Store, blobDigest digest.Digest, blo
|
|||
// makeZstdChunkedDiffer sets up a chunkedDiffer for a zstd:chunked layer.
|
||||
// It may return an error matching ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert.
|
||||
func makeZstdChunkedDiffer(store storage.Store, blobSize int64, tocDigest digest.Digest, annotations map[string]string, iss ImageSourceSeekable, pullOptions pullOptions) (*chunkedDiffer, error) {
|
||||
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(iss, tocDigest, annotations)
|
||||
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(store.RunRoot(), iss, tocDigest, annotations)
|
||||
if err != nil { // May be ErrFallbackToOrdinaryLayerDownload / errFallbackCanConvert
|
||||
return nil, fmt.Errorf("read zstd:chunked manifest: %w", err)
|
||||
}
|
||||
|
||||
var uncompressedTarSize int64 = -1
|
||||
if tarSplit != nil {
|
||||
if _, err := tarSplit.Seek(0, 0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
uncompressedTarSize, err = tarSizeFromTarSplit(tarSplit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("computing size from tar-split: %w", err)
|
||||
|
|
@ -1435,7 +1444,7 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
|
|||
if tocDigest == nil {
|
||||
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("internal error: just-created zstd:chunked missing TOC digest")
|
||||
}
|
||||
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(fileSource, *tocDigest, annotations)
|
||||
manifest, toc, tarSplit, tocOffset, err := readZstdChunkedManifest(dest, fileSource, *tocDigest, annotations)
|
||||
if err != nil {
|
||||
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("read zstd:chunked manifest: %w", err)
|
||||
}
|
||||
|
|
@ -1842,7 +1851,10 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
|
|||
case c.pullOptions.insecureAllowUnpredictableImageContents:
|
||||
// Oh well. Skip the costly digest computation.
|
||||
case output.TarSplit != nil:
|
||||
metadata := tsStorage.NewJSONUnpacker(bytes.NewReader(output.TarSplit))
|
||||
if _, err := output.TarSplit.Seek(0, 0); err != nil {
|
||||
return output, err
|
||||
}
|
||||
metadata := tsStorage.NewJSONUnpacker(output.TarSplit)
|
||||
fg := newStagedFileGetter(dirFile, flatPathNameMap)
|
||||
digester := digest.Canonical.Digester()
|
||||
if err := asm.WriteOutputTarStream(fg, metadata, digester.Hash()); err != nil {
|
||||
|
|
|
|||
|
|
@ -179,7 +179,7 @@ func TestGenerateAndParseManifest(t *testing.T) {
|
|||
tocDigest, err := toc.GetTOCDigest(annotations)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, tocDigest)
|
||||
manifest, decodedTOC, _, _, err := readZstdChunkedManifest(s, *tocDigest, annotations)
|
||||
manifest, decodedTOC, _, _, err := readZstdChunkedManifest(t.TempDir(), s, *tocDigest, annotations)
|
||||
require.NoError(t, err)
|
||||
|
||||
var toc minimal.TOC
|
||||
|
|
|
|||
Loading…
Reference in New Issue