chunked: support converting existing images
if the "convert_images" option is set in the configuration file, then convert traditional images to the chunked format on the fly. This is very expensive at the moment since the entire zstd:chunked file is created and then processed. Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
This commit is contained in:
parent
54f574cfe3
commit
c98840f31d
|
|
@ -3,7 +3,6 @@ package chunked
|
|||
import (
|
||||
archivetar "archive/tar"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
|
@ -150,7 +149,7 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
|
|||
// readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream. The blob total size must
|
||||
// be specified.
|
||||
// This function uses the io.github.containers.zstd-chunked. annotations when specified.
|
||||
func readZstdChunkedManifest(ctx context.Context, blobStream ImageSourceSeekable, blobSize int64, annotations map[string]string) ([]byte, []byte, int64, error) {
|
||||
func readZstdChunkedManifest(blobStream ImageSourceSeekable, blobSize int64, annotations map[string]string) ([]byte, []byte, int64, error) {
|
||||
footerSize := int64(internal.FooterSizeSupported)
|
||||
if blobSize <= footerSize {
|
||||
return nil, nil, 0, errors.New("blob too small")
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
graphdriver "github.com/containers/storage/drivers"
|
||||
driversCopy "github.com/containers/storage/drivers/copy"
|
||||
"github.com/containers/storage/pkg/archive"
|
||||
"github.com/containers/storage/pkg/chunked/compressor"
|
||||
"github.com/containers/storage/pkg/chunked/internal"
|
||||
"github.com/containers/storage/pkg/idtools"
|
||||
"github.com/containers/storage/pkg/system"
|
||||
|
|
@ -69,7 +70,20 @@ type chunkedDiffer struct {
|
|||
zstdReader *zstd.Decoder
|
||||
rawReader io.Reader
|
||||
|
||||
tocDigest digest.Digest
|
||||
// contentDigest is the digest of the uncompressed content
|
||||
// (diffID) when the layer is fully retrieved. If the layer
|
||||
// is not fully retrieved, instead of using the digest of the
|
||||
// uncompressed content, it refers to the digest of the TOC.
|
||||
contentDigest digest.Digest
|
||||
|
||||
// convertedToZstdChunked is set to true if the layer needs to
|
||||
// be converted to the zstd:chunked format before it can be
|
||||
// handled.
|
||||
convertToZstdChunked bool
|
||||
|
||||
blobSize int64
|
||||
|
||||
storeOpts *types.StoreOptions
|
||||
}
|
||||
|
||||
var xattrsToIgnore = map[string]interface{}{
|
||||
|
|
@ -149,15 +163,15 @@ func copyFileContent(srcFd int, destFile string, dirfd int, mode os.FileMode, us
|
|||
// GetTOCDigest returns the digest of the TOC as recorded in the annotations.
|
||||
// This is an experimental feature and may be changed/removed in the future.
|
||||
func GetTOCDigest(annotations map[string]string) (*digest.Digest, error) {
|
||||
if tocDigest, ok := annotations[estargz.TOCJSONDigestAnnotation]; ok {
|
||||
d, err := digest.Parse(tocDigest)
|
||||
if contentDigest, ok := annotations[estargz.TOCJSONDigestAnnotation]; ok {
|
||||
d, err := digest.Parse(contentDigest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &d, nil
|
||||
}
|
||||
if tocDigest, ok := annotations[internal.ManifestChecksumKey]; ok {
|
||||
d, err := digest.Parse(tocDigest)
|
||||
if contentDigest, ok := annotations[internal.ManifestChecksumKey]; ok {
|
||||
d, err := digest.Parse(contentDigest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -166,19 +180,132 @@ func GetTOCDigest(annotations map[string]string) (*digest.Digest, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// GetDiffer returns a differ than can be used with ApplyDiffWithDiffer.
|
||||
func GetDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (graphdriver.Differ, error) {
|
||||
if _, ok := annotations[internal.ManifestChecksumKey]; ok {
|
||||
return makeZstdChunkedDiffer(ctx, store, blobSize, annotations, iss)
|
||||
}
|
||||
if _, ok := annotations[estargz.TOCJSONDigestAnnotation]; ok {
|
||||
return makeEstargzChunkedDiffer(ctx, store, blobSize, annotations, iss)
|
||||
}
|
||||
return nil, errors.New("blob type not supported for partial retrieval")
|
||||
type seekableFile struct {
|
||||
file *os.File
|
||||
}
|
||||
|
||||
func makeZstdChunkedDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (*chunkedDiffer, error) {
|
||||
manifest, tarSplit, tocOffset, err := readZstdChunkedManifest(ctx, iss, blobSize, annotations)
|
||||
func (f *seekableFile) Close() error {
|
||||
return f.file.Close()
|
||||
}
|
||||
|
||||
func (f *seekableFile) GetBlobAt(chunks []ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
|
||||
streams := make(chan io.ReadCloser)
|
||||
errs := make(chan error)
|
||||
|
||||
go func() {
|
||||
for _, chunk := range chunks {
|
||||
streams <- io.NopCloser(io.NewSectionReader(f.file, int64(chunk.Offset), int64(chunk.Length)))
|
||||
}
|
||||
close(streams)
|
||||
close(errs)
|
||||
}()
|
||||
|
||||
return streams, errs, nil
|
||||
}
|
||||
|
||||
func convertTarToZstdChunked(destDirectory string, blobSize int64, iss ImageSourceSeekable) (*seekableFile, digest.Digest, map[string]string, error) {
|
||||
var payload io.ReadCloser
|
||||
var streams chan io.ReadCloser
|
||||
var errs chan error
|
||||
var err error
|
||||
|
||||
chunksToRequest := []ImageSourceChunk{
|
||||
{
|
||||
Offset: 0,
|
||||
Length: uint64(blobSize),
|
||||
},
|
||||
}
|
||||
|
||||
streams, errs, err = iss.GetBlobAt(chunksToRequest)
|
||||
if err != nil {
|
||||
return nil, "", nil, err
|
||||
}
|
||||
select {
|
||||
case p := <-streams:
|
||||
payload = p
|
||||
case err := <-errs:
|
||||
return nil, "", nil, err
|
||||
}
|
||||
if payload == nil {
|
||||
return nil, "", nil, errors.New("invalid stream returned")
|
||||
}
|
||||
|
||||
diff, err := archive.DecompressStream(payload)
|
||||
if err != nil {
|
||||
return nil, "", nil, err
|
||||
}
|
||||
|
||||
fd, err := unix.Open(destDirectory, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC, 0o600)
|
||||
if err != nil {
|
||||
return nil, "", nil, err
|
||||
}
|
||||
|
||||
f := os.NewFile(uintptr(fd), destDirectory)
|
||||
|
||||
newAnnotations := make(map[string]string)
|
||||
level := 1
|
||||
chunked, err := compressor.ZstdCompressor(f, newAnnotations, &level)
|
||||
if err != nil {
|
||||
f.Close()
|
||||
return nil, "", nil, err
|
||||
}
|
||||
|
||||
digester := digest.Canonical.Digester()
|
||||
hash := digester.Hash()
|
||||
|
||||
if _, err := io.Copy(io.MultiWriter(chunked, hash), diff); err != nil {
|
||||
f.Close()
|
||||
return nil, "", nil, err
|
||||
}
|
||||
if err := chunked.Close(); err != nil {
|
||||
f.Close()
|
||||
return nil, "", nil, err
|
||||
}
|
||||
is := seekableFile{
|
||||
file: f,
|
||||
}
|
||||
return &is, digester.Digest(), newAnnotations, nil
|
||||
}
|
||||
|
||||
// GetDiffer returns a differ than can be used with ApplyDiffWithDiffer.
|
||||
func GetDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (graphdriver.Differ, error) {
|
||||
storeOpts, err := types.DefaultStoreOptionsAutoDetectUID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, ok := annotations[internal.ManifestChecksumKey]; ok {
|
||||
return makeZstdChunkedDiffer(ctx, store, blobSize, annotations, iss, &storeOpts)
|
||||
}
|
||||
if _, ok := annotations[estargz.TOCJSONDigestAnnotation]; ok {
|
||||
return makeEstargzChunkedDiffer(ctx, store, blobSize, annotations, iss, &storeOpts)
|
||||
}
|
||||
|
||||
return makeConvertFromRawDiffer(ctx, store, blobSize, annotations, iss, &storeOpts)
|
||||
}
|
||||
|
||||
func makeConvertFromRawDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable, storeOpts *types.StoreOptions) (*chunkedDiffer, error) {
|
||||
if !parseBooleanPullOption(storeOpts, "convert_images", false) {
|
||||
return nil, errors.New("convert_images not configured")
|
||||
}
|
||||
|
||||
layersCache, err := getLayersCache(store)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &chunkedDiffer{
|
||||
blobSize: blobSize,
|
||||
convertToZstdChunked: true,
|
||||
copyBuffer: makeCopyBuffer(),
|
||||
layersCache: layersCache,
|
||||
storeOpts: storeOpts,
|
||||
stream: iss,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func makeZstdChunkedDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable, storeOpts *types.StoreOptions) (*chunkedDiffer, error) {
|
||||
manifest, tarSplit, tocOffset, err := readZstdChunkedManifest(iss, blobSize, annotations)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read zstd:chunked manifest: %w", err)
|
||||
}
|
||||
|
|
@ -187,24 +314,26 @@ func makeZstdChunkedDiffer(ctx context.Context, store storage.Store, blobSize in
|
|||
return nil, err
|
||||
}
|
||||
|
||||
tocDigest, err := digest.Parse(annotations[internal.ManifestChecksumKey])
|
||||
contentDigest, err := digest.Parse(annotations[internal.ManifestChecksumKey])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse TOC digest %q: %w", annotations[internal.ManifestChecksumKey], err)
|
||||
}
|
||||
|
||||
return &chunkedDiffer{
|
||||
copyBuffer: makeCopyBuffer(),
|
||||
fileType: fileTypeZstdChunked,
|
||||
layersCache: layersCache,
|
||||
manifest: manifest,
|
||||
stream: iss,
|
||||
tarSplit: tarSplit,
|
||||
tocOffset: tocOffset,
|
||||
tocDigest: tocDigest,
|
||||
blobSize: blobSize,
|
||||
contentDigest: contentDigest,
|
||||
copyBuffer: makeCopyBuffer(),
|
||||
fileType: fileTypeZstdChunked,
|
||||
layersCache: layersCache,
|
||||
manifest: manifest,
|
||||
storeOpts: storeOpts,
|
||||
stream: iss,
|
||||
tarSplit: tarSplit,
|
||||
tocOffset: tocOffset,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func makeEstargzChunkedDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (*chunkedDiffer, error) {
|
||||
func makeEstargzChunkedDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable, storeOpts *types.StoreOptions) (*chunkedDiffer, error) {
|
||||
manifest, tocOffset, err := readEstargzChunkedManifest(iss, blobSize, annotations)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read zstd:chunked manifest: %w", err)
|
||||
|
|
@ -214,19 +343,21 @@ func makeEstargzChunkedDiffer(ctx context.Context, store storage.Store, blobSize
|
|||
return nil, err
|
||||
}
|
||||
|
||||
tocDigest, err := digest.Parse(annotations[estargz.TOCJSONDigestAnnotation])
|
||||
contentDigest, err := digest.Parse(annotations[estargz.TOCJSONDigestAnnotation])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse TOC digest %q: %w", annotations[estargz.TOCJSONDigestAnnotation], err)
|
||||
}
|
||||
|
||||
return &chunkedDiffer{
|
||||
copyBuffer: makeCopyBuffer(),
|
||||
stream: iss,
|
||||
manifest: manifest,
|
||||
layersCache: layersCache,
|
||||
tocOffset: tocOffset,
|
||||
fileType: fileTypeEstargz,
|
||||
tocDigest: tocDigest,
|
||||
blobSize: blobSize,
|
||||
contentDigest: contentDigest,
|
||||
copyBuffer: makeCopyBuffer(),
|
||||
fileType: fileTypeEstargz,
|
||||
layersCache: layersCache,
|
||||
manifest: manifest,
|
||||
storeOpts: storeOpts,
|
||||
stream: iss,
|
||||
tocOffset: tocOffset,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -1076,7 +1207,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
|
|||
return newMissingParts
|
||||
}
|
||||
|
||||
func (c *chunkedDiffer) retrieveMissingFiles(dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error {
|
||||
func (c *chunkedDiffer) retrieveMissingFiles(stream ImageSourceSeekable, dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error {
|
||||
var chunksToRequest []ImageSourceChunk
|
||||
|
||||
calculateChunksToRequest := func() {
|
||||
|
|
@ -1095,7 +1226,7 @@ func (c *chunkedDiffer) retrieveMissingFiles(dest string, dirfd int, missingPart
|
|||
var err error
|
||||
var errs chan error
|
||||
for {
|
||||
streams, errs, err = c.stream.GetBlobAt(chunksToRequest)
|
||||
streams, errs, err = stream.GetBlobAt(chunksToRequest)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
|
@ -1372,6 +1503,35 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
|
|||
}
|
||||
}()
|
||||
|
||||
// stream to use for reading the zstd:chunked or Estargz file.
|
||||
stream := c.stream
|
||||
|
||||
if c.convertToZstdChunked {
|
||||
fileSource, diffID, annotations, err := convertTarToZstdChunked(dest, c.blobSize, c.stream)
|
||||
if err != nil {
|
||||
return graphdriver.DriverWithDifferOutput{}, err
|
||||
}
|
||||
// fileSource is a O_TMPFILE file descriptor, so we
|
||||
// need to keep it open until the entire file is processed.
|
||||
defer fileSource.Close()
|
||||
|
||||
manifest, tarSplit, tocOffset, err := readZstdChunkedManifest(fileSource, c.blobSize, annotations)
|
||||
if err != nil {
|
||||
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("read zstd:chunked manifest: %w", err)
|
||||
}
|
||||
|
||||
// Use the new file for accessing the zstd:chunked file.
|
||||
stream = fileSource
|
||||
|
||||
// fill the chunkedDiffer with the data we just read.
|
||||
c.fileType = fileTypeZstdChunked
|
||||
c.manifest = manifest
|
||||
c.tarSplit = tarSplit
|
||||
// since we retrieved the whole file and it was validated, use the diffID instead of the TOC digest.
|
||||
c.contentDigest = diffID
|
||||
c.tocOffset = tocOffset
|
||||
}
|
||||
|
||||
lcd := chunkedLayerData{
|
||||
Format: differOpts.Format,
|
||||
}
|
||||
|
|
@ -1388,24 +1548,19 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
|
|||
bigDataKey: c.manifest,
|
||||
chunkedLayerDataKey: lcdBigData,
|
||||
},
|
||||
TOCDigest: c.tocDigest,
|
||||
TOCDigest: c.contentDigest,
|
||||
}
|
||||
|
||||
storeOpts, err := types.DefaultStoreOptionsAutoDetectUID()
|
||||
if err != nil {
|
||||
return output, err
|
||||
}
|
||||
|
||||
if !parseBooleanPullOption(&storeOpts, "enable_partial_images", false) {
|
||||
if !parseBooleanPullOption(c.storeOpts, "enable_partial_images", false) {
|
||||
return output, errors.New("enable_partial_images not configured")
|
||||
}
|
||||
|
||||
// When the hard links deduplication is used, file attributes are ignored because setting them
|
||||
// modifies the source file as well.
|
||||
useHardLinks := parseBooleanPullOption(&storeOpts, "use_hard_links", false)
|
||||
useHardLinks := parseBooleanPullOption(c.storeOpts, "use_hard_links", false)
|
||||
|
||||
// List of OSTree repositories to use for deduplication
|
||||
ostreeRepos := strings.Split(storeOpts.PullOptions["ostree_repos"], ":")
|
||||
ostreeRepos := strings.Split(c.storeOpts.PullOptions["ostree_repos"], ":")
|
||||
|
||||
// Generate the manifest
|
||||
toc, err := unmarshalToc(c.manifest)
|
||||
|
|
@ -1694,7 +1849,7 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
|
|||
// There are some missing files. Prepare a multirange request for the missing chunks.
|
||||
if len(missingParts) > 0 {
|
||||
missingParts = mergeMissingChunks(missingParts, maxNumberMissingChunks)
|
||||
if err := c.retrieveMissingFiles(dest, dirfd, missingParts, options); err != nil {
|
||||
if err := c.retrieveMissingFiles(stream, dest, dirfd, missingParts, options); err != nil {
|
||||
return output, err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ package chunked
|
|||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
|
@ -161,7 +160,7 @@ func TestGenerateAndParseManifest(t *testing.T) {
|
|||
t: t,
|
||||
}
|
||||
|
||||
manifest, _, _, err := readZstdChunkedManifest(context.TODO(), s, 8192, annotations)
|
||||
manifest, _, _, err := readZstdChunkedManifest(s, 8192, annotations)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue