diff --git a/graph/load.go b/graph/load.go index d09b04a0e2..f1dfce7a1c 100644 --- a/graph/load.go +++ b/graph/load.go @@ -106,8 +106,8 @@ func (s *TagStore) recursiveLoad(address, tmpImageDir string) error { } // ensure no two downloads of the same layer happen at the same time - if ps, err := s.poolAdd("pull", "layer:"+img.ID); err != nil { - logrus.Debugf("Image (id: %s) load is already running, waiting: %v", img.ID, err) + if ps, found := s.poolAdd("pull", "layer:"+img.ID); found { + logrus.Debugf("Image (id: %s) load is already running, waiting", img.ID) ps.Wait(nil, nil) return nil } diff --git a/graph/pools_test.go b/graph/pools_test.go index b0ee3b0337..f88a1cf15b 100644 --- a/graph/pools_test.go +++ b/graph/pools_test.go @@ -17,20 +17,17 @@ func TestPools(t *testing.T) { pushingPool: make(map[string]*progressreader.ProgressStatus), } - if _, err := s.poolAdd("pull", "test1"); err != nil { - t.Fatal(err) + if _, found := s.poolAdd("pull", "test1"); found { + t.Fatal("Expected pull test1 not to be in progress") } - if _, err := s.poolAdd("pull", "test2"); err != nil { - t.Fatal(err) + if _, found := s.poolAdd("pull", "test2"); found { + t.Fatal("Expected pull test2 not to be in progress") } - if _, err := s.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { - t.Fatalf("Expected `pull test1 is already in progress`") + if _, found := s.poolAdd("push", "test1"); !found { + t.Fatalf("Expected pull test1 to be in progress`") } - if _, err := s.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" { - t.Fatalf("Expected `pull test1 is already in progress`") - } - if _, err := s.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { - t.Fatalf("Expected `Unknown pool type`") + if _, found := s.poolAdd("pull", "test1"); !found { + t.Fatalf("Expected pull test1 to be in progress`") } if err := s.poolRemove("pull", "test2"); err != nil { t.Fatal(err) @@ -44,7 +41,4 @@ func TestPools(t *testing.T) { if err := s.poolRemove("push", "test1"); err != nil { t.Fatal(err) } - if err := s.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" { - t.Fatalf("Expected `Unknown pool type`") - } } diff --git a/graph/pull_v1.go b/graph/pull_v1.go index ab1e4a57bd..d5f9492790 100644 --- a/graph/pull_v1.go +++ b/graph/pull_v1.go @@ -138,8 +138,8 @@ func (p *v1Puller) pullRepository(askedTag string) error { } // ensure no two downloads of the same image happen at the same time - ps, err := p.poolAdd("pull", "img:"+img.ID) - if err != nil { + ps, found := p.poolAdd("pull", "img:"+img.ID) + if found { msg := p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil) ps.Wait(out, msg) out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) @@ -155,7 +155,7 @@ func (p *v1Puller) pullRepository(askedTag string) error { ps.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil)) success := false - var lastErr error + var lastErr, err error var isDownloaded bool for _, ep := range p.repoInfo.Index.Mirrors { ep += "v1/" @@ -244,9 +244,9 @@ func (p *v1Puller) pullImage(out io.Writer, imgID, endpoint string, token []stri id := history[i] // ensure no two downloads of the same layer happen at the same time - ps, err := p.poolAdd("pull", "layer:"+id) - if err != nil { - logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err) + ps, found := p.poolAdd("pull", "layer:"+id) + if found { + logrus.Debugf("Image (id: %s) pull is already running, skipping", id) msg := p.sf.FormatProgress(stringid.TruncateID(imgID), "Layer already being pulled by another client. Waiting.", nil) ps.Wait(out, msg) } else { diff --git a/graph/pull_v2.go b/graph/pull_v2.go index ee9aee4c0e..ed5605befb 100644 --- a/graph/pull_v2.go +++ b/graph/pull_v2.go @@ -73,8 +73,8 @@ func (p *v2Puller) pullV2Repository(tag string) (err error) { } - ps, err := p.poolAdd("pull", taggedName) - if err != nil { + ps, found := p.poolAdd("pull", taggedName) + if found { // Another pull of the same repository is already taking place; just wait for it to finish msg := p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName) ps.Wait(p.config.OutStream, msg) @@ -119,8 +119,8 @@ func (p *v2Puller) download(di *downloadInfo) { out := di.out - ps, err := p.poolAdd("pull", "img:"+di.img.ID) - if err != nil { + ps, found := p.poolAdd("pull", "img:"+di.img.ID) + if found { msg := p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil) ps.Wait(out, msg) out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil)) diff --git a/graph/push_v1.go b/graph/push_v1.go index 6de293403b..9769ac711f 100644 --- a/graph/push_v1.go +++ b/graph/push_v1.go @@ -214,7 +214,6 @@ func (p *v1Pusher) pushImageToEndpoint(endpoint string, imageIDs []string, tags // pushRepository pushes layers that do not already exist on the registry. func (p *v1Pusher) pushRepository(tag string) error { - logrus.Debugf("Local repo: %s", p.localRepo) p.out = ioutils.NewWriteFlusher(p.config.OutStream) imgList, tags, err := p.getImageList(tag) @@ -229,8 +228,8 @@ func (p *v1Pusher) pushRepository(tag string) error { logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag) } - if _, err := p.poolAdd("push", p.repoInfo.LocalName); err != nil { - return err + if _, found := p.poolAdd("push", p.repoInfo.LocalName); found { + return fmt.Errorf("push or pull %s is already in progress", p.repoInfo.LocalName) } defer p.poolRemove("push", p.repoInfo.LocalName) diff --git a/graph/push_v2.go b/graph/push_v2.go index 6823ac5928..7d5ca44d96 100644 --- a/graph/push_v2.go +++ b/graph/push_v2.go @@ -57,8 +57,8 @@ func (p *v2Pusher) getImageTags(askedTag string) ([]string, error) { func (p *v2Pusher) pushV2Repository(tag string) error { localName := p.repoInfo.LocalName - if _, err := p.poolAdd("push", localName); err != nil { - return err + if _, found := p.poolAdd("push", localName); found { + return fmt.Errorf("push or pull %s is already in progress", localName) } defer p.poolRemove("push", localName) diff --git a/graph/tags.go b/graph/tags.go index 1485be6705..51b19babbb 100644 --- a/graph/tags.go +++ b/graph/tags.go @@ -428,27 +428,32 @@ func validateDigest(dgst string) error { return nil } -func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus, error) { +// poolAdd checks if a push or pull is already running, and returns (ps, true) +// if a running operation is found. Otherwise, it creates a new one and returns +// (ps, false). +func (store *TagStore) poolAdd(kind, key string) (*progressreader.ProgressStatus, bool) { store.Lock() defer store.Unlock() if p, exists := store.pullingPool[key]; exists { - return p, fmt.Errorf("pull %s is already in progress", key) + return p, true } if p, exists := store.pushingPool[key]; exists { - return p, fmt.Errorf("push %s is already in progress", key) + return p, true } ps := progressreader.NewProgressStatus() + switch kind { case "pull": store.pullingPool[key] = ps case "push": store.pushingPool[key] = ps default: - return nil, fmt.Errorf("Unknown pool type") + panic("Unknown pool type") } - return ps, nil + + return ps, false } func (store *TagStore) poolRemove(kind, key string) error {