replace discovery.Node by discovery.Entry

Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
Victor Vieux 2015-02-04 18:50:51 +00:00
parent ea2a0386ae
commit ba01634743
13 changed files with 134 additions and 82 deletions

View File

@ -99,11 +99,11 @@ func (c *Cluster) AddNode(n *Node) error {
return n.Events(c) return n.Events(c)
} }
func (c *Cluster) UpdateNodes(nodes []*discovery.Node) { func (c *Cluster) UpdateNodes(entries []*discovery.Entry) {
for _, addr := range nodes { for _, entry := range entries {
go func(node *discovery.Node) { go func(m *discovery.Entry) {
if c.Node(node.String()) == nil { if c.Node(m.String()) == nil {
n := NewNode(node.String(), c.overcommitRatio) n := NewNode(m.String(), c.overcommitRatio)
if err := n.Connect(c.tlsConfig); err != nil { if err := n.Connect(c.tlsConfig); err != nil {
log.Error(err) log.Error(err)
return return
@ -113,7 +113,7 @@ func (c *Cluster) UpdateNodes(nodes []*discovery.Node) {
return return
} }
} }
}(addr) }(entry)
} }
} }

View File

@ -52,34 +52,41 @@ func (s *ConsulDiscoveryService) Initialize(uris string, heartbeat int) error {
s.lastIndex = meta.LastIndex s.lastIndex = meta.LastIndex
return nil return nil
} }
func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Entry, error) {
kv := s.client.KV() kv := s.client.KV()
pairs, _, err := kv.List(s.prefix, nil) pairs, _, err := kv.List(s.prefix, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var nodes []*discovery.Node return s.createEntries(pairs)
}
func (s *ConsulDiscoveryService) createEntries(pairs consul.KVPairs) ([]*discovery.Entry, error) {
entries := []*discovery.Entry{}
if pairs == nil {
return entries, nil
}
for _, pair := range pairs { for _, pair := range pairs {
if pair.Key == s.prefix { if pair.Key == s.prefix {
continue continue
} }
node, err := discovery.NewNode(string(pair.Value)) entry, err := discovery.NewEntry(string(pair.Value))
if err != nil { if err != nil {
return nil, err return nil, err
} }
nodes = append(nodes, node) entries = append(entries, entry)
} }
return nodes, nil return entries, nil
} }
func (s *ConsulDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *ConsulDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range s.waitForChange() { for _ = range s.waitForChange() {
log.WithField("name", "consul").Debug("Discovery watch triggered") log.WithField("name", "consul").Debug("Discovery watch triggered")
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }
} }

View File

@ -3,6 +3,8 @@ package consul
import ( import (
"testing" "testing"
consul "github.com/armon/consul-api"
"github.com/docker/swarm/discovery"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -18,3 +20,20 @@ func TestInitialize(t *testing.T) {
assert.Equal(t, discovery.prefix, "path/") assert.Equal(t, discovery.prefix, "path/")
} }
func TestCreateEntries(t *testing.T) {
service := &ConsulDiscoveryService{prefix: "prefix"}
entries, err := service.createEntries(nil)
assert.Equal(t, entries, []*discovery.Entry{})
assert.NoError(t, err)
entries, err = service.createEntries(consul.KVPairs{&consul.KVPair{Value: []byte("127.0.0.1:2375")}, &consul.KVPair{Value: []byte("127.0.0.2:2375")}})
assert.Equal(t, len(entries), 2)
assert.Equal(t, entries[0].String(), "127.0.0.1:2375")
assert.Equal(t, entries[1].String(), "127.0.0.2:2375")
assert.NoError(t, err)
_, err = service.createEntries(consul.KVPairs{&consul.KVPair{Value: []byte("127.0.0.1")}, &consul.KVPair{Value: []byte("127.0.0.2")}})
assert.Error(t, err)
}

View File

@ -9,28 +9,28 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
) )
type Node struct { type Entry struct {
Host string Host string
Port string Port string
} }
func NewNode(url string) (*Node, error) { func NewEntry(url string) (*Entry, error) {
host, port, err := net.SplitHostPort(url) host, port, err := net.SplitHostPort(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Node{host, port}, nil return &Entry{host, port}, nil
} }
func (n Node) String() string { func (m Entry) String() string {
return fmt.Sprintf("%s:%s", n.Host, n.Port) return fmt.Sprintf("%s:%s", m.Host, m.Port)
} }
type WatchCallback func(nodes []*Node) type WatchCallback func(entries []*Entry)
type DiscoveryService interface { type DiscoveryService interface {
Initialize(string, int) error Initialize(string, int) error
Fetch() ([]*Node, error) Fetch() ([]*Entry, error)
Watch(WatchCallback) Watch(WatchCallback)
Register(string) error Register(string) error
} }

View File

@ -6,13 +6,13 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestNewNode(t *testing.T) { func TestNewEntry(t *testing.T) {
node, err := NewNode("127.0.0.1:2375") entry, err := NewEntry("127.0.0.1:2375")
assert.Equal(t, node.Host, "127.0.0.1") assert.Equal(t, entry.Host, "127.0.0.1")
assert.Equal(t, node.Port, "2375") assert.Equal(t, entry.Port, "2375")
assert.NoError(t, err) assert.NoError(t, err)
_, err = NewNode("127.0.0.1") _, err = NewEntry("127.0.0.1")
assert.Error(t, err) assert.Error(t, err)
} }

View File

@ -24,9 +24,9 @@ func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error {
var ( var (
// split here because uris can contain multiples ips // split here because uris can contain multiples ips
// like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path`
parts = strings.SplitN(uris, "/", 2) parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",") ips = strings.Split(parts[0], ",")
machines []string entries []string
) )
if len(parts) != 2 { if len(parts) != 2 {
@ -34,10 +34,10 @@ func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error {
} }
for _, ip := range ips { for _, ip := range ips {
machines = append(machines, "http://"+ip) entries = append(entries, "http://"+ip)
} }
s.client = etcd.NewClient(machines) s.client = etcd.NewClient(entries)
s.ttl = uint64(heartbeat * 3 / 2) s.ttl = uint64(heartbeat * 3 / 2)
s.path = "/" + parts[1] + "/" s.path = "/" + parts[1] + "/"
if _, err := s.client.CreateDir(s.path, s.ttl); err != nil { if _, err := s.client.CreateDir(s.path, s.ttl); err != nil {
@ -51,22 +51,30 @@ func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error {
} }
return nil return nil
} }
func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Entry, error) {
resp, err := s.client.Get(s.path, true, true) resp, err := s.client.Get(s.path, true, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var nodes []*discovery.Node return s.createEntries(resp.Node.Nodes)
}
for _, n := range resp.Node.Nodes { func (s *EtcdDiscoveryService) createEntries(nodes etcd.Nodes) ([]*discovery.Entry, error) {
node, err := discovery.NewNode(n.Value) entries := []*discovery.Entry{}
if nodes == nil {
return entries, nil
}
for _, n := range nodes {
entry, err := discovery.NewEntry(n.Value)
if err != nil { if err != nil {
return nil, err return nil, err
} }
nodes = append(nodes, node) entries = append(entries, entry)
} }
return nodes, nil return entries, nil
} }
func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) {
@ -74,9 +82,9 @@ func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) {
go s.client.Watch(s.path, 0, true, watchChan, nil) go s.client.Watch(s.path, 0, true, watchChan, nil)
for _ = range watchChan { for _ = range watchChan {
log.WithField("name", "etcd").Debug("Discovery watch triggered") log.WithField("name", "etcd").Debug("Discovery watch triggered")
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }
} }

View File

@ -3,6 +3,8 @@ package etcd
import ( import (
"testing" "testing"
"github.com/coreos/go-etcd/etcd"
"github.com/docker/swarm/discovery"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -17,3 +19,19 @@ func TestInitialize(t *testing.T) {
assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) assert.Error(t, discovery.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0))
assert.Equal(t, discovery.path, "/path/") assert.Equal(t, discovery.path, "/path/")
} }
func TestCreateEntries(t *testing.T) {
service := &EtcdDiscoveryService{}
entries, err := service.createEntries(nil)
assert.Equal(t, entries, []*discovery.Entry{})
assert.NoError(t, err)
entries, err = service.createEntries(etcd.Nodes{&etcd.Node{Value: "127.0.0.1:2375"}, &etcd.Node{Value: "127.0.0.2:2375"}})
assert.Equal(t, entries[0].String(), "127.0.0.1:2375")
assert.Equal(t, entries[1].String(), "127.0.0.2:2375")
assert.NoError(t, err)
_, err = service.createEntries(etcd.Nodes{&etcd.Node{Value: "127.0.0.1"}, &etcd.Node{Value: "127.0.0.2"}})
assert.Error(t, err)
}

View File

@ -23,31 +23,31 @@ func (s *FileDiscoveryService) Initialize(path string, heartbeat int) error {
return nil return nil
} }
func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *FileDiscoveryService) Fetch() ([]*discovery.Entry, error) {
data, err := ioutil.ReadFile(s.path) data, err := ioutil.ReadFile(s.path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var nodes []*discovery.Node var entries []*discovery.Entry
for _, line := range strings.Split(string(data), "\n") { for _, line := range strings.Split(string(data), "\n") {
if line != "" { if line != "" {
node, err := discovery.NewNode(line) entry, err := discovery.NewEntry(line)
if err != nil { if err != nil {
return nil, err return nil, err
} }
nodes = append(nodes, node) entries = append(entries, entry)
} }
} }
return nodes, nil return entries, nil
} }
func (s *FileDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *FileDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }
} }

View File

@ -7,7 +7,7 @@ import (
) )
type NodesDiscoveryService struct { type NodesDiscoveryService struct {
nodes []*discovery.Node entries []*discovery.Entry
} }
func init() { func init() {
@ -16,17 +16,17 @@ func init() {
func (s *NodesDiscoveryService) Initialize(uris string, _ int) error { func (s *NodesDiscoveryService) Initialize(uris string, _ int) error {
for _, ip := range strings.Split(uris, ",") { for _, ip := range strings.Split(uris, ",") {
node, err := discovery.NewNode(ip) entry, err := discovery.NewEntry(ip)
if err != nil { if err != nil {
return err return err
} }
s.nodes = append(s.nodes, node) s.entries = append(s.entries, entry)
} }
return nil return nil
} }
func (s *NodesDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *NodesDiscoveryService) Fetch() ([]*discovery.Entry, error) {
return s.nodes, nil return s.entries, nil
} }
func (s *NodesDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *NodesDiscoveryService) Watch(callback discovery.WatchCallback) {

View File

@ -9,9 +9,9 @@ import (
func TestInitialise(t *testing.T) { func TestInitialise(t *testing.T) {
discovery := &NodesDiscoveryService{} discovery := &NodesDiscoveryService{}
discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
assert.Equal(t, len(discovery.nodes), 2) assert.Equal(t, len(discovery.entries), 2)
assert.Equal(t, discovery.nodes[0].String(), "1.1.1.1:1111") assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111")
assert.Equal(t, discovery.nodes[1].String(), "2.2.2.2:2222") assert.Equal(t, discovery.entries[1].String(), "2.2.2.2:2222")
} }
func TestRegister(t *testing.T) { func TestRegister(t *testing.T) {

View File

@ -41,8 +41,8 @@ func (s *TokenDiscoveryService) Initialize(urltoken string, heartbeat int) error
return nil return nil
} }
// Fetch returns the list of nodes for the discovery service at the specified endpoint // Fetch returns the list of entries for the discovery service at the specified endpoint
func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *TokenDiscoveryService) Fetch() ([]*discovery.Entry, error) {
resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token))
if err != nil { if err != nil {
@ -59,31 +59,31 @@ func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nil, err return nil, err
} }
} else { } else {
return nil, fmt.Errorf("Failed to fetch nodes, Discovery service returned %d HTTP status code", resp.StatusCode) return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode)
} }
var nodes []*discovery.Node var entries []*discovery.Entry
for _, addr := range addrs { for _, addr := range addrs {
node, err := discovery.NewNode(addr) entry, err := discovery.NewEntry(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
nodes = append(nodes, node) entries = append(entries, entry)
} }
return nodes, nil return entries, nil
} }
func (s *TokenDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *TokenDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }
} }
// RegisterNode adds a new node identified by the into the discovery service // RegisterEntry adds a new entry identified by the into the discovery service
func (s *TokenDiscoveryService) Register(addr string) error { func (s *TokenDiscoveryService) Register(addr string) error {
buf := strings.NewReader(addr) buf := strings.NewReader(addr)

View File

@ -72,30 +72,30 @@ func (s *ZkDiscoveryService) Initialize(uris string, heartbeat int) error {
return nil return nil
} }
func (s *ZkDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s *ZkDiscoveryService) Fetch() ([]*discovery.Entry, error) {
addrs, _, err := s.conn.Children(s.fullpath()) addrs, _, err := s.conn.Children(s.fullpath())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return s.createNodes(addrs) return s.createEntries(addrs)
} }
func (s *ZkDiscoveryService) createNodes(addrs []string) ([]*discovery.Node, error) { func (s *ZkDiscoveryService) createEntries(addrs []string) ([]*discovery.Entry, error) {
nodes := make([]*discovery.Node, 0) entries := []*discovery.Entry{}
if addrs == nil { if addrs == nil {
return nodes, nil return entries, nil
} }
for _, addr := range addrs { for _, addr := range addrs {
node, err := discovery.NewNode(addr) entry, err := discovery.NewEntry(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
nodes = append(nodes, node) entries = append(entries, entry)
} }
return nodes, nil return entries, nil
} }
func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) { func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) {
@ -105,17 +105,17 @@ func (s *ZkDiscoveryService) Watch(callback discovery.WatchCallback) {
log.WithField("name", "zk").Debug("Discovery watch aborted") log.WithField("name", "zk").Debug("Discovery watch aborted")
return return
} }
nodes, err := s.createNodes(addrs) entries, err := s.createEntries(addrs)
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
for e := range eventChan { for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged { if e.Type == zk.EventNodeChildrenChanged {
log.WithField("name", "zk").Debug("Discovery watch triggered") log.WithField("name", "zk").Debug("Discovery watch triggered")
nodes, err := s.Fetch() entries, err := s.Fetch()
if err == nil { if err == nil {
callback(nodes) callback(entries)
} }
} }

View File

@ -22,18 +22,18 @@ func TestInitialize(t *testing.T) {
assert.Equal(t, service.fullpath(), "/path/sub1/sub2") assert.Equal(t, service.fullpath(), "/path/sub1/sub2")
} }
func TestCreateNodes(t *testing.T) { func TestCreateEntries(t *testing.T) {
service := &ZkDiscoveryService{} service := &ZkDiscoveryService{}
nodes, err := service.createNodes(nil) entries, err := service.createEntries(nil)
assert.Equal(t, nodes, []*discovery.Node{}) assert.Equal(t, entries, []*discovery.Entry{})
assert.NoError(t, err) assert.NoError(t, err)
nodes, err = service.createNodes([]string{"127.0.0.1:2375", "127.0.0.2:2375"}) entries, err = service.createEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375"})
assert.Equal(t, nodes[0].String(), "127.0.0.1:2375") assert.Equal(t, entries[0].String(), "127.0.0.1:2375")
assert.Equal(t, nodes[1].String(), "127.0.0.2:2375") assert.Equal(t, entries[1].String(), "127.0.0.2:2375")
assert.NoError(t, err) assert.NoError(t, err)
_, err = service.createNodes([]string{"127.0.0.1", "127.0.0.2"}) _, err = service.createEntries([]string{"127.0.0.1", "127.0.0.2"})
assert.Error(t, err) assert.Error(t, err)
} }