Merge pull request #364 from vieux/node_machine

replace discovery.Node by discovery.Entry + small refactor
This commit is contained in:
Andrea Luzzardi 2015-02-09 18:42:43 -08:00
commit 3de12b80c3
11 changed files with 91 additions and 123 deletions

View File

@ -99,11 +99,11 @@ func (c *Cluster) AddNode(n *Node) error {
return n.Events(c) return n.Events(c)
} }
func (c *Cluster) UpdateNodes(nodes []*discovery.Node) { func (c *Cluster) UpdateNodes(entries []*discovery.Entry) {
for _, addr := range nodes { for _, entry := range entries {
go func(node *discovery.Node) { go func(m *discovery.Entry) {
if c.Node(node.String()) == nil { if c.Node(m.String()) == nil {
n := NewNode(node.String(), c.overcommitRatio) n := NewNode(m.String(), c.overcommitRatio)
if err := n.Connect(c.tlsConfig); err != nil { if err := n.Connect(c.tlsConfig); err != nil {
log.Error(err) log.Error(err)
return return
@ -113,7 +113,7 @@ func (c *Cluster) UpdateNodes(nodes []*discovery.Node) {
return return
} }
} }
}(addr) }(entry)
} }
} }

View File

@ -52,34 +52,30 @@ func (s *ConsulDiscoveryService) Initialize(uris string, heartbeat int) error {
s.lastIndex = meta.LastIndex s.lastIndex = meta.LastIndex
return nil return nil
} }
func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Entry, error) {
kv := s.client.KV() kv := s.client.KV()
pairs, _, err := kv.List(s.prefix, nil) pairs, _, err := kv.List(s.prefix, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var nodes []*discovery.Node addrs := []string{}
for _, pair := range pairs { for _, pair := range pairs {
if pair.Key == s.prefix { if pair.Key == s.prefix {
continue continue
} }
node, err := discovery.NewNode(string(pair.Value)) addrs = append(addrs, string(pair.Value))
if err != nil {
return nil, err
}
nodes = append(nodes, node)
} }
return nodes, nil
return discovery.CreateEntries(addrs)
} }
func (s *ConsulDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *ConsulDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range s.waitForChange() { for _ = range s.waitForChange() {
log.WithField("name", "consul").Debug("Discovery watch triggered") log.WithField("name", "consul").Debug("Discovery watch triggered")
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }
} }

View File

@ -9,28 +9,28 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
) )
type Node struct { type Entry struct {
Host string Host string
Port string Port string
} }
func NewNode(url string) (*Node, error) { func NewEntry(url string) (*Entry, error) {
host, port, err := net.SplitHostPort(url) host, port, err := net.SplitHostPort(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Node{host, port}, nil return &Entry{host, port}, nil
} }
func (n Node) String() string { func (m Entry) String() string {
return fmt.Sprintf("%s:%s", n.Host, n.Port) return fmt.Sprintf("%s:%s", m.Host, m.Port)
} }
type WatchCallback func(nodes []*Node) type WatchCallback func(entries []*Entry)
type DiscoveryService interface { type DiscoveryService interface {
Initialize(string, int) error Initialize(string, int) error
Fetch() ([]*Node, error) Fetch() ([]*Entry, error)
Watch(WatchCallback) Watch(WatchCallback)
Register(string) error Register(string) error
} }
@ -76,3 +76,19 @@ func New(rawurl string, heartbeat int) (DiscoveryService, error) {
return nil, ErrNotSupported return nil, ErrNotSupported
} }
func CreateEntries(addrs []string) ([]*Entry, error) {
entries := []*Entry{}
if addrs == nil {
return entries, nil
}
for _, addr := range addrs {
entry, err := NewEntry(addr)
if err != nil {
return nil, err
}
entries = append(entries, entry)
}
return entries, nil
}

View File

@ -6,13 +6,13 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestNewNode(t *testing.T) { func TestNewEntry(t *testing.T) {
node, err := NewNode("127.0.0.1:2375") entry, err := NewEntry("127.0.0.1:2375")
assert.Equal(t, node.Host, "127.0.0.1") assert.Equal(t, entry.Host, "127.0.0.1")
assert.Equal(t, node.Port, "2375") assert.Equal(t, entry.Port, "2375")
assert.NoError(t, err) assert.NoError(t, err)
_, err = NewNode("127.0.0.1") _, err = NewEntry("127.0.0.1")
assert.Error(t, err) assert.Error(t, err)
} }
@ -37,3 +37,17 @@ func TestParse(t *testing.T) {
assert.Equal(t, scheme, "nodes") assert.Equal(t, scheme, "nodes")
assert.Equal(t, uri, "") assert.Equal(t, uri, "")
} }
func TestCreateEntries(t *testing.T) {
entries, err := CreateEntries(nil)
assert.Equal(t, entries, []*Entry{})
assert.NoError(t, err)
entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375"})
assert.Equal(t, entries[0].String(), "127.0.0.1:2375")
assert.Equal(t, entries[1].String(), "127.0.0.2:2375")
assert.NoError(t, err)
_, err = CreateEntries([]string{"127.0.0.1", "127.0.0.2"})
assert.Error(t, err)
}

View File

@ -24,9 +24,9 @@ func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error {
var ( var (
// split here because uris can contain multiples ips // split here because uris can contain multiples ips
// like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path`
parts = strings.SplitN(uris, "/", 2) parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",") ips = strings.Split(parts[0], ",")
machines []string entries []string
) )
if len(parts) != 2 { if len(parts) != 2 {
@ -34,10 +34,10 @@ func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error {
} }
for _, ip := range ips { for _, ip := range ips {
machines = append(machines, "http://"+ip) entries = append(entries, "http://"+ip)
} }
s.client = etcd.NewClient(machines) s.client = etcd.NewClient(entries)
s.ttl = uint64(heartbeat * 3 / 2) s.ttl = uint64(heartbeat * 3 / 2)
s.path = "/" + parts[1] + "/" s.path = "/" + parts[1] + "/"
if _, err := s.client.CreateDir(s.path, s.ttl); err != nil { if _, err := s.client.CreateDir(s.path, s.ttl); err != nil {
@ -51,22 +51,17 @@ func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error {
} }
return nil return nil
} }
func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Entry, error) {
resp, err := s.client.Get(s.path, true, true) resp, err := s.client.Get(s.path, true, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var nodes []*discovery.Node addrs := []string{}
for _, n := range resp.Node.Nodes { for _, n := range resp.Node.Nodes {
node, err := discovery.NewNode(n.Value) addrs = append(addrs, n.Value)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
} }
return nodes, nil return discovery.CreateEntries(addrs)
} }
func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) {
@ -74,9 +69,9 @@ func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) {
go s.client.Watch(s.path, 0, true, watchChan, nil) go s.client.Watch(s.path, 0, true, watchChan, nil)
for _ = range watchChan { for _ = range watchChan {
log.WithField("name", "etcd").Debug("Discovery watch triggered") log.WithField("name", "etcd").Debug("Discovery watch triggered")
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }
} }

View File

@ -23,31 +23,20 @@ func (s *FileDiscoveryService) Initialize(path string, heartbeat int) error {
return nil return nil
} }
func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *FileDiscoveryService) Fetch() ([]*discovery.Entry, error) {
data, err := ioutil.ReadFile(s.path) data, err := ioutil.ReadFile(s.path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var nodes []*discovery.Node return discovery.CreateEntries(strings.Split(string(data), "\n"))
for _, line := range strings.Split(string(data), "\n") {
if line != "" {
node, err := discovery.NewNode(line)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
}
return nodes, nil
} }
func (s *FileDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *FileDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }
} }

View File

@ -7,7 +7,7 @@ import (
) )
type NodesDiscoveryService struct { type NodesDiscoveryService struct {
nodes []*discovery.Node entries []*discovery.Entry
} }
func init() { func init() {
@ -16,17 +16,17 @@ func init() {
func (s *NodesDiscoveryService) Initialize(uris string, _ int) error { func (s *NodesDiscoveryService) Initialize(uris string, _ int) error {
for _, ip := range strings.Split(uris, ",") { for _, ip := range strings.Split(uris, ",") {
node, err := discovery.NewNode(ip) entry, err := discovery.NewEntry(ip)
if err != nil { if err != nil {
return err return err
} }
s.nodes = append(s.nodes, node) s.entries = append(s.entries, entry)
} }
return nil return nil
} }
func (s *NodesDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *NodesDiscoveryService) Fetch() ([]*discovery.Entry, error) {
return s.nodes, nil return s.entries, nil
} }
func (s *NodesDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *NodesDiscoveryService) Watch(callback discovery.WatchCallback) {

View File

@ -9,9 +9,9 @@ import (
func TestInitialise(t *testing.T) { func TestInitialise(t *testing.T) {
discovery := &NodesDiscoveryService{} discovery := &NodesDiscoveryService{}
discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
assert.Equal(t, len(discovery.nodes), 2) assert.Equal(t, len(discovery.entries), 2)
assert.Equal(t, discovery.nodes[0].String(), "1.1.1.1:1111") assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111")
assert.Equal(t, discovery.nodes[1].String(), "2.2.2.2:2222") assert.Equal(t, discovery.entries[1].String(), "2.2.2.2:2222")
} }
func TestRegister(t *testing.T) { func TestRegister(t *testing.T) {

View File

@ -41,8 +41,8 @@ func (s *TokenDiscoveryService) Initialize(urltoken string, heartbeat int) error
return nil return nil
} }
// Fetch returns the list of nodes for the discovery service at the specified endpoint // Fetch returns the list of entries for the discovery service at the specified endpoint
func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *TokenDiscoveryService) Fetch() ([]*discovery.Entry, error) {
resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token))
if err != nil { if err != nil {
@ -59,31 +59,22 @@ func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nil, err return nil, err
} }
} else { } else {
return nil, fmt.Errorf("Failed to fetch nodes, Discovery service returned %d HTTP status code", resp.StatusCode) return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode)
} }
var nodes []*discovery.Node return discovery.CreateEntries(addrs)
for _, addr := range addrs {
node, err := discovery.NewNode(addr)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
return nodes, nil
} }
func (s *TokenDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *TokenDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }
} }
// RegisterNode adds a new node identified by the into the discovery service // RegisterEntry adds a new entry identified by the into the discovery service
func (s *TokenDiscoveryService) Register(addr string) error { func (s *TokenDiscoveryService) Register(addr string) error {
buf := strings.NewReader(addr) buf := strings.NewReader(addr)

View File

@ -72,30 +72,14 @@ func (s *ZkDiscoveryService) Initialize(uris string, heartbeat int) error {
return nil return nil
} }
func (s *ZkDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *ZkDiscoveryService) Fetch() ([]*discovery.Entry, error) {
addrs, _, err := s.conn.Children(s.fullpath()) addrs, _, err := s.conn.Children(s.fullpath())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return s.createNodes(addrs) return discovery.CreateEntries(addrs)
}
func (s *ZkDiscoveryService) createNodes(addrs []string) ([]*discovery.Node, error) {
nodes := make([]*discovery.Node, 0)
if addrs == nil {
return nodes, nil
}
for _, addr := range addrs {
node, err := discovery.NewNode(addr)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
return nodes, nil
} }
func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) {
@ -105,17 +89,17 @@ func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) {
log.WithField("name", "zk").Debug("Discovery watch aborted") log.WithField("name", "zk").Debug("Discovery watch aborted")
return return
} }
nodes, err := s.createNodes(addrs) entries, err := discovery.CreateEntries(addrs)
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
for e := range eventChan { for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged { if e.Type == zk.EventNodeChildrenChanged {
log.WithField("name", "zk").Debug("Discovery watch triggered") log.WithField("name", "zk").Debug("Discovery watch triggered")
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }

View File

@ -3,7 +3,6 @@ package zookeeper
import ( import (
"testing" "testing"
"github.com/docker/swarm/discovery"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -21,19 +20,3 @@ func TestInitialize(t *testing.T) {
assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path/sub1/sub2", 0)) assert.Error(t, service.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path/sub1/sub2", 0))
assert.Equal(t, service.fullpath(), "/path/sub1/sub2") assert.Equal(t, service.fullpath(), "/path/sub1/sub2")
} }
func TestCreateNodes(t *testing.T) {
service := &ZkDiscoveryService{}
nodes, err := service.createNodes(nil)
assert.Equal(t, nodes, []*discovery.Node{})
assert.NoError(t, err)
nodes, err = service.createNodes([]string{"127.0.0.1:2375", "127.0.0.2:2375"})
assert.Equal(t, nodes[0].String(), "127.0.0.1:2375")
assert.Equal(t, nodes[1].String(), "127.0.0.2:2375")
assert.NoError(t, err)
_, err = service.createNodes([]string{"127.0.0.1", "127.0.0.2"})
assert.Error(t, err)
}