Refactor helpers to be more meaningful and flexible for future store usage

Signed-off-by: Alexandre Beslic <abronan@docker.com>
This commit is contained in:
Alexandre Beslic 2015-05-13 15:39:36 -07:00
parent ac8c210384
commit ccf6e37c16
4 changed files with 55 additions and 53 deletions

View File

@ -4,6 +4,7 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"net/http" "net/http"
"strings"
"time" "time"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
@ -78,10 +79,16 @@ func (s *Consul) setTimeout(time time.Duration) {
s.config.WaitTime = time s.config.WaitTime = time
} }
// Normalize the key for usage in Consul
func (s *Consul) normalize(key string) string {
key = normalize(key)
return strings.TrimPrefix(key, "/")
}
// Get the value at "key", returns the last modified index // Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls // to use in conjunction to CAS calls
func (s *Consul) Get(key string) (value []byte, lastIndex uint64, err error) { func (s *Consul) Get(key string) (value []byte, lastIndex uint64, err error) {
pair, meta, err := s.client.KV().Get(partialFormat(key), nil) pair, meta, err := s.client.KV().Get(s.normalize(key), nil)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
@ -93,7 +100,7 @@ func (s *Consul) Get(key string) (value []byte, lastIndex uint64, err error) {
// Put a value at "key" // Put a value at "key"
func (s *Consul) Put(key string, value []byte) error { func (s *Consul) Put(key string, value []byte) error {
p := &api.KVPair{Key: partialFormat(key), Value: value} p := &api.KVPair{Key: s.normalize(key), Value: value}
if s.client == nil { if s.client == nil {
log.Error("Error initializing client") log.Error("Error initializing client")
} }
@ -103,7 +110,7 @@ func (s *Consul) Put(key string, value []byte) error {
// Delete a value at "key" // Delete a value at "key"
func (s *Consul) Delete(key string) error { func (s *Consul) Delete(key string) error {
_, err := s.client.KV().Delete(partialFormat(key), nil) _, err := s.client.KV().Delete(s.normalize(key), nil)
return err return err
} }
@ -118,7 +125,7 @@ func (s *Consul) Exists(key string) (bool, error) {
// GetRange gets a range of values at "directory" // GetRange gets a range of values at "directory"
func (s *Consul) GetRange(prefix string) (kvi []KVEntry, err error) { func (s *Consul) GetRange(prefix string) (kvi []KVEntry, err error) {
pairs, _, err := s.client.KV().List(partialFormat(prefix), nil) pairs, _, err := s.client.KV().List(s.normalize(prefix), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -136,13 +143,13 @@ func (s *Consul) GetRange(prefix string) (kvi []KVEntry, err error) {
// DeleteRange deletes a range of values at "directory" // DeleteRange deletes a range of values at "directory"
func (s *Consul) DeleteRange(prefix string) error { func (s *Consul) DeleteRange(prefix string) error {
_, err := s.client.KV().DeleteTree(partialFormat(prefix), nil) _, err := s.client.KV().DeleteTree(s.normalize(prefix), nil)
return err return err
} }
// Watch a single key for modifications // Watch a single key for modifications
func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallback) error { func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallback) error {
fkey := partialFormat(key) fkey := s.normalize(key)
// We get the last index first // We get the last index first
_, meta, err := s.client.KV().Get(fkey, nil) _, meta, err := s.client.KV().Get(fkey, nil)
@ -171,7 +178,7 @@ func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallba
// CancelWatch cancels a watch, sends a signal to the appropriate // CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel // stop channel
func (s *Consul) CancelWatch(key string) error { func (s *Consul) CancelWatch(key string) error {
key = partialFormat(key) key = s.normalize(key)
if _, ok := s.watches[key]; !ok { if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key) log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist return ErrWatchDoesNotExist
@ -210,7 +217,7 @@ func (s *Consul) waitForChange(key string) <-chan uint64 {
// WatchRange triggers a watch on a range of values at "directory" // WatchRange triggers a watch on a range of values at "directory"
func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error { func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error {
fprefix := partialFormat(prefix) fprefix := s.normalize(prefix)
// We get the last index first // We get the last index first
_, meta, err := s.client.KV().Get(prefix, nil) _, meta, err := s.client.KV().Get(prefix, nil)
@ -246,7 +253,7 @@ func (s *Consul) CancelWatchRange(prefix string) error {
// to acquire and release the mutex. // to acquire and release the mutex.
func (s *Consul) CreateLock(key string, value []byte) (Locker, error) { func (s *Consul) CreateLock(key string, value []byte) (Locker, error) {
l, err := s.client.LockOpts(&api.LockOptions{ l, err := s.client.LockOpts(&api.LockOptions{
Key: partialFormat(key), Key: s.normalize(key),
Value: value, Value: value,
}) })
if err != nil { if err != nil {
@ -271,7 +278,7 @@ func (l *consulLock) Unlock() error {
// AtomicPut put a value at "key" if the key has not been // AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case // modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) { func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) {
p := &api.KVPair{Key: partialFormat(key), Value: newValue, ModifyIndex: index} p := &api.KVPair{Key: s.normalize(key), Value: newValue, ModifyIndex: index}
if work, _, err := s.client.KV().CAS(p, nil); err != nil { if work, _, err := s.client.KV().CAS(p, nil); err != nil {
return false, err return false, err
} else if !work { } else if !work {
@ -283,7 +290,7 @@ func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64)
// AtomicDelete deletes a value at "key" if the key has not // AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case // been modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { func (s *Consul) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
p := &api.KVPair{Key: partialFormat(key), ModifyIndex: index} p := &api.KVPair{Key: s.normalize(key), ModifyIndex: index}
if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil { if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil {
return false, err return false, err
} else if !work { } else if !work {

View File

@ -70,7 +70,7 @@ func (s *Etcd) setTimeout(time time.Duration) {
// Create the entire path for a directory that does not exist // Create the entire path for a directory that does not exist
func (s *Etcd) createDirectory(path string) error { func (s *Etcd) createDirectory(path string) error {
// TODO Handle TTL at key/dir creation -> use K/V struct for key infos? // TODO Handle TTL at key/dir creation -> use K/V struct for key infos?
if _, err := s.client.CreateDir(format(path), 10); err != nil { if _, err := s.client.CreateDir(normalize(path), 10); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok { if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode != 105 { // Skip key already exists if etcdError.ErrorCode != 105 { // Skip key already exists
return err return err
@ -85,7 +85,7 @@ func (s *Etcd) createDirectory(path string) error {
// Get the value at "key", returns the last modified index // Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls // to use in conjunction to CAS calls
func (s *Etcd) Get(key string) (value []byte, lastIndex uint64, err error) { func (s *Etcd) Get(key string) (value []byte, lastIndex uint64, err error) {
result, err := s.client.Get(format(key), false, false) result, err := s.client.Get(normalize(key), false, false)
if err != nil { if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok { if etcdError, ok := err.(*etcd.EtcdError); ok {
// Not a Directory or Not a file // Not a Directory or Not a file
@ -104,7 +104,7 @@ func (s *Etcd) Put(key string, value []byte) error {
if etcdError, ok := err.(*etcd.EtcdError); ok { if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode == 104 { // Not a directory if etcdError.ErrorCode == 104 { // Not a directory
// Remove the last element (the actual key) and set the prefix as a dir // Remove the last element (the actual key) and set the prefix as a dir
err = s.createDirectory(getDir(key)) err = s.createDirectory(getDirectory(key, false))
if _, err := s.client.Set(key, string(value), 0); err != nil { if _, err := s.client.Set(key, string(value), 0); err != nil {
return err return err
} }
@ -117,7 +117,7 @@ func (s *Etcd) Put(key string, value []byte) error {
// Delete a value at "key" // Delete a value at "key"
func (s *Etcd) Delete(key string) error { func (s *Etcd) Delete(key string) error {
if _, err := s.client.Delete(format(key), false); err != nil { if _, err := s.client.Delete(normalize(key), false); err != nil {
return err return err
} }
return nil return nil
@ -137,7 +137,7 @@ func (s *Etcd) Exists(key string) (bool, error) {
// Watch a single key for modifications // Watch a single key for modifications
func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error { func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error {
key = format(key) key = normalize(key)
watchChan := make(chan *etcd.Response) watchChan := make(chan *etcd.Response)
stopChan := make(chan bool) stopChan := make(chan bool)
@ -163,7 +163,7 @@ func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error
// CancelWatch cancels a watch, sends a signal to the appropriate // CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel // stop channel
func (s *Etcd) CancelWatch(key string) error { func (s *Etcd) CancelWatch(key string) error {
key = format(key) key = normalize(key)
if _, ok := s.watches[key]; !ok { if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key) log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist return ErrWatchDoesNotExist
@ -177,7 +177,7 @@ func (s *Etcd) CancelWatch(key string) error {
// AtomicPut put a value at "key" if the key has not been // AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case // modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) { func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) {
resp, err := s.client.CompareAndSwap(format(key), string(newValue), 5, string(oldValue), 0) resp, err := s.client.CompareAndSwap(normalize(key), string(newValue), 5, string(oldValue), 0)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -193,7 +193,7 @@ func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uin
// AtomicDelete deletes a value at "key" if the key has not // AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case // been modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
resp, err := s.client.CompareAndDelete(format(key), string(oldValue), 0) resp, err := s.client.CompareAndDelete(normalize(key), string(oldValue), 0)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -205,7 +205,7 @@ func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, er
// GetRange gets a range of values at "directory" // GetRange gets a range of values at "directory"
func (s *Etcd) GetRange(prefix string) ([]KVEntry, error) { func (s *Etcd) GetRange(prefix string) ([]KVEntry, error) {
resp, err := s.client.Get(format(prefix), true, true) resp, err := s.client.Get(normalize(prefix), true, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -218,7 +218,7 @@ func (s *Etcd) GetRange(prefix string) ([]KVEntry, error) {
// DeleteRange deletes a range of values at "directory" // DeleteRange deletes a range of values at "directory"
func (s *Etcd) DeleteRange(prefix string) error { func (s *Etcd) DeleteRange(prefix string) error {
if _, err := s.client.Delete(format(prefix), true); err != nil { if _, err := s.client.Delete(normalize(prefix), true); err != nil {
return err return err
} }
return nil return nil
@ -226,7 +226,7 @@ func (s *Etcd) DeleteRange(prefix string) error {
// WatchRange triggers a watch on a range of values at "directory" // WatchRange triggers a watch on a range of values at "directory"
func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error { func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error {
prefix = format(prefix) prefix = normalize(prefix)
watchChan := make(chan *etcd.Response) watchChan := make(chan *etcd.Response)
stopChan := make(chan bool) stopChan := make(chan bool)
@ -251,7 +251,7 @@ func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callbac
// CancelWatchRange stops the watch on the range of values, sends // CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel // a signal to the appropriate stop channel
func (s *Etcd) CancelWatchRange(prefix string) error { func (s *Etcd) CancelWatchRange(prefix string) error {
return s.CancelWatch(format(prefix)) return s.CancelWatch(normalize(prefix))
} }
// CreateLock returns a handle to a lock struct which can be used // CreateLock returns a handle to a lock struct which can be used

View File

@ -12,21 +12,22 @@ func createEndpoints(addrs []string, scheme string) (entries []string) {
return entries return entries
} }
// Formats the key // Normalize the key for each store to the form:
func format(key string) string { //
return fullpath(splitKey(key)) // /path/to/key
//
func normalize(key string) string {
return "/" + join(splitKey(key))
} }
// Formats the key partially (omits the first '/') // Get the full directory part of the key to the form:
func partialFormat(key string) string { //
return partialpath(splitKey(key)) // /path/to/
} //
func getDirectory(key string, omit bool) string {
// Get the full directory part of the key
func getDir(key string) string {
parts := splitKey(key) parts := splitKey(key)
parts = parts[:len(parts)-1] parts = parts[:len(parts)-1]
return fullpath(parts) return "/" + join(parts)
} }
// SplitKey splits the key to extract path informations // SplitKey splits the key to extract path informations
@ -39,13 +40,7 @@ func splitKey(key string) (path []string) {
return path return path
} }
// Get the full correct path representation of a splitted key/directory // Join the path parts with '/'
func fullpath(path []string) string { func join(parts []string) string {
return "/" + strings.Join(path, "/") return strings.Join(parts, "/")
}
// Get the partial correct path representation of a splitted key/directory
// Omits the first '/'
func partialpath(path []string) string {
return strings.Join(path, "/")
} }

View File

@ -48,7 +48,7 @@ func (s *Zookeeper) setTimeout(time time.Duration) {
// Get the value at "key", returns the last modified index // Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls // to use in conjunction to CAS calls
func (s *Zookeeper) Get(key string) (value []byte, lastIndex uint64, err error) { func (s *Zookeeper) Get(key string) (value []byte, lastIndex uint64, err error) {
resp, meta, err := s.client.Get(format(key)) resp, meta, err := s.client.Get(normalize(key))
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
@ -75,7 +75,7 @@ func (s *Zookeeper) createFullpath(path []string) error {
// Put a value at "key" // Put a value at "key"
func (s *Zookeeper) Put(key string, value []byte) error { func (s *Zookeeper) Put(key string, value []byte) error {
fkey := format(key) fkey := normalize(key)
exists, err := s.Exists(key) exists, err := s.Exists(key)
if err != nil { if err != nil {
return err return err
@ -89,13 +89,13 @@ func (s *Zookeeper) Put(key string, value []byte) error {
// Delete a value at "key" // Delete a value at "key"
func (s *Zookeeper) Delete(key string) error { func (s *Zookeeper) Delete(key string) error {
err := s.client.Delete(format(key), -1) err := s.client.Delete(normalize(key), -1)
return err return err
} }
// Exists checks if the key exists inside the store // Exists checks if the key exists inside the store
func (s *Zookeeper) Exists(key string) (bool, error) { func (s *Zookeeper) Exists(key string) (bool, error) {
exists, _, err := s.client.Exists(format(key)) exists, _, err := s.client.Exists(normalize(key))
if err != nil { if err != nil {
return false, err return false, err
} }
@ -104,7 +104,7 @@ func (s *Zookeeper) Exists(key string) (bool, error) {
// Watch a single key for modifications // Watch a single key for modifications
func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) error { func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) error {
fkey := format(key) fkey := normalize(key)
_, _, eventChan, err := s.client.GetW(fkey) _, _, eventChan, err := s.client.GetW(fkey)
if err != nil { if err != nil {
return err return err
@ -129,7 +129,7 @@ func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) e
// CancelWatch cancels a watch, sends a signal to the appropriate // CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel // stop channel
func (s *Zookeeper) CancelWatch(key string) error { func (s *Zookeeper) CancelWatch(key string) error {
key = format(key) key = normalize(key)
if _, ok := s.watches[key]; !ok { if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key) log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist return ErrWatchDoesNotExist
@ -141,7 +141,7 @@ func (s *Zookeeper) CancelWatch(key string) error {
// GetRange gets a range of values at "directory" // GetRange gets a range of values at "directory"
func (s *Zookeeper) GetRange(prefix string) (kvi []KVEntry, err error) { func (s *Zookeeper) GetRange(prefix string) (kvi []KVEntry, err error) {
prefix = format(prefix) prefix = normalize(prefix)
entries, stat, err := s.client.Children(prefix) entries, stat, err := s.client.Children(prefix)
if err != nil { if err != nil {
log.Error("Cannot fetch range of keys beginning with prefix: ", prefix) log.Error("Cannot fetch range of keys beginning with prefix: ", prefix)
@ -155,13 +155,13 @@ func (s *Zookeeper) GetRange(prefix string) (kvi []KVEntry, err error) {
// DeleteRange deletes a range of values at "directory" // DeleteRange deletes a range of values at "directory"
func (s *Zookeeper) DeleteRange(prefix string) error { func (s *Zookeeper) DeleteRange(prefix string) error {
err := s.client.Delete(format(prefix), -1) err := s.client.Delete(normalize(prefix), -1)
return err return err
} }
// WatchRange triggers a watch on a range of values at "directory" // WatchRange triggers a watch on a range of values at "directory"
func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error { func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error {
fprefix := format(prefix) fprefix := normalize(prefix)
_, _, eventChan, err := s.client.ChildrenW(fprefix) _, _, eventChan, err := s.client.ChildrenW(fprefix)
if err != nil { if err != nil {
return err return err
@ -207,7 +207,7 @@ func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (boo
func (s *Zookeeper) CreateLock(key string, value []byte) (Locker, error) { func (s *Zookeeper) CreateLock(key string, value []byte) (Locker, error) {
// FIXME: `value` is not being used since there is no support in zk.NewLock(). // FIXME: `value` is not being used since there is no support in zk.NewLock().
return &zookeeperLock{ return &zookeeperLock{
lock: zk.NewLock(s.client, format(key), zk.WorldACL(zk.PermAll)), lock: zk.NewLock(s.client, normalize(key), zk.WorldACL(zk.PermAll)),
}, nil }, nil
} }