chunked: copy local files from multiple goroutines

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
This commit is contained in:
Giuseppe Scrivano 2022-01-03 17:50:49 +01:00
parent e4c2296b20
commit 88deb61566
1 changed files with 127 additions and 47 deletions

View File

@ -12,6 +12,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
@ -44,6 +45,8 @@ const (
fileTypeZstdChunked = iota
fileTypeEstargz = iota
fileTypeNoCompression = iota
copyGoRoutines = 32
)
type compressedFileType int
@ -1215,18 +1218,25 @@ func parseBooleanPullOption(storeOpts *storage.StoreOptions, name string, def bo
return def
}
func (c *chunkedDiffer) findAndCopyFile(dirfd int, r *internal.FileMetadata, useHardLinks, enableHostDedup bool, ostreeRepos []string, options *archive.TarOptions, mode os.FileMode) (bool, error) {
type findAndCopyFileOptions struct {
useHardLinks bool
enableHostDedup bool
ostreeRepos []string
options *archive.TarOptions
}
func (c *chunkedDiffer) findAndCopyFile(dirfd int, r *internal.FileMetadata, copyOptions *findAndCopyFileOptions, mode os.FileMode) (bool, error) {
finalizeFile := func(dstFile *os.File) error {
if dstFile != nil {
defer dstFile.Close()
if err := setFileAttrs(dirfd, dstFile, mode, r, options, false); err != nil {
if err := setFileAttrs(dirfd, dstFile, mode, r, copyOptions.options, false); err != nil {
return err
}
}
return nil
}
found, dstFile, _, err := findFileInOtherLayers(c.layersCache, r, dirfd, useHardLinks)
found, dstFile, _, err := findFileInOtherLayers(c.layersCache, r, dirfd, copyOptions.useHardLinks)
if err != nil {
return false, err
}
@ -1237,7 +1247,7 @@ func (c *chunkedDiffer) findAndCopyFile(dirfd int, r *internal.FileMetadata, use
return true, nil
}
found, dstFile, _, err = findFileInOSTreeRepos(r, ostreeRepos, dirfd, useHardLinks)
found, dstFile, _, err = findFileInOSTreeRepos(r, copyOptions.ostreeRepos, dirfd, copyOptions.useHardLinks)
if err != nil {
return false, err
}
@ -1248,8 +1258,8 @@ func (c *chunkedDiffer) findAndCopyFile(dirfd int, r *internal.FileMetadata, use
return true, nil
}
if enableHostDedup {
found, dstFile, _, err = findFileOnTheHost(r, dirfd, useHardLinks, c.copyBuffer)
if copyOptions.enableHostDedup {
found, dstFile, _, err = findFileOnTheHost(r, dirfd, copyOptions.useHardLinks, c.copyBuffer)
if err != nil {
return false, err
}
@ -1336,6 +1346,52 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (gra
var hardLinks []hardLinkToCreate
missingPartsSize, totalChunksSize := int64(0), int64(0)
copyOptions := findAndCopyFileOptions{
useHardLinks: useHardLinks,
enableHostDedup: enableHostDedup,
ostreeRepos: ostreeRepos,
options: options,
}
type copyFileJob struct {
njob int
index int
mode os.FileMode
metadata *internal.FileMetadata
found bool
err error
}
var wg sync.WaitGroup
copyResults := make([]copyFileJob, len(mergedEntries))
copyFileJobs := make(chan copyFileJob)
defer func() {
if copyFileJobs != nil {
close(copyFileJobs)
}
wg.Wait()
}()
for i := 0; i < copyGoRoutines; i++ {
wg.Add(1)
jobs := copyFileJobs
go func() {
defer wg.Done()
for job := range jobs {
found, err := c.findAndCopyFile(dirfd, job.metadata, &copyOptions, job.mode)
job.err = err
job.found = found
copyResults[job.njob] = job
}
}()
}
filesToWaitFor := 0
for i, r := range mergedEntries {
if options.ForceMask != nil {
value := fmt.Sprintf("%d:%d:0%o", r.UID, r.GID, r.Mode&07777)
@ -1431,17 +1487,42 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (gra
totalChunksSize += r.Size
found, err := c.findAndCopyFile(dirfd, &r, useHardLinks, enableHostDedup, ostreeRepos, options, mode)
if err != nil {
return output, err
if t == tar.TypeReg {
index := i
njob := filesToWaitFor
job := copyFileJob{
mode: mode,
metadata: &mergedEntries[index],
index: index,
njob: njob,
}
if found {
copyFileJobs <- job
filesToWaitFor++
}
}
close(copyFileJobs)
copyFileJobs = nil
wg.Wait()
for _, res := range copyResults[:filesToWaitFor] {
if res.err != nil {
return output, res.err
}
// the file was already copied to its destination
// so nothing left to do.
if res.found {
continue
}
r := &mergedEntries[res.index]
missingPartsSize += r.Size
if t == tar.TypeReg {
remainingSize := r.Size
// the file is missing, attempt to find individual chunks.
for _, chunk := range r.Chunks {
compressedSize := int64(chunk.EndOffset - chunk.Offset)
size := remainingSize
@ -1455,7 +1536,7 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (gra
Length: uint64(compressedSize),
}
file := missingFileChunk{
File: &mergedEntries[i],
File: &mergedEntries[res.index],
CompressedSize: compressedSize,
UncompressedSize: size,
}
@ -1481,7 +1562,6 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (gra
missingParts = append(missingParts, mp)
}
}
}
// There are some missing files. Prepare a multirange request for the missing chunks.
if len(missingParts) > 0 {
missingParts = mergeMissingChunks(missingParts, maxNumberMissingChunks)