pkg/chunked: add support for sparse files

Automatically detect holes in sparse files (the threshold is hardcoded
at 1KiB for now) and record this information in the manifest file.

The receiver will create a hole (using unix.Seek and unix.Ftruncate)
instead of writing the actual zeros.

Closes: https://github.com/containers/storage/issues/1091

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
5 changed files with 384 additions and 53 deletions
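As a quick illustration of the receiver-side idea described above — seeking past the zeros and extending the file so the filesystem materializes the hole — here is a minimal, self-contained sketch. writeHole and the temp file are illustrative only; the actual implementation is appendHole in pkg/chunked/storage_linux.go below.

package main

import (
	"os"

	"golang.org/x/sys/unix"
)

// writeHole advances the file offset by size bytes without writing
// anything, then extends the file so a trailing hole still shows up
// in the file size.
func writeHole(f *os.File, size int64) error {
	fd := int(f.Fd())
	off, err := unix.Seek(fd, size, unix.SEEK_CUR)
	if err != nil {
		return err
	}
	return unix.Ftruncate(fd, off)
}

func main() {
	f, err := os.CreateTemp("", "sparse-example")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if _, err := f.Write([]byte("some data")); err != nil {
		panic(err)
	}
	// Punch a 1 MiB hole instead of writing 1 MiB of zeros.
	if err := writeHole(f, 1<<20); err != nil {
		panic(err)
	}
}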

pkg/chunked/compression.go

@@ -524,7 +524,7 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
for iter.ReadArray() {
for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
switch field {
case "type", "name", "linkName", "digest", "chunkDigest":
case "type", "name", "linkName", "digest", "chunkDigest", "chunkType":
count += len(iter.ReadStringAsSlice())
case "xattrs":
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
@@ -609,6 +609,8 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
m.ChunkOffset = iter.ReadInt64()
case "chunkDigest":
m.ChunkDigest = getString(iter.ReadStringAsSlice())
case "chunkType":
m.ChunkType = getString(iter.ReadStringAsSlice())
case "xattrs":
m.Xattrs = make(map[string]string)
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {

pkg/chunked/compressor/compressor.go

@@ -17,21 +17,152 @@ import (
)
const RollsumBits = 16
const holesThreshold = int64(1 << 10)
type holesFinder struct {
reader *bufio.Reader
fileOff int64
zeros int64
from int64
threshold int64
state int
}
const (
holesFinderStateRead = iota
holesFinderStateAccumulate
holesFinderStateFound
holesFinderStateEOF
)
// ReadByte reads a single byte from the underlying reader.
// If a single byte is read, the return value is (0, RAW-BYTE-VALUE, nil).
// If there are at least f.threshold consecutive zeros, then the
// return value is (N_CONSECUTIVE_ZEROS, '\x00', nil).
func (f *holesFinder) ReadByte() (int64, byte, error) {
for {
switch f.state {
// reading the file stream
case holesFinderStateRead:
if f.zeros > 0 {
f.zeros--
return 0, 0, nil
}
b, err := f.reader.ReadByte()
if err != nil {
return 0, b, err
}
if b != 0 {
return 0, b, err
}
f.zeros = 1
if f.zeros == f.threshold {
f.state = holesFinderStateFound
} else {
f.state = holesFinderStateAccumulate
}
// accumulating zeros, but still didn't reach the threshold
case holesFinderStateAccumulate:
b, err := f.reader.ReadByte()
if err != nil {
if err == io.EOF {
f.state = holesFinderStateEOF
continue
}
return 0, b, err
}
if b == 0 {
f.zeros++
if f.zeros == f.threshold {
f.state = holesFinderStateFound
}
} else {
if err := f.reader.UnreadByte(); err != nil {
return 0, 0, err
}
f.state = holesFinderStateRead
}
// found a hole. Number of zeros >= threshold
case holesFinderStateFound:
b, err := f.reader.ReadByte()
if err != nil {
if err == io.EOF {
f.state = holesFinderStateEOF
}
holeLen := f.zeros
f.zeros = 0
return holeLen, 0, nil
}
if b != 0 {
if err := f.reader.UnreadByte(); err != nil {
return 0, 0, err
}
f.state = holesFinderStateRead
holeLen := f.zeros
f.zeros = 0
return holeLen, 0, nil
}
f.zeros++
// reached EOF. Flush pending zeros if any.
case holesFinderStateEOF:
if f.zeros > 0 {
f.zeros--
return 0, 0, nil
}
return 0, 0, io.EOF
}
}
}
type rollingChecksumReader struct {
reader *bufio.Reader
closed bool
rollsum *RollSum
reader *holesFinder
closed bool
rollsum *RollSum
pendingHole int64
// WrittenOut is the total number of bytes read from
// the stream.
WrittenOut int64
// IsLastChunkZeros tells whether the last generated
// chunk is a hole (made of consecutive zeros). If it
// is false, then the last chunk is a data chunk
// generated by the rolling checksum.
IsLastChunkZeros bool
}
func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
rc.IsLastChunkZeros = false
if rc.pendingHole > 0 {
toCopy := int64(len(b))
if rc.pendingHole < toCopy {
toCopy = rc.pendingHole
}
rc.pendingHole -= toCopy
for i := int64(0); i < toCopy; i++ {
b[i] = 0
}
rc.WrittenOut += toCopy
rc.IsLastChunkZeros = true
// if there are no other zeros left, terminate the chunk
return rc.pendingHole == 0, int(toCopy), nil
}
if rc.closed {
return false, 0, io.EOF
}
for i := 0; i < len(b); i++ {
n, err := rc.reader.ReadByte()
holeLen, n, err := rc.reader.ReadByte()
if err != nil {
if err == io.EOF {
rc.closed = true
@@ -43,6 +174,13 @@ func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
// Report any other error type
return false, -1, err
}
if holeLen > 0 {
for j := int64(0); j < holeLen; j++ {
rc.rollsum.Roll(0)
}
rc.pendingHole = holeLen
return true, i, nil
}
b[i] = n
rc.WrittenOut++
rc.rollsum.Roll(n)
@@ -58,6 +196,7 @@ type chunk struct {
Offset int64
Checksum string
ChunkSize int64
ChunkType string
}
func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
@@ -119,8 +258,13 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
chunks := []chunk{}
hf := &holesFinder{
threshold: holesThreshold,
reader: bufio.NewReader(tr),
}
rcReader := &rollingChecksumReader{
reader: bufio.NewReader(tr),
reader: hf,
rollsum: NewRollSum(),
}
@@ -150,12 +294,21 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
return err
}
chunks = append(chunks, chunk{
ChunkOffset: lastChunkOffset,
Offset: lastOffset,
Checksum: chunkDigester.Digest().String(),
ChunkSize: rcReader.WrittenOut - lastChunkOffset,
})
chunkSize := rcReader.WrittenOut - lastChunkOffset
if chunkSize > 0 {
chunkType := internal.ChunkTypeData
if rcReader.IsLastChunkZeros {
chunkType = internal.ChunkTypeZeros
}
chunks = append(chunks, chunk{
ChunkOffset: lastChunkOffset,
Offset: lastOffset,
Checksum: chunkDigester.Digest().String(),
ChunkSize: chunkSize,
ChunkType: chunkType,
})
}
lastOffset = off
lastChunkOffset = rcReader.WrittenOut
@@ -210,6 +363,7 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
entries[i].ChunkSize = chunks[i].ChunkSize
entries[i].Offset = chunks[i].Offset
entries[i].ChunkDigest = chunks[i].Checksum
entries[i].ChunkType = chunks[i].ChunkType
}
}
metadata = append(metadata, entries...)
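To make the new ChunkType bookkeeping concrete: a file whose data is followed by a long run of zeros would now be recorded as two chunks, roughly along these lines (offsets, sizes, and digests are invented for illustration):

chunks := []chunk{
	{ChunkOffset: 0, Offset: 1024, Checksum: "sha256:...", ChunkSize: 4096, ChunkType: internal.ChunkTypeData},
	{ChunkOffset: 4096, Offset: 5120, Checksum: "sha256:...", ChunkSize: 1 << 20, ChunkType: internal.ChunkTypeZeros},
}

ChunkOffset is the position in the uncompressed file, Offset the position in the compressed stream; a zeros chunk still carries a digest because the rolling checksum rolls over the expanded zeros.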

pkg/chunked/compressor/compressor_test.go

@@ -0,0 +1,90 @@
package compressor
import (
"bufio"
"bytes"
"io"
"testing"
)
func TestHole(t *testing.T) {
data := []byte("\x00\x00\x00\x00\x00")
hf := &holesFinder{
threshold: 1,
reader: bufio.NewReader(bytes.NewReader(data)),
}
hole, _, err := hf.ReadByte()
if err != nil {
t.Errorf("got error: %w", err)
}
if hole != 5 {
t.Error("expected hole not found")
}
if _, _, err := hf.ReadByte(); err != io.EOF {
t.Errorf("EOF not found")
}
hf = &holesFinder{
threshold: 1000,
reader: bufio.NewReader(bytes.NewReader(data)),
}
for i := 0; i < 5; i++ {
hole, b, err := hf.ReadByte()
if err != nil {
t.Errorf("got error: %v", err)
}
if hole != 0 {
t.Error("hole found")
}
if b != 0 {
t.Error("wrong read")
}
}
if _, _, err := hf.ReadByte(); err != io.EOF {
t.Error("didn't receive EOF")
}
}
func TestTwoHoles(t *testing.T) {
data := []byte("\x00\x00\x00\x00\x00FOO\x00\x00\x00\x00\x00")
hf := &holesFinder{
threshold: 2,
reader: bufio.NewReader(bytes.NewReader(data)),
}
hole, _, err := hf.ReadByte()
if err != nil {
t.Errorf("got error: %w", err)
}
if hole != 5 {
t.Error("hole not found")
}
for _, e := range []byte("FOO") {
hole, c, err := hf.ReadByte()
if err != nil {
t.Errorf("got error: %w", err)
}
if hole != 0 {
t.Error("hole found")
}
if c != e {
t.Errorf("wrong byte read %v instead of %v", c, e)
}
}
hole, _, err = hf.ReadByte()
if err != nil {
t.Errorf("got error: %w", err)
}
if hole != 5 {
t.Error("expected hole not found")
}
if _, _, err := hf.ReadByte(); err != io.EOF {
t.Error("didn't receive EOF")
}
}
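For reference, the new tests can be run on their own with go test ./pkg/chunked/compressor -run 'TestHole|TestTwoHoles' (package path assumed from the repository layout).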

pkg/chunked/internal/compression.go

@@ -46,11 +46,17 @@ type FileMetadata struct {
ChunkSize int64 `json:"chunkSize,omitempty"`
ChunkOffset int64 `json:"chunkOffset,omitempty"`
ChunkDigest string `json:"chunkDigest,omitempty"`
ChunkType string `json:"chunkType,omitempty"`
// internal: computed by mergeTOCEntries.
Chunks []*FileMetadata `json:"-"`
}
const (
ChunkTypeData = ""
ChunkTypeZeros = "zeros"
)
const (
TypeReg = "reg"
TypeChunk = "chunk"

pkg/chunked/storage_linux.go

@@ -5,6 +5,7 @@ import (
"context"
"encoding/base64"
"fmt"
"hash"
"io"
"io/ioutil"
"os"
@@ -42,9 +43,10 @@ const (
containersOverrideXattr = "user.containers.override_stat"
bigDataKey = "zstd-chunked-manifest"
fileTypeZstdChunked = iota
fileTypeEstargz = iota
fileTypeNoCompression = iota
fileTypeZstdChunked = iota
fileTypeEstargz
fileTypeNoCompression
fileTypeHole
copyGoRoutines = 32
)
@@ -438,14 +440,14 @@ func maybeDoIDRemap(manifest []internal.FileMetadata, options *archive.TarOption
}
type originFile struct {
Root string
Path string
Root string
Path string
Offset int64
}
type missingFileChunk struct {
Gap int64
Gap int64
Hole bool
File *internal.FileMetadata
@@ -454,6 +456,7 @@ type missingFileChunk struct {
}
type missingPart struct {
Hole bool
SourceChunk *ImageSourceChunk
OriginFile *originFile
Chunks []missingFileChunk
@@ -722,9 +725,19 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.Fil
return nil, err
}
func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFileType, from io.Reader, mf *missingFileChunk) error {
switch compression {
case fileTypeZstdChunked:
func (c *chunkedDiffer) prepareCompressedStreamToFile(partCompression compressedFileType, from io.Reader, mf *missingFileChunk) (compressedFileType, error) {
switch {
case partCompression == fileTypeHole:
// The entire part is a hole. No need to read from a file.
c.rawReader = nil
return fileTypeHole, nil
case mf.Hole:
// Only the missing chunk in the requested part refers to a hole.
// The received data must be discarded.
limitReader := io.LimitReader(from, mf.CompressedSize)
_, err := io.CopyBuffer(ioutil.Discard, limitReader, c.copyBuffer)
return fileTypeHole, err
case partCompression == fileTypeZstdChunked:
c.rawReader = io.LimitReader(from, mf.CompressedSize)
if c.zstdReader == nil {
r := zstd.NewReader(c.rawReader)
@@ -732,42 +745,83 @@ func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFile
} else {
c.zstdReader.Reset(c.rawReader, nil)
}
case fileTypeEstargz:
case partCompression == fileTypeEstargz:
c.rawReader = io.LimitReader(from, mf.CompressedSize)
if c.gzipReader == nil {
r, err := pgzip.NewReader(c.rawReader)
if err != nil {
return err
return partCompression, err
}
c.gzipReader = r
} else {
if err := c.gzipReader.Reset(c.rawReader); err != nil {
return err
return partCompression, err
}
}
case fileTypeNoCompression:
case partCompression == fileTypeNoCompression:
c.rawReader = io.LimitReader(from, mf.UncompressedSize)
default:
return fmt.Errorf("unknown file type %q", c.fileType)
return partCompression, fmt.Errorf("unknown file type %q", c.fileType)
}
return partCompression, nil
}
// hashHole writes SIZE zeros to the specified hasher
func hashHole(h hash.Hash, size int64, copyBuffer []byte) error {
count := int64(len(copyBuffer))
if size < count {
count = size
}
for i := int64(0); i < count; i++ {
copyBuffer[i] = 0
}
for size > 0 {
count = int64(len(copyBuffer))
if size < count {
count = size
}
if _, err := h.Write(copyBuffer[:count]); err != nil {
return err
}
size -= count
}
return nil
}
func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, to io.Writer, size int64) error {
// appendHole creates a hole with the specified size at the open fd.
func appendHole(fd int, size int64) error {
off, err := unix.Seek(fd, size, unix.SEEK_CUR)
if err != nil {
return err
}
// Make sure the file size is changed. It might be the last hole and no other data written afterwards.
if err := unix.Ftruncate(fd, off); err != nil {
return err
}
return nil
}
func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, destFile *destinationFile, size int64) error {
switch compression {
case fileTypeZstdChunked:
defer c.zstdReader.Reset(nil, nil)
if _, err := io.CopyBuffer(to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
return err
}
case fileTypeEstargz:
defer c.gzipReader.Close()
if _, err := io.CopyBuffer(to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
return err
}
case fileTypeNoCompression:
if _, err := io.CopyBuffer(to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil {
if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil {
return err
}
case fileTypeHole:
if err := appendHole(int(destFile.file.Fd()), size); err != nil {
return err
}
if err := hashHole(destFile.hash, size, c.copyBuffer); err != nil {
return err
}
default:
@@ -780,6 +834,7 @@ type destinationFile struct {
dirfd int
file *os.File
digester digest.Digester
hash hash.Hash
to io.Writer
metadata *internal.FileMetadata
options *archive.TarOptions
@@ -792,11 +847,13 @@ func openDestinationFile(dirfd int, metadata *internal.FileMetadata, options *ar
}
digester := digest.Canonical.Digester()
to := io.MultiWriter(file, digester.Hash())
hash := digester.Hash()
to := io.MultiWriter(file, hash)
return &destinationFile{
file: file,
digester: digester,
hash: hash,
to: to,
metadata: metadata,
options: options,
@@ -841,15 +898,17 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
for _, missingPart := range missingParts {
var part io.ReadCloser
compression := c.fileType
partCompression := c.fileType
switch {
case missingPart.Hole:
partCompression = fileTypeHole
case missingPart.OriginFile != nil:
var err error
part, err = missingPart.OriginFile.OpenFile()
if err != nil {
return err
}
compression = fileTypeNoCompression
partCompression = fileTypeNoCompression
case missingPart.SourceChunk != nil:
select {
case p := <-streams:
@@ -880,7 +939,8 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
goto exit
}
if err := c.prepareCompressedStreamToFile(compression, part, &mf); err != nil {
compression, err := c.prepareCompressedStreamToFile(partCompression, part, &mf)
if err != nil {
Err = err
goto exit
}
@@ -911,19 +971,23 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
}
}
if err := c.appendCompressedStreamToFile(compression, destFile.to, mf.UncompressedSize); err != nil {
if err := c.appendCompressedStreamToFile(compression, destFile, mf.UncompressedSize); err != nil {
Err = err
goto exit
}
if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil {
Err = err
goto exit
if c.rawReader != nil {
if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil {
Err = err
goto exit
}
}
}
exit:
part.Close()
if Err != nil {
break
if part != nil {
part.Close()
if Err != nil {
break
}
}
}
@@ -957,6 +1021,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
gap := getGap(missingParts, i)
if gap == 0 && missingParts[prevIndex].OriginFile == nil &&
missingParts[i].OriginFile == nil &&
!missingParts[prevIndex].Hole && !missingParts[i].Hole &&
len(missingParts[prevIndex].Chunks) == 1 && len(missingParts[i].Chunks) == 1 &&
missingParts[prevIndex].Chunks[0].File.Name == missingParts[i].Chunks[0].File.Name {
missingParts[prevIndex].SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
@@ -983,6 +1048,9 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
sort.Ints(costs)
toShrink := len(missingParts) - target
if toShrink >= len(costs) {
toShrink = len(costs) - 1
}
targetValue := costs[toShrink]
newMissingParts = missingParts[0:1]
@@ -993,6 +1061,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
gap := getGap(missingParts, i)
prev := &newMissingParts[len(newMissingParts)-1]
prev.SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
prev.Hole = false
prev.OriginFile = nil
if gap > 0 {
gapFile := missingFileChunk{
@@ -1009,7 +1078,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
func (c *chunkedDiffer) retrieveMissingFiles(dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error {
var chunksToRequest []ImageSourceChunk
for _, c := range missingParts {
if c.OriginFile == nil {
if c.OriginFile == nil && !c.Hole {
chunksToRequest = append(chunksToRequest, *c.SourceChunk)
}
}
@@ -1542,16 +1611,26 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (gra
},
}
root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk)
if err != nil {
	return output, err
}
if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) {
	missingPartsSize -= size
	mp.OriginFile = &originFile{
		Root:   root,
		Path:   path,
		Offset: offset,
	}
}
switch chunk.ChunkType {
case internal.ChunkTypeData:
	root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk)
	if err != nil {
		return output, err
	}
	if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) {
		missingPartsSize -= size
		mp.OriginFile = &originFile{
			Root:   root,
			Path:   path,
			Offset: offset,
		}
	}
case internal.ChunkTypeZeros:
	missingPartsSize -= size
	mp.Hole = true
	// Mark all chunks belonging to the missing part as holes
	for i := range mp.Chunks {
		mp.Chunks[i].Hole = true
	}
}
missingParts = append(missingParts, mp)
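Not part of this change, but a handy way to check that the receiver really produced a sparse file is to ask the kernel where the first hole is. A sketch, assuming Linux and golang.org/x/sys/unix; firstHole is an illustrative helper:

// firstHole returns the offset of the first hole in f; on filesystems
// that do not track holes it returns the file size instead.
func firstHole(f *os.File) (int64, error) {
	return unix.Seek(int(f.Fd()), 0, unix.SEEK_HOLE)
}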