diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index d91f556ff2..01eebb02a7 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -52,7 +52,7 @@ }, { "ImportPath": "github.com/docker/libkv", - "Rev": "e201296a473f7e796488b8bf82629f28ca13634b" + "Rev": "82ad407bbfaf1ef435a2ebabc498a9afb128ed37" }, { "ImportPath": "github.com/gogo/protobuf/proto", diff --git a/Godeps/_workspace/src/github.com/docker/libkv/README.md b/Godeps/_workspace/src/github.com/docker/libkv/README.md index d09b34c1ea..6cc4ae9b18 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/README.md +++ b/Godeps/_workspace/src/github.com/docker/libkv/README.md @@ -34,7 +34,7 @@ func main() { client := "localhost:8500" // Initialize a new store with consul - kv, err = libkv.NewStore( + kv, err := libkv.NewStore( store.CONSUL, // or "consul" []string{client}, &store.Config{ diff --git a/Godeps/_workspace/src/github.com/docker/libkv/libkv.go b/Godeps/_workspace/src/github.com/docker/libkv/libkv.go index 28df703afc..9035f64f3c 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/libkv.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/libkv.go @@ -1,3 +1,64 @@ +// Package libkv provides a Go native library to store metadata. +// +// The goal of libkv is to abstract common store operations for multiple +// Key/Value backends and offer the same experience no matter which one of the +// backend you want to use. +// +// For example, you can use it to store your metadata or for service discovery to +// register machines and endpoints inside your cluster. +// +// As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`. +// +// ## Example of usage +// +// ### Create a new store and use Put/Get +// +// +// package main +// +// import ( +// "fmt" +// "time" +// +// "github.com/docker/libkv" +// "github.com/docker/libkv/store" +// log "github.com/Sirupsen/logrus" +// ) +// +// func main() { +// client := "localhost:8500" +// +// // Initialize a new store with consul +// kv, err := libkv.NewStore( +// store.CONSUL, // or "consul" +// []string{client}, +// &store.Config{ +// ConnectionTimeout: 10*time.Second, +// }, +// ) +// if err != nil { +// log.Fatal("Cannot create store consul") +// } +// +// key := "foo" +// err = kv.Put(key, []byte("bar"), nil) +// if err != nil { +// log.Error("Error trying to put value at key `", key, "`") +// } +// +// pair, err := kv.Get(key) +// if err != nil { +// log.Error("Error trying accessing value at key `", key, "`") +// } +// +// log.Info("value: ", string(pair.Value)) +// } +// +// ##Copyright and license +// +// Code and documentation copyright 2015 Docker, inc. Code released under the +// Apache 2.0 license. Docs released under Creative commons. +// package libkv import ( diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go b/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go index 8eb8fed8e3..ebabde5ffa 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go @@ -375,11 +375,16 @@ func (l *consulLock) Unlock() error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { + + p := &api.KVPair{Key: s.normalize(key), Value: value} + if previous == nil { - return false, nil, store.ErrPreviousNotSpecified + // Consul interprets ModifyIndex = 0 as new key. + p.ModifyIndex = 0 + } else { + p.ModifyIndex = previous.LastIndex } - p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex} if work, _, err := s.client.KV().CAS(p, nil); err != nil { return false, nil, err } else if !work { diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go b/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go index f58f6d9d7d..58f6e273ef 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go @@ -218,6 +218,11 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, for { select { case result := <-etcdWatchCh: + if result == nil || result.Node == nil { + // Something went wrong, exit + // No need to stop the chan as the watch already ended + return + } watchCh <- &store.KVPair{ Key: key, Value: []byte(result.Node.Value), @@ -260,7 +265,12 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st for { select { - case <-etcdWatchCh: + case event := <-etcdWatchCh: + if event == nil { + // Something went wrong, exit + // No need to stop the chan as the watch already ended + return + } // FIXME: We should probably use the value pushed by the channel. // However, Node.Nodes seems to be empty. if list, err := s.List(directory); err == nil { @@ -278,11 +288,32 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { - if previous == nil { - return false, nil, store.ErrPreviousNotSpecified - } - meta, err := s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex) + var meta *etcd.Response + var err error + if previous != nil { + meta, err = s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex) + } else { + // Interpret previous == nil as Atomic Create + meta, err = s.client.Create(store.Normalize(key), string(value), 0) + if etcdError, ok := err.(*etcd.EtcdError); ok { + + // Directory doesn't exist. + if etcdError.ErrorCode == 104 { + // Remove the last element (the actual key) + // and create the full directory path + err = s.createDirectory(store.GetDirectory(key)) + if err != nil { + return false, nil, err + } + + // Now that the directory is created, create the key + if _, err := s.client.Create(key, string(value), 0); err != nil { + return false, nil, err + } + } + } + } if err != nil { if etcdError, ok := err.(*etcd.EtcdError); ok { // Compare Failed diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/store.go b/Godeps/_workspace/src/github.com/docker/libkv/store/store.go index 61ddb0777c..49e2eb9dcd 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/store.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/store.go @@ -77,7 +77,8 @@ type Store interface { // DeleteTree deletes a range of keys under a given directory DeleteTree(directory string) error - // Atomic operation on a single value + // Atomic CAS operation on a single value. + // Pass previous = nil to create a new key. AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) // Atomic delete of a single value diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go b/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go index 792a916c1e..d12edf6792 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go @@ -265,23 +265,47 @@ func (s *Zookeeper) DeleteTree(directory string) error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) { - if previous == nil { - return false, nil, store.ErrPreviousNotSpecified - } - meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex)) - if err != nil { - // Compare Failed - if err == zk.ErrBadVersion { - return false, nil, store.ErrKeyModified + var lastIndex uint64 + if previous != nil { + meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex)) + if err != nil { + // Compare Failed + if err == zk.ErrBadVersion { + return false, nil, store.ErrKeyModified + } + return false, nil, err } - return false, nil, err + lastIndex = uint64(meta.Version) + } else { + // Interpret previous == nil as create operation. + _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + // Zookeeper will complain if the directory doesn't exist. + if err == zk.ErrNoNode { + // Create the directory + parts := store.SplitKey(key) + parts = parts[:len(parts)-1] + if err = s.createFullPath(parts, false); err != nil { + // Failed to create the directory. + return false, nil, err + } + if _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { + return false, nil, err + } + + } else { + // Unhandled error + return false, nil, err + } + } + lastIndex = 0 // Newly created nodes have version 0. } pair := &store.KVPair{ Key: key, Value: value, - LastIndex: uint64(meta.Version), + LastIndex: lastIndex, } return true, pair, nil diff --git a/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go b/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go index bf3336dd93..717d9ecdc2 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go @@ -15,6 +15,7 @@ func RunTestStore(t *testing.T, kv store.Store, backup store.Store) { testWatch(t, kv) testWatchTree(t, kv) testAtomicPut(t, kv) + testAtomicPutCreate(t, kv) testAtomicDelete(t, kv) testLockUnlock(t, kv) testPutEphemeral(t, kv, backup) @@ -177,7 +178,7 @@ func testAtomicPut(t *testing.T, kv store.Store) { assert.Equal(t, pair.Value, value) assert.NotEqual(t, pair.LastIndex, 0) - // This CAS should fail: no previous + // This CAS should fail: previous exists. success, _, err := kv.AtomicPut("hello", []byte("WORLD"), nil, nil) assert.Error(t, err) assert.False(t, success) @@ -187,13 +188,47 @@ func testAtomicPut(t *testing.T, kv store.Store) { assert.NoError(t, err) assert.True(t, success) - // This CAS should fail + // This CAS should fail, key exists. pair.LastIndex = 0 success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil) assert.Error(t, err) assert.False(t, success) } +func testAtomicPutCreate(t *testing.T, kv store.Store) { + // Use a key in a new directory to ensure Stores will create directories + // that don't yet exist. + key := "put/create" + value := []byte("putcreate") + + // AtomicPut the key, previous = nil indicates create. + success, _, err := kv.AtomicPut(key, value, nil, nil) + assert.NoError(t, err) + assert.True(t, success) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + + // Attempting to create again should fail. + success, _, err = kv.AtomicPut(key, value, nil, nil) + assert.Error(t, err) + assert.False(t, success) + + // This CAS should succeed, since it has the value from Get() + success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil) + assert.NoError(t, err) + assert.True(t, success) + + // Delete the key, ensures runs of the test don't interfere with each other. + err = kv.DeleteTree("put") + assert.NoError(t, err) +} + func testAtomicDelete(t *testing.T, kv store.Store) { key := "atomic" value := []byte("world") diff --git a/discovery/discovery.go b/discovery/discovery.go index 9942de7cef..1389f6d53b 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -153,7 +153,10 @@ func CreateEntries(addrs []string) (Entries, error) { } for _, addr := range addrs { - if len(addr) == 0 { + // Check if addr is non empty and valid, + // FIXME <= 1 because zookeeper may wrongfully + // return a separator character (SOH) + if len(addr) <= 1 { continue } entry, err := NewEntry(addr) diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 8a3d2b96f1..69c35e0f23 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -13,7 +13,8 @@ import ( ) const ( - discoveryPath = "docker/swarm/nodes" + discoveryPath = "docker/swarm/nodes" + defaultFailoverWaitTime = 10 * time.Second ) // Discovery is exported @@ -123,7 +124,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c // If we get here it means the store watch channel was closed. This // is unexpected so let's retry later. errCh <- fmt.Errorf("Unexpected watch error") - time.Sleep(s.heartbeat) + time.Sleep(defaultFailoverWaitTime) } }() return ch, errCh diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go index f57dca2c03..d1e827db7a 100644 --- a/discovery/kv/kv_test.go +++ b/discovery/kv/kv_test.go @@ -85,7 +85,7 @@ func TestWatch(t *testing.T) { stopCh := make(chan struct{}) ch, errCh := d.Watch(stopCh) - // It should fire an error since the first WatchRange call failed. + // It should fire an error since the first WatchTree call failed. assert.EqualError(t, <-errCh, "test error") // We have to drain the error channel otherwise Watch will get stuck. go func() {