Merge pull request #756 from abronan/store_integration

Change existing discovery backends to 'kv' using metatada storage backends in a new 'store' package
This commit is contained in:
Victor Vieux 2015-05-12 15:15:38 -07:00
commit 4bc58b1150
17 changed files with 1150 additions and 418 deletions

View File

@ -82,7 +82,8 @@ func Run() {
log.Fatalf("discovery required to list a cluster. See '%s list --help'.", c.App.Name) log.Fatalf("discovery required to list a cluster. See '%s list --help'.", c.App.Name)
} }
d, err := discovery.New(dflag, 0) // FIXME Add and use separate timeout flag instead of forcing it
d, err := discovery.New(dflag, 10)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -1,115 +0,0 @@
package consul
import (
"fmt"
"path"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/discovery"
consul "github.com/hashicorp/consul/api"
)
// Discovery is exported
type Discovery struct {
heartbeat time.Duration
client *consul.Client
prefix string
lastIndex uint64
}
func init() {
discovery.Register("consul", &Discovery{})
}
// Initialize is exported
func (s *Discovery) Initialize(uris string, heartbeat uint64) error {
parts := strings.SplitN(uris, "/", 2)
if len(parts) < 2 {
return fmt.Errorf("invalid format %q, missing <path>", uris)
}
addr := parts[0]
path := parts[1]
config := consul.DefaultConfig()
config.Address = addr
client, err := consul.NewClient(config)
if err != nil {
return err
}
s.client = client
s.heartbeat = time.Duration(heartbeat) * time.Second
s.prefix = path + "/"
kv := s.client.KV()
p := &consul.KVPair{Key: s.prefix, Value: nil}
if _, err = kv.Put(p, nil); err != nil {
return err
}
_, meta, err := kv.Get(s.prefix, nil)
if err != nil {
return err
}
s.lastIndex = meta.LastIndex
return nil
}
// Fetch is exported
func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
kv := s.client.KV()
pairs, _, err := kv.List(s.prefix, nil)
if err != nil {
return nil, err
}
addrs := []string{}
for _, pair := range pairs {
if pair.Key == s.prefix {
continue
}
addrs = append(addrs, string(pair.Value))
}
return discovery.CreateEntries(addrs)
}
// Watch is exported
func (s *Discovery) Watch(callback discovery.WatchCallback) {
for _ = range s.waitForChange() {
log.WithField("name", "consul").Debug("Discovery watch triggered")
entries, err := s.Fetch()
if err == nil {
callback(entries)
}
}
}
// Register is exported
func (s *Discovery) Register(addr string) error {
kv := s.client.KV()
p := &consul.KVPair{Key: path.Join(s.prefix, addr), Value: []byte(addr)}
_, err := kv.Put(p, nil)
return err
}
func (s *Discovery) waitForChange() <-chan uint64 {
c := make(chan uint64)
go func() {
for {
kv := s.client.KV()
option := &consul.QueryOptions{
WaitIndex: s.lastIndex,
WaitTime: s.heartbeat}
_, meta, err := kv.List(s.prefix, option)
if err != nil {
log.WithField("name", "consul").Errorf("Discovery error: %v", err)
break
}
s.lastIndex = meta.LastIndex
c <- s.lastIndex
}
close(c)
}()
return c
}

View File

@ -1,20 +0,0 @@
package consul
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
discovery := &Discovery{}
assert.Equal(t, discovery.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing <path>")
assert.Error(t, discovery.Initialize("127.0.0.1/path", 0))
assert.Equal(t, discovery.prefix, "path/")
assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0))
assert.Equal(t, discovery.prefix, "path/")
}

View File

@ -1,89 +0,0 @@
package etcd
import (
"fmt"
"path"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/coreos/go-etcd/etcd"
"github.com/docker/swarm/discovery"
)
// Discovery is exported
type Discovery struct {
ttl uint64
client *etcd.Client
path string
}
func init() {
discovery.Register("etcd", &Discovery{})
}
// Initialize is exported
func (s *Discovery) Initialize(uris string, heartbeat uint64) error {
var (
// split here because uris can contain multiples ips
// like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path`
parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",")
entries []string
)
if len(parts) != 2 {
return fmt.Errorf("invalid format %q, missing <path>", uris)
}
for _, ip := range ips {
entries = append(entries, "http://"+ip)
}
s.client = etcd.NewClient(entries)
// ttl should always be > heartbeat, even if heartbeat = 1 or 0
s.ttl = uint64(heartbeat*3/2) + 1
s.path = "/" + parts[1] + "/"
if _, err := s.client.CreateDir(s.path, s.ttl); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode != 105 { // skip key already exists
return err
}
} else {
return err
}
}
return nil
}
// Fetch is exported
func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
resp, err := s.client.Get(s.path, true, true)
if err != nil {
return nil, err
}
addrs := []string{}
for _, n := range resp.Node.Nodes {
addrs = append(addrs, n.Value)
}
return discovery.CreateEntries(addrs)
}
// Watch is exported
func (s *Discovery) Watch(callback discovery.WatchCallback) {
watchChan := make(chan *etcd.Response)
go s.client.Watch(s.path, 0, true, watchChan, nil)
for _ = range watchChan {
log.WithField("name", "etcd").Debug("Discovery watch triggered")
entries, err := s.Fetch()
if err == nil {
callback(entries)
}
}
}
// Register is exported
func (s *Discovery) Register(addr string) error {
_, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl)
return err
}

View File

@ -1,19 +0,0 @@
package etcd
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
discovery := &Discovery{}
assert.Equal(t, discovery.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing <path>")
assert.Error(t, discovery.Initialize("127.0.0.1/path", 0))
assert.Equal(t, discovery.path, "/path/")
assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0))
assert.Equal(t, discovery.path, "/path/")
}

92
discovery/kv/kv.go Normal file
View File

@ -0,0 +1,92 @@
package kv
import (
"fmt"
"path"
"strings"
"time"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/pkg/store"
)
// Discovery is exported
type Discovery struct {
store store.Store
name string
heartbeat time.Duration
prefix string
}
func init() {
discovery.Register("zk", &Discovery{name: "zk"})
discovery.Register("consul", &Discovery{name: "consul"})
discovery.Register("etcd", &Discovery{name: "etcd"})
}
// Initialize is exported
func (s *Discovery) Initialize(uris string, heartbeat uint64) error {
var (
parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",")
addrs []string
err error
)
if len(parts) != 2 {
return fmt.Errorf("invalid format %q, missing <path>", uris)
}
for _, ip := range ips {
addrs = append(addrs, ip)
}
s.heartbeat = time.Duration(heartbeat) * time.Second
s.prefix = parts[1]
// Creates a new store, will ignore options given
// if not supported by the chosen store
s.store, err = store.CreateStore(
s.name, // name of the store
addrs,
store.Config{
Timeout: s.heartbeat,
},
)
if err != nil {
return err
}
return nil
}
// Fetch is exported
func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
addrs, err := s.store.GetRange(s.prefix)
if err != nil {
return nil, err
}
return discovery.CreateEntries(convertToStringArray(addrs))
}
// Watch is exported
func (s *Discovery) Watch(callback discovery.WatchCallback) {
s.store.WatchRange(s.prefix, "", s.heartbeat, func(kvalues [][]byte) {
// Traduce byte array entries to discovery.Entry
entries, _ := discovery.CreateEntries(convertToStringArray(kvalues))
callback(entries)
})
}
// Register is exported
func (s *Discovery) Register(addr string) error {
err := s.store.Put(path.Join(s.prefix, addr), []byte(addr))
return err
}
func convertToStringArray(entries [][]byte) (addrs []string) {
for _, entry := range entries {
addrs = append(addrs, string(entry))
}
return addrs
}

20
discovery/kv/kv_test.go Normal file
View File

@ -0,0 +1,20 @@
package kv
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
discoveryService := &Discovery{}
assert.Equal(t, discoveryService.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing <path>")
assert.Error(t, discoveryService.Initialize("127.0.0.1/path", 0))
assert.Equal(t, discoveryService.prefix, "path")
assert.Error(t, discoveryService.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0))
assert.Equal(t, discoveryService.prefix, "path")
}

View File

@ -1,149 +0,0 @@
package zookeeper
import (
"fmt"
"path"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/discovery"
"github.com/samuel/go-zookeeper/zk"
)
// Discovery is exported
type Discovery struct {
conn *zk.Conn
path []string
heartbeat uint64
}
func init() {
discovery.Register("zk", &Discovery{})
}
func (s *Discovery) fullpath() string {
return "/" + strings.Join(s.path, "/")
}
func (s *Discovery) createFullpath() error {
for i := 1; i <= len(s.path); i++ {
newpath := "/" + strings.Join(s.path[:i], "/")
_, err := s.conn.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// It's OK if key already existed. Just skip.
if err != zk.ErrNodeExists {
return err
}
}
}
return nil
}
// Initialize is exported
func (s *Discovery) Initialize(uris string, heartbeat uint64) error {
var (
// split here because uris can contain multiples ips
// like `zk://192.168.0.1,192.168.0.2,192.168.0.3/path`
parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",")
)
if len(parts) != 2 {
return fmt.Errorf("invalid format %q, missing <path>", uris)
}
if strings.Contains(parts[1], "/") {
s.path = strings.Split(parts[1], "/")
} else {
s.path = []string{parts[1]}
}
conn, _, err := zk.Connect(ips, time.Second)
if err != nil {
return err
}
s.conn = conn
s.heartbeat = heartbeat
err = s.createFullpath()
if err != nil {
return err
}
return nil
}
// Fetch is exported
func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
addrs, _, err := s.conn.Children(s.fullpath())
if err != nil {
return nil, err
}
return discovery.CreateEntries(addrs)
}
// Watch is exported
func (s *Discovery) Watch(callback discovery.WatchCallback) {
addrs, _, eventChan, err := s.conn.ChildrenW(s.fullpath())
if err != nil {
log.WithField("name", "zk").Debug("Discovery watch aborted")
return
}
entries, err := discovery.CreateEntries(addrs)
if err == nil {
callback(entries)
}
for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged {
log.WithField("name", "zk").Debug("Discovery watch triggered")
entries, err := s.Fetch()
if err == nil {
callback(entries)
}
}
}
}
// Register is exported
func (s *Discovery) Register(addr string) error {
nodePath := path.Join(s.fullpath(), addr)
// check existing for the parent path first
exist, _, err := s.conn.Exists(s.fullpath())
if err != nil {
return err
}
// if the parent path does not exist yet
if exist == false {
// create the parent first
err = s.createFullpath()
if err != nil {
return err
}
} else {
// if node path exists
exist, _, err = s.conn.Exists(nodePath)
if err != nil {
return err
}
// delete it first
if exist {
err = s.conn.Delete(nodePath, -1)
if err != nil {
return err
}
}
}
// create the node path to store address information
_, err = s.conn.Create(nodePath, []byte(addr), 0, zk.WorldACL(zk.PermAll))
return err
}

View File

@ -1,22 +0,0 @@
package zookeeper
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
service := &Discovery{}
assert.Equal(t, service.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing <path>")
assert.Error(t, service.Initialize("127.0.0.1/path", 0))
assert.Equal(t, service.fullpath(), "/path")
assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0))
assert.Equal(t, service.fullpath(), "/path")
assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path/sub1/sub2", 0))
assert.Equal(t, service.fullpath(), "/path/sub1/sub2")
}

View File

@ -1,12 +1,10 @@
package main package main
import ( import (
_ "github.com/docker/swarm/discovery/consul"
_ "github.com/docker/swarm/discovery/etcd"
_ "github.com/docker/swarm/discovery/file" _ "github.com/docker/swarm/discovery/file"
_ "github.com/docker/swarm/discovery/kv"
_ "github.com/docker/swarm/discovery/nodes" _ "github.com/docker/swarm/discovery/nodes"
_ "github.com/docker/swarm/discovery/token" _ "github.com/docker/swarm/discovery/token"
_ "github.com/docker/swarm/discovery/zookeeper"
"github.com/docker/swarm/cli" "github.com/docker/swarm/cli"
) )

79
pkg/store/README.md Normal file
View File

@ -0,0 +1,79 @@
# Storage
This package is used by the discovery service to register machines inside the cluster. It is also used to store cluster's metadata.
## Example of usage
### Create a new store and use Put/Get
```go
package main
import (
"fmt"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/store"
)
func main() {
var (
client = "localhost:8500"
)
// Initialize a new store with consul
kv, err := store.CreateStore(
store.Consul,
[]string{client},
store.Config{
Timeout: 10*time.Second
},
)
if err != nil {
log.Error("Cannot create store consul")
}
key := "foo"
err = kv.Put(key, []byte("bar"))
if err != nil {
log.Error("Error trying to put value at key `", key, "`")
}
value, _, err := kv.Get(key)
if err != nil {
log.Error("Error trying accessing value at key `", key, "`")
}
log.Info("value: ", string(value))
}
```
## Contributing to a new storage backend
A new **storage backend** should include those calls:
```go
type Store interface {
Put(key string, value []byte) error
Get(key string) (value []byte, lastIndex uint64, err error)
Delete(key string) error
Exists(key string) (bool, error)
Watch(key string, ttl uint64, callback WatchCallback) error
CancelWatch(key string) error
Acquire(key string, value []byte) (string, error)
Release(session string) error
GetRange(prefix string) (value [][]byte, err error)
DeleteRange(prefix string) error
WatchRange(prefix string, filter string, heartbeat uint64, callback WatchCallback) error
CancelWatchRange(prefix string) error
AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error)
AtomicDelete(key string, oldValue []byte, index uint64) (bool, error)
}
```
To be elligible as a **discovery backend** only, a K/V store implementation should at least offer `Get`, `Put`, `WatchRange`, `GetRange`.
You can get inspiration from existing backends to create a new one. This interface could be subject to changes to improve the experience of using the library and contributing to a new backend.

301
pkg/store/consul.go Normal file
View File

@ -0,0 +1,301 @@
package store
import (
"crypto/tls"
"errors"
"net/http"
"time"
log "github.com/Sirupsen/logrus"
api "github.com/hashicorp/consul/api"
)
var (
// ErrSessionUndefined is exported
ErrSessionUndefined = errors.New("Session does not exist")
)
// Consul embeds the client and watches/lock sessions
type Consul struct {
config *api.Config
client *api.Client
sessions map[string]*api.Session
watches map[string]*Watch
}
// Watch embeds the event channel and the
// refresh interval
type Watch struct {
LastIndex uint64
Interval time.Duration
}
// InitializeConsul creates a new Consul client given
// a list of endpoints and optional tls config
func InitializeConsul(endpoints []string, options Config) (Store, error) {
s := &Consul{}
s.sessions = make(map[string]*api.Session)
s.watches = make(map[string]*Watch)
// Create Consul client
config := api.DefaultConfig()
s.config = config
config.HttpClient = http.DefaultClient
config.Address = endpoints[0]
config.Scheme = "http"
if options.TLS != nil {
s.setTLS(options.TLS)
}
if options.Timeout != 0 {
s.setTimeout(options.Timeout)
}
// Creates a new client
client, err := api.NewClient(config)
if err != nil {
log.Errorf("Couldn't initialize consul client..")
return nil, err
}
s.client = client
return s, nil
}
// SetTLS sets Consul TLS options
func (s *Consul) setTLS(tls *tls.Config) {
s.config.HttpClient.Transport = &http.Transport{
TLSClientConfig: tls,
}
s.config.Scheme = "https"
}
// SetTimeout sets the timout for connecting to Consul
func (s *Consul) setTimeout(time time.Duration) {
s.config.WaitTime = time
}
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Consul) Get(key string) (value []byte, lastIndex uint64, err error) {
pair, meta, err := s.client.KV().Get(partialFormat(key), nil)
if err != nil {
return nil, 0, err
}
if pair == nil {
return nil, 0, ErrKeyNotFound
}
return pair.Value, meta.LastIndex, nil
}
// Put a value at "key"
func (s *Consul) Put(key string, value []byte) error {
p := &api.KVPair{Key: partialFormat(key), Value: value}
if s.client == nil {
log.Error("Error initializing client")
}
_, err := s.client.KV().Put(p, nil)
return err
}
// Delete a value at "key"
func (s *Consul) Delete(key string) error {
_, err := s.client.KV().Delete(partialFormat(key), nil)
return err
}
// Exists checks that the key exists inside the store
func (s *Consul) Exists(key string) (bool, error) {
_, _, err := s.Get(key)
if err != nil && err == ErrKeyNotFound {
return false, err
}
return true, nil
}
// GetRange gets a range of values at "directory"
func (s *Consul) GetRange(prefix string) (values [][]byte, err error) {
pairs, _, err := s.client.KV().List(partialFormat(prefix), nil)
if err != nil {
return nil, err
}
if len(pairs) == 0 {
return nil, ErrKeyNotFound
}
for _, pair := range pairs {
if pair.Key == prefix {
continue
}
values = append(values, pair.Value)
}
return values, nil
}
// DeleteRange deletes a range of values at "directory"
func (s *Consul) DeleteRange(prefix string) error {
_, err := s.client.KV().DeleteTree(partialFormat(prefix), nil)
return err
}
// Watch a single key for modifications
func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallback) error {
fkey := partialFormat(key)
// We get the last index first
_, meta, err := s.client.KV().Get(fkey, nil)
if err != nil {
return err
}
// Add watch to map
s.watches[fkey] = &Watch{LastIndex: meta.LastIndex, Interval: heartbeat}
eventChan := s.waitForChange(fkey)
for _ = range eventChan {
log.WithField("name", "consul").Debug("Key watch triggered")
entry, _, err := s.Get(key)
if err != nil {
log.Error("Cannot refresh the key: ", fkey, ", cancelling watch")
s.watches[fkey] = nil
return err
}
value := [][]byte{[]byte(entry)}
callback(value)
}
return nil
}
// CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel
func (s *Consul) CancelWatch(key string) error {
key = partialFormat(key)
if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist
}
s.watches[key] = nil
return nil
}
// Internal function to check if a key has changed
func (s *Consul) waitForChange(key string) <-chan uint64 {
ch := make(chan uint64)
kv := s.client.KV()
go func() {
for {
watch, ok := s.watches[key]
if !ok {
log.Error("Cannot access last index for key: ", key, " closing channel")
break
}
option := &api.QueryOptions{
WaitIndex: watch.LastIndex,
WaitTime: watch.Interval,
}
_, meta, err := kv.List(key, option)
if err != nil {
log.WithField("name", "consul").Errorf("Discovery error: %v", err)
break
}
watch.LastIndex = meta.LastIndex
ch <- watch.LastIndex
}
close(ch)
}()
return ch
}
// WatchRange triggers a watch on a range of values at "directory"
func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error {
fprefix := partialFormat(prefix)
// We get the last index first
_, meta, err := s.client.KV().Get(prefix, nil)
if err != nil {
return err
}
// Add watch to map
s.watches[fprefix] = &Watch{LastIndex: meta.LastIndex, Interval: heartbeat}
eventChan := s.waitForChange(fprefix)
for _ = range eventChan {
log.WithField("name", "consul").Debug("Key watch triggered")
values, err := s.GetRange(prefix)
if err != nil {
log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch")
s.watches[fprefix] = nil
return err
}
callback(values)
}
return nil
}
// CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel
func (s *Consul) CancelWatchRange(prefix string) error {
return s.CancelWatch(prefix)
}
// Acquire the lock for "key"/"directory"
func (s *Consul) Acquire(key string, value []byte) (string, error) {
key = partialFormat(key)
session := s.client.Session()
id, _, err := session.CreateNoChecks(nil, nil)
if err != nil {
return "", err
}
// Add session to map
s.sessions[id] = session
p := &api.KVPair{Key: key, Value: value, Session: id}
if work, _, err := s.client.KV().Acquire(p, nil); err != nil {
return "", err
} else if !work {
return "", ErrCannotLock
}
return id, nil
}
// Release the lock for "key"/"directory"
func (s *Consul) Release(id string) error {
if _, ok := s.sessions[id]; !ok {
log.Error("Lock session does not exist")
return ErrSessionUndefined
}
session := s.sessions[id]
session.Destroy(id, nil)
s.sessions[id] = nil
return nil
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) {
p := &api.KVPair{Key: partialFormat(key), Value: newValue, ModifyIndex: index}
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
return false, err
} else if !work {
return false, ErrKeyModified
}
return true, nil
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
p := &api.KVPair{Key: partialFormat(key), ModifyIndex: index}
if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil {
return false, err
} else if !work {
return false, ErrKeyModified
}
return true, nil
}

264
pkg/store/etcd.go Normal file
View File

@ -0,0 +1,264 @@
package store
import (
"crypto/tls"
"net"
"net/http"
"strings"
"time"
log "github.com/Sirupsen/logrus"
etcd "github.com/coreos/go-etcd/etcd"
)
// Etcd embeds the client
type Etcd struct {
client *etcd.Client
watches map[string]chan<- bool
}
// InitializeEtcd creates a new Etcd client given
// a list of endpoints and optional tls config
func InitializeEtcd(addrs []string, options Config) (Store, error) {
s := &Etcd{}
s.watches = make(map[string]chan<- bool)
entries := createEndpoints(addrs, "http")
s.client = etcd.NewClient(entries)
if options.TLS != nil {
s.setTLS(options.TLS)
}
if options.Timeout != 0 {
s.setTimeout(options.Timeout)
}
return s, nil
}
// SetTLS sets the tls configuration given the path
// of certificate files
func (s *Etcd) setTLS(tls *tls.Config) {
// Change to https scheme
var addrs []string
entries := s.client.GetCluster()
for _, entry := range entries {
addrs = append(addrs, strings.Replace(entry, "http", "https", -1))
}
s.client.SetCluster(addrs)
// Set transport
t := http.Transport{
Dial: (&net.Dialer{
Timeout: 30 * time.Second, // default timeout
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: tls,
}
s.client.SetTransport(&t)
}
// SetTimeout sets the timeout used for connecting to the store
func (s *Etcd) setTimeout(time time.Duration) {
s.client.SetDialTimeout(time)
}
// Create the entire path for a directory that does not exist
func (s *Etcd) createDirectory(path string) error {
// TODO Handle TTL at key/dir creation -> use K/V struct for key infos?
if _, err := s.client.CreateDir(format(path), 10); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode != 105 { // Skip key already exists
return err
}
} else {
return err
}
}
return nil
}
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Etcd) Get(key string) (value []byte, lastIndex uint64, err error) {
result, err := s.client.Get(format(key), false, false)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Not a Directory or Not a file
if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 {
return nil, 0, ErrKeyNotFound
}
}
return nil, 0, err
}
return []byte(result.Node.Value), result.Node.ModifiedIndex, nil
}
// Put a value at "key"
func (s *Etcd) Put(key string, value []byte) error {
if _, err := s.client.Set(key, string(value), 0); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode == 104 { // Not a directory
// Remove the last element (the actual key) and set the prefix as a dir
err = s.createDirectory(getDir(key))
if _, err := s.client.Set(key, string(value), 0); err != nil {
return err
}
}
}
return err
}
return nil
}
// Delete a value at "key"
func (s *Etcd) Delete(key string) error {
if _, err := s.client.Delete(format(key), false); err != nil {
return err
}
return nil
}
// Exists checks if the key exists inside the store
func (s *Etcd) Exists(key string) (bool, error) {
value, _, err := s.Get(key)
if err != nil {
if err == ErrKeyNotFound || value == nil {
return false, nil
}
return false, err
}
return true, nil
}
// Watch a single key for modifications
func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error {
key = format(key)
watchChan := make(chan *etcd.Response)
stopChan := make(chan bool)
// Create new Watch entry
s.watches[key] = stopChan
// Start watch
go s.client.Watch(key, 0, false, watchChan, stopChan)
for _ = range watchChan {
log.WithField("name", "etcd").Debug("Discovery watch triggered")
entry, _, err := s.Get(key)
if err != nil {
log.Error("Cannot refresh the key: ", key, ", cancelling watch")
s.watches[key] = nil
return err
}
value := [][]byte{[]byte(entry)}
callback(value)
}
return nil
}
// CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel
func (s *Etcd) CancelWatch(key string) error {
key = format(key)
if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist
}
// Send stop signal to event chan
s.watches[key] <- true
s.watches[key] = nil
return nil
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) {
resp, err := s.client.CompareAndSwap(format(key), string(newValue), 5, string(oldValue), 0)
if err != nil {
return false, err
}
if !(resp.Node.Value == string(newValue) && resp.Node.Key == key && resp.Node.TTL == 5) {
return false, ErrKeyModified
}
if !(resp.PrevNode.Value == string(newValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) {
return false, ErrKeyModified
}
return true, nil
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
resp, err := s.client.CompareAndDelete(format(key), string(oldValue), 0)
if err != nil {
return false, err
}
if !(resp.PrevNode.Value == string(oldValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) {
return false, ErrKeyModified
}
return true, nil
}
// GetRange gets a range of values at "directory"
func (s *Etcd) GetRange(prefix string) (value [][]byte, err error) {
resp, err := s.client.Get(format(prefix), true, true)
if err != nil {
return nil, err
}
values := [][]byte{}
for _, n := range resp.Node.Nodes {
values = append(values, []byte(n.Value))
}
return values, nil
}
// DeleteRange deletes a range of values at "directory"
func (s *Etcd) DeleteRange(prefix string) error {
if _, err := s.client.Delete(format(prefix), true); err != nil {
return err
}
return nil
}
// WatchRange triggers a watch on a range of values at "directory"
func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error {
prefix = format(prefix)
watchChan := make(chan *etcd.Response)
stopChan := make(chan bool)
// Create new Watch entry
s.watches[prefix] = stopChan
// Start watch
go s.client.Watch(prefix, 0, true, watchChan, stopChan)
for _ = range watchChan {
log.WithField("name", "etcd").Debug("Discovery watch triggered")
values, err := s.GetRange(prefix)
if err != nil {
log.Error("Cannot refresh the key: ", prefix, ", cancelling watch")
s.watches[prefix] = nil
return err
}
callback(values)
}
return nil
}
// CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel
func (s *Etcd) CancelWatchRange(prefix string) error {
return s.CancelWatch(format(prefix))
}
// Acquire the lock for "key"/"directory"
func (s *Etcd) Acquire(key string, value []byte) (string, error) {
return "", ErrNotImplemented
}
// Release the lock for "key"/"directory"
func (s *Etcd) Release(session string) error {
return ErrNotImplemented
}

51
pkg/store/helpers.go Normal file
View File

@ -0,0 +1,51 @@
package store
import (
"strings"
)
// Creates a list of endpoints given the right scheme
func createEndpoints(addrs []string, scheme string) (entries []string) {
for _, addr := range addrs {
entries = append(entries, scheme+"://"+addr)
}
return entries
}
// Formats the key
func format(key string) string {
return fullpath(splitKey(key))
}
// Formats the key partially (omits the first '/')
func partialFormat(key string) string {
return partialpath(splitKey(key))
}
// Get the full directory part of the key
func getDir(key string) string {
parts := splitKey(key)
parts = parts[:len(parts)-1]
return fullpath(parts)
}
// SplitKey splits the key to extract path informations
func splitKey(key string) (path []string) {
if strings.Contains(key, "/") {
path = strings.Split(key, "/")
} else {
path = []string{key}
}
return path
}
// Get the full correct path representation of a splitted key/directory
func fullpath(path []string) string {
return "/" + strings.Join(path, "/")
}
// Get the partial correct path representation of a splitted key/directory
// Omits the first '/'
func partialpath(path []string) string {
return strings.Join(path, "/")
}

85
pkg/store/store.go Normal file
View File

@ -0,0 +1,85 @@
package store
import (
"time"
log "github.com/Sirupsen/logrus"
)
// WatchCallback is used for watch methods on keys
// and is triggered on key change
type WatchCallback func(value [][]byte)
// Initialize creates a new Store object, initializing the client
type Initialize func(addrs []string, options Config) (Store, error)
// Store represents the backend K/V storage
// Each store should support every call listed
// here. Or it couldn't be implemented as a K/V
// backend for libkv
type Store interface {
// Put a value at the specified key
Put(key string, value []byte) error
// Get a value given its key
Get(key string) (value []byte, lastIndex uint64, err error)
// Delete the value at the specified key
Delete(key string) error
// Verify if a Key exists in the store
Exists(key string) (bool, error)
// Watch changes on a key
Watch(key string, heartbeat time.Duration, callback WatchCallback) error
// Cancel watch key
CancelWatch(key string) error
// Acquire the lock at key
Acquire(key string, value []byte) (string, error)
// Release the lock at key
Release(session string) error
// Get range of keys based on prefix
GetRange(prefix string) (value [][]byte, err error)
// Delete range of keys based on prefix
DeleteRange(prefix string) error
// Watch key namespaces
WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error
// Cancel watch key range
CancelWatchRange(prefix string) error
// Atomic operation on a single value
AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error)
// Atomic delete of a single value
AtomicDelete(key string, oldValue []byte, index uint64) (bool, error)
}
var (
// List of Store services
stores map[string]Initialize
)
func init() {
stores = make(map[string]Initialize)
stores["consul"] = InitializeConsul
stores["etcd"] = InitializeEtcd
stores["zk"] = InitializeZookeeper
}
// CreateStore creates a an instance of store
func CreateStore(store string, addrs []string, options Config) (Store, error) {
if init, exists := stores[store]; exists {
log.WithFields(log.Fields{"store": store}).Debug("Initializing store service")
return init(addrs, options)
}
return nil, ErrNotSupported
}

42
pkg/store/structs.go Normal file
View File

@ -0,0 +1,42 @@
package store
import (
"crypto/tls"
"errors"
"time"
)
var (
// ErrNotSupported is exported
ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")
// ErrNotImplemented is exported
ErrNotImplemented = errors.New("Call not implemented in current backend")
// ErrNotReachable is exported
ErrNotReachable = errors.New("Api not reachable")
// ErrCannotLock is exported
ErrCannotLock = errors.New("Error acquiring the lock")
// ErrWatchDoesNotExist is exported
ErrWatchDoesNotExist = errors.New("No watch found for specified key")
// ErrKeyModified is exported
ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
// ErrKeyNotFound is exported
ErrKeyNotFound = errors.New("Key not found in store")
)
// KV represents the different supported K/V
type KV string
const (
// CONSUL is exported
CONSUL KV = "consul"
// ETCD is exported
ETCD = "etcd"
// ZOOKEEPER is exported
ZOOKEEPER = "zookeeper"
)
// Config contains the options for a storage client
type Config struct {
TLS *tls.Config
Timeout time.Duration
}

213
pkg/store/zookeeper.go Normal file
View File

@ -0,0 +1,213 @@
package store
import (
"strings"
"time"
log "github.com/Sirupsen/logrus"
zk "github.com/samuel/go-zookeeper/zk"
)
// Zookeeper embeds the zookeeper client
// and list of watches
type Zookeeper struct {
timeout time.Duration
client *zk.Conn
watches map[string]<-chan zk.Event
}
// InitializeZookeeper creates a new Zookeeper client
// given a list of endpoints and optional tls config
func InitializeZookeeper(endpoints []string, options Config) (Store, error) {
s := &Zookeeper{}
s.watches = make(map[string]<-chan zk.Event)
s.timeout = 5 * time.Second // default timeout
if options.Timeout != 0 {
s.setTimeout(options.Timeout)
}
conn, _, err := zk.Connect(endpoints, s.timeout)
if err != nil {
log.Error(err)
return nil, err
}
s.client = conn
return s, nil
}
// SetTimeout sets the timout for connecting to Zookeeper
func (s *Zookeeper) setTimeout(time time.Duration) {
s.timeout = time
}
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Zookeeper) Get(key string) (value []byte, lastIndex uint64, err error) {
resp, meta, err := s.client.Get(format(key))
if err != nil {
return nil, 0, err
}
if resp == nil {
return nil, 0, ErrKeyNotFound
}
return resp, uint64(meta.Mzxid), nil
}
// Create the entire path for a directory that does not exist
func (s *Zookeeper) createFullpath(path []string) error {
for i := 1; i <= len(path); i++ {
newpath := "/" + strings.Join(path[:i], "/")
_, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Skip if node already exists
if err != zk.ErrNodeExists {
return err
}
}
}
return nil
}
// Put a value at "key"
func (s *Zookeeper) Put(key string, value []byte) error {
fkey := format(key)
exists, err := s.Exists(key)
if err != nil {
return err
}
if !exists {
s.createFullpath(splitKey(key))
}
_, err = s.client.Set(fkey, value, -1)
return err
}
// Delete a value at "key"
func (s *Zookeeper) Delete(key string) error {
err := s.client.Delete(format(key), -1)
return err
}
// Exists checks if the key exists inside the store
func (s *Zookeeper) Exists(key string) (bool, error) {
exists, _, err := s.client.Exists(format(key))
if err != nil {
return false, err
}
return exists, nil
}
// Watch a single key for modifications
func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) error {
fkey := format(key)
_, _, eventChan, err := s.client.GetW(fkey)
if err != nil {
return err
}
// Create a new Watch entry with eventChan
s.watches[fkey] = eventChan
for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged {
log.WithField("name", "zk").Debug("Discovery watch triggered")
entry, _, err := s.Get(key)
value := [][]byte{[]byte(entry)}
if err == nil {
callback(value)
}
}
}
return nil
}
// CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel
func (s *Zookeeper) CancelWatch(key string) error {
key = format(key)
if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist
}
// Just remove the entry on watches key
s.watches[key] = nil
return nil
}
// GetRange gets a range of values at "directory"
func (s *Zookeeper) GetRange(prefix string) (values [][]byte, err error) {
prefix = format(prefix)
entries, _, err := s.client.Children(prefix)
if err != nil {
log.Error("Cannot fetch range of keys beginning with prefix: ", prefix)
return nil, err
}
for _, item := range entries {
values = append(values, []byte(item))
}
return values, err
}
// DeleteRange deletes a range of values at "directory"
func (s *Zookeeper) DeleteRange(prefix string) error {
err := s.client.Delete(format(prefix), -1)
return err
}
// WatchRange triggers a watch on a range of values at "directory"
func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error {
fprefix := format(prefix)
_, _, eventChan, err := s.client.ChildrenW(fprefix)
if err != nil {
return err
}
// Create a new Watch entry with eventChan
s.watches[fprefix] = eventChan
for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged {
log.WithField("name", "zk").Debug("Discovery watch triggered")
values, err := s.GetRange(prefix)
if err == nil {
callback(values)
}
}
}
return nil
}
// CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel
func (s *Zookeeper) CancelWatchRange(prefix string) error {
return s.CancelWatch(prefix)
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) {
// Use index of Set method to implement CAS
return false, ErrNotImplemented
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) {
return false, ErrNotImplemented
}
// Acquire the lock for "key"/"directory"
func (s *Zookeeper) Acquire(path string, value []byte) (string, error) {
// lock := zk.NewLock(s.client, path, nil)
// locks[path] = lock
// lock.Lock()
return "", ErrNotImplemented
}
// Release the lock for "key"/"directory"
func (s *Zookeeper) Release(session string) error {
return ErrNotImplemented
}