From f2ca549f46f71b6c34574404fb0cc3f3d0d21f98 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Fri, 12 Dec 2014 22:46:28 +0000 Subject: [PATCH] cleaner callback Signed-off-by: Victor Vieux --- cluster/cluster.go | 26 ++++++++++++++++++++++++-- cluster/cluster_test.go | 4 ++-- discovery/README.md | 7 +++---- discovery/discovery.go | 3 +-- discovery/etcd/etcd.go | 5 ++--- discovery/file/file.go | 6 ++---- discovery/token/token.go | 5 ++--- manage.go | 26 ++++---------------------- 8 files changed, 40 insertions(+), 42 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index a1dc91a2c9..204f1e1d93 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1,11 +1,13 @@ package cluster import ( + "crypto/tls" "errors" "strings" "sync" log "github.com/Sirupsen/logrus" + "github.com/docker/swarm/discovery" ) var ( @@ -15,13 +17,15 @@ var ( type Cluster struct { sync.RWMutex + tlsConfig *tls.Config eventHandlers []EventHandler nodes map[string]*Node } -func NewCluster() *Cluster { +func NewCluster(tlsConfig *tls.Config) *Cluster { return &Cluster{ - nodes: make(map[string]*Node), + tlsConfig: tlsConfig, + nodes: make(map[string]*Node), } } @@ -52,6 +56,24 @@ func (c *Cluster) AddNode(n *Node) error { return n.Events(c) } +func (c *Cluster) UpdateNodes(nodes []*discovery.Node) { + for _, addr := range nodes { + go func(node *discovery.Node) { + if c.Node(node.String()) == nil { + n := NewNode(node.String()) + if err := n.Connect(c.tlsConfig); err != nil { + log.Error(err) + return + } + if err := c.AddNode(n); err != nil { + log.Error(err) + return + } + } + }(addr) + } +} + // Containers returns all the containers in the cluster. func (c *Cluster) Containers() []*Container { c.Lock() diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 3653e529ea..e2a58b0d8e 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -32,7 +32,7 @@ func createNode(t *testing.T, ID string, containers ...dockerclient.Container) * } func TestAddNode(t *testing.T) { - c := NewCluster() + c := NewCluster(nil) assert.Equal(t, len(c.Nodes()), 0) assert.Nil(t, c.Node("test")) @@ -52,7 +52,7 @@ func TestAddNode(t *testing.T) { } func TestLookupContainer(t *testing.T) { - c := NewCluster() + c := NewCluster(nil) container := dockerclient.Container{ Id: "container-id", Names: []string{"/container-name1", "/container-name2"}, diff --git a/discovery/README.md b/discovery/README.md index 50f86bd6c5..39e46774ab 100644 --- a/discovery/README.md +++ b/discovery/README.md @@ -87,7 +87,7 @@ simply implements this interface: type DiscoveryService interface { Initialize(string, int) error Fetch() ([]string, error) - Watch(*cluster.Cluster, func(*cluster.Cluster, []*Node)) + Watch(func([]*Node)) Register(string) error } ``` @@ -99,9 +99,8 @@ take the `--dicovery` withtout the scheme and a heartbeat (in seconds) returns the list of all the nodes from the discovery ######Watch -triggers when you need to update (`Fetch`) the list of nodes, -it can happen either via un timer (like `token`) or use -backend specific features (like `etcd`) +triggers an update (`Fetch`),it can happen either via +a timer (like `token`) or use backend specific features (like `etcd`) ######Register add a new node to the discovery diff --git a/discovery/discovery.go b/discovery/discovery.go index 2aa4b16e90..34c81e833f 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -7,7 +7,6 @@ import ( "strings" log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/cluster" ) type Node struct { @@ -28,7 +27,7 @@ func (n Node) String() string { type DiscoveryService interface { Initialize(string, int) error Fetch() ([]*Node, error) - Watch(*cluster.Cluster, func(*cluster.Cluster, []*Node)) + Watch(func(nodes []*Node)) Register(string) error } diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go index 9f464d7ff1..35dd594f3d 100644 --- a/discovery/etcd/etcd.go +++ b/discovery/etcd/etcd.go @@ -6,7 +6,6 @@ import ( log "github.com/Sirupsen/logrus" "github.com/coreos/go-etcd/etcd" - "github.com/docker/swarm/cluster" "github.com/docker/swarm/discovery" ) @@ -64,14 +63,14 @@ func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s *EtcdDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) { +func (s *EtcdDiscoveryService) Watch(updateNodes func(nodes []*discovery.Node)) { watchChan := make(chan *etcd.Response) go s.client.Watch(s.path, 0, true, watchChan, nil) for _ = range watchChan { log.Debugf("[ETCD] Watch triggered") nodes, err := s.Fetch() if err == nil { - refresh(c, nodes) + updateNodes(nodes) } } } diff --git a/discovery/file/file.go b/discovery/file/file.go index 875b918000..2f8da0bb89 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -5,8 +5,6 @@ import ( "io/ioutil" "strings" "time" - - "github.com/docker/swarm/cluster" "github.com/docker/swarm/discovery" ) @@ -45,11 +43,11 @@ func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s *FileDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) { +func (s *FileDiscoveryService) Watch(updateNodes func(nodes []*discovery.Node)) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { nodes, err := s.Fetch() if err == nil { - refresh(c, nodes) + updateNodes(nodes) } } } diff --git a/discovery/token/token.go b/discovery/token/token.go index 29d7c4f58d..7bc9cae1c0 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/docker/swarm/cluster" "github.com/docker/swarm/discovery" ) @@ -67,11 +66,11 @@ func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s *TokenDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) { +func (s *TokenDiscoveryService) Watch(updateNodes func(nodes []*discovery.Node)) { for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { nodes, err := s.Fetch() if err == nil { - refresh(c, nodes) + updateNodes(nodes) } } } diff --git a/manage.go b/manage.go index 7c954f78be..7b7103cb1c 100644 --- a/manage.go +++ b/manage.go @@ -71,25 +71,7 @@ func manage(c *cli.Context) { } } - refresh := func(c *cluster.Cluster, nodes []*discovery.Node) { - for _, addr := range nodes { - go func(node *discovery.Node) { - if c.Node(node.String()) == nil { - n := cluster.NewNode(node.String()) - if err := n.Connect(tlsConfig); err != nil { - log.Error(err) - return - } - if err := c.AddNode(n); err != nil { - log.Error(err) - return - } - } - }(addr) - } - } - - cluster := cluster.NewCluster() + cluster := cluster.NewCluster(tlsConfig) cluster.Events(&logHandler{}) go func() { @@ -104,15 +86,15 @@ func manage(c *cli.Context) { log.Fatal(err) } - refresh(cluster, nodes) + cluster.UpdateNodes(nodes) - go d.Watch(cluster, refresh) + go d.Watch(cluster.UpdateNodes) } else { var nodes []*discovery.Node for _, arg := range c.Args() { nodes = append(nodes, discovery.NewNode(arg)) } - refresh(cluster, nodes) + cluster.UpdateNodes(nodes) } }()