From 3564e38d8027144f8be906f8631a93297527362a Mon Sep 17 00:00:00 2001 From: Chanwit Kaewkasi Date: Tue, 6 Jan 2015 17:56:05 +0700 Subject: [PATCH 1/7] add zookeeper discovery Signed-off-by: Chanwit Kaewkasi --- discovery/zookeeper/zookeeper.go | 80 ++++++++++++++++++++++++++++++++ main.go | 1 + 2 files changed, 81 insertions(+) create mode 100644 discovery/zookeeper/zookeeper.go diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go new file mode 100644 index 0000000000..d02cdf544d --- /dev/null +++ b/discovery/zookeeper/zookeeper.go @@ -0,0 +1,80 @@ +package zookeeper + +import ( + "path" + "strings" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/docker/swarm/discovery" + "github.com/samuel/go-zookeeper/zk" +) + +type ZkDiscoveryService struct { + conn *zk.Conn + path string + heartbeat int +} + +func init() { + discovery.Register("zk", &ZkDiscoveryService{}) +} + +func (s *ZkDiscoveryService) Initialize(uris string, heartbeat int) error { + var ( + // split here because uris can contain multiples ips + // like `zk://192.168.0.1,192.168.0.2,192.168.0.3/path` + parts = strings.SplitN(uris, "/", 2) + ips = strings.Split(parts[0], ",") + ) + + conn, _, err := zk.Connect(ips, time.Second) + + if err != nil { + return err + } + + s.conn = conn + s.path = "/" + parts[1] + s.heartbeat = heartbeat + + _, err = conn.Create(s.path, []byte{1}, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + // if key already existed, then skip + if err != zk.ErrNodeExists { + return err + } + } + + return nil +} + +func (s *ZkDiscoveryService) Fetch() ([]*discovery.Node, error) { + addrs, _, err := s.conn.Children(s.path) + + if err != nil { + return nil, err + } + + var nodes []*discovery.Node + for _, addr := range addrs { + nodes = append(nodes, discovery.NewNode(addr)) + } + + return nodes, nil +} + +func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) { + for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { + log.Debugf("[ZK] Watch triggered") + nodes, err := s.Fetch() + if err == nil { + callback(nodes) + } + } +} + +func (s *ZkDiscoveryService) Register(addr string) error { + _, err := s.conn.Create(path.Join(s.path, "/"+addr), []byte(addr), 0, zk.WorldACL(zk.PermAll)) + return err +} diff --git a/main.go b/main.go index 4eb1d64a73..1965d24080 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( _ "github.com/docker/swarm/discovery/etcd" _ "github.com/docker/swarm/discovery/file" "github.com/docker/swarm/discovery/token" + _ "github.com/docker/swarm/discovery/zookeeper" ) func main() { From 195ba02473140a6d7a422dc8d76a46a35703c98e Mon Sep 17 00:00:00 2001 From: Chanwit Kaewkasi Date: Tue, 6 Jan 2015 18:51:02 +0700 Subject: [PATCH 2/7] add a unit test for zk discovery Signed-off-by: Chanwit Kaewkasi --- discovery/zookeeper/zookeeper_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 discovery/zookeeper/zookeeper_test.go diff --git a/discovery/zookeeper/zookeeper_test.go b/discovery/zookeeper/zookeeper_test.go new file mode 100644 index 0000000000..5e5a0b3258 --- /dev/null +++ b/discovery/zookeeper/zookeeper_test.go @@ -0,0 +1,16 @@ +package zookeeper + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInitialize(t *testing.T) { + discovery := &ZkDiscoveryService{} + assert.Error(t, discovery.Initialize("127.0.0.1/path", 0)) + assert.Equal(t, discovery.path, "/path") + + assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) + assert.Equal(t, discovery.path, "/path") +} From de2081fa1951eae079954f0a33068beccf2549dd Mon Sep 17 00:00:00 2001 From: Chanwit Kaewkasi Date: Tue, 6 Jan 2015 19:09:00 +0700 Subject: [PATCH 3/7] skip ErrNodeExists for node registration Signed-off-by: Chanwit Kaewkasi --- discovery/zookeeper/zookeeper.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index d02cdf544d..292d766830 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -76,5 +76,9 @@ func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *ZkDiscoveryService) Register(addr string) error { _, err := s.conn.Create(path.Join(s.path, "/"+addr), []byte(addr), 0, zk.WorldACL(zk.PermAll)) - return err + if err != zk.ErrNodeExists { + return err + } else { + return nil + } } From 66ff423861d6c9a2132477da69d713fc5e6ac88f Mon Sep 17 00:00:00 2001 From: Chanwit Kaewkasi Date: Tue, 6 Jan 2015 19:27:15 +0700 Subject: [PATCH 4/7] make node registration more robust Signed-off-by: Chanwit Kaewkasi --- discovery/zookeeper/zookeeper.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index 292d766830..b82a88aab1 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -75,10 +75,24 @@ func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) { } func (s *ZkDiscoveryService) Register(addr string) error { - _, err := s.conn.Create(path.Join(s.path, "/"+addr), []byte(addr), 0, zk.WorldACL(zk.PermAll)) + newpath := path.Join(s.path, addr) + exists, _, err := s.conn.Exists(newpath) + if err != nil { + return err + } + + if exists { + err = s.conn.Delete(newpath, -1) + if err != nil { + return err + } + } + + _, err = s.conn.Create(newpath, []byte(addr), 0, zk.WorldACL(zk.PermAll)) if err != zk.ErrNodeExists { return err } else { return nil } + } From 1e2e60806df449817efaf13659c5b07293383389 Mon Sep 17 00:00:00 2001 From: Chanwit Kaewkasi Date: Tue, 6 Jan 2015 19:52:08 +0700 Subject: [PATCH 5/7] double check if someone delete /path outside swarm Signed-off-by: Chanwit Kaewkasi --- discovery/zookeeper/zookeeper.go | 33 ++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index b82a88aab1..bda6c74992 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -76,23 +76,40 @@ func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *ZkDiscoveryService) Register(addr string) error { newpath := path.Join(s.path, addr) - exists, _, err := s.conn.Exists(newpath) + + // check existing for the parent path first + exist, _, err := s.conn.Exists(s.path) if err != nil { return err } - if exists { - err = s.conn.Delete(newpath, -1) + // create parent first + if exist == false { + + _, err = s.conn.Create(s.path, []byte{1}, 0, zk.WorldACL(zk.PermAll)) if err != nil { return err } - } - - _, err = s.conn.Create(newpath, []byte(addr), 0, zk.WorldACL(zk.PermAll)) - if err != zk.ErrNodeExists { + _, err = s.conn.Create(newpath, []byte(addr), 0, zk.WorldACL(zk.PermAll)) return err + } else { - return nil + + exist, _, err = s.conn.Exists(newpath) + if err != nil { + return err + } + + if exist { + err = s.conn.Delete(newpath, -1) + if err != nil { + return err + } + } + + _, err = s.conn.Create(newpath, []byte(addr), 0, zk.WorldACL(zk.PermAll)) + return err } + return nil } From 5d5d7bf4302654baeea7b7ac665dbef8855d9b64 Mon Sep 17 00:00:00 2001 From: Chanwit Kaewkasi Date: Tue, 6 Jan 2015 19:57:49 +0700 Subject: [PATCH 6/7] mark zookeeper discovery implemented Signed-off-by: Chanwit Kaewkasi --- ROADMAP.md | 2 +- flags.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 29260c7ab8..71553661a8 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -23,7 +23,7 @@ Docker Swarm Roadmap * [ ] Pluggable scheduler * [ ] Discovery backends * [x] etcd - * [ ] zookeeper + * [x] zookeeper * [x] consul * [x] hub * [x] file diff --git a/flags.go b/flags.go index a734d9a3f9..c97d19bdec 100644 --- a/flags.go +++ b/flags.go @@ -6,7 +6,7 @@ var ( flDiscovery = cli.StringFlag{ Name: "discovery", Value: "", - Usage: "DiscoveryService to use [token://, etcd://,/, file://path/to/file, consul:///]", + Usage: "DiscoveryService to use [token://, etcd://,/, file://path/to/file, consul:///, zk://,/]", EnvVar: "SWARM_DISCOVERY", } flAddr = cli.StringFlag{ From f530d0886f0e1eb91cdc1ef4b2ba75063ce4bd35 Mon Sep 17 00:00:00 2001 From: Chanwit Kaewkasi Date: Wed, 7 Jan 2015 12:31:19 +0700 Subject: [PATCH 7/7] implement a proper zookeeper watcher Signed-off-by: Chanwit Kaewkasi --- discovery/zookeeper/zookeeper.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index bda6c74992..67fda8d6d1 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -65,13 +65,24 @@ func (s *ZkDiscoveryService) Fetch() ([]*discovery.Node, error) { } func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) { - for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { - log.Debugf("[ZK] Watch triggered") - nodes, err := s.Fetch() - if err == nil { - callback(nodes) - } + + _, _, eventChan, err := s.conn.ChildrenW(s.path) + if err != nil { + log.Debugf("[ZK] Watch aborted") + return } + + for e := range eventChan { + if e.Type == zk.EventNodeChildrenChanged { + log.Debugf("[ZK] Watch triggered") + nodes, err := s.Fetch() + if err == nil { + callback(nodes) + } + } + + } + } func (s *ZkDiscoveryService) Register(addr string) error {