diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index db5ae5e2fc..7882bfa1c7 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -78,7 +78,7 @@ func (s *Discovery) Watch(callback discovery.WatchCallback) { return } for kv := range ch { - log.WithField("discovery", s.backend).Debug("Watch triggered") + log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(kv)) // Traduce byte array entries to discovery.Entry entries, _ := discovery.CreateEntries(convertToStringArray(kv)) callback(entries) diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 217d0e40ea..dfd386be74 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -154,11 +154,11 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro pair, meta, err := kv.Get(key, opts) if err != nil { - log.WithField("name", "consul").Error(err) + log.WithField("backend", "consul").Error(err) return } if pair == nil { - log.WithField("name", "consul").Errorf("Key %s does not exist", key) + log.WithField("backend", "consul").Errorf("Key %s does not exist", key) return } opts.WaitIndex = meta.LastIndex diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index e2d5595d27..10bdf6553b 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -149,6 +149,10 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) // Adapter goroutine: The goal here is to convert wathever format etcd is // using into our interface. go func() { + // Push the current value through the channel. + if v, err := s.Get(key); err != nil { + watchCh <- v + } for { select { case result := <-etcdWatchCh: @@ -175,7 +179,6 @@ func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPai watchCh := make(chan []*KVPair) // Start an etcd watch. - // Note: etcd will send the current value through the channel. etcdWatchCh := make(chan *etcd.Response) etcdStopCh := make(chan bool) go s.client.Watch(prefix, 0, true, etcdWatchCh, etcdStopCh) @@ -183,14 +186,18 @@ func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPai // Adapter goroutine: The goal here is to convert wathever format etcd is // using into our interface. go func() { + // Push the current value through the channel. + if list, err := s.List(prefix); err != nil { + watchCh <- list + } for { select { - case result := <-etcdWatchCh: - kv := []*KVPair{} - for _, n := range result.Node.Nodes { - kv = append(kv, &KVPair{n.Key, []byte(n.Value), n.ModifiedIndex}) + case <-etcdWatchCh: + // FIXME: We should probably use the value pushed by the channel. + // However, .Node.Nodes seems to be empty. + if list, err := s.List(prefix); err == nil { + watchCh <- list } - watchCh <- kv case <-stopCh: etcdStopCh <- true return diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index 024a657dfa..0a0592a44e 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -119,8 +119,8 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, e for { select { case e := <-eventCh: - if e.Type == zk.EventNodeChildrenChanged { - if entry, err := s.Get(key); err != nil { + if e.Type == zk.EventNodeDataChanged { + if entry, err := s.Get(key); err == nil { watchCh <- entry } } @@ -159,7 +159,7 @@ func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []* select { case e := <-eventCh: if e.Type == zk.EventNodeChildrenChanged { - if kv, err := s.List(prefix); err != nil { + if kv, err := s.List(prefix); err == nil { watchCh <- kv } }