diff --git a/cli/cli.go b/cli/cli.go index 98ed2797e1..b1296a8d6c 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -82,7 +82,8 @@ func Run() { log.Fatalf("discovery required to list a cluster. See '%s list --help'.", c.App.Name) } - d, err := discovery.New(dflag, 0) + // FIXME Add and use separate timeout flag instead of forcing it + d, err := discovery.New(dflag, 10) if err != nil { log.Fatal(err) } diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go deleted file mode 100644 index 0ac9baf94e..0000000000 --- a/discovery/consul/consul.go +++ /dev/null @@ -1,115 +0,0 @@ -package consul - -import ( - "fmt" - "path" - "strings" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/discovery" - consul "github.com/hashicorp/consul/api" -) - -// Discovery is exported -type Discovery struct { - heartbeat time.Duration - client *consul.Client - prefix string - lastIndex uint64 -} - -func init() { - discovery.Register("consul", &Discovery{}) -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64) error { - parts := strings.SplitN(uris, "/", 2) - if len(parts) < 2 { - return fmt.Errorf("invalid format %q, missing ", uris) - } - addr := parts[0] - path := parts[1] - - config := consul.DefaultConfig() - config.Address = addr - - client, err := consul.NewClient(config) - if err != nil { - return err - } - s.client = client - s.heartbeat = time.Duration(heartbeat) * time.Second - s.prefix = path + "/" - kv := s.client.KV() - p := &consul.KVPair{Key: s.prefix, Value: nil} - if _, err = kv.Put(p, nil); err != nil { - return err - } - _, meta, err := kv.Get(s.prefix, nil) - if err != nil { - return err - } - s.lastIndex = meta.LastIndex - return nil -} - -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - kv := s.client.KV() - pairs, _, err := kv.List(s.prefix, nil) - if err != nil { - return nil, err - } - - addrs := []string{} - for _, pair := range pairs { - if pair.Key == s.prefix { - continue - } - addrs = append(addrs, string(pair.Value)) - } - - return discovery.CreateEntries(addrs) -} - -// Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - for _ = range s.waitForChange() { - log.WithField("name", "consul").Debug("Discovery watch triggered") - entries, err := s.Fetch() - if err == nil { - callback(entries) - } - } -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - kv := s.client.KV() - p := &consul.KVPair{Key: path.Join(s.prefix, addr), Value: []byte(addr)} - _, err := kv.Put(p, nil) - return err -} - -func (s *Discovery) waitForChange() <-chan uint64 { - c := make(chan uint64) - go func() { - for { - kv := s.client.KV() - option := &consul.QueryOptions{ - WaitIndex: s.lastIndex, - WaitTime: s.heartbeat} - _, meta, err := kv.List(s.prefix, option) - if err != nil { - log.WithField("name", "consul").Errorf("Discovery error: %v", err) - break - } - s.lastIndex = meta.LastIndex - c <- s.lastIndex - } - close(c) - }() - return c -} diff --git a/discovery/consul/consul_test.go b/discovery/consul/consul_test.go deleted file mode 100644 index ce2f87ec86..0000000000 --- a/discovery/consul/consul_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package consul - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - discovery := &Discovery{} - - assert.Equal(t, discovery.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") - - assert.Error(t, discovery.Initialize("127.0.0.1/path", 0)) - assert.Equal(t, discovery.prefix, "path/") - - assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) - assert.Equal(t, discovery.prefix, "path/") - -} diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go deleted file mode 100644 index f4e3e2204b..0000000000 --- a/discovery/etcd/etcd.go +++ /dev/null @@ -1,89 +0,0 @@ -package etcd - -import ( - "fmt" - "path" - "strings" - - log "github.com/Sirupsen/logrus" - "github.com/coreos/go-etcd/etcd" - "github.com/docker/swarm/discovery" -) - -// Discovery is exported -type Discovery struct { - ttl uint64 - client *etcd.Client - path string -} - -func init() { - discovery.Register("etcd", &Discovery{}) -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64) error { - var ( - // split here because uris can contain multiples ips - // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` - parts = strings.SplitN(uris, "/", 2) - ips = strings.Split(parts[0], ",") - entries []string - ) - - if len(parts) != 2 { - return fmt.Errorf("invalid format %q, missing ", uris) - } - - for _, ip := range ips { - entries = append(entries, "http://"+ip) - } - - s.client = etcd.NewClient(entries) - // ttl should always be > heartbeat, even if heartbeat = 1 or 0 - s.ttl = uint64(heartbeat*3/2) + 1 - s.path = "/" + parts[1] + "/" - if _, err := s.client.CreateDir(s.path, s.ttl); err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - if etcdError.ErrorCode != 105 { // skip key already exists - return err - } - } else { - return err - } - } - return nil -} - -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - resp, err := s.client.Get(s.path, true, true) - if err != nil { - return nil, err - } - - addrs := []string{} - for _, n := range resp.Node.Nodes { - addrs = append(addrs, n.Value) - } - return discovery.CreateEntries(addrs) -} - -// Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - watchChan := make(chan *etcd.Response) - go s.client.Watch(s.path, 0, true, watchChan, nil) - for _ = range watchChan { - log.WithField("name", "etcd").Debug("Discovery watch triggered") - entries, err := s.Fetch() - if err == nil { - callback(entries) - } - } -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - _, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl) - return err -} diff --git a/discovery/etcd/etcd_test.go b/discovery/etcd/etcd_test.go deleted file mode 100644 index bbca1bc77c..0000000000 --- a/discovery/etcd/etcd_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package etcd - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - discovery := &Discovery{} - - assert.Equal(t, discovery.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") - - assert.Error(t, discovery.Initialize("127.0.0.1/path", 0)) - assert.Equal(t, discovery.path, "/path/") - - assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) - assert.Equal(t, discovery.path, "/path/") -} diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go new file mode 100644 index 0000000000..a7ac5acbb8 --- /dev/null +++ b/discovery/kv/kv.go @@ -0,0 +1,92 @@ +package kv + +import ( + "fmt" + "path" + "strings" + "time" + + "github.com/docker/swarm/discovery" + "github.com/docker/swarm/pkg/store" +) + +// Discovery is exported +type Discovery struct { + store store.Store + name string + heartbeat time.Duration + prefix string +} + +func init() { + discovery.Register("zk", &Discovery{name: "zk"}) + discovery.Register("consul", &Discovery{name: "consul"}) + discovery.Register("etcd", &Discovery{name: "etcd"}) +} + +// Initialize is exported +func (s *Discovery) Initialize(uris string, heartbeat uint64) error { + var ( + parts = strings.SplitN(uris, "/", 2) + ips = strings.Split(parts[0], ",") + addrs []string + err error + ) + + if len(parts) != 2 { + return fmt.Errorf("invalid format %q, missing ", uris) + } + + for _, ip := range ips { + addrs = append(addrs, ip) + } + + s.heartbeat = time.Duration(heartbeat) * time.Second + s.prefix = parts[1] + + // Creates a new store, will ignore options given + // if not supported by the chosen store + s.store, err = store.CreateStore( + s.name, // name of the store + addrs, + store.Config{ + Timeout: s.heartbeat, + }, + ) + if err != nil { + return err + } + + return nil +} + +// Fetch is exported +func (s *Discovery) Fetch() ([]*discovery.Entry, error) { + addrs, err := s.store.GetRange(s.prefix) + if err != nil { + return nil, err + } + return discovery.CreateEntries(convertToStringArray(addrs)) +} + +// Watch is exported +func (s *Discovery) Watch(callback discovery.WatchCallback) { + s.store.WatchRange(s.prefix, "", s.heartbeat, func(kvalues [][]byte) { + // Traduce byte array entries to discovery.Entry + entries, _ := discovery.CreateEntries(convertToStringArray(kvalues)) + callback(entries) + }) +} + +// Register is exported +func (s *Discovery) Register(addr string) error { + err := s.store.Put(path.Join(s.prefix, addr), []byte(addr)) + return err +} + +func convertToStringArray(entries [][]byte) (addrs []string) { + for _, entry := range entries { + addrs = append(addrs, string(entry)) + } + return addrs +} diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go new file mode 100644 index 0000000000..6a2b0c7fef --- /dev/null +++ b/discovery/kv/kv_test.go @@ -0,0 +1,20 @@ +package kv + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInitialize(t *testing.T) { + discoveryService := &Discovery{} + + assert.Equal(t, discoveryService.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") + + assert.Error(t, discoveryService.Initialize("127.0.0.1/path", 0)) + assert.Equal(t, discoveryService.prefix, "path") + + assert.Error(t, discoveryService.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) + assert.Equal(t, discoveryService.prefix, "path") + +} diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go deleted file mode 100644 index 2b1d4ad16e..0000000000 --- a/discovery/zookeeper/zookeeper.go +++ /dev/null @@ -1,149 +0,0 @@ -package zookeeper - -import ( - "fmt" - "path" - "strings" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/discovery" - "github.com/samuel/go-zookeeper/zk" -) - -// Discovery is exported -type Discovery struct { - conn *zk.Conn - path []string - heartbeat uint64 -} - -func init() { - discovery.Register("zk", &Discovery{}) -} - -func (s *Discovery) fullpath() string { - return "/" + strings.Join(s.path, "/") -} - -func (s *Discovery) createFullpath() error { - for i := 1; i <= len(s.path); i++ { - newpath := "/" + strings.Join(s.path[:i], "/") - _, err := s.conn.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll)) - if err != nil { - // It's OK if key already existed. Just skip. - if err != zk.ErrNodeExists { - return err - } - } - } - return nil -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64) error { - var ( - // split here because uris can contain multiples ips - // like `zk://192.168.0.1,192.168.0.2,192.168.0.3/path` - parts = strings.SplitN(uris, "/", 2) - ips = strings.Split(parts[0], ",") - ) - - if len(parts) != 2 { - return fmt.Errorf("invalid format %q, missing ", uris) - } - - if strings.Contains(parts[1], "/") { - s.path = strings.Split(parts[1], "/") - } else { - s.path = []string{parts[1]} - } - - conn, _, err := zk.Connect(ips, time.Second) - if err != nil { - return err - } - - s.conn = conn - s.heartbeat = heartbeat - err = s.createFullpath() - if err != nil { - return err - } - - return nil -} - -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - addrs, _, err := s.conn.Children(s.fullpath()) - - if err != nil { - return nil, err - } - - return discovery.CreateEntries(addrs) -} - -// Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - - addrs, _, eventChan, err := s.conn.ChildrenW(s.fullpath()) - if err != nil { - log.WithField("name", "zk").Debug("Discovery watch aborted") - return - } - entries, err := discovery.CreateEntries(addrs) - if err == nil { - callback(entries) - } - - for e := range eventChan { - if e.Type == zk.EventNodeChildrenChanged { - log.WithField("name", "zk").Debug("Discovery watch triggered") - entries, err := s.Fetch() - if err == nil { - callback(entries) - } - } - - } - -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - nodePath := path.Join(s.fullpath(), addr) - - // check existing for the parent path first - exist, _, err := s.conn.Exists(s.fullpath()) - if err != nil { - return err - } - - // if the parent path does not exist yet - if exist == false { - // create the parent first - err = s.createFullpath() - if err != nil { - return err - } - } else { - // if node path exists - exist, _, err = s.conn.Exists(nodePath) - if err != nil { - return err - } - // delete it first - if exist { - err = s.conn.Delete(nodePath, -1) - if err != nil { - return err - } - } - } - - // create the node path to store address information - _, err = s.conn.Create(nodePath, []byte(addr), 0, zk.WorldACL(zk.PermAll)) - return err -} diff --git a/discovery/zookeeper/zookeeper_test.go b/discovery/zookeeper/zookeeper_test.go deleted file mode 100644 index b09144da1b..0000000000 --- a/discovery/zookeeper/zookeeper_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package zookeeper - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - service := &Discovery{} - - assert.Equal(t, service.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") - - assert.Error(t, service.Initialize("127.0.0.1/path", 0)) - assert.Equal(t, service.fullpath(), "/path") - - assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) - assert.Equal(t, service.fullpath(), "/path") - - assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path/sub1/sub2", 0)) - assert.Equal(t, service.fullpath(), "/path/sub1/sub2") -} diff --git a/main.go b/main.go index 899d90cb13..f90922793d 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,10 @@ package main import ( - _ "github.com/docker/swarm/discovery/consul" - _ "github.com/docker/swarm/discovery/etcd" _ "github.com/docker/swarm/discovery/file" + _ "github.com/docker/swarm/discovery/kv" _ "github.com/docker/swarm/discovery/nodes" _ "github.com/docker/swarm/discovery/token" - _ "github.com/docker/swarm/discovery/zookeeper" "github.com/docker/swarm/cli" ) diff --git a/pkg/store/README.md b/pkg/store/README.md new file mode 100644 index 0000000000..43766aaf05 --- /dev/null +++ b/pkg/store/README.md @@ -0,0 +1,79 @@ +# Storage + +This package is used by the discovery service to register machines inside the cluster. It is also used to store cluster's metadata. + +## Example of usage + +### Create a new store and use Put/Get + +```go +package main + +import ( + "fmt" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/docker/swarm/store" +) + +func main() { + var ( + client = "localhost:8500" + ) + + // Initialize a new store with consul + kv, err := store.CreateStore( + store.Consul, + []string{client}, + store.Config{ + Timeout: 10*time.Second + }, + ) + if err != nil { + log.Error("Cannot create store consul") + } + + key := "foo" + err = kv.Put(key, []byte("bar")) + if err != nil { + log.Error("Error trying to put value at key `", key, "`") + } + + value, _, err := kv.Get(key) + if err != nil { + log.Error("Error trying accessing value at key `", key, "`") + } + + log.Info("value: ", string(value)) +} +``` + + + +## Contributing to a new storage backend + +A new **storage backend** should include those calls: + +```go +type Store interface { + Put(key string, value []byte) error + Get(key string) (value []byte, lastIndex uint64, err error) + Delete(key string) error + Exists(key string) (bool, error) + Watch(key string, ttl uint64, callback WatchCallback) error + CancelWatch(key string) error + Acquire(key string, value []byte) (string, error) + Release(session string) error + GetRange(prefix string) (value [][]byte, err error) + DeleteRange(prefix string) error + WatchRange(prefix string, filter string, heartbeat uint64, callback WatchCallback) error + CancelWatchRange(prefix string) error + AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) + AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) +} +``` + +To be elligible as a **discovery backend** only, a K/V store implementation should at least offer `Get`, `Put`, `WatchRange`, `GetRange`. + +You can get inspiration from existing backends to create a new one. This interface could be subject to changes to improve the experience of using the library and contributing to a new backend. diff --git a/pkg/store/consul.go b/pkg/store/consul.go new file mode 100644 index 0000000000..a052d9d556 --- /dev/null +++ b/pkg/store/consul.go @@ -0,0 +1,301 @@ +package store + +import ( + "crypto/tls" + "errors" + "net/http" + "time" + + log "github.com/Sirupsen/logrus" + api "github.com/hashicorp/consul/api" +) + +var ( + // ErrSessionUndefined is exported + ErrSessionUndefined = errors.New("Session does not exist") +) + +// Consul embeds the client and watches/lock sessions +type Consul struct { + config *api.Config + client *api.Client + sessions map[string]*api.Session + watches map[string]*Watch +} + +// Watch embeds the event channel and the +// refresh interval +type Watch struct { + LastIndex uint64 + Interval time.Duration +} + +// InitializeConsul creates a new Consul client given +// a list of endpoints and optional tls config +func InitializeConsul(endpoints []string, options Config) (Store, error) { + s := &Consul{} + s.sessions = make(map[string]*api.Session) + s.watches = make(map[string]*Watch) + + // Create Consul client + config := api.DefaultConfig() + s.config = config + config.HttpClient = http.DefaultClient + config.Address = endpoints[0] + config.Scheme = "http" + + if options.TLS != nil { + s.setTLS(options.TLS) + } + + if options.Timeout != 0 { + s.setTimeout(options.Timeout) + } + + // Creates a new client + client, err := api.NewClient(config) + if err != nil { + log.Errorf("Couldn't initialize consul client..") + return nil, err + } + s.client = client + + return s, nil +} + +// SetTLS sets Consul TLS options +func (s *Consul) setTLS(tls *tls.Config) { + s.config.HttpClient.Transport = &http.Transport{ + TLSClientConfig: tls, + } + s.config.Scheme = "https" +} + +// SetTimeout sets the timout for connecting to Consul +func (s *Consul) setTimeout(time time.Duration) { + s.config.WaitTime = time +} + +// Get the value at "key", returns the last modified index +// to use in conjunction to CAS calls +func (s *Consul) Get(key string) (value []byte, lastIndex uint64, err error) { + pair, meta, err := s.client.KV().Get(partialFormat(key), nil) + if err != nil { + return nil, 0, err + } + if pair == nil { + return nil, 0, ErrKeyNotFound + } + return pair.Value, meta.LastIndex, nil +} + +// Put a value at "key" +func (s *Consul) Put(key string, value []byte) error { + p := &api.KVPair{Key: partialFormat(key), Value: value} + if s.client == nil { + log.Error("Error initializing client") + } + _, err := s.client.KV().Put(p, nil) + return err +} + +// Delete a value at "key" +func (s *Consul) Delete(key string) error { + _, err := s.client.KV().Delete(partialFormat(key), nil) + return err +} + +// Exists checks that the key exists inside the store +func (s *Consul) Exists(key string) (bool, error) { + _, _, err := s.Get(key) + if err != nil && err == ErrKeyNotFound { + return false, err + } + return true, nil +} + +// GetRange gets a range of values at "directory" +func (s *Consul) GetRange(prefix string) (values [][]byte, err error) { + pairs, _, err := s.client.KV().List(partialFormat(prefix), nil) + if err != nil { + return nil, err + } + if len(pairs) == 0 { + return nil, ErrKeyNotFound + } + for _, pair := range pairs { + if pair.Key == prefix { + continue + } + values = append(values, pair.Value) + } + return values, nil +} + +// DeleteRange deletes a range of values at "directory" +func (s *Consul) DeleteRange(prefix string) error { + _, err := s.client.KV().DeleteTree(partialFormat(prefix), nil) + return err +} + +// Watch a single key for modifications +func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallback) error { + fkey := partialFormat(key) + + // We get the last index first + _, meta, err := s.client.KV().Get(fkey, nil) + if err != nil { + return err + } + + // Add watch to map + s.watches[fkey] = &Watch{LastIndex: meta.LastIndex, Interval: heartbeat} + eventChan := s.waitForChange(fkey) + + for _ = range eventChan { + log.WithField("name", "consul").Debug("Key watch triggered") + entry, _, err := s.Get(key) + if err != nil { + log.Error("Cannot refresh the key: ", fkey, ", cancelling watch") + s.watches[fkey] = nil + return err + } + + value := [][]byte{[]byte(entry)} + callback(value) + } + + return nil +} + +// CancelWatch cancels a watch, sends a signal to the appropriate +// stop channel +func (s *Consul) CancelWatch(key string) error { + key = partialFormat(key) + if _, ok := s.watches[key]; !ok { + log.Error("Chan does not exist for key: ", key) + return ErrWatchDoesNotExist + } + s.watches[key] = nil + return nil +} + +// Internal function to check if a key has changed +func (s *Consul) waitForChange(key string) <-chan uint64 { + ch := make(chan uint64) + kv := s.client.KV() + go func() { + for { + watch, ok := s.watches[key] + if !ok { + log.Error("Cannot access last index for key: ", key, " closing channel") + break + } + option := &api.QueryOptions{ + WaitIndex: watch.LastIndex, + WaitTime: watch.Interval, + } + _, meta, err := kv.List(key, option) + if err != nil { + log.WithField("name", "consul").Errorf("Discovery error: %v", err) + break + } + watch.LastIndex = meta.LastIndex + ch <- watch.LastIndex + } + close(ch) + }() + return ch +} + +// WatchRange triggers a watch on a range of values at "directory" +func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error { + fprefix := partialFormat(prefix) + + // We get the last index first + _, meta, err := s.client.KV().Get(prefix, nil) + if err != nil { + return err + } + + // Add watch to map + s.watches[fprefix] = &Watch{LastIndex: meta.LastIndex, Interval: heartbeat} + eventChan := s.waitForChange(fprefix) + + for _ = range eventChan { + log.WithField("name", "consul").Debug("Key watch triggered") + values, err := s.GetRange(prefix) + if err != nil { + log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch") + s.watches[fprefix] = nil + return err + } + callback(values) + } + + return nil +} + +// CancelWatchRange stops the watch on the range of values, sends +// a signal to the appropriate stop channel +func (s *Consul) CancelWatchRange(prefix string) error { + return s.CancelWatch(prefix) +} + +// Acquire the lock for "key"/"directory" +func (s *Consul) Acquire(key string, value []byte) (string, error) { + key = partialFormat(key) + session := s.client.Session() + id, _, err := session.CreateNoChecks(nil, nil) + if err != nil { + return "", err + } + + // Add session to map + s.sessions[id] = session + + p := &api.KVPair{Key: key, Value: value, Session: id} + if work, _, err := s.client.KV().Acquire(p, nil); err != nil { + return "", err + } else if !work { + return "", ErrCannotLock + } + + return id, nil +} + +// Release the lock for "key"/"directory" +func (s *Consul) Release(id string) error { + if _, ok := s.sessions[id]; !ok { + log.Error("Lock session does not exist") + return ErrSessionUndefined + } + session := s.sessions[id] + session.Destroy(id, nil) + s.sessions[id] = nil + return nil +} + +// AtomicPut put a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) { + p := &api.KVPair{Key: partialFormat(key), Value: newValue, ModifyIndex: index} + if work, _, err := s.client.KV().CAS(p, nil); err != nil { + return false, err + } else if !work { + return false, ErrKeyModified + } + return true, nil +} + +// AtomicDelete deletes a value at "key" if the key has not +// been modified in the meantime, throws an error if this is the case +func (s *Consul) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { + p := &api.KVPair{Key: partialFormat(key), ModifyIndex: index} + if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil { + return false, err + } else if !work { + return false, ErrKeyModified + } + return true, nil +} diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go new file mode 100644 index 0000000000..4487ec5578 --- /dev/null +++ b/pkg/store/etcd.go @@ -0,0 +1,264 @@ +package store + +import ( + "crypto/tls" + "net" + "net/http" + "strings" + "time" + + log "github.com/Sirupsen/logrus" + etcd "github.com/coreos/go-etcd/etcd" +) + +// Etcd embeds the client +type Etcd struct { + client *etcd.Client + watches map[string]chan<- bool +} + +// InitializeEtcd creates a new Etcd client given +// a list of endpoints and optional tls config +func InitializeEtcd(addrs []string, options Config) (Store, error) { + s := &Etcd{} + s.watches = make(map[string]chan<- bool) + + entries := createEndpoints(addrs, "http") + s.client = etcd.NewClient(entries) + + if options.TLS != nil { + s.setTLS(options.TLS) + } + + if options.Timeout != 0 { + s.setTimeout(options.Timeout) + } + + return s, nil +} + +// SetTLS sets the tls configuration given the path +// of certificate files +func (s *Etcd) setTLS(tls *tls.Config) { + // Change to https scheme + var addrs []string + entries := s.client.GetCluster() + for _, entry := range entries { + addrs = append(addrs, strings.Replace(entry, "http", "https", -1)) + } + s.client.SetCluster(addrs) + + // Set transport + t := http.Transport{ + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, // default timeout + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: tls, + } + s.client.SetTransport(&t) +} + +// SetTimeout sets the timeout used for connecting to the store +func (s *Etcd) setTimeout(time time.Duration) { + s.client.SetDialTimeout(time) +} + +// Create the entire path for a directory that does not exist +func (s *Etcd) createDirectory(path string) error { + // TODO Handle TTL at key/dir creation -> use K/V struct for key infos? + if _, err := s.client.CreateDir(format(path), 10); err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError.ErrorCode != 105 { // Skip key already exists + return err + } + } else { + return err + } + } + return nil +} + +// Get the value at "key", returns the last modified index +// to use in conjunction to CAS calls +func (s *Etcd) Get(key string) (value []byte, lastIndex uint64, err error) { + result, err := s.client.Get(format(key), false, false) + if err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + // Not a Directory or Not a file + if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { + return nil, 0, ErrKeyNotFound + } + } + return nil, 0, err + } + return []byte(result.Node.Value), result.Node.ModifiedIndex, nil +} + +// Put a value at "key" +func (s *Etcd) Put(key string, value []byte) error { + if _, err := s.client.Set(key, string(value), 0); err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError.ErrorCode == 104 { // Not a directory + // Remove the last element (the actual key) and set the prefix as a dir + err = s.createDirectory(getDir(key)) + if _, err := s.client.Set(key, string(value), 0); err != nil { + return err + } + } + } + return err + } + return nil +} + +// Delete a value at "key" +func (s *Etcd) Delete(key string) error { + if _, err := s.client.Delete(format(key), false); err != nil { + return err + } + return nil +} + +// Exists checks if the key exists inside the store +func (s *Etcd) Exists(key string) (bool, error) { + value, _, err := s.Get(key) + if err != nil { + if err == ErrKeyNotFound || value == nil { + return false, nil + } + return false, err + } + return true, nil +} + +// Watch a single key for modifications +func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error { + key = format(key) + watchChan := make(chan *etcd.Response) + stopChan := make(chan bool) + + // Create new Watch entry + s.watches[key] = stopChan + + // Start watch + go s.client.Watch(key, 0, false, watchChan, stopChan) + + for _ = range watchChan { + log.WithField("name", "etcd").Debug("Discovery watch triggered") + entry, _, err := s.Get(key) + if err != nil { + log.Error("Cannot refresh the key: ", key, ", cancelling watch") + s.watches[key] = nil + return err + } + value := [][]byte{[]byte(entry)} + callback(value) + } + return nil +} + +// CancelWatch cancels a watch, sends a signal to the appropriate +// stop channel +func (s *Etcd) CancelWatch(key string) error { + key = format(key) + if _, ok := s.watches[key]; !ok { + log.Error("Chan does not exist for key: ", key) + return ErrWatchDoesNotExist + } + // Send stop signal to event chan + s.watches[key] <- true + s.watches[key] = nil + return nil +} + +// AtomicPut put a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) { + resp, err := s.client.CompareAndSwap(format(key), string(newValue), 5, string(oldValue), 0) + if err != nil { + return false, err + } + if !(resp.Node.Value == string(newValue) && resp.Node.Key == key && resp.Node.TTL == 5) { + return false, ErrKeyModified + } + if !(resp.PrevNode.Value == string(newValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) { + return false, ErrKeyModified + } + return true, nil +} + +// AtomicDelete deletes a value at "key" if the key has not +// been modified in the meantime, throws an error if this is the case +func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { + resp, err := s.client.CompareAndDelete(format(key), string(oldValue), 0) + if err != nil { + return false, err + } + if !(resp.PrevNode.Value == string(oldValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) { + return false, ErrKeyModified + } + return true, nil +} + +// GetRange gets a range of values at "directory" +func (s *Etcd) GetRange(prefix string) (value [][]byte, err error) { + resp, err := s.client.Get(format(prefix), true, true) + if err != nil { + return nil, err + } + values := [][]byte{} + for _, n := range resp.Node.Nodes { + values = append(values, []byte(n.Value)) + } + return values, nil +} + +// DeleteRange deletes a range of values at "directory" +func (s *Etcd) DeleteRange(prefix string) error { + if _, err := s.client.Delete(format(prefix), true); err != nil { + return err + } + return nil +} + +// WatchRange triggers a watch on a range of values at "directory" +func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error { + prefix = format(prefix) + watchChan := make(chan *etcd.Response) + stopChan := make(chan bool) + + // Create new Watch entry + s.watches[prefix] = stopChan + + // Start watch + go s.client.Watch(prefix, 0, true, watchChan, stopChan) + for _ = range watchChan { + log.WithField("name", "etcd").Debug("Discovery watch triggered") + values, err := s.GetRange(prefix) + if err != nil { + log.Error("Cannot refresh the key: ", prefix, ", cancelling watch") + s.watches[prefix] = nil + return err + } + callback(values) + } + return nil +} + +// CancelWatchRange stops the watch on the range of values, sends +// a signal to the appropriate stop channel +func (s *Etcd) CancelWatchRange(prefix string) error { + return s.CancelWatch(format(prefix)) +} + +// Acquire the lock for "key"/"directory" +func (s *Etcd) Acquire(key string, value []byte) (string, error) { + return "", ErrNotImplemented +} + +// Release the lock for "key"/"directory" +func (s *Etcd) Release(session string) error { + return ErrNotImplemented +} diff --git a/pkg/store/helpers.go b/pkg/store/helpers.go new file mode 100644 index 0000000000..2c94b54c14 --- /dev/null +++ b/pkg/store/helpers.go @@ -0,0 +1,51 @@ +package store + +import ( + "strings" +) + +// Creates a list of endpoints given the right scheme +func createEndpoints(addrs []string, scheme string) (entries []string) { + for _, addr := range addrs { + entries = append(entries, scheme+"://"+addr) + } + return entries +} + +// Formats the key +func format(key string) string { + return fullpath(splitKey(key)) +} + +// Formats the key partially (omits the first '/') +func partialFormat(key string) string { + return partialpath(splitKey(key)) +} + +// Get the full directory part of the key +func getDir(key string) string { + parts := splitKey(key) + parts = parts[:len(parts)-1] + return fullpath(parts) +} + +// SplitKey splits the key to extract path informations +func splitKey(key string) (path []string) { + if strings.Contains(key, "/") { + path = strings.Split(key, "/") + } else { + path = []string{key} + } + return path +} + +// Get the full correct path representation of a splitted key/directory +func fullpath(path []string) string { + return "/" + strings.Join(path, "/") +} + +// Get the partial correct path representation of a splitted key/directory +// Omits the first '/' +func partialpath(path []string) string { + return strings.Join(path, "/") +} diff --git a/pkg/store/store.go b/pkg/store/store.go new file mode 100644 index 0000000000..33d17d468e --- /dev/null +++ b/pkg/store/store.go @@ -0,0 +1,85 @@ +package store + +import ( + "time" + + log "github.com/Sirupsen/logrus" +) + +// WatchCallback is used for watch methods on keys +// and is triggered on key change +type WatchCallback func(value [][]byte) + +// Initialize creates a new Store object, initializing the client +type Initialize func(addrs []string, options Config) (Store, error) + +// Store represents the backend K/V storage +// Each store should support every call listed +// here. Or it couldn't be implemented as a K/V +// backend for libkv +type Store interface { + // Put a value at the specified key + Put(key string, value []byte) error + + // Get a value given its key + Get(key string) (value []byte, lastIndex uint64, err error) + + // Delete the value at the specified key + Delete(key string) error + + // Verify if a Key exists in the store + Exists(key string) (bool, error) + + // Watch changes on a key + Watch(key string, heartbeat time.Duration, callback WatchCallback) error + + // Cancel watch key + CancelWatch(key string) error + + // Acquire the lock at key + Acquire(key string, value []byte) (string, error) + + // Release the lock at key + Release(session string) error + + // Get range of keys based on prefix + GetRange(prefix string) (value [][]byte, err error) + + // Delete range of keys based on prefix + DeleteRange(prefix string) error + + // Watch key namespaces + WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error + + // Cancel watch key range + CancelWatchRange(prefix string) error + + // Atomic operation on a single value + AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) + + // Atomic delete of a single value + AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) +} + +var ( + // List of Store services + stores map[string]Initialize +) + +func init() { + stores = make(map[string]Initialize) + stores["consul"] = InitializeConsul + stores["etcd"] = InitializeEtcd + stores["zk"] = InitializeZookeeper +} + +// CreateStore creates a an instance of store +func CreateStore(store string, addrs []string, options Config) (Store, error) { + + if init, exists := stores[store]; exists { + log.WithFields(log.Fields{"store": store}).Debug("Initializing store service") + return init(addrs, options) + } + + return nil, ErrNotSupported +} diff --git a/pkg/store/structs.go b/pkg/store/structs.go new file mode 100644 index 0000000000..6e827fea4b --- /dev/null +++ b/pkg/store/structs.go @@ -0,0 +1,42 @@ +package store + +import ( + "crypto/tls" + "errors" + "time" +) + +var ( + // ErrNotSupported is exported + ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one") + // ErrNotImplemented is exported + ErrNotImplemented = errors.New("Call not implemented in current backend") + // ErrNotReachable is exported + ErrNotReachable = errors.New("Api not reachable") + // ErrCannotLock is exported + ErrCannotLock = errors.New("Error acquiring the lock") + // ErrWatchDoesNotExist is exported + ErrWatchDoesNotExist = errors.New("No watch found for specified key") + // ErrKeyModified is exported + ErrKeyModified = errors.New("Unable to complete atomic operation, key modified") + // ErrKeyNotFound is exported + ErrKeyNotFound = errors.New("Key not found in store") +) + +// KV represents the different supported K/V +type KV string + +const ( + // CONSUL is exported + CONSUL KV = "consul" + // ETCD is exported + ETCD = "etcd" + // ZOOKEEPER is exported + ZOOKEEPER = "zookeeper" +) + +// Config contains the options for a storage client +type Config struct { + TLS *tls.Config + Timeout time.Duration +} diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go new file mode 100644 index 0000000000..0ef3c00541 --- /dev/null +++ b/pkg/store/zookeeper.go @@ -0,0 +1,213 @@ +package store + +import ( + "strings" + "time" + + log "github.com/Sirupsen/logrus" + zk "github.com/samuel/go-zookeeper/zk" +) + +// Zookeeper embeds the zookeeper client +// and list of watches +type Zookeeper struct { + timeout time.Duration + client *zk.Conn + watches map[string]<-chan zk.Event +} + +// InitializeZookeeper creates a new Zookeeper client +// given a list of endpoints and optional tls config +func InitializeZookeeper(endpoints []string, options Config) (Store, error) { + s := &Zookeeper{} + s.watches = make(map[string]<-chan zk.Event) + s.timeout = 5 * time.Second // default timeout + + if options.Timeout != 0 { + s.setTimeout(options.Timeout) + } + + conn, _, err := zk.Connect(endpoints, s.timeout) + if err != nil { + log.Error(err) + return nil, err + } + s.client = conn + return s, nil +} + +// SetTimeout sets the timout for connecting to Zookeeper +func (s *Zookeeper) setTimeout(time time.Duration) { + s.timeout = time +} + +// Get the value at "key", returns the last modified index +// to use in conjunction to CAS calls +func (s *Zookeeper) Get(key string) (value []byte, lastIndex uint64, err error) { + resp, meta, err := s.client.Get(format(key)) + if err != nil { + return nil, 0, err + } + if resp == nil { + return nil, 0, ErrKeyNotFound + } + return resp, uint64(meta.Mzxid), nil +} + +// Create the entire path for a directory that does not exist +func (s *Zookeeper) createFullpath(path []string) error { + for i := 1; i <= len(path); i++ { + newpath := "/" + strings.Join(path[:i], "/") + _, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + // Skip if node already exists + if err != zk.ErrNodeExists { + return err + } + } + } + return nil +} + +// Put a value at "key" +func (s *Zookeeper) Put(key string, value []byte) error { + fkey := format(key) + exists, err := s.Exists(key) + if err != nil { + return err + } + if !exists { + s.createFullpath(splitKey(key)) + } + _, err = s.client.Set(fkey, value, -1) + return err +} + +// Delete a value at "key" +func (s *Zookeeper) Delete(key string) error { + err := s.client.Delete(format(key), -1) + return err +} + +// Exists checks if the key exists inside the store +func (s *Zookeeper) Exists(key string) (bool, error) { + exists, _, err := s.client.Exists(format(key)) + if err != nil { + return false, err + } + return exists, nil +} + +// Watch a single key for modifications +func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) error { + fkey := format(key) + _, _, eventChan, err := s.client.GetW(fkey) + if err != nil { + return err + } + + // Create a new Watch entry with eventChan + s.watches[fkey] = eventChan + + for e := range eventChan { + if e.Type == zk.EventNodeChildrenChanged { + log.WithField("name", "zk").Debug("Discovery watch triggered") + entry, _, err := s.Get(key) + value := [][]byte{[]byte(entry)} + if err == nil { + callback(value) + } + } + } + + return nil +} + +// CancelWatch cancels a watch, sends a signal to the appropriate +// stop channel +func (s *Zookeeper) CancelWatch(key string) error { + key = format(key) + if _, ok := s.watches[key]; !ok { + log.Error("Chan does not exist for key: ", key) + return ErrWatchDoesNotExist + } + // Just remove the entry on watches key + s.watches[key] = nil + return nil +} + +// GetRange gets a range of values at "directory" +func (s *Zookeeper) GetRange(prefix string) (values [][]byte, err error) { + prefix = format(prefix) + entries, _, err := s.client.Children(prefix) + if err != nil { + log.Error("Cannot fetch range of keys beginning with prefix: ", prefix) + return nil, err + } + for _, item := range entries { + values = append(values, []byte(item)) + } + return values, err +} + +// DeleteRange deletes a range of values at "directory" +func (s *Zookeeper) DeleteRange(prefix string) error { + err := s.client.Delete(format(prefix), -1) + return err +} + +// WatchRange triggers a watch on a range of values at "directory" +func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error { + fprefix := format(prefix) + _, _, eventChan, err := s.client.ChildrenW(fprefix) + if err != nil { + return err + } + + // Create a new Watch entry with eventChan + s.watches[fprefix] = eventChan + + for e := range eventChan { + if e.Type == zk.EventNodeChildrenChanged { + log.WithField("name", "zk").Debug("Discovery watch triggered") + values, err := s.GetRange(prefix) + if err == nil { + callback(values) + } + } + } + + return nil +} + +// CancelWatchRange stops the watch on the range of values, sends +// a signal to the appropriate stop channel +func (s *Zookeeper) CancelWatchRange(prefix string) error { + return s.CancelWatch(prefix) +} + +// AtomicPut put a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *Zookeeper) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) { + // Use index of Set method to implement CAS + return false, ErrNotImplemented +} + +// AtomicDelete deletes a value at "key" if the key has not +// been modified in the meantime, throws an error if this is the case +func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { + return false, ErrNotImplemented +} + +// Acquire the lock for "key"/"directory" +func (s *Zookeeper) Acquire(path string, value []byte) (string, error) { + // lock := zk.NewLock(s.client, path, nil) + // locks[path] = lock + // lock.Lock() + return "", ErrNotImplemented +} + +// Release the lock for "key"/"directory" +func (s *Zookeeper) Release(session string) error { + return ErrNotImplemented +}