Merge pull request #1008 from abronan/update_libkv_godeps

Update libkv godeps and minor fixes
This commit is contained in:
Andrea Luzzardi 2015-06-30 15:19:44 -07:00
commit bedf1ec3ff
11 changed files with 187 additions and 26 deletions

2
Godeps/Godeps.json generated
View File

@ -52,7 +52,7 @@
},
{
"ImportPath": "github.com/docker/libkv",
"Rev": "e201296a473f7e796488b8bf82629f28ca13634b"
"Rev": "82ad407bbfaf1ef435a2ebabc498a9afb128ed37"
},
{
"ImportPath": "github.com/gogo/protobuf/proto",

View File

@ -34,7 +34,7 @@ func main() {
client := "localhost:8500"
// Initialize a new store with consul
kv, err = libkv.NewStore(
kv, err := libkv.NewStore(
store.CONSUL, // or "consul"
[]string{client},
&store.Config{

View File

@ -1,3 +1,64 @@
// Package libkv provides a Go native library to store metadata.
//
// The goal of libkv is to abstract common store operations for multiple
// Key/Value backends and offer the same experience no matter which one of the
// backend you want to use.
//
// For example, you can use it to store your metadata or for service discovery to
// register machines and endpoints inside your cluster.
//
// As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`.
//
// ## Example of usage
//
// ### Create a new store and use Put/Get
//
//
// package main
//
// import (
// "fmt"
// "time"
//
// "github.com/docker/libkv"
// "github.com/docker/libkv/store"
// log "github.com/Sirupsen/logrus"
// )
//
// func main() {
// client := "localhost:8500"
//
// // Initialize a new store with consul
// kv, err := libkv.NewStore(
// store.CONSUL, // or "consul"
// []string{client},
// &store.Config{
// ConnectionTimeout: 10*time.Second,
// },
// )
// if err != nil {
// log.Fatal("Cannot create store consul")
// }
//
// key := "foo"
// err = kv.Put(key, []byte("bar"), nil)
// if err != nil {
// log.Error("Error trying to put value at key `", key, "`")
// }
//
// pair, err := kv.Get(key)
// if err != nil {
// log.Error("Error trying accessing value at key `", key, "`")
// }
//
// log.Info("value: ", string(pair.Value))
// }
//
// ##Copyright and license
//
// Code and documentation copyright 2015 Docker, inc. Code released under the
// Apache 2.0 license. Docs released under Creative commons.
//
package libkv
import (

View File

@ -375,11 +375,16 @@ func (l *consulLock) Unlock() error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
p := &api.KVPair{Key: s.normalize(key), Value: value}
if previous == nil {
return false, nil, store.ErrPreviousNotSpecified
// Consul interprets ModifyIndex = 0 as new key.
p.ModifyIndex = 0
} else {
p.ModifyIndex = previous.LastIndex
}
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
return false, nil, err
} else if !work {

View File

@ -218,6 +218,11 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair,
for {
select {
case result := <-etcdWatchCh:
if result == nil || result.Node == nil {
// Something went wrong, exit
// No need to stop the chan as the watch already ended
return
}
watchCh <- &store.KVPair{
Key: key,
Value: []byte(result.Node.Value),
@ -260,7 +265,12 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st
for {
select {
case <-etcdWatchCh:
case event := <-etcdWatchCh:
if event == nil {
// Something went wrong, exit
// No need to stop the chan as the watch already ended
return
}
// FIXME: We should probably use the value pushed by the channel.
// However, Node.Nodes seems to be empty.
if list, err := s.List(directory); err == nil {
@ -278,11 +288,32 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
if previous == nil {
return false, nil, store.ErrPreviousNotSpecified
}
meta, err := s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex)
var meta *etcd.Response
var err error
if previous != nil {
meta, err = s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex)
} else {
// Interpret previous == nil as Atomic Create
meta, err = s.client.Create(store.Normalize(key), string(value), 0)
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Directory doesn't exist.
if etcdError.ErrorCode == 104 {
// Remove the last element (the actual key)
// and create the full directory path
err = s.createDirectory(store.GetDirectory(key))
if err != nil {
return false, nil, err
}
// Now that the directory is created, create the key
if _, err := s.client.Create(key, string(value), 0); err != nil {
return false, nil, err
}
}
}
}
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Compare Failed

View File

@ -77,7 +77,8 @@ type Store interface {
// DeleteTree deletes a range of keys under a given directory
DeleteTree(directory string) error
// Atomic operation on a single value
// Atomic CAS operation on a single value.
// Pass previous = nil to create a new key.
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
// Atomic delete of a single value

View File

@ -265,23 +265,47 @@ func (s *Zookeeper) DeleteTree(directory string) error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) {
if previous == nil {
return false, nil, store.ErrPreviousNotSpecified
}
meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex))
if err != nil {
// Compare Failed
if err == zk.ErrBadVersion {
return false, nil, store.ErrKeyModified
var lastIndex uint64
if previous != nil {
meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex))
if err != nil {
// Compare Failed
if err == zk.ErrBadVersion {
return false, nil, store.ErrKeyModified
}
return false, nil, err
}
return false, nil, err
lastIndex = uint64(meta.Version)
} else {
// Interpret previous == nil as create operation.
_, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Zookeeper will complain if the directory doesn't exist.
if err == zk.ErrNoNode {
// Create the directory
parts := store.SplitKey(key)
parts = parts[:len(parts)-1]
if err = s.createFullPath(parts, false); err != nil {
// Failed to create the directory.
return false, nil, err
}
if _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
return false, nil, err
}
} else {
// Unhandled error
return false, nil, err
}
}
lastIndex = 0 // Newly created nodes have version 0.
}
pair := &store.KVPair{
Key: key,
Value: value,
LastIndex: uint64(meta.Version),
LastIndex: lastIndex,
}
return true, pair, nil

View File

@ -15,6 +15,7 @@ func RunTestStore(t *testing.T, kv store.Store, backup store.Store) {
testWatch(t, kv)
testWatchTree(t, kv)
testAtomicPut(t, kv)
testAtomicPutCreate(t, kv)
testAtomicDelete(t, kv)
testLockUnlock(t, kv)
testPutEphemeral(t, kv, backup)
@ -177,7 +178,7 @@ func testAtomicPut(t *testing.T, kv store.Store) {
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
// This CAS should fail: no previous
// This CAS should fail: previous exists.
success, _, err := kv.AtomicPut("hello", []byte("WORLD"), nil, nil)
assert.Error(t, err)
assert.False(t, success)
@ -187,13 +188,47 @@ func testAtomicPut(t *testing.T, kv store.Store) {
assert.NoError(t, err)
assert.True(t, success)
// This CAS should fail
// This CAS should fail, key exists.
pair.LastIndex = 0
success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil)
assert.Error(t, err)
assert.False(t, success)
}
func testAtomicPutCreate(t *testing.T, kv store.Store) {
// Use a key in a new directory to ensure Stores will create directories
// that don't yet exist.
key := "put/create"
value := []byte("putcreate")
// AtomicPut the key, previous = nil indicates create.
success, _, err := kv.AtomicPut(key, value, nil, nil)
assert.NoError(t, err)
assert.True(t, success)
// Get should return the value and an incremented index
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
// Attempting to create again should fail.
success, _, err = kv.AtomicPut(key, value, nil, nil)
assert.Error(t, err)
assert.False(t, success)
// This CAS should succeed, since it has the value from Get()
success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil)
assert.NoError(t, err)
assert.True(t, success)
// Delete the key, ensures runs of the test don't interfere with each other.
err = kv.DeleteTree("put")
assert.NoError(t, err)
}
func testAtomicDelete(t *testing.T, kv store.Store) {
key := "atomic"
value := []byte("world")

View File

@ -153,7 +153,10 @@ func CreateEntries(addrs []string) (Entries, error) {
}
for _, addr := range addrs {
if len(addr) == 0 {
// Check if addr is non empty and valid,
// FIXME <= 1 because zookeeper may wrongfully
// return a separator character (SOH)
if len(addr) <= 1 {
continue
}
entry, err := NewEntry(addr)

View File

@ -13,7 +13,8 @@ import (
)
const (
discoveryPath = "docker/swarm/nodes"
discoveryPath = "docker/swarm/nodes"
defaultFailoverWaitTime = 10 * time.Second
)
// Discovery is exported
@ -123,7 +124,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c
// If we get here it means the store watch channel was closed. This
// is unexpected so let's retry later.
errCh <- fmt.Errorf("Unexpected watch error")
time.Sleep(s.heartbeat)
time.Sleep(defaultFailoverWaitTime)
}
}()
return ch, errCh

View File

@ -85,7 +85,7 @@ func TestWatch(t *testing.T) {
stopCh := make(chan struct{})
ch, errCh := d.Watch(stopCh)
// It should fire an error since the first WatchRange call failed.
// It should fire an error since the first WatchTree call failed.
assert.EqualError(t, <-errCh, "test error")
// We have to drain the error channel otherwise Watch will get stuck.
go func() {