store: Refactor Atomic operations to use KVEntry.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-14 21:13:21 -07:00
parent 93ad39c079
commit bd18f27bda
4 changed files with 16 additions and 15 deletions

View File

@ -271,8 +271,8 @@ func (l *consulLock) Unlock() error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) {
p := &api.KVPair{Key: s.normalize(key), Value: newValue, ModifyIndex: index}
func (s *Consul) AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) {
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
return false, err
} else if !work {
@ -283,8 +283,8 @@ func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64)
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
p := &api.KVPair{Key: s.normalize(key), ModifyIndex: index}
func (s *Consul) AtomicDelete(key string, previous *KVEntry) (bool, error) {
p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil {
return false, err
} else if !work {

View File

@ -176,15 +176,16 @@ func (s *Etcd) CancelWatch(key string) error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) {
resp, err := s.client.CompareAndSwap(normalize(key), string(newValue), 5, string(oldValue), 0)
func (s *Etcd) AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) {
resp, err := s.client.CompareAndSwap(normalize(key), string(value), 5, string(previous.Value), 0)
if err != nil {
return false, err
}
if !(resp.Node.Value == string(newValue) && resp.Node.Key == key && resp.Node.TTL == 5) {
// FIXME: Why do we do the check like this? Why is the TTL hardcoded to 5?
if !(resp.Node.Value == string(value) && resp.Node.Key == key && resp.Node.TTL == 5) {
return false, ErrKeyModified
}
if !(resp.PrevNode.Value == string(newValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) {
if !(resp.PrevNode.Value == string(previous.Value) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) {
return false, ErrKeyModified
}
return true, nil
@ -192,12 +193,12 @@ func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uin
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
resp, err := s.client.CompareAndDelete(normalize(key), string(oldValue), 0)
func (s *Etcd) AtomicDelete(key string, previous *KVEntry) (bool, error) {
resp, err := s.client.CompareAndDelete(normalize(key), string(previous.Value), 0)
if err != nil {
return false, err
}
if !(resp.PrevNode.Value == string(oldValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) {
if !(resp.PrevNode.Value == string(previous.Value) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) {
return false, ErrKeyModified
}
return true, nil

View File

@ -66,10 +66,10 @@ type Store interface {
CancelWatchRange(prefix string) error
// Atomic operation on a single value
AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error)
AtomicPut(key string, value []byte, previous *KVEntry) (bool, error)
// Atomic delete of a single value
AtomicDelete(key string, oldValue []byte, index uint64) (bool, error)
AtomicDelete(key string, previous *KVEntry) (bool, error)
}
// KVEntry represents {Key, Value, Lastindex} tuple

View File

@ -192,14 +192,14 @@ func (s *Zookeeper) CancelWatchRange(prefix string) error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) {
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) {
// Use index of Set method to implement CAS
return false, ErrNotImplemented
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
func (s *Zookeeper) AtomicDelete(key string, previous *KVEntry) (bool, error) {
return false, ErrNotImplemented
}