diff --git a/discovery/discovery.go b/discovery/discovery.go index 479b339052..2aa4b16e90 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -5,9 +5,9 @@ import ( "fmt" "net/url" "strings" - "time" log "github.com/Sirupsen/logrus" + "github.com/docker/swarm/cluster" ) type Node struct { @@ -28,7 +28,7 @@ func (n Node) String() string { type DiscoveryService interface { Initialize(string, int) error Fetch() ([]*Node, error) - Watch() <-chan time.Time + Watch(*cluster.Cluster, func(*cluster.Cluster, []*Node)) Register(string) error } diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go index 0ba65fddb2..9f464d7ff1 100644 --- a/discovery/etcd/etcd.go +++ b/discovery/etcd/etcd.go @@ -3,10 +3,10 @@ package etcd import ( "path" "strings" - "time" log "github.com/Sirupsen/logrus" "github.com/coreos/go-etcd/etcd" + "github.com/docker/swarm/cluster" "github.com/docker/swarm/discovery" ) @@ -64,18 +64,16 @@ func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s *EtcdDiscoveryService) Watch() <-chan time.Time { +func (s *EtcdDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) { watchChan := make(chan *etcd.Response) - timeChan := make(chan time.Time) go s.client.Watch(s.path, 0, true, watchChan, nil) - go func() { - for { - <-watchChan - log.Debugf("[ETCD] Watch triggered") - timeChan <- time.Now() + for _ = range watchChan { + log.Debugf("[ETCD] Watch triggered") + nodes, err := s.Fetch() + if err == nil { + refresh(c, nodes) } - }() - return timeChan + } } func (s *EtcdDiscoveryService) Register(addr string) error { diff --git a/discovery/file/file.go b/discovery/file/file.go index d7c1a7cf8a..875b918000 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/docker/swarm/cluster" "github.com/docker/swarm/discovery" ) @@ -44,8 +45,13 @@ func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s *FileDiscoveryService) Watch() <-chan time.Time { - return time.Tick(time.Duration(s.heartbeat) * time.Second) +func (s *FileDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) { + for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { + nodes, err := s.Fetch() + if err == nil { + refresh(c, nodes) + } + } } func (s *FileDiscoveryService) Register(addr string) error { diff --git a/discovery/token/token.go b/discovery/token/token.go index 5c270df0bb..29d7c4f58d 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/docker/swarm/cluster" "github.com/docker/swarm/discovery" ) @@ -66,8 +67,13 @@ func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s *TokenDiscoveryService) Watch() <-chan time.Time { - return time.Tick(time.Duration(s.heartbeat) * time.Second) +func (s *TokenDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) { + for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { + nodes, err := s.Fetch() + if err == nil { + refresh(c, nodes) + } + } } // RegisterNode adds a new node identified by the into the discovery service diff --git a/manage.go b/manage.go index ce131e4ff7..7c954f78be 100644 --- a/manage.go +++ b/manage.go @@ -106,14 +106,7 @@ func manage(c *cli.Context) { } refresh(cluster, nodes) - go func() { - for _ = range d.Watch() { - nodes, err = d.Fetch() - if err == nil { - refresh(cluster, nodes) - } - } - }() + go d.Watch(cluster, refresh) } else { var nodes []*discovery.Node for _, arg := range c.Args() {