From 59e1e9b94294ec9aca0a2933c01cdfdd6a31ad37 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Tue, 28 Apr 2015 16:11:44 -0700 Subject: [PATCH 1/9] Change existing discovery backends to 'kv' using metatada storage backends in the store package Signed-off-by: Alexandre Beslic --- cli/cli.go | 6 +- cli/join.go | 4 +- cli/manage.go | 4 +- cluster/options.go | 7 +- cluster/swarm/cluster.go | 6 +- discovery/consul/consul.go | 115 ---------- discovery/consul/consul_test.go | 20 -- discovery/discovery.go | 12 +- discovery/etcd/etcd.go | 89 -------- discovery/etcd/etcd_test.go | 19 -- discovery/file/file.go | 2 +- discovery/file/file_test.go | 2 +- discovery/kv/kv.go | 91 ++++++++ discovery/kv/kv_test.go | 22 ++ discovery/nodes/nodes.go | 2 +- discovery/nodes/nodes_test.go | 4 +- discovery/token/token.go | 8 +- discovery/token/token_test.go | 6 +- discovery/zookeeper/zookeeper.go | 149 ------------- discovery/zookeeper/zookeeper_test.go | 22 -- main.go | 4 +- pkg/store/README.md | 91 ++++++++ pkg/store/consul.go | 300 ++++++++++++++++++++++++++ pkg/store/etcd.go | 278 ++++++++++++++++++++++++ pkg/store/helpers.go | 51 +++++ pkg/store/store.go | 85 ++++++++ pkg/store/structs.go | 46 ++++ pkg/store/zookeeper.go | 228 ++++++++++++++++++++ 28 files changed, 1234 insertions(+), 439 deletions(-) delete mode 100644 discovery/consul/consul.go delete mode 100644 discovery/consul/consul_test.go delete mode 100644 discovery/etcd/etcd.go delete mode 100644 discovery/etcd/etcd_test.go create mode 100644 discovery/kv/kv.go create mode 100644 discovery/kv/kv_test.go delete mode 100644 discovery/zookeeper/zookeeper.go delete mode 100644 discovery/zookeeper/zookeeper_test.go create mode 100644 pkg/store/README.md create mode 100644 pkg/store/consul.go create mode 100644 pkg/store/etcd.go create mode 100644 pkg/store/helpers.go create mode 100644 pkg/store/store.go create mode 100644 pkg/store/structs.go create mode 100644 pkg/store/zookeeper.go diff --git a/cli/cli.go b/cli/cli.go index 98ed2797e1..d9772ad128 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -64,7 +64,7 @@ func Run() { log.Fatalf("the `create` command takes no arguments. See '%s create --help'.", c.App.Name) } discovery := &token.Discovery{} - discovery.Initialize("", 0) + discovery.Initialize("", 0, nil) token, err := discovery.CreateCluster() if err != nil { log.Fatal(err) @@ -82,7 +82,9 @@ func Run() { log.Fatalf("discovery required to list a cluster. See '%s list --help'.", c.App.Name) } - d, err := discovery.New(dflag, 0) + // FIXME fill TLS struct + tls := &discovery.TLS{} + d, err := discovery.New(dflag, 0, tls) if err != nil { log.Fatal(err) } diff --git a/cli/join.go b/cli/join.go index 3c5a6c7594..100f56da7c 100644 --- a/cli/join.go +++ b/cli/join.go @@ -26,7 +26,9 @@ func join(c *cli.Context) { log.Fatal("--heartbeat should be an unsigned integer and greater than 0") } - d, err := discovery.New(dflag, hb) + // FIXME Add TLS + tls := &discovery.TLS{TLSConfig: nil} + d, err := discovery.New(dflag, hb, tls) if err != nil { log.Fatal(err) } diff --git a/cli/manage.go b/cli/manage.go index 62c27c2d98..871ed55017 100644 --- a/cli/manage.go +++ b/cli/manage.go @@ -69,6 +69,7 @@ func manage(c *cli.Context) { err error ) + tls := &cluster.TLSConfig{} // If either --tls or --tlsverify are specified, load the certificates. if c.Bool("tls") || c.Bool("tlsverify") { if !c.IsSet("tlscert") || !c.IsSet("tlskey") { @@ -82,6 +83,7 @@ func manage(c *cli.Context) { c.String("tlscert"), c.String("tlskey"), c.Bool("tlsverify")) + tls.Config = tlsConfig if err != nil { log.Fatal(err) } @@ -125,7 +127,7 @@ func manage(c *cli.Context) { log.Fatal("--heartbeat should be an unsigned integer and greater than 0") } options := &cluster.Options{ - TLSConfig: tlsConfig, + TLS: tls, OvercommitRatio: c.Float64("overcommit"), Discovery: dflag, Heartbeat: hb, diff --git a/cluster/options.go b/cluster/options.go index 5c3b37ffee..b2d82ce6d8 100644 --- a/cluster/options.go +++ b/cluster/options.go @@ -4,8 +4,13 @@ import "crypto/tls" // Options is exported type Options struct { - TLSConfig *tls.Config + TLS *TLSConfig OvercommitRatio float64 Discovery string Heartbeat uint64 } + +// TLSConfig is exported +type TLSConfig struct { + Config *tls.Config +} diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 5f4f2254dc..4353b2098d 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -42,7 +42,9 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *clu // get the list of entries from the discovery service go func() { - d, err := discovery.New(options.Discovery, options.Heartbeat) + discoveryTLS := &discovery.TLS{TLSConfig: options.TLS.Config} + log.Info("options TLS: ", options.TLS.Config) + d, err := discovery.New(options.Discovery, options.Heartbeat, discoveryTLS) if err != nil { log.Fatal(err) } @@ -138,7 +140,7 @@ func (c *Cluster) newEntries(entries []*discovery.Entry) { go func(m *discovery.Entry) { if !c.hasEngine(m.String()) { engine := cluster.NewEngine(m.String(), c.options.OvercommitRatio) - if err := engine.Connect(c.options.TLSConfig); err != nil { + if err := engine.Connect(c.options.TLS.Config); err != nil { log.Error(err) return } diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go deleted file mode 100644 index 0ac9baf94e..0000000000 --- a/discovery/consul/consul.go +++ /dev/null @@ -1,115 +0,0 @@ -package consul - -import ( - "fmt" - "path" - "strings" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/discovery" - consul "github.com/hashicorp/consul/api" -) - -// Discovery is exported -type Discovery struct { - heartbeat time.Duration - client *consul.Client - prefix string - lastIndex uint64 -} - -func init() { - discovery.Register("consul", &Discovery{}) -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64) error { - parts := strings.SplitN(uris, "/", 2) - if len(parts) < 2 { - return fmt.Errorf("invalid format %q, missing ", uris) - } - addr := parts[0] - path := parts[1] - - config := consul.DefaultConfig() - config.Address = addr - - client, err := consul.NewClient(config) - if err != nil { - return err - } - s.client = client - s.heartbeat = time.Duration(heartbeat) * time.Second - s.prefix = path + "/" - kv := s.client.KV() - p := &consul.KVPair{Key: s.prefix, Value: nil} - if _, err = kv.Put(p, nil); err != nil { - return err - } - _, meta, err := kv.Get(s.prefix, nil) - if err != nil { - return err - } - s.lastIndex = meta.LastIndex - return nil -} - -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - kv := s.client.KV() - pairs, _, err := kv.List(s.prefix, nil) - if err != nil { - return nil, err - } - - addrs := []string{} - for _, pair := range pairs { - if pair.Key == s.prefix { - continue - } - addrs = append(addrs, string(pair.Value)) - } - - return discovery.CreateEntries(addrs) -} - -// Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - for _ = range s.waitForChange() { - log.WithField("name", "consul").Debug("Discovery watch triggered") - entries, err := s.Fetch() - if err == nil { - callback(entries) - } - } -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - kv := s.client.KV() - p := &consul.KVPair{Key: path.Join(s.prefix, addr), Value: []byte(addr)} - _, err := kv.Put(p, nil) - return err -} - -func (s *Discovery) waitForChange() <-chan uint64 { - c := make(chan uint64) - go func() { - for { - kv := s.client.KV() - option := &consul.QueryOptions{ - WaitIndex: s.lastIndex, - WaitTime: s.heartbeat} - _, meta, err := kv.List(s.prefix, option) - if err != nil { - log.WithField("name", "consul").Errorf("Discovery error: %v", err) - break - } - s.lastIndex = meta.LastIndex - c <- s.lastIndex - } - close(c) - }() - return c -} diff --git a/discovery/consul/consul_test.go b/discovery/consul/consul_test.go deleted file mode 100644 index ce2f87ec86..0000000000 --- a/discovery/consul/consul_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package consul - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - discovery := &Discovery{} - - assert.Equal(t, discovery.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") - - assert.Error(t, discovery.Initialize("127.0.0.1/path", 0)) - assert.Equal(t, discovery.prefix, "path/") - - assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) - assert.Equal(t, discovery.prefix, "path/") - -} diff --git a/discovery/discovery.go b/discovery/discovery.go index 6375b2ffc7..c052e6e194 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -1,6 +1,7 @@ package discovery import ( + "crypto/tls" "errors" "fmt" "net" @@ -15,6 +16,11 @@ type Entry struct { Port string } +// TLS is exported +type TLS struct { + TLSConfig *tls.Config +} + // NewEntry is exported func NewEntry(url string) (*Entry, error) { host, port, err := net.SplitHostPort(url) @@ -33,7 +39,7 @@ type WatchCallback func(entries []*Entry) // Discovery is exported type Discovery interface { - Initialize(string, uint64) error + Initialize(string, uint64, *TLS) error Fetch() ([]*Entry, error) Watch(WatchCallback) Register(string) error @@ -73,12 +79,12 @@ func parse(rawurl string) (string, string) { } // New is exported -func New(rawurl string, heartbeat uint64) (Discovery, error) { +func New(rawurl string, heartbeat uint64, tls *TLS) (Discovery, error) { scheme, uri := parse(rawurl) if discovery, exists := discoveries[scheme]; exists { log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service") - err := discovery.Initialize(uri, heartbeat) + err := discovery.Initialize(uri, heartbeat, tls) return discovery, err } diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go deleted file mode 100644 index f4e3e2204b..0000000000 --- a/discovery/etcd/etcd.go +++ /dev/null @@ -1,89 +0,0 @@ -package etcd - -import ( - "fmt" - "path" - "strings" - - log "github.com/Sirupsen/logrus" - "github.com/coreos/go-etcd/etcd" - "github.com/docker/swarm/discovery" -) - -// Discovery is exported -type Discovery struct { - ttl uint64 - client *etcd.Client - path string -} - -func init() { - discovery.Register("etcd", &Discovery{}) -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64) error { - var ( - // split here because uris can contain multiples ips - // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` - parts = strings.SplitN(uris, "/", 2) - ips = strings.Split(parts[0], ",") - entries []string - ) - - if len(parts) != 2 { - return fmt.Errorf("invalid format %q, missing ", uris) - } - - for _, ip := range ips { - entries = append(entries, "http://"+ip) - } - - s.client = etcd.NewClient(entries) - // ttl should always be > heartbeat, even if heartbeat = 1 or 0 - s.ttl = uint64(heartbeat*3/2) + 1 - s.path = "/" + parts[1] + "/" - if _, err := s.client.CreateDir(s.path, s.ttl); err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - if etcdError.ErrorCode != 105 { // skip key already exists - return err - } - } else { - return err - } - } - return nil -} - -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - resp, err := s.client.Get(s.path, true, true) - if err != nil { - return nil, err - } - - addrs := []string{} - for _, n := range resp.Node.Nodes { - addrs = append(addrs, n.Value) - } - return discovery.CreateEntries(addrs) -} - -// Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - watchChan := make(chan *etcd.Response) - go s.client.Watch(s.path, 0, true, watchChan, nil) - for _ = range watchChan { - log.WithField("name", "etcd").Debug("Discovery watch triggered") - entries, err := s.Fetch() - if err == nil { - callback(entries) - } - } -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - _, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl) - return err -} diff --git a/discovery/etcd/etcd_test.go b/discovery/etcd/etcd_test.go deleted file mode 100644 index bbca1bc77c..0000000000 --- a/discovery/etcd/etcd_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package etcd - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - discovery := &Discovery{} - - assert.Equal(t, discovery.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") - - assert.Error(t, discovery.Initialize("127.0.0.1/path", 0)) - assert.Equal(t, discovery.path, "/path/") - - assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) - assert.Equal(t, discovery.path, "/path/") -} diff --git a/discovery/file/file.go b/discovery/file/file.go index ba43aee077..63b0d027bd 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -19,7 +19,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(path string, heartbeat uint64) error { +func (s *Discovery) Initialize(path string, heartbeat uint64, _ *discovery.TLS) error { s.path = path s.heartbeat = heartbeat return nil diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go index 14ac918b82..4b085a360e 100644 --- a/discovery/file/file_test.go +++ b/discovery/file/file_test.go @@ -8,7 +8,7 @@ import ( func TestInitialize(t *testing.T) { discovery := &Discovery{} - discovery.Initialize("/path/to/file", 0) + discovery.Initialize("/path/to/file", 0, nil) assert.Equal(t, discovery.path, "/path/to/file") } diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go new file mode 100644 index 0000000000..3758788ef4 --- /dev/null +++ b/discovery/kv/kv.go @@ -0,0 +1,91 @@ +package kv + +import ( + "fmt" + "path" + "strings" + "time" + + "github.com/docker/swarm/discovery" + "github.com/docker/swarm/pkg/store" +) + +// Discovery is exported +type Discovery struct { + store store.Store + name string + heartbeat time.Duration + prefix string +} + +func init() { + discovery.Register("zk", &Discovery{name: "zk"}) + discovery.Register("consul", &Discovery{name: "consul"}) + discovery.Register("etcd", &Discovery{name: "etcd"}) +} + +// Initialize is exported +func (s *Discovery) Initialize(uris string, heartbeat uint64, tls *discovery.TLS) error { + var ( + parts = strings.SplitN(uris, "/", 2) + ips = strings.Split(parts[0], ",") + addrs []string + err error + ) + + if len(parts) != 2 { + return fmt.Errorf("invalid format %q, missing ", uris) + } + + for _, ip := range ips { + addrs = append(addrs, ip) + } + + s.heartbeat = time.Duration(heartbeat) * time.Second + s.prefix = parts[1] + + // Creates a new store, will ignore options given + // if not supported by the chosen store + s.store, err = store.CreateStore( + s.name, // name of the store + addrs, + store.Timeout(time.Duration(heartbeat)*time.Second), + store.TLSConfig(tls.TLSConfig), + ) + if err != nil { + return err + } + + return nil +} + +// Fetch is exported +func (s *Discovery) Fetch() ([]*discovery.Entry, error) { + addrs, err := s.store.GetRange(s.prefix) + if err != nil { + return nil, err + } + return discovery.CreateEntries(convertToStringArray(addrs)) +} + +// Watch is exported +func (s *Discovery) Watch(callback discovery.WatchCallback) { + s.store.WatchRange(s.prefix, "", s.heartbeat, func(kvalues [][]byte) { + // Traduce byte array entries to discovery.Entry + entries, _ := discovery.CreateEntries(convertToStringArray(kvalues)) + callback(entries) + }) +} + +// Register is exported +func (s *Discovery) Register(addr string) error { + err := s.store.Put(path.Join(s.prefix, addr), []byte(addr)) + return err +} + +func convertToStringArray(entries [][]byte) (addrs []string) { + for _, entry := range entries { + addrs = append(addrs, string(entry)) + } + return addrs +} diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go new file mode 100644 index 0000000000..a7b6b7925c --- /dev/null +++ b/discovery/kv/kv_test.go @@ -0,0 +1,22 @@ +package kv + +import ( + "testing" + + "github.com/docker/swarm/discovery" + "github.com/stretchr/testify/assert" +) + +func TestInitialize(t *testing.T) { + discoveryService := &Discovery{} + tls := &discovery.TLS{} + + assert.Equal(t, discoveryService.Initialize("127.0.0.1", 0, tls).Error(), "invalid format \"127.0.0.1\", missing ") + + assert.Error(t, discoveryService.Initialize("127.0.0.1/path", 0, tls)) + assert.Equal(t, discoveryService.prefix, "path") + + assert.Error(t, discoveryService.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0, tls)) + assert.Equal(t, discoveryService.prefix, "path") + +} diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go index 009d275ff2..cd0646d6df 100644 --- a/discovery/nodes/nodes.go +++ b/discovery/nodes/nodes.go @@ -16,7 +16,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, _ uint64) error { +func (s *Discovery) Initialize(uris string, _ uint64, _ *discovery.TLS) error { for _, input := range strings.Split(uris, ",") { for _, ip := range discovery.Generate(input) { entry, err := discovery.NewEntry(ip) diff --git a/discovery/nodes/nodes_test.go b/discovery/nodes/nodes_test.go index 04f682cabb..0a3ddeb03c 100644 --- a/discovery/nodes/nodes_test.go +++ b/discovery/nodes/nodes_test.go @@ -8,7 +8,7 @@ import ( func TestInitialise(t *testing.T) { discovery := &Discovery{} - discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) + discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, nil) assert.Equal(t, len(discovery.entries), 2) assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111") assert.Equal(t, discovery.entries[1].String(), "2.2.2.2:2222") @@ -16,7 +16,7 @@ func TestInitialise(t *testing.T) { func TestInitialiseWithPattern(t *testing.T) { discovery := &Discovery{} - discovery.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0) + discovery.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, nil) assert.Equal(t, len(discovery.entries), 5) assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111") assert.Equal(t, discovery.entries[1].String(), "1.1.1.2:1111") diff --git a/discovery/token/token.go b/discovery/token/token.go index bada10ce52..f9fa081d26 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -27,7 +27,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(urltoken string, heartbeat uint64) error { +func (s *Discovery) Initialize(urltoken string, heartbeat uint64, _ *discovery.TLS) error { if i := strings.LastIndex(urltoken, "/"); i != -1 { s.url = "https://" + urltoken[:i] s.token = urltoken[i+1:] @@ -54,16 +54,16 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) { defer resp.Body.Close() - var addrs []string + var entries []string if resp.StatusCode == http.StatusOK { - if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil { + if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil { return nil, err } } else { return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode) } - return discovery.CreateEntries(addrs) + return discovery.CreateEntries(entries) } // Watch is exported diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index a4e1f42edb..b09fbbf591 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -8,17 +8,17 @@ import ( func TestInitialize(t *testing.T) { discovery := &Discovery{} - err := discovery.Initialize("token", 0) + err := discovery.Initialize("token", 0, nil) assert.NoError(t, err) assert.Equal(t, discovery.token, "token") assert.Equal(t, discovery.url, DiscoveryURL) - err = discovery.Initialize("custom/path/token", 0) + err = discovery.Initialize("custom/path/token", 0, nil) assert.NoError(t, err) assert.Equal(t, discovery.token, "token") assert.Equal(t, discovery.url, "https://custom/path") - err = discovery.Initialize("", 0) + err = discovery.Initialize("", 0, nil) assert.Error(t, err) } diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go deleted file mode 100644 index 2b1d4ad16e..0000000000 --- a/discovery/zookeeper/zookeeper.go +++ /dev/null @@ -1,149 +0,0 @@ -package zookeeper - -import ( - "fmt" - "path" - "strings" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/discovery" - "github.com/samuel/go-zookeeper/zk" -) - -// Discovery is exported -type Discovery struct { - conn *zk.Conn - path []string - heartbeat uint64 -} - -func init() { - discovery.Register("zk", &Discovery{}) -} - -func (s *Discovery) fullpath() string { - return "/" + strings.Join(s.path, "/") -} - -func (s *Discovery) createFullpath() error { - for i := 1; i <= len(s.path); i++ { - newpath := "/" + strings.Join(s.path[:i], "/") - _, err := s.conn.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll)) - if err != nil { - // It's OK if key already existed. Just skip. - if err != zk.ErrNodeExists { - return err - } - } - } - return nil -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64) error { - var ( - // split here because uris can contain multiples ips - // like `zk://192.168.0.1,192.168.0.2,192.168.0.3/path` - parts = strings.SplitN(uris, "/", 2) - ips = strings.Split(parts[0], ",") - ) - - if len(parts) != 2 { - return fmt.Errorf("invalid format %q, missing ", uris) - } - - if strings.Contains(parts[1], "/") { - s.path = strings.Split(parts[1], "/") - } else { - s.path = []string{parts[1]} - } - - conn, _, err := zk.Connect(ips, time.Second) - if err != nil { - return err - } - - s.conn = conn - s.heartbeat = heartbeat - err = s.createFullpath() - if err != nil { - return err - } - - return nil -} - -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - addrs, _, err := s.conn.Children(s.fullpath()) - - if err != nil { - return nil, err - } - - return discovery.CreateEntries(addrs) -} - -// Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - - addrs, _, eventChan, err := s.conn.ChildrenW(s.fullpath()) - if err != nil { - log.WithField("name", "zk").Debug("Discovery watch aborted") - return - } - entries, err := discovery.CreateEntries(addrs) - if err == nil { - callback(entries) - } - - for e := range eventChan { - if e.Type == zk.EventNodeChildrenChanged { - log.WithField("name", "zk").Debug("Discovery watch triggered") - entries, err := s.Fetch() - if err == nil { - callback(entries) - } - } - - } - -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - nodePath := path.Join(s.fullpath(), addr) - - // check existing for the parent path first - exist, _, err := s.conn.Exists(s.fullpath()) - if err != nil { - return err - } - - // if the parent path does not exist yet - if exist == false { - // create the parent first - err = s.createFullpath() - if err != nil { - return err - } - } else { - // if node path exists - exist, _, err = s.conn.Exists(nodePath) - if err != nil { - return err - } - // delete it first - if exist { - err = s.conn.Delete(nodePath, -1) - if err != nil { - return err - } - } - } - - // create the node path to store address information - _, err = s.conn.Create(nodePath, []byte(addr), 0, zk.WorldACL(zk.PermAll)) - return err -} diff --git a/discovery/zookeeper/zookeeper_test.go b/discovery/zookeeper/zookeeper_test.go deleted file mode 100644 index b09144da1b..0000000000 --- a/discovery/zookeeper/zookeeper_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package zookeeper - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - service := &Discovery{} - - assert.Equal(t, service.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") - - assert.Error(t, service.Initialize("127.0.0.1/path", 0)) - assert.Equal(t, service.fullpath(), "/path") - - assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) - assert.Equal(t, service.fullpath(), "/path") - - assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path/sub1/sub2", 0)) - assert.Equal(t, service.fullpath(), "/path/sub1/sub2") -} diff --git a/main.go b/main.go index 899d90cb13..f90922793d 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,10 @@ package main import ( - _ "github.com/docker/swarm/discovery/consul" - _ "github.com/docker/swarm/discovery/etcd" _ "github.com/docker/swarm/discovery/file" + _ "github.com/docker/swarm/discovery/kv" _ "github.com/docker/swarm/discovery/nodes" _ "github.com/docker/swarm/discovery/token" - _ "github.com/docker/swarm/discovery/zookeeper" "github.com/docker/swarm/cli" ) diff --git a/pkg/store/README.md b/pkg/store/README.md new file mode 100644 index 0000000000..1d8b15bf07 --- /dev/null +++ b/pkg/store/README.md @@ -0,0 +1,91 @@ +--- +page_title: Docker Swarm storage +page_description: Swarm storage +page_keywords: docker, swarm, clustering, storage, metadata +--- + +# Storage + +Docker Swarm comes with multiple Key/Value storage backends. This package is used by the discovery service to register machines inside the cluster. It is also used to store cluster's metadata. + +## Example of usage + +### Create a new store and use Put/Get + +```go +package main + +import ( + "fmt" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/docker/swarm/store" +) + +func main() { + var ( + client = "localhost:8500" + ) + + // Initialize a new store with consul + kv, err := store.CreateStore( + store.Consul, + []string{client}, + store.Timeout(10*time.Second), + ) + if err != nil { + log.Error("Cannot create store consul") + } + + key := "foo" + err = kv.Put(key, []byte("bar")) + if err != nil { + log.Error("Error trying to put value at key `", key, "`") + } + + value, _, err := kv.Get(key) + if err != nil { + log.Error("Error trying accessing value at key `", key, "`") + } + + log.Info("value: ", string(value)) +} +``` + + + +## Contributing to a new storage backend + +A new **storage backend** should include those calls: + +```go +type Store interface { + Put(key string, value []byte) error + Get(key string) (value []byte, lastIndex uint64, err error) + Delete(key string) error + Exists(key string) (bool, error) + Watch(key string, ttl uint64, callback WatchCallback) error + CancelWatch(key string) error + Acquire(key string, value []byte) (string, error) + Release(session string) error + GetRange(prefix string) (value [][]byte, err error) + DeleteRange(prefix string) error + WatchRange(prefix string, filter string, heartbeat uint64, callback WatchCallback) error + CancelWatchRange(prefix string) error + AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) + AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) +} +``` + +To be elligible as a **discovery backend** only, a K/V store implementation should at least offer `Get`, `Put`, `WatchRange`, `GetRange`. + +You can get inspiration from existing backends to create a new one. This interface could be subject to changes to improve the experience of using the library and contributing to a new backend. + + +## Docker Swarm documentation index + +- [User guide](./index.md) +- [Sheduler strategies](./scheduler/strategy.md) +- [Sheduler filters](./scheduler/filter.md) +- [Swarm API](./API.md) diff --git a/pkg/store/consul.go b/pkg/store/consul.go new file mode 100644 index 0000000000..e1418e377f --- /dev/null +++ b/pkg/store/consul.go @@ -0,0 +1,300 @@ +package store + +import ( + "crypto/tls" + "errors" + "net/http" + "time" + + log "github.com/Sirupsen/logrus" + api "github.com/hashicorp/consul/api" +) + +var ( + // ErrSessionUndefined is exported + ErrSessionUndefined = errors.New("Session does not exist") +) + +// Consul embeds the client and watches/lock sessions +type Consul struct { + config *api.Config + client *api.Client + sessions map[string]*api.Session + watches map[string]*Watch +} + +// Watch embeds the event channel and the +// refresh interval +type Watch struct { + LastIndex uint64 + Interval time.Duration + EventChan interface{} +} + +// InitializeConsul creates a new Consul client given +// a list of endpoints and optional tls config +func InitializeConsul(endpoints []string, options ...interface{}) (Store, error) { + s := &Consul{} + s.sessions = make(map[string]*api.Session) + s.watches = make(map[string]*Watch) + + // Create Consul client + config := api.DefaultConfig() + s.config = config + config.HttpClient = http.DefaultClient + config.Address = endpoints[0] + config.Scheme = "http" + + // Sets all the options + s.SetOptions(options...) + + // Creates a new client + client, err := api.NewClient(config) + if err != nil { + log.Errorf("Couldn't initialize consul client..") + return nil, err + } + s.client = client + + return s, nil +} + +// SetOptions sets the options for Consul +func (s *Consul) SetOptions(options ...interface{}) { + for _, opt := range options { + + switch opt := opt.(type) { + + case *tls.Config: + s.SetTLS(opt) + + case time.Duration: + s.SetTimeout(opt) + + default: + // TODO give more meaningful information to print + log.Info("store: option unsupported for consul") + + } + } +} + +// SetTLS sets Consul TLS options +func (s *Consul) SetTLS(tls *tls.Config) { + if tls != nil { + s.config.HttpClient.Transport = &http.Transport{ + TLSClientConfig: tls, + } + s.config.Scheme = "https" + } +} + +// SetTimeout sets the timout for connecting to Consul +func (s *Consul) SetTimeout(time time.Duration) { + s.config.WaitTime = time +} + +// Get the value at "key", returns the last modified index +// to use in conjunction to CAS calls +func (s *Consul) Get(key string) (value []byte, lastIndex uint64, err error) { + pair, meta, err := s.client.KV().Get(partialFormat(key), nil) + if err != nil { + return nil, 0, err + } + if pair == nil { + return nil, 0, ErrKeyNotFound + } + return pair.Value, meta.LastIndex, nil +} + +// Put a value at "key" +func (s *Consul) Put(key string, value []byte) error { + p := &api.KVPair{Key: partialFormat(key), Value: value} + if s.client == nil { + log.Error("Error initializing client") + } + _, err := s.client.KV().Put(p, nil) + return err +} + +// Delete a value at "key" +func (s *Consul) Delete(key string) error { + _, err := s.client.KV().Delete(partialFormat(key), nil) + return err +} + +// Exists checks that the key exists inside the store +func (s *Consul) Exists(key string) (bool, error) { + _, _, err := s.Get(key) + if err != nil && err == ErrKeyNotFound { + return false, err + } + return true, nil +} + +// GetRange gets a range of values at "directory" +func (s *Consul) GetRange(prefix string) (values [][]byte, err error) { + pairs, _, err := s.client.KV().List(partialFormat(prefix), nil) + if err != nil { + return nil, err + } + for _, pair := range pairs { + if pair.Key == prefix { + continue + } + values = append(values, pair.Value) + } + return values, nil +} + +// DeleteRange deletes a range of values at "directory" +func (s *Consul) DeleteRange(prefix string) error { + _, err := s.client.KV().DeleteTree(partialFormat(prefix), nil) + return err +} + +// Watch a single key for modifications +func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallback) error { + key = partialFormat(key) + interval := heartbeat + eventChan := s.waitForChange(key) + s.watches[key] = &Watch{Interval: interval, EventChan: eventChan} + + for _ = range eventChan { + log.WithField("name", "consul").Debug("Key watch triggered") + entry, _, err := s.Get(key) + if err != nil { + log.Error("Cannot refresh the key: ", key, ", cancelling watch") + s.watches[key] = nil + return err + } + + value := [][]byte{[]byte(entry)} + callback(value) + } + + return nil +} + +// CancelWatch cancels a watch, sends a signal to the appropriate +// stop channel +func (s *Consul) CancelWatch(key string) error { + key = partialFormat(key) + if _, ok := s.watches[key]; !ok { + log.Error("Chan does not exist for key: ", key) + return ErrWatchDoesNotExist + } + s.watches[key] = nil + return nil +} + +// Internal function to check if a key has changed +func (s *Consul) waitForChange(key string) <-chan uint64 { + ch := make(chan uint64) + go func() { + for { + watch, ok := s.watches[key] + if !ok { + log.Error("Cannot access last index for key: ", key, " closing channel") + break + } + option := &api.QueryOptions{ + WaitIndex: watch.LastIndex, + WaitTime: watch.Interval} + _, meta, err := s.client.KV().Get(key, option) + if err != nil { + log.WithField("name", "consul").Errorf("Discovery error: %v", err) + break + } + watch.LastIndex = meta.LastIndex + ch <- watch.LastIndex + } + close(ch) + }() + return ch +} + +// WatchRange triggers a watch on a range of values at "directory" +func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error { + prefix = partialFormat(prefix) + interval := heartbeat + eventChan := s.waitForChange(prefix) + s.watches[prefix] = &Watch{Interval: interval, EventChan: eventChan} + + for _ = range eventChan { + log.WithField("name", "consul").Debug("Key watch triggered") + values, err := s.GetRange(prefix) + if err != nil { + log.Error("Cannot refresh keys with prefix: ", prefix, ", cancelling watch") + s.watches[prefix] = nil + return err + } + callback(values) + } + + return nil +} + +// CancelWatchRange stops the watch on the range of values, sends +// a signal to the appropriate stop channel +func (s *Consul) CancelWatchRange(prefix string) error { + return s.CancelWatch(prefix) +} + +// Acquire the lock for "key"/"directory" +func (s *Consul) Acquire(key string, value []byte) (string, error) { + key = partialFormat(key) + session := s.client.Session() + id, _, err := session.CreateNoChecks(nil, nil) + if err != nil { + return "", err + } + + // Add session to map + s.sessions[id] = session + + p := &api.KVPair{Key: key, Value: value, Session: id} + if work, _, err := s.client.KV().Acquire(p, nil); err != nil { + return "", err + } else if !work { + return "", ErrCannotLock + } + + return id, nil +} + +// Release the lock for "key"/"directory" +func (s *Consul) Release(id string) error { + if _, ok := s.sessions[id]; !ok { + log.Error("Lock session does not exist") + return ErrSessionUndefined + } + session := s.sessions[id] + session.Destroy(id, nil) + s.sessions[id] = nil + return nil +} + +// AtomicPut put a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *Consul) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) { + p := &api.KVPair{Key: partialFormat(key), Value: newValue, ModifyIndex: index} + if work, _, err := s.client.KV().CAS(p, nil); err != nil { + return false, err + } else if !work { + return false, ErrKeyModified + } + return true, nil +} + +// AtomicDelete deletes a value at "key" if the key has not +// been modified in the meantime, throws an error if this is the case +func (s *Consul) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { + p := &api.KVPair{Key: partialFormat(key), ModifyIndex: index} + if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil { + return false, err + } else if !work { + return false, ErrKeyModified + } + return true, nil +} diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go new file mode 100644 index 0000000000..16a6bd7245 --- /dev/null +++ b/pkg/store/etcd.go @@ -0,0 +1,278 @@ +package store + +import ( + "crypto/tls" + "net" + "net/http" + "strings" + "time" + + log "github.com/Sirupsen/logrus" + etcd "github.com/coreos/go-etcd/etcd" +) + +// Etcd embeds the client +type Etcd struct { + client *etcd.Client + watches map[string]chan<- bool +} + +// InitializeEtcd creates a new Etcd client given +// a list of endpoints and optional tls config +func InitializeEtcd(addrs []string, options ...interface{}) (Store, error) { + s := &Etcd{} + s.watches = make(map[string]chan<- bool) + + entries := createEndpoints(addrs, "http") + s.client = etcd.NewClient(entries) + s.SetOptions(options...) + return s, nil +} + +// SetOptions sets the options for Etcd +func (s *Etcd) SetOptions(options ...interface{}) { + for _, opt := range options { + + switch opt := opt.(type) { + + case *tls.Config: + s.SetTLS(opt) + + case time.Duration: + s.SetTimeout(opt) + + default: + // TODO give more meaningful information to print + log.Info("store: option unsupported for etcd") + + } + } +} + +// SetTLS sets the tls configuration given the path +// of certificate files +func (s *Etcd) SetTLS(tls *tls.Config) { + if tls != nil { + // Change to https scheme + var addrs []string + entries := s.client.GetCluster() + for _, entry := range entries { + addrs = append(addrs, strings.Replace(entry, "http", "https", -1)) + } + s.client.SetCluster(addrs) + + // Set transport + t := http.Transport{ + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, // default timeout + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: tls, + } + s.client.SetTransport(&t) + } +} + +// SetTimeout sets the timeout used for connecting to the store +func (s *Etcd) SetTimeout(time time.Duration) { + s.client.SetDialTimeout(time) +} + +// Create the entire path for a directory that does not exist +func (s *Etcd) createDirectory(path string) error { + // TODO Handle TTL at key/dir creation -> use K/V struct for key infos? + if _, err := s.client.CreateDir(format(path), 10); err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError.ErrorCode != 105 { // Skip key already exists + return err + } + } else { + return err + } + } + return nil +} + +// Get the value at "key", returns the last modified index +// to use in conjunction to CAS calls +func (s *Etcd) Get(key string) (value []byte, lastIndex uint64, err error) { + result, err := s.client.Get(format(key), false, false) + if err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + // Not a Directory or Not a file + if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { + return nil, 0, ErrKeyNotFound + } + } + return nil, 0, err + } + return []byte(result.Node.Value), result.Node.ModifiedIndex, nil +} + +// Put a value at "key" +func (s *Etcd) Put(key string, value []byte) error { + if _, err := s.client.Set(key, string(value), 0); err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError.ErrorCode == 104 { // Not a directory + // Remove the last element (the actual key) and set the prefix as a dir + err = s.createDirectory(getDir(key)) + if _, err := s.client.Set(key, string(value), 0); err != nil { + return err + } + } + } + return err + } + return nil +} + +// Delete a value at "key" +func (s *Etcd) Delete(key string) error { + if _, err := s.client.Delete(format(key), false); err != nil { + return err + } + return nil +} + +// Exists checks if the key exists inside the store +func (s *Etcd) Exists(key string) (bool, error) { + value, _, err := s.Get(key) + if err != nil { + if err == ErrKeyNotFound || value == nil { + return false, nil + } + return false, err + } + return true, nil +} + +// Watch a single key for modifications +func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error { + key = format(key) + watchChan := make(chan *etcd.Response) + stopChan := make(chan bool) + + // Create new Watch entry + s.watches[key] = stopChan + + // Start watch + go s.client.Watch(key, 0, false, watchChan, stopChan) + + for _ = range watchChan { + log.WithField("name", "etcd").Debug("Discovery watch triggered") + entry, _, err := s.Get(key) + if err != nil { + log.Error("Cannot refresh the key: ", key, ", cancelling watch") + s.watches[key] = nil + return err + } + value := [][]byte{[]byte(entry)} + callback(value) + } + return nil +} + +// CancelWatch cancels a watch, sends a signal to the appropriate +// stop channel +func (s *Etcd) CancelWatch(key string) error { + key = format(key) + if _, ok := s.watches[key]; !ok { + log.Error("Chan does not exist for key: ", key) + return ErrWatchDoesNotExist + } + // Send stop signal to event chan + s.watches[key] <- true + s.watches[key] = nil + return nil +} + +// AtomicPut put a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *Etcd) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) { + resp, err := s.client.CompareAndSwap(format(key), string(newValue), 5, string(oldValue), 0) + if err != nil { + return false, err + } + if !(resp.Node.Value == string(newValue) && resp.Node.Key == key && resp.Node.TTL == 5) { + return false, ErrKeyModified + } + if !(resp.PrevNode.Value == string(newValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) { + return false, ErrKeyModified + } + return true, nil +} + +// AtomicDelete deletes a value at "key" if the key has not +// been modified in the meantime, throws an error if this is the case +func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { + resp, err := s.client.CompareAndDelete(format(key), string(oldValue), 0) + if err != nil { + return false, err + } + if !(resp.PrevNode.Value == string(oldValue) && resp.PrevNode.Key == key && resp.PrevNode.TTL == 5) { + return false, ErrKeyModified + } + return true, nil +} + +// GetRange gets a range of values at "directory" +func (s *Etcd) GetRange(prefix string) (value [][]byte, err error) { + resp, err := s.client.Get(format(prefix), true, true) + if err != nil { + return nil, err + } + values := [][]byte{} + for _, n := range resp.Node.Nodes { + values = append(values, []byte(n.Value)) + } + return values, nil +} + +// DeleteRange deletes a range of values at "directory" +func (s *Etcd) DeleteRange(prefix string) error { + if _, err := s.client.Delete(format(prefix), true); err != nil { + return err + } + return nil +} + +// WatchRange triggers a watch on a range of values at "directory" +func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error { + prefix = format(prefix) + watchChan := make(chan *etcd.Response) + stopChan := make(chan bool) + + // Create new Watch entry + s.watches[prefix] = stopChan + + // Start watch + go s.client.Watch(prefix, 0, true, watchChan, stopChan) + for _ = range watchChan { + log.WithField("name", "etcd").Debug("Discovery watch triggered") + values, err := s.GetRange(prefix) + if err != nil { + log.Error("Cannot refresh the key: ", prefix, ", cancelling watch") + s.watches[prefix] = nil + return err + } + callback(values) + } + return nil +} + +// CancelWatchRange stops the watch on the range of values, sends +// a signal to the appropriate stop channel +func (s *Etcd) CancelWatchRange(prefix string) error { + return s.CancelWatch(format(prefix)) +} + +// Acquire the lock for "key"/"directory" +func (s *Etcd) Acquire(key string, value []byte) (string, error) { + return "", ErrNotImplemented +} + +// Release the lock for "key"/"directory" +func (s *Etcd) Release(session string) error { + return ErrNotImplemented +} diff --git a/pkg/store/helpers.go b/pkg/store/helpers.go new file mode 100644 index 0000000000..2c94b54c14 --- /dev/null +++ b/pkg/store/helpers.go @@ -0,0 +1,51 @@ +package store + +import ( + "strings" +) + +// Creates a list of endpoints given the right scheme +func createEndpoints(addrs []string, scheme string) (entries []string) { + for _, addr := range addrs { + entries = append(entries, scheme+"://"+addr) + } + return entries +} + +// Formats the key +func format(key string) string { + return fullpath(splitKey(key)) +} + +// Formats the key partially (omits the first '/') +func partialFormat(key string) string { + return partialpath(splitKey(key)) +} + +// Get the full directory part of the key +func getDir(key string) string { + parts := splitKey(key) + parts = parts[:len(parts)-1] + return fullpath(parts) +} + +// SplitKey splits the key to extract path informations +func splitKey(key string) (path []string) { + if strings.Contains(key, "/") { + path = strings.Split(key, "/") + } else { + path = []string{key} + } + return path +} + +// Get the full correct path representation of a splitted key/directory +func fullpath(path []string) string { + return "/" + strings.Join(path, "/") +} + +// Get the partial correct path representation of a splitted key/directory +// Omits the first '/' +func partialpath(path []string) string { + return strings.Join(path, "/") +} diff --git a/pkg/store/store.go b/pkg/store/store.go new file mode 100644 index 0000000000..a5640013e1 --- /dev/null +++ b/pkg/store/store.go @@ -0,0 +1,85 @@ +package store + +import ( + "time" + + log "github.com/Sirupsen/logrus" +) + +// WatchCallback is used for watch methods on keys +// and is triggered on key change +type WatchCallback func(value [][]byte) + +// Initialize creates a new Store object, initializing the client +type Initialize func(addrs []string, options ...interface{}) (Store, error) + +// Store represents the backend K/V storage +// Each store should support every call listed +// here. Or it couldn't be implemented as a K/V +// backend for libkv +type Store interface { + // Put a value at the specified key + Put(key string, value []byte) error + + // Get a value given its key + Get(key string) (value []byte, lastIndex uint64, err error) + + // Delete the value at the specified key + Delete(key string) error + + // Verify if a Key exists in the store + Exists(key string) (bool, error) + + // Watch changes on a key + Watch(key string, heartbeat time.Duration, callback WatchCallback) error + + // Cancel watch key + CancelWatch(key string) error + + // Acquire the lock at key + Acquire(key string, value []byte) (string, error) + + // Release the lock at key + Release(session string) error + + // Get range of keys based on prefix + GetRange(prefix string) (value [][]byte, err error) + + // Delete range of keys based on prefix + DeleteRange(prefix string) error + + // Watch key namespaces + WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error + + // Cancel watch key range + CancelWatchRange(prefix string) error + + // Atomic operation on a single value + AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) + + // Atomic delete of a single value + AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) +} + +var ( + // List of Store services + stores map[string]Initialize +) + +func init() { + stores = make(map[string]Initialize) + stores["consul"] = InitializeConsul + stores["etcd"] = InitializeEtcd + stores["zk"] = InitializeZookeeper +} + +// CreateStore creates a an instance of store +func CreateStore(store string, addrs []string, opts ...interface{}) (Store, error) { + + if init, exists := stores[store]; exists { + log.WithFields(log.Fields{"store": store}).Debug("Initializing store service") + return init(addrs, opts...) + } + + return nil, ErrNotSupported +} diff --git a/pkg/store/structs.go b/pkg/store/structs.go new file mode 100644 index 0000000000..6566a8302c --- /dev/null +++ b/pkg/store/structs.go @@ -0,0 +1,46 @@ +package store + +import ( + "crypto/tls" + "errors" + "time" +) + +var ( + // ErrNotSupported is exported + ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one") + // ErrNotImplemented is exported + ErrNotImplemented = errors.New("Call not implemented in current backend") + // ErrNotReachable is exported + ErrNotReachable = errors.New("Api not reachable") + // ErrCannotLock is exported + ErrCannotLock = errors.New("Error acquiring the lock") + // ErrWatchDoesNotExist is exported + ErrWatchDoesNotExist = errors.New("No watch found for specified key") + // ErrKeyModified is exported + ErrKeyModified = errors.New("Unable to complete atomic operation, key modified") + // ErrKeyNotFound is exported + ErrKeyNotFound = errors.New("Key not found in store") +) + +// KV represents the different supported K/V +type KV string + +const ( + // CONSUL is exported + CONSUL KV = "consul" + // ETCD is exported + ETCD = "etcd" + // ZOOKEEPER is exported + ZOOKEEPER = "zookeeper" +) + +// TLSConfig takes a built tls.Config object +func TLSConfig(tls *tls.Config) interface{} { + return tls +} + +// Timeout takes a timout for client Initialization +func Timeout(time time.Duration) interface{} { + return time +} diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go new file mode 100644 index 0000000000..4db93e941b --- /dev/null +++ b/pkg/store/zookeeper.go @@ -0,0 +1,228 @@ +package store + +import ( + "strings" + "time" + + log "github.com/Sirupsen/logrus" + zk "github.com/samuel/go-zookeeper/zk" +) + +// Zookeeper embeds the zookeeper client +// and list of watches +type Zookeeper struct { + timeout time.Duration + client *zk.Conn + watches map[string]<-chan zk.Event +} + +// InitializeZookeeper creates a new Zookeeper client +// given a list of endpoints and optional tls config +func InitializeZookeeper(endpoints []string, options ...interface{}) (Store, error) { + s := &Zookeeper{} + s.watches = make(map[string]<-chan zk.Event) + s.timeout = 5 * time.Second // default timeout + + // Sets options + s.SetOptions(options...) + + conn, _, err := zk.Connect(endpoints, s.timeout) + if err != nil { + log.Error(err) + return nil, err + } + s.client = conn + return s, nil +} + +// SetOptions sets the options for Zookeeper +func (s *Zookeeper) SetOptions(options ...interface{}) { + for _, opt := range options { + + switch opt := opt.(type) { + + case time.Duration: + s.SetTimeout(opt) + + default: + // TODO give more meaningful information to print + log.Error("store: option unsupported for zookeeper") + + } + } +} + +// SetTimeout sets the timout for connecting to Zookeeper +func (s *Zookeeper) SetTimeout(time time.Duration) { + s.timeout = time +} + +// Get the value at "key", returns the last modified index +// to use in conjunction to CAS calls +func (s *Zookeeper) Get(key string) (value []byte, lastIndex uint64, err error) { + resp, meta, err := s.client.Get(format(key)) + if err != nil { + return nil, 0, err + } + if resp == nil { + return nil, 0, ErrKeyNotFound + } + return resp, uint64(meta.Mzxid), nil +} + +// Create the entire path for a directory that does not exist +func (s *Zookeeper) createFullpath(path []string) error { + for i := 1; i <= len(path); i++ { + newpath := "/" + strings.Join(path[:i], "/") + _, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + // Skip if node already exists + if err != zk.ErrNodeExists { + return err + } + } + } + return nil +} + +// Put a value at "key" +func (s *Zookeeper) Put(key string, value []byte) error { + fkey := format(key) + exists, err := s.Exists(key) + if err != nil { + return err + } + if !exists { + s.createFullpath(splitKey(key)) + } + _, err = s.client.Set(fkey, value, -1) + return err +} + +// Delete a value at "key" +func (s *Zookeeper) Delete(key string) error { + err := s.client.Delete(format(key), -1) + return err +} + +// Exists checks if the key exists inside the store +func (s *Zookeeper) Exists(key string) (bool, error) { + exists, _, err := s.client.Exists(format(key)) + if err != nil { + return false, err + } + return exists, nil +} + +// Watch a single key for modifications +func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) error { + key = format(key) + _, _, eventChan, err := s.client.GetW(key) + if err != nil { + return err + } + + // Create a new Watch entry with eventChan + s.watches[key] = eventChan + + for e := range eventChan { + if e.Type == zk.EventNodeChildrenChanged { + log.WithField("name", "zk").Debug("Discovery watch triggered") + entry, _, err := s.Get(key) + value := [][]byte{[]byte(entry)} + if err == nil { + callback(value) + } + } + } + + return nil +} + +// CancelWatch cancels a watch, sends a signal to the appropriate +// stop channel +func (s *Zookeeper) CancelWatch(key string) error { + key = format(key) + if _, ok := s.watches[key]; !ok { + log.Error("Chan does not exist for key: ", key) + return ErrWatchDoesNotExist + } + // Just remove the entry on watches key + s.watches[key] = nil + return nil +} + +// GetRange gets a range of values at "directory" +func (s *Zookeeper) GetRange(prefix string) (values [][]byte, err error) { + entries, _, err := s.client.Children(format(prefix)) + if err != nil { + log.Error("Cannot fetch range of keys beginning with prefix: ", prefix) + return nil, err + } + for _, item := range entries { + values = append(values, []byte(item)) + } + return values, err +} + +// DeleteRange deletes a range of values at "directory" +func (s *Zookeeper) DeleteRange(prefix string) error { + err := s.client.Delete(format(prefix), -1) + return err +} + +// WatchRange triggers a watch on a range of values at "directory" +func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error { + prefix = format(prefix) + _, _, eventChan, err := s.client.ChildrenW(prefix) + if err != nil { + return err + } + + // Create a new Watch entry with eventChan + s.watches[prefix] = eventChan + + for e := range eventChan { + if e.Type == zk.EventNodeChildrenChanged { + log.WithField("name", "zk").Debug("Discovery watch triggered") + values, err := s.GetRange(prefix) + if err == nil { + callback(values) + } + } + } + + return nil +} + +// CancelWatchRange stops the watch on the range of values, sends +// a signal to the appropriate stop channel +func (s *Zookeeper) CancelWatchRange(prefix string) error { + return s.CancelWatch(prefix) +} + +// AtomicPut put a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *Zookeeper) AtomicPut(key string, oldValue []byte, newValue []byte, index uint64) (bool, error) { + // Use index of Set method to implement CAS + return false, ErrNotImplemented +} + +// AtomicDelete deletes a value at "key" if the key has not +// been modified in the meantime, throws an error if this is the case +func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { + return false, ErrNotImplemented +} + +// Acquire the lock for "key"/"directory" +func (s *Zookeeper) Acquire(path string, value []byte) (string, error) { + // lock := zk.NewLock(s.client, path, nil) + // locks[path] = lock + // lock.Lock() + return "", ErrNotImplemented +} + +// Release the lock for "key"/"directory" +func (s *Zookeeper) Release(session string) error { + return ErrNotImplemented +} From d333da52848b26dffe82de99b1e02bfd1c5cb5d3 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Mon, 11 May 2015 17:49:40 -0700 Subject: [PATCH 2/9] remove log info for default case in SetOptions Signed-off-by: Alexandre Beslic --- pkg/store/consul.go | 3 +-- pkg/store/etcd.go | 3 +-- pkg/store/zookeeper.go | 6 +++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/store/consul.go b/pkg/store/consul.go index e1418e377f..049e97289b 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -72,8 +72,7 @@ func (s *Consul) SetOptions(options ...interface{}) { s.SetTimeout(opt) default: - // TODO give more meaningful information to print - log.Info("store: option unsupported for consul") + // ignore } } diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 16a6bd7245..48c67df89e 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -42,8 +42,7 @@ func (s *Etcd) SetOptions(options ...interface{}) { s.SetTimeout(opt) default: - // TODO give more meaningful information to print - log.Info("store: option unsupported for etcd") + // ignore } } diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index 4db93e941b..9bbdbfc1ba 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -45,8 +45,7 @@ func (s *Zookeeper) SetOptions(options ...interface{}) { s.SetTimeout(opt) default: - // TODO give more meaningful information to print - log.Error("store: option unsupported for zookeeper") + // ignore } } @@ -154,7 +153,8 @@ func (s *Zookeeper) CancelWatch(key string) error { // GetRange gets a range of values at "directory" func (s *Zookeeper) GetRange(prefix string) (values [][]byte, err error) { - entries, _, err := s.client.Children(format(prefix)) + prefix = format(prefix) + entries, _, err := s.client.Children(prefix) if err != nil { log.Error("Cannot fetch range of keys beginning with prefix: ", prefix) return nil, err From e039563c78915a32ded0e107a50a9a1c7ab19174 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Mon, 11 May 2015 18:23:40 -0700 Subject: [PATCH 3/9] Fix zookeeper timeout value Signed-off-by: Alexandre Beslic --- cli/cli.go | 2 +- discovery/kv/kv.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index d9772ad128..e52bcf2599 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -84,7 +84,7 @@ func Run() { // FIXME fill TLS struct tls := &discovery.TLS{} - d, err := discovery.New(dflag, 0, tls) + d, err := discovery.New(dflag, 10, tls) if err != nil { log.Fatal(err) } diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 3758788ef4..03b2fe6037 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -49,7 +49,7 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64, tls *discovery.TLS s.store, err = store.CreateStore( s.name, // name of the store addrs, - store.Timeout(time.Duration(heartbeat)*time.Second), + store.Timeout(s.heartbeat), store.TLSConfig(tls.TLSConfig), ) if err != nil { From 8909dfda637e10eaa084810c823631418836ae49 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Mon, 11 May 2015 19:07:21 -0700 Subject: [PATCH 4/9] Fix zookeeper watch methods with key being formatted twice before given to GetRange Signed-off-by: Alexandre Beslic --- pkg/store/zookeeper.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index 9bbdbfc1ba..5ae18d1dd8 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -115,14 +115,14 @@ func (s *Zookeeper) Exists(key string) (bool, error) { // Watch a single key for modifications func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) error { - key = format(key) - _, _, eventChan, err := s.client.GetW(key) + fkey := format(key) + _, _, eventChan, err := s.client.GetW(fkey) if err != nil { return err } // Create a new Watch entry with eventChan - s.watches[key] = eventChan + s.watches[fkey] = eventChan for e := range eventChan { if e.Type == zk.EventNodeChildrenChanged { @@ -173,14 +173,14 @@ func (s *Zookeeper) DeleteRange(prefix string) error { // WatchRange triggers a watch on a range of values at "directory" func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, callback WatchCallback) error { - prefix = format(prefix) - _, _, eventChan, err := s.client.ChildrenW(prefix) + fprefix := format(prefix) + _, _, eventChan, err := s.client.ChildrenW(fprefix) if err != nil { return err } // Create a new Watch entry with eventChan - s.watches[prefix] = eventChan + s.watches[fprefix] = eventChan for e := range eventChan { if e.Type == zk.EventNodeChildrenChanged { From 6f67ca3a58010d6b73c95ee30bc509c7329a0e55 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Mon, 11 May 2015 22:33:08 -0700 Subject: [PATCH 5/9] replace variadic method options for simple Config struct, correct Consul Watch method Signed-off-by: Alexandre Beslic --- cli/cli.go | 4 +- cli/join.go | 5 +-- cli/manage.go | 4 +- cluster/options.go | 7 +--- cluster/swarm/cluster.go | 6 +-- discovery/discovery.go | 9 +--- discovery/file/file.go | 3 +- discovery/kv/kv.go | 9 ++-- discovery/kv/kv_test.go | 8 ++-- discovery/nodes/nodes.go | 3 +- discovery/token/token.go | 3 +- pkg/store/consul.go | 90 ++++++++++++++++++++-------------------- pkg/store/etcd.go | 69 +++++++++++++----------------- pkg/store/store.go | 6 +-- pkg/store/structs.go | 12 ++---- pkg/store/zookeeper.go | 25 +++-------- 16 files changed, 110 insertions(+), 153 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index e52bcf2599..09acf9f32a 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -82,9 +82,7 @@ func Run() { log.Fatalf("discovery required to list a cluster. See '%s list --help'.", c.App.Name) } - // FIXME fill TLS struct - tls := &discovery.TLS{} - d, err := discovery.New(dflag, 10, tls) + d, err := discovery.New(dflag, 10, nil) if err != nil { log.Fatal(err) } diff --git a/cli/join.go b/cli/join.go index 100f56da7c..2bc73523b1 100644 --- a/cli/join.go +++ b/cli/join.go @@ -26,9 +26,8 @@ func join(c *cli.Context) { log.Fatal("--heartbeat should be an unsigned integer and greater than 0") } - // FIXME Add TLS - tls := &discovery.TLS{TLSConfig: nil} - d, err := discovery.New(dflag, hb, tls) + // TODO Add TLS on join + d, err := discovery.New(dflag, hb, nil) if err != nil { log.Fatal(err) } diff --git a/cli/manage.go b/cli/manage.go index 871ed55017..ec60c50ce7 100644 --- a/cli/manage.go +++ b/cli/manage.go @@ -69,7 +69,6 @@ func manage(c *cli.Context) { err error ) - tls := &cluster.TLSConfig{} // If either --tls or --tlsverify are specified, load the certificates. if c.Bool("tls") || c.Bool("tlsverify") { if !c.IsSet("tlscert") || !c.IsSet("tlskey") { @@ -83,7 +82,6 @@ func manage(c *cli.Context) { c.String("tlscert"), c.String("tlskey"), c.Bool("tlsverify")) - tls.Config = tlsConfig if err != nil { log.Fatal(err) } @@ -127,7 +125,7 @@ func manage(c *cli.Context) { log.Fatal("--heartbeat should be an unsigned integer and greater than 0") } options := &cluster.Options{ - TLS: tls, + TLS: tlsConfig, OvercommitRatio: c.Float64("overcommit"), Discovery: dflag, Heartbeat: hb, diff --git a/cluster/options.go b/cluster/options.go index b2d82ce6d8..9871cbecca 100644 --- a/cluster/options.go +++ b/cluster/options.go @@ -4,13 +4,8 @@ import "crypto/tls" // Options is exported type Options struct { - TLS *TLSConfig + TLS *tls.Config OvercommitRatio float64 Discovery string Heartbeat uint64 } - -// TLSConfig is exported -type TLSConfig struct { - Config *tls.Config -} diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 4353b2098d..49f41d62bd 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -42,9 +42,7 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *clu // get the list of entries from the discovery service go func() { - discoveryTLS := &discovery.TLS{TLSConfig: options.TLS.Config} - log.Info("options TLS: ", options.TLS.Config) - d, err := discovery.New(options.Discovery, options.Heartbeat, discoveryTLS) + d, err := discovery.New(options.Discovery, options.Heartbeat, options.TLS) if err != nil { log.Fatal(err) } @@ -140,7 +138,7 @@ func (c *Cluster) newEntries(entries []*discovery.Entry) { go func(m *discovery.Entry) { if !c.hasEngine(m.String()) { engine := cluster.NewEngine(m.String(), c.options.OvercommitRatio) - if err := engine.Connect(c.options.TLS.Config); err != nil { + if err := engine.Connect(c.options.TLS); err != nil { log.Error(err) return } diff --git a/discovery/discovery.go b/discovery/discovery.go index c052e6e194..5d2e72b1e2 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -16,11 +16,6 @@ type Entry struct { Port string } -// TLS is exported -type TLS struct { - TLSConfig *tls.Config -} - // NewEntry is exported func NewEntry(url string) (*Entry, error) { host, port, err := net.SplitHostPort(url) @@ -39,7 +34,7 @@ type WatchCallback func(entries []*Entry) // Discovery is exported type Discovery interface { - Initialize(string, uint64, *TLS) error + Initialize(string, uint64, *tls.Config) error Fetch() ([]*Entry, error) Watch(WatchCallback) Register(string) error @@ -79,7 +74,7 @@ func parse(rawurl string) (string, string) { } // New is exported -func New(rawurl string, heartbeat uint64, tls *TLS) (Discovery, error) { +func New(rawurl string, heartbeat uint64, tls *tls.Config) (Discovery, error) { scheme, uri := parse(rawurl) if discovery, exists := discoveries[scheme]; exists { diff --git a/discovery/file/file.go b/discovery/file/file.go index 63b0d027bd..0f52349388 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -1,6 +1,7 @@ package file import ( + "crypto/tls" "io/ioutil" "strings" "time" @@ -19,7 +20,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(path string, heartbeat uint64, _ *discovery.TLS) error { +func (s *Discovery) Initialize(path string, heartbeat uint64, _ *tls.Config) error { s.path = path s.heartbeat = heartbeat return nil diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 03b2fe6037..d4f6c105d2 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -1,6 +1,7 @@ package kv import ( + "crypto/tls" "fmt" "path" "strings" @@ -25,7 +26,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64, tls *discovery.TLS) error { +func (s *Discovery) Initialize(uris string, heartbeat uint64, tls *tls.Config) error { var ( parts = strings.SplitN(uris, "/", 2) ips = strings.Split(parts[0], ",") @@ -49,8 +50,10 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64, tls *discovery.TLS s.store, err = store.CreateStore( s.name, // name of the store addrs, - store.Timeout(s.heartbeat), - store.TLSConfig(tls.TLSConfig), + store.Config{ + TLS: tls, + Timeout: s.heartbeat, + }, ) if err != nil { return err diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go index a7b6b7925c..a4b4b60093 100644 --- a/discovery/kv/kv_test.go +++ b/discovery/kv/kv_test.go @@ -3,20 +3,18 @@ package kv import ( "testing" - "github.com/docker/swarm/discovery" "github.com/stretchr/testify/assert" ) func TestInitialize(t *testing.T) { discoveryService := &Discovery{} - tls := &discovery.TLS{} - assert.Equal(t, discoveryService.Initialize("127.0.0.1", 0, tls).Error(), "invalid format \"127.0.0.1\", missing ") + assert.Equal(t, discoveryService.Initialize("127.0.0.1", 0, nil).Error(), "invalid format \"127.0.0.1\", missing ") - assert.Error(t, discoveryService.Initialize("127.0.0.1/path", 0, tls)) + assert.Error(t, discoveryService.Initialize("127.0.0.1/path", 0, nil)) assert.Equal(t, discoveryService.prefix, "path") - assert.Error(t, discoveryService.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0, tls)) + assert.Error(t, discoveryService.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0, nil)) assert.Equal(t, discoveryService.prefix, "path") } diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go index cd0646d6df..5236eb5ec5 100644 --- a/discovery/nodes/nodes.go +++ b/discovery/nodes/nodes.go @@ -1,6 +1,7 @@ package nodes import ( + "crypto/tls" "strings" "github.com/docker/swarm/discovery" @@ -16,7 +17,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, _ uint64, _ *discovery.TLS) error { +func (s *Discovery) Initialize(uris string, _ uint64, _ *tls.Config) error { for _, input := range strings.Split(uris, ",") { for _, ip := range discovery.Generate(input) { entry, err := discovery.NewEntry(ip) diff --git a/discovery/token/token.go b/discovery/token/token.go index f9fa081d26..21c03780f5 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -1,6 +1,7 @@ package token import ( + "crypto/tls" "encoding/json" "errors" "fmt" @@ -27,7 +28,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(urltoken string, heartbeat uint64, _ *discovery.TLS) error { +func (s *Discovery) Initialize(urltoken string, heartbeat uint64, _ *tls.Config) error { if i := strings.LastIndex(urltoken, "/"); i != -1 { s.url = "https://" + urltoken[:i] s.token = urltoken[i+1:] diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 049e97289b..a052d9d556 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -28,12 +28,11 @@ type Consul struct { type Watch struct { LastIndex uint64 Interval time.Duration - EventChan interface{} } // InitializeConsul creates a new Consul client given // a list of endpoints and optional tls config -func InitializeConsul(endpoints []string, options ...interface{}) (Store, error) { +func InitializeConsul(endpoints []string, options Config) (Store, error) { s := &Consul{} s.sessions = make(map[string]*api.Session) s.watches = make(map[string]*Watch) @@ -45,8 +44,13 @@ func InitializeConsul(endpoints []string, options ...interface{}) (Store, error) config.Address = endpoints[0] config.Scheme = "http" - // Sets all the options - s.SetOptions(options...) + if options.TLS != nil { + s.setTLS(options.TLS) + } + + if options.Timeout != 0 { + s.setTimeout(options.Timeout) + } // Creates a new client client, err := api.NewClient(config) @@ -59,37 +63,16 @@ func InitializeConsul(endpoints []string, options ...interface{}) (Store, error) return s, nil } -// SetOptions sets the options for Consul -func (s *Consul) SetOptions(options ...interface{}) { - for _, opt := range options { - - switch opt := opt.(type) { - - case *tls.Config: - s.SetTLS(opt) - - case time.Duration: - s.SetTimeout(opt) - - default: - // ignore - - } - } -} - // SetTLS sets Consul TLS options -func (s *Consul) SetTLS(tls *tls.Config) { - if tls != nil { - s.config.HttpClient.Transport = &http.Transport{ - TLSClientConfig: tls, - } - s.config.Scheme = "https" +func (s *Consul) setTLS(tls *tls.Config) { + s.config.HttpClient.Transport = &http.Transport{ + TLSClientConfig: tls, } + s.config.Scheme = "https" } // SetTimeout sets the timout for connecting to Consul -func (s *Consul) SetTimeout(time time.Duration) { +func (s *Consul) setTimeout(time time.Duration) { s.config.WaitTime = time } @@ -137,6 +120,9 @@ func (s *Consul) GetRange(prefix string) (values [][]byte, err error) { if err != nil { return nil, err } + if len(pairs) == 0 { + return nil, ErrKeyNotFound + } for _, pair := range pairs { if pair.Key == prefix { continue @@ -154,17 +140,24 @@ func (s *Consul) DeleteRange(prefix string) error { // Watch a single key for modifications func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallback) error { - key = partialFormat(key) - interval := heartbeat - eventChan := s.waitForChange(key) - s.watches[key] = &Watch{Interval: interval, EventChan: eventChan} + fkey := partialFormat(key) + + // We get the last index first + _, meta, err := s.client.KV().Get(fkey, nil) + if err != nil { + return err + } + + // Add watch to map + s.watches[fkey] = &Watch{LastIndex: meta.LastIndex, Interval: heartbeat} + eventChan := s.waitForChange(fkey) for _ = range eventChan { log.WithField("name", "consul").Debug("Key watch triggered") entry, _, err := s.Get(key) if err != nil { - log.Error("Cannot refresh the key: ", key, ", cancelling watch") - s.watches[key] = nil + log.Error("Cannot refresh the key: ", fkey, ", cancelling watch") + s.watches[fkey] = nil return err } @@ -190,6 +183,7 @@ func (s *Consul) CancelWatch(key string) error { // Internal function to check if a key has changed func (s *Consul) waitForChange(key string) <-chan uint64 { ch := make(chan uint64) + kv := s.client.KV() go func() { for { watch, ok := s.watches[key] @@ -199,8 +193,9 @@ func (s *Consul) waitForChange(key string) <-chan uint64 { } option := &api.QueryOptions{ WaitIndex: watch.LastIndex, - WaitTime: watch.Interval} - _, meta, err := s.client.KV().Get(key, option) + WaitTime: watch.Interval, + } + _, meta, err := kv.List(key, option) if err != nil { log.WithField("name", "consul").Errorf("Discovery error: %v", err) break @@ -215,17 +210,24 @@ func (s *Consul) waitForChange(key string) <-chan uint64 { // WatchRange triggers a watch on a range of values at "directory" func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duration, callback WatchCallback) error { - prefix = partialFormat(prefix) - interval := heartbeat - eventChan := s.waitForChange(prefix) - s.watches[prefix] = &Watch{Interval: interval, EventChan: eventChan} + fprefix := partialFormat(prefix) + + // We get the last index first + _, meta, err := s.client.KV().Get(prefix, nil) + if err != nil { + return err + } + + // Add watch to map + s.watches[fprefix] = &Watch{LastIndex: meta.LastIndex, Interval: heartbeat} + eventChan := s.waitForChange(fprefix) for _ = range eventChan { log.WithField("name", "consul").Debug("Key watch triggered") values, err := s.GetRange(prefix) if err != nil { - log.Error("Cannot refresh keys with prefix: ", prefix, ", cancelling watch") - s.watches[prefix] = nil + log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch") + s.watches[fprefix] = nil return err } callback(values) diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 48c67df89e..4487ec5578 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -19,62 +19,49 @@ type Etcd struct { // InitializeEtcd creates a new Etcd client given // a list of endpoints and optional tls config -func InitializeEtcd(addrs []string, options ...interface{}) (Store, error) { +func InitializeEtcd(addrs []string, options Config) (Store, error) { s := &Etcd{} s.watches = make(map[string]chan<- bool) entries := createEndpoints(addrs, "http") s.client = etcd.NewClient(entries) - s.SetOptions(options...) - return s, nil -} -// SetOptions sets the options for Etcd -func (s *Etcd) SetOptions(options ...interface{}) { - for _, opt := range options { - - switch opt := opt.(type) { - - case *tls.Config: - s.SetTLS(opt) - - case time.Duration: - s.SetTimeout(opt) - - default: - // ignore - - } + if options.TLS != nil { + s.setTLS(options.TLS) } + + if options.Timeout != 0 { + s.setTimeout(options.Timeout) + } + + return s, nil } // SetTLS sets the tls configuration given the path // of certificate files -func (s *Etcd) SetTLS(tls *tls.Config) { - if tls != nil { - // Change to https scheme - var addrs []string - entries := s.client.GetCluster() - for _, entry := range entries { - addrs = append(addrs, strings.Replace(entry, "http", "https", -1)) - } - s.client.SetCluster(addrs) - - // Set transport - t := http.Transport{ - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, // default timeout - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - TLSClientConfig: tls, - } - s.client.SetTransport(&t) +func (s *Etcd) setTLS(tls *tls.Config) { + // Change to https scheme + var addrs []string + entries := s.client.GetCluster() + for _, entry := range entries { + addrs = append(addrs, strings.Replace(entry, "http", "https", -1)) } + s.client.SetCluster(addrs) + + // Set transport + t := http.Transport{ + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, // default timeout + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: tls, + } + s.client.SetTransport(&t) } // SetTimeout sets the timeout used for connecting to the store -func (s *Etcd) SetTimeout(time time.Duration) { +func (s *Etcd) setTimeout(time time.Duration) { s.client.SetDialTimeout(time) } diff --git a/pkg/store/store.go b/pkg/store/store.go index a5640013e1..33d17d468e 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -11,7 +11,7 @@ import ( type WatchCallback func(value [][]byte) // Initialize creates a new Store object, initializing the client -type Initialize func(addrs []string, options ...interface{}) (Store, error) +type Initialize func(addrs []string, options Config) (Store, error) // Store represents the backend K/V storage // Each store should support every call listed @@ -74,11 +74,11 @@ func init() { } // CreateStore creates a an instance of store -func CreateStore(store string, addrs []string, opts ...interface{}) (Store, error) { +func CreateStore(store string, addrs []string, options Config) (Store, error) { if init, exists := stores[store]; exists { log.WithFields(log.Fields{"store": store}).Debug("Initializing store service") - return init(addrs, opts...) + return init(addrs, options) } return nil, ErrNotSupported diff --git a/pkg/store/structs.go b/pkg/store/structs.go index 6566a8302c..6e827fea4b 100644 --- a/pkg/store/structs.go +++ b/pkg/store/structs.go @@ -35,12 +35,8 @@ const ( ZOOKEEPER = "zookeeper" ) -// TLSConfig takes a built tls.Config object -func TLSConfig(tls *tls.Config) interface{} { - return tls -} - -// Timeout takes a timout for client Initialization -func Timeout(time time.Duration) interface{} { - return time +// Config contains the options for a storage client +type Config struct { + TLS *tls.Config + Timeout time.Duration } diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index 5ae18d1dd8..0ef3c00541 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -18,13 +18,14 @@ type Zookeeper struct { // InitializeZookeeper creates a new Zookeeper client // given a list of endpoints and optional tls config -func InitializeZookeeper(endpoints []string, options ...interface{}) (Store, error) { +func InitializeZookeeper(endpoints []string, options Config) (Store, error) { s := &Zookeeper{} s.watches = make(map[string]<-chan zk.Event) s.timeout = 5 * time.Second // default timeout - // Sets options - s.SetOptions(options...) + if options.Timeout != 0 { + s.setTimeout(options.Timeout) + } conn, _, err := zk.Connect(endpoints, s.timeout) if err != nil { @@ -35,24 +36,8 @@ func InitializeZookeeper(endpoints []string, options ...interface{}) (Store, err return s, nil } -// SetOptions sets the options for Zookeeper -func (s *Zookeeper) SetOptions(options ...interface{}) { - for _, opt := range options { - - switch opt := opt.(type) { - - case time.Duration: - s.SetTimeout(opt) - - default: - // ignore - - } - } -} - // SetTimeout sets the timout for connecting to Zookeeper -func (s *Zookeeper) SetTimeout(time time.Duration) { +func (s *Zookeeper) setTimeout(time time.Duration) { s.timeout = time } From 403f95f86c09303802da0465ac73af861af09881 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Tue, 12 May 2015 14:19:30 -0700 Subject: [PATCH 6/9] Remove placeholder TLS parameter in discovery interface and New method Signed-off-by: Alexandre Beslic --- cli/cli.go | 4 ++-- cli/join.go | 3 +-- cluster/swarm/cluster.go | 2 +- discovery/discovery.go | 7 +++---- discovery/file/file.go | 3 +-- discovery/file/file_test.go | 2 +- discovery/kv/kv.go | 4 +--- discovery/kv/kv_test.go | 6 +++--- discovery/nodes/nodes.go | 3 +-- discovery/nodes/nodes_test.go | 4 ++-- discovery/token/token.go | 3 +-- discovery/token/token_test.go | 6 +++--- 12 files changed, 20 insertions(+), 27 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index 09acf9f32a..bed480d126 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -64,7 +64,7 @@ func Run() { log.Fatalf("the `create` command takes no arguments. See '%s create --help'.", c.App.Name) } discovery := &token.Discovery{} - discovery.Initialize("", 0, nil) + discovery.Initialize("", 0) token, err := discovery.CreateCluster() if err != nil { log.Fatal(err) @@ -82,7 +82,7 @@ func Run() { log.Fatalf("discovery required to list a cluster. See '%s list --help'.", c.App.Name) } - d, err := discovery.New(dflag, 10, nil) + d, err := discovery.New(dflag, 10) if err != nil { log.Fatal(err) } diff --git a/cli/join.go b/cli/join.go index 2bc73523b1..3c5a6c7594 100644 --- a/cli/join.go +++ b/cli/join.go @@ -26,8 +26,7 @@ func join(c *cli.Context) { log.Fatal("--heartbeat should be an unsigned integer and greater than 0") } - // TODO Add TLS on join - d, err := discovery.New(dflag, hb, nil) + d, err := discovery.New(dflag, hb) if err != nil { log.Fatal(err) } diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 49f41d62bd..e8add8457f 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -42,7 +42,7 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *clu // get the list of entries from the discovery service go func() { - d, err := discovery.New(options.Discovery, options.Heartbeat, options.TLS) + d, err := discovery.New(options.Discovery, options.Heartbeat) if err != nil { log.Fatal(err) } diff --git a/discovery/discovery.go b/discovery/discovery.go index 5d2e72b1e2..6375b2ffc7 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -1,7 +1,6 @@ package discovery import ( - "crypto/tls" "errors" "fmt" "net" @@ -34,7 +33,7 @@ type WatchCallback func(entries []*Entry) // Discovery is exported type Discovery interface { - Initialize(string, uint64, *tls.Config) error + Initialize(string, uint64) error Fetch() ([]*Entry, error) Watch(WatchCallback) Register(string) error @@ -74,12 +73,12 @@ func parse(rawurl string) (string, string) { } // New is exported -func New(rawurl string, heartbeat uint64, tls *tls.Config) (Discovery, error) { +func New(rawurl string, heartbeat uint64) (Discovery, error) { scheme, uri := parse(rawurl) if discovery, exists := discoveries[scheme]; exists { log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service") - err := discovery.Initialize(uri, heartbeat, tls) + err := discovery.Initialize(uri, heartbeat) return discovery, err } diff --git a/discovery/file/file.go b/discovery/file/file.go index 0f52349388..ba43aee077 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -1,7 +1,6 @@ package file import ( - "crypto/tls" "io/ioutil" "strings" "time" @@ -20,7 +19,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(path string, heartbeat uint64, _ *tls.Config) error { +func (s *Discovery) Initialize(path string, heartbeat uint64) error { s.path = path s.heartbeat = heartbeat return nil diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go index 4b085a360e..14ac918b82 100644 --- a/discovery/file/file_test.go +++ b/discovery/file/file_test.go @@ -8,7 +8,7 @@ import ( func TestInitialize(t *testing.T) { discovery := &Discovery{} - discovery.Initialize("/path/to/file", 0, nil) + discovery.Initialize("/path/to/file", 0) assert.Equal(t, discovery.path, "/path/to/file") } diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index d4f6c105d2..a7ac5acbb8 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -1,7 +1,6 @@ package kv import ( - "crypto/tls" "fmt" "path" "strings" @@ -26,7 +25,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64, tls *tls.Config) error { +func (s *Discovery) Initialize(uris string, heartbeat uint64) error { var ( parts = strings.SplitN(uris, "/", 2) ips = strings.Split(parts[0], ",") @@ -51,7 +50,6 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64, tls *tls.Config) e s.name, // name of the store addrs, store.Config{ - TLS: tls, Timeout: s.heartbeat, }, ) diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go index a4b4b60093..6a2b0c7fef 100644 --- a/discovery/kv/kv_test.go +++ b/discovery/kv/kv_test.go @@ -9,12 +9,12 @@ import ( func TestInitialize(t *testing.T) { discoveryService := &Discovery{} - assert.Equal(t, discoveryService.Initialize("127.0.0.1", 0, nil).Error(), "invalid format \"127.0.0.1\", missing ") + assert.Equal(t, discoveryService.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") - assert.Error(t, discoveryService.Initialize("127.0.0.1/path", 0, nil)) + assert.Error(t, discoveryService.Initialize("127.0.0.1/path", 0)) assert.Equal(t, discoveryService.prefix, "path") - assert.Error(t, discoveryService.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0, nil)) + assert.Error(t, discoveryService.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) assert.Equal(t, discoveryService.prefix, "path") } diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go index 5236eb5ec5..009d275ff2 100644 --- a/discovery/nodes/nodes.go +++ b/discovery/nodes/nodes.go @@ -1,7 +1,6 @@ package nodes import ( - "crypto/tls" "strings" "github.com/docker/swarm/discovery" @@ -17,7 +16,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, _ uint64, _ *tls.Config) error { +func (s *Discovery) Initialize(uris string, _ uint64) error { for _, input := range strings.Split(uris, ",") { for _, ip := range discovery.Generate(input) { entry, err := discovery.NewEntry(ip) diff --git a/discovery/nodes/nodes_test.go b/discovery/nodes/nodes_test.go index 0a3ddeb03c..04f682cabb 100644 --- a/discovery/nodes/nodes_test.go +++ b/discovery/nodes/nodes_test.go @@ -8,7 +8,7 @@ import ( func TestInitialise(t *testing.T) { discovery := &Discovery{} - discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, nil) + discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) assert.Equal(t, len(discovery.entries), 2) assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111") assert.Equal(t, discovery.entries[1].String(), "2.2.2.2:2222") @@ -16,7 +16,7 @@ func TestInitialise(t *testing.T) { func TestInitialiseWithPattern(t *testing.T) { discovery := &Discovery{} - discovery.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, nil) + discovery.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0) assert.Equal(t, len(discovery.entries), 5) assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111") assert.Equal(t, discovery.entries[1].String(), "1.1.1.2:1111") diff --git a/discovery/token/token.go b/discovery/token/token.go index 21c03780f5..4496c37348 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -1,7 +1,6 @@ package token import ( - "crypto/tls" "encoding/json" "errors" "fmt" @@ -28,7 +27,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(urltoken string, heartbeat uint64, _ *tls.Config) error { +func (s *Discovery) Initialize(urltoken string, heartbeat uint64) error { if i := strings.LastIndex(urltoken, "/"); i != -1 { s.url = "https://" + urltoken[:i] s.token = urltoken[i+1:] diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index b09fbbf591..a4e1f42edb 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -8,17 +8,17 @@ import ( func TestInitialize(t *testing.T) { discovery := &Discovery{} - err := discovery.Initialize("token", 0, nil) + err := discovery.Initialize("token", 0) assert.NoError(t, err) assert.Equal(t, discovery.token, "token") assert.Equal(t, discovery.url, DiscoveryURL) - err = discovery.Initialize("custom/path/token", 0, nil) + err = discovery.Initialize("custom/path/token", 0) assert.NoError(t, err) assert.Equal(t, discovery.token, "token") assert.Equal(t, discovery.url, "https://custom/path") - err = discovery.Initialize("", 0, nil) + err = discovery.Initialize("", 0) assert.Error(t, err) } From 6b0b82cddcb257ae84611379e00aa246a6c0a636 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Tue, 12 May 2015 14:41:09 -0700 Subject: [PATCH 7/9] Add fixme related to timeout/heartbeat issue Signed-off-by: Alexandre Beslic --- cli/cli.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cli/cli.go b/cli/cli.go index bed480d126..b1296a8d6c 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -82,6 +82,7 @@ func Run() { log.Fatalf("discovery required to list a cluster. See '%s list --help'.", c.App.Name) } + // FIXME Add and use separate timeout flag instead of forcing it d, err := discovery.New(dflag, 10) if err != nil { log.Fatal(err) From 06f51e09b0c19649cc9979d29893299f990b7261 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Tue, 12 May 2015 14:47:48 -0700 Subject: [PATCH 8/9] revert variable names Signed-off-by: Alexandre Beslic --- cli/manage.go | 2 +- cluster/options.go | 2 +- cluster/swarm/cluster.go | 2 +- discovery/token/token.go | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cli/manage.go b/cli/manage.go index ec60c50ce7..62c27c2d98 100644 --- a/cli/manage.go +++ b/cli/manage.go @@ -125,7 +125,7 @@ func manage(c *cli.Context) { log.Fatal("--heartbeat should be an unsigned integer and greater than 0") } options := &cluster.Options{ - TLS: tlsConfig, + TLSConfig: tlsConfig, OvercommitRatio: c.Float64("overcommit"), Discovery: dflag, Heartbeat: hb, diff --git a/cluster/options.go b/cluster/options.go index 9871cbecca..5c3b37ffee 100644 --- a/cluster/options.go +++ b/cluster/options.go @@ -4,7 +4,7 @@ import "crypto/tls" // Options is exported type Options struct { - TLS *tls.Config + TLSConfig *tls.Config OvercommitRatio float64 Discovery string Heartbeat uint64 diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index e8add8457f..5f4f2254dc 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -138,7 +138,7 @@ func (c *Cluster) newEntries(entries []*discovery.Entry) { go func(m *discovery.Entry) { if !c.hasEngine(m.String()) { engine := cluster.NewEngine(m.String(), c.options.OvercommitRatio) - if err := engine.Connect(c.options.TLS); err != nil { + if err := engine.Connect(c.options.TLSConfig); err != nil { log.Error(err) return } diff --git a/discovery/token/token.go b/discovery/token/token.go index 4496c37348..bada10ce52 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -54,16 +54,16 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) { defer resp.Body.Close() - var entries []string + var addrs []string if resp.StatusCode == http.StatusOK { - if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil { + if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil { return nil, err } } else { return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode) } - return discovery.CreateEntries(entries) + return discovery.CreateEntries(addrs) } // Watch is exported From 2fc3f8e663c681326f2bff607e5ea15b0842f067 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Tue, 12 May 2015 14:50:53 -0700 Subject: [PATCH 9/9] update README Signed-off-by: Alexandre Beslic --- pkg/store/README.md | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/pkg/store/README.md b/pkg/store/README.md index 1d8b15bf07..43766aaf05 100644 --- a/pkg/store/README.md +++ b/pkg/store/README.md @@ -1,12 +1,6 @@ ---- -page_title: Docker Swarm storage -page_description: Swarm storage -page_keywords: docker, swarm, clustering, storage, metadata ---- - # Storage -Docker Swarm comes with multiple Key/Value storage backends. This package is used by the discovery service to register machines inside the cluster. It is also used to store cluster's metadata. +This package is used by the discovery service to register machines inside the cluster. It is also used to store cluster's metadata. ## Example of usage @@ -32,7 +26,9 @@ func main() { kv, err := store.CreateStore( store.Consul, []string{client}, - store.Timeout(10*time.Second), + store.Config{ + Timeout: 10*time.Second + }, ) if err != nil { log.Error("Cannot create store consul") @@ -81,11 +77,3 @@ type Store interface { To be elligible as a **discovery backend** only, a K/V store implementation should at least offer `Get`, `Put`, `WatchRange`, `GetRange`. You can get inspiration from existing backends to create a new one. This interface could be subject to changes to improve the experience of using the library and contributing to a new backend. - - -## Docker Swarm documentation index - -- [User guide](./index.md) -- [Sheduler strategies](./scheduler/strategy.md) -- [Sheduler filters](./scheduler/filter.md) -- [Swarm API](./API.md)