From 7289c7218e2101eb94fb90f2cb22e1412d016984 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 5 Jan 2016 14:17:42 -0800 Subject: [PATCH] Adds cross-repository blob pushing behavior Tracks source repository information for each blob in the blobsum service, which is then used to attempt to mount blobs from another repository when pushing instead of having to re-push blobs to the same registry. Signed-off-by: Brian Bland --- Dockerfile | 2 +- distribution/metadata/blobsum_service.go | 53 ++++++++++++--- distribution/metadata/blobsum_service_test.go | 46 +++++++------ distribution/metadata/metadata.go | 12 ++++ distribution/pull_v2.go | 7 +- distribution/push_v2.go | 47 ++++++++++++-- hack/vendor.sh | 2 +- integration-cli/docker_cli_push_test.go | 48 ++++++++++++++ migrate/v1/migratev1.go | 2 +- migrate/v1/migratev1_test.go | 6 +- .../github.com/docker/distribution/blobs.go | 4 ++ .../registry/api/v2/descriptors.go | 64 +++++++++++++++++++ .../registry/client/auth/session.go | 28 +++++++- .../registry/client/repository.go | 56 ++++++++++++++++ 14 files changed, 335 insertions(+), 42 deletions(-) diff --git a/Dockerfile b/Dockerfile index d92ca5cf2d..a36f774019 100644 --- a/Dockerfile +++ b/Dockerfile @@ -152,7 +152,7 @@ RUN set -x \ # both. This allows integration-cli tests to cover push/pull with both schema1 # and schema2 manifests. ENV REGISTRY_COMMIT_SCHEMA1 ec87e9b6971d831f0eff752ddb54fb64693e51cd -ENV REGISTRY_COMMIT a7ae88da459b98b481a245e5b1750134724ac67d +ENV REGISTRY_COMMIT 93d9070c8bb28414de9ec96fd38c89614acd8435 RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/distribution.git "$GOPATH/src/github.com/docker/distribution" \ diff --git a/distribution/metadata/blobsum_service.go b/distribution/metadata/blobsum_service.go index 88ed7bb197..1208d0f39a 100644 --- a/distribution/metadata/blobsum_service.go +++ b/distribution/metadata/blobsum_service.go @@ -13,8 +13,14 @@ type BlobSumService struct { store Store } +// BlobSum contains the digest and source repository information for a layer. +type BlobSum struct { + Digest digest.Digest + SourceRepository string +} + // maxBlobSums is the number of blobsums to keep per layer DiffID. -const maxBlobSums = 5 +const maxBlobSums = 50 // NewBlobSumService creates a new blobsum mapping service. func NewBlobSumService(store Store) *BlobSumService { @@ -35,18 +41,18 @@ func (blobserv *BlobSumService) diffIDKey(diffID layer.DiffID) string { return string(digest.Digest(diffID).Algorithm()) + "/" + digest.Digest(diffID).Hex() } -func (blobserv *BlobSumService) blobSumKey(blobsum digest.Digest) string { - return string(blobsum.Algorithm()) + "/" + blobsum.Hex() +func (blobserv *BlobSumService) blobSumKey(blobsum BlobSum) string { + return string(blobsum.Digest.Algorithm()) + "/" + blobsum.Digest.Hex() } // GetBlobSums finds the blobsums associated with a layer DiffID. -func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]digest.Digest, error) { +func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]BlobSum, error) { jsonBytes, err := blobserv.store.Get(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID)) if err != nil { return nil, err } - var blobsums []digest.Digest + var blobsums []BlobSum if err := json.Unmarshal(jsonBytes, &blobsums); err != nil { return nil, err } @@ -55,7 +61,7 @@ func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]digest.Diges } // GetDiffID finds a layer DiffID from a blobsum hash. -func (blobserv *BlobSumService) GetDiffID(blobsum digest.Digest) (layer.DiffID, error) { +func (blobserv *BlobSumService) GetDiffID(blobsum BlobSum) (layer.DiffID, error) { diffIDBytes, err := blobserv.store.Get(blobserv.blobSumNamespace(), blobserv.blobSumKey(blobsum)) if err != nil { return layer.DiffID(""), err @@ -66,12 +72,12 @@ func (blobserv *BlobSumService) GetDiffID(blobsum digest.Digest) (layer.DiffID, // Add associates a blobsum with a layer DiffID. If too many blobsums are // present, the oldest one is dropped. -func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum digest.Digest) error { +func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum BlobSum) error { oldBlobSums, err := blobserv.GetBlobSums(diffID) if err != nil { oldBlobSums = nil } - newBlobSums := make([]digest.Digest, 0, len(oldBlobSums)+1) + newBlobSums := make([]BlobSum, 0, len(oldBlobSums)+1) // Copy all other blobsums to new slice for _, oldSum := range oldBlobSums { @@ -98,3 +104,34 @@ func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum digest.Digest) return blobserv.store.Set(blobserv.blobSumNamespace(), blobserv.blobSumKey(blobsum), []byte(diffID)) } + +// Remove unassociates a blobsum from a layer DiffID. +func (blobserv *BlobSumService) Remove(blobsum BlobSum) error { + diffID, err := blobserv.GetDiffID(blobsum) + if err != nil { + return err + } + oldBlobSums, err := blobserv.GetBlobSums(diffID) + if err != nil { + oldBlobSums = nil + } + newBlobSums := make([]BlobSum, 0, len(oldBlobSums)) + + // Copy all other blobsums to new slice + for _, oldSum := range oldBlobSums { + if oldSum != blobsum { + newBlobSums = append(newBlobSums, oldSum) + } + } + + if len(newBlobSums) == 0 { + return blobserv.store.Delete(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID)) + } + + jsonBytes, err := json.Marshal(newBlobSums) + if err != nil { + return err + } + + return blobserv.store.Set(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID), jsonBytes) +} diff --git a/distribution/metadata/blobsum_service_test.go b/distribution/metadata/blobsum_service_test.go index dee64df1ee..8af76d0e31 100644 --- a/distribution/metadata/blobsum_service_test.go +++ b/distribution/metadata/blobsum_service_test.go @@ -1,7 +1,9 @@ package metadata import ( + "encoding/hex" "io/ioutil" + "math/rand" "os" "reflect" "testing" @@ -23,33 +25,32 @@ func TestBlobSumService(t *testing.T) { } blobSumService := NewBlobSumService(metadataStore) + tooManyBlobSums := make([]BlobSum, 100) + for i := range tooManyBlobSums { + randDigest := randomDigest() + tooManyBlobSums[i] = BlobSum{Digest: randDigest} + } + testVectors := []struct { diffID layer.DiffID - blobsums []digest.Digest + blobsums []BlobSum }{ { diffID: layer.DiffID("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"), - blobsums: []digest.Digest{ - digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"), + blobsums: []BlobSum{ + {Digest: digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937")}, }, }, { diffID: layer.DiffID("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa"), - blobsums: []digest.Digest{ - digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"), - digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e"), + blobsums: []BlobSum{ + {Digest: digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937")}, + {Digest: digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e")}, }, }, { - diffID: layer.DiffID("sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"), - blobsums: []digest.Digest{ - digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"), - digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e"), - digest.Digest("sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"), - digest.Digest("sha256:8902a7ca89aabbb868835260912159026637634090dd8899eee969523252236e"), - digest.Digest("sha256:c84364306344ccc48532c52ff5209236273525231dddaaab53262322352883aa"), - digest.Digest("sha256:aa7583bbc87532a8352bbb72520a821b3623523523a8352523a52352aaa888fe"), - }, + diffID: layer.DiffID("sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"), + blobsums: tooManyBlobSums, }, } @@ -70,8 +71,8 @@ func TestBlobSumService(t *testing.T) { t.Fatalf("error calling Get: %v", err) } expectedBlobsums := len(vec.blobsums) - if expectedBlobsums > 5 { - expectedBlobsums = 5 + if expectedBlobsums > 50 { + expectedBlobsums = 50 } if !reflect.DeepEqual(blobsums, vec.blobsums[len(vec.blobsums)-expectedBlobsums:len(vec.blobsums)]) { t.Fatal("Get returned incorrect layer ID") @@ -85,7 +86,7 @@ func TestBlobSumService(t *testing.T) { } // Test GetDiffID on a nonexistent entry - _, err = blobSumService.GetDiffID(digest.Digest("sha256:82379823067823853223359023576437723560923756b03560378f4497753917")) + _, err = blobSumService.GetDiffID(BlobSum{Digest: digest.Digest("sha256:82379823067823853223359023576437723560923756b03560378f4497753917")}) if err == nil { t.Fatal("expected error looking up nonexistent entry") } @@ -103,3 +104,12 @@ func TestBlobSumService(t *testing.T) { t.Fatal("GetDiffID returned incorrect diffID") } } + +func randomDigest() digest.Digest { + b := [32]byte{} + for i := 0; i < len(b); i++ { + b[i] = byte(rand.Intn(256)) + } + d := hex.EncodeToString(b[:]) + return digest.Digest("sha256:" + d) +} diff --git a/distribution/metadata/metadata.go b/distribution/metadata/metadata.go index ab9cc5b626..9f744d46fc 100644 --- a/distribution/metadata/metadata.go +++ b/distribution/metadata/metadata.go @@ -15,6 +15,8 @@ type Store interface { Get(namespace string, key string) ([]byte, error) // Set writes data indexed by namespace and key. Set(namespace, key string, value []byte) error + // Delete removes data indexed by namespace and key. + Delete(namespace, key string) error } // FSMetadataStore uses the filesystem to associate metadata with layer and @@ -63,3 +65,13 @@ func (store *FSMetadataStore) Set(namespace, key string, value []byte) error { } return os.Rename(tempFilePath, path) } + +// Delete removes data indexed by namespace and key. The data file named after +// the key, stored in the namespace's directory is deleted. +func (store *FSMetadataStore) Delete(namespace, key string) error { + store.Lock() + defer store.Unlock() + + path := store.path(namespace, key) + return os.Remove(path) +} diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go index 7277d07fb1..e7eddce034 100644 --- a/distribution/pull_v2.go +++ b/distribution/pull_v2.go @@ -111,6 +111,7 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e type v2LayerDescriptor struct { digest digest.Digest + repoInfo *registry.RepositoryInfo repo distribution.Repository blobSumService *metadata.BlobSumService } @@ -124,7 +125,7 @@ func (ld *v2LayerDescriptor) ID() string { } func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) { - return ld.blobSumService.GetDiffID(ld.digest) + return ld.blobSumService.GetDiffID(metadata.BlobSum{Digest: ld.digest, SourceRepository: ld.repoInfo.FullName()}) } func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) { @@ -196,7 +197,7 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) { // Cache mapping from this layer's DiffID to the blobsum - ld.blobSumService.Add(diffID, ld.digest) + ld.blobSumService.Add(diffID, metadata.BlobSum{Digest: ld.digest, SourceRepository: ld.repoInfo.FullName()}) } func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdated bool, err error) { @@ -334,6 +335,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Named, unverif layerDescriptor := &v2LayerDescriptor{ digest: blobSum, + repoInfo: p.repoInfo, repo: p.repo, blobSumService: p.blobSumService, } @@ -400,6 +402,7 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s layerDescriptor := &v2LayerDescriptor{ digest: d.Digest, repo: p.repo, + repoInfo: p.repoInfo, blobSumService: p.blobSumService, } diff --git a/distribution/push_v2.go b/distribution/push_v2.go index 98fb13e5c2..ce7be6097d 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -131,6 +131,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, ima descriptorTemplate := v2PushDescriptor{ blobSumService: p.blobSumService, + repoInfo: p.repoInfo, repo: p.repo, pushState: &p.pushState, } @@ -211,6 +212,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild type v2PushDescriptor struct { layer layer.Layer blobSumService *metadata.BlobSumService + repoInfo reference.Named repo distribution.Repository pushState *pushState } @@ -243,7 +245,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. // Do we have any blobsums associated with this layer's DiffID? possibleBlobsums, err := pd.blobSumService.GetBlobSums(diffID) if err == nil { - descriptor, exists, err := blobSumAlreadyExists(ctx, possibleBlobsums, pd.repo, pd.pushState) + descriptor, exists, err := blobSumAlreadyExists(ctx, possibleBlobsums, pd.repoInfo, pd.repo, pd.pushState) if err != nil { progress.Update(progressOutput, pd.ID(), "Image push failed") return retryOnError(err) @@ -263,6 +265,37 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. // then push the blob. bs := pd.repo.Blobs(ctx) + // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload + for _, blobsum := range possibleBlobsums { + sourceRepo, err := reference.ParseNamed(blobsum.SourceRepository) + if err != nil { + continue + } + if pd.repoInfo.Hostname() == sourceRepo.Hostname() { + logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, blobsum.Digest, sourceRepo.FullName()) + + desc, err := bs.Mount(ctx, sourceRepo.RemoteName(), blobsum.Digest) + if err == nil { + progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", sourceRepo.RemoteName()) + + pd.pushState.Lock() + pd.pushState.confirmedV2 = true + pd.pushState.remoteLayers[diffID] = desc + pd.pushState.Unlock() + + // Cache mapping from this layer's DiffID to the blobsum + if err := pd.blobSumService.Add(diffID, metadata.BlobSum{Digest: blobsum.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil { + return xfer.DoNotRetry{Err: err} + } + + return nil + } + // Unable to mount layer from this repository, so this source mapping is no longer valid + logrus.Debugf("unassociating layer %s (%s) with %s", diffID, blobsum.Digest, sourceRepo.FullName()) + pd.blobSumService.Remove(blobsum) + } + } + // Send the layer layerUpload, err := bs.Create(ctx) if err != nil { @@ -300,7 +333,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. progress.Update(progressOutput, pd.ID(), "Pushed") // Cache mapping from this layer's DiffID to the blobsum - if err := pd.blobSumService.Add(diffID, pushDigest); err != nil { + if err := pd.blobSumService.Add(diffID, metadata.BlobSum{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil { return xfer.DoNotRetry{Err: err} } @@ -332,9 +365,13 @@ func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { // blobSumAlreadyExists checks if the registry already know about any of the // blobsums passed in the "blobsums" slice. If it finds one that the registry // knows about, it returns the known digest and "true". -func blobSumAlreadyExists(ctx context.Context, blobsums []digest.Digest, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) { - for _, dgst := range blobsums { - descriptor, err := repo.Blobs(ctx).Stat(ctx, dgst) +func blobSumAlreadyExists(ctx context.Context, blobsums []metadata.BlobSum, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) { + for _, blobSum := range blobsums { + // Only check blobsums that are known to this repository or have an unknown source + if blobSum.SourceRepository != "" && blobSum.SourceRepository != repoInfo.FullName() { + continue + } + descriptor, err := repo.Blobs(ctx).Stat(ctx, blobSum.Digest) switch err { case nil: descriptor.MediaType = schema2.MediaTypeLayer diff --git a/hack/vendor.sh b/hack/vendor.sh index 030928b747..cd8aa38840 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -44,7 +44,7 @@ clone git github.com/boltdb/bolt v1.1.0 clone git github.com/miekg/dns d27455715200c7d3e321a1e5cadb27c9ee0b0f02 # get graph and distribution packages -clone git github.com/docker/distribution a7ae88da459b98b481a245e5b1750134724ac67d +clone git github.com/docker/distribution 93d9070c8bb28414de9ec96fd38c89614acd8435 clone git github.com/vbatts/tar-split v0.9.11 # get desired notary commit, might also need to be updated in Dockerfile diff --git a/integration-cli/docker_cli_push_test.go b/integration-cli/docker_cli_push_test.go index be5f9aad8e..05cd828478 100644 --- a/integration-cli/docker_cli_push_test.go +++ b/integration-cli/docker_cli_push_test.go @@ -147,6 +147,54 @@ func (s *DockerSchema1RegistrySuite) TestPushEmptyLayer(c *check.C) { testPushEmptyLayer(c) } +func (s *DockerRegistrySuite) TestCrossRepositoryLayerPush(c *check.C) { + sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL) + // tag the image to upload it to the private registry + dockerCmd(c, "tag", "busybox", sourceRepoName) + // push the image to the registry + out1, _, err := dockerCmdWithError("push", sourceRepoName) + c.Assert(err, check.IsNil, check.Commentf("pushing the image to the private registry has failed: %s", out1)) + // ensure that none of the layers were mounted from another repository during push + c.Assert(strings.Contains(out1, "Mounted from"), check.Equals, false) + + destRepoName := fmt.Sprintf("%v/dockercli/crossrepopush", privateRegistryURL) + // retag the image to upload the same layers to another repo in the same registry + dockerCmd(c, "tag", "busybox", destRepoName) + // push the image to the registry + out2, _, err := dockerCmdWithError("push", destRepoName) + c.Assert(err, check.IsNil, check.Commentf("pushing the image to the private registry has failed: %s", out2)) + // ensure that layers were mounted from the first repo during push + c.Assert(strings.Contains(out2, "Mounted from dockercli/busybox"), check.Equals, true) + + // ensure that we can pull the cross-repo-pushed repository + dockerCmd(c, "rmi", destRepoName) + dockerCmd(c, "pull", destRepoName) +} + +func (s *DockerSchema1RegistrySuite) TestCrossRepositoryLayerPushNotSupported(c *check.C) { + sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL) + // tag the image to upload it to the private registry + dockerCmd(c, "tag", "busybox", sourceRepoName) + // push the image to the registry + out1, _, err := dockerCmdWithError("push", sourceRepoName) + c.Assert(err, check.IsNil, check.Commentf("pushing the image to the private registry has failed: %s", out1)) + // ensure that none of the layers were mounted from another repository during push + c.Assert(strings.Contains(out1, "Mounted from"), check.Equals, false) + + destRepoName := fmt.Sprintf("%v/dockercli/crossrepopush", privateRegistryURL) + // retag the image to upload the same layers to another repo in the same registry + dockerCmd(c, "tag", "busybox", destRepoName) + // push the image to the registry + out2, _, err := dockerCmdWithError("push", destRepoName) + c.Assert(err, check.IsNil, check.Commentf("pushing the image to the private registry has failed: %s", out2)) + // schema1 registry should not support cross-repo layer mounts, so ensure that this does not happen + c.Assert(strings.Contains(out2, "Mounted from dockercli/busybox"), check.Equals, false) + + // ensure that we can pull the second pushed repository + dockerCmd(c, "rmi", destRepoName) + dockerCmd(c, "pull", destRepoName) +} + func (s *DockerTrustSuite) TestTrustedPush(c *check.C) { repoName := fmt.Sprintf("%v/dockercli/trusted:latest", privateRegistryURL) // tag the image and upload it to the private registry diff --git a/migrate/v1/migratev1.go b/migrate/v1/migratev1.go index 77507c3dd4..0fbaa58b63 100644 --- a/migrate/v1/migratev1.go +++ b/migrate/v1/migratev1.go @@ -477,7 +477,7 @@ func migrateImage(id, root string, ls graphIDRegistrar, is image.Store, ms metad dgst, err := digest.ParseDigest(string(checksum)) if err == nil { blobSumService := metadata.NewBlobSumService(ms) - blobSumService.Add(layer.DiffID(), dgst) + blobSumService.Add(layer.DiffID(), metadata.BlobSum{Digest: dgst}) } } _, err = ls.Release(layer) diff --git a/migrate/v1/migratev1_test.go b/migrate/v1/migratev1_test.go index 5fe26637e0..551b10c584 100644 --- a/migrate/v1/migratev1_test.go +++ b/migrate/v1/migratev1_test.go @@ -216,9 +216,9 @@ func TestMigrateImages(t *testing.T) { t.Fatal(err) } - expectedBlobsums := []digest.Digest{ - "sha256:55dc925c23d1ed82551fd018c27ac3ee731377b6bad3963a2a4e76e753d70e57", - "sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4", + expectedBlobsums := []metadata.BlobSum{ + {Digest: digest.Digest("sha256:55dc925c23d1ed82551fd018c27ac3ee731377b6bad3963a2a4e76e753d70e57")}, + {Digest: digest.Digest("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4")}, } if !reflect.DeepEqual(expectedBlobsums, blobsums) { diff --git a/vendor/src/github.com/docker/distribution/blobs.go b/vendor/src/github.com/docker/distribution/blobs.go index 40cd829578..bd5f0bc9f5 100644 --- a/vendor/src/github.com/docker/distribution/blobs.go +++ b/vendor/src/github.com/docker/distribution/blobs.go @@ -155,6 +155,10 @@ type BlobIngester interface { // Resume attempts to resume a write to a blob, identified by an id. Resume(ctx context.Context, id string) (BlobWriter, error) + + // Mount adds a blob to this service from another source repository, + // identified by a digest. + Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (Descriptor, error) } // BlobWriter provides a handle for inserting data into a blob store. diff --git a/vendor/src/github.com/docker/distribution/registry/api/v2/descriptors.go b/vendor/src/github.com/docker/distribution/registry/api/v2/descriptors.go index 52c725dc2f..ad3da3efb9 100644 --- a/vendor/src/github.com/docker/distribution/registry/api/v2/descriptors.go +++ b/vendor/src/github.com/docker/distribution/registry/api/v2/descriptors.go @@ -1041,6 +1041,70 @@ var routeDescriptors = []RouteDescriptor{ deniedResponseDescriptor, }, }, + { + Name: "Mount Blob", + Description: "Mount a blob identified by the `mount` parameter from another repository.", + Headers: []ParameterDescriptor{ + hostHeader, + authHeader, + contentLengthZeroHeader, + }, + PathParameters: []ParameterDescriptor{ + nameParameterDescriptor, + }, + QueryParameters: []ParameterDescriptor{ + { + Name: "mount", + Type: "query", + Format: "", + Regexp: digest.DigestRegexp, + Description: `Digest of blob to mount from the source repository.`, + }, + { + Name: "from", + Type: "query", + Format: "", + Regexp: reference.NameRegexp, + Description: `Name of the source repository.`, + }, + }, + Successes: []ResponseDescriptor{ + { + Description: "The blob has been mounted in the repository and is available at the provided location.", + StatusCode: http.StatusCreated, + Headers: []ParameterDescriptor{ + { + Name: "Location", + Type: "url", + Format: "", + }, + contentLengthZeroHeader, + dockerUploadUUIDHeader, + }, + }, + }, + Failures: []ResponseDescriptor{ + { + Name: "Invalid Name or Digest", + StatusCode: http.StatusBadRequest, + ErrorCodes: []errcode.ErrorCode{ + ErrorCodeDigestInvalid, + ErrorCodeNameInvalid, + }, + }, + { + Name: "Not allowed", + Description: "Blob mount is not allowed because the registry is configured as a pull-through cache or for some other reason", + StatusCode: http.StatusMethodNotAllowed, + ErrorCodes: []errcode.ErrorCode{ + errcode.ErrorCodeUnsupported, + }, + }, + unauthorizedResponseDescriptor, + repositoryNotFoundResponseDescriptor, + deniedResponseDescriptor, + }, + }, }, }, }, diff --git a/vendor/src/github.com/docker/distribution/registry/client/auth/session.go b/vendor/src/github.com/docker/distribution/registry/client/auth/session.go index 9819b3cb84..6b483c62ef 100644 --- a/vendor/src/github.com/docker/distribution/registry/client/auth/session.go +++ b/vendor/src/github.com/docker/distribution/registry/client/auth/session.go @@ -108,6 +108,8 @@ type tokenHandler struct { tokenLock sync.Mutex tokenCache string tokenExpiration time.Time + + additionalScopes map[string]struct{} } // tokenScope represents the scope at which a token will be requested. @@ -145,6 +147,7 @@ func newTokenHandler(transport http.RoundTripper, creds CredentialStore, c clock Scope: scope, Actions: actions, }, + additionalScopes: map[string]struct{}{}, } } @@ -160,7 +163,15 @@ func (th *tokenHandler) Scheme() string { } func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error { - if err := th.refreshToken(params); err != nil { + var additionalScopes []string + if fromParam := req.URL.Query().Get("from"); fromParam != "" { + additionalScopes = append(additionalScopes, tokenScope{ + Resource: "repository", + Scope: fromParam, + Actions: []string{"pull"}, + }.String()) + } + if err := th.refreshToken(params, additionalScopes...); err != nil { return err } @@ -169,11 +180,18 @@ func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]st return nil } -func (th *tokenHandler) refreshToken(params map[string]string) error { +func (th *tokenHandler) refreshToken(params map[string]string, additionalScopes ...string) error { th.tokenLock.Lock() defer th.tokenLock.Unlock() + var addedScopes bool + for _, scope := range additionalScopes { + if _, ok := th.additionalScopes[scope]; !ok { + th.additionalScopes[scope] = struct{}{} + addedScopes = true + } + } now := th.clock.Now() - if now.After(th.tokenExpiration) { + if now.After(th.tokenExpiration) || addedScopes { tr, err := th.fetchToken(params) if err != nil { return err @@ -223,6 +241,10 @@ func (th *tokenHandler) fetchToken(params map[string]string) (token *tokenRespon reqParams.Add("scope", scopeField) } + for scope := range th.additionalScopes { + reqParams.Add("scope", scope) + } + if th.creds != nil { username, password := th.creds.Basic(realmURL) if username != "" && password != "" { diff --git a/vendor/src/github.com/docker/distribution/registry/client/repository.go b/vendor/src/github.com/docker/distribution/registry/client/repository.go index 758c6e5e31..8f30b4f13a 100644 --- a/vendor/src/github.com/docker/distribution/registry/client/repository.go +++ b/vendor/src/github.com/docker/distribution/registry/client/repository.go @@ -10,6 +10,7 @@ import ( "net/http" "net/url" "strconv" + "sync" "time" "github.com/docker/distribution" @@ -499,6 +500,9 @@ type blobs struct { statter distribution.BlobDescriptorService distribution.BlobDeleter + + cacheLock sync.Mutex + cachedBlobUpload distribution.BlobWriter } func sanitizeLocation(location, base string) (string, error) { @@ -573,7 +577,20 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut } func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { + bs.cacheLock.Lock() + if bs.cachedBlobUpload != nil { + upload := bs.cachedBlobUpload + bs.cachedBlobUpload = nil + bs.cacheLock.Unlock() + + return upload, nil + } + bs.cacheLock.Unlock() + u, err := bs.ub.BuildBlobUploadURL(bs.name) + if err != nil { + return nil, err + } resp, err := bs.client.Post(u, "", nil) if err != nil { @@ -604,6 +621,45 @@ func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter panic("not implemented") } +func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { + u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}}) + if err != nil { + return distribution.Descriptor{}, err + } + + resp, err := bs.client.Post(u, "", nil) + if err != nil { + return distribution.Descriptor{}, err + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusCreated: + return bs.Stat(ctx, dgst) + case http.StatusAccepted: + // Triggered a blob upload (legacy behavior), so cache the creation info + uuid := resp.Header.Get("Docker-Upload-UUID") + location, err := sanitizeLocation(resp.Header.Get("Location"), u) + if err != nil { + return distribution.Descriptor{}, err + } + + bs.cacheLock.Lock() + bs.cachedBlobUpload = &httpBlobUpload{ + statter: bs.statter, + client: bs.client, + uuid: uuid, + startedAt: time.Now(), + location: location, + } + bs.cacheLock.Unlock() + + return distribution.Descriptor{}, HandleErrorResponse(resp) + default: + return distribution.Descriptor{}, HandleErrorResponse(resp) + } +} + func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error { return bs.statter.Clear(ctx, dgst) }