diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 40432e2803..515688570c 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -47,7 +47,7 @@ }, { "ImportPath": "github.com/docker/libkv", - "Rev": "56d6633db7a524c74773fd6b98f02afcd0e6f27a" + "Rev": "4aec61dc3c9c1c4f11221a0c0cfde67ccb8f04c3" }, { "ImportPath": "github.com/gogo/protobuf/proto", diff --git a/Godeps/_workspace/src/github.com/docker/libkv/README.md b/Godeps/_workspace/src/github.com/docker/libkv/README.md index 6cc4ae9b18..941c96925f 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/README.md +++ b/Godeps/_workspace/src/github.com/docker/libkv/README.md @@ -12,7 +12,7 @@ For example, you can use it to store your metadata or for service discovery to r You can also easily implement a generic *Leader Election* on top of it (see the [swarm/leadership](https://github.com/docker/swarm/tree/master/leadership) package). -As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`. +As of now, `libkv` offers support for `Consul`, `Etcd`, `Zookeeper` and `BoltDB`. ## Example of usage @@ -24,12 +24,18 @@ package main import ( "fmt" "time" - + "github.com/docker/libkv" "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" log "github.com/Sirupsen/logrus" ) +func init() { + // Register consul store to libkv + consul.Register() +} + func main() { client := "localhost:8500" @@ -62,11 +68,13 @@ func main() { You can find other usage examples for `libkv` under the `docker/swarm` or `docker/libnetwork` repositories. -## Details +## Warning -You should expect the same experience for basic operations like `Get`/`Put`, etc. +There are a few consistency issues with *etcd*, on the notion of *directory* and *key*. If you want to use the three KV backends in an interchangeable way, you should only put data on leaves (see [Issue 20](https://github.com/docker/libkv/issues/20) for more details). This will be fixed when *etcd* API v3 will be made available (API v3 drops the *directory/key* distinction). An official release for *libkv* with a tag is likely to come after this issue being marked as **solved**. -However calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). +Other than that, you should expect the same experience for basic operations like `Get`/`Put`, etc. + +Calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). Although you should be able to use it successfully to watch on events in an interchangeable way (see the **swarm/leadership** or **swarm/discovery** packages in **docker/swarm**). ## Create a new storage backend diff --git a/Godeps/_workspace/src/github.com/docker/libkv/libkv.go b/Godeps/_workspace/src/github.com/docker/libkv/libkv.go index 9035f64f3c..7b3d0584ba 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/libkv.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/libkv.go @@ -62,10 +62,11 @@ package libkv import ( + "fmt" + "sort" + "strings" + "github.com/docker/libkv/store" - "github.com/docker/libkv/store/consul" - "github.com/docker/libkv/store/etcd" - "github.com/docker/libkv/store/zookeeper" ) // Initialize creates a new Store object, initializing the client @@ -73,11 +74,16 @@ type Initialize func(addrs []string, options *store.Config) (store.Store, error) var ( // Backend initializers - initializers = map[store.Backend]Initialize{ - store.CONSUL: consul.New, - store.ETCD: etcd.New, - store.ZK: zookeeper.New, - } + initializers = make(map[store.Backend]Initialize) + + supportedBackend = func() string { + keys := make([]string, 0, len(initializers)) + for k := range initializers { + keys = append(keys, string(k)) + } + sort.Strings(keys) + return strings.Join(keys, ", ") + }() ) // NewStore creates a an instance of store @@ -86,5 +92,10 @@ func NewStore(backend store.Backend, addrs []string, options *store.Config) (sto return init(addrs, options) } - return nil, store.ErrNotSupported + return nil, fmt.Errorf("%s %s", store.ErrNotSupported.Error(), supportedBackend) +} + +// AddStore adds a new store backend to libkv +func AddStore(store store.Backend, init Initialize) { + initializers[store] = init } diff --git a/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go b/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go index 8b2ae0e6cc..fe7af6b06c 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go @@ -5,66 +5,9 @@ import ( "time" "github.com/docker/libkv/store" - "github.com/docker/libkv/store/consul" - "github.com/docker/libkv/store/etcd" - "github.com/docker/libkv/store/zookeeper" "github.com/stretchr/testify/assert" ) -func TestNewStoreConsul(t *testing.T) { - client := "localhost:8500" - - kv, err := NewStore( - store.CONSUL, - []string{client}, - &store.Config{ - ConnectionTimeout: 10 * time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, kv) - - if _, ok := kv.(*consul.Consul); !ok { - t.Fatal("Error while initializing store consul") - } -} - -func TestNewStoreEtcd(t *testing.T) { - client := "localhost:4001" - - kv, err := NewStore( - store.ETCD, - []string{client}, - &store.Config{ - ConnectionTimeout: 10 * time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, kv) - - if _, ok := kv.(*etcd.Etcd); !ok { - t.Fatal("Error while initializing store etcd") - } -} - -func TestNewStoreZookeeper(t *testing.T) { - client := "localhost:2181" - - kv, err := NewStore( - store.ZK, - []string{client}, - &store.Config{ - ConnectionTimeout: 10 * time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, kv) - - if _, ok := kv.(*zookeeper.Zookeeper); !ok { - t.Fatal("Error while initializing store zookeeper") - } -} - func TestNewStoreUnsupported(t *testing.T) { client := "localhost:9999" @@ -77,4 +20,5 @@ func TestNewStoreUnsupported(t *testing.T) { ) assert.Error(t, err) assert.Nil(t, kv) + assert.Equal(t, "Backend storage not supported yet, please choose one of ", err.Error()) } diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go b/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go index 415161b063..76762ce54a 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" api "github.com/hashicorp/consul/api" ) @@ -29,15 +30,19 @@ var ( // Store interface type Consul struct { sync.Mutex - config *api.Config - client *api.Client - ephemeralTTL time.Duration + config *api.Config + client *api.Client } type consulLock struct { lock *api.Lock } +// Register registers consul to libkv +func Register() { + libkv.AddStore(store.CONSUL, New) +} + // New creates a new Consul client given a list // of endpoints and optional tls config func New(endpoints []string, options *store.Config) (store.Store, error) { @@ -62,9 +67,6 @@ func New(endpoints []string, options *store.Config) (store.Store, error) { if options.ConnectionTimeout != 0 { s.setTimeout(options.ConnectionTimeout) } - if options.EphemeralTTL != 0 { - s.setEphemeralTTL(options.EphemeralTTL) - } } // Creates a new client @@ -90,18 +92,13 @@ func (s *Consul) setTimeout(time time.Duration) { s.config.WaitTime = time } -// SetEphemeralTTL sets the ttl for ephemeral nodes -func (s *Consul) setEphemeralTTL(ttl time.Duration) { - s.ephemeralTTL = ttl -} - // Normalize the key for usage in Consul func (s *Consul) normalize(key string) string { key = store.Normalize(key) return strings.TrimPrefix(key, "/") } -func (s *Consul) refreshSession(pair *api.KVPair) error { +func (s *Consul) refreshSession(pair *api.KVPair, ttl time.Duration) error { // Check if there is any previous session with an active TTL session, err := s.getActiveSession(pair.Key) if err != nil { @@ -110,9 +107,9 @@ func (s *Consul) refreshSession(pair *api.KVPair) error { if session == "" { entry := &api.SessionEntry{ - Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires - TTL: ((s.ephemeralTTL) / 2).String(), // Consul multiplies the TTL by 2x - LockDelay: 1 * time.Millisecond, // Virtually disable lock delay + Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires + TTL: (ttl / 2).String(), // Consul multiplies the TTL by 2x + LockDelay: 1 * time.Millisecond, // Virtually disable lock delay } // Create the key session @@ -137,7 +134,7 @@ func (s *Consul) refreshSession(pair *api.KVPair) error { _, _, err = s.client.Session().Renew(session, nil) if err != nil { - return s.refreshSession(pair) + return s.refreshSession(pair, ttl) } return nil } @@ -185,9 +182,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error { Value: value, } - if opts != nil && opts.Ephemeral { + if opts != nil && opts.TTL > 0 { // Create or refresh the session - err := s.refreshSession(p) + err := s.refreshSession(p, opts.TTL) if err != nil { return err } @@ -199,6 +196,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error { // Delete a value at "key" func (s *Consul) Delete(key string) error { + if _, err := s.Get(key); err != nil { + return err + } _, err := s.client.KV().Delete(s.normalize(key), nil) return err } @@ -206,7 +206,10 @@ func (s *Consul) Delete(key string) error { // Exists checks that the key exists inside the store func (s *Consul) Exists(key string) (bool, error) { _, err := s.Get(key) - if err != nil && err == store.ErrKeyNotFound { + if err != nil { + if err == store.ErrKeyNotFound { + return false, nil + } return false, err } return true, nil @@ -240,6 +243,9 @@ func (s *Consul) List(directory string) ([]*store.KVPair, error) { // DeleteTree deletes a range of keys under a given directory func (s *Consul) DeleteTree(directory string) error { + if _, err := s.List(directory); err != nil { + return err + } _, err := s.client.KV().DeleteTree(s.normalize(directory), nil) return err } diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go b/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go index 26e20bb471..a15bf9e91d 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go @@ -4,19 +4,22 @@ import ( "testing" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" "github.com/docker/libkv/testutils" "github.com/stretchr/testify/assert" ) +var ( + client = "localhost:8500" +) + func makeConsulClient(t *testing.T) store.Store { - client := "localhost:8500" kv, err := New( []string{client}, &store.Config{ ConnectionTimeout: 3 * time.Second, - EphemeralTTL: 2 * time.Second, }, ) @@ -27,11 +30,28 @@ func makeConsulClient(t *testing.T) store.Store { return kv } +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore(store.CONSUL, []string{client}, nil) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*Consul); !ok { + t.Fatal("Error registering and initializing consul") + } +} + func TestConsulStore(t *testing.T) { kv := makeConsulClient(t) backup := makeConsulClient(t) - testutils.RunTestStore(t, kv, backup) + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + testutils.RunTestWatch(t, kv) + testutils.RunTestLock(t, kv) + testutils.RunTestTTL(t, kv, backup) + testutils.RunCleanup(t, kv) } func TestGetActiveSession(t *testing.T) { @@ -43,7 +63,7 @@ func TestGetActiveSession(t *testing.T) { value := []byte("bar") // Put the first key with the Ephemeral flag - err := kv.Put(key, value, &store.WriteOptions{Ephemeral: true}) + err := kv.Put(key, value, &store.WriteOptions{TTL: 2 * time.Second}) assert.NoError(t, err) // Session should not be empty diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go b/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go index c2d4aad147..92c239e460 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go @@ -8,14 +8,14 @@ import ( "time" etcd "github.com/coreos/go-etcd/etcd" + "github.com/docker/libkv" "github.com/docker/libkv/store" ) // Etcd is the receiver type for the // Store interface type Etcd struct { - client *etcd.Client - ephemeralTTL time.Duration + client *etcd.Client } type etcdLock struct { @@ -33,6 +33,11 @@ const ( defaultUpdateTime = 5 * time.Second ) +// Register registers etcd to libkv +func Register() { + libkv.AddStore(store.ETCD, New) +} + // New creates a new Etcd client given a list // of endpoints and an optional tls config func New(addrs []string, options *store.Config) (store.Store, error) { @@ -49,9 +54,6 @@ func New(addrs []string, options *store.Config) (store.Store, error) { if options.ConnectionTimeout != 0 { s.setTimeout(options.ConnectionTimeout) } - if options.EphemeralTTL != 0 { - s.setEphemeralTTL(options.EphemeralTTL) - } } // Periodic SyncCluster @@ -93,12 +95,6 @@ func (s *Etcd) setTimeout(time time.Duration) { s.client.SetDialTimeout(time) } -// setEphemeralHeartbeat sets the heartbeat value to notify -// that a node is alive -func (s *Etcd) setEphemeralTTL(time time.Duration) { - s.ephemeralTTL = time -} - // createDirectory creates the entire path for a directory // that does not exist func (s *Etcd) createDirectory(path string) error { @@ -120,11 +116,8 @@ func (s *Etcd) createDirectory(path string) error { func (s *Etcd) Get(key string) (pair *store.KVPair, err error) { result, err := s.client.Get(store.Normalize(key), false, false) if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Not a Directory or Not a file - if etcdError.ErrorCode == 100 || etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { - return nil, store.ErrKeyNotFound - } + if isKeyNotFoundError(err) { + return nil, store.ErrKeyNotFound } return nil, err } @@ -143,8 +136,8 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { // Default TTL = 0 means no expiration var ttl uint64 - if opts != nil && opts.Ephemeral { - ttl = uint64(s.ephemeralTTL.Seconds()) + if opts != nil && opts.TTL > 0 { + ttl = uint64(opts.TTL.Seconds()) } if _, err := s.client.Set(key, string(value), ttl); err != nil { @@ -173,14 +166,17 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { // Delete a value at "key" func (s *Etcd) Delete(key string) error { _, err := s.client.Delete(store.Normalize(key), false) + if isKeyNotFoundError(err) { + return store.ErrKeyNotFound + } return err } // Exists checks if the key exists inside the store func (s *Etcd) Exists(key string) (bool, error) { - entry, err := s.Get(key) - if err != nil && entry != nil { - if err == store.ErrKeyNotFound || entry.Value == nil { + _, err := s.Get(key) + if err != nil { + if err == store.ErrKeyNotFound { return false, nil } return false, err @@ -359,6 +355,9 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) { func (s *Etcd) List(directory string) ([]*store.KVPair, error) { resp, err := s.client.Get(store.Normalize(directory), true, true) if err != nil { + if isKeyNotFoundError(err) { + return nil, store.ErrKeyNotFound + } return nil, err } kv := []*store.KVPair{} @@ -376,6 +375,9 @@ func (s *Etcd) List(directory string) ([]*store.KVPair, error) { // DeleteTree deletes a range of keys under a given directory func (s *Etcd) DeleteTree(directory string) error { _, err := s.client.Delete(store.Normalize(directory), true) + if isKeyNotFoundError(err) { + return store.ErrKeyNotFound + } return err } @@ -507,3 +509,15 @@ func (l *etcdLock) Unlock() error { func (s *Etcd) Close() { return } + +func isKeyNotFoundError(err error) bool { + if err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + // Not a Directory or Not a file + if etcdError.ErrorCode == 100 || etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { + return true + } + } + } + return false +} diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go b/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go index 73d1b06f97..3f79ce09f7 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go @@ -4,18 +4,21 @@ import ( "testing" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" "github.com/docker/libkv/testutils" + "github.com/stretchr/testify/assert" +) + +var ( + client = "localhost:4001" ) func makeEtcdClient(t *testing.T) store.Store { - client := "localhost:4001" - kv, err := New( []string{client}, &store.Config{ ConnectionTimeout: 3 * time.Second, - EphemeralTTL: 2 * time.Second, }, ) @@ -26,9 +29,26 @@ func makeEtcdClient(t *testing.T) store.Store { return kv } +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore(store.ETCD, []string{client}, nil) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*Etcd); !ok { + t.Fatal("Error registering and initializing etcd") + } +} + func TestEtcdStore(t *testing.T) { kv := makeEtcdClient(t) backup := makeEtcdClient(t) - testutils.RunTestStore(t, kv, backup) + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + testutils.RunTestWatch(t, kv) + testutils.RunTestLock(t, kv) + testutils.RunTestTTL(t, kv, backup) + testutils.RunCleanup(t, kv) } diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/store.go b/Godeps/_workspace/src/github.com/docker/libkv/store/store.go index 527cb4a69d..352edaa56a 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/store.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/store.go @@ -16,11 +16,13 @@ const ( ETCD Backend = "etcd" // ZK backend ZK Backend = "zk" + // BOLTDB backend + BOLTDB Backend = "boltdb" ) var ( // ErrNotSupported is thrown when the backend k/v store is not supported by libkv - ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one") + ErrNotSupported = errors.New("Backend storage not supported yet, please choose one of") // ErrNotImplemented is thrown when a method is not implemented by the current backend ErrNotImplemented = errors.New("Call not implemented in current backend") // ErrNotReachable is thrown when the API cannot be reached for issuing common store operations @@ -39,7 +41,7 @@ var ( type Config struct { TLS *tls.Config ConnectionTimeout time.Duration - EphemeralTTL time.Duration + Bucket string } // Store represents the backend K/V storage @@ -63,10 +65,10 @@ type Store interface { Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) // WatchTree watches for changes on child nodes under - // a given a directory + // a given directory WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error) - // CreateLock for a given key. + // NewLock creates a lock for a given key. // The returned Locker is not held and must be acquired // with `.Lock`. The Value is optional. NewLock(key string, options *LockOptions) (Locker, error) @@ -97,8 +99,7 @@ type KVPair struct { // WriteOptions contains optional request parameters type WriteOptions struct { - Heartbeat time.Duration - Ephemeral bool + TTL time.Duration } // LockOptions contains optional request parameters diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go b/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go index 2689982f84..ff6b481947 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go @@ -4,14 +4,12 @@ import ( "strings" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" zk "github.com/samuel/go-zookeeper/zk" ) const ( - // SOH control character - SOH = "\x01" - defaultTimeout = 10 * time.Second ) @@ -29,6 +27,11 @@ type zookeeperLock struct { value []byte } +// Register registers zookeeper to libkv +func Register() { + libkv.AddStore(store.ZK, New) +} + // New creates a new Zookeeper client given a // list of endpoints and an optional tls config func New(endpoints []string, options *store.Config) (store.Store, error) { @@ -60,22 +63,15 @@ func (s *Zookeeper) setTimeout(time time.Duration) { // Get the value at "key", returns the last modified index // to use in conjunction to Atomic calls func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { - resp, meta, err := s.client.Get(store.Normalize(key)) + resp, meta, err := s.client.Get(s.normalize(key)) + if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } return nil, err } - // If resp is nil, the key does not exist - if resp == nil { - return nil, store.ErrKeyNotFound - } - - // FIXME handle very rare cases where Get returns the - // SOH control character instead of the actual value - if string(resp) == SOH { - return s.Get(store.Normalize(key)) - } - pair = &store.KVPair{ Key: key, Value: resp, @@ -91,10 +87,10 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { for i := 1; i <= len(path); i++ { newpath := "/" + strings.Join(path[:i], "/") if i == len(path) && ephemeral { - _, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) return err } - _, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll)) + _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Skip if node already exists if err != zk.ErrNodeExists { @@ -107,7 +103,7 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { // Put a value at "key" func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error { - fkey := store.Normalize(key) + fkey := s.normalize(key) exists, err := s.Exists(key) if err != nil { @@ -115,10 +111,10 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro } if !exists { - if opts != nil && opts.Ephemeral { - s.createFullPath(store.SplitKey(key), opts.Ephemeral) + if opts != nil && opts.TTL > 0 { + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true) } else { - s.createFullPath(store.SplitKey(key), false) + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false) } } @@ -128,13 +124,16 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro // Delete a value at "key" func (s *Zookeeper) Delete(key string) error { - err := s.client.Delete(store.Normalize(key), -1) + err := s.client.Delete(s.normalize(key), -1) + if err == zk.ErrNoNode { + return store.ErrKeyNotFound + } return err } // Exists checks if the key exists inside the store func (s *Zookeeper) Exists(key string) (bool, error) { - exists, _, err := s.client.Exists(store.Normalize(key)) + exists, _, err := s.client.Exists(s.normalize(key)) if err != nil { return false, err } @@ -162,7 +161,7 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVP // to listening to any event that may occur on that key watchCh <- pair for { - _, _, eventCh, err := s.client.GetW(store.Normalize(key)) + _, _, eventCh, err := s.client.GetW(s.normalize(key)) if err != nil { return } @@ -206,7 +205,7 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan watchCh <- entries for { - _, _, eventCh, err := s.client.ChildrenW(store.Normalize(directory)) + _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) if err != nil { return } @@ -229,8 +228,11 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan // List child nodes of a given directory func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { - keys, stat, err := s.client.Children(store.Normalize(directory)) + keys, stat, err := s.client.Children(s.normalize(directory)) if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } return nil, err } @@ -238,7 +240,7 @@ func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { // FIXME Costly Get request for each child key.. for _, key := range keys { - pair, err := s.Get(directory + store.Normalize(key)) + pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) if err != nil { // If node is not found: List is out of date, retry if err == zk.ErrNoNode { @@ -268,7 +270,7 @@ func (s *Zookeeper) DeleteTree(directory string) error { for _, pair := range pairs { reqs = append(reqs, &zk.DeleteRequest{ - Path: store.Normalize(directory + "/" + pair.Key), + Path: s.normalize(directory + "/" + pair.Key), Version: -1, }) } @@ -283,7 +285,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, var lastIndex uint64 if previous != nil { - meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex)) + meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex)) if err != nil { // Compare Failed if err == zk.ErrBadVersion { @@ -294,7 +296,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, lastIndex = uint64(meta.Version) } else { // Interpret previous == nil as create operation. - _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)) + _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Zookeeper will complain if the directory doesn't exist. if err == zk.ErrNoNode { @@ -305,7 +307,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, // Failed to create the directory. return false, nil, err } - if _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { + if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { return false, nil, err } @@ -334,7 +336,7 @@ func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, erro return false, store.ErrPreviousNotSpecified } - err := s.client.Delete(store.Normalize(key), int32(previous.LastIndex)) + err := s.client.Delete(s.normalize(key), int32(previous.LastIndex)) if err != nil { if err == zk.ErrBadVersion { return false, store.ErrKeyModified @@ -358,9 +360,9 @@ func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store. lock = &zookeeperLock{ client: s.client, - key: store.Normalize(key), + key: s.normalize(key), value: value, - lock: zk.NewLock(s.client, store.Normalize(key), zk.WorldACL(zk.PermAll)), + lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)), } return lock, err @@ -392,3 +394,9 @@ func (l *zookeeperLock) Unlock() error { func (s *Zookeeper) Close() { s.client.Close() } + +// Normalize the key for usage in Zookeeper +func (s *Zookeeper) normalize(key string) string { + key = store.Normalize(key) + return strings.TrimSuffix(key, "/") +} diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go b/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go index 759297e542..739c2ba619 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go @@ -4,18 +4,21 @@ import ( "testing" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" "github.com/docker/libkv/testutils" + "github.com/stretchr/testify/assert" +) + +var ( + client = "localhost:2181" ) func makeZkClient(t *testing.T) store.Store { - client := "localhost:2181" - kv, err := New( []string{client}, &store.Config{ ConnectionTimeout: 3 * time.Second, - EphemeralTTL: 2 * time.Second, }, ) @@ -26,9 +29,26 @@ func makeZkClient(t *testing.T) store.Store { return kv } +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore(store.ZK, []string{client}, nil) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*Zookeeper); !ok { + t.Fatal("Error registering and initializing zookeeper") + } +} + func TestZkStore(t *testing.T) { kv := makeZkClient(t) backup := makeZkClient(t) - testutils.RunTestStore(t, kv, backup) + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + testutils.RunTestWatch(t, kv) + testutils.RunTestLock(t, kv) + testutils.RunTestTTL(t, kv, backup) + testutils.RunCleanup(t, kv) } diff --git a/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go b/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go index d1ddc0ff22..bb8f26336d 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go @@ -1,6 +1,7 @@ package testutils import ( + "fmt" "testing" "time" @@ -8,50 +9,89 @@ import ( "github.com/stretchr/testify/assert" ) -// RunTestStore is an helper testing method that is -// called by each K/V backend sub-package testing -func RunTestStore(t *testing.T, kv store.Store, backup store.Store) { - testPutGetDelete(t, kv) - testWatch(t, kv) - testWatchTree(t, kv) - testAtomicPut(t, kv) - testAtomicPutCreate(t, kv) - testAtomicDelete(t, kv) - testLockUnlock(t, kv) - testPutEphemeral(t, kv, backup) +// RunTestCommon tests the minimal required APIs which +// should be supported by all K/V backends +func RunTestCommon(t *testing.T, kv store.Store) { + testPutGetDeleteExists(t, kv) testList(t, kv) testDeleteTree(t, kv) } -func testPutGetDelete(t *testing.T, kv store.Store) { - key := "foo" +// RunTestAtomic tests the Atomic operations by the K/V +// backends +func RunTestAtomic(t *testing.T, kv store.Store) { + testAtomicPut(t, kv) + testAtomicPutCreate(t, kv) + testAtomicDelete(t, kv) +} + +// RunTestWatch tests the watch/monitor APIs supported +// by the K/V backends. +func RunTestWatch(t *testing.T, kv store.Store) { + testWatch(t, kv) + testWatchTree(t, kv) +} + +// RunTestLock tests the KV pair Lock/Unlock APIs supported +// by the K/V backends. +func RunTestLock(t *testing.T, kv store.Store) { + testLockUnlock(t, kv) +} + +// RunTestTTL tests the TTL funtionality of the K/V backend. +func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) { + testPutTTL(t, kv, backup) +} + +func testPutGetDeleteExists(t *testing.T, kv store.Store) { + // Get a not exist key should return ErrKeyNotFound + pair, err := kv.Get("/testPutGetDelete_not_exist_key") + assert.Equal(t, store.ErrKeyNotFound, err) + value := []byte("bar") + for _, key := range []string{ + "testPutGetDeleteExists", + "testPutGetDeleteExists/", + "testPutGetDeleteExists/testbar/", + "testPutGetDeleteExists/testbar/testfoobar", + } { + failMsg := fmt.Sprintf("Fail key %s", key) + // Put the key + err = kv.Put(key, value, nil) + assert.NoError(t, err, failMsg) - // Put the key - err := kv.Put(key, value, nil) - assert.NoError(t, err) + // Get should return the value and an incremented index + pair, err = kv.Get(key) + assert.NoError(t, err, failMsg) + if assert.NotNil(t, pair, failMsg) { + assert.NotNil(t, pair.Value, failMsg) + } + assert.Equal(t, pair.Value, value, failMsg) + assert.NotEqual(t, pair.LastIndex, 0, failMsg) - // Get should return the value and an incremented index - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) + // Exists should return true + exists, err := kv.Exists(key) + assert.NoError(t, err, failMsg) + assert.True(t, exists, failMsg) + + // Delete the key + err = kv.Delete(key) + assert.NoError(t, err, failMsg) + + // Get should fail + pair, err = kv.Get(key) + assert.Error(t, err, failMsg) + assert.Nil(t, pair, failMsg) + + // Exists should return false + exists, err = kv.Exists(key) + assert.NoError(t, err, failMsg) + assert.False(t, exists, failMsg) } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - // Delete the key - err = kv.Delete(key) - assert.NoError(t, err) - - // Get should fail - pair, err = kv.Get(key) - assert.Error(t, err) - assert.Nil(t, pair) } func testWatch(t *testing.T, kv store.Store) { - key := "hello" + key := "testWatch" value := []byte("world") newValue := []byte("world!") @@ -108,15 +148,15 @@ func testWatch(t *testing.T, kv store.Store) { } func testWatchTree(t *testing.T, kv store.Store) { - dir := "tree" + dir := "testWatchTree" - node1 := "tree/node1" + node1 := "testWatchTree/node1" value1 := []byte("node1") - node2 := "tree/node2" + node2 := "testWatchTree/node2" value2 := []byte("node2") - node3 := "tree/node3" + node3 := "testWatchTree/node3" value3 := []byte("node3") err := kv.Put(node1, value1, nil) @@ -162,7 +202,7 @@ func testWatchTree(t *testing.T, kv store.Store) { } func testAtomicPut(t *testing.T, kv store.Store) { - key := "hello" + key := "testAtomicPut" value := []byte("world") // Put the key @@ -179,18 +219,18 @@ func testAtomicPut(t *testing.T, kv store.Store) { assert.NotEqual(t, pair.LastIndex, 0) // This CAS should fail: previous exists. - success, _, err := kv.AtomicPut("hello", []byte("WORLD"), nil, nil) + success, _, err := kv.AtomicPut(key, []byte("WORLD"), nil, nil) assert.Error(t, err) assert.False(t, success) // This CAS should succeed - success, _, err = kv.AtomicPut("hello", []byte("WORLD"), pair, nil) + success, _, err = kv.AtomicPut(key, []byte("WORLD"), pair, nil) assert.NoError(t, err) assert.True(t, success) // This CAS should fail, key exists. pair.LastIndex = 0 - success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil) + success, _, err = kv.AtomicPut(key, []byte("WORLDWORLD"), pair, nil) assert.Error(t, err) assert.False(t, success) } @@ -198,7 +238,7 @@ func testAtomicPut(t *testing.T, kv store.Store) { func testAtomicPutCreate(t *testing.T, kv store.Store) { // Use a key in a new directory to ensure Stores will create directories // that don't yet exist. - key := "put/create" + key := "testAtomicPutCreate/create" value := []byte("putcreate") // AtomicPut the key, previous = nil indicates create. @@ -223,14 +263,10 @@ func testAtomicPutCreate(t *testing.T, kv store.Store) { success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil) assert.NoError(t, err) assert.True(t, success) - - // Delete the key, ensures runs of the test don't interfere with each other. - err = kv.DeleteTree("put") - assert.NoError(t, err) } func testAtomicDelete(t *testing.T, kv store.Store) { - key := "atomic" + key := "testAtomicDelete" value := []byte("world") // Put the key @@ -262,11 +298,11 @@ func testAtomicDelete(t *testing.T, kv store.Store) { } func testLockUnlock(t *testing.T, kv store.Store) { - key := "foo" + key := "testLockUnlock" value := []byte("bar") // We should be able to create a new lock on key - lock, err := kv.NewLock(key, &store.LockOptions{Value: value}) + lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second}) assert.NoError(t, err) assert.NotNil(t, lock) @@ -303,19 +339,19 @@ func testLockUnlock(t *testing.T, kv store.Store) { assert.NotEqual(t, pair.LastIndex, 0) } -func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) { - firstKey := "first" +func testPutTTL(t *testing.T, kv store.Store, otherConn store.Store) { + firstKey := "testPutTTL" firstValue := []byte("foo") secondKey := "second" secondValue := []byte("bar") // Put the first key with the Ephemeral flag - err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{Ephemeral: true}) + err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{TTL: 2 * time.Second}) assert.NoError(t, err) // Put a second key with the Ephemeral flag - err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{Ephemeral: true}) + err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{TTL: 2 * time.Second}) assert.NoError(t, err) // Get on firstKey should work @@ -346,12 +382,12 @@ func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) { } func testList(t *testing.T, kv store.Store) { - prefix := "nodes" + prefix := "testList" - firstKey := "nodes/first" + firstKey := "testList/first" firstValue := []byte("first") - secondKey := "nodes/second" + secondKey := "testList/second" secondValue := []byte("second") // Put the first key @@ -363,35 +399,37 @@ func testList(t *testing.T, kv store.Store) { assert.NoError(t, err) // List should work and return the two correct values - pairs, err := kv.List(prefix) - assert.NoError(t, err) - if assert.NotNil(t, pairs) { - assert.Equal(t, len(pairs), 2) - } - - // Check pairs, those are not necessarily in Put order - for _, pair := range pairs { - if pair.Key == firstKey { - assert.Equal(t, pair.Value, firstValue) + for _, parent := range []string{prefix, prefix + "/"} { + pairs, err := kv.List(parent) + assert.NoError(t, err) + if assert.NotNil(t, pairs) { + assert.Equal(t, len(pairs), 2) } - if pair.Key == secondKey { - assert.Equal(t, pair.Value, secondValue) + + // Check pairs, those are not necessarily in Put order + for _, pair := range pairs { + if pair.Key == firstKey { + assert.Equal(t, pair.Value, firstValue) + } + if pair.Key == secondKey { + assert.Equal(t, pair.Value, secondValue) + } } } // List should fail: the key does not exist - pairs, err = kv.List("idontexist") - assert.Error(t, err) + pairs, err := kv.List("idontexist") + assert.Equal(t, store.ErrKeyNotFound, err) assert.Nil(t, pairs) } func testDeleteTree(t *testing.T, kv store.Store) { - prefix := "nodes" + prefix := "testDeleteTree" - firstKey := "nodes/first" + firstKey := "testDeleteTree/first" firstValue := []byte("first") - secondKey := "nodes/second" + secondKey := "testDeleteTree/second" secondValue := []byte("second") // Put the first key @@ -433,3 +471,24 @@ func testDeleteTree(t *testing.T, kv store.Store) { assert.Error(t, err) assert.Nil(t, pair) } + +// RunCleanup cleans up keys introduced by the tests +func RunCleanup(t *testing.T, kv store.Store) { + for _, key := range []string{ + "testPutGetDeleteExists", + "testWatch", + "testWatchTree", + "testAtomicPut", + "testAtomicPutCreate", + "testAtomicDelete", + "testLockUnlock", + "testPutTTL", + "testList", + "testDeleteTree", + } { + err := kv.DeleteTree(key) + assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete tree key %s: %v", key, err)) + err = kv.Delete(key) + assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete key %s: %v", key, err)) + } +} diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 44e8d58d16..6e29d224a5 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -9,6 +9,9 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/libkv" "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" + "github.com/docker/libkv/store/etcd" + "github.com/docker/libkv/store/zookeeper" "github.com/docker/swarm/discovery" ) @@ -32,6 +35,12 @@ func init() { // Init is exported func Init() { + // Register to libkv + zookeeper.Register() + consul.Register() + etcd.Register() + + // Register to internal Swarm discovery service discovery.Register("zk", &Discovery{backend: store.ZK}) discovery.Register("consul", &Discovery{backend: store.CONSUL}) discovery.Register("etcd", &Discovery{backend: store.ETCD}) @@ -56,14 +65,7 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Du // Creates a new store, will ignore options given // if not supported by the chosen store - s.store, err = libkv.NewStore( - s.backend, - addrs, - &store.Config{ - EphemeralTTL: s.ttl, - }, - ) - + s.store, err = libkv.NewStore(s.backend, addrs, &store.Config{}) return err } @@ -126,12 +128,13 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c time.Sleep(s.heartbeat) } }() + return ch, errCh } // Register is exported func (s *Discovery) Register(addr string) error { - opts := &store.WriteOptions{Ephemeral: true, Heartbeat: s.heartbeat} + opts := &store.WriteOptions{TTL: s.ttl} return s.store.Put(path.Join(s.path, addr), []byte(addr), opts) }