Transaction-logic for rethink

Signed-off-by: Riyaz Faizullabhoy <riyaz.faizullabhoy@docker.com>
This commit is contained in:
Riyaz Faizullabhoy 2016-04-18 17:35:54 -07:00
parent db5d1b6fb8
commit db2e625341
6 changed files with 138 additions and 29 deletions

View File

@ -241,12 +241,19 @@ func TestGetHandlerSnapshot(t *testing.T) {
ctx := getContext(handlerState{store: metaStore, crypto: crypto})
// Need to create a timestamp and snapshot
sn, err := repo.SignSnapshot(data.DefaultExpires("snapshot"))
snJSON, err := json.Marshal(sn)
require.NoError(t, err)
metaStore.UpdateCurrent(
"gun", storage.MetaUpdate{Role: "snapshot", Version: 1, Data: snJSON})
ts, err := repo.SignTimestamp(data.DefaultExpires("timestamp"))
tsJSON, err := json.Marshal(ts)
require.NoError(t, err)
metaStore.UpdateCurrent(
"gun", storage.MetaUpdate{Role: "timestamp", Version: 1, Data: tsJSON})
req := &http.Request{
Body: ioutil.NopCloser(bytes.NewBuffer(nil)),
}

View File

@ -5,8 +5,11 @@ import (
"golang.org/x/net/context"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/docker/notary"
"github.com/docker/notary/server/errors"
"github.com/docker/notary/server/snapshot"
"github.com/docker/notary/server/storage"
"github.com/docker/notary/server/timestamp"
"github.com/docker/notary/tuf/data"
@ -61,12 +64,10 @@ func getMaybeServerSigned(ctx context.Context, store storage.MetaStore, gun, rol
out []byte
err error
)
switch role {
case data.CanonicalSnapshotRole:
lastModified, out, err = snapshot.GetOrCreateSnapshot(gun, store, cryptoService)
case data.CanonicalTimestampRole:
lastModified, out, err = timestamp.GetOrCreateTimestamp(gun, store, cryptoService)
if role != data.CanonicalTimestampRole && role != data.CanonicalSnapshotRole {
return nil, nil, fmt.Errorf("role %s cannot be server signed", role)
}
lastModified, out, err = timestamp.GetOrCreateTimestamp(gun, store, cryptoService)
if err != nil {
switch err.(type) {
case *storage.ErrNoKey, storage.ErrNotFound:
@ -76,5 +77,22 @@ func getMaybeServerSigned(ctx context.Context, store storage.MetaStore, gun, rol
}
}
// If we wanted the snapshot, get it by checksum from the timestamp data
if role == data.CanonicalSnapshotRole {
ts := new(data.SignedTimestamp)
if err := json.Unmarshal(out, ts); err != nil {
return nil, nil, err
}
snapshotChecksums, err := ts.GetSnapshot()
if err != nil || snapshotChecksums == nil {
return nil, nil, fmt.Errorf("could not retrieve latest snapshot checksum")
}
if snapshotSha256Bytes, ok := snapshotChecksums.Hashes[notary.SHA256]; ok {
snapshotSha256Hex := hex.EncodeToString(snapshotSha256Bytes[:])
return store.GetChecksum(gun, role, snapshotSha256Hex)
}
return nil, nil, fmt.Errorf("could not retrieve sha256 snapshot checksum")
}
return lastModified, out, nil
}

View File

@ -44,9 +44,11 @@ func GetOrCreateSnapshotKey(gun string, store storage.KeyStore, crypto signed.Cr
return nil, err
}
// GetOrCreateSnapshot either returns the exisiting latest snapshot, or uses
// whatever the most recent snapshot is to create the next one, only updating
// the expiry time and version.
// GetOrCreateSnapshot either returns the existing latest snapshot, or uses
// whatever the most recent snapshot is to generate the next one, only updating
// the expiry time and version. Note that this function does not write generated
// snapshots to the underlying data store, and will either return the latest snapshot time
// or nil as the time modified
func GetOrCreateSnapshot(gun string, store storage.MetaStore, cryptoService signed.CryptoService) (
*time.Time, []byte, error) {
@ -85,13 +87,7 @@ func GetOrCreateSnapshot(gun string, store storage.MetaStore, cryptoService sign
logrus.Error("Failed to create a new snapshot")
return nil, nil, err
}
c := time.Now()
if err = store.UpdateCurrent(gun, *snapshotUpdate); err != nil {
return nil, nil, err
}
return &c, snapshotUpdate.Data, nil
return nil, snapshotUpdate.Data, nil
}
// snapshotExpired simply checks if the snapshot is past its expiry time

View File

@ -4,10 +4,12 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"sort"
"time"
"github.com/dancannon/gorethink"
"github.com/docker/notary/storage/rethinkdb"
"github.com/docker/notary/tuf/data"
)
// RDBTUFFile is a tuf file record
@ -19,6 +21,7 @@ type RDBTUFFile struct {
Version int `gorethink:"version"`
Sha256 string `gorethink:"sha256"`
Data []byte `gorethink:"data"`
TSchecksum string `gorethink:"timestamp_checksum"`
}
// TableName returns the table name for the record type
@ -120,15 +123,71 @@ func (rdb RethinkDB) UpdateCurrent(gun string, update MetaUpdate) error {
return err
}
// UpdateCurrentWithTSChecksum adds new metadata version for the given GUN with an associated
// checksum for the timestamp it belongs to, to afford us transaction-like functionality
func (rdb RethinkDB) UpdateCurrentWithTSChecksum(gun, tsChecksum string, update MetaUpdate) error {
now := time.Now()
checksum := sha256.Sum256(update.Data)
file := RDBTUFFile{
Timing: rethinkdb.Timing{
CreatedAt: now,
UpdatedAt: now,
},
GunRoleVersion: []interface{}{gun, update.Role, update.Version},
Gun: gun,
Role: update.Role,
Version: update.Version,
Sha256: hex.EncodeToString(checksum[:]),
TSchecksum: tsChecksum,
Data: update.Data,
}
_, err := gorethink.DB(rdb.dbName).Table(file.TableName()).Insert(
file,
gorethink.InsertOpts{
Conflict: "error", // default but explicit for clarity of intent
},
).RunWrite(rdb.sess)
if err != nil && gorethink.IsConflictErr(err) {
return &ErrOldVersion{}
}
return err
}
// Used for sorting updates alphabetically by role name, such that timestamp is always last:
// Ordering: root, snapshot, targets, targets/* (delegations), timestamp
type updateSorter []MetaUpdate
func (u updateSorter) Len() int { return len(u) }
func (u updateSorter) Swap(i, j int) { u[i], u[j] = u[j], u[i] }
func (u updateSorter) Less(i, j int) bool {
return u[i].Role < u[j].Role
}
// UpdateMany adds multiple new metadata for the given GUN. RethinkDB does
// not support transactions, therefore we will attempt to insert the timestamp
// first as this represents a published version of the repo. If this is successful,
// we will insert the remaining roles (in any order). If any of those roles
// errors on insert, we will do a best effort rollback, at a minimum attempting
// to delete the timestamp so nobody pulls a broken repo.
// last as this represents a published version of the repo. However, we will
// insert all other role data in alphabetical order first, and also include the
// associated timestamp checksum so that we can easily roll back this pseudotransaction
func (rdb RethinkDB) UpdateMany(gun string, updates []MetaUpdate) error {
// find the timestamp first and save its checksum
// then apply the updates in alphabetic role order with the timestamp last
// if there are any failures, we roll back in the same alphabetic order
var tsChecksum string
for _, up := range updates {
if err := rdb.UpdateCurrent(gun, up); err != nil {
if up.Role == data.CanonicalTimestampRole {
tsChecksumBytes := sha256.Sum256(up.Data)
tsChecksum = hex.EncodeToString(tsChecksumBytes[:])
break
}
}
// alphabetize the updates by Role name
sort.Stable(updateSorter(updates))
for _, up := range updates {
if err := rdb.UpdateCurrentWithTSChecksum(gun, tsChecksum, up); err != nil {
// roll back with best-effort deletion, and then error out
rdb.deleteByTSChecksum(tsChecksum)
return err
}
}
@ -191,6 +250,18 @@ func (rdb RethinkDB) Delete(gun string) error {
return nil
}
// deleteByTSChecksum removes all metadata by a timestamp checksum, used for rolling back a "transaction"
// from a call to rethinkdb's UpdateMany
func (rdb RethinkDB) deleteByTSChecksum(tsChecksum string) error {
_, err := gorethink.DB(rdb.dbName).Table(RDBTUFFile{}.TableName()).GetAllByIndex(
"timestamp_checksum", []string{tsChecksum},
).Delete().RunWrite(rdb.sess)
if err != nil {
return fmt.Errorf("unable to delete timestamp checksum data: %s from database: %s", tsChecksum, err.Error())
}
return nil
}
// Bootstrap sets up the database and tables
func (rdb RethinkDB) Bootstrap() error {
return rethinkdb.SetupDB(rdb.sess, rdb.dbName, []rethinkdb.Table{

View File

@ -16,10 +16,11 @@ var (
Name: RDBTUFFile{}.TableName(),
PrimaryKey: "gun_role_version",
SecondaryIndexes: map[string][]string{
rdbSha256Idx: nil,
"gun": nil,
rdbGunRoleIdx: {"gun", "role"},
rdbGunRoleSha256Idx: {"gun", "role", "sha256"},
rdbSha256Idx: nil,
"gun": nil,
"timestamp_checksum": nil,
rdbGunRoleIdx: {"gun", "role"},
rdbGunRoleSha256Idx: {"gun", "role", "sha256"},
},
// this configuration guarantees linearizability of individual atomic operations on individual documents
Config: map[string]string{

View File

@ -50,9 +50,13 @@ func GetOrCreateTimestampKey(gun string, store storage.MetaStore, crypto signed.
// GetOrCreateTimestamp returns the current timestamp for the gun. This may mean
// a new timestamp is generated either because none exists, or because the current
// one has expired. Once generated, the timestamp is saved in the store.
// Additionally, if we had to generate a new snapshot for this timestamp,
// it is also saved in the store
func GetOrCreateTimestamp(gun string, store storage.MetaStore, cryptoService signed.CryptoService) (
*time.Time, []byte, error) {
updates := []storage.MetaUpdate{}
lastModified, timestampJSON, err := store.GetCurrent(gun, data.CanonicalTimestampRole)
if err != nil {
logrus.Error("error retrieving timestamp: ", err.Error())
@ -65,28 +69,40 @@ func GetOrCreateTimestamp(gun string, store storage.MetaStore, cryptoService sig
return nil, nil, err
}
_, snapshot, err := snapshot.GetOrCreateSnapshot(gun, store, cryptoService)
snapshotTime, snapshot, err := snapshot.GetOrCreateSnapshot(gun, store, cryptoService)
if err != nil {
logrus.Debug("Previous timestamp, but no valid snapshot for GUN ", gun)
return nil, nil, err
}
snapshotRole := &data.SignedSnapshot{}
if err := json.Unmarshal(snapshot, snapshotRole); err != nil {
logrus.Error("Failed to unmarshal retrieved snapshot")
return nil, nil, err
}
// If the snapshot was generated, we should write it with the timestamp
if snapshotTime == nil {
updates = append(updates, storage.MetaUpdate{Role: data.CanonicalSnapshotRole, Version: snapshotRole.Signed.Version, Data: snapshot})
}
if !timestampExpired(prev) && !snapshotExpired(prev, snapshot) {
return lastModified, timestampJSON, nil
}
update, err := createTimestamp(gun, prev, snapshot, store, cryptoService)
tsUpdate, err := createTimestamp(gun, prev, snapshot, store, cryptoService)
if err != nil {
logrus.Error("Failed to create a new timestamp")
return nil, nil, err
}
updates = append(updates, *tsUpdate)
c := time.Now()
if err = store.UpdateCurrent(gun, *update); err != nil {
// Write the timestamp, and potentially snapshot
if err = store.UpdateMany(gun, updates); err != nil {
return nil, nil, err
}
return &c, update.Data, nil
return &c, tsUpdate.Data, nil
}
// timestampExpired compares the current time to the expiry time of the timestamp