package compressor

// NOTE: This is used from github.com/containers/image by callers that
// don't otherwise use containers/storage, so don't make this depend on any
// larger software like the graph drivers.

import (
	"bufio"
	"bytes"
	"io"

	"github.com/containers/storage/pkg/chunked/internal/minimal"
	"github.com/containers/storage/pkg/ioutils"
	"github.com/klauspost/compress/zstd"
	"github.com/opencontainers/go-digest"
	"github.com/vbatts/tar-split/archive/tar"
	"github.com/vbatts/tar-split/tar/asm"
	"github.com/vbatts/tar-split/tar/storage"
)

const (
	RollsumBits    = 16
	holesThreshold = int64(1 << 10)
)

type holesFinder struct {
	reader    *bufio.Reader
	zeros     int64
	threshold int64
	state     int
}

const (
	holesFinderStateRead = iota
	holesFinderStateAccumulate
	holesFinderStateFound
	holesFinderStateEOF
)

// readByte reads a single byte from the underlying reader.
// If a single byte is read, the return value is (0, RAW-BYTE-VALUE, nil).
// If there are at least f.threshold consecutive zeros, then the
// return value is (N_CONSECUTIVE_ZEROS, '\x00', nil).
func (f *holesFinder) readByte() (int64, byte, error) {
	for {
		switch f.state {
		// reading the file stream
		case holesFinderStateRead:
			if f.zeros > 0 {
				f.zeros--
				return 0, 0, nil
			}
			b, err := f.reader.ReadByte()
			if err != nil {
				return 0, b, err
			}

			if b != 0 {
				return 0, b, nil
			}

			f.zeros = 1
			if f.zeros == f.threshold {
				f.state = holesFinderStateFound
			} else {
				f.state = holesFinderStateAccumulate
			}
		// accumulating zeros, but still didn't reach the threshold
		case holesFinderStateAccumulate:
			b, err := f.reader.ReadByte()
			if err != nil {
				if err == io.EOF {
					f.state = holesFinderStateEOF
					continue
				}
				return 0, b, err
			}

			if b == 0 {
				f.zeros++
				if f.zeros == f.threshold {
					f.state = holesFinderStateFound
				}
			} else {
				if err := f.reader.UnreadByte(); err != nil {
					return 0, 0, err
				}
				f.state = holesFinderStateRead
			}
		// found a hole. Number of zeros >= threshold
		case holesFinderStateFound:
			b, err := f.reader.ReadByte()
			if err != nil {
				if err == io.EOF {
					f.state = holesFinderStateEOF
				}

				holeLen := f.zeros
				f.zeros = 0
				return holeLen, 0, nil
			}
			if b != 0 {
				if err := f.reader.UnreadByte(); err != nil {
					return 0, 0, err
				}
				f.state = holesFinderStateRead

				holeLen := f.zeros
				f.zeros = 0
				return holeLen, 0, nil
			}
			f.zeros++
		// reached EOF. Flush pending zeros if any.
		case holesFinderStateEOF:
			if f.zeros > 0 {
				f.zeros--
				return 0, 0, nil
			}
			return 0, 0, io.EOF
		}
	}
}

type rollingChecksumReader struct {
	reader      *holesFinder
	closed      bool
	rollsum     *RollSum
	pendingHole int64

	// WrittenOut is the total number of bytes read from
	// the stream.
	WrittenOut int64

	// IsLastChunkZeros tells whether the last generated
	// chunk is a hole (made of consecutive zeros). If it
	// is false, then the last chunk is a data chunk
	// generated by the rolling checksum.
	IsLastChunkZeros bool
}

func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
	rc.IsLastChunkZeros = false

	if rc.pendingHole > 0 {
		toCopy := min(rc.pendingHole, int64(len(b)))
		rc.pendingHole -= toCopy
		for i := int64(0); i < toCopy; i++ {
			b[i] = 0
		}

		rc.WrittenOut += toCopy

		rc.IsLastChunkZeros = true

		// if there are no other zeros left, terminate the chunk
		return rc.pendingHole == 0, int(toCopy), nil
	}

	if rc.closed {
		return false, 0, io.EOF
	}

	for i := range b {
		holeLen, n, err := rc.reader.readByte()
		if err != nil {
			if err == io.EOF {
				rc.closed = true
				if i == 0 {
					return false, 0, err
				}
				return false, i, nil
			}
			// Report any other error type
			return false, -1, err
		}
		if holeLen > 0 {
			for j := int64(0); j < holeLen; j++ {
				rc.rollsum.Roll(0)
			}
			rc.pendingHole = holeLen
			return true, i, nil
		}
		b[i] = n
		rc.WrittenOut++
		rc.rollsum.Roll(n)
		if rc.rollsum.OnSplitWithBits(RollsumBits) {
			return true, i + 1, nil
		}
	}
	return false, len(b), nil
}

type chunk struct {
	ChunkOffset int64
	Offset      int64
	Checksum    string
	ChunkSize   int64
	ChunkType   string
}

type tarSplitData struct {
	compressed          *bytes.Buffer
	digester            digest.Digester
	uncompressedCounter *ioutils.WriteCounter
	zstd                *zstd.Encoder
	packer              storage.Packer
}

func newTarSplitData(level int) (*tarSplitData, error) {
	compressed := bytes.NewBuffer(nil)
	digester := digest.Canonical.Digester()

	zstdWriter, err := minimal.ZstdWriterWithLevel(io.MultiWriter(compressed, digester.Hash()), level)
	if err != nil {
		return nil, err
	}

	uncompressedCounter := ioutils.NewWriteCounter(zstdWriter)
	metaPacker := storage.NewJSONPacker(uncompressedCounter)

	return &tarSplitData{
		compressed:          compressed,
		digester:            digester,
		uncompressedCounter: uncompressedCounter,
		zstd:                zstdWriter,
		packer:              metaPacker,
	}, nil
}

func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
	// total written so far. Used to retrieve partial offsets in the file
	dest := ioutils.NewWriteCounter(destFile)

	tarSplitData, err := newTarSplitData(level)
	if err != nil {
		return err
	}
	defer func() {
		if tarSplitData.zstd != nil {
			tarSplitData.zstd.Close()
		}
	}()

	its, err := asm.NewInputTarStream(reader, tarSplitData.packer, nil)
	if err != nil {
		return err
	}

	tr := tar.NewReader(its)
	tr.RawAccounting = true

	buf := make([]byte, 4096)

	zstdWriter, err := minimal.ZstdWriterWithLevel(dest, level)
	if err != nil {
		return err
	}
	defer func() {
		if zstdWriter != nil {
			zstdWriter.Close()
		}
	}()

	restartCompression := func() (int64, error) {
		var offset int64
		if zstdWriter != nil {
			if err := zstdWriter.Close(); err != nil {
				return 0, err
			}
			offset = dest.Count
			zstdWriter.Reset(dest)
		}
		return offset, nil
	}

	var metadata []minimal.FileMetadata
	for {
		hdr, err := tr.Next()
		if err != nil {
			if err == io.EOF {
				break
			}
			return err
		}

		rawBytes := tr.RawBytes()
		if _, err := zstdWriter.Write(rawBytes); err != nil {
			return err
		}

		payloadDigester := digest.Canonical.Digester()
		chunkDigester := digest.Canonical.Digester()

		// Now handle the payload, if any
		startOffset := int64(0)
		lastOffset := int64(0)
		lastChunkOffset := int64(0)

		checksum := ""

		chunks := []chunk{}

		hf := &holesFinder{
			threshold: holesThreshold,
			reader:    bufio.NewReader(tr),
		}

		rcReader := &rollingChecksumReader{
			reader:  hf,
			rollsum: NewRollSum(),
		}

		payloadDest := io.MultiWriter(payloadDigester.Hash(), chunkDigester.Hash(), zstdWriter)
		for {
			mustSplit, read, errRead := rcReader.Read(buf)
			if errRead != nil && errRead != io.EOF {
				return errRead
			}
			// restart the compression only if there is a payload.
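			//
			// startOffset and lastOffset track positions in the compressed
			// output (dest.Count), while lastChunkOffset tracks the position
			// in the uncompressed payload (rcReader.WrittenOut).  Closing and
			// resetting the zstd writer at every chunk boundary makes each
			// chunk start its own zstd frame, so a chunk can later be fetched
			// and decompressed independently of the rest of the stream.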
			if read > 0 {
				if startOffset == 0 {
					startOffset, err = restartCompression()
					if err != nil {
						return err
					}
					lastOffset = startOffset
				}

				if _, err := payloadDest.Write(buf[:read]); err != nil {
					return err
				}
			}

			if (mustSplit || errRead == io.EOF) && startOffset > 0 {
				off, err := restartCompression()
				if err != nil {
					return err
				}
				chunkSize := rcReader.WrittenOut - lastChunkOffset
				if chunkSize > 0 {
					chunkType := minimal.ChunkTypeData
					if rcReader.IsLastChunkZeros {
						chunkType = minimal.ChunkTypeZeros
					}
					chunks = append(chunks, chunk{
						ChunkOffset: lastChunkOffset,
						Offset:      lastOffset,
						Checksum:    chunkDigester.Digest().String(),
						ChunkSize:   chunkSize,
						ChunkType:   chunkType,
					})
				}

				lastOffset = off
				lastChunkOffset = rcReader.WrittenOut
				chunkDigester = digest.Canonical.Digester()
				payloadDest = io.MultiWriter(payloadDigester.Hash(), chunkDigester.Hash(), zstdWriter)
			}
			if errRead == io.EOF {
				if startOffset > 0 {
					checksum = payloadDigester.Digest().String()
				}
				break
			}
		}

		mainEntry, err := minimal.NewFileMetadata(hdr)
		if err != nil {
			return err
		}
		mainEntry.Digest = checksum
		mainEntry.Offset = startOffset
		mainEntry.EndOffset = lastOffset
		entries := []minimal.FileMetadata{mainEntry}
		for i := 1; i < len(chunks); i++ {
			entries = append(entries, minimal.FileMetadata{
				Type:        minimal.TypeChunk,
				Name:        hdr.Name,
				ChunkOffset: chunks[i].ChunkOffset,
			})
		}
		if len(chunks) > 1 {
			for i := range chunks {
				entries[i].ChunkSize = chunks[i].ChunkSize
				entries[i].Offset = chunks[i].Offset
				entries[i].ChunkDigest = chunks[i].Checksum
				entries[i].ChunkType = chunks[i].ChunkType
			}
		}
		metadata = append(metadata, entries...)
	}

	rawBytes := tr.RawBytes()
	if _, err := zstdWriter.Write(rawBytes); err != nil {
		zstdWriter.Close()
		return err
	}

	// make sure the entire tarball is flushed to the output as it might contain
	// some trailing zeros that affect the checksum.
	if _, err := io.Copy(zstdWriter, its); err != nil {
		zstdWriter.Close()
		return err
	}

	if err := zstdWriter.Flush(); err != nil {
		zstdWriter.Close()
		return err
	}
	if err := zstdWriter.Close(); err != nil {
		return err
	}
	zstdWriter = nil

	if err := tarSplitData.zstd.Flush(); err != nil {
		return err
	}
	if err := tarSplitData.zstd.Close(); err != nil {
		return err
	}
	tarSplitData.zstd = nil

	ts := minimal.TarSplitData{
		Data:             tarSplitData.compressed.Bytes(),
		Digest:           tarSplitData.digester.Digest(),
		UncompressedSize: tarSplitData.uncompressedCounter.Count,
	}

	return minimal.WriteZstdChunkedManifest(dest, outMetadata, uint64(dest.Count), &ts, metadata, level)
}

type zstdChunkedWriter struct {
	tarSplitOut *io.PipeWriter
	tarSplitErr chan error
}

func (w zstdChunkedWriter) Close() error {
	errClose := w.tarSplitOut.Close()
	if err := <-w.tarSplitErr; err != nil && err != io.EOF {
		return err
	}
	return errClose
}

func (w zstdChunkedWriter) Write(p []byte) (int, error) {
	select {
	case err := <-w.tarSplitErr:
		w.tarSplitOut.Close()
		return 0, err
	default:
		return w.tarSplitOut.Write(p)
	}
}

// zstdChunkedWriterWithLevel writes a zstd compressed tarball where each file is
// compressed separately so it can be addressed separately.  Idea based on CRFS:
// https://github.com/google/crfs
// The difference with CRFS is that zstd is used instead of gzip.
// The reason for it is that zstd supports embedding metadata ignored by the decoder
// as part of the compressed stream.
// A manifest json file with all the metadata is appended at the end of the tarball
// stream, using zstd skippable frames.
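// Because every file (and every chunk produced by the rolling checksum or the
// holes finder) starts a fresh zstd frame, a reader that knows the offsets
// recorded in the manifest can decompress individual files or chunks without
// reading the rest of the stream.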
// The final file will look like:
// [FILE_1][FILE_2]..[FILE_N][SKIPPABLE FRAME 1][SKIPPABLE FRAME 2]
// Where:
// [FILE_N]: [ZSTD HEADER][TAR HEADER][PAYLOAD FILE_N][ZSTD FOOTER]
// [SKIPPABLE FRAME 1]: [ZSTD SKIPPABLE FRAME, SIZE=MANIFEST LENGTH][MANIFEST]
// [SKIPPABLE FRAME 2]: [ZSTD SKIPPABLE FRAME, SIZE=16][MANIFEST_OFFSET][MANIFEST_LENGTH][MANIFEST_LENGTH_UNCOMPRESSED][MANIFEST_TYPE][CHUNKED_ZSTD_MAGIC_NUMBER]
// MANIFEST_OFFSET, MANIFEST_LENGTH, MANIFEST_LENGTH_UNCOMPRESSED and CHUNKED_ZSTD_MAGIC_NUMBER are 64 bits unsigned in little endian format.
func zstdChunkedWriterWithLevel(out io.Writer, metadata map[string]string, level int) (io.WriteCloser, error) {
	ch := make(chan error, 1)
	r, w := io.Pipe()

	go func() {
		ch <- writeZstdChunkedStream(out, metadata, r, level)
		_, _ = io.Copy(io.Discard, r) // Ordinarily writeZstdChunkedStream consumes all of r. If it fails, ensure the write end never blocks and eventually terminates.
		r.Close()
		close(ch)
	}()

	return zstdChunkedWriter{
		tarSplitOut: w,
		tarSplitErr: ch,
	}, nil
}

// ZstdCompressor is a CompressorFunc for the zstd compression algorithm.
func ZstdCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
	if level == nil {
		l := 10
		level = &l
	}

	return zstdChunkedWriterWithLevel(r, metadata, *level)
}
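
// exampleCompressTarStream is an illustrative sketch, not part of the original
// API: it shows one way a caller might use ZstdCompressor to turn an
// uncompressed tar stream into a zstd:chunked blob.  The function name and the
// tarStream parameter are hypothetical; only ZstdCompressor, its signature and
// its default compression level come from this package.
func exampleCompressTarStream(dest io.Writer, tarStream io.Reader) (map[string]string, error) {
	// annotations is expected to be filled in with the chunked-manifest
	// metadata by writeZstdChunkedStream once the writer is closed.
	annotations := map[string]string{}

	compressor, err := ZstdCompressor(dest, annotations, nil) // nil selects the default level
	if err != nil {
		return nil, err
	}

	// Stream the tar archive through the chunked compressor.  The manifest
	// and the trailing skippable frames are emitted on Close.
	if _, err := io.Copy(compressor, tarStream); err != nil {
		compressor.Close()
		return nil, err
	}
	if err := compressor.Close(); err != nil {
		return nil, err
	}
	return annotations, nil
}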