mirror of https://github.com/docker/docs.git
commit
a30c2ae680
|
@ -6,22 +6,23 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/docker/swarm/discovery"
|
||||
"github.com/docker/swarm/pkg/store"
|
||||
)
|
||||
|
||||
// Discovery is exported
|
||||
type Discovery struct {
|
||||
backend store.Backend
|
||||
store store.Store
|
||||
name string
|
||||
heartbeat time.Duration
|
||||
prefix string
|
||||
}
|
||||
|
||||
func init() {
|
||||
discovery.Register("zk", &Discovery{name: "zk"})
|
||||
discovery.Register("consul", &Discovery{name: "consul"})
|
||||
discovery.Register("etcd", &Discovery{name: "etcd"})
|
||||
discovery.Register("zk", &Discovery{backend: store.ZK})
|
||||
discovery.Register("consul", &Discovery{backend: store.CONSUL})
|
||||
discovery.Register("etcd", &Discovery{backend: store.ETCD})
|
||||
}
|
||||
|
||||
// Initialize is exported
|
||||
|
@ -47,7 +48,7 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64) error {
|
|||
// Creates a new store, will ignore options given
|
||||
// if not supported by the chosen store
|
||||
s.store, err = store.CreateStore(
|
||||
s.name, // name of the store
|
||||
s.backend,
|
||||
addrs,
|
||||
&store.Config{
|
||||
Timeout: s.heartbeat,
|
||||
|
@ -62,7 +63,7 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64) error {
|
|||
|
||||
// Fetch is exported
|
||||
func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
|
||||
addrs, err := s.store.GetRange(s.prefix)
|
||||
addrs, err := s.store.List(s.prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -71,9 +72,10 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
|
|||
|
||||
// Watch is exported
|
||||
func (s *Discovery) Watch(callback discovery.WatchCallback) {
|
||||
s.store.WatchRange(s.prefix, "", s.heartbeat, func(kvalues ...store.KVEntry) {
|
||||
s.store.WatchTree(s.prefix, func(kv ...*store.KVPair) {
|
||||
log.WithField("name", s.backend).Debug("Discovery watch triggered")
|
||||
// Traduce byte array entries to discovery.Entry
|
||||
entries, _ := discovery.CreateEntries(convertToStringArray(kvalues))
|
||||
entries, _ := discovery.CreateEntries(convertToStringArray(kv))
|
||||
callback(entries)
|
||||
})
|
||||
}
|
||||
|
@ -84,9 +86,9 @@ func (s *Discovery) Register(addr string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func convertToStringArray(entries []store.KVEntry) (addrs []string) {
|
||||
func convertToStringArray(entries []*store.KVPair) (addrs []string) {
|
||||
for _, entry := range entries {
|
||||
addrs = append(addrs, string(entry.Value()))
|
||||
addrs = append(addrs, string(entry.Value))
|
||||
}
|
||||
return addrs
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ type consulLock struct {
|
|||
// refresh interval
|
||||
type Watch struct {
|
||||
LastIndex uint64
|
||||
Interval time.Duration
|
||||
}
|
||||
|
||||
// InitializeConsul creates a new Consul client given
|
||||
|
@ -81,15 +80,15 @@ func (s *Consul) normalize(key string) string {
|
|||
|
||||
// Get the value at "key", returns the last modified index
|
||||
// to use in conjunction to CAS calls
|
||||
func (s *Consul) Get(key string) (value []byte, lastIndex uint64, err error) {
|
||||
func (s *Consul) Get(key string) (*KVPair, error) {
|
||||
pair, meta, err := s.client.KV().Get(s.normalize(key), nil)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
return nil, err
|
||||
}
|
||||
if pair == nil {
|
||||
return nil, 0, ErrKeyNotFound
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
return pair.Value, meta.LastIndex, nil
|
||||
return &KVPair{pair.Key, pair.Value, meta.LastIndex}, nil
|
||||
}
|
||||
|
||||
// Put a value at "key"
|
||||
|
@ -110,15 +109,15 @@ func (s *Consul) Delete(key string) error {
|
|||
|
||||
// Exists checks that the key exists inside the store
|
||||
func (s *Consul) Exists(key string) (bool, error) {
|
||||
_, _, err := s.Get(key)
|
||||
_, err := s.Get(key)
|
||||
if err != nil && err == ErrKeyNotFound {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// GetRange gets a range of values at "directory"
|
||||
func (s *Consul) GetRange(prefix string) (kvi []KVEntry, err error) {
|
||||
// List values at "directory"
|
||||
func (s *Consul) List(prefix string) ([]*KVPair, error) {
|
||||
pairs, _, err := s.client.KV().List(s.normalize(prefix), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -126,23 +125,24 @@ func (s *Consul) GetRange(prefix string) (kvi []KVEntry, err error) {
|
|||
if len(pairs) == 0 {
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
kv := []*KVPair{}
|
||||
for _, pair := range pairs {
|
||||
if pair.Key == prefix {
|
||||
continue
|
||||
}
|
||||
kvi = append(kvi, &kviTuple{pair.Key, pair.Value, pair.ModifyIndex})
|
||||
kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex})
|
||||
}
|
||||
return kvi, nil
|
||||
return kv, nil
|
||||
}
|
||||
|
||||
// DeleteRange deletes a range of values at "directory"
|
||||
func (s *Consul) DeleteRange(prefix string) error {
|
||||
// DeleteTree deletes a range of values at "directory"
|
||||
func (s *Consul) DeleteTree(prefix string) error {
|
||||
_, err := s.client.KV().DeleteTree(s.normalize(prefix), nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// Watch a single key for modifications
|
||||
func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallback) error {
|
||||
func (s *Consul) Watch(key string, callback WatchCallback) error {
|
||||
fkey := s.normalize(key)
|
||||
|
||||
// We get the last index first
|
||||
|
@ -152,18 +152,17 @@ func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallba
|
|||
}
|
||||
|
||||
// Add watch to map
|
||||
s.watches[fkey] = &Watch{LastIndex: meta.LastIndex, Interval: heartbeat}
|
||||
s.watches[fkey] = &Watch{LastIndex: meta.LastIndex}
|
||||
eventChan := s.waitForChange(fkey)
|
||||
|
||||
for _ = range eventChan {
|
||||
log.WithField("name", "consul").Debug("Key watch triggered")
|
||||
entry, index, err := s.Get(key)
|
||||
entry, err := s.Get(key)
|
||||
if err != nil {
|
||||
log.Error("Cannot refresh the key: ", fkey, ", cancelling watch")
|
||||
s.watches[fkey] = nil
|
||||
return err
|
||||
}
|
||||
callback(&kviTuple{key, entry, index})
|
||||
callback(entry)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -194,11 +193,10 @@ func (s *Consul) waitForChange(key string) <-chan uint64 {
|
|||
}
|
||||
option := &api.QueryOptions{
|
||||
WaitIndex: watch.LastIndex,
|
||||
WaitTime: watch.Interval,
|
||||
}
|
||||
_, meta, err := kv.List(key, option)
|
||||
if err != nil {
|
||||
log.WithField("name", "consul").Errorf("Discovery error: %v", err)
|
||||
log.WithField("name", "consul").Error(err)
|
||||
break
|
||||
}
|
||||
watch.LastIndex = meta.LastIndex
|
||||
|
@ -209,8 +207,8 @@ func (s *Consul) waitForChange(key string) <-chan uint64 {
|
|||
return ch
|
||||
}
|
||||
|
||||
// WatchRange triggers a watch on a range of values at "directory"
|
||||
func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error {
|
||||
// WatchTree triggers a watch on a range of values at "directory"
|
||||
func (s *Consul) WatchTree(prefix string, callback WatchCallback) error {
|
||||
fprefix := s.normalize(prefix)
|
||||
|
||||
// We get the last index first
|
||||
|
@ -220,12 +218,11 @@ func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duratio
|
|||
}
|
||||
|
||||
// Add watch to map
|
||||
s.watches[fprefix] = &Watch{LastIndex: meta.LastIndex, Interval: heartbeat}
|
||||
s.watches[fprefix] = &Watch{LastIndex: meta.LastIndex}
|
||||
eventChan := s.waitForChange(fprefix)
|
||||
|
||||
for _ = range eventChan {
|
||||
log.WithField("name", "consul").Debug("Key watch triggered")
|
||||
kvi, err := s.GetRange(prefix)
|
||||
kvi, err := s.List(prefix)
|
||||
if err != nil {
|
||||
log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch")
|
||||
s.watches[fprefix] = nil
|
||||
|
@ -270,8 +267,8 @@ func (l *consulLock) Unlock() error {
|
|||
|
||||
// AtomicPut put a value at "key" if the key has not been
|
||||
// modified in the meantime, throws an error if this is the case
|
||||
func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) {
|
||||
p := &api.KVPair{Key: s.normalize(key), Value: newValue, ModifyIndex: index}
|
||||
func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
||||
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
|
||||
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
|
||||
return false, err
|
||||
} else if !work {
|
||||
|
@ -282,8 +279,8 @@ func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64)
|
|||
|
||||
// AtomicDelete deletes a value at "key" if the key has not
|
||||
// been modified in the meantime, throws an error if this is the case
|
||||
func (s *Consul) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
|
||||
p := &api.KVPair{Key: s.normalize(key), ModifyIndex: index}
|
||||
func (s *Consul) AtomicDelete(key string, previous *KVPair) (bool, error) {
|
||||
p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex}
|
||||
if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil {
|
||||
return false, err
|
||||
} else if !work {
|
||||
|
|
|
@ -84,18 +84,18 @@ func (s *Etcd) createDirectory(path string) error {
|
|||
|
||||
// Get the value at "key", returns the last modified index
|
||||
// to use in conjunction to CAS calls
|
||||
func (s *Etcd) Get(key string) (value []byte, lastIndex uint64, err error) {
|
||||
func (s *Etcd) Get(key string) (*KVPair, error) {
|
||||
result, err := s.client.Get(normalize(key), false, false)
|
||||
if err != nil {
|
||||
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
||||
// Not a Directory or Not a file
|
||||
if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 {
|
||||
return nil, 0, ErrKeyNotFound
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
}
|
||||
return nil, 0, err
|
||||
return nil, err
|
||||
}
|
||||
return []byte(result.Node.Value), result.Node.ModifiedIndex, nil
|
||||
return &KVPair{result.Node.Key, []byte(result.Node.Value), result.Node.ModifiedIndex}, nil
|
||||
}
|
||||
|
||||
// Put a value at "key"
|
||||
|
@ -125,9 +125,9 @@ func (s *Etcd) Delete(key string) error {
|
|||
|
||||
// Exists checks if the key exists inside the store
|
||||
func (s *Etcd) Exists(key string) (bool, error) {
|
||||
value, _, err := s.Get(key)
|
||||
entry, err := s.Get(key)
|
||||
if err != nil {
|
||||
if err == ErrKeyNotFound || value == nil {
|
||||
if err == ErrKeyNotFound || entry.Value == nil {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
|
@ -136,7 +136,7 @@ func (s *Etcd) Exists(key string) (bool, error) {
|
|||
}
|
||||
|
||||
// Watch a single key for modifications
|
||||
func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error {
|
||||
func (s *Etcd) Watch(key string, callback WatchCallback) error {
|
||||
key = normalize(key)
|
||||
watchChan := make(chan *etcd.Response)
|
||||
stopChan := make(chan bool)
|
||||
|
@ -148,14 +148,13 @@ func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error
|
|||
go s.client.Watch(key, 0, false, watchChan, stopChan)
|
||||
|
||||
for _ = range watchChan {
|
||||
log.WithField("name", "etcd").Debug("Discovery watch triggered")
|
||||
entry, index, err := s.Get(key)
|
||||
entry, err := s.Get(key)
|
||||
if err != nil {
|
||||
log.Error("Cannot refresh the key: ", key, ", cancelling watch")
|
||||
s.watches[key] = nil
|
||||
return err
|
||||
}
|
||||
callback(&kviTuple{key, entry, index})
|
||||
callback(entry)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -176,56 +175,47 @@ func (s *Etcd) CancelWatch(key string) error {
|
|||
|
||||
// AtomicPut put a value at "key" if the key has not been
|
||||
// modified in the meantime, throws an error if this is the case
|
||||
func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) {
|
||||
resp, err := s.client.CompareAndSwap(normalize(key), string(newValue), 5, string(oldValue), 0)
|
||||
func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
||||
_, err := s.client.CompareAndSwap(normalize(key), string(value), 0, "", previous.LastIndex)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !(resp.Node.Value == string(newValue) && resp.Node.Key == key && resp.Node.TTL == 5) {
|
||||
return false, ErrKeyModified
|
||||
}
|
||||
if !(resp.PrevNode.Value == string(newValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) {
|
||||
return false, ErrKeyModified
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// AtomicDelete deletes a value at "key" if the key has not
|
||||
// been modified in the meantime, throws an error if this is the case
|
||||
func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
|
||||
resp, err := s.client.CompareAndDelete(normalize(key), string(oldValue), 0)
|
||||
func (s *Etcd) AtomicDelete(key string, previous *KVPair) (bool, error) {
|
||||
_, err := s.client.CompareAndDelete(normalize(key), "", previous.LastIndex)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !(resp.PrevNode.Value == string(oldValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) {
|
||||
return false, ErrKeyModified
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// GetRange gets a range of values at "directory"
|
||||
func (s *Etcd) GetRange(prefix string) ([]KVEntry, error) {
|
||||
// List a range of values at "directory"
|
||||
func (s *Etcd) List(prefix string) ([]*KVPair, error) {
|
||||
resp, err := s.client.Get(normalize(prefix), true, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
kvi := make([]KVEntry, len(resp.Node.Nodes))
|
||||
for i, n := range resp.Node.Nodes {
|
||||
kvi[i] = &kviTuple{n.Key, []byte(n.Value), n.ModifiedIndex}
|
||||
kv := []*KVPair{}
|
||||
for _, n := range resp.Node.Nodes {
|
||||
kv = append(kv, &KVPair{n.Key, []byte(n.Value), n.ModifiedIndex})
|
||||
}
|
||||
return kvi, nil
|
||||
return kv, nil
|
||||
}
|
||||
|
||||
// DeleteRange deletes a range of values at "directory"
|
||||
func (s *Etcd) DeleteRange(prefix string) error {
|
||||
// DeleteTree deletes a range of values at "directory"
|
||||
func (s *Etcd) DeleteTree(prefix string) error {
|
||||
if _, err := s.client.Delete(normalize(prefix), true); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WatchRange triggers a watch on a range of values at "directory"
|
||||
func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error {
|
||||
// WatchTree triggers a watch on a range of values at "directory"
|
||||
func (s *Etcd) WatchTree(prefix string, callback WatchCallback) error {
|
||||
prefix = normalize(prefix)
|
||||
watchChan := make(chan *etcd.Response)
|
||||
stopChan := make(chan bool)
|
||||
|
@ -236,8 +226,7 @@ func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callbac
|
|||
// Start watch
|
||||
go s.client.Watch(prefix, 0, true, watchChan, stopChan)
|
||||
for _ = range watchChan {
|
||||
log.WithField("name", "etcd").Debug("Discovery watch triggered")
|
||||
kvi, err := s.GetRange(prefix)
|
||||
kvi, err := s.List(prefix)
|
||||
if err != nil {
|
||||
log.Error("Cannot refresh the key: ", prefix, ", cancelling watch")
|
||||
s.watches[prefix] = nil
|
||||
|
|
|
@ -1,17 +1,47 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// WatchCallback is used for watch methods on keys
|
||||
// and is triggered on key change
|
||||
type WatchCallback func(kviTuple ...KVEntry)
|
||||
// Backend represents a KV Store Backend
|
||||
type Backend string
|
||||
|
||||
// Initialize creates a new Store object, initializing the client
|
||||
type Initialize func(addrs []string, options *Config) (Store, error)
|
||||
const (
|
||||
// CONSUL backend
|
||||
CONSUL Backend = "consul"
|
||||
// ETCD backend
|
||||
ETCD = "etcd"
|
||||
// ZK backend
|
||||
ZK = "zk"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrNotSupported is exported
|
||||
ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")
|
||||
// ErrNotImplemented is exported
|
||||
ErrNotImplemented = errors.New("Call not implemented in current backend")
|
||||
// ErrNotReachable is exported
|
||||
ErrNotReachable = errors.New("Api not reachable")
|
||||
// ErrCannotLock is exported
|
||||
ErrCannotLock = errors.New("Error acquiring the lock")
|
||||
// ErrWatchDoesNotExist is exported
|
||||
ErrWatchDoesNotExist = errors.New("No watch found for specified key")
|
||||
// ErrKeyModified is exported
|
||||
ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
|
||||
// ErrKeyNotFound is exported
|
||||
ErrKeyNotFound = errors.New("Key not found in store")
|
||||
)
|
||||
|
||||
// Config contains the options for a storage client
|
||||
type Config struct {
|
||||
TLS *tls.Config
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
// Store represents the backend K/V storage
|
||||
// Each store should support every call listed
|
||||
|
@ -22,7 +52,7 @@ type Store interface {
|
|||
Put(key string, value []byte) error
|
||||
|
||||
// Get a value given its key
|
||||
Get(key string) (value []byte, lastIndex uint64, err error)
|
||||
Get(key string) (*KVPair, error)
|
||||
|
||||
// Delete the value at the specified key
|
||||
Delete(key string) error
|
||||
|
@ -31,7 +61,7 @@ type Store interface {
|
|||
Exists(key string) (bool, error)
|
||||
|
||||
// Watch changes on a key
|
||||
Watch(key string, heartbeat time.Duration, callback WatchCallback) error
|
||||
Watch(key string, callback WatchCallback) error
|
||||
|
||||
// Cancel watch key
|
||||
CancelWatch(key string) error
|
||||
|
@ -42,31 +72,35 @@ type Store interface {
|
|||
CreateLock(key string, value []byte) (Locker, error)
|
||||
|
||||
// Get range of keys based on prefix
|
||||
GetRange(prefix string) ([]KVEntry, error)
|
||||
List(prefix string) ([]*KVPair, error)
|
||||
|
||||
// Delete range of keys based on prefix
|
||||
DeleteRange(prefix string) error
|
||||
DeleteTree(prefix string) error
|
||||
|
||||
// Watch key namespaces
|
||||
WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error
|
||||
WatchTree(prefix string, callback WatchCallback) error
|
||||
|
||||
// Cancel watch key range
|
||||
CancelWatchRange(prefix string) error
|
||||
|
||||
// Atomic operation on a single value
|
||||
AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error)
|
||||
AtomicPut(key string, value []byte, previous *KVPair) (bool, error)
|
||||
|
||||
// Atomic delete of a single value
|
||||
AtomicDelete(key string, oldValue []byte, index uint64) (bool, error)
|
||||
AtomicDelete(key string, previous *KVPair) (bool, error)
|
||||
}
|
||||
|
||||
// KVEntry represents {Key, Value, Lastindex} tuple
|
||||
type KVEntry interface {
|
||||
Key() string
|
||||
Value() []byte
|
||||
LastIndex() uint64
|
||||
// KVPair represents {Key, Value, Lastindex} tuple
|
||||
type KVPair struct {
|
||||
Key string
|
||||
Value []byte
|
||||
LastIndex uint64
|
||||
}
|
||||
|
||||
// WatchCallback is used for watch methods on keys
|
||||
// and is triggered on key change
|
||||
type WatchCallback func(entries ...*KVPair)
|
||||
|
||||
// Locker provides locking mechanism on top of the store.
|
||||
// Similar to `sync.Lock` except it may return errors.
|
||||
type Locker interface {
|
||||
|
@ -74,23 +108,22 @@ type Locker interface {
|
|||
Unlock() error
|
||||
}
|
||||
|
||||
// Initialize creates a new Store object, initializing the client
|
||||
type Initialize func(addrs []string, options *Config) (Store, error)
|
||||
|
||||
var (
|
||||
// List of Store services
|
||||
stores map[string]Initialize
|
||||
// Backend initializers
|
||||
initializers = map[Backend]Initialize{
|
||||
CONSUL: InitializeConsul,
|
||||
ETCD: InitializeEtcd,
|
||||
ZK: InitializeZookeeper,
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
stores = make(map[string]Initialize)
|
||||
stores["consul"] = InitializeConsul
|
||||
stores["etcd"] = InitializeEtcd
|
||||
stores["zk"] = InitializeZookeeper
|
||||
}
|
||||
|
||||
// CreateStore creates a an instance of store
|
||||
func CreateStore(store string, addrs []string, options *Config) (Store, error) {
|
||||
|
||||
if init, exists := stores[store]; exists {
|
||||
log.WithFields(log.Fields{"store": store}).Debug("Initializing store service")
|
||||
func CreateStore(backend Backend, addrs []string, options *Config) (Store, error) {
|
||||
if init, exists := initializers[backend]; exists {
|
||||
log.WithFields(log.Fields{"backend": backend}).Debug("Initializing store service")
|
||||
return init(addrs, options)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,60 +0,0 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrNotSupported is exported
|
||||
ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")
|
||||
// ErrNotImplemented is exported
|
||||
ErrNotImplemented = errors.New("Call not implemented in current backend")
|
||||
// ErrNotReachable is exported
|
||||
ErrNotReachable = errors.New("Api not reachable")
|
||||
// ErrCannotLock is exported
|
||||
ErrCannotLock = errors.New("Error acquiring the lock")
|
||||
// ErrWatchDoesNotExist is exported
|
||||
ErrWatchDoesNotExist = errors.New("No watch found for specified key")
|
||||
// ErrKeyModified is exported
|
||||
ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
|
||||
// ErrKeyNotFound is exported
|
||||
ErrKeyNotFound = errors.New("Key not found in store")
|
||||
)
|
||||
|
||||
// KV represents the different supported K/V
|
||||
type KV string
|
||||
|
||||
const (
|
||||
// CONSUL is exported
|
||||
CONSUL KV = "consul"
|
||||
// ETCD is exported
|
||||
ETCD = "etcd"
|
||||
// ZOOKEEPER is exported
|
||||
ZOOKEEPER = "zookeeper"
|
||||
)
|
||||
|
||||
// Config contains the options for a storage client
|
||||
type Config struct {
|
||||
TLS *tls.Config
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
type kviTuple struct {
|
||||
key string
|
||||
value []byte
|
||||
lastIndex uint64
|
||||
}
|
||||
|
||||
func (kvi *kviTuple) Key() string {
|
||||
return kvi.key
|
||||
}
|
||||
|
||||
func (kvi *kviTuple) Value() []byte {
|
||||
return kvi.value
|
||||
}
|
||||
|
||||
func (kvi *kviTuple) LastIndex() uint64 {
|
||||
return kvi.lastIndex
|
||||
}
|
|
@ -47,15 +47,15 @@ func (s *Zookeeper) setTimeout(time time.Duration) {
|
|||
|
||||
// Get the value at "key", returns the last modified index
|
||||
// to use in conjunction to CAS calls
|
||||
func (s *Zookeeper) Get(key string) (value []byte, lastIndex uint64, err error) {
|
||||
func (s *Zookeeper) Get(key string) (*KVPair, error) {
|
||||
resp, meta, err := s.client.Get(normalize(key))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
return nil, err
|
||||
}
|
||||
if resp == nil {
|
||||
return nil, 0, ErrKeyNotFound
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
return resp, uint64(meta.Mzxid), nil
|
||||
return &KVPair{key, resp, uint64(meta.Mzxid)}, nil
|
||||
}
|
||||
|
||||
// Create the entire path for a directory that does not exist
|
||||
|
@ -103,7 +103,7 @@ func (s *Zookeeper) Exists(key string) (bool, error) {
|
|||
}
|
||||
|
||||
// Watch a single key for modifications
|
||||
func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) error {
|
||||
func (s *Zookeeper) Watch(key string, callback WatchCallback) error {
|
||||
fkey := normalize(key)
|
||||
_, _, eventChan, err := s.client.GetW(fkey)
|
||||
if err != nil {
|
||||
|
@ -115,10 +115,9 @@ func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) e
|
|||
|
||||
for e := range eventChan {
|
||||
if e.Type == zk.EventNodeChildrenChanged {
|
||||
log.WithField("name", "zk").Debug("Discovery watch triggered")
|
||||
entry, index, err := s.Get(key)
|
||||
entry, err := s.Get(key)
|
||||
if err == nil {
|
||||
callback(&kviTuple{key, []byte(entry), index})
|
||||
callback(entry)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -139,28 +138,29 @@ func (s *Zookeeper) CancelWatch(key string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetRange gets a range of values at "directory"
|
||||
func (s *Zookeeper) GetRange(prefix string) (kvi []KVEntry, err error) {
|
||||
// List a range of values at "directory"
|
||||
func (s *Zookeeper) List(prefix string) ([]*KVPair, error) {
|
||||
prefix = normalize(prefix)
|
||||
entries, stat, err := s.client.Children(prefix)
|
||||
if err != nil {
|
||||
log.Error("Cannot fetch range of keys beginning with prefix: ", prefix)
|
||||
return nil, err
|
||||
}
|
||||
kv := []*KVPair{}
|
||||
for _, item := range entries {
|
||||
kvi = append(kvi, &kviTuple{prefix, []byte(item), uint64(stat.Mzxid)})
|
||||
kv = append(kv, &KVPair{prefix, []byte(item), uint64(stat.Mzxid)})
|
||||
}
|
||||
return kvi, err
|
||||
return kv, err
|
||||
}
|
||||
|
||||
// DeleteRange deletes a range of values at "directory"
|
||||
func (s *Zookeeper) DeleteRange(prefix string) error {
|
||||
// DeleteTree deletes a range of values at "directory"
|
||||
func (s *Zookeeper) DeleteTree(prefix string) error {
|
||||
err := s.client.Delete(normalize(prefix), -1)
|
||||
return err
|
||||
}
|
||||
|
||||
// WatchRange triggers a watch on a range of values at "directory"
|
||||
func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error {
|
||||
// WatchTree triggers a watch on a range of values at "directory"
|
||||
func (s *Zookeeper) WatchTree(prefix string, callback WatchCallback) error {
|
||||
fprefix := normalize(prefix)
|
||||
_, _, eventChan, err := s.client.ChildrenW(fprefix)
|
||||
if err != nil {
|
||||
|
@ -172,8 +172,7 @@ func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, ca
|
|||
|
||||
for e := range eventChan {
|
||||
if e.Type == zk.EventNodeChildrenChanged {
|
||||
log.WithField("name", "zk").Debug("Discovery watch triggered")
|
||||
kvi, err := s.GetRange(prefix)
|
||||
kvi, err := s.List(prefix)
|
||||
if err == nil {
|
||||
callback(kvi...)
|
||||
}
|
||||
|
@ -191,14 +190,14 @@ func (s *Zookeeper) CancelWatchRange(prefix string) error {
|
|||
|
||||
// AtomicPut put a value at "key" if the key has not been
|
||||
// modified in the meantime, throws an error if this is the case
|
||||
func (s *Zookeeper) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) {
|
||||
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
||||
// Use index of Set method to implement CAS
|
||||
return false, ErrNotImplemented
|
||||
}
|
||||
|
||||
// AtomicDelete deletes a value at "key" if the key has not
|
||||
// been modified in the meantime, throws an error if this is the case
|
||||
func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
|
||||
func (s *Zookeeper) AtomicDelete(key string, previous *KVPair) (bool, error) {
|
||||
return false, ErrNotImplemented
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue