store: Fixes to etcd and zk WatchTree

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-15 03:29:31 -07:00
parent 7d7e4aee13
commit c77f7332a0
4 changed files with 19 additions and 12 deletions

View File

@ -78,7 +78,7 @@ func (s *Discovery) Watch(callback discovery.WatchCallback) {
return
}
for kv := range ch {
log.WithField("discovery", s.backend).Debug("Watch triggered")
log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(kv))
// Traduce byte array entries to discovery.Entry
entries, _ := discovery.CreateEntries(convertToStringArray(kv))
callback(entries)

View File

@ -154,11 +154,11 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro
pair, meta, err := kv.Get(key, opts)
if err != nil {
log.WithField("name", "consul").Error(err)
log.WithField("backend", "consul").Error(err)
return
}
if pair == nil {
log.WithField("name", "consul").Errorf("Key %s does not exist", key)
log.WithField("backend", "consul").Errorf("Key %s does not exist", key)
return
}
opts.WaitIndex = meta.LastIndex

View File

@ -149,6 +149,10 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// Adapter goroutine: The goal here is to convert wathever format etcd is
// using into our interface.
go func() {
// Push the current value through the channel.
if v, err := s.Get(key); err != nil {
watchCh <- v
}
for {
select {
case result := <-etcdWatchCh:
@ -175,7 +179,6 @@ func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPai
watchCh := make(chan []*KVPair)
// Start an etcd watch.
// Note: etcd will send the current value through the channel.
etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool)
go s.client.Watch(prefix, 0, true, etcdWatchCh, etcdStopCh)
@ -183,14 +186,18 @@ func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPai
// Adapter goroutine: The goal here is to convert wathever format etcd is
// using into our interface.
go func() {
// Push the current value through the channel.
if list, err := s.List(prefix); err != nil {
watchCh <- list
}
for {
select {
case result := <-etcdWatchCh:
kv := []*KVPair{}
for _, n := range result.Node.Nodes {
kv = append(kv, &KVPair{n.Key, []byte(n.Value), n.ModifiedIndex})
case <-etcdWatchCh:
// FIXME: We should probably use the value pushed by the channel.
// However, .Node.Nodes seems to be empty.
if list, err := s.List(prefix); err == nil {
watchCh <- list
}
watchCh <- kv
case <-stopCh:
etcdStopCh <- true
return

View File

@ -119,8 +119,8 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, e
for {
select {
case e := <-eventCh:
if e.Type == zk.EventNodeChildrenChanged {
if entry, err := s.Get(key); err != nil {
if e.Type == zk.EventNodeDataChanged {
if entry, err := s.Get(key); err == nil {
watchCh <- entry
}
}
@ -159,7 +159,7 @@ func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*
select {
case e := <-eventCh:
if e.Type == zk.EventNodeChildrenChanged {
if kv, err := s.List(prefix); err != nil {
if kv, err := s.List(prefix); err == nil {
watchCh <- kv
}
}