Merge pull request #1202 from abronan/update_libkv

Update libkv Godeps
This commit is contained in:
Victor Vieux 2015-09-17 22:58:22 -07:00
commit 02bd0e9b76
13 changed files with 360 additions and 246 deletions

2
Godeps/Godeps.json generated
View File

@ -47,7 +47,7 @@
},
{
"ImportPath": "github.com/docker/libkv",
"Rev": "56d6633db7a524c74773fd6b98f02afcd0e6f27a"
"Rev": "4aec61dc3c9c1c4f11221a0c0cfde67ccb8f04c3"
},
{
"ImportPath": "github.com/gogo/protobuf/proto",

View File

@ -12,7 +12,7 @@ For example, you can use it to store your metadata or for service discovery to r
You can also easily implement a generic *Leader Election* on top of it (see the [swarm/leadership](https://github.com/docker/swarm/tree/master/leadership) package).
As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`.
As of now, `libkv` offers support for `Consul`, `Etcd`, `Zookeeper` and `BoltDB`.
## Example of usage
@ -24,12 +24,18 @@ package main
import (
"fmt"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
log "github.com/Sirupsen/logrus"
)
func init() {
// Register consul store to libkv
consul.Register()
}
func main() {
client := "localhost:8500"
@ -62,11 +68,13 @@ func main() {
You can find other usage examples for `libkv` under the `docker/swarm` or `docker/libnetwork` repositories.
## Details
## Warning
You should expect the same experience for basic operations like `Get`/`Put`, etc.
There are a few consistency issues with *etcd*, on the notion of *directory* and *key*. If you want to use the three KV backends in an interchangeable way, you should only put data on leaves (see [Issue 20](https://github.com/docker/libkv/issues/20) for more details). This will be fixed when *etcd* API v3 will be made available (API v3 drops the *directory/key* distinction). An official release for *libkv* with a tag is likely to come after this issue being marked as **solved**.
However calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly).
Other than that, you should expect the same experience for basic operations like `Get`/`Put`, etc.
Calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). Although you should be able to use it successfully to watch on events in an interchangeable way (see the **swarm/leadership** or **swarm/discovery** packages in **docker/swarm**).
## Create a new storage backend

View File

@ -62,10 +62,11 @@
package libkv
import (
"fmt"
"sort"
"strings"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
)
// Initialize creates a new Store object, initializing the client
@ -73,11 +74,16 @@ type Initialize func(addrs []string, options *store.Config) (store.Store, error)
var (
// Backend initializers
initializers = map[store.Backend]Initialize{
store.CONSUL: consul.New,
store.ETCD: etcd.New,
store.ZK: zookeeper.New,
}
initializers = make(map[store.Backend]Initialize)
supportedBackend = func() string {
keys := make([]string, 0, len(initializers))
for k := range initializers {
keys = append(keys, string(k))
}
sort.Strings(keys)
return strings.Join(keys, ", ")
}()
)
// NewStore creates a an instance of store
@ -86,5 +92,10 @@ func NewStore(backend store.Backend, addrs []string, options *store.Config) (sto
return init(addrs, options)
}
return nil, store.ErrNotSupported
return nil, fmt.Errorf("%s %s", store.ErrNotSupported.Error(), supportedBackend)
}
// AddStore adds a new store backend to libkv
func AddStore(store store.Backend, init Initialize) {
initializers[store] = init
}

View File

@ -5,66 +5,9 @@ import (
"time"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/stretchr/testify/assert"
)
func TestNewStoreConsul(t *testing.T) {
client := "localhost:8500"
kv, err := NewStore(
store.CONSUL,
[]string{client},
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
)
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*consul.Consul); !ok {
t.Fatal("Error while initializing store consul")
}
}
func TestNewStoreEtcd(t *testing.T) {
client := "localhost:4001"
kv, err := NewStore(
store.ETCD,
[]string{client},
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
)
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*etcd.Etcd); !ok {
t.Fatal("Error while initializing store etcd")
}
}
func TestNewStoreZookeeper(t *testing.T) {
client := "localhost:2181"
kv, err := NewStore(
store.ZK,
[]string{client},
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
)
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*zookeeper.Zookeeper); !ok {
t.Fatal("Error while initializing store zookeeper")
}
}
func TestNewStoreUnsupported(t *testing.T) {
client := "localhost:9999"
@ -77,4 +20,5 @@ func TestNewStoreUnsupported(t *testing.T) {
)
assert.Error(t, err)
assert.Nil(t, kv)
assert.Equal(t, "Backend storage not supported yet, please choose one of ", err.Error())
}

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
api "github.com/hashicorp/consul/api"
)
@ -29,15 +30,19 @@ var (
// Store interface
type Consul struct {
sync.Mutex
config *api.Config
client *api.Client
ephemeralTTL time.Duration
config *api.Config
client *api.Client
}
type consulLock struct {
lock *api.Lock
}
// Register registers consul to libkv
func Register() {
libkv.AddStore(store.CONSUL, New)
}
// New creates a new Consul client given a list
// of endpoints and optional tls config
func New(endpoints []string, options *store.Config) (store.Store, error) {
@ -62,9 +67,6 @@ func New(endpoints []string, options *store.Config) (store.Store, error) {
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
if options.EphemeralTTL != 0 {
s.setEphemeralTTL(options.EphemeralTTL)
}
}
// Creates a new client
@ -90,18 +92,13 @@ func (s *Consul) setTimeout(time time.Duration) {
s.config.WaitTime = time
}
// SetEphemeralTTL sets the ttl for ephemeral nodes
func (s *Consul) setEphemeralTTL(ttl time.Duration) {
s.ephemeralTTL = ttl
}
// Normalize the key for usage in Consul
func (s *Consul) normalize(key string) string {
key = store.Normalize(key)
return strings.TrimPrefix(key, "/")
}
func (s *Consul) refreshSession(pair *api.KVPair) error {
func (s *Consul) refreshSession(pair *api.KVPair, ttl time.Duration) error {
// Check if there is any previous session with an active TTL
session, err := s.getActiveSession(pair.Key)
if err != nil {
@ -110,9 +107,9 @@ func (s *Consul) refreshSession(pair *api.KVPair) error {
if session == "" {
entry := &api.SessionEntry{
Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires
TTL: ((s.ephemeralTTL) / 2).String(), // Consul multiplies the TTL by 2x
LockDelay: 1 * time.Millisecond, // Virtually disable lock delay
Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires
TTL: (ttl / 2).String(), // Consul multiplies the TTL by 2x
LockDelay: 1 * time.Millisecond, // Virtually disable lock delay
}
// Create the key session
@ -137,7 +134,7 @@ func (s *Consul) refreshSession(pair *api.KVPair) error {
_, _, err = s.client.Session().Renew(session, nil)
if err != nil {
return s.refreshSession(pair)
return s.refreshSession(pair, ttl)
}
return nil
}
@ -185,9 +182,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error {
Value: value,
}
if opts != nil && opts.Ephemeral {
if opts != nil && opts.TTL > 0 {
// Create or refresh the session
err := s.refreshSession(p)
err := s.refreshSession(p, opts.TTL)
if err != nil {
return err
}
@ -199,6 +196,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error {
// Delete a value at "key"
func (s *Consul) Delete(key string) error {
if _, err := s.Get(key); err != nil {
return err
}
_, err := s.client.KV().Delete(s.normalize(key), nil)
return err
}
@ -206,7 +206,10 @@ func (s *Consul) Delete(key string) error {
// Exists checks that the key exists inside the store
func (s *Consul) Exists(key string) (bool, error) {
_, err := s.Get(key)
if err != nil && err == store.ErrKeyNotFound {
if err != nil {
if err == store.ErrKeyNotFound {
return false, nil
}
return false, err
}
return true, nil
@ -240,6 +243,9 @@ func (s *Consul) List(directory string) ([]*store.KVPair, error) {
// DeleteTree deletes a range of keys under a given directory
func (s *Consul) DeleteTree(directory string) error {
if _, err := s.List(directory); err != nil {
return err
}
_, err := s.client.KV().DeleteTree(s.normalize(directory), nil)
return err
}

View File

@ -4,19 +4,22 @@ import (
"testing"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/testutils"
"github.com/stretchr/testify/assert"
)
var (
client = "localhost:8500"
)
func makeConsulClient(t *testing.T) store.Store {
client := "localhost:8500"
kv, err := New(
[]string{client},
&store.Config{
ConnectionTimeout: 3 * time.Second,
EphemeralTTL: 2 * time.Second,
},
)
@ -27,11 +30,28 @@ func makeConsulClient(t *testing.T) store.Store {
return kv
}
func TestRegister(t *testing.T) {
Register()
kv, err := libkv.NewStore(store.CONSUL, []string{client}, nil)
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*Consul); !ok {
t.Fatal("Error registering and initializing consul")
}
}
func TestConsulStore(t *testing.T) {
kv := makeConsulClient(t)
backup := makeConsulClient(t)
testutils.RunTestStore(t, kv, backup)
testutils.RunTestCommon(t, kv)
testutils.RunTestAtomic(t, kv)
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestTTL(t, kv, backup)
testutils.RunCleanup(t, kv)
}
func TestGetActiveSession(t *testing.T) {
@ -43,7 +63,7 @@ func TestGetActiveSession(t *testing.T) {
value := []byte("bar")
// Put the first key with the Ephemeral flag
err := kv.Put(key, value, &store.WriteOptions{Ephemeral: true})
err := kv.Put(key, value, &store.WriteOptions{TTL: 2 * time.Second})
assert.NoError(t, err)
// Session should not be empty

View File

@ -8,14 +8,14 @@ import (
"time"
etcd "github.com/coreos/go-etcd/etcd"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
)
// Etcd is the receiver type for the
// Store interface
type Etcd struct {
client *etcd.Client
ephemeralTTL time.Duration
client *etcd.Client
}
type etcdLock struct {
@ -33,6 +33,11 @@ const (
defaultUpdateTime = 5 * time.Second
)
// Register registers etcd to libkv
func Register() {
libkv.AddStore(store.ETCD, New)
}
// New creates a new Etcd client given a list
// of endpoints and an optional tls config
func New(addrs []string, options *store.Config) (store.Store, error) {
@ -49,9 +54,6 @@ func New(addrs []string, options *store.Config) (store.Store, error) {
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
if options.EphemeralTTL != 0 {
s.setEphemeralTTL(options.EphemeralTTL)
}
}
// Periodic SyncCluster
@ -93,12 +95,6 @@ func (s *Etcd) setTimeout(time time.Duration) {
s.client.SetDialTimeout(time)
}
// setEphemeralHeartbeat sets the heartbeat value to notify
// that a node is alive
func (s *Etcd) setEphemeralTTL(time time.Duration) {
s.ephemeralTTL = time
}
// createDirectory creates the entire path for a directory
// that does not exist
func (s *Etcd) createDirectory(path string) error {
@ -120,11 +116,8 @@ func (s *Etcd) createDirectory(path string) error {
func (s *Etcd) Get(key string) (pair *store.KVPair, err error) {
result, err := s.client.Get(store.Normalize(key), false, false)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Not a Directory or Not a file
if etcdError.ErrorCode == 100 || etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 {
return nil, store.ErrKeyNotFound
}
if isKeyNotFoundError(err) {
return nil, store.ErrKeyNotFound
}
return nil, err
}
@ -143,8 +136,8 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error {
// Default TTL = 0 means no expiration
var ttl uint64
if opts != nil && opts.Ephemeral {
ttl = uint64(s.ephemeralTTL.Seconds())
if opts != nil && opts.TTL > 0 {
ttl = uint64(opts.TTL.Seconds())
}
if _, err := s.client.Set(key, string(value), ttl); err != nil {
@ -173,14 +166,17 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error {
// Delete a value at "key"
func (s *Etcd) Delete(key string) error {
_, err := s.client.Delete(store.Normalize(key), false)
if isKeyNotFoundError(err) {
return store.ErrKeyNotFound
}
return err
}
// Exists checks if the key exists inside the store
func (s *Etcd) Exists(key string) (bool, error) {
entry, err := s.Get(key)
if err != nil && entry != nil {
if err == store.ErrKeyNotFound || entry.Value == nil {
_, err := s.Get(key)
if err != nil {
if err == store.ErrKeyNotFound {
return false, nil
}
return false, err
@ -359,6 +355,9 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
func (s *Etcd) List(directory string) ([]*store.KVPair, error) {
resp, err := s.client.Get(store.Normalize(directory), true, true)
if err != nil {
if isKeyNotFoundError(err) {
return nil, store.ErrKeyNotFound
}
return nil, err
}
kv := []*store.KVPair{}
@ -376,6 +375,9 @@ func (s *Etcd) List(directory string) ([]*store.KVPair, error) {
// DeleteTree deletes a range of keys under a given directory
func (s *Etcd) DeleteTree(directory string) error {
_, err := s.client.Delete(store.Normalize(directory), true)
if isKeyNotFoundError(err) {
return store.ErrKeyNotFound
}
return err
}
@ -507,3 +509,15 @@ func (l *etcdLock) Unlock() error {
func (s *Etcd) Close() {
return
}
func isKeyNotFoundError(err error) bool {
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Not a Directory or Not a file
if etcdError.ErrorCode == 100 || etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 {
return true
}
}
}
return false
}

View File

@ -4,18 +4,21 @@ import (
"testing"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/testutils"
"github.com/stretchr/testify/assert"
)
var (
client = "localhost:4001"
)
func makeEtcdClient(t *testing.T) store.Store {
client := "localhost:4001"
kv, err := New(
[]string{client},
&store.Config{
ConnectionTimeout: 3 * time.Second,
EphemeralTTL: 2 * time.Second,
},
)
@ -26,9 +29,26 @@ func makeEtcdClient(t *testing.T) store.Store {
return kv
}
func TestRegister(t *testing.T) {
Register()
kv, err := libkv.NewStore(store.ETCD, []string{client}, nil)
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*Etcd); !ok {
t.Fatal("Error registering and initializing etcd")
}
}
func TestEtcdStore(t *testing.T) {
kv := makeEtcdClient(t)
backup := makeEtcdClient(t)
testutils.RunTestStore(t, kv, backup)
testutils.RunTestCommon(t, kv)
testutils.RunTestAtomic(t, kv)
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestTTL(t, kv, backup)
testutils.RunCleanup(t, kv)
}

View File

@ -16,11 +16,13 @@ const (
ETCD Backend = "etcd"
// ZK backend
ZK Backend = "zk"
// BOLTDB backend
BOLTDB Backend = "boltdb"
)
var (
// ErrNotSupported is thrown when the backend k/v store is not supported by libkv
ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")
ErrNotSupported = errors.New("Backend storage not supported yet, please choose one of")
// ErrNotImplemented is thrown when a method is not implemented by the current backend
ErrNotImplemented = errors.New("Call not implemented in current backend")
// ErrNotReachable is thrown when the API cannot be reached for issuing common store operations
@ -39,7 +41,7 @@ var (
type Config struct {
TLS *tls.Config
ConnectionTimeout time.Duration
EphemeralTTL time.Duration
Bucket string
}
// Store represents the backend K/V storage
@ -63,10 +65,10 @@ type Store interface {
Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// WatchTree watches for changes on child nodes under
// a given a directory
// a given directory
WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
// CreateLock for a given key.
// NewLock creates a lock for a given key.
// The returned Locker is not held and must be acquired
// with `.Lock`. The Value is optional.
NewLock(key string, options *LockOptions) (Locker, error)
@ -97,8 +99,7 @@ type KVPair struct {
// WriteOptions contains optional request parameters
type WriteOptions struct {
Heartbeat time.Duration
Ephemeral bool
TTL time.Duration
}
// LockOptions contains optional request parameters

View File

@ -4,14 +4,12 @@ import (
"strings"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
zk "github.com/samuel/go-zookeeper/zk"
)
const (
// SOH control character
SOH = "\x01"
defaultTimeout = 10 * time.Second
)
@ -29,6 +27,11 @@ type zookeeperLock struct {
value []byte
}
// Register registers zookeeper to libkv
func Register() {
libkv.AddStore(store.ZK, New)
}
// New creates a new Zookeeper client given a
// list of endpoints and an optional tls config
func New(endpoints []string, options *store.Config) (store.Store, error) {
@ -60,22 +63,15 @@ func (s *Zookeeper) setTimeout(time time.Duration) {
// Get the value at "key", returns the last modified index
// to use in conjunction to Atomic calls
func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
resp, meta, err := s.client.Get(store.Normalize(key))
resp, meta, err := s.client.Get(s.normalize(key))
if err != nil {
if err == zk.ErrNoNode {
return nil, store.ErrKeyNotFound
}
return nil, err
}
// If resp is nil, the key does not exist
if resp == nil {
return nil, store.ErrKeyNotFound
}
// FIXME handle very rare cases where Get returns the
// SOH control character instead of the actual value
if string(resp) == SOH {
return s.Get(store.Normalize(key))
}
pair = &store.KVPair{
Key: key,
Value: resp,
@ -91,10 +87,10 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
for i := 1; i <= len(path); i++ {
newpath := "/" + strings.Join(path[:i], "/")
if i == len(path) && ephemeral {
_, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
_, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
return err
}
_, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
_, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Skip if node already exists
if err != zk.ErrNodeExists {
@ -107,7 +103,7 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
// Put a value at "key"
func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error {
fkey := store.Normalize(key)
fkey := s.normalize(key)
exists, err := s.Exists(key)
if err != nil {
@ -115,10 +111,10 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro
}
if !exists {
if opts != nil && opts.Ephemeral {
s.createFullPath(store.SplitKey(key), opts.Ephemeral)
if opts != nil && opts.TTL > 0 {
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true)
} else {
s.createFullPath(store.SplitKey(key), false)
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false)
}
}
@ -128,13 +124,16 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro
// Delete a value at "key"
func (s *Zookeeper) Delete(key string) error {
err := s.client.Delete(store.Normalize(key), -1)
err := s.client.Delete(s.normalize(key), -1)
if err == zk.ErrNoNode {
return store.ErrKeyNotFound
}
return err
}
// Exists checks if the key exists inside the store
func (s *Zookeeper) Exists(key string) (bool, error) {
exists, _, err := s.client.Exists(store.Normalize(key))
exists, _, err := s.client.Exists(s.normalize(key))
if err != nil {
return false, err
}
@ -162,7 +161,7 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVP
// to listening to any event that may occur on that key
watchCh <- pair
for {
_, _, eventCh, err := s.client.GetW(store.Normalize(key))
_, _, eventCh, err := s.client.GetW(s.normalize(key))
if err != nil {
return
}
@ -206,7 +205,7 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan
watchCh <- entries
for {
_, _, eventCh, err := s.client.ChildrenW(store.Normalize(directory))
_, _, eventCh, err := s.client.ChildrenW(s.normalize(directory))
if err != nil {
return
}
@ -229,8 +228,11 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan
// List child nodes of a given directory
func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
keys, stat, err := s.client.Children(store.Normalize(directory))
keys, stat, err := s.client.Children(s.normalize(directory))
if err != nil {
if err == zk.ErrNoNode {
return nil, store.ErrKeyNotFound
}
return nil, err
}
@ -238,7 +240,7 @@ func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
// FIXME Costly Get request for each child key..
for _, key := range keys {
pair, err := s.Get(directory + store.Normalize(key))
pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key))
if err != nil {
// If node is not found: List is out of date, retry
if err == zk.ErrNoNode {
@ -268,7 +270,7 @@ func (s *Zookeeper) DeleteTree(directory string) error {
for _, pair := range pairs {
reqs = append(reqs, &zk.DeleteRequest{
Path: store.Normalize(directory + "/" + pair.Key),
Path: s.normalize(directory + "/" + pair.Key),
Version: -1,
})
}
@ -283,7 +285,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair,
var lastIndex uint64
if previous != nil {
meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex))
meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex))
if err != nil {
// Compare Failed
if err == zk.ErrBadVersion {
@ -294,7 +296,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair,
lastIndex = uint64(meta.Version)
} else {
// Interpret previous == nil as create operation.
_, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll))
_, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Zookeeper will complain if the directory doesn't exist.
if err == zk.ErrNoNode {
@ -305,7 +307,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair,
// Failed to create the directory.
return false, nil, err
}
if _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
return false, nil, err
}
@ -334,7 +336,7 @@ func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, erro
return false, store.ErrPreviousNotSpecified
}
err := s.client.Delete(store.Normalize(key), int32(previous.LastIndex))
err := s.client.Delete(s.normalize(key), int32(previous.LastIndex))
if err != nil {
if err == zk.ErrBadVersion {
return false, store.ErrKeyModified
@ -358,9 +360,9 @@ func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.
lock = &zookeeperLock{
client: s.client,
key: store.Normalize(key),
key: s.normalize(key),
value: value,
lock: zk.NewLock(s.client, store.Normalize(key), zk.WorldACL(zk.PermAll)),
lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)),
}
return lock, err
@ -392,3 +394,9 @@ func (l *zookeeperLock) Unlock() error {
func (s *Zookeeper) Close() {
s.client.Close()
}
// Normalize the key for usage in Zookeeper
func (s *Zookeeper) normalize(key string) string {
key = store.Normalize(key)
return strings.TrimSuffix(key, "/")
}

View File

@ -4,18 +4,21 @@ import (
"testing"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/testutils"
"github.com/stretchr/testify/assert"
)
var (
client = "localhost:2181"
)
func makeZkClient(t *testing.T) store.Store {
client := "localhost:2181"
kv, err := New(
[]string{client},
&store.Config{
ConnectionTimeout: 3 * time.Second,
EphemeralTTL: 2 * time.Second,
},
)
@ -26,9 +29,26 @@ func makeZkClient(t *testing.T) store.Store {
return kv
}
func TestRegister(t *testing.T) {
Register()
kv, err := libkv.NewStore(store.ZK, []string{client}, nil)
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*Zookeeper); !ok {
t.Fatal("Error registering and initializing zookeeper")
}
}
func TestZkStore(t *testing.T) {
kv := makeZkClient(t)
backup := makeZkClient(t)
testutils.RunTestStore(t, kv, backup)
testutils.RunTestCommon(t, kv)
testutils.RunTestAtomic(t, kv)
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestTTL(t, kv, backup)
testutils.RunCleanup(t, kv)
}

View File

@ -1,6 +1,7 @@
package testutils
import (
"fmt"
"testing"
"time"
@ -8,50 +9,89 @@ import (
"github.com/stretchr/testify/assert"
)
// RunTestStore is an helper testing method that is
// called by each K/V backend sub-package testing
func RunTestStore(t *testing.T, kv store.Store, backup store.Store) {
testPutGetDelete(t, kv)
testWatch(t, kv)
testWatchTree(t, kv)
testAtomicPut(t, kv)
testAtomicPutCreate(t, kv)
testAtomicDelete(t, kv)
testLockUnlock(t, kv)
testPutEphemeral(t, kv, backup)
// RunTestCommon tests the minimal required APIs which
// should be supported by all K/V backends
func RunTestCommon(t *testing.T, kv store.Store) {
testPutGetDeleteExists(t, kv)
testList(t, kv)
testDeleteTree(t, kv)
}
func testPutGetDelete(t *testing.T, kv store.Store) {
key := "foo"
// RunTestAtomic tests the Atomic operations by the K/V
// backends
func RunTestAtomic(t *testing.T, kv store.Store) {
testAtomicPut(t, kv)
testAtomicPutCreate(t, kv)
testAtomicDelete(t, kv)
}
// RunTestWatch tests the watch/monitor APIs supported
// by the K/V backends.
func RunTestWatch(t *testing.T, kv store.Store) {
testWatch(t, kv)
testWatchTree(t, kv)
}
// RunTestLock tests the KV pair Lock/Unlock APIs supported
// by the K/V backends.
func RunTestLock(t *testing.T, kv store.Store) {
testLockUnlock(t, kv)
}
// RunTestTTL tests the TTL funtionality of the K/V backend.
func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) {
testPutTTL(t, kv, backup)
}
func testPutGetDeleteExists(t *testing.T, kv store.Store) {
// Get a not exist key should return ErrKeyNotFound
pair, err := kv.Get("/testPutGetDelete_not_exist_key")
assert.Equal(t, store.ErrKeyNotFound, err)
value := []byte("bar")
for _, key := range []string{
"testPutGetDeleteExists",
"testPutGetDeleteExists/",
"testPutGetDeleteExists/testbar/",
"testPutGetDeleteExists/testbar/testfoobar",
} {
failMsg := fmt.Sprintf("Fail key %s", key)
// Put the key
err = kv.Put(key, value, nil)
assert.NoError(t, err, failMsg)
// Put the key
err := kv.Put(key, value, nil)
assert.NoError(t, err)
// Get should return the value and an incremented index
pair, err = kv.Get(key)
assert.NoError(t, err, failMsg)
if assert.NotNil(t, pair, failMsg) {
assert.NotNil(t, pair.Value, failMsg)
}
assert.Equal(t, pair.Value, value, failMsg)
assert.NotEqual(t, pair.LastIndex, 0, failMsg)
// Get should return the value and an incremented index
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
// Exists should return true
exists, err := kv.Exists(key)
assert.NoError(t, err, failMsg)
assert.True(t, exists, failMsg)
// Delete the key
err = kv.Delete(key)
assert.NoError(t, err, failMsg)
// Get should fail
pair, err = kv.Get(key)
assert.Error(t, err, failMsg)
assert.Nil(t, pair, failMsg)
// Exists should return false
exists, err = kv.Exists(key)
assert.NoError(t, err, failMsg)
assert.False(t, exists, failMsg)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
// Delete the key
err = kv.Delete(key)
assert.NoError(t, err)
// Get should fail
pair, err = kv.Get(key)
assert.Error(t, err)
assert.Nil(t, pair)
}
func testWatch(t *testing.T, kv store.Store) {
key := "hello"
key := "testWatch"
value := []byte("world")
newValue := []byte("world!")
@ -108,15 +148,15 @@ func testWatch(t *testing.T, kv store.Store) {
}
func testWatchTree(t *testing.T, kv store.Store) {
dir := "tree"
dir := "testWatchTree"
node1 := "tree/node1"
node1 := "testWatchTree/node1"
value1 := []byte("node1")
node2 := "tree/node2"
node2 := "testWatchTree/node2"
value2 := []byte("node2")
node3 := "tree/node3"
node3 := "testWatchTree/node3"
value3 := []byte("node3")
err := kv.Put(node1, value1, nil)
@ -162,7 +202,7 @@ func testWatchTree(t *testing.T, kv store.Store) {
}
func testAtomicPut(t *testing.T, kv store.Store) {
key := "hello"
key := "testAtomicPut"
value := []byte("world")
// Put the key
@ -179,18 +219,18 @@ func testAtomicPut(t *testing.T, kv store.Store) {
assert.NotEqual(t, pair.LastIndex, 0)
// This CAS should fail: previous exists.
success, _, err := kv.AtomicPut("hello", []byte("WORLD"), nil, nil)
success, _, err := kv.AtomicPut(key, []byte("WORLD"), nil, nil)
assert.Error(t, err)
assert.False(t, success)
// This CAS should succeed
success, _, err = kv.AtomicPut("hello", []byte("WORLD"), pair, nil)
success, _, err = kv.AtomicPut(key, []byte("WORLD"), pair, nil)
assert.NoError(t, err)
assert.True(t, success)
// This CAS should fail, key exists.
pair.LastIndex = 0
success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil)
success, _, err = kv.AtomicPut(key, []byte("WORLDWORLD"), pair, nil)
assert.Error(t, err)
assert.False(t, success)
}
@ -198,7 +238,7 @@ func testAtomicPut(t *testing.T, kv store.Store) {
func testAtomicPutCreate(t *testing.T, kv store.Store) {
// Use a key in a new directory to ensure Stores will create directories
// that don't yet exist.
key := "put/create"
key := "testAtomicPutCreate/create"
value := []byte("putcreate")
// AtomicPut the key, previous = nil indicates create.
@ -223,14 +263,10 @@ func testAtomicPutCreate(t *testing.T, kv store.Store) {
success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil)
assert.NoError(t, err)
assert.True(t, success)
// Delete the key, ensures runs of the test don't interfere with each other.
err = kv.DeleteTree("put")
assert.NoError(t, err)
}
func testAtomicDelete(t *testing.T, kv store.Store) {
key := "atomic"
key := "testAtomicDelete"
value := []byte("world")
// Put the key
@ -262,11 +298,11 @@ func testAtomicDelete(t *testing.T, kv store.Store) {
}
func testLockUnlock(t *testing.T, kv store.Store) {
key := "foo"
key := "testLockUnlock"
value := []byte("bar")
// We should be able to create a new lock on key
lock, err := kv.NewLock(key, &store.LockOptions{Value: value})
lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second})
assert.NoError(t, err)
assert.NotNil(t, lock)
@ -303,19 +339,19 @@ func testLockUnlock(t *testing.T, kv store.Store) {
assert.NotEqual(t, pair.LastIndex, 0)
}
func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) {
firstKey := "first"
func testPutTTL(t *testing.T, kv store.Store, otherConn store.Store) {
firstKey := "testPutTTL"
firstValue := []byte("foo")
secondKey := "second"
secondValue := []byte("bar")
// Put the first key with the Ephemeral flag
err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{Ephemeral: true})
err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{TTL: 2 * time.Second})
assert.NoError(t, err)
// Put a second key with the Ephemeral flag
err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{Ephemeral: true})
err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{TTL: 2 * time.Second})
assert.NoError(t, err)
// Get on firstKey should work
@ -346,12 +382,12 @@ func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) {
}
func testList(t *testing.T, kv store.Store) {
prefix := "nodes"
prefix := "testList"
firstKey := "nodes/first"
firstKey := "testList/first"
firstValue := []byte("first")
secondKey := "nodes/second"
secondKey := "testList/second"
secondValue := []byte("second")
// Put the first key
@ -363,35 +399,37 @@ func testList(t *testing.T, kv store.Store) {
assert.NoError(t, err)
// List should work and return the two correct values
pairs, err := kv.List(prefix)
assert.NoError(t, err)
if assert.NotNil(t, pairs) {
assert.Equal(t, len(pairs), 2)
}
// Check pairs, those are not necessarily in Put order
for _, pair := range pairs {
if pair.Key == firstKey {
assert.Equal(t, pair.Value, firstValue)
for _, parent := range []string{prefix, prefix + "/"} {
pairs, err := kv.List(parent)
assert.NoError(t, err)
if assert.NotNil(t, pairs) {
assert.Equal(t, len(pairs), 2)
}
if pair.Key == secondKey {
assert.Equal(t, pair.Value, secondValue)
// Check pairs, those are not necessarily in Put order
for _, pair := range pairs {
if pair.Key == firstKey {
assert.Equal(t, pair.Value, firstValue)
}
if pair.Key == secondKey {
assert.Equal(t, pair.Value, secondValue)
}
}
}
// List should fail: the key does not exist
pairs, err = kv.List("idontexist")
assert.Error(t, err)
pairs, err := kv.List("idontexist")
assert.Equal(t, store.ErrKeyNotFound, err)
assert.Nil(t, pairs)
}
func testDeleteTree(t *testing.T, kv store.Store) {
prefix := "nodes"
prefix := "testDeleteTree"
firstKey := "nodes/first"
firstKey := "testDeleteTree/first"
firstValue := []byte("first")
secondKey := "nodes/second"
secondKey := "testDeleteTree/second"
secondValue := []byte("second")
// Put the first key
@ -433,3 +471,24 @@ func testDeleteTree(t *testing.T, kv store.Store) {
assert.Error(t, err)
assert.Nil(t, pair)
}
// RunCleanup cleans up keys introduced by the tests
func RunCleanup(t *testing.T, kv store.Store) {
for _, key := range []string{
"testPutGetDeleteExists",
"testWatch",
"testWatchTree",
"testAtomicPut",
"testAtomicPutCreate",
"testAtomicDelete",
"testLockUnlock",
"testPutTTL",
"testList",
"testDeleteTree",
} {
err := kv.DeleteTree(key)
assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete tree key %s: %v", key, err))
err = kv.Delete(key)
assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete key %s: %v", key, err))
}
}

View File

@ -9,6 +9,9 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/docker/swarm/discovery"
)
@ -32,6 +35,12 @@ func init() {
// Init is exported
func Init() {
// Register to libkv
zookeeper.Register()
consul.Register()
etcd.Register()
// Register to internal Swarm discovery service
discovery.Register("zk", &Discovery{backend: store.ZK})
discovery.Register("consul", &Discovery{backend: store.CONSUL})
discovery.Register("etcd", &Discovery{backend: store.ETCD})
@ -56,14 +65,7 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Du
// Creates a new store, will ignore options given
// if not supported by the chosen store
s.store, err = libkv.NewStore(
s.backend,
addrs,
&store.Config{
EphemeralTTL: s.ttl,
},
)
s.store, err = libkv.NewStore(s.backend, addrs, &store.Config{})
return err
}
@ -126,12 +128,13 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c
time.Sleep(s.heartbeat)
}
}()
return ch, errCh
}
// Register is exported
func (s *Discovery) Register(addr string) error {
opts := &store.WriteOptions{Ephemeral: true, Heartbeat: s.heartbeat}
opts := &store.WriteOptions{TTL: s.ttl}
return s.store.Put(path.Join(s.path, addr), []byte(addr), opts)
}