Merge pull request #19840 from aaronlehmann/resumable-downloads

Add support for resuming downloads on transfer failure
This commit is contained in:
Brian Goff 2016-02-11 10:32:17 -05:00
commit 2edd5f698c
5 changed files with 136 additions and 41 deletions

View File

@ -2,7 +2,6 @@ package distribution
import (
"fmt"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api"
@ -187,16 +186,3 @@ func validateRepoName(name string) error {
}
return nil
}
// tmpFileClose creates a closer function for a temporary file that closes the file
// and also deletes it.
func tmpFileCloser(tmpFile *os.File) func() error {
return func() error {
tmpFile.Close()
if err := os.RemoveAll(tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
}
return nil
}
}

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"net"
"net/url"
"os"
"strings"
"time"
@ -279,6 +280,7 @@ type v1LayerDescriptor struct {
layersDownloaded *bool
layerSize int64
session *registry.Session
tmpFile *os.File
}
func (ld *v1LayerDescriptor) Key() string {
@ -308,7 +310,7 @@ func (ld *v1LayerDescriptor) Download(ctx context.Context, progressOutput progre
}
*ld.layersDownloaded = true
tmpFile, err := ioutil.TempFile("", "GetImageBlob")
ld.tmpFile, err = ioutil.TempFile("", "GetImageBlob")
if err != nil {
layerReader.Close()
return nil, 0, err
@ -317,17 +319,28 @@ func (ld *v1LayerDescriptor) Download(ctx context.Context, progressOutput progre
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerReader), progressOutput, ld.layerSize, ld.ID(), "Downloading")
defer reader.Close()
_, err = io.Copy(tmpFile, reader)
_, err = io.Copy(ld.tmpFile, reader)
if err != nil {
ld.Close()
return nil, 0, err
}
progress.Update(progressOutput, ld.ID(), "Download complete")
logrus.Debugf("Downloaded %s to tempfile %s", ld.ID(), tmpFile.Name())
logrus.Debugf("Downloaded %s to tempfile %s", ld.ID(), ld.tmpFile.Name())
tmpFile.Seek(0, 0)
return ioutils.NewReadCloserWrapper(tmpFile, tmpFileCloser(tmpFile)), ld.layerSize, nil
ld.tmpFile.Seek(0, 0)
return ld.tmpFile, ld.layerSize, nil
}
func (ld *v1LayerDescriptor) Close() {
if ld.tmpFile != nil {
ld.tmpFile.Close()
if err := os.RemoveAll(ld.tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
}
ld.tmpFile = nil
}
}
func (ld *v1LayerDescriptor) Registered(diffID layer.DiffID) {

View File

@ -17,6 +17,7 @@ import (
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
@ -114,6 +115,8 @@ type v2LayerDescriptor struct {
repoInfo *registry.RepositoryInfo
repo distribution.Repository
V2MetadataService *metadata.V2MetadataService
tmpFile *os.File
verifier digest.Verifier
}
func (ld *v2LayerDescriptor) Key() string {
@ -131,17 +134,56 @@ func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) {
func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
logrus.Debugf("pulling blob %q", ld.digest)
var (
err error
offset int64
)
if ld.tmpFile == nil {
ld.tmpFile, err = createDownloadFile()
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
} else {
offset, err = ld.tmpFile.Seek(0, os.SEEK_END)
if err != nil {
logrus.Debugf("error seeking to end of download file: %v", err)
offset = 0
ld.tmpFile.Close()
if err := os.Remove(ld.tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
}
ld.tmpFile, err = createDownloadFile()
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
} else if offset != 0 {
logrus.Debugf("attempting to resume download of %q from %d bytes", ld.digest, offset)
}
}
tmpFile := ld.tmpFile
blobs := ld.repo.Blobs(ctx)
layerDownload, err := blobs.Open(ctx, ld.digest)
if err != nil {
logrus.Debugf("Error statting layer: %v", err)
logrus.Debugf("Error initiating layer download: %v", err)
if err == distribution.ErrBlobUnknown {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, retryOnError(err)
}
if offset != 0 {
_, err := layerDownload.Seek(offset, os.SEEK_SET)
if err != nil {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, err
}
}
size, err := layerDownload.Seek(0, os.SEEK_END)
if err != nil {
// Seek failed, perhaps because there was no Content-Length
@ -149,46 +191,59 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre
// still continue without a progress bar.
size = 0
} else {
// Restore the seek offset at the beginning of the stream.
_, err = layerDownload.Seek(0, os.SEEK_SET)
if size != 0 && offset > size {
logrus.Debugf("Partial download is larger than full blob. Starting over")
offset = 0
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
}
// Restore the seek offset either at the beginning of the
// stream, or just after the last byte we have from previous
// attempts.
_, err = layerDownload.Seek(offset, os.SEEK_SET)
if err != nil {
return nil, 0, err
}
}
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerDownload), progressOutput, size, ld.ID(), "Downloading")
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerDownload), progressOutput, size-offset, ld.ID(), "Downloading")
defer reader.Close()
verifier, err := digest.NewDigestVerifier(ld.digest)
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
if ld.verifier == nil {
ld.verifier, err = digest.NewDigestVerifier(ld.digest)
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
}
tmpFile, err := ioutil.TempFile("", "GetImageBlob")
_, err = io.Copy(tmpFile, io.TeeReader(reader, ld.verifier))
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
_, err = io.Copy(tmpFile, io.TeeReader(reader, verifier))
if err != nil {
tmpFile.Close()
if err := os.Remove(tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
if err == transport.ErrWrongCodeForByteRange {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, err
}
return nil, 0, retryOnError(err)
}
progress.Update(progressOutput, ld.ID(), "Verifying Checksum")
if !verifier.Verified() {
if !ld.verifier.Verified() {
err = fmt.Errorf("filesystem layer verification failed for digest %s", ld.digest)
logrus.Error(err)
tmpFile.Close()
if err := os.Remove(tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
}
// Allow a retry if this digest verification error happened
// after a resumed download.
if offset != 0 {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, err
}
return nil, 0, xfer.DoNotRetry{Err: err}
}
@ -202,9 +257,37 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre
if err := os.Remove(tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
}
ld.tmpFile = nil
ld.verifier = nil
return nil, 0, xfer.DoNotRetry{Err: err}
}
return ioutils.NewReadCloserWrapper(tmpFile, tmpFileCloser(tmpFile)), size, nil
return tmpFile, size, nil
}
func (ld *v2LayerDescriptor) Close() {
if ld.tmpFile != nil {
ld.tmpFile.Close()
if err := os.RemoveAll(ld.tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
}
}
}
func (ld *v2LayerDescriptor) truncateDownloadFile() error {
// Need a new hash context since we will be redoing the download
ld.verifier = nil
if _, err := ld.tmpFile.Seek(0, os.SEEK_SET); err != nil {
logrus.Debugf("error seeking to beginning of download file: %v", err)
return err
}
if err := ld.tmpFile.Truncate(0); err != nil {
logrus.Debugf("error truncating download file: %v", err)
return err
}
return nil
}
func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
@ -711,3 +794,7 @@ func fixManifestLayers(m *schema1.Manifest) error {
return nil
}
func createDownloadFile() (*os.File, error) {
return ioutil.TempFile("", "GetImageBlob")
}

View File

@ -59,6 +59,10 @@ type DownloadDescriptor interface {
DiffID() (layer.DiffID, error)
// Download is called to perform the download.
Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error)
// Close is called when the download manager is finished with this
// descriptor and will not call Download again or read from the reader
// that Download returned.
Close()
}
// DownloadDescriptorWithRegistered is a DownloadDescriptor that has an
@ -229,6 +233,8 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
retries int
)
defer descriptor.Close()
for {
downloadReader, size, err = descriptor.Download(d.Transfer.Context(), progressOutput)
if err == nil {

View File

@ -199,6 +199,9 @@ func (d *mockDownloadDescriptor) Download(ctx context.Context, progressOutput pr
return d.mockTarStream(), 0, nil
}
func (d *mockDownloadDescriptor) Close() {
}
func downloadDescriptors(currentDownloads *int32) []DownloadDescriptor {
return []DownloadDescriptor{
&mockDownloadDescriptor{