Merge pull request #167 from vieux/update_watch

watch take a callback
This commit is contained in:
Andrea Luzzardi 2014-12-12 17:10:38 -08:00
commit 88bff68a5a
8 changed files with 57 additions and 53 deletions

View File

@ -1,11 +1,13 @@
package cluster
import (
"crypto/tls"
"errors"
"strings"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/discovery"
)
var (
@ -15,13 +17,15 @@ var (
type Cluster struct {
sync.RWMutex
tlsConfig *tls.Config
eventHandlers []EventHandler
nodes map[string]*Node
}
func NewCluster() *Cluster {
func NewCluster(tlsConfig *tls.Config) *Cluster {
return &Cluster{
nodes: make(map[string]*Node),
tlsConfig: tlsConfig,
nodes: make(map[string]*Node),
}
}
@ -52,6 +56,24 @@ func (c *Cluster) AddNode(n *Node) error {
return n.Events(c)
}
func (c *Cluster) UpdateNodes(nodes []*discovery.Node) {
for _, addr := range nodes {
go func(node *discovery.Node) {
if c.Node(node.String()) == nil {
n := NewNode(node.String())
if err := n.Connect(c.tlsConfig); err != nil {
log.Error(err)
return
}
if err := c.AddNode(n); err != nil {
log.Error(err)
return
}
}
}(addr)
}
}
// Containers returns all the containers in the cluster.
func (c *Cluster) Containers() []*Container {
c.Lock()

View File

@ -32,7 +32,7 @@ func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *
}
func TestAddNode(t *testing.T) {
c := NewCluster()
c := NewCluster(nil)
assert.Equal(t, len(c.Nodes()), 0)
assert.Nil(t, c.Node("test"))
@ -52,7 +52,7 @@ func TestAddNode(t *testing.T) {
}
func TestLookupContainer(t *testing.T) {
c := NewCluster()
c := NewCluster(nil)
container := dockerclient.Container{
Id: "container-id",
Names: []string{"/container-name1", "/container-name2"},

View File

@ -87,7 +87,7 @@ simply implements this interface:
type DiscoveryService interface {
Initialize(string, int) error
Fetch() ([]string, error)
Watch() <-chan time.Time
Watch(WatchCallback)
Register(string) error
}
```
@ -99,9 +99,8 @@ take the `--dicovery` withtout the scheme and a heartbeat (in seconds)
returns the list of all the nodes from the discovery
######Watch
triggers when you need to update (`Fetch`) the list of nodes,
it can happen either via un timer (like `token`) or use
backend specific features (like `etcd`)
triggers an update (`Fetch`),it can happen either via
a timer (like `token`) or use backend specific features (like `etcd`)
######Register
add a new node to the discovery

View File

@ -5,7 +5,6 @@ import (
"fmt"
"net/url"
"strings"
"time"
log "github.com/Sirupsen/logrus"
)
@ -25,10 +24,12 @@ func (n Node) String() string {
return n.url
}
type WatchCallback func(nodes []*Node)
type DiscoveryService interface {
Initialize(string, int) error
Fetch() ([]*Node, error)
Watch() <-chan time.Time
Watch(WatchCallback)
Register(string) error
}

View File

@ -3,7 +3,6 @@ package etcd
import (
"path"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/coreos/go-etcd/etcd"
@ -64,18 +63,16 @@ func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil
}
func (s *EtcdDiscoveryService) Watch() <-chan time.Time {
func (s *EtcdDiscoveryService) Watch(callback discovery.WatchCallback) {
watchChan := make(chan *etcd.Response)
timeChan := make(chan time.Time)
go s.client.Watch(s.path, 0, true, watchChan, nil)
go func() {
for {
<-watchChan
log.Debugf("[ETCD] Watch triggered")
timeChan <- time.Now()
for _ = range watchChan {
log.Debugf("[ETCD] Watch triggered")
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
}
}()
return timeChan
}
}
func (s *EtcdDiscoveryService) Register(addr string) error {

View File

@ -44,8 +44,13 @@ func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil
}
func (s *FileDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(s.heartbeat) * time.Second)
func (s *FileDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
}
}
}
func (s *FileDiscoveryService) Register(addr string) error {

View File

@ -66,8 +66,13 @@ func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil
}
func (s *TokenDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(s.heartbeat) * time.Second)
func (s *TokenDiscoveryService) Watch(callback discovery.WatchCallback) {
for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) {
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
}
}
}
// RegisterNode adds a new node identified by the into the discovery service

View File

@ -71,25 +71,7 @@ func manage(c *cli.Context) {
}
}
refresh := func(c *cluster.Cluster, nodes []*discovery.Node) {
for _, addr := range nodes {
go func(node *discovery.Node) {
if c.Node(node.String()) == nil {
n := cluster.NewNode(node.String())
if err := n.Connect(tlsConfig); err != nil {
log.Error(err)
return
}
if err := c.AddNode(n); err != nil {
log.Error(err)
return
}
}
}(addr)
}
}
cluster := cluster.NewCluster()
cluster := cluster.NewCluster(tlsConfig)
cluster.Events(&logHandler{})
go func() {
@ -104,22 +86,15 @@ func manage(c *cli.Context) {
log.Fatal(err)
}
refresh(cluster, nodes)
cluster.UpdateNodes(nodes)
go func() {
for _ = range d.Watch() {
nodes, err = d.Fetch()
if err == nil {
refresh(cluster, nodes)
}
}
}()
go d.Watch(cluster.UpdateNodes)
} else {
var nodes []*discovery.Node
for _, arg := range c.Args() {
nodes = append(nodes, discovery.NewNode(arg))
}
refresh(cluster, nodes)
cluster.UpdateNodes(nodes)
}
}()