Merge pull request #223 from chanwit/zookeeper-discovery

Proposal: ZooKeeper discovery
This commit is contained in:
Victor Vieux 2015-01-08 14:08:41 -08:00
commit 57ca80bdce
5 changed files with 145 additions and 2 deletions

View File

@ -23,7 +23,7 @@ Docker Swarm Roadmap
* [ ] Pluggable scheduler
* [ ] Discovery backends
* [x] etcd
* [ ] zookeeper
* [x] zookeeper
* [x] consul
* [x] hub
* [x] file

View File

@ -0,0 +1,126 @@
package zookeeper
import (
"path"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/discovery"
"github.com/samuel/go-zookeeper/zk"
)
type ZkDiscoveryService struct {
conn *zk.Conn
path string
heartbeat int
}
func init() {
discovery.Register("zk", &ZkDiscoveryService{})
}
func (s *ZkDiscoveryService) Initialize(uris string, heartbeat int) error {
var (
// split here because uris can contain multiples ips
// like `zk://192.168.0.1,192.168.0.2,192.168.0.3/path`
parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",")
)
conn, _, err := zk.Connect(ips, time.Second)
if err != nil {
return err
}
s.conn = conn
s.path = "/" + parts[1]
s.heartbeat = heartbeat
_, err = conn.Create(s.path, []byte{1}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// if key already existed, then skip
if err != zk.ErrNodeExists {
return err
}
}
return nil
}
func (s *ZkDiscoveryService) Fetch() ([]*discovery.Node, error) {
addrs, _, err := s.conn.Children(s.path)
if err != nil {
return nil, err
}
var nodes []*discovery.Node
for _, addr := range addrs {
nodes = append(nodes, discovery.NewNode(addr))
}
return nodes, nil
}
func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) {
_, _, eventChan, err := s.conn.ChildrenW(s.path)
if err != nil {
log.Debugf("[ZK] Watch aborted")
return
}
for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged {
log.Debugf("[ZK] Watch triggered")
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
}
}
}
}
func (s *ZkDiscoveryService) Register(addr string) error {
newpath := path.Join(s.path, addr)
// check existing for the parent path first
exist, _, err := s.conn.Exists(s.path)
if err != nil {
return err
}
// create parent first
if exist == false {
_, err = s.conn.Create(s.path, []byte{1}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
_, err = s.conn.Create(newpath, []byte(addr), 0, zk.WorldACL(zk.PermAll))
return err
} else {
exist, _, err = s.conn.Exists(newpath)
if err != nil {
return err
}
if exist {
err = s.conn.Delete(newpath, -1)
if err != nil {
return err
}
}
_, err = s.conn.Create(newpath, []byte(addr), 0, zk.WorldACL(zk.PermAll))
return err
}
return nil
}

View File

@ -0,0 +1,16 @@
package zookeeper
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
discovery := &ZkDiscoveryService{}
assert.Error(t, discovery.Initialize("127.0.0.1/path", 0))
assert.Equal(t, discovery.path, "/path")
assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0))
assert.Equal(t, discovery.path, "/path")
}

View File

@ -6,7 +6,7 @@ var (
flDiscovery = cli.StringFlag{
Name: "discovery",
Value: "",
Usage: "DiscoveryService to use [token://<token>, etcd://<ip1>,<ip2>/<path>, file://path/to/file, consul://<addr>/<path>]",
Usage: "DiscoveryService to use [token://<token>, etcd://<ip1>,<ip2>/<path>, file://path/to/file, consul://<addr>/<path>, zk://<ip1>,<ip2>/<path>]",
EnvVar: "SWARM_DISCOVERY",
}
flAddr = cli.StringFlag{

View File

@ -12,6 +12,7 @@ import (
_ "github.com/docker/swarm/discovery/etcd"
_ "github.com/docker/swarm/discovery/file"
"github.com/docker/swarm/discovery/token"
_ "github.com/docker/swarm/discovery/zookeeper"
)
func main() {