remove pkg/store

Signed-off-by: Alexandre Beslic <abronan@docker.com>
This commit is contained in:
Alexandre Beslic 2015-06-15 15:29:06 -07:00
parent c70f6f8afe
commit 04d55f3951
14 changed files with 0 additions and 2121 deletions

View File

@ -1,83 +0,0 @@
# Storage
The goal of `pkg/store` is to abstract common store operations for multiple Key/Value backends.
For example, you can use it to store your metadata or for service discovery to register machines and endpoints inside your cluster.
As of now, `pkg/store` offers support for `Consul`, `Etcd` and `Zookeeper`.
## Example of usage
### Create a new store and use Put/Get
```go
package main
import (
"fmt"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/store"
)
func main() {
var (
// Consul local address
client = "localhost:8500"
)
// Initialize a new store with consul
kv, err = store.NewStore(
store.CONSUL, // or "consul"
[]string{client},
&store.Config{
Timeout: 10*time.Second,
},
)
if err != nil {
log.Error("Cannot create store consul")
}
key := "foo"
err = kv.Put(key, []byte("bar"), nil)
if err != nil {
log.Error("Error trying to put value at key `", key, "`")
}
pair, err := kv.Get(key)
if err != nil {
log.Error("Error trying accessing value at key `", key, "`")
}
log.Info("value: ", string(pair.Value))
}
```
## Contributing to a new storage backend
A new **storage backend** should include those calls:
```go
type Store interface {
Put(key string, value []byte, options *WriteOptions) error
Get(key string) (*KVPair, error)
Delete(key string) error
Exists(key string) (bool, error)
Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
NewLock(key string, options *LockOptions) (Locker, error)
List(prefix string) ([]*KVPair, error)
DeleteTree(prefix string) error
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
AtomicDelete(key string, previous *KVPair) (bool, error)
}
```
In the case of Swarm and to be eligible as a **discovery backend** only, a K/V store implementation should at least offer `Get`, `Put`, `WatchTree` and `List`.
`Put` should support usage of `ttl` to be able to remove entries in case of a node failure.
You can get inspiration from existing backends to create a new one. This interface could be subject to changes to improve the experience of using the library and contributing to a new backend.

View File

@ -1,403 +0,0 @@
package store
import (
"crypto/tls"
"net/http"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
api "github.com/hashicorp/consul/api"
)
const (
// DefaultWatchWaitTime is how long we block for at a time to check if the
// watched key has changed. This affects the minimum time it takes to
// cancel a watch.
DefaultWatchWaitTime = 15 * time.Second
)
// Consul embeds the client and watches
type Consul struct {
sync.Mutex
config *api.Config
client *api.Client
ephemeralTTL time.Duration
ephemeralSession string
}
type consulLock struct {
lock *api.Lock
}
// InitializeConsul creates a new Consul client given
// a list of endpoints and optional tls config
func InitializeConsul(endpoints []string, options *Config) (Store, error) {
s := &Consul{}
// Create Consul client
config := api.DefaultConfig()
s.config = config
config.HttpClient = http.DefaultClient
config.Address = endpoints[0]
config.Scheme = "http"
// Set options
if options != nil {
if options.TLS != nil {
s.setTLS(options.TLS)
}
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
if options.EphemeralTTL != 0 {
s.setEphemeralTTL(options.EphemeralTTL)
}
}
// Creates a new client
client, err := api.NewClient(config)
if err != nil {
log.Errorf("Couldn't initialize consul client..")
return nil, err
}
s.client = client
return s, nil
}
// SetTLS sets Consul TLS options
func (s *Consul) setTLS(tls *tls.Config) {
s.config.HttpClient.Transport = &http.Transport{
TLSClientConfig: tls,
}
s.config.Scheme = "https"
}
// SetTimeout sets the timout for connecting to Consul
func (s *Consul) setTimeout(time time.Duration) {
s.config.WaitTime = time
}
// SetEphemeralTTL sets the ttl for ephemeral nodes
func (s *Consul) setEphemeralTTL(ttl time.Duration) {
s.ephemeralTTL = ttl
}
// createEphemeralSession creates the global session
// once that is used to delete keys at node failure
func (s *Consul) createEphemeralSession() error {
s.Lock()
defer s.Unlock()
// Create new session
if s.ephemeralSession == "" {
entry := &api.SessionEntry{
Behavior: api.SessionBehaviorDelete,
TTL: s.ephemeralTTL.String(),
}
// Create global ephemeral keys session
session, _, err := s.client.Session().Create(entry, nil)
if err != nil {
return err
}
s.ephemeralSession = session
}
return nil
}
// checkActiveSession checks if the key already has a session attached
func (s *Consul) checkActiveSession(key string) (string, error) {
pair, _, err := s.client.KV().Get(key, nil)
if err != nil {
return "", err
}
if pair != nil && pair.Session != "" {
return pair.Session, nil
}
return "", nil
}
// Normalize the key for usage in Consul
func (s *Consul) normalize(key string) string {
key = normalize(key)
return strings.TrimPrefix(key, "/")
}
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Consul) Get(key string) (*KVPair, error) {
options := &api.QueryOptions{
AllowStale: false,
RequireConsistent: true,
}
pair, meta, err := s.client.KV().Get(s.normalize(key), options)
if err != nil {
return nil, err
}
if pair == nil {
return nil, ErrKeyNotFound
}
return &KVPair{pair.Key, pair.Value, meta.LastIndex}, nil
}
// Put a value at "key"
func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error {
key = s.normalize(key)
p := &api.KVPair{
Key: key,
Value: value,
}
if opts != nil && opts.Ephemeral {
// Check if there is any previous session with an active TTL
previous, err := s.checkActiveSession(key)
if err != nil {
return err
}
// Create the global ephemeral session if it does not exist yet
if s.ephemeralSession == "" {
if err = s.createEphemeralSession(); err != nil {
return err
}
}
// If a previous session is still active for that key, use it
// else we use the global ephemeral session
if previous != "" {
p.Session = previous
} else {
p.Session = s.ephemeralSession
}
// Create lock option with the
// EphemeralSession
lockOpts := &api.LockOptions{
Key: key,
Session: p.Session,
}
// Lock and ignore if lock is held
// It's just a placeholder for the
// ephemeral behavior
lock, _ := s.client.LockOpts(lockOpts)
if lock != nil {
lock.Lock(nil)
}
// Renew the session
_, _, err = s.client.Session().Renew(p.Session, nil)
if err != nil {
s.ephemeralSession = ""
return err
}
}
_, err := s.client.KV().Put(p, nil)
return err
}
// Delete a value at "key"
func (s *Consul) Delete(key string) error {
_, err := s.client.KV().Delete(s.normalize(key), nil)
return err
}
// Exists checks that the key exists inside the store
func (s *Consul) Exists(key string) (bool, error) {
_, err := s.Get(key)
if err != nil && err == ErrKeyNotFound {
return false, err
}
return true, nil
}
// List the content of a given prefix
func (s *Consul) List(prefix string) ([]*KVPair, error) {
pairs, _, err := s.client.KV().List(s.normalize(prefix), nil)
if err != nil {
return nil, err
}
if len(pairs) == 0 {
return nil, ErrKeyNotFound
}
kv := []*KVPair{}
for _, pair := range pairs {
if pair.Key == prefix {
continue
}
kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex})
}
return kv, nil
}
// DeleteTree deletes a range of keys based on prefix
func (s *Consul) DeleteTree(prefix string) error {
_, err := s.client.KV().DeleteTree(s.normalize(prefix), nil)
return err
}
// Watch changes on a key.
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
key = s.normalize(key)
kv := s.client.KV()
watchCh := make(chan *KVPair)
go func() {
defer close(watchCh)
// Use a wait time in order to check if we should quit from time to
// time.
opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
for {
// Check if we should quit
select {
case <-stopCh:
return
default:
}
pair, meta, err := kv.Get(key, opts)
if err != nil {
log.Errorf("consul: %v", err)
return
}
// If LastIndex didn't change then it means `Get` returned because
// of the WaitTime and the key didn't change.
if opts.WaitIndex == meta.LastIndex {
continue
}
opts.WaitIndex = meta.LastIndex
// FIXME: What happens when a key is deleted?
if pair != nil {
watchCh <- &KVPair{pair.Key, pair.Value, pair.ModifyIndex}
}
}
}()
return watchCh, nil
}
// WatchTree watches changes on a "directory"
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
prefix = s.normalize(prefix)
kv := s.client.KV()
watchCh := make(chan []*KVPair)
go func() {
defer close(watchCh)
// Use a wait time in order to check if we should quit from time to
// time.
opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
for {
// Check if we should quit
select {
case <-stopCh:
return
default:
}
pairs, meta, err := kv.List(prefix, opts)
if err != nil {
log.Errorf("consul: %v", err)
return
}
// If LastIndex didn't change then it means `Get` returned because
// of the WaitTime and the key didn't change.
if opts.WaitIndex == meta.LastIndex {
continue
}
opts.WaitIndex = meta.LastIndex
kv := []*KVPair{}
for _, pair := range pairs {
if pair.Key == prefix {
continue
}
kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex})
}
watchCh <- kv
}
}()
return watchCh, nil
}
// NewLock returns a handle to a lock struct which can be used to acquire and
// release the mutex.
func (s *Consul) NewLock(key string, options *LockOptions) (Locker, error) {
consulOpts := &api.LockOptions{
Key: s.normalize(key),
}
if options != nil {
consulOpts.Value = options.Value
}
l, err := s.client.LockOpts(consulOpts)
if err != nil {
return nil, err
}
return &consulLock{lock: l}, nil
}
// Lock attempts to acquire the lock and blocks while doing so.
// Returns a channel that is closed if our lock is lost or an error.
func (l *consulLock) Lock() (<-chan struct{}, error) {
return l.lock.Lock(nil)
}
// Unlock released the lock. It is an error to call this
// if the lock is not currently held.
func (l *consulLock) Unlock() error {
return l.lock.Unlock()
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) {
if previous == nil {
return false, nil, ErrPreviousNotSpecified
}
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
return false, nil, err
} else if !work {
return false, nil, ErrKeyModified
}
pair, err := s.Get(key)
if err != nil {
return false, nil, err
}
return true, pair, nil
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicDelete(key string, previous *KVPair) (bool, error) {
if previous == nil {
return false, ErrPreviousNotSpecified
}
p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil {
return false, err
} else if !work {
return false, ErrKeyModified
}
return true, nil
}
// Close closes the client connection
func (s *Consul) Close() {
return
}

View File

@ -1,69 +0,0 @@
package store
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func makeConsulClient(t *testing.T) Store {
client := "localhost:8500"
kv, err := NewStore(
CONSUL,
[]string{client},
&Config{
ConnectionTimeout: 3 * time.Second,
EphemeralTTL: 2 * time.Second,
},
)
if err != nil {
t.Fatalf("cannot create store: %v", err)
}
return kv
}
func TestConsulStore(t *testing.T) {
kv := makeConsulClient(t)
testStore(t, kv)
}
func TestCreateEphemeralSession(t *testing.T) {
kv := makeConsulClient(t)
consul := kv.(*Consul)
err := consul.createEphemeralSession()
assert.NoError(t, err)
assert.NotEqual(t, consul.ephemeralSession, "")
}
func TestCheckActiveSession(t *testing.T) {
kv := makeConsulClient(t)
consul := kv.(*Consul)
key := "foo"
value := []byte("bar")
// Put the first key with the Ephemeral flag
err := kv.Put(key, value, &WriteOptions{Ephemeral: true})
assert.NoError(t, err)
// Session should not be empty
session, err := consul.checkActiveSession(key)
assert.NoError(t, err)
assert.NotEqual(t, session, "")
// Delete the key
err = kv.Delete(key)
assert.NoError(t, err)
// Check the session again, it should return nothing
session, err = consul.checkActiveSession(key)
assert.NoError(t, err)
assert.Equal(t, session, "")
}

View File

@ -1,444 +0,0 @@
package store
import (
"crypto/tls"
"net"
"net/http"
"strings"
"time"
etcd "github.com/coreos/go-etcd/etcd"
)
// Etcd embeds the client
type Etcd struct {
client *etcd.Client
ephemeralTTL time.Duration
}
type etcdLock struct {
client *etcd.Client
stopLock chan struct{}
key string
value string
last *etcd.Response
ttl uint64
}
const (
defaultLockTTL = 20 * time.Second
defaultUpdateTime = 5 * time.Second
// periodicSync is the time between each call to SyncCluster
periodicSync = 10 * time.Minute
)
// InitializeEtcd creates a new Etcd client given
// a list of endpoints and optional tls config
func InitializeEtcd(addrs []string, options *Config) (Store, error) {
s := &Etcd{}
entries := createEndpoints(addrs, "http")
s.client = etcd.NewClient(entries)
// Set options
if options != nil {
if options.TLS != nil {
s.setTLS(options.TLS)
}
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
if options.EphemeralTTL != 0 {
s.setEphemeralTTL(options.EphemeralTTL)
}
}
go func() {
for {
s.client.SyncCluster()
time.Sleep(periodicSync)
}
}()
return s, nil
}
// SetTLS sets the tls configuration given the path
// of certificate files
func (s *Etcd) setTLS(tls *tls.Config) {
// Change to https scheme
var addrs []string
entries := s.client.GetCluster()
for _, entry := range entries {
addrs = append(addrs, strings.Replace(entry, "http", "https", -1))
}
s.client.SetCluster(addrs)
// Set transport
t := http.Transport{
Dial: (&net.Dialer{
Timeout: 30 * time.Second, // default timeout
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: tls,
}
s.client.SetTransport(&t)
}
// SetTimeout sets the timeout used for connecting to the store
func (s *Etcd) setTimeout(time time.Duration) {
s.client.SetDialTimeout(time)
}
// SetHeartbeat sets the heartbeat value to notify we are alive
func (s *Etcd) setEphemeralTTL(time time.Duration) {
s.ephemeralTTL = time
}
// Create the entire path for a directory that does not exist
func (s *Etcd) createDirectory(path string) error {
if _, err := s.client.CreateDir(normalize(path), 10); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode != 105 { // Skip key already exists
return err
}
} else {
return err
}
}
return nil
}
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Etcd) Get(key string) (*KVPair, error) {
result, err := s.client.Get(normalize(key), false, false)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Not a Directory or Not a file
if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 {
return nil, ErrKeyNotFound
}
}
return nil, err
}
return &KVPair{key, []byte(result.Node.Value), result.Node.ModifiedIndex}, nil
}
// Put a value at "key"
func (s *Etcd) Put(key string, value []byte, opts *WriteOptions) error {
// Default TTL = 0 means no expiration
var ttl uint64
if opts != nil && opts.Ephemeral {
ttl = uint64(s.ephemeralTTL.Seconds())
}
if _, err := s.client.Set(key, string(value), ttl); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode == 104 { // Not a directory
// Remove the last element (the actual key) and set the prefix as a dir
err = s.createDirectory(getDirectory(key))
if _, err := s.client.Set(key, string(value), ttl); err != nil {
return err
}
}
}
return err
}
return nil
}
// Delete a value at "key"
func (s *Etcd) Delete(key string) error {
if _, err := s.client.Delete(normalize(key), false); err != nil {
return err
}
return nil
}
// Exists checks if the key exists inside the store
func (s *Etcd) Exists(key string) (bool, error) {
entry, err := s.Get(key)
if err != nil {
if err == ErrKeyNotFound || entry.Value == nil {
return false, nil
}
return false, err
}
return true, nil
}
// Watch changes on a key.
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
// Get the current value
current, err := s.Get(key)
if err != nil {
return nil, err
}
// Start an etcd watch.
// Note: etcd will send the current value through the channel.
etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool)
go s.client.Watch(normalize(key), 0, false, etcdWatchCh, etcdStopCh)
// Adapter goroutine: The goal here is to convert wathever format etcd is
// using into our interface.
watchCh := make(chan *KVPair)
go func() {
defer close(watchCh)
// Push the current value through the channel.
watchCh <- current
for {
select {
case result := <-etcdWatchCh:
watchCh <- &KVPair{
key,
[]byte(result.Node.Value),
result.Node.ModifiedIndex,
}
case <-stopCh:
etcdStopCh <- true
return
}
}
}()
return watchCh, nil
}
// WatchTree watches changes on a "directory"
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
// Get the current value
current, err := s.List(prefix)
if err != nil {
return nil, err
}
// Start an etcd watch.
etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool)
go s.client.Watch(normalize(prefix), 0, true, etcdWatchCh, etcdStopCh)
// Adapter goroutine: The goal here is to convert wathever format etcd is
// using into our interface.
watchCh := make(chan []*KVPair)
go func() {
defer close(watchCh)
// Push the current value through the channel.
watchCh <- current
for {
select {
case <-etcdWatchCh:
// FIXME: We should probably use the value pushed by the channel.
// However, .Node.Nodes seems to be empty.
if list, err := s.List(prefix); err == nil {
watchCh <- list
}
case <-stopCh:
etcdStopCh <- true
return
}
}
}()
return watchCh, nil
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) {
if previous == nil {
return false, nil, ErrPreviousNotSpecified
}
meta, err := s.client.CompareAndSwap(normalize(key), string(value), 0, "", previous.LastIndex)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode == 101 { // Compare failed
return false, nil, ErrKeyModified
}
}
return false, nil, err
}
return true, &KVPair{Key: key, Value: value, LastIndex: meta.Node.ModifiedIndex}, nil
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicDelete(key string, previous *KVPair) (bool, error) {
if previous == nil {
return false, ErrPreviousNotSpecified
}
_, err := s.client.CompareAndDelete(normalize(key), "", previous.LastIndex)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode == 101 { // Compare failed
return false, ErrKeyModified
}
}
return false, err
}
return true, nil
}
// List the content of a given prefix
func (s *Etcd) List(prefix string) ([]*KVPair, error) {
resp, err := s.client.Get(normalize(prefix), true, true)
if err != nil {
return nil, err
}
kv := []*KVPair{}
for _, n := range resp.Node.Nodes {
key := strings.TrimLeft(n.Key, "/")
kv = append(kv, &KVPair{key, []byte(n.Value), n.ModifiedIndex})
}
return kv, nil
}
// DeleteTree deletes a range of keys based on prefix
func (s *Etcd) DeleteTree(prefix string) error {
if _, err := s.client.Delete(normalize(prefix), true); err != nil {
return err
}
return nil
}
// NewLock returns a handle to a lock struct which can be used to acquire and
// release the mutex.
func (s *Etcd) NewLock(key string, options *LockOptions) (Locker, error) {
var value string
ttl := uint64(time.Duration(defaultLockTTL).Seconds())
// Apply options
if options != nil {
if options.Value != nil {
value = string(options.Value)
}
if options.TTL != 0 {
ttl = uint64(options.TTL.Seconds())
}
}
// Create lock object
lock := &etcdLock{
client: s.client,
key: key,
value: value,
ttl: ttl,
}
return lock, nil
}
// Lock attempts to acquire the lock and blocks while doing so.
// Returns a channel that is closed if our lock is lost or an error.
func (l *etcdLock) Lock() (<-chan struct{}, error) {
key := normalize(l.key)
// Lock holder channels
lockHeld := make(chan struct{})
stopLocking := make(chan struct{})
var lastIndex uint64
for {
resp, err := l.client.Create(key, l.value, l.ttl)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Key already exists
if etcdError.ErrorCode != 105 {
lastIndex = ^uint64(0)
}
}
} else {
lastIndex = resp.Node.ModifiedIndex
}
_, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex)
if err == nil {
// Leader section
l.stopLock = stopLocking
go l.holdLock(key, lockHeld, stopLocking)
break
} else {
// Seeker section
chW := make(chan *etcd.Response)
chWStop := make(chan bool)
l.waitLock(key, chW, chWStop)
// Delete or Expire event occured
// Retry
}
}
return lockHeld, nil
}
// Hold the lock as long as we can
// Updates the key ttl periodically until we receive
// an explicit stop signal from the Unlock method
func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan struct{}) {
defer close(lockHeld)
update := time.NewTicker(defaultUpdateTime)
defer update.Stop()
var err error
for {
select {
case <-update.C:
l.last, err = l.client.Update(key, l.value, l.ttl)
if err != nil {
return
}
case <-stopLocking:
return
}
}
}
// WaitLock simply waits for the key to be available for creation
func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool) {
go l.client.Watch(key, 0, false, eventCh, stopWatchCh)
for event := range eventCh {
if event.Action == "delete" || event.Action == "expire" {
return
}
}
}
// Unlock released the lock. It is an error to call this
// if the lock is not currently held.
func (l *etcdLock) Unlock() error {
if l.stopLock != nil {
l.stopLock <- struct{}{}
}
if l.last != nil {
_, err := l.client.CompareAndDelete(normalize(l.key), l.value, l.last.Node.ModifiedIndex)
if err != nil {
return err
}
}
return nil
}
// Close closes the client connection
func (s *Etcd) Close() {
return
}

View File

@ -1,30 +0,0 @@
package store
import (
"testing"
"time"
)
func makeEtcdClient(t *testing.T) Store {
client := "localhost:4001"
kv, err := NewStore(
ETCD,
[]string{client},
&Config{
ConnectionTimeout: 3 * time.Second,
EphemeralTTL: 2 * time.Second,
},
)
if err != nil {
t.Fatalf("cannot create store: %v", err)
}
return kv
}
func TestEtcdStore(t *testing.T) {
kv := makeEtcdClient(t)
testStore(t, kv)
}

View File

@ -1,46 +0,0 @@
package store
import (
"strings"
)
// Creates a list of endpoints given the right scheme
func createEndpoints(addrs []string, scheme string) (entries []string) {
for _, addr := range addrs {
entries = append(entries, scheme+"://"+addr)
}
return entries
}
// Normalize the key for each store to the form:
//
// /path/to/key
//
func normalize(key string) string {
return "/" + join(splitKey(key))
}
// Get the full directory part of the key to the form:
//
// /path/to/
//
func getDirectory(key string) string {
parts := splitKey(key)
parts = parts[:len(parts)-1]
return "/" + join(parts)
}
// SplitKey splits the key to extract path informations
func splitKey(key string) (path []string) {
if strings.Contains(key, "/") {
path = strings.Split(key, "/")
} else {
path = []string{key}
}
return path
}
// Join the path parts with '/'
func join(parts []string) string {
return strings.Join(parts, "/")
}

View File

@ -1,109 +0,0 @@
package store
import "github.com/stretchr/testify/mock"
// Mock store. Mocks all Store functions using testify.Mock.
type Mock struct {
mock.Mock
// Endpoints passed to InitializeMock
Endpoints []string
// Options passed to InitializeMock
Options *Config
}
// InitializeMock creates a Mock store.
func InitializeMock(endpoints []string, options *Config) (Store, error) {
s := &Mock{}
s.Endpoints = endpoints
s.Options = options
return s, nil
}
// Put mock
func (s *Mock) Put(key string, value []byte, opts *WriteOptions) error {
args := s.Mock.Called(key, value, opts)
return args.Error(0)
}
// Get mock
func (s *Mock) Get(key string) (*KVPair, error) {
args := s.Mock.Called(key)
return args.Get(0).(*KVPair), args.Error(1)
}
// Delete mock
func (s *Mock) Delete(key string) error {
args := s.Mock.Called(key)
return args.Error(0)
}
// Exists mock
func (s *Mock) Exists(key string) (bool, error) {
args := s.Mock.Called(key)
return args.Bool(0), args.Error(1)
}
// Watch mock
func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
args := s.Mock.Called(key, stopCh)
return args.Get(0).(<-chan *KVPair), args.Error(1)
}
// WatchTree mock
func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
args := s.Mock.Called(prefix, stopCh)
return args.Get(0).(chan []*KVPair), args.Error(1)
}
// NewLock mock
func (s *Mock) NewLock(key string, options *LockOptions) (Locker, error) {
args := s.Mock.Called(key, options)
return args.Get(0).(Locker), args.Error(1)
}
// List mock
func (s *Mock) List(prefix string) ([]*KVPair, error) {
args := s.Mock.Called(prefix)
return args.Get(0).([]*KVPair), args.Error(1)
}
// DeleteTree mock
func (s *Mock) DeleteTree(prefix string) error {
args := s.Mock.Called(prefix)
return args.Error(0)
}
// AtomicPut mock
func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair, opts *WriteOptions) (bool, *KVPair, error) {
args := s.Mock.Called(key, value, previous, opts)
return args.Bool(0), args.Get(1).(*KVPair), args.Error(2)
}
// AtomicDelete mock
func (s *Mock) AtomicDelete(key string, previous *KVPair) (bool, error) {
args := s.Mock.Called(key, previous)
return args.Bool(0), args.Error(1)
}
// MockLock mock implementation of Locker
type MockLock struct {
mock.Mock
}
// Lock mock
func (l *MockLock) Lock() (<-chan struct{}, error) {
args := l.Mock.Called()
return args.Get(0).(<-chan struct{}), args.Error(1)
}
// Unlock mock
func (l *MockLock) Unlock() error {
args := l.Mock.Called()
return args.Error(0)
}
// Close mock
func (s *Mock) Close() {
return
}

View File

@ -1,154 +0,0 @@
package store
import (
"crypto/tls"
"errors"
"time"
log "github.com/Sirupsen/logrus"
)
// Backend represents a KV Store Backend
type Backend string
const (
// MOCK backend
MOCK Backend = "mock"
// CONSUL backend
CONSUL = "consul"
// ETCD backend
ETCD = "etcd"
// ZK backend
ZK = "zk"
)
var (
// ErrInvalidTTL is a specific error to consul
ErrInvalidTTL = errors.New("Invalid TTL, please change the value to the miminum allowed ttl for the chosen store")
// ErrNotSupported is exported
ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")
// ErrNotImplemented is exported
ErrNotImplemented = errors.New("Call not implemented in current backend")
// ErrNotReachable is exported
ErrNotReachable = errors.New("Api not reachable")
// ErrCannotLock is exported
ErrCannotLock = errors.New("Error acquiring the lock")
// ErrWatchDoesNotExist is exported
ErrWatchDoesNotExist = errors.New("No watch found for specified key")
// ErrKeyModified is exported
ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
// ErrKeyNotFound is exported
ErrKeyNotFound = errors.New("Key not found in store")
// ErrPreviousNotSpecified is exported
ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation")
)
// Config contains the options for a storage client
type Config struct {
TLS *tls.Config
ConnectionTimeout time.Duration
EphemeralTTL time.Duration
}
// Store represents the backend K/V storage
// Each store should support every call listed
// here. Or it couldn't be implemented as a K/V
// backend for libkv
type Store interface {
// Put a value at the specified key
Put(key string, value []byte, options *WriteOptions) error
// Get a value given its key
Get(key string) (*KVPair, error)
// Delete the value at the specified key
Delete(key string) error
// Verify if a Key exists in the store
Exists(key string) (bool, error)
// Watch changes on a key.
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// WatchTree watches changes on a "directory"
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
// CreateLock for a given key.
// The returned Locker is not held and must be acquired with `.Lock`.
// value is optional.
NewLock(key string, options *LockOptions) (Locker, error)
// List the content of a given prefix
List(prefix string) ([]*KVPair, error)
// DeleteTree deletes a range of keys based on prefix
DeleteTree(prefix string) error
// Atomic operation on a single value
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
// Atomic delete of a single value
AtomicDelete(key string, previous *KVPair) (bool, error)
// Close the store connection
Close()
}
// KVPair represents {Key, Value, Lastindex} tuple
type KVPair struct {
Key string
Value []byte
LastIndex uint64
}
// WriteOptions contains optional request parameters
type WriteOptions struct {
Heartbeat time.Duration
Ephemeral bool
}
// LockOptions contains optional request parameters
type LockOptions struct {
Value []byte // Optional, value to associate with the lock
TTL time.Duration // Optional, expiration ttl associated with the lock
}
// WatchCallback is used for watch methods on keys
// and is triggered on key change
type WatchCallback func(entries ...*KVPair)
// Locker provides locking mechanism on top of the store.
// Similar to `sync.Lock` except it may return errors.
type Locker interface {
Lock() (<-chan struct{}, error)
Unlock() error
}
// Initialize creates a new Store object, initializing the client
type Initialize func(addrs []string, options *Config) (Store, error)
var (
// Backend initializers
initializers = map[Backend]Initialize{
MOCK: InitializeMock,
CONSUL: InitializeConsul,
ETCD: InitializeEtcd,
ZK: InitializeZookeeper,
}
)
// NewStore creates a an instance of store
func NewStore(backend Backend, addrs []string, options *Config) (Store, error) {
if init, exists := initializers[backend]; exists {
log.WithFields(log.Fields{"backend": backend}).Debug("Initializing store service")
return init(addrs, options)
}
return nil, ErrNotSupported
}

View File

@ -1,401 +0,0 @@
package store
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func testStore(t *testing.T, kv Store) {
testPutGetDelete(t, kv)
testWatch(t, kv)
testWatchTree(t, kv)
testAtomicPut(t, kv)
testAtomicDelete(t, kv)
testLockUnlock(t, kv)
testPutEphemeral(t, kv)
testList(t, kv)
testDeleteTree(t, kv)
}
func testPutGetDelete(t *testing.T, kv Store) {
key := "foo"
value := []byte("bar")
// Put the key
err := kv.Put(key, value, nil)
assert.NoError(t, err)
// Get should return the value and an incremented index
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
// Delete the key
err = kv.Delete(key)
assert.NoError(t, err)
// Get should fail
pair, err = kv.Get(key)
assert.Error(t, err)
assert.Nil(t, pair)
}
func testWatch(t *testing.T, kv Store) {
key := "hello"
value := []byte("world")
newValue := []byte("world!")
// Put the key
err := kv.Put(key, value, nil)
assert.NoError(t, err)
stopCh := make(<-chan struct{})
events, err := kv.Watch(key, stopCh)
assert.NoError(t, err)
assert.NotNil(t, events)
// Update loop
go func() {
timeout := time.After(1 * time.Second)
tick := time.Tick(250 * time.Millisecond)
for {
select {
case <-timeout:
return
case <-tick:
err := kv.Put(key, newValue, nil)
if assert.NoError(t, err) {
continue
}
return
}
}
}()
// Check for updates
timeout := time.After(2 * time.Second)
eventCount := 1
for {
select {
case event := <-events:
assert.NotNil(t, event)
if eventCount == 1 {
assert.Equal(t, event.Key, key)
assert.Equal(t, event.Value, value)
} else {
assert.Equal(t, event.Key, key)
assert.Equal(t, event.Value, newValue)
}
eventCount++
// We received all the events we wanted to check
if eventCount >= 4 {
return
}
case <-timeout:
t.Fatal("Timeout reached")
return
}
}
}
func testWatchTree(t *testing.T, kv Store) {
dir := "tree"
node1 := "tree/node1"
value1 := []byte("node1")
node2 := "tree/node2"
value2 := []byte("node2")
node3 := "tree/node3"
value3 := []byte("node3")
err := kv.Put(node1, value1, nil)
assert.NoError(t, err)
err = kv.Put(node2, value2, nil)
assert.NoError(t, err)
err = kv.Put(node3, value3, nil)
assert.NoError(t, err)
stopCh := make(<-chan struct{})
events, err := kv.WatchTree(dir, stopCh)
assert.NoError(t, err)
assert.NotNil(t, events)
// Update loop
go func() {
timeout := time.After(250 * time.Millisecond)
for {
select {
case <-timeout:
err := kv.Delete(node3)
assert.NoError(t, err)
return
}
}
}()
// Check for updates
timeout := time.After(4 * time.Second)
for {
select {
case event := <-events:
assert.NotNil(t, event)
// We received the Delete event on a child node
// Exit test successfully
if len(event) == 2 {
return
}
case <-timeout:
t.Fatal("Timeout reached")
return
}
}
}
func testAtomicPut(t *testing.T, kv Store) {
key := "hello"
value := []byte("world")
// Put the key
err := kv.Put(key, value, nil)
assert.NoError(t, err)
// Get should return the value and an incremented index
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
// This CAS should succeed
success, _, err := kv.AtomicPut("hello", []byte("WORLD"), pair, nil)
assert.NoError(t, err)
assert.True(t, success)
// This CAS should fail
pair.LastIndex = 0
success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil)
assert.Error(t, err)
assert.False(t, success)
}
func testAtomicDelete(t *testing.T, kv Store) {
key := "atomic"
value := []byte("world")
// Put the key
err := kv.Put(key, value, nil)
assert.NoError(t, err)
// Get should return the value and an incremented index
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
tempIndex := pair.LastIndex
// AtomicDelete should fail
pair.LastIndex = 0
success, err := kv.AtomicDelete(key, pair)
assert.Error(t, err)
assert.False(t, success)
// AtomicDelete should succeed
pair.LastIndex = tempIndex
success, err = kv.AtomicDelete(key, pair)
assert.NoError(t, err)
assert.True(t, success)
}
func testLockUnlock(t *testing.T, kv Store) {
t.Parallel()
key := "foo"
value := []byte("bar")
// We should be able to create a new lock on key
lock, err := kv.NewLock(key, &LockOptions{Value: value})
assert.NoError(t, err)
assert.NotNil(t, lock)
// Lock should successfully succeed or block
lockChan, err := lock.Lock()
assert.NoError(t, err)
assert.NotNil(t, lockChan)
// Get should work
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
// Unlock should succeed
err = lock.Unlock()
assert.NoError(t, err)
// Get should work
pair, err = kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
}
// FIXME Gracefully handle Zookeeper
func testPutEphemeral(t *testing.T, kv Store) {
// Zookeeper: initialize client here (Close() hangs otherwise)
zookeeper := false
if _, ok := kv.(*Zookeeper); ok {
zookeeper = true
kv = makeZkClient(t)
}
firstKey := "first"
firstValue := []byte("foo")
secondKey := "second"
secondValue := []byte("bar")
// Put the first key with the Ephemeral flag
err := kv.Put(firstKey, firstValue, &WriteOptions{Ephemeral: true})
assert.NoError(t, err)
// Put a second key with the Ephemeral flag
err = kv.Put(secondKey, secondValue, &WriteOptions{Ephemeral: true})
assert.NoError(t, err)
// Get on firstKey should work
pair, err := kv.Get(firstKey)
assert.NoError(t, err)
assert.NotNil(t, pair)
// Get on secondKey should work
pair, err = kv.Get(secondKey)
assert.NoError(t, err)
assert.NotNil(t, pair)
// Zookeeper: close client connection
if zookeeper {
kv.Close()
}
// Let the session expire
time.Sleep(5 * time.Second)
// Zookeeper: re-create the client
if zookeeper {
kv = makeZkClient(t)
}
// Get on firstKey shouldn't work
pair, err = kv.Get(firstKey)
assert.Error(t, err)
assert.Nil(t, pair)
// Get on secondKey shouldn't work
pair, err = kv.Get(secondKey)
assert.Error(t, err)
assert.Nil(t, pair)
}
func testList(t *testing.T, kv Store) {
prefix := "nodes"
firstKey := "nodes/first"
firstValue := []byte("first")
secondKey := "nodes/second"
secondValue := []byte("second")
// Put the first key
err := kv.Put(firstKey, firstValue, nil)
assert.NoError(t, err)
// Put the second key
err = kv.Put(secondKey, secondValue, nil)
assert.NoError(t, err)
// List should work and return the two correct values
pairs, err := kv.List(prefix)
assert.NoError(t, err)
if assert.NotNil(t, pairs) {
assert.Equal(t, len(pairs), 2)
}
// Check pairs, those are not necessarily in Put order
for _, pair := range pairs {
if pair.Key == firstKey {
assert.Equal(t, pair.Value, firstValue)
}
if pair.Key == secondKey {
assert.Equal(t, pair.Value, secondValue)
}
}
}
func testDeleteTree(t *testing.T, kv Store) {
prefix := "nodes"
firstKey := "nodes/first"
firstValue := []byte("first")
secondKey := "nodes/second"
secondValue := []byte("second")
// Put the first key
err := kv.Put(firstKey, firstValue, nil)
assert.NoError(t, err)
// Put the second key
err = kv.Put(secondKey, secondValue, nil)
assert.NoError(t, err)
// Get should work on the first Key
pair, err := kv.Get(firstKey)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, firstValue)
assert.NotEqual(t, pair.LastIndex, 0)
// Get should work on the second Key
pair, err = kv.Get(secondKey)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, secondValue)
assert.NotEqual(t, pair.LastIndex, 0)
// Delete Values under directory `nodes`
err = kv.DeleteTree(prefix)
assert.NoError(t, err)
// Get should fail on both keys
pair, err = kv.Get(firstKey)
assert.Error(t, err)
assert.Nil(t, pair)
pair, err = kv.Get(secondKey)
assert.Error(t, err)
assert.Nil(t, pair)
}

View File

@ -1,311 +0,0 @@
package store
import (
"strings"
"time"
log "github.com/Sirupsen/logrus"
zk "github.com/samuel/go-zookeeper/zk"
)
const defaultTimeout = 10 * time.Second
// Zookeeper embeds the zookeeper client
type Zookeeper struct {
timeout time.Duration
client *zk.Conn
}
type zookeeperLock struct {
client *zk.Conn
lock *zk.Lock
key string
value []byte
}
// InitializeZookeeper creates a new Zookeeper client
// given a list of endpoints and optional tls config
func InitializeZookeeper(endpoints []string, options *Config) (Store, error) {
s := &Zookeeper{}
s.timeout = defaultTimeout
// Set options
if options != nil {
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
}
conn, _, err := zk.Connect(endpoints, s.timeout)
if err != nil {
log.Error(err)
return nil, err
}
s.client = conn
return s, nil
}
// SetTimeout sets the timout for connecting to Zookeeper
func (s *Zookeeper) setTimeout(time time.Duration) {
s.timeout = time
}
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Zookeeper) Get(key string) (*KVPair, error) {
resp, meta, err := s.client.Get(normalize(key))
if err != nil {
return nil, err
}
if resp == nil {
return nil, ErrKeyNotFound
}
return &KVPair{key, resp, uint64(meta.Version)}, nil
}
// Create the entire path for a directory that does not exist
func (s *Zookeeper) createFullpath(path []string, ephemeral bool) error {
for i := 1; i <= len(path); i++ {
newpath := "/" + strings.Join(path[:i], "/")
if i == len(path) && ephemeral {
_, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
return err
}
_, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Skip if node already exists
if err != zk.ErrNodeExists {
return err
}
}
}
return nil
}
// Put a value at "key"
func (s *Zookeeper) Put(key string, value []byte, opts *WriteOptions) error {
fkey := normalize(key)
exists, err := s.Exists(key)
if err != nil {
return err
}
if !exists {
if opts != nil && opts.Ephemeral {
s.createFullpath(splitKey(key), opts.Ephemeral)
} else {
s.createFullpath(splitKey(key), false)
}
}
_, err = s.client.Set(fkey, value, -1)
return err
}
// Delete a value at "key"
func (s *Zookeeper) Delete(key string) error {
err := s.client.Delete(normalize(key), -1)
return err
}
// Exists checks if the key exists inside the store
func (s *Zookeeper) Exists(key string) (bool, error) {
exists, _, err := s.client.Exists(normalize(key))
if err != nil {
return false, err
}
return exists, nil
}
// Watch changes on a key.
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
fkey := normalize(key)
pair, err := s.Get(key)
if err != nil {
return nil, err
}
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan *KVPair)
go func() {
defer close(watchCh)
// Get returns the current value before setting the watch.
watchCh <- pair
for {
_, _, eventCh, err := s.client.GetW(fkey)
if err != nil {
return
}
select {
case e := <-eventCh:
if e.Type == zk.EventNodeDataChanged {
if entry, err := s.Get(key); err == nil {
watchCh <- entry
}
}
case <-stopCh:
// There is no way to stop GetW so just quit
return
}
}
}()
return watchCh, nil
}
// WatchTree watches changes on a "directory"
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
fprefix := normalize(prefix)
entries, err := s.List(prefix)
if err != nil {
return nil, err
}
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan []*KVPair)
go func() {
defer close(watchCh)
// List returns the current values before setting the watch.
watchCh <- entries
for {
_, _, eventCh, err := s.client.ChildrenW(fprefix)
if err != nil {
return
}
select {
case e := <-eventCh:
if e.Type == zk.EventNodeChildrenChanged {
if kv, err := s.List(prefix); err == nil {
watchCh <- kv
}
}
case <-stopCh:
// There is no way to stop GetW so just quit
return
}
}
}()
return watchCh, nil
}
// List the content of a given prefix
func (s *Zookeeper) List(prefix string) ([]*KVPair, error) {
keys, stat, err := s.client.Children(normalize(prefix))
if err != nil {
return nil, err
}
kv := []*KVPair{}
for _, key := range keys {
// FIXME Costly Get request for each child key..
pair, err := s.Get(prefix + normalize(key))
if err != nil {
return nil, err
}
kv = append(kv, &KVPair{key, []byte(pair.Value), uint64(stat.Version)})
}
return kv, nil
}
// DeleteTree deletes a range of keys based on prefix
func (s *Zookeeper) DeleteTree(prefix string) error {
pairs, err := s.List(prefix)
if err != nil {
return err
}
var reqs []interface{}
for _, pair := range pairs {
reqs = append(reqs, &zk.DeleteRequest{
Path: normalize(prefix + "/" + pair.Key),
Version: -1,
})
}
_, err = s.client.Multi(reqs...)
return err
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair, _ *WriteOptions) (bool, *KVPair, error) {
if previous == nil {
return false, nil, ErrPreviousNotSpecified
}
meta, err := s.client.Set(normalize(key), value, int32(previous.LastIndex))
if err != nil {
if err == zk.ErrBadVersion {
return false, nil, ErrKeyModified
}
return false, nil, err
}
return true, &KVPair{Key: key, Value: value, LastIndex: uint64(meta.Version)}, nil
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicDelete(key string, previous *KVPair) (bool, error) {
if previous == nil {
return false, ErrPreviousNotSpecified
}
err := s.client.Delete(normalize(key), int32(previous.LastIndex))
if err != nil {
if err == zk.ErrBadVersion {
return false, ErrKeyModified
}
return false, err
}
return true, nil
}
// NewLock returns a handle to a lock struct which can be used to acquire and
// release the mutex.
func (s *Zookeeper) NewLock(key string, options *LockOptions) (Locker, error) {
value := []byte("")
// Apply options
if options != nil {
if options.Value != nil {
value = options.Value
}
}
return &zookeeperLock{
client: s.client,
key: normalize(key),
value: value,
lock: zk.NewLock(s.client, normalize(key), zk.WorldACL(zk.PermAll)),
}, nil
}
// Lock attempts to acquire the lock and blocks while doing so.
// Returns a channel that is closed if our lock is lost or an error.
func (l *zookeeperLock) Lock() (<-chan struct{}, error) {
err := l.lock.Lock()
if err == nil {
// We hold the lock, we can set our value
// FIXME: When the last leader leaves the election, this value will be left behind
_, err = l.client.Set(l.key, l.value, -1)
}
return make(chan struct{}), err
}
// Unlock released the lock. It is an error to call this
// if the lock is not currently held.
func (l *zookeeperLock) Unlock() error {
return l.lock.Unlock()
}
// Close closes the client connection
func (s *Zookeeper) Close() {
s.client.Close()
}

View File

@ -1,30 +0,0 @@
package store
import (
"testing"
"time"
)
func makeZkClient(t *testing.T) Store {
client := "localhost:2181"
kv, err := NewStore(
ZK,
[]string{client},
&Config{
ConnectionTimeout: 3 * time.Second,
EphemeralTTL: 2 * time.Second,
},
)
if err != nil {
t.Fatalf("cannot create store: %v", err)
}
return kv
}
func TestZkStore(t *testing.T) {
kv := makeZkClient(t)
testStore(t, kv)
}

View File

@ -1,18 +0,0 @@
#!/bin/bash
if [ $# -gt 0 ] ; then
CONSUL_VERSION="$1"
else
CONSUL_VERSION="0.5.2"
fi
# install consul
wget "https://dl.bintray.com/mitchellh/consul/${CONSUL_VERSION}_linux_amd64.zip"
unzip "${CONSUL_VERSION}_linux_amd64.zip"
# make config for minimum ttl
touch config.json
echo "{\"session_ttl_min\": \"2s\"}" >> config.json
# check
./consul --version

View File

@ -1,11 +0,0 @@
#!/bin/bash
if [ $# -gt 0 ] ; then
ETCD_VERSION="$1"
else
ETCD_VERSION="2.0.11"
fi
curl -L https://github.com/coreos/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz -o etcd-v$ETCD_VERSION-linux-amd64.tar.gz
tar xzvf etcd-v$ETCD_VERSION-linux-amd64.tar.gz
mv etcd-v$ETCD_VERSION-linux-amd64 etcd

View File

@ -1,12 +0,0 @@
#!/bin/bash
if [ $# -gt 0 ] ; then
ZK_VERSION="$1"
else
ZK_VERSION="3.4.6"
fi
wget "http://mirrors.ukfast.co.uk/sites/ftp.apache.org/zookeeper/stable/zookeeper-${ZK_VERSION}.tar.gz"
tar -xvf "zookeeper-${ZK_VERSION}.tar.gz"
mv zookeeper-$ZK_VERSION zk
mv ./zk/conf/zoo_sample.cfg ./zk/conf/zoo.cfg