diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 2be4252f64..eeacc0ab07 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "net/http" + "strings" "time" log "github.com/Sirupsen/logrus" @@ -78,10 +79,16 @@ func (s *Consul) setTimeout(time time.Duration) { s.config.WaitTime = time } +// Normalize the key for usage in Consul +func (s *Consul) normalize(key string) string { + key = normalize(key) + return strings.TrimPrefix(key, "/") +} + // Get the value at "key", returns the last modified index // to use in conjunction to CAS calls func (s *Consul) Get(key string) (value []byte, lastIndex uint64, err error) { - pair, meta, err := s.client.KV().Get(partialFormat(key), nil) + pair, meta, err := s.client.KV().Get(s.normalize(key), nil) if err != nil { return nil, 0, err } @@ -93,7 +100,7 @@ func (s *Consul) Get(key string) (value []byte, lastIndex uint64, err error) { // Put a value at "key" func (s *Consul) Put(key string, value []byte) error { - p := &api.KVPair{Key: partialFormat(key), Value: value} + p := &api.KVPair{Key: s.normalize(key), Value: value} if s.client == nil { log.Error("Error initializing client") } @@ -103,7 +110,7 @@ func (s *Consul) Put(key string, value []byte) error { // Delete a value at "key" func (s *Consul) Delete(key string) error { - _, err := s.client.KV().Delete(partialFormat(key), nil) + _, err := s.client.KV().Delete(s.normalize(key), nil) return err } @@ -118,7 +125,7 @@ func (s *Consul) Exists(key string) (bool, error) { // GetRange gets a range of values at "directory" func (s *Consul) GetRange(prefix string) (kvi []KVEntry, err error) { - pairs, _, err := s.client.KV().List(partialFormat(prefix), nil) + pairs, _, err := s.client.KV().List(s.normalize(prefix), nil) if err != nil { return nil, err } @@ -136,13 +143,13 @@ func (s *Consul) GetRange(prefix string) (kvi []KVEntry, err error) { // DeleteRange deletes a range of values at "directory" func (s *Consul) DeleteRange(prefix string) error { - _, err := s.client.KV().DeleteTree(partialFormat(prefix), nil) + _, err := s.client.KV().DeleteTree(s.normalize(prefix), nil) return err } // Watch a single key for modifications func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallback) error { - fkey := partialFormat(key) + fkey := s.normalize(key) // We get the last index first _, meta, err := s.client.KV().Get(fkey, nil) @@ -171,7 +178,7 @@ func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallba // CancelWatch cancels a watch, sends a signal to the appropriate // stop channel func (s *Consul) CancelWatch(key string) error { - key = partialFormat(key) + key = s.normalize(key) if _, ok := s.watches[key]; !ok { log.Error("Chan does not exist for key: ", key) return ErrWatchDoesNotExist @@ -210,7 +217,7 @@ func (s *Consul) waitForChange(key string) <-chan uint64 { // WatchRange triggers a watch on a range of values at "directory" func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error { - fprefix := partialFormat(prefix) + fprefix := s.normalize(prefix) // We get the last index first _, meta, err := s.client.KV().Get(prefix, nil) @@ -246,7 +253,7 @@ func (s *Consul) CancelWatchRange(prefix string) error { // to acquire and release the mutex. func (s *Consul) CreateLock(key string, value []byte) (Locker, error) { l, err := s.client.LockOpts(&api.LockOptions{ - Key: partialFormat(key), + Key: s.normalize(key), Value: value, }) if err != nil { @@ -271,7 +278,7 @@ func (l *consulLock) Unlock() error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) { - p := &api.KVPair{Key: partialFormat(key), Value: newValue, ModifyIndex: index} + p := &api.KVPair{Key: s.normalize(key), Value: newValue, ModifyIndex: index} if work, _, err := s.client.KV().CAS(p, nil); err != nil { return false, err } else if !work { @@ -283,7 +290,7 @@ func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) // AtomicDelete deletes a value at "key" if the key has not // been modified in the meantime, throws an error if this is the case func (s *Consul) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { - p := &api.KVPair{Key: partialFormat(key), ModifyIndex: index} + p := &api.KVPair{Key: s.normalize(key), ModifyIndex: index} if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil { return false, err } else if !work { diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 2a257d5847..7b03bd8205 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -70,7 +70,7 @@ func (s *Etcd) setTimeout(time time.Duration) { // Create the entire path for a directory that does not exist func (s *Etcd) createDirectory(path string) error { // TODO Handle TTL at key/dir creation -> use K/V struct for key infos? - if _, err := s.client.CreateDir(format(path), 10); err != nil { + if _, err := s.client.CreateDir(normalize(path), 10); err != nil { if etcdError, ok := err.(*etcd.EtcdError); ok { if etcdError.ErrorCode != 105 { // Skip key already exists return err @@ -85,7 +85,7 @@ func (s *Etcd) createDirectory(path string) error { // Get the value at "key", returns the last modified index // to use in conjunction to CAS calls func (s *Etcd) Get(key string) (value []byte, lastIndex uint64, err error) { - result, err := s.client.Get(format(key), false, false) + result, err := s.client.Get(normalize(key), false, false) if err != nil { if etcdError, ok := err.(*etcd.EtcdError); ok { // Not a Directory or Not a file @@ -104,7 +104,7 @@ func (s *Etcd) Put(key string, value []byte) error { if etcdError, ok := err.(*etcd.EtcdError); ok { if etcdError.ErrorCode == 104 { // Not a directory // Remove the last element (the actual key) and set the prefix as a dir - err = s.createDirectory(getDir(key)) + err = s.createDirectory(getDirectory(key, false)) if _, err := s.client.Set(key, string(value), 0); err != nil { return err } @@ -117,7 +117,7 @@ func (s *Etcd) Put(key string, value []byte) error { // Delete a value at "key" func (s *Etcd) Delete(key string) error { - if _, err := s.client.Delete(format(key), false); err != nil { + if _, err := s.client.Delete(normalize(key), false); err != nil { return err } return nil @@ -137,7 +137,7 @@ func (s *Etcd) Exists(key string) (bool, error) { // Watch a single key for modifications func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error { - key = format(key) + key = normalize(key) watchChan := make(chan *etcd.Response) stopChan := make(chan bool) @@ -163,7 +163,7 @@ func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error // CancelWatch cancels a watch, sends a signal to the appropriate // stop channel func (s *Etcd) CancelWatch(key string) error { - key = format(key) + key = normalize(key) if _, ok := s.watches[key]; !ok { log.Error("Chan does not exist for key: ", key) return ErrWatchDoesNotExist @@ -177,7 +177,7 @@ func (s *Etcd) CancelWatch(key string) error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) { - resp, err := s.client.CompareAndSwap(format(key), string(newValue), 5, string(oldValue), 0) + resp, err := s.client.CompareAndSwap(normalize(key), string(newValue), 5, string(oldValue), 0) if err != nil { return false, err } @@ -193,7 +193,7 @@ func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uin // AtomicDelete deletes a value at "key" if the key has not // been modified in the meantime, throws an error if this is the case func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { - resp, err := s.client.CompareAndDelete(format(key), string(oldValue), 0) + resp, err := s.client.CompareAndDelete(normalize(key), string(oldValue), 0) if err != nil { return false, err } @@ -205,7 +205,7 @@ func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, er // GetRange gets a range of values at "directory" func (s *Etcd) GetRange(prefix string) ([]KVEntry, error) { - resp, err := s.client.Get(format(prefix), true, true) + resp, err := s.client.Get(normalize(prefix), true, true) if err != nil { return nil, err } @@ -218,7 +218,7 @@ func (s *Etcd) GetRange(prefix string) ([]KVEntry, error) { // DeleteRange deletes a range of values at "directory" func (s *Etcd) DeleteRange(prefix string) error { - if _, err := s.client.Delete(format(prefix), true); err != nil { + if _, err := s.client.Delete(normalize(prefix), true); err != nil { return err } return nil @@ -226,7 +226,7 @@ func (s *Etcd) DeleteRange(prefix string) error { // WatchRange triggers a watch on a range of values at "directory" func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error { - prefix = format(prefix) + prefix = normalize(prefix) watchChan := make(chan *etcd.Response) stopChan := make(chan bool) @@ -251,7 +251,7 @@ func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callbac // CancelWatchRange stops the watch on the range of values, sends // a signal to the appropriate stop channel func (s *Etcd) CancelWatchRange(prefix string) error { - return s.CancelWatch(format(prefix)) + return s.CancelWatch(normalize(prefix)) } // CreateLock returns a handle to a lock struct which can be used diff --git a/pkg/store/helpers.go b/pkg/store/helpers.go index 2c94b54c14..99ba37eafd 100644 --- a/pkg/store/helpers.go +++ b/pkg/store/helpers.go @@ -12,21 +12,22 @@ func createEndpoints(addrs []string, scheme string) (entries []string) { return entries } -// Formats the key -func format(key string) string { - return fullpath(splitKey(key)) +// Normalize the key for each store to the form: +// +// /path/to/key +// +func normalize(key string) string { + return "/" + join(splitKey(key)) } -// Formats the key partially (omits the first '/') -func partialFormat(key string) string { - return partialpath(splitKey(key)) -} - -// Get the full directory part of the key -func getDir(key string) string { +// Get the full directory part of the key to the form: +// +// /path/to/ +// +func getDirectory(key string, omit bool) string { parts := splitKey(key) parts = parts[:len(parts)-1] - return fullpath(parts) + return "/" + join(parts) } // SplitKey splits the key to extract path informations @@ -39,13 +40,7 @@ func splitKey(key string) (path []string) { return path } -// Get the full correct path representation of a splitted key/directory -func fullpath(path []string) string { - return "/" + strings.Join(path, "/") -} - -// Get the partial correct path representation of a splitted key/directory -// Omits the first '/' -func partialpath(path []string) string { - return strings.Join(path, "/") +// Join the path parts with '/' +func join(parts []string) string { + return strings.Join(parts, "/") } diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index d12b016a13..60fcc97ac3 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -48,7 +48,7 @@ func (s *Zookeeper) setTimeout(time time.Duration) { // Get the value at "key", returns the last modified index // to use in conjunction to CAS calls func (s *Zookeeper) Get(key string) (value []byte, lastIndex uint64, err error) { - resp, meta, err := s.client.Get(format(key)) + resp, meta, err := s.client.Get(normalize(key)) if err != nil { return nil, 0, err } @@ -75,7 +75,7 @@ func (s *Zookeeper) createFullpath(path []string) error { // Put a value at "key" func (s *Zookeeper) Put(key string, value []byte) error { - fkey := format(key) + fkey := normalize(key) exists, err := s.Exists(key) if err != nil { return err @@ -89,13 +89,13 @@ func (s *Zookeeper) Put(key string, value []byte) error { // Delete a value at "key" func (s *Zookeeper) Delete(key string) error { - err := s.client.Delete(format(key), -1) + err := s.client.Delete(normalize(key), -1) return err } // Exists checks if the key exists inside the store func (s *Zookeeper) Exists(key string) (bool, error) { - exists, _, err := s.client.Exists(format(key)) + exists, _, err := s.client.Exists(normalize(key)) if err != nil { return false, err } @@ -104,7 +104,7 @@ func (s *Zookeeper) Exists(key string) (bool, error) { // Watch a single key for modifications func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) error { - fkey := format(key) + fkey := normalize(key) _, _, eventChan, err := s.client.GetW(fkey) if err != nil { return err @@ -129,7 +129,7 @@ func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) e // CancelWatch cancels a watch, sends a signal to the appropriate // stop channel func (s *Zookeeper) CancelWatch(key string) error { - key = format(key) + key = normalize(key) if _, ok := s.watches[key]; !ok { log.Error("Chan does not exist for key: ", key) return ErrWatchDoesNotExist @@ -141,7 +141,7 @@ func (s *Zookeeper) CancelWatch(key string) error { // GetRange gets a range of values at "directory" func (s *Zookeeper) GetRange(prefix string) (kvi []KVEntry, err error) { - prefix = format(prefix) + prefix = normalize(prefix) entries, stat, err := s.client.Children(prefix) if err != nil { log.Error("Cannot fetch range of keys beginning with prefix: ", prefix) @@ -155,13 +155,13 @@ func (s *Zookeeper) GetRange(prefix string) (kvi []KVEntry, err error) { // DeleteRange deletes a range of values at "directory" func (s *Zookeeper) DeleteRange(prefix string) error { - err := s.client.Delete(format(prefix), -1) + err := s.client.Delete(normalize(prefix), -1) return err } // WatchRange triggers a watch on a range of values at "directory" func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error { - fprefix := format(prefix) + fprefix := normalize(prefix) _, _, eventChan, err := s.client.ChildrenW(fprefix) if err != nil { return err @@ -207,7 +207,7 @@ func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (boo func (s *Zookeeper) CreateLock(key string, value []byte) (Locker, error) { // FIXME: `value` is not being used since there is no support in zk.NewLock(). return &zookeeperLock{ - lock: zk.NewLock(s.client, format(key), zk.WorldACL(zk.PermAll)), + lock: zk.NewLock(s.client, normalize(key), zk.WorldACL(zk.PermAll)), }, nil }