cleaner callback

Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
Victor Vieux 2014-12-12 22:46:28 +00:00
parent a0dd4b9fe8
commit f2ca549f46
8 changed files with 40 additions and 42 deletions

View File

@ -1,11 +1,13 @@
package cluster package cluster
import ( import (
"crypto/tls"
"errors" "errors"
"strings" "strings"
"sync" "sync"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/discovery"
) )
var ( var (
@ -15,13 +17,15 @@ var (
type Cluster struct { type Cluster struct {
sync.RWMutex sync.RWMutex
tlsConfig *tls.Config
eventHandlers []EventHandler eventHandlers []EventHandler
nodes map[string]*Node nodes map[string]*Node
} }
func NewCluster() *Cluster { func NewCluster(tlsConfig *tls.Config) *Cluster {
return &Cluster{ return &Cluster{
nodes: make(map[string]*Node), tlsConfig: tlsConfig,
nodes: make(map[string]*Node),
} }
} }
@ -52,6 +56,24 @@ func (c *Cluster) AddNode(n *Node) error {
return n.Events(c) return n.Events(c)
} }
func (c *Cluster) UpdateNodes(nodes []*discovery.Node) {
for _, addr := range nodes {
go func(node *discovery.Node) {
if c.Node(node.String()) == nil {
n := NewNode(node.String())
if err := n.Connect(c.tlsConfig); err != nil {
log.Error(err)
return
}
if err := c.AddNode(n); err != nil {
log.Error(err)
return
}
}
}(addr)
}
}
// Containers returns all the containers in the cluster. // Containers returns all the containers in the cluster.
func (c *Cluster) Containers() []*Container { func (c *Cluster) Containers() []*Container {
c.Lock() c.Lock()

View File

@ -32,7 +32,7 @@ func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *
} }
func TestAddNode(t *testing.T) { func TestAddNode(t *testing.T) {
c := NewCluster() c := NewCluster(nil)
assert.Equal(t, len(c.Nodes()), 0) assert.Equal(t, len(c.Nodes()), 0)
assert.Nil(t, c.Node("test")) assert.Nil(t, c.Node("test"))
@ -52,7 +52,7 @@ func TestAddNode(t *testing.T) {
} }
func TestLookupContainer(t *testing.T) { func TestLookupContainer(t *testing.T) {
c := NewCluster() c := NewCluster(nil)
container := dockerclient.Container{ container := dockerclient.Container{
Id: "container-id", Id: "container-id",
Names: []string{"/container-name1", "/container-name2"}, Names: []string{"/container-name1", "/container-name2"},

View File

@ -87,7 +87,7 @@ simply implements this interface:
type DiscoveryService interface { type DiscoveryService interface {
Initialize(string, int) error Initialize(string, int) error
Fetch() ([]string, error) Fetch() ([]string, error)
Watch(*cluster.Cluster, func(*cluster.Cluster, []*Node)) Watch(func([]*Node))
Register(string) error Register(string) error
} }
``` ```
@ -99,9 +99,8 @@ take the `--dicovery` withtout the scheme and a heartbeat (in seconds)
returns the list of all the nodes from the discovery returns the list of all the nodes from the discovery
######Watch ######Watch
triggers when you need to update (`Fetch`) the list of nodes, triggers an update (`Fetch`),it can happen either via
it can happen either via un timer (like `token`) or use a timer (like `token`) or use backend specific features (like `etcd`)
backend specific features (like `etcd`)
######Register ######Register
add a new node to the discovery add a new node to the discovery

View File

@ -7,7 +7,6 @@ import (
"strings" "strings"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
) )
type Node struct { type Node struct {
@ -28,7 +27,7 @@ func (n Node) String() string {
type DiscoveryService interface { type DiscoveryService interface {
Initialize(string, int) error Initialize(string, int) error
Fetch() ([]*Node, error) Fetch() ([]*Node, error)
Watch(*cluster.Cluster, func(*cluster.Cluster, []*Node)) Watch(func(nodes []*Node))
Register(string) error Register(string) error
} }

View File

@ -6,7 +6,6 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery" "github.com/docker/swarm/discovery"
) )
@ -64,14 +63,14 @@ func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil return nodes, nil
} }
func (s *EtcdDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) { func (s *EtcdDiscoveryService) Watch(updateNodes func(nodes []*discovery.Node)) {
watchChan := make(chan *etcd.Response) watchChan := make(chan *etcd.Response)
go s.client.Watch(s.path, 0, true, watchChan, nil) go s.client.Watch(s.path, 0, true, watchChan, nil)
for _ = range watchChan { for _ = range watchChan {
log.Debugf("[ETCD] Watch triggered") log.Debugf("[ETCD] Watch triggered")
nodes, err := s.Fetch() nodes, err := s.Fetch()
if err == nil { if err == nil {
refresh(c, nodes) updateNodes(nodes)
} }
} }
} }

View File

@ -5,8 +5,6 @@ import (
"io/ioutil" "io/ioutil"
"strings" "strings"
"time" "time"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery" "github.com/docker/swarm/discovery"
) )
@ -45,11 +43,11 @@ func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil return nodes, nil
} }
func (s *FileDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) { func (s *FileDiscoveryService) Watch(updateNodes func(nodes []*discovery.Node)) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch() nodes, err := s.Fetch()
if err == nil { if err == nil {
refresh(c, nodes) updateNodes(nodes)
} }
} }
} }

View File

@ -8,7 +8,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery" "github.com/docker/swarm/discovery"
) )
@ -67,11 +66,11 @@ func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil return nodes, nil
} }
func (s *TokenDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) { func (s *TokenDiscoveryService) Watch(updateNodes func(nodes []*discovery.Node)) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch() nodes, err := s.Fetch()
if err == nil { if err == nil {
refresh(c, nodes) updateNodes(nodes)
} }
} }
} }

View File

@ -71,25 +71,7 @@ func manage(c *cli.Context) {
} }
} }
refresh := func(c *cluster.Cluster, nodes []*discovery.Node) { cluster := cluster.NewCluster(tlsConfig)
for _, addr := range nodes {
go func(node *discovery.Node) {
if c.Node(node.String()) == nil {
n := cluster.NewNode(node.String())
if err := n.Connect(tlsConfig); err != nil {
log.Error(err)
return
}
if err := c.AddNode(n); err != nil {
log.Error(err)
return
}
}
}(addr)
}
}
cluster := cluster.NewCluster()
cluster.Events(&logHandler{}) cluster.Events(&logHandler{})
go func() { go func() {
@ -104,15 +86,15 @@ func manage(c *cli.Context) {
log.Fatal(err) log.Fatal(err)
} }
refresh(cluster, nodes) cluster.UpdateNodes(nodes)
go d.Watch(cluster, refresh) go d.Watch(cluster.UpdateNodes)
} else { } else {
var nodes []*discovery.Node var nodes []*discovery.Node
for _, arg := range c.Args() { for _, arg := range c.Args() {
nodes = append(nodes, discovery.NewNode(arg)) nodes = append(nodes, discovery.NewNode(arg))
} }
refresh(cluster, nodes) cluster.UpdateNodes(nodes)
} }
}() }()