store: KVEntry -> KVPair

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-14 22:00:27 -07:00
parent 895484ec2a
commit f87505e2e7
5 changed files with 30 additions and 30 deletions

View File

@ -71,7 +71,7 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
// Watch is exported
func (s *Discovery) Watch(callback discovery.WatchCallback) {
s.store.WatchTree(s.prefix, "", s.heartbeat, func(kv ...*store.KVEntry) {
s.store.WatchTree(s.prefix, "", s.heartbeat, func(kv ...*store.KVPair) {
// Traduce byte array entries to discovery.Entry
entries, _ := discovery.CreateEntries(convertToStringArray(kv))
callback(entries)
@ -84,7 +84,7 @@ func (s *Discovery) Register(addr string) error {
return err
}
func convertToStringArray(entries []*store.KVEntry) (addrs []string) {
func convertToStringArray(entries []*store.KVPair) (addrs []string) {
for _, entry := range entries {
addrs = append(addrs, string(entry.Value))
}

View File

@ -81,7 +81,7 @@ func (s *Consul) normalize(key string) string {
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Consul) Get(key string) (*KVEntry, error) {
func (s *Consul) Get(key string) (*KVPair, error) {
pair, meta, err := s.client.KV().Get(s.normalize(key), nil)
if err != nil {
return nil, err
@ -89,7 +89,7 @@ func (s *Consul) Get(key string) (*KVEntry, error) {
if pair == nil {
return nil, ErrKeyNotFound
}
return &KVEntry{pair.Key, pair.Value, meta.LastIndex}, nil
return &KVPair{pair.Key, pair.Value, meta.LastIndex}, nil
}
// Put a value at "key"
@ -118,7 +118,7 @@ func (s *Consul) Exists(key string) (bool, error) {
}
// GetRange gets a range of values at "directory"
func (s *Consul) List(prefix string) ([]*KVEntry, error) {
func (s *Consul) List(prefix string) ([]*KVPair, error) {
pairs, _, err := s.client.KV().List(s.normalize(prefix), nil)
if err != nil {
return nil, err
@ -126,12 +126,12 @@ func (s *Consul) List(prefix string) ([]*KVEntry, error) {
if len(pairs) == 0 {
return nil, ErrKeyNotFound
}
kv := []*KVEntry{}
kv := []*KVPair{}
for _, pair := range pairs {
if pair.Key == prefix {
continue
}
kv = append(kv, &KVEntry{pair.Key, pair.Value, pair.ModifyIndex})
kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex})
}
return kv, nil
}
@ -271,7 +271,7 @@ func (l *consulLock) Unlock() error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) {
func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
return false, err
@ -283,7 +283,7 @@ func (s *Consul) AtomicPut(key string, value []byte, previous *KVEntry) (bool, e
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicDelete(key string, previous *KVEntry) (bool, error) {
func (s *Consul) AtomicDelete(key string, previous *KVPair) (bool, error) {
p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil {
return false, err

View File

@ -84,7 +84,7 @@ func (s *Etcd) createDirectory(path string) error {
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Etcd) Get(key string) (*KVEntry, error) {
func (s *Etcd) Get(key string) (*KVPair, error) {
result, err := s.client.Get(normalize(key), false, false)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
@ -95,7 +95,7 @@ func (s *Etcd) Get(key string) (*KVEntry, error) {
}
return nil, err
}
return &KVEntry{result.Node.Key, []byte(result.Node.Value), result.Node.ModifiedIndex}, nil
return &KVPair{result.Node.Key, []byte(result.Node.Value), result.Node.ModifiedIndex}, nil
}
// Put a value at "key"
@ -176,7 +176,7 @@ func (s *Etcd) CancelWatch(key string) error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) {
func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
_, err := s.client.CompareAndSwap(normalize(key), string(value), 0, "", previous.LastIndex)
if err != nil {
return false, err
@ -186,7 +186,7 @@ func (s *Etcd) AtomicPut(key string, value []byte, previous *KVEntry) (bool, err
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicDelete(key string, previous *KVEntry) (bool, error) {
func (s *Etcd) AtomicDelete(key string, previous *KVPair) (bool, error) {
_, err := s.client.CompareAndDelete(normalize(key), "", previous.LastIndex)
if err != nil {
return false, err
@ -195,14 +195,14 @@ func (s *Etcd) AtomicDelete(key string, previous *KVEntry) (bool, error) {
}
// GetRange gets a range of values at "directory"
func (s *Etcd) List(prefix string) ([]*KVEntry, error) {
func (s *Etcd) List(prefix string) ([]*KVPair, error) {
resp, err := s.client.Get(normalize(prefix), true, true)
if err != nil {
return nil, err
}
kv := []*KVEntry{}
kv := []*KVPair{}
for _, n := range resp.Node.Nodes {
kv = append(kv, &KVEntry{n.Key, []byte(n.Value), n.ModifiedIndex})
kv = append(kv, &KVPair{n.Key, []byte(n.Value), n.ModifiedIndex})
}
return kv, nil
}

View File

@ -20,7 +20,7 @@ const (
// WatchCallback is used for watch methods on keys
// and is triggered on key change
type WatchCallback func(entries ...*KVEntry)
type WatchCallback func(entries ...*KVPair)
// Initialize creates a new Store object, initializing the client
type Initialize func(addrs []string, options *Config) (Store, error)
@ -34,7 +34,7 @@ type Store interface {
Put(key string, value []byte) error
// Get a value given its key
Get(key string) (*KVEntry, error)
Get(key string) (*KVPair, error)
// Delete the value at the specified key
Delete(key string) error
@ -54,7 +54,7 @@ type Store interface {
CreateLock(key string, value []byte) (Locker, error)
// Get range of keys based on prefix
List(prefix string) ([]*KVEntry, error)
List(prefix string) ([]*KVPair, error)
// Delete range of keys based on prefix
DeleteTree(prefix string) error
@ -66,14 +66,14 @@ type Store interface {
CancelWatchRange(prefix string) error
// Atomic operation on a single value
AtomicPut(key string, value []byte, previous *KVEntry) (bool, error)
AtomicPut(key string, value []byte, previous *KVPair) (bool, error)
// Atomic delete of a single value
AtomicDelete(key string, previous *KVEntry) (bool, error)
AtomicDelete(key string, previous *KVPair) (bool, error)
}
// KVEntry represents {Key, Value, Lastindex} tuple
type KVEntry struct {
// KVPair represents {Key, Value, Lastindex} tuple
type KVPair struct {
Key string
Value []byte
LastIndex uint64

View File

@ -47,7 +47,7 @@ func (s *Zookeeper) setTimeout(time time.Duration) {
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Zookeeper) Get(key string) (*KVEntry, error) {
func (s *Zookeeper) Get(key string) (*KVPair, error) {
resp, meta, err := s.client.Get(normalize(key))
if err != nil {
return nil, err
@ -55,7 +55,7 @@ func (s *Zookeeper) Get(key string) (*KVEntry, error) {
if resp == nil {
return nil, ErrKeyNotFound
}
return &KVEntry{key, resp, uint64(meta.Mzxid)}, nil
return &KVPair{key, resp, uint64(meta.Mzxid)}, nil
}
// Create the entire path for a directory that does not exist
@ -140,16 +140,16 @@ func (s *Zookeeper) CancelWatch(key string) error {
}
// GetRange gets a range of values at "directory"
func (s *Zookeeper) List(prefix string) ([]*KVEntry, error) {
func (s *Zookeeper) List(prefix string) ([]*KVPair, error) {
prefix = normalize(prefix)
entries, stat, err := s.client.Children(prefix)
if err != nil {
log.Error("Cannot fetch range of keys beginning with prefix: ", prefix)
return nil, err
}
kv := []*KVEntry{}
kv := []*KVPair{}
for _, item := range entries {
kv = append(kv, &KVEntry{prefix, []byte(item), uint64(stat.Mzxid)})
kv = append(kv, &KVPair{prefix, []byte(item), uint64(stat.Mzxid)})
}
return kv, err
}
@ -192,14 +192,14 @@ func (s *Zookeeper) CancelWatchRange(prefix string) error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVEntry) (bool, error) {
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
// Use index of Set method to implement CAS
return false, ErrNotImplemented
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicDelete(key string, previous *KVEntry) (bool, error) {
func (s *Zookeeper) AtomicDelete(key string, previous *KVPair) (bool, error) {
return false, ErrNotImplemented
}