diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 856a1fa941..95313b77eb 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -271,8 +271,8 @@ func (l *consulLock) Unlock() error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) { - p := &api.KVPair{Key: s.normalize(key), Value: newValue, ModifyIndex: index} +func (s *Consul) AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) { + p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex} if work, _, err := s.client.KV().CAS(p, nil); err != nil { return false, err } else if !work { @@ -283,8 +283,8 @@ func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) // AtomicDelete deletes a value at "key" if the key has not // been modified in the meantime, throws an error if this is the case -func (s *Consul) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { - p := &api.KVPair{Key: s.normalize(key), ModifyIndex: index} +func (s *Consul) AtomicDelete(key string, previous *KVEntry) (bool, error) { + p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex} if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil { return false, err } else if !work { diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 00d59e36a7..189346e60c 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -176,15 +176,16 @@ func (s *Etcd) CancelWatch(key string) error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) { - resp, err := s.client.CompareAndSwap(normalize(key), string(newValue), 5, string(oldValue), 0) +func (s *Etcd) AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) { + resp, err := s.client.CompareAndSwap(normalize(key), string(value), 5, string(previous.Value), 0) if err != nil { return false, err } - if !(resp.Node.Value == string(newValue) && resp.Node.Key == key && resp.Node.TTL == 5) { + // FIXME: Why do we do the check like this? Why is the TTL hardcoded to 5? + if !(resp.Node.Value == string(value) && resp.Node.Key == key && resp.Node.TTL == 5) { return false, ErrKeyModified } - if !(resp.PrevNode.Value == string(newValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) { + if !(resp.PrevNode.Value == string(previous.Value) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) { return false, ErrKeyModified } return true, nil @@ -192,12 +193,12 @@ func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uin // AtomicDelete deletes a value at "key" if the key has not // been modified in the meantime, throws an error if this is the case -func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { - resp, err := s.client.CompareAndDelete(normalize(key), string(oldValue), 0) +func (s *Etcd) AtomicDelete(key string, previous *KVEntry) (bool, error) { + resp, err := s.client.CompareAndDelete(normalize(key), string(previous.Value), 0) if err != nil { return false, err } - if !(resp.PrevNode.Value == string(oldValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) { + if !(resp.PrevNode.Value == string(previous.Value) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) { return false, ErrKeyModified } return true, nil diff --git a/pkg/store/store.go b/pkg/store/store.go index e825323b89..ea6db4a05f 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -66,10 +66,10 @@ type Store interface { CancelWatchRange(prefix string) error // Atomic operation on a single value - AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) + AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) // Atomic delete of a single value - AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) + AtomicDelete(key string, previous *KVEntry) (bool, error) } // KVEntry represents {Key, Value, Lastindex} tuple diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index 67a7d84af1..3770a39e6e 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -192,14 +192,14 @@ func (s *Zookeeper) CancelWatchRange(prefix string) error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *Zookeeper) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) { +func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) { // Use index of Set method to implement CAS return false, ErrNotImplemented } // AtomicDelete deletes a value at "key" if the key has not // been modified in the meantime, throws an error if this is the case -func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { +func (s *Zookeeper) AtomicDelete(key string, previous *KVEntry) (bool, error) { return false, ErrNotImplemented }