From a94a104932e0940ed691bc6367fd25dd20e6946d Mon Sep 17 00:00:00 2001 From: JChien Date: Fri, 19 Dec 2014 01:47:28 +0800 Subject: [PATCH] use consul's watch Signed-off-by: JChien --- discovery/consul/consul.go | 42 +++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go index 792e8cb381..00290fa0dd 100644 --- a/discovery/consul/consul.go +++ b/discovery/consul/consul.go @@ -6,14 +6,16 @@ import ( "strings" "time" + log "github.com/Sirupsen/logrus" consul "github.com/armon/consul-api" "github.com/docker/swarm/discovery" ) type ConsulDiscoveryService struct { - heartbeat uint64 + heartbeat time.Duration client *consul.Client prefix string + lastIndex uint64 } func init() { @@ -36,14 +38,18 @@ func (s *ConsulDiscoveryService) Initialize(uris string, heartbeat int) error { return err } s.client = client - s.heartbeat = uint64(heartbeat) + s.heartbeat = time.Duration(heartbeat) * time.Second s.prefix = path + "/" kv := s.client.KV() p := &consul.KVPair{Key: s.prefix, Value: nil} - _, err = kv.Put(p, nil) + if _, err = kv.Put(p, nil); err != nil { + return err + } + _, meta, err := kv.Get(s.prefix, nil) if err != nil { return err } + s.lastIndex = meta.LastIndex return nil } func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Node, error) { @@ -65,9 +71,7 @@ func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Node, error) { } func (s *ConsulDiscoveryService) Watch(callback discovery.WatchCallback) { - c := time.Tick(time.Duration(s.heartbeat) * time.Second) - for { - <-c + for _ = range s.waitForChange() { nodes, err := s.Fetch() if err == nil { callback(nodes) @@ -79,8 +83,26 @@ func (s *ConsulDiscoveryService) Register(addr string) error { kv := s.client.KV() p := &consul.KVPair{Key: path.Join(s.prefix, addr), Value: []byte(addr)} _, err := kv.Put(p, nil) - if err != nil { - return err - } - return nil + return err +} + +func (s *ConsulDiscoveryService) waitForChange() <-chan uint64 { + c := make(chan uint64) + go func() { + for { + kv := s.client.KV() + option := &consul.QueryOptions{ + WaitIndex: s.lastIndex, + WaitTime: s.heartbeat} + _, meta, err := kv.List(s.prefix, option) + if err != nil { + log.Errorln(err) + break + } + s.lastIndex = meta.LastIndex + c <- s.lastIndex + } + close(c) + }() + return c }