use consul's watch

Signed-off-by: JChien <jeffchien13@gmail.com>
This commit is contained in:
JChien 2014-12-19 01:47:28 +08:00 committed by Victor Vieux
parent f6b5ca38d5
commit a94a104932
1 changed files with 32 additions and 10 deletions

View File

@ -6,14 +6,16 @@ import (
"strings"
"time"
log "github.com/Sirupsen/logrus"
consul "github.com/armon/consul-api"
"github.com/docker/swarm/discovery"
)
type ConsulDiscoveryService struct {
heartbeat uint64
heartbeat time.Duration
client *consul.Client
prefix string
lastIndex uint64
}
func init() {
@ -36,14 +38,18 @@ func (s *ConsulDiscoveryService) Initialize(uris string, heartbeat int) error {
return err
}
s.client = client
s.heartbeat = uint64(heartbeat)
s.heartbeat = time.Duration(heartbeat) * time.Second
s.prefix = path + "/"
kv := s.client.KV()
p := &consul.KVPair{Key: s.prefix, Value: nil}
_, err = kv.Put(p, nil)
if _, err = kv.Put(p, nil); err != nil {
return err
}
_, meta, err := kv.Get(s.prefix, nil)
if err != nil {
return err
}
s.lastIndex = meta.LastIndex
return nil
}
func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Node, error) {
@ -65,9 +71,7 @@ func (s *ConsulDiscoveryService) Fetch() ([]*discovery.Node, error) {
}
func (s *ConsulDiscoveryService) Watch(callback discovery.WatchCallback) {
c := time.Tick(time.Duration(s.heartbeat) * time.Second)
for {
<-c
for _ = range s.waitForChange() {
nodes, err := s.Fetch()
if err == nil {
callback(nodes)
@ -79,8 +83,26 @@ func (s *ConsulDiscoveryService) Register(addr string) error {
kv := s.client.KV()
p := &consul.KVPair{Key: path.Join(s.prefix, addr), Value: []byte(addr)}
_, err := kv.Put(p, nil)
if err != nil {
return err
}
return nil
return err
}
func (s *ConsulDiscoveryService) waitForChange() <-chan uint64 {
c := make(chan uint64)
go func() {
for {
kv := s.client.KV()
option := &consul.QueryOptions{
WaitIndex: s.lastIndex,
WaitTime: s.heartbeat}
_, meta, err := kv.List(s.prefix, option)
if err != nil {
log.Errorln(err)
break
}
s.lastIndex = meta.LastIndex
c <- s.lastIndex
}
close(c)
}()
return c
}