diff --git a/cluster/cluster.go b/cluster/cluster.go index cdfab1ae49..39a38a86b3 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -99,11 +99,11 @@ func (c *Cluster) AddNode(n *Node) error { return n.Events(c) } -func (c *Cluster) UpdateNodes(nodes []*discovery.Node) { - for _, addr := range nodes { - go func(node *discovery.Node) { - if c.Node(node.String()) == nil { - n := NewNode(node.String(), c.overcommitRatio) +func (c *Cluster) UpdateNodes(entries []*discovery.Entry) { + for _, entry := range entries { + go func(m *discovery.Entry) { + if c.Node(m.String()) == nil { + n := NewNode(m.String(), c.overcommitRatio) if err := n.Connect(c.tlsConfig); err != nil { log.Error(err) return @@ -113,7 +113,7 @@ func (c *Cluster) UpdateNodes(nodes []*discovery.Node) { return } } - }(addr) + }(entry) } } diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go index 00ce9df6cf..129865fb6c 100644 --- a/discovery/consul/consul.go +++ b/discovery/consul/consul.go @@ -52,34 +52,30 @@ func (s *ConsulDiscoveryService) Initialize(uris string, heartbeat int) error { s.lastIndex = meta.LastIndex return nil } -func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Node, error) { +func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Entry, error) { kv := s.client.KV() pairs, _, err := kv.List(s.prefix, nil) if err != nil { return nil, err } - var nodes []*discovery.Node - + addrs := []string{} for _, pair := range pairs { if pair.Key == s.prefix { continue } - node, err := discovery.NewNode(string(pair.Value)) - if err != nil { - return nil, err - } - nodes = append(nodes, node) + addrs = append(addrs, string(pair.Value)) } - return nodes, nil + + return discovery.CreateEntries(addrs) } func (s *ConsulDiscoveryService) Watch(callback discovery.WatchCallback) { for _ = range s.waitForChange() { log.WithField("name", "consul").Debug("Discovery watch triggered") - nodes, err := s.Fetch() + entries, err := s.Fetch() if err == nil { - callback(nodes) + callback(entries) } } } diff --git a/discovery/discovery.go b/discovery/discovery.go index c2db7b98a2..59038c65f1 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -9,28 +9,28 @@ import ( log "github.com/Sirupsen/logrus" ) -type Node struct { +type Entry struct { Host string Port string } -func NewNode(url string) (*Node, error) { +func NewEntry(url string) (*Entry, error) { host, port, err := net.SplitHostPort(url) if err != nil { return nil, err } - return &Node{host, port}, nil + return &Entry{host, port}, nil } -func (n Node) String() string { - return fmt.Sprintf("%s:%s", n.Host, n.Port) +func (m Entry) String() string { + return fmt.Sprintf("%s:%s", m.Host, m.Port) } -type WatchCallback func(nodes []*Node) +type WatchCallback func(entries []*Entry) type DiscoveryService interface { Initialize(string, int) error - Fetch() ([]*Node, error) + Fetch() ([]*Entry, error) Watch(WatchCallback) Register(string) error } @@ -76,3 +76,19 @@ func New(rawurl string, heartbeat int) (DiscoveryService, error) { return nil, ErrNotSupported } + +func CreateEntries(addrs []string) ([]*Entry, error) { + entries := []*Entry{} + if addrs == nil { + return entries, nil + } + + for _, addr := range addrs { + entry, err := NewEntry(addr) + if err != nil { + return nil, err + } + entries = append(entries, entry) + } + return entries, nil +} diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index ace6bc1b2c..caf8c2f5d5 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -6,13 +6,13 @@ import ( "github.com/stretchr/testify/assert" ) -func TestNewNode(t *testing.T) { - node, err := NewNode("127.0.0.1:2375") - assert.Equal(t, node.Host, "127.0.0.1") - assert.Equal(t, node.Port, "2375") +func TestNewEntry(t *testing.T) { + entry, err := NewEntry("127.0.0.1:2375") + assert.Equal(t, entry.Host, "127.0.0.1") + assert.Equal(t, entry.Port, "2375") assert.NoError(t, err) - _, err = NewNode("127.0.0.1") + _, err = NewEntry("127.0.0.1") assert.Error(t, err) } @@ -37,3 +37,17 @@ func TestParse(t *testing.T) { assert.Equal(t, scheme, "nodes") assert.Equal(t, uri, "") } + +func TestCreateEntries(t *testing.T) { + entries, err := CreateEntries(nil) + assert.Equal(t, entries, []*Entry{}) + assert.NoError(t, err) + + entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375"}) + assert.Equal(t, entries[0].String(), "127.0.0.1:2375") + assert.Equal(t, entries[1].String(), "127.0.0.2:2375") + assert.NoError(t, err) + + _, err = CreateEntries([]string{"127.0.0.1", "127.0.0.2"}) + assert.Error(t, err) +} diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go index df9f8d2351..87d9bfe71a 100644 --- a/discovery/etcd/etcd.go +++ b/discovery/etcd/etcd.go @@ -24,9 +24,9 @@ func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error { var ( // split here because uris can contain multiples ips // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` - parts = strings.SplitN(uris, "/", 2) - ips = strings.Split(parts[0], ",") - machines []string + parts = strings.SplitN(uris, "/", 2) + ips = strings.Split(parts[0], ",") + entries []string ) if len(parts) != 2 { @@ -34,10 +34,10 @@ func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error { } for _, ip := range ips { - machines = append(machines, "http://"+ip) + entries = append(entries, "http://"+ip) } - s.client = etcd.NewClient(machines) + s.client = etcd.NewClient(entries) s.ttl = uint64(heartbeat * 3 / 2) s.path = "/" + parts[1] + "/" if _, err := s.client.CreateDir(s.path, s.ttl); err != nil { @@ -51,22 +51,17 @@ func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error { } return nil } -func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { +func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Entry, error) { resp, err := s.client.Get(s.path, true, true) if err != nil { return nil, err } - var nodes []*discovery.Node - + addrs := []string{} for _, n := range resp.Node.Nodes { - node, err := discovery.NewNode(n.Value) - if err != nil { - return nil, err - } - nodes = append(nodes, node) + addrs = append(addrs, n.Value) } - return nodes, nil + return discovery.CreateEntries(addrs) } func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) { @@ -74,9 +69,9 @@ func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) { go s.client.Watch(s.path, 0, true, watchChan, nil) for _ = range watchChan { log.WithField("name", "etcd").Debug("Discovery watch triggered") - nodes, err := s.Fetch() + entries, err := s.Fetch() if err == nil { - callback(nodes) + callback(entries) } } } diff --git a/discovery/file/file.go b/discovery/file/file.go index 2594ca7b3f..80e2b7a35d 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -23,31 +23,20 @@ func (s *FileDiscoveryService) Initialize(path string, heartbeat int) error { return nil } -func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) { +func (s *FileDiscoveryService) Fetch() ([]*discovery.Entry, error) { data, err := ioutil.ReadFile(s.path) if err != nil { return nil, err } - var nodes []*discovery.Node - - for _, line := range strings.Split(string(data), "\n") { - if line != "" { - node, err := discovery.NewNode(line) - if err != nil { - return nil, err - } - nodes = append(nodes, node) - } - } - return nodes, nil + return discovery.CreateEntries(strings.Split(string(data), "\n")) } func (s *FileDiscoveryService) Watch(callback discovery.WatchCallback) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { - nodes, err := s.Fetch() + entries, err := s.Fetch() if err == nil { - callback(nodes) + callback(entries) } } } diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go index 8c32ff8330..099f00494e 100644 --- a/discovery/nodes/nodes.go +++ b/discovery/nodes/nodes.go @@ -7,7 +7,7 @@ import ( ) type NodesDiscoveryService struct { - nodes []*discovery.Node + entries []*discovery.Entry } func init() { @@ -16,17 +16,17 @@ func init() { func (s *NodesDiscoveryService) Initialize(uris string, _ int) error { for _, ip := range strings.Split(uris, ",") { - node, err := discovery.NewNode(ip) + entry, err := discovery.NewEntry(ip) if err != nil { return err } - s.nodes = append(s.nodes, node) + s.entries = append(s.entries, entry) } return nil } -func (s *NodesDiscoveryService) Fetch() ([]*discovery.Node, error) { - return s.nodes, nil +func (s *NodesDiscoveryService) Fetch() ([]*discovery.Entry, error) { + return s.entries, nil } func (s *NodesDiscoveryService) Watch(callback discovery.WatchCallback) { diff --git a/discovery/nodes/nodes_test.go b/discovery/nodes/nodes_test.go index b4e4569700..f89ccd145e 100644 --- a/discovery/nodes/nodes_test.go +++ b/discovery/nodes/nodes_test.go @@ -9,9 +9,9 @@ import ( func TestInitialise(t *testing.T) { discovery := &NodesDiscoveryService{} discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) - assert.Equal(t, len(discovery.nodes), 2) - assert.Equal(t, discovery.nodes[0].String(), "1.1.1.1:1111") - assert.Equal(t, discovery.nodes[1].String(), "2.2.2.2:2222") + assert.Equal(t, len(discovery.entries), 2) + assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111") + assert.Equal(t, discovery.entries[1].String(), "2.2.2.2:2222") } func TestRegister(t *testing.T) { diff --git a/discovery/token/token.go b/discovery/token/token.go index efd515bd3a..5274bb9700 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -41,8 +41,8 @@ func (s *TokenDiscoveryService) Initialize(urltoken string, heartbeat int) error return nil } -// Fetch returns the list of nodes for the discovery service at the specified endpoint -func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { +// Fetch returns the list of entries for the discovery service at the specified endpoint +func (s *TokenDiscoveryService) Fetch() ([]*discovery.Entry, error) { resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) if err != nil { @@ -59,31 +59,22 @@ func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { return nil, err } } else { - return nil, fmt.Errorf("Failed to fetch nodes, Discovery service returned %d HTTP status code", resp.StatusCode) + return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode) } - var nodes []*discovery.Node - for _, addr := range addrs { - node, err := discovery.NewNode(addr) - if err != nil { - return nil, err - } - nodes = append(nodes, node) - } - - return nodes, nil + return discovery.CreateEntries(addrs) } func (s *TokenDiscoveryService) Watch(callback discovery.WatchCallback) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { - nodes, err := s.Fetch() + entries, err := s.Fetch() if err == nil { - callback(nodes) + callback(entries) } } } -// RegisterNode adds a new node identified by the into the discovery service +// RegisterEntry adds a new entry identified by the into the discovery service func (s *TokenDiscoveryService) Register(addr string) error { buf := strings.NewReader(addr) diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index 47b69f8795..fc029fa3a2 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -72,30 +72,14 @@ func (s *ZkDiscoveryService) Initialize(uris string, heartbeat int) error { return nil } -func (s *ZkDiscoveryService) Fetch() ([]*discovery.Node, error) { +func (s *ZkDiscoveryService) Fetch() ([]*discovery.Entry, error) { addrs, _, err := s.conn.Children(s.fullpath()) if err != nil { return nil, err } - return s.createNodes(addrs) -} - -func (s *ZkDiscoveryService) createNodes(addrs []string) ([]*discovery.Node, error) { - nodes := make([]*discovery.Node, 0) - if addrs == nil { - return nodes, nil - } - - for _, addr := range addrs { - node, err := discovery.NewNode(addr) - if err != nil { - return nil, err - } - nodes = append(nodes, node) - } - return nodes, nil + return discovery.CreateEntries(addrs) } func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) { @@ -105,17 +89,17 @@ func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) { log.WithField("name", "zk").Debug("Discovery watch aborted") return } - nodes, err := s.createNodes(addrs) + entries, err := discovery.CreateEntries(addrs) if err == nil { - callback(nodes) + callback(entries) } for e := range eventChan { if e.Type == zk.EventNodeChildrenChanged { log.WithField("name", "zk").Debug("Discovery watch triggered") - nodes, err := s.Fetch() + entries, err := s.Fetch() if err == nil { - callback(nodes) + callback(entries) } } diff --git a/discovery/zookeeper/zookeeper_test.go b/discovery/zookeeper/zookeeper_test.go index 33a2ac89af..0097fa6a88 100644 --- a/discovery/zookeeper/zookeeper_test.go +++ b/discovery/zookeeper/zookeeper_test.go @@ -3,7 +3,6 @@ package zookeeper import ( "testing" - "github.com/docker/swarm/discovery" "github.com/stretchr/testify/assert" ) @@ -21,19 +20,3 @@ func TestInitialize(t *testing.T) { assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path/sub1/sub2", 0)) assert.Equal(t, service.fullpath(), "/path/sub1/sub2") } - -func TestCreateNodes(t *testing.T) { - service := &ZkDiscoveryService{} - - nodes, err := service.createNodes(nil) - assert.Equal(t, nodes, []*discovery.Node{}) - assert.NoError(t, err) - - nodes, err = service.createNodes([]string{"127.0.0.1:2375", "127.0.0.2:2375"}) - assert.Equal(t, nodes[0].String(), "127.0.0.1:2375") - assert.Equal(t, nodes[1].String(), "127.0.0.2:2375") - assert.NoError(t, err) - - _, err = service.createNodes([]string{"127.0.0.1", "127.0.0.2"}) - assert.Error(t, err) -}