Merge pull request #573 from ehazlett/update-godeps

update godeps (remove swarm discovery dependency)
This commit is contained in:
Evan Hazlett 2015-02-20 13:51:25 -08:00
commit 53414ded4e
17 changed files with 0 additions and 1024 deletions

5
Godeps/Godeps.json generated
View File

@ -115,11 +115,6 @@
"ImportPath": "github.com/docker/libtrust",
"Rev": "6b7834910dcbb3021adc193411d01f65595445fb"
},
{
"ImportPath": "github.com/docker/swarm/discovery",
"Comment": "v0.1.0-rc1-75-g86a028d",
"Rev": "86a028d7c1cb593744dfc8d9e3c1b6b22fbc18b8"
},
{
"ImportPath": "github.com/google/go-querystring/query",
"Rev": "30f7a39f4a218feb5325f3aebc60c32a572a8274"

View File

@ -1,172 +0,0 @@
Discovery
=========
`Docker Swarm` comes with multiple Discovery backends
## Examples
##### Using the hosted discovery service
```bash
# create a cluster
$ swarm create
6856663cdefdec325839a4b7e1de38e8 # <- this is your unique <cluster_id>
# on each of your nodes, start the swarm agent
# <node_ip> doesn't have to be public (eg. 192.168.0.X),
# as long as the other nodes can reach it, it is fine.
$ swarm join --addr=<node_ip:2375> token://<cluster_id>
# start the manager on any machine or your laptop
$ swarm manage -H tcp://<swarm_ip:swarm_port> token://<cluster_id>
# use the regular docker cli
$ docker -H tcp://<swarm_ip:swarm_port> info
$ docker -H tcp://<swarm_ip:swarm_port> run ...
$ docker -H tcp://<swarm_ip:swarm_port> ps
$ docker -H tcp://<swarm_ip:swarm_port> logs ...
...
# list nodes in your cluster
$ swarm list token://<cluster_id>
<node_ip:2375>
```
###### Using a static file describing the cluster
```bash
# for each of your nodes, add a line to a file
# <node_ip> doesn't have to be public (eg. 192.168.0.X),
# as long as the other nodes can reach it, it is fine.
$ echo <node_ip1:2375> >> /tmp/my_cluster
$ echo <node_ip2:2375> >> /tmp/my_cluster
$ echo <node_ip3:2375> >> /tmp/my_cluster
# start the manager on any machine or your laptop
$ swarm manage -H tcp://<swarm_ip:swarm_port> file:///tmp/my_cluster
# use the regular docker cli
$ docker -H tcp://<swarm_ip:swarm_port> info
$ docker -H tcp://<swarm_ip:swarm_port> run ...
$ docker -H tcp://<swarm_ip:swarm_port> ps
$ docker -H tcp://<swarm_ip:swarm_port> logs ...
...
# list nodes in your cluster
$ swarm list file:///tmp/my_cluster
<node_ip1:2375>
<node_ip2:2375>
<node_ip3:2375>
```
###### Using etcd
```bash
# on each of your nodes, start the swarm agent
# <node_ip> doesn't have to be public (eg. 192.168.0.X),
# as long as the other nodes can reach it, it is fine.
$ swarm join --addr=<node_ip:2375> etcd://<etcd_ip>/<path>
# start the manager on any machine or your laptop
$ swarm manage -H tcp://<swarm_ip:swarm_port> etcd://<etcd_ip>/<path>
# use the regular docker cli
$ docker -H tcp://<swarm_ip:swarm_port> info
$ docker -H tcp://<swarm_ip:swarm_port> run ...
$ docker -H tcp://<swarm_ip:swarm_port> ps
$ docker -H tcp://<swarm_ip:swarm_port> logs ...
...
# list nodes in your cluster
$ swarm list etcd://<etcd_ip>/<path>
<node_ip:2375>
```
###### Using consul
```bash
# on each of your nodes, start the swarm agent
# <node_ip> doesn't have to be public (eg. 192.168.0.X),
# as long as the other nodes can reach it, it is fine.
$ swarm join --addr=<node_ip:2375> consul://<consul_addr>/<path>
# start the manager on any machine or your laptop
$ swarm manage -H tcp://<swarm_ip:swarm_port> consul://<consul_addr>/<path>
# use the regular docker cli
$ docker -H tcp://<swarm_ip:swarm_port> info
$ docker -H tcp://<swarm_ip:swarm_port> run ...
$ docker -H tcp://<swarm_ip:swarm_port> ps
$ docker -H tcp://<swarm_ip:swarm_port> logs ...
...
# list nodes in your cluster
$ swarm list consul://<consul_addr>/<path>
<node_ip:2375>
```
###### Using zookeeper
```bash
# on each of your nodes, start the swarm agent
# <node_ip> doesn't have to be public (eg. 192.168.0.X),
# as long as the other nodes can reach it, it is fine.
$ swarm join --addr=<node_ip:2375> zk://<zookeeper_addr1>,<zookeeper_addr2>/<path>
# start the manager on any machine or your laptop
$ swarm manage -H tcp://<swarm_ip:swarm_port> zk://<zookeeper_addr1>,<zookeeper_addr2>/<path>
# use the regular docker cli
$ docker -H tcp://<swarm_ip:swarm_port> info
$ docker -H tcp://<swarm_ip:swarm_port> run ...
$ docker -H tcp://<swarm_ip:swarm_port> ps
$ docker -H tcp://<swarm_ip:swarm_port> logs ...
...
# list nodes in your cluster
$ swarm list zk://<zookeeper_addr1>,<zookeeper_addr2>/<path>
<node_ip:2375>
```
###### Using a static list of ips
```bash
# start the manager on any machine or your laptop
$ swarm manage -H <swarm_ip:swarm_port> nodes://<node_ip1:2375>,<node_ip2:2375>
#or
$ swarm manage -H <swarm_ip:swarm_port> nodes://<node_ip1:2375>,<node_ip2:2375>
# use the regular docker cli
$ docker -H <swarm_ip:swarm_port> info
$ docker -H <swarm_ip:swarm_port> run ...
$ docker -H <swarm_ip:swarm_port> ps
$ docker -H <swarm_ip:swarm_port> logs ...
...
```
## Contributing
Contributing a new discovery backend is easy,
simply implements this interface:
```go
type DiscoveryService interface {
Initialize(string, int) error
Fetch() ([]string, error)
Watch(WatchCallback)
Register(string) error
}
```
######Initialize
take the `discovery` without the scheme and a heartbeat (in seconds)
######Fetch
returns the list of all the nodes from the discovery
######Watch
triggers an update (`Fetch`),it can happen either via
a timer (like `token`) or use backend specific features (like `etcd`)
######Register
add a new node to the discovery

View File

@ -1,113 +0,0 @@
package consul
import (
"fmt"
"path"
"strings"
"time"
log "github.com/Sirupsen/logrus"
consul "github.com/armon/consul-api"
"github.com/docker/swarm/discovery"
)
type ConsulDiscoveryService struct {
heartbeat time.Duration
client *consul.Client
prefix string
lastIndex uint64
}
func init() {
discovery.Register("consul", &ConsulDiscoveryService{})
}
func (s *ConsulDiscoveryService) Initialize(uris string, heartbeat int) error {
parts := strings.SplitN(uris, "/", 2)
if len(parts) < 2 {
return fmt.Errorf("invalid format %q, missing <path>", uris)
}
addr := parts[0]
path := parts[1]
config := consul.DefaultConfig()
config.Address = addr
client, err := consul.NewClient(config)
if err != nil {
return err
}
s.client = client
s.heartbeat = time.Duration(heartbeat) * time.Second
s.prefix = path + "/"
kv := s.client.KV()
p := &consul.KVPair{Key: s.prefix, Value: nil}
if _, err = kv.Put(p, nil); err != nil {
return err
}
_, meta, err := kv.Get(s.prefix, nil)
if err != nil {
return err
}
s.lastIndex = meta.LastIndex
return nil
}
func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Node, error) {
kv := s.client.KV()
pairs, _, err := kv.List(s.prefix, nil)
if err != nil {
return nil, err
}
var nodes []*discovery.Node
for _, pair := range pairs {
if pair.Key == s.prefix {
continue
}
node, err := discovery.NewNode(string(pair.Value))
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
return nodes, nil
}
func (s *ConsulDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range s.waitForChange() {
log.WithField("name", "consul").Debug("Discovery watch triggered")
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
}
}
}
func (s *ConsulDiscoveryService) Register(addr string) error {
kv := s.client.KV()
p := &consul.KVPair{Key: path.Join(s.prefix, addr), Value: []byte(addr)}
_, err := kv.Put(p, nil)
return err
}
func (s *ConsulDiscoveryService) waitForChange() <-chan uint64 {
c := make(chan uint64)
go func() {
for {
kv := s.client.KV()
option := &consul.QueryOptions{
WaitIndex: s.lastIndex,
WaitTime: s.heartbeat}
_, meta, err := kv.List(s.prefix, option)
if err != nil {
log.WithField("name", "consul").Errorf("Discovery error: %v", err)
break
}
s.lastIndex = meta.LastIndex
c <- s.lastIndex
}
close(c)
}()
return c
}

View File

@ -1,20 +0,0 @@
package consul
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
discovery := &ConsulDiscoveryService{}
assert.Equal(t, discovery.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing <path>")
assert.Error(t, discovery.Initialize("127.0.0.1/path", 0))
assert.Equal(t, discovery.prefix, "path/")
assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0))
assert.Equal(t, discovery.prefix, "path/")
}

View File

@ -1,78 +0,0 @@
package discovery
import (
"errors"
"fmt"
"net"
"strings"
log "github.com/Sirupsen/logrus"
)
type Node struct {
Host string
Port string
}
func NewNode(url string) (*Node, error) {
host, port, err := net.SplitHostPort(url)
if err != nil {
return nil, err
}
return &Node{host, port}, nil
}
func (n Node) String() string {
return fmt.Sprintf("%s:%s", n.Host, n.Port)
}
type WatchCallback func(nodes []*Node)
type DiscoveryService interface {
Initialize(string, int) error
Fetch() ([]*Node, error)
Watch(WatchCallback)
Register(string) error
}
var (
discoveries map[string]DiscoveryService
ErrNotSupported = errors.New("discovery service not supported")
ErrNotImplemented = errors.New("not implemented in this discovery service")
)
func init() {
discoveries = make(map[string]DiscoveryService)
}
func Register(scheme string, d DiscoveryService) error {
if _, exists := discoveries[scheme]; exists {
return fmt.Errorf("scheme already registered %s", scheme)
}
log.WithField("name", scheme).Debug("Registering discovery service")
discoveries[scheme] = d
return nil
}
func parse(rawurl string) (string, string) {
parts := strings.SplitN(rawurl, "://", 2)
// nodes:port,node2:port => nodes://node1:port,node2:port
if len(parts) == 1 {
return "nodes", parts[0]
}
return parts[0], parts[1]
}
func New(rawurl string, heartbeat int) (DiscoveryService, error) {
scheme, uri := parse(rawurl)
if discovery, exists := discoveries[scheme]; exists {
log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service")
err := discovery.Initialize(uri, heartbeat)
return discovery, err
}
return nil, ErrNotSupported
}

View File

@ -1,39 +0,0 @@
package discovery
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestNewNode(t *testing.T) {
node, err := NewNode("127.0.0.1:2375")
assert.Equal(t, node.Host, "127.0.0.1")
assert.Equal(t, node.Port, "2375")
assert.NoError(t, err)
_, err = NewNode("127.0.0.1")
assert.Error(t, err)
}
func TestParse(t *testing.T) {
scheme, uri := parse("127.0.0.1:2375")
assert.Equal(t, scheme, "nodes")
assert.Equal(t, uri, "127.0.0.1:2375")
scheme, uri = parse("localhost:2375")
assert.Equal(t, scheme, "nodes")
assert.Equal(t, uri, "localhost:2375")
scheme, uri = parse("scheme://127.0.0.1:2375")
assert.Equal(t, scheme, "scheme")
assert.Equal(t, uri, "127.0.0.1:2375")
scheme, uri = parse("scheme://localhost:2375")
assert.Equal(t, scheme, "scheme")
assert.Equal(t, uri, "localhost:2375")
scheme, uri = parse("")
assert.Equal(t, scheme, "nodes")
assert.Equal(t, uri, "")
}

View File

@ -1,87 +0,0 @@
package etcd
import (
"fmt"
"path"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/coreos/go-etcd/etcd"
"github.com/docker/swarm/discovery"
)
type EtcdDiscoveryService struct {
ttl uint64
client *etcd.Client
path string
}
func init() {
discovery.Register("etcd", &EtcdDiscoveryService{})
}
func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error {
var (
// split here because uris can contain multiples ips
// like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path`
parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",")
machines []string
)
if len(parts) != 2 {
return fmt.Errorf("invalid format %q, missing <path>", uris)
}
for _, ip := range ips {
machines = append(machines, "http://"+ip)
}
s.client = etcd.NewClient(machines)
s.ttl = uint64(heartbeat * 3 / 2)
s.path = "/" + parts[1] + "/"
if _, err := s.client.CreateDir(s.path, s.ttl); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode != 105 { // skip key already exists
return err
}
} else {
return err
}
}
return nil
}
func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
resp, err := s.client.Get(s.path, true, true)
if err != nil {
return nil, err
}
var nodes []*discovery.Node
for _, n := range resp.Node.Nodes {
node, err := discovery.NewNode(n.Value)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
return nodes, nil
}
func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) {
watchChan := make(chan *etcd.Response)
go s.client.Watch(s.path, 0, true, watchChan, nil)
for _ = range watchChan {
log.WithField("name", "etcd").Debug("Discovery watch triggered")
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
}
}
}
func (s *EtcdDiscoveryService) Register(addr string) error {
_, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl)
return err
}

View File

@ -1,19 +0,0 @@
package etcd
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
discovery := &EtcdDiscoveryService{}
assert.Equal(t, discovery.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing <path>")
assert.Error(t, discovery.Initialize("127.0.0.1/path", 0))
assert.Equal(t, discovery.path, "/path/")
assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0))
assert.Equal(t, discovery.path, "/path/")
}

View File

@ -1,57 +0,0 @@
package file
import (
"io/ioutil"
"strings"
"time"
"github.com/docker/swarm/discovery"
)
type FileDiscoveryService struct {
heartbeat int
path string
}
func init() {
discovery.Register("file", &FileDiscoveryService{})
}
func (s *FileDiscoveryService) Initialize(path string, heartbeat int) error {
s.path = path
s.heartbeat = heartbeat
return nil
}
func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
data, err := ioutil.ReadFile(s.path)
if err != nil {
return nil, err
}
var nodes []*discovery.Node
for _, line := range strings.Split(string(data), "\n") {
if line != "" {
node, err := discovery.NewNode(line)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
}
return nodes, nil
}
func (s *FileDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
}
}
}
func (s *FileDiscoveryService) Register(addr string) error {
return discovery.ErrNotImplemented
}

View File

@ -1,18 +0,0 @@
package file
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
discovery := &FileDiscoveryService{}
discovery.Initialize("/path/to/file", 0)
assert.Equal(t, discovery.path, "/path/to/file")
}
func TestRegister(t *testing.T) {
discovery := &FileDiscoveryService{path: "/path/to/file"}
assert.Error(t, discovery.Register("0.0.0.0"))
}

View File

@ -1,37 +0,0 @@
package nodes
import (
"strings"
"github.com/docker/swarm/discovery"
)
type NodesDiscoveryService struct {
nodes []*discovery.Node
}
func init() {
discovery.Register("nodes", &NodesDiscoveryService{})
}
func (s *NodesDiscoveryService) Initialize(uris string, _ int) error {
for _, ip := range strings.Split(uris, ",") {
node, err := discovery.NewNode(ip)
if err != nil {
return err
}
s.nodes = append(s.nodes, node)
}
return nil
}
func (s *NodesDiscoveryService) Fetch() ([]*discovery.Node, error) {
return s.nodes, nil
}
func (s *NodesDiscoveryService) Watch(callback discovery.WatchCallback) {
}
func (s *NodesDiscoveryService) Register(addr string) error {
return discovery.ErrNotImplemented
}

View File

@ -1,20 +0,0 @@
package nodes
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialise(t *testing.T) {
discovery := &NodesDiscoveryService{}
discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
assert.Equal(t, len(discovery.nodes), 2)
assert.Equal(t, discovery.nodes[0].String(), "1.1.1.1:1111")
assert.Equal(t, discovery.nodes[1].String(), "2.2.2.2:2222")
}
func TestRegister(t *testing.T) {
discovery := &NodesDiscoveryService{}
assert.Error(t, discovery.Register("0.0.0.0"))
}

View File

@ -1,31 +0,0 @@
#discovery.hub.docker.com
Docker Swarm comes with a simple discovery service built into the [Docker Hub](http://hub.docker.com)
The discovery service is still in alpha stage and currently hosted at `http://discovery-stage.hub.docker.com`
#####Create a new cluster
`-> POST http://discovery.hub.docker.com/v1/clusters (data="")`
`<- <token>`
#####Add new nodes to a cluster
`-> POST http://discovery.hub.docker.com/v1/clusters/<token> (data="<ip:port1>")`
`<- OK`
`-> POST http://discovery.hub.docker.com/v1/clusters/token (data="<ip:port2>")`
`<- OK`
#####List nodes in a cluster
`-> GET http://discovery.hub.docker.com/v1/clusters/token`
`<- ["<ip:port1>", "<ip:port2>"]`
#####Delete a cluster (all the nodes in a cluster)
`-> DELETE http://discovery.hub.docker.com/v1/clusters/token`
`<- OK`

View File

@ -1,98 +0,0 @@
package token
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/docker/swarm/discovery"
)
const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1"
type TokenDiscoveryService struct {
heartbeat int
url string
token string
}
func init() {
discovery.Register("token", &TokenDiscoveryService{})
}
func (s *TokenDiscoveryService) Initialize(urltoken string, heartbeat int) error {
if i := strings.LastIndex(urltoken, "/"); i != -1 {
s.url = "https://" + urltoken[:i]
s.token = urltoken[i+1:]
} else {
s.url = DISCOVERY_URL
s.token = urltoken
}
s.heartbeat = heartbeat
return nil
}
// Fetch returns the list of nodes for the discovery service at the specified endpoint
func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token))
if err != nil {
return nil, err
}
if resp.Body != nil {
defer resp.Body.Close()
}
var addrs []string
if resp.StatusCode == http.StatusOK {
if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("Failed to fetch nodes, Discovery service returned %d HTTP status code", resp.StatusCode)
}
var nodes []*discovery.Node
for _, addr := range addrs {
node, err := discovery.NewNode(addr)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
return nodes, nil
}
func (s *TokenDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
}
}
}
// RegisterNode adds a new node identified by the into the discovery service
func (s *TokenDiscoveryService) Register(addr string) error {
buf := strings.NewReader(addr)
_, err := http.Post(fmt.Sprintf("%s/%s/%s", s.url,
"clusters", s.token), "application/json", buf)
return err
}
// CreateCluster returns a unique cluster token
func (s *TokenDiscoveryService) CreateCluster() (string, error) {
resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil)
if err != nil {
return "", err
}
token, err := ioutil.ReadAll(resp.Body)
return string(token), err
}

View File

@ -1,31 +0,0 @@
package token
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
discovery := &TokenDiscoveryService{}
discovery.Initialize("token", 0)
assert.Equal(t, discovery.token, "token")
assert.Equal(t, discovery.url, DISCOVERY_URL)
discovery.Initialize("custom/path/token", 0)
assert.Equal(t, discovery.token, "token")
assert.Equal(t, discovery.url, "https://custom/path")
}
func TestRegister(t *testing.T) {
discovery := &TokenDiscoveryService{token: "TEST_TOKEN", url: DISCOVERY_URL}
expected := "127.0.0.1:2675"
assert.NoError(t, discovery.Register(expected))
addrs, err := discovery.Fetch()
assert.NoError(t, err)
assert.Equal(t, len(addrs), 1)
assert.Equal(t, addrs[0].String(), expected)
assert.NoError(t, discovery.Register(expected))
}

View File

@ -1,160 +0,0 @@
package zookeeper
import (
"fmt"
"path"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/discovery"
"github.com/samuel/go-zookeeper/zk"
)
type ZkDiscoveryService struct {
conn *zk.Conn
path []string
heartbeat int
}
func init() {
discovery.Register("zk", &ZkDiscoveryService{})
}
func (s *ZkDiscoveryService) fullpath() string {
return "/" + strings.Join(s.path, "/")
}
func (s *ZkDiscoveryService) createFullpath() error {
for i := 1; i <= len(s.path); i++ {
newpath := "/" + strings.Join(s.path[:i], "/")
_, err := s.conn.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// It's OK if key already existed. Just skip.
if err != zk.ErrNodeExists {
return err
}
}
}
return nil
}
func (s *ZkDiscoveryService) Initialize(uris string, heartbeat int) error {
var (
// split here because uris can contain multiples ips
// like `zk://192.168.0.1,192.168.0.2,192.168.0.3/path`
parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",")
)
if len(parts) != 2 {
return fmt.Errorf("invalid format %q, missing <path>", uris)
}
if strings.Contains(parts[1], "/") {
s.path = strings.Split(parts[1], "/")
} else {
s.path = []string{parts[1]}
}
conn, _, err := zk.Connect(ips, time.Second)
if err != nil {
return err
}
s.conn = conn
s.heartbeat = heartbeat
err = s.createFullpath()
if err != nil {
return err
}
return nil
}
func (s *ZkDiscoveryService) Fetch() ([]*discovery.Node, error) {
addrs, _, err := s.conn.Children(s.fullpath())
if err != nil {
return nil, err
}
return s.createNodes(addrs)
}
func (s *ZkDiscoveryService) createNodes(addrs []string) ([]*discovery.Node, error) {
nodes := make([]*discovery.Node, 0)
if addrs == nil {
return nodes, nil
}
for _, addr := range addrs {
node, err := discovery.NewNode(addr)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
return nodes, nil
}
func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) {
addrs, _, eventChan, err := s.conn.ChildrenW(s.fullpath())
if err != nil {
log.WithField("name", "zk").Debug("Discovery watch aborted")
return
}
nodes, err := s.createNodes(addrs)
if err == nil {
callback(nodes)
}
for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged {
log.WithField("name", "zk").Debug("Discovery watch triggered")
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
}
}
}
}
func (s *ZkDiscoveryService) Register(addr string) error {
nodePath := path.Join(s.fullpath(), addr)
// check existing for the parent path first
exist, _, err := s.conn.Exists(s.fullpath())
if err != nil {
return err
}
// if the parent path does not exist yet
if exist == false {
// create the parent first
err = s.createFullpath()
if err != nil {
return err
}
} else {
// if node path exists
exist, _, err = s.conn.Exists(nodePath)
if err != nil {
return err
}
// delete it first
if exist {
err = s.conn.Delete(nodePath, -1)
if err != nil {
return err
}
}
}
// create the node path to store address information
_, err = s.conn.Create(nodePath, []byte(addr), 0, zk.WorldACL(zk.PermAll))
return err
}

View File

@ -1,39 +0,0 @@
package zookeeper
import (
"testing"
"github.com/docker/swarm/discovery"
"github.com/stretchr/testify/assert"
)
func TestInitialize(t *testing.T) {
service := &ZkDiscoveryService{}
assert.Equal(t, service.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing <path>")
assert.Error(t, service.Initialize("127.0.0.1/path", 0))
assert.Equal(t, service.fullpath(), "/path")
assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0))
assert.Equal(t, service.fullpath(), "/path")
assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path/sub1/sub2", 0))
assert.Equal(t, service.fullpath(), "/path/sub1/sub2")
}
func TestCreateNodes(t *testing.T) {
service := &ZkDiscoveryService{}
nodes, err := service.createNodes(nil)
assert.Equal(t, nodes, []*discovery.Node{})
assert.NoError(t, err)
nodes, err = service.createNodes([]string{"127.0.0.1:2375", "127.0.0.2:2375"})
assert.Equal(t, nodes[0].String(), "127.0.0.1:2375")
assert.Equal(t, nodes[1].String(), "127.0.0.2:2375")
assert.NoError(t, err)
_, err = service.createNodes([]string{"127.0.0.1", "127.0.0.2"})
assert.Error(t, err)
}