watch take a callback

This commit is contained in:
Victor Vieux 2014-12-12 22:14:11 +00:00
parent c85b0a9c39
commit 005b5fe288
5 changed files with 27 additions and 24 deletions

View File

@ -5,9 +5,9 @@ import (
"fmt"
"net/url"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
)
type Node struct {
@ -28,7 +28,7 @@ func (n Node) String() string {
type DiscoveryService interface {
Initialize(string, int) error
Fetch() ([]*Node, error)
Watch() <-chan time.Time
Watch(*cluster.Cluster, func(*cluster.Cluster, []*Node))
Register(string) error
}

View File

@ -3,10 +3,10 @@ package etcd
import (
"path"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/coreos/go-etcd/etcd"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
)
@ -64,18 +64,16 @@ func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil
}
func (s *EtcdDiscoveryService) Watch() <-chan time.Time {
func (s *EtcdDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) {
watchChan := make(chan *etcd.Response)
timeChan := make(chan time.Time)
go s.client.Watch(s.path, 0, true, watchChan, nil)
go func() {
for {
<-watchChan
log.Debugf("[ETCD] Watch triggered")
timeChan <- time.Now()
for _ = range watchChan {
log.Debugf("[ETCD] Watch triggered")
nodes, err := s.Fetch()
if err == nil {
refresh(c, nodes)
}
}()
return timeChan
}
}
func (s *EtcdDiscoveryService) Register(addr string) error {

View File

@ -6,6 +6,7 @@ import (
"strings"
"time"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
)
@ -44,8 +45,13 @@ func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil
}
func (s *FileDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(s.heartbeat) * time.Second)
func (s *FileDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch()
if err == nil {
refresh(c, nodes)
}
}
}
func (s *FileDiscoveryService) Register(addr string) error {

View File

@ -8,6 +8,7 @@ import (
"strings"
"time"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
)
@ -66,8 +67,13 @@ func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil
}
func (s *TokenDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(s.heartbeat) * time.Second)
func (s *TokenDiscoveryService) Watch(c *cluster.Cluster, refresh func(c *cluster.Cluster, nodes []*discovery.Node)) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch()
if err == nil {
refresh(c, nodes)
}
}
}
// RegisterNode adds a new node identified by the into the discovery service

View File

@ -106,14 +106,7 @@ func manage(c *cli.Context) {
}
refresh(cluster, nodes)
go func() {
for _ = range d.Watch() {
nodes, err = d.Fetch()
if err == nil {
refresh(cluster, nodes)
}
}
}()
go d.Watch(cluster, refresh)
} else {
var nodes []*discovery.Node
for _, arg := range c.Args() {