Add namespace content store in metadata
Add a metadata store for content which enforces content is only visible inside a given namespace. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
		
							parent
							
								
									106c7504f4
								
							
						
					
					
						commit
						2c9004d431
					
				| 
						 | 
				
			
			@ -5,9 +5,11 @@ import (
 | 
			
		|||
	"io/ioutil"
 | 
			
		||||
	"os"
 | 
			
		||||
 | 
			
		||||
	"github.com/boltdb/bolt"
 | 
			
		||||
	"github.com/containerd/containerd/archive"
 | 
			
		||||
	"github.com/containerd/containerd/archive/compression"
 | 
			
		||||
	"github.com/containerd/containerd/content"
 | 
			
		||||
	"github.com/containerd/containerd/metadata"
 | 
			
		||||
	"github.com/containerd/containerd/mount"
 | 
			
		||||
	"github.com/containerd/containerd/plugin"
 | 
			
		||||
	digest "github.com/opencontainers/go-digest"
 | 
			
		||||
| 
						 | 
				
			
			@ -22,13 +24,18 @@ func init() {
 | 
			
		|||
		ID:   "base-diff",
 | 
			
		||||
		Requires: []plugin.PluginType{
 | 
			
		||||
			plugin.ContentPlugin,
 | 
			
		||||
			plugin.MetadataPlugin,
 | 
			
		||||
		},
 | 
			
		||||
		Init: func(ic *plugin.InitContext) (interface{}, error) {
 | 
			
		||||
			c, err := ic.Get(plugin.ContentPlugin)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			return NewBaseDiff(c.(content.Store))
 | 
			
		||||
			md, err := ic.Get(plugin.MetadataPlugin)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			return NewBaseDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store)))
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,6 +2,7 @@ package metadata
 | 
			
		|||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/boltdb/bolt"
 | 
			
		||||
	digest "github.com/opencontainers/go-digest"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// The layout where a "/" delineates a bucket is desribed in the following
 | 
			
		||||
| 
						 | 
				
			
			@ -33,6 +34,7 @@ var (
 | 
			
		|||
	bucketKeyObjectImages     = []byte("images")     // stores image objects
 | 
			
		||||
	bucketKeyObjectContainers = []byte("containers") // stores container objects
 | 
			
		||||
	bucketKeyObjectSnapshots  = []byte("snapshots")  // stores snapshot references
 | 
			
		||||
	bucketKeyObjectContent    = []byte("content")    // stores content links
 | 
			
		||||
 | 
			
		||||
	bucketKeyDigest    = []byte("digest")
 | 
			
		||||
	bucketKeyMediaType = []byte("mediatype")
 | 
			
		||||
| 
						 | 
				
			
			@ -139,3 +141,19 @@ func createSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) (*bolt.
 | 
			
		|||
func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Bucket {
 | 
			
		||||
	return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectSnapshots, []byte(snapshotter))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func createContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) {
 | 
			
		||||
	bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String()))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return bkt, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getAllContentBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
 | 
			
		||||
	return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Bucket {
 | 
			
		||||
	return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String()))
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,281 @@
 | 
			
		|||
package metadata
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"encoding/binary"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"github.com/boltdb/bolt"
 | 
			
		||||
	"github.com/containerd/containerd/content"
 | 
			
		||||
	"github.com/containerd/containerd/namespaces"
 | 
			
		||||
	digest "github.com/opencontainers/go-digest"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type contentStore struct {
 | 
			
		||||
	content.Store
 | 
			
		||||
	db *bolt.DB
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewContentStore returns a namespaced content store using an existing
 | 
			
		||||
// content store interface.
 | 
			
		||||
func NewContentStore(db *bolt.DB, cs content.Store) content.Store {
 | 
			
		||||
	return &contentStore{
 | 
			
		||||
		Store: cs,
 | 
			
		||||
		db:    db,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
 | 
			
		||||
	ns, err := namespaces.NamespaceRequired(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return content.Info{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var info content.Info
 | 
			
		||||
	if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
 | 
			
		||||
		bkt := getContentBucket(tx, ns, dgst)
 | 
			
		||||
		if bkt == nil {
 | 
			
		||||
			return content.ErrNotFound("")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		info.Digest = dgst
 | 
			
		||||
		return readInfo(&info, bkt)
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		return content.Info{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return info, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error {
 | 
			
		||||
	ns, err := namespaces.NamespaceRequired(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO: Batch results to keep from reading all info into memory
 | 
			
		||||
	var infos []content.Info
 | 
			
		||||
	if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
 | 
			
		||||
		bkt := getAllContentBucket(tx, ns)
 | 
			
		||||
		if bkt == nil {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return bkt.ForEach(func(k, v []byte) error {
 | 
			
		||||
			dgst, err := digest.Parse(string(k))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
			info := content.Info{
 | 
			
		||||
				Digest: dgst,
 | 
			
		||||
			}
 | 
			
		||||
			if err := readInfo(&info, bkt.Bucket(k)); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
			infos = append(infos, info)
 | 
			
		||||
			return nil
 | 
			
		||||
		})
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, info := range infos {
 | 
			
		||||
		if err := fn(info); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error {
 | 
			
		||||
	ns, err := namespaces.NamespaceRequired(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return update(ctx, cs.db, func(tx *bolt.Tx) error {
 | 
			
		||||
		bkt := getContentBucket(tx, ns, dgst)
 | 
			
		||||
		if bkt == nil {
 | 
			
		||||
			return content.ErrNotFound("")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Just remove local reference, garbage collector is responsible for
 | 
			
		||||
		// cleaning up on disk content
 | 
			
		||||
		return getAllContentBucket(tx, ns).Delete([]byte(dgst.String()))
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cs *contentStore) Status(ctx context.Context, re string) ([]content.Status, error) {
 | 
			
		||||
	_, err := namespaces.NamespaceRequired(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO: Read status keys and match
 | 
			
		||||
 | 
			
		||||
	return cs.Store.Status(ctx, re)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cs *contentStore) Abort(ctx context.Context, ref string) error {
 | 
			
		||||
	_, err := namespaces.NamespaceRequired(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO: Read status key and delete
 | 
			
		||||
 | 
			
		||||
	return cs.Store.Abort(ctx, ref)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
 | 
			
		||||
	ns, err := namespaces.NamespaceRequired(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO: Create ref key
 | 
			
		||||
 | 
			
		||||
	if expected != "" {
 | 
			
		||||
		if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
 | 
			
		||||
			bkt := getContentBucket(tx, ns, expected)
 | 
			
		||||
			if bkt != nil {
 | 
			
		||||
				return content.ErrExists("")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		}); err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Do not use the passed in expected value here since it was
 | 
			
		||||
	// already checked against the user metadata. If the content
 | 
			
		||||
	// store has the content, it must still be written before
 | 
			
		||||
	// linked into the given namespace. It is possible in the future
 | 
			
		||||
	// to allow content which exists in content store but not
 | 
			
		||||
	// namespace to be linked here and returned an exist error, but
 | 
			
		||||
	// this would require more configuration to make secure.
 | 
			
		||||
	w, err := cs.Store.Writer(ctx, ref, size, "")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO: keep the expected in the writer to use on commit
 | 
			
		||||
	// when no expected is provided there.
 | 
			
		||||
	return &namespacedWriter{
 | 
			
		||||
		Writer:    w,
 | 
			
		||||
		namespace: ns,
 | 
			
		||||
		db:        cs.db,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type namespacedWriter struct {
 | 
			
		||||
	content.Writer
 | 
			
		||||
	namespace string
 | 
			
		||||
	db        *bolt.DB
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (nw *namespacedWriter) Commit(size int64, expected digest.Digest) error {
 | 
			
		||||
	tx, err := nw.db.Begin(true)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := nw.commit(tx, size, expected); err != nil {
 | 
			
		||||
		tx.Rollback()
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return tx.Commit()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Digest) error {
 | 
			
		||||
	status, err := nw.Writer.Status()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	actual := nw.Writer.Digest()
 | 
			
		||||
 | 
			
		||||
	// TODO: Handle already exists
 | 
			
		||||
	if err := nw.Writer.Commit(size, expected); err != nil {
 | 
			
		||||
		if !content.IsExists(err) {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		if getContentBucket(tx, nw.namespace, actual) != nil {
 | 
			
		||||
			return content.ErrExists("")
 | 
			
		||||
		}
 | 
			
		||||
		// Link into this namespace
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	size = status.Total
 | 
			
		||||
 | 
			
		||||
	bkt, err := createContentBucket(tx, nw.namespace, actual)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	sizeEncoded, err := encodeSize(size)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	timeEncoded, err := status.UpdatedAt.MarshalBinary()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, v := range [][2][]byte{
 | 
			
		||||
		{bucketKeyCreatedAt, timeEncoded},
 | 
			
		||||
		{bucketKeySize, sizeEncoded},
 | 
			
		||||
	} {
 | 
			
		||||
		if err := bkt.Put(v[0], v[1]); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cs *contentStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
 | 
			
		||||
	if err := cs.checkAccess(ctx, dgst); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return cs.Store.Reader(ctx, dgst)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) {
 | 
			
		||||
	if err := cs.checkAccess(ctx, dgst); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return cs.Store.ReaderAt(ctx, dgst)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error {
 | 
			
		||||
	ns, err := namespaces.NamespaceRequired(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return view(ctx, cs.db, func(tx *bolt.Tx) error {
 | 
			
		||||
		bkt := getContentBucket(tx, ns, dgst)
 | 
			
		||||
		if bkt == nil {
 | 
			
		||||
			return content.ErrNotFound("")
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func readInfo(info *content.Info, bkt *bolt.Bucket) error {
 | 
			
		||||
	return bkt.ForEach(func(k, v []byte) error {
 | 
			
		||||
		switch string(k) {
 | 
			
		||||
		case string(bucketKeyCreatedAt):
 | 
			
		||||
			if err := info.CommittedAt.UnmarshalBinary(v); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		case string(bucketKeySize):
 | 
			
		||||
			info.Size, _ = binary.Varint(v)
 | 
			
		||||
		}
 | 
			
		||||
		// TODO: Read labels
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -243,14 +243,9 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
 | 
			
		|||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var (
 | 
			
		||||
		buf         [binary.MaxVarintLen64]byte
 | 
			
		||||
		sizeEncoded []byte = buf[:]
 | 
			
		||||
	)
 | 
			
		||||
	sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, image.Target.Size)]
 | 
			
		||||
 | 
			
		||||
	if len(sizeEncoded) == 0 {
 | 
			
		||||
		return fmt.Errorf("failed encoding size = %v", image.Target.Size)
 | 
			
		||||
	sizeEncoded, err := encodeSize(image.Target.Size)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, v := range [][2][]byte{
 | 
			
		||||
| 
						 | 
				
			
			@ -265,3 +260,16 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
 | 
			
		|||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func encodeSize(size int64) ([]byte, error) {
 | 
			
		||||
	var (
 | 
			
		||||
		buf         [binary.MaxVarintLen64]byte
 | 
			
		||||
		sizeEncoded []byte = buf[:]
 | 
			
		||||
	)
 | 
			
		||||
	sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, size)]
 | 
			
		||||
 | 
			
		||||
	if len(sizeEncoded) == 0 {
 | 
			
		||||
		return nil, fmt.Errorf("failed encoding size = %v", size)
 | 
			
		||||
	}
 | 
			
		||||
	return sizeEncoded, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,12 +5,14 @@ import (
 | 
			
		|||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/Sirupsen/logrus"
 | 
			
		||||
	"github.com/boltdb/bolt"
 | 
			
		||||
	api "github.com/containerd/containerd/api/services/content/v1"
 | 
			
		||||
	eventsapi "github.com/containerd/containerd/api/services/events/v1"
 | 
			
		||||
	"github.com/containerd/containerd/content"
 | 
			
		||||
	"github.com/containerd/containerd/errdefs"
 | 
			
		||||
	"github.com/containerd/containerd/events"
 | 
			
		||||
	"github.com/containerd/containerd/log"
 | 
			
		||||
	"github.com/containerd/containerd/metadata"
 | 
			
		||||
	"github.com/containerd/containerd/plugin"
 | 
			
		||||
	"github.com/golang/protobuf/ptypes/empty"
 | 
			
		||||
	digest "github.com/opencontainers/go-digest"
 | 
			
		||||
| 
						 | 
				
			
			@ -39,6 +41,7 @@ func init() {
 | 
			
		|||
		ID:   "content",
 | 
			
		||||
		Requires: []plugin.PluginType{
 | 
			
		||||
			plugin.ContentPlugin,
 | 
			
		||||
			plugin.MetadataPlugin,
 | 
			
		||||
		},
 | 
			
		||||
		Init: NewService,
 | 
			
		||||
	})
 | 
			
		||||
| 
						 | 
				
			
			@ -49,8 +52,13 @@ func NewService(ic *plugin.InitContext) (interface{}, error) {
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	m, err := ic.Get(plugin.MetadataPlugin)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store))
 | 
			
		||||
	return &Service{
 | 
			
		||||
		store:   c.(content.Store),
 | 
			
		||||
		store:   cs,
 | 
			
		||||
		emitter: events.GetPoster(ic.Context),
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue