Cleanup v2 push logic

Manifest is now generated during a v2 push, not relying on previously generated hashes. When pushing a layer, the hash is directly calculated from the tar contents which will be pushed. Computing the hash on push ensures that the hash contents always match what is seen by the registry. This also mitigates issues with tarsum differences and permits using pure SHA digests.
Additionally the new manifest function is moved to the unit tests since it is no longer called outside the tests.

Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
This commit is contained in:
Derek McGowan 2015-03-04 12:05:17 -08:00
parent c5af44e6d0
commit d172f1253a
4 changed files with 235 additions and 186 deletions

View File

@ -4,113 +4,13 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/docker/docker/engine" "github.com/docker/docker/engine"
"github.com/docker/docker/pkg/tarsum"
"github.com/docker/docker/registry" "github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/libtrust" "github.com/docker/libtrust"
) )
func (s *TagStore) newManifest(localName, remoteName, tag string) ([]byte, error) {
manifest := &registry.ManifestData{
Name: remoteName,
Tag: tag,
SchemaVersion: 1,
}
localRepo, err := s.Get(localName)
if err != nil {
return nil, err
}
if localRepo == nil {
return nil, fmt.Errorf("Repo does not exist: %s", localName)
}
// Get the top-most layer id which the tag points to
layerId, exists := localRepo[tag]
if !exists {
return nil, fmt.Errorf("Tag does not exist for %s: %s", localName, tag)
}
layersSeen := make(map[string]bool)
layer, err := s.graph.Get(layerId)
if err != nil {
return nil, err
}
manifest.Architecture = layer.Architecture
manifest.FSLayers = make([]*registry.FSLayer, 0, 4)
manifest.History = make([]*registry.ManifestHistory, 0, 4)
var metadata runconfig.Config
if layer.Config != nil {
metadata = *layer.Config
}
for ; layer != nil; layer, err = layer.GetParent() {
if err != nil {
return nil, err
}
if layersSeen[layer.ID] {
break
}
if layer.Config != nil && metadata.Image != layer.ID {
err = runconfig.Merge(&metadata, layer.Config)
if err != nil {
return nil, err
}
}
checksum, err := layer.GetCheckSum(s.graph.ImageRoot(layer.ID))
if err != nil {
return nil, fmt.Errorf("Error getting image checksum: %s", err)
}
if tarsum.VersionLabelForChecksum(checksum) != tarsum.Version1.String() {
archive, err := layer.TarLayer()
if err != nil {
return nil, err
}
defer archive.Close()
tarSum, err := tarsum.NewTarSum(archive, true, tarsum.Version1)
if err != nil {
return nil, err
}
if _, err := io.Copy(ioutil.Discard, tarSum); err != nil {
return nil, err
}
checksum = tarSum.Sum(nil)
// Save checksum value
if err := layer.SaveCheckSum(s.graph.ImageRoot(layer.ID), checksum); err != nil {
return nil, err
}
}
jsonData, err := layer.RawJson()
if err != nil {
return nil, fmt.Errorf("Cannot retrieve the path for {%s}: %s", layer.ID, err)
}
manifest.FSLayers = append(manifest.FSLayers, &registry.FSLayer{BlobSum: checksum})
layersSeen[layer.ID] = true
manifest.History = append(manifest.History, &registry.ManifestHistory{V1Compatibility: string(jsonData)})
}
manifestBytes, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return nil, err
}
return manifestBytes, nil
}
// loadManifest loads a manifest from a byte array and verifies its content. // loadManifest loads a manifest from a byte array and verifies its content.
// The signature must be verified or an error is returned. If the manifest // The signature must be verified or an error is returned. If the manifest
// contains no signatures by a trusted key for the name in the manifest, the // contains no signatures by a trusted key for the name in the manifest, the

View File

@ -2,11 +2,16 @@ package graph
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io"
"io/ioutil"
"os" "os"
"testing" "testing"
"github.com/docker/docker/image" "github.com/docker/docker/image"
"github.com/docker/docker/pkg/tarsum"
"github.com/docker/docker/registry" "github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
) )
@ -17,6 +22,102 @@ const (
testManifestTag = "manifesttest" testManifestTag = "manifesttest"
) )
func (s *TagStore) newManifest(localName, remoteName, tag string) ([]byte, error) {
manifest := &registry.ManifestData{
Name: remoteName,
Tag: tag,
SchemaVersion: 1,
}
localRepo, err := s.Get(localName)
if err != nil {
return nil, err
}
if localRepo == nil {
return nil, fmt.Errorf("Repo does not exist: %s", localName)
}
// Get the top-most layer id which the tag points to
layerId, exists := localRepo[tag]
if !exists {
return nil, fmt.Errorf("Tag does not exist for %s: %s", localName, tag)
}
layersSeen := make(map[string]bool)
layer, err := s.graph.Get(layerId)
if err != nil {
return nil, err
}
manifest.Architecture = layer.Architecture
manifest.FSLayers = make([]*registry.FSLayer, 0, 4)
manifest.History = make([]*registry.ManifestHistory, 0, 4)
var metadata runconfig.Config
if layer.Config != nil {
metadata = *layer.Config
}
for ; layer != nil; layer, err = layer.GetParent() {
if err != nil {
return nil, err
}
if layersSeen[layer.ID] {
break
}
if layer.Config != nil && metadata.Image != layer.ID {
err = runconfig.Merge(&metadata, layer.Config)
if err != nil {
return nil, err
}
}
checksum, err := layer.GetCheckSum(s.graph.ImageRoot(layer.ID))
if err != nil {
return nil, fmt.Errorf("Error getting image checksum: %s", err)
}
if tarsum.VersionLabelForChecksum(checksum) != tarsum.Version1.String() {
archive, err := layer.TarLayer()
if err != nil {
return nil, err
}
defer archive.Close()
tarSum, err := tarsum.NewTarSum(archive, true, tarsum.Version1)
if err != nil {
return nil, err
}
if _, err := io.Copy(ioutil.Discard, tarSum); err != nil {
return nil, err
}
checksum = tarSum.Sum(nil)
// Save checksum value
if err := layer.SaveCheckSum(s.graph.ImageRoot(layer.ID), checksum); err != nil {
return nil, err
}
}
jsonData, err := layer.RawJson()
if err != nil {
return nil, fmt.Errorf("Cannot retrieve the path for {%s}: %s", layer.ID, err)
}
manifest.FSLayers = append(manifest.FSLayers, &registry.FSLayer{BlobSum: checksum})
layersSeen[layer.ID] = true
manifest.History = append(manifest.History, &registry.ManifestHistory{V1Compatibility: string(jsonData)})
}
manifestBytes, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return nil, err
}
return manifestBytes, nil
}
func TestManifestTarsumCache(t *testing.T) { func TestManifestTarsumCache(t *testing.T) {
tmp, err := utils.TestDirectory("") tmp, err := utils.TestDirectory("")
if err != nil { if err != nil {

View File

@ -2,6 +2,7 @@ package graph
import ( import (
"bytes" "bytes"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -15,7 +16,9 @@ import (
"github.com/docker/docker/engine" "github.com/docker/docker/engine"
"github.com/docker/docker/image" "github.com/docker/docker/image"
"github.com/docker/docker/pkg/common" "github.com/docker/docker/pkg/common"
"github.com/docker/docker/pkg/tarsum"
"github.com/docker/docker/registry" "github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
"github.com/docker/libtrust" "github.com/docker/libtrust"
) )
@ -69,15 +72,11 @@ func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string
return imageList, tagsByImage, nil return imageList, tagsByImage, nil
} }
func (s *TagStore) getImageTags(localName, askedTag string) ([]string, error) { func (s *TagStore) getImageTags(localRepo map[string]string, askedTag string) ([]string, error) {
localRepo, err := s.Get(localName)
if err != nil {
return nil, err
}
log.Debugf("Checking %s against %#v", askedTag, localRepo) log.Debugf("Checking %s against %#v", askedTag, localRepo)
if len(askedTag) > 0 { if len(askedTag) > 0 {
if _, ok := localRepo[askedTag]; !ok { if _, ok := localRepo[askedTag]; !ok {
return nil, fmt.Errorf("Tag does not exist for %s:%s", localName, askedTag) return nil, fmt.Errorf("Tag does not exist: %s", askedTag)
} }
return []string{askedTag}, nil return []string{askedTag}, nil
} }
@ -274,14 +273,7 @@ func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep strin
return imgData.Checksum, nil return imgData.Checksum, nil
} }
func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter) error { func (s *TagStore) pushV2Repository(r *registry.Session, localRepo Repository, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter) error {
if repoInfo.Official {
j := eng.Job("trust_update_base")
if err := j.Run(); err != nil {
log.Errorf("error updating trust base graph: %s", err)
}
}
endpoint, err := r.V2RegistryEndpoint(repoInfo.Index) endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
if err != nil { if err != nil {
if repoInfo.Index.Official { if repoInfo.Index.Official {
@ -291,7 +283,7 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out
return fmt.Errorf("error getting registry endpoint: %s", err) return fmt.Errorf("error getting registry endpoint: %s", err)
} }
tags, err := s.getImageTags(repoInfo.LocalName, tag) tags, err := s.getImageTags(localRepo, tag)
if err != nil { if err != nil {
return err return err
} }
@ -305,8 +297,102 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out
} }
for _, tag := range tags { for _, tag := range tags {
log.Debugf("Pushing repository: %s:%s", repoInfo.CanonicalName, tag)
layerId, exists := localRepo[tag]
if !exists {
return fmt.Errorf("tag does not exist: %s", tag)
}
layer, err := s.graph.Get(layerId)
if err != nil {
return err
}
m := &registry.ManifestData{
SchemaVersion: 1,
Name: repoInfo.RemoteName,
Tag: tag,
Architecture: layer.Architecture,
}
var metadata runconfig.Config
if layer.Config != nil {
metadata = *layer.Config
}
layersSeen := make(map[string]bool)
layers := []*image.Image{layer}
for ; layer != nil; layer, err = layer.GetParent() {
if err != nil {
return err
}
if layersSeen[layer.ID] {
break
}
layers = append(layers, layer)
layersSeen[layer.ID] = true
}
m.FSLayers = make([]*registry.FSLayer, len(layers))
m.History = make([]*registry.ManifestHistory, len(layers))
// Schema version 1 requires layer ordering from top to root
for i, layer := range layers {
log.Debugf("Pushing layer: %s", layer.ID)
if layer.Config != nil && metadata.Image != layer.ID {
err = runconfig.Merge(&metadata, layer.Config)
if err != nil {
return err
}
}
jsonData, err := layer.RawJson()
if err != nil {
return fmt.Errorf("cannot retrieve the path for %s: %s", layer.ID, err)
}
checksum, err := layer.GetCheckSum(s.graph.ImageRoot(layer.ID))
if err != nil {
return fmt.Errorf("error getting image checksum: %s", err)
}
var exists bool
if len(checksum) > 0 {
sumParts := strings.SplitN(checksum, ":", 2)
if len(sumParts) < 2 {
return fmt.Errorf("Invalid checksum: %s", checksum)
}
// Call mount blob
exists, err = r.HeadV2ImageBlob(endpoint, repoInfo.RemoteName, sumParts[0], sumParts[1], auth)
if err != nil {
out.Write(sf.FormatProgress(common.TruncateID(layer.ID), "Image push failed", nil))
return err
}
}
if !exists {
if cs, err := s.pushV2Image(r, layer, endpoint, repoInfo.RemoteName, sf, out, auth); err != nil {
return err
} else if cs != checksum {
// Cache new checksum
if err := layer.SaveCheckSum(s.graph.ImageRoot(layer.ID), cs); err != nil {
return err
}
checksum = cs
}
} else {
out.Write(sf.FormatProgress(common.TruncateID(layer.ID), "Image already exists", nil))
}
m.FSLayers[i] = &registry.FSLayer{BlobSum: checksum}
m.History[i] = &registry.ManifestHistory{V1Compatibility: string(jsonData)}
}
if err := checkValidManifest(m); err != nil {
return fmt.Errorf("invalid manifest: %s", err)
}
log.Debugf("Pushing %s:%s to v2 repository", repoInfo.LocalName, tag) log.Debugf("Pushing %s:%s to v2 repository", repoInfo.LocalName, tag)
mBytes, err := s.newManifest(repoInfo.LocalName, repoInfo.RemoteName, tag) mBytes, err := json.MarshalIndent(m, "", " ")
if err != nil { if err != nil {
return err return err
} }
@ -325,56 +411,8 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out
} }
log.Infof("Signed manifest for %s:%s using daemon's key: %s", repoInfo.LocalName, tag, s.trustKey.KeyID()) log.Infof("Signed manifest for %s:%s using daemon's key: %s", repoInfo.LocalName, tag, s.trustKey.KeyID())
manifestBytes := string(signedBody)
manifest, verified, err := s.loadManifest(eng, signedBody)
if err != nil {
return fmt.Errorf("error verifying manifest: %s", err)
}
if err := checkValidManifest(manifest); err != nil {
return fmt.Errorf("invalid manifest: %s", err)
}
if verified {
log.Infof("Pushing verified image, key %s is registered for %q", s.trustKey.KeyID(), repoInfo.RemoteName)
}
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
var (
sumStr = manifest.FSLayers[i].BlobSum
imgJSON = []byte(manifest.History[i].V1Compatibility)
)
sumParts := strings.SplitN(sumStr, ":", 2)
if len(sumParts) < 2 {
return fmt.Errorf("Invalid checksum: %s", sumStr)
}
manifestSum := sumParts[1]
img, err := image.NewImgJSON(imgJSON)
if err != nil {
return fmt.Errorf("Failed to parse json: %s", err)
}
// Call mount blob
exists, err := r.HeadV2ImageBlob(endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, auth)
if err != nil {
out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Image push failed", nil))
return err
}
if !exists {
if err := s.pushV2Image(r, img, endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, sf, out, auth); err != nil {
return err
}
} else {
out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Image already exists", nil))
}
}
// push the manifest // push the manifest
if err := r.PutV2ImageManifest(endpoint, repoInfo.RemoteName, tag, bytes.NewReader([]byte(manifestBytes)), auth); err != nil { if err := r.PutV2ImageManifest(endpoint, repoInfo.RemoteName, tag, bytes.NewReader(signedBody), auth); err != nil {
return err return err
} }
} }
@ -382,42 +420,51 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out
} }
// PushV2Image pushes the image content to the v2 registry, first buffering the contents to disk // PushV2Image pushes the image content to the v2 registry, first buffering the contents to disk
func (s *TagStore) pushV2Image(r *registry.Session, img *image.Image, endpoint *registry.Endpoint, imageName, sumType, sumStr string, sf *utils.StreamFormatter, out io.Writer, auth *registry.RequestAuthorization) error { func (s *TagStore) pushV2Image(r *registry.Session, img *image.Image, endpoint *registry.Endpoint, imageName string, sf *utils.StreamFormatter, out io.Writer, auth *registry.RequestAuthorization) (string, error) {
out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Buffering to Disk", nil)) out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Buffering to Disk", nil))
image, err := s.graph.Get(img.ID) image, err := s.graph.Get(img.ID)
if err != nil { if err != nil {
return err return "", err
} }
arch, err := image.TarLayer() arch, err := image.TarLayer()
if err != nil { if err != nil {
return err return "", err
} }
defer arch.Close() defer arch.Close()
tf, err := s.graph.newTempFile() tf, err := s.graph.newTempFile()
if err != nil { if err != nil {
return err return "", err
} }
defer func() { defer func() {
tf.Close() tf.Close()
os.Remove(tf.Name()) os.Remove(tf.Name())
}() }()
size, err := bufferToFile(tf, arch) ts, err := tarsum.NewTarSum(arch, true, tarsum.Version1)
if err != nil { if err != nil {
return err return "", err
}
size, err := bufferToFile(tf, ts)
if err != nil {
return "", err
}
checksum := ts.Sum(nil)
sumParts := strings.SplitN(checksum, ":", 2)
if len(sumParts) < 2 {
return "", fmt.Errorf("Invalid checksum: %s", checksum)
} }
// Send the layer // Send the layer
log.Debugf("rendered layer for %s of [%d] size", img.ID, size) log.Debugf("rendered layer for %s of [%d] size", img.ID, size)
if err := r.PutV2ImageBlob(endpoint, imageName, sumType, sumStr, utils.ProgressReader(tf, int(size), out, sf, false, common.TruncateID(img.ID), "Pushing"), auth); err != nil { if err := r.PutV2ImageBlob(endpoint, imageName, sumParts[0], sumParts[1], utils.ProgressReader(tf, int(size), out, sf, false, common.TruncateID(img.ID), "Pushing"), auth); err != nil {
out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Image push failed", nil)) out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Image push failed", nil))
return err return "", err
} }
out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Image successfully pushed", nil)) out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Image successfully pushed", nil))
return nil return checksum, nil
} }
// FIXME: Allow to interrupt current push when new push of same image is done. // FIXME: Allow to interrupt current push when new push of same image is done.
@ -457,17 +504,6 @@ func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
return job.Error(err) return job.Error(err)
} }
if endpoint.Version == registry.APIVersion2 {
err := s.pushV2Repository(r, job.Eng, job.Stdout, repoInfo, tag, sf)
if err == nil {
return engine.StatusOK
}
if err != ErrV2RegistryUnavailable {
return job.Errorf("Error pushing to registry: %s", err)
}
}
reposLen := 1 reposLen := 1
if tag == "" { if tag == "" {
reposLen = len(s.Repositories[repoInfo.LocalName]) reposLen = len(s.Repositories[repoInfo.LocalName])
@ -478,6 +514,18 @@ func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
if !exists { if !exists {
return job.Errorf("Repository does not exist: %s", repoInfo.LocalName) return job.Errorf("Repository does not exist: %s", repoInfo.LocalName)
} }
if endpoint.Version == registry.APIVersion2 {
err := s.pushV2Repository(r, localRepo, job.Stdout, repoInfo, tag, sf)
if err == nil {
return engine.StatusOK
}
if err != ErrV2RegistryUnavailable {
return job.Errorf("Error pushing to registry: %s", err)
}
}
if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil { if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil {
return job.Error(err) return job.Error(err)
} }

View File

@ -45,7 +45,7 @@ func TestPushUntagged(t *testing.T) {
repoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL) repoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
expected := "No tags to push" expected := "Repository does not exist"
pushCmd := exec.Command(dockerBinary, "push", repoName) pushCmd := exec.Command(dockerBinary, "push", repoName)
if out, _, err := runCommandWithOutput(pushCmd); err == nil { if out, _, err := runCommandWithOutput(pushCmd); err == nil {
t.Fatalf("pushing the image to the private registry should have failed: outuput %q", out) t.Fatalf("pushing the image to the private registry should have failed: outuput %q", out)