compression: allow specifying the compression format

Add the ability to choose which compression format and which compression
level are used.

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
Giuseppe Scrivano 2019-06-11 22:36:42 +02:00
parent f89e6fbb02
commit 58c8793f8e
GPG Key ID: E4730F97F60286ED
3 changed files with 104 additions and 21 deletions
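For callers, the two new knobs live on types.SystemContext and take effect when that context is passed as copy.Options.DestinationCtx. Below is a minimal usage sketch; the helper name and the exact copy.Image call shape are assumptions based on the library's API around this commit, not part of the diff itself.

package example

import (
	"context"

	"github.com/containers/image/copy"
	"github.com/containers/image/signature"
	"github.com/containers/image/types"
)

// copyWithCompression is a hypothetical caller that pins the destination
// blobs to a given compression format and level via the two new
// SystemContext fields introduced by this commit.
func copyWithCompression(ctx context.Context, policyContext *signature.PolicyContext, srcRef, destRef types.ImageReference) error {
	level := 3 // explicit level; leaving CompressionLevel nil uses the compressor's default
	_, err := copy.Image(ctx, policyContext, destRef, srcRef, &copy.Options{
		DestinationCtx: &types.SystemContext{
			CompressionFormat: "zstd", // one of the names registered in pkg/compression
			CompressionLevel:  &level,
		},
	})
	return err
}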

copy/copy.go

@@ -21,7 +21,6 @@ import (
"github.com/containers/image/signature"
"github.com/containers/image/transports"
"github.com/containers/image/types"
"github.com/klauspost/pgzip"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -86,14 +85,16 @@ func (d *digestingReader) Read(p []byte) (int, error) {
// copier allows us to keep track of diffID values for blobs, and other
// data shared across one or more images in a possible manifest list.
type copier struct {
dest types.ImageDestination
rawSource types.ImageSource
reportWriter io.Writer
progressOutput io.Writer
progressInterval time.Duration
progress chan types.ProgressProperties
blobInfoCache types.BlobInfoCache
copyInParallel bool
+compressionFormat string
+compressionLevel *int
}
// imageCopier tracks state specific to a single image (possibly an item of a manifest list)
@@ -178,6 +179,9 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef,
// For now, use DestinationCtx (because blob reuse changes the behavior of the destination side more); eventually
// we might want to add a separate CommonCtx — or would that be too confusing?
blobInfoCache: blobinfocache.DefaultCache(options.DestinationCtx),
+compressionFormat: options.DestinationCtx.CompressionFormat,
+compressionLevel: options.DestinationCtx.CompressionLevel,
}
unparsedToplevel := image.UnparsedInstance(rawSource, nil)
@@ -805,7 +809,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
// === Detect compression of the input stream.
// This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by DetectCompression.
-decompressor, destStream, err := compression.DetectCompression(destStream) // We could skip this in some cases, but let's keep the code path uniform
+compressionFormat, decompressor, destStream, err := compression.DetectCompressionFormat(destStream) // We could skip this in some cases, but let's keep the code path uniform
if err != nil {
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
@@ -819,6 +823,8 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
originalLayerReader = destStream
}
+desiredCompressionFormat := c.compressionFormat
// === Deal with layer compression/decompression if necessary
var inputInfo types.BlobInfo
var compressionOperation types.LayerCompression
@@ -831,7 +837,27 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
// If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise,
// e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed,
// we don't care.
-go compressGoroutine(pipeWriter, destStream) // Closes pipeWriter
+go c.compressGoroutine(pipeWriter, destStream, desiredCompressionFormat) // Closes pipeWriter
destStream = pipeReader
inputInfo.Digest = ""
inputInfo.Size = -1
+} else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Compress && isCompressed && desiredCompressionFormat != compressionFormat {
+// When the blob is compressed but the desired format is different, it first needs to be decompressed and then
+// re-compressed using the desired format.
+logrus.Debugf("Blob will be converted")
+compressionOperation = types.PreserveOriginal
+s, err := decompressor(destStream)
+if err != nil {
+return types.BlobInfo{}, err
+}
+defer s.Close()
+pipeReader, pipeWriter := io.Pipe()
+defer pipeReader.Close()
+go c.compressGoroutine(pipeWriter, s, desiredCompressionFormat) // Closes pipeWriter
+destStream = pipeReader
+inputInfo.Digest = ""
+inputInfo.Size = -1
@@ -847,6 +873,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
inputInfo.Digest = ""
inputInfo.Size = -1
} else {
+// PreserveOriginal might also need to recompress the original blob if the desired compression format is different.
logrus.Debugf("Using original blob without modification")
compressionOperation = types.PreserveOriginal
inputInfo = srcInfo
@@ -907,14 +934,17 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}
// compressGoroutine reads all input from src and writes its compressed equivalent to dest.
-func compressGoroutine(dest *io.PipeWriter, src io.Reader) {
+func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, compressionFormat string) {
err := errors.New("Internal error: unexpected panic in compressGoroutine")
defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily.
dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close()
}()
-zipper := pgzip.NewWriter(dest)
-defer zipper.Close()
+compressor, err := compression.CompressStream(dest, compressionFormat, c.compressionLevel)
+if err != nil {
+return
+}
+defer compressor.Close()
-_, err = io.Copy(zipper, src) // Sets err to nil, i.e. causes dest.Close()
+_, err = io.Copy(compressor, src) // Sets err to nil, i.e. causes dest.Close()
}
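For reference, the pipe-plus-goroutine pattern that compressGoroutine relies on can be reduced to the following self-contained sketch. It substitutes the standard library's compress/gzip for this package's CompressStream, so the code is illustrative rather than the library's own.

package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	src := strings.NewReader("payload that is compressed on the fly")

	pipeReader, pipeWriter := io.Pipe()
	go func() {
		// Compress src into the write end of the pipe; CloseWithError
		// propagates any failure to the reading side instead of losing it.
		zw := gzip.NewWriter(pipeWriter)
		_, err := io.Copy(zw, src)
		if cerr := zw.Close(); err == nil { // Close flushes the gzip trailer
			err = cerr
		}
		pipeWriter.CloseWithError(err) // CloseWithError(nil) is equivalent to Close()
	}()

	// The consumer simply sees an io.Reader that yields the compressed stream.
	n, err := io.Copy(ioutil.Discard, pipeReader)
	fmt.Println(n, err)
}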

pkg/compression/compression.go

@@ -3,6 +3,7 @@ package compression
import (
"bytes"
"compress/bzip2"
"fmt"
"io"
"io/ioutil"
@@ -35,6 +36,28 @@ func XzDecompressor(r io.Reader) (io.ReadCloser, error) {
return ioutil.NopCloser(r), nil
}
+// compressorFunc writes the compressed stream to the given writer using the specified compression level.
+// The caller must call Close() on the stream (even if the input stream does not need closing!).
+type compressorFunc func(io.Writer, *int) (io.WriteCloser, error)
+// gzipCompressor is a compressorFunc for the gzip compression algorithm.
+func gzipCompressor(r io.Writer, level *int) (io.WriteCloser, error) {
+if level != nil {
+return pgzip.NewWriterLevel(r, *level)
+}
+return pgzip.NewWriter(r), nil
+}
+// bzip2Compressor is a compressorFunc for the bzip2 compression algorithm; only decompression is supported.
+func bzip2Compressor(r io.Writer, level *int) (io.WriteCloser, error) {
+return nil, fmt.Errorf("bzip2 compression not supported")
+}
+// xzCompressor is a compressorFunc for the xz compression algorithm; the level argument is ignored.
+func xzCompressor(r io.Writer, level *int) (io.WriteCloser, error) {
+return xz.NewWriter(r)
+}
// compressionAlgos is an internal implementation detail of DetectCompression
var compressionAlgos = map[string]struct {
prefix []byte
@@ -46,22 +69,40 @@ var compressionAlgos = map[string]struct {
"zstd": {[]byte{0x28, 0xb5, 0x2f, 0xfd}, ZstdDecompressor}, // zstd (http://www.zstd.net)
}
-// DetectCompression returns a DecompressorFunc if the input is recognized as a compressed format, nil otherwise.
+// compressors maps an algorithm name to its compression function
+var compressors = map[string]compressorFunc{
+"gzip": gzipCompressor,
+"bzip2": bzip2Compressor,
+"xz": xzCompressor,
+"zstd": zstdCompressor,
+}
+// CompressStream returns a compressing WriteCloser for the named algorithm
+func CompressStream(dest io.Writer, name string, level *int) (io.WriteCloser, error) {
+c, found := compressors[name]
+if !found {
+return nil, fmt.Errorf("cannot find compressor for '%s'", name)
+}
+return c(dest, level)
+}
+// DetectCompressionFormat returns the name of the detected format and a DecompressorFunc if the input is recognized as a compressed format, or an empty name and nil otherwise.
// Because it consumes the start of input, other consumers must use the returned io.Reader instead to also read from the beginning.
-func DetectCompression(input io.Reader) (DecompressorFunc, io.Reader, error) {
+func DetectCompressionFormat(input io.Reader) (string, DecompressorFunc, io.Reader, error) {
buffer := [8]byte{}
n, err := io.ReadAtLeast(input, buffer[:], len(buffer))
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
// This is a “real” error. We could just ignore it this time, process the data we have, and hope that the source will report the same error again.
// Instead, fail immediately with the original error cause instead of a possibly secondary/misleading error returned later.
-return nil, nil, err
+return "", nil, nil, err
}
name := ""
var decompressor DecompressorFunc
-for name, algo := range compressionAlgos {
+for algoname, algo := range compressionAlgos {
if bytes.HasPrefix(buffer[:n], algo.prefix) {
logrus.Debugf("Detected compression format %s", name)
logrus.Debugf("Detected compression format %s", algoname)
name = algoname
decompressor = algo.decompressor
break
}
@@ -70,7 +111,14 @@ func DetectCompression(input io.Reader) (DecompressorFunc, io.Reader, error) {
logrus.Debugf("No compression detected")
}
-return decompressor, io.MultiReader(bytes.NewReader(buffer[:n]), input), nil
+return name, decompressor, io.MultiReader(bytes.NewReader(buffer[:n]), input), nil
}
+// DetectCompression returns a DecompressorFunc if the input is recognized as a compressed format, nil otherwise.
+// Because it consumes the start of input, other consumers must use the returned io.Reader instead to also read from the beginning.
+func DetectCompression(input io.Reader) (DecompressorFunc, io.Reader, error) {
+_, d, r, e := DetectCompressionFormat(input)
+return d, r, e
+}
// AutoDecompress takes a stream and returns an uncompressed version of the
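Taken together, the two entry points added to this file round-trip as in the sketch below, which assumes the package's import path is github.com/containers/image/pkg/compression, as suggested by the imports elsewhere in this commit.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"

	"github.com/containers/image/pkg/compression"
)

func main() {
	// Compress some bytes under an explicit algorithm name and level.
	var buf bytes.Buffer
	level := 9
	w, err := compression.CompressStream(&buf, "gzip", &level)
	if err != nil {
		panic(err)
	}
	w.Write([]byte("hello compression"))
	w.Close() // callers must Close() to flush the compressed stream

	// Detect the format again and decompress.
	name, decompressor, reader, err := compression.DetectCompressionFormat(&buf)
	if err != nil {
		panic(err)
	}
	fmt.Println("detected:", name) // "gzip"
	rc, err := decompressor(reader)
	if err != nil {
		panic(err)
	}
	defer rc.Close()
	data, _ := ioutil.ReadAll(rc)
	fmt.Println(string(data)) // "hello compression"
}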

types/types.go

@@ -511,6 +511,11 @@ type SystemContext struct {
// === dir.Transport overrides ===
// DirForceCompress compresses the image layers if set to true
DirForceCompress bool
+// CompressionFormat is the format to use for the compression of the blobs
+CompressionFormat string
+// CompressionLevel specifies what compression level is used
+CompressionLevel *int
}
// ProgressProperties is used to pass information from the copy code to a monitor which
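To illustrate why CompressionLevel is a *int rather than an int: nil leaves the choice to the compressor (pgzip.NewWriter in gzipCompressor above), while a non-nil pointer forces an explicit level (pgzip.NewWriterLevel). A hypothetical helper, not part of the diff:

package example

import "github.com/containers/image/types"

// newDestCtx builds a destination SystemContext; the helper itself is
// illustrative, only the two fields come from this commit.
func newDestCtx(format string, level *int) *types.SystemContext {
	return &types.SystemContext{
		CompressionFormat: format,
		CompressionLevel:  level,
	}
}

var (
	defaultGzip = newDestCtx("gzip", nil) // nil: compressor's default level

	nine     = 9
	bestGzip = newDestCtx("gzip", &nine) // explicit gzip level 9
)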