store: Close channels in case of errors in Watch/WatchTree.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-15 13:34:33 -07:00
parent 2cdca520d9
commit 3e4e74c5a1
3 changed files with 32 additions and 10 deletions

View File

@ -143,6 +143,8 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro
watchCh := make(chan *KVPair)
go func() {
defer close(watchCh)
opts := &api.QueryOptions{}
for {
// Check if we should quit
@ -153,7 +155,7 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro
}
pair, meta, err := kv.Get(key, opts)
if err != nil {
log.WithField("backend", "consul").Error(err)
log.Errorf("consul: %v", err)
return
}
opts.WaitIndex = meta.LastIndex
@ -177,6 +179,8 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP
watchCh := make(chan []*KVPair)
go func() {
defer close(watchCh)
opts := &api.QueryOptions{}
for {
// Check if we should quit
@ -188,7 +192,7 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP
pairs, meta, err := kv.List(prefix, opts)
if err != nil {
log.WithField("name", "consul").Error(err)
log.Errorf("consul: %v", err)
return
}
kv := []*KVPair{}

View File

@ -138,7 +138,12 @@ func (s *Etcd) Exists(key string) (bool, error) {
// Providing a non-nil stopCh can be used to stop watching.
func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
key = normalize(key)
watchCh := make(chan *KVPair)
// Get the current value
current, err := s.Get(key)
if err != nil {
return nil, err
}
// Start an etcd watch.
// Note: etcd will send the current value through the channel.
@ -148,11 +153,13 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// Adapter goroutine: The goal here is to convert wathever format etcd is
// using into our interface.
watchCh := make(chan *KVPair)
go func() {
defer close(watchCh)
// Push the current value through the channel.
if v, err := s.Get(key); err != nil {
watchCh <- v
}
watchCh <- current
for {
select {
case result := <-etcdWatchCh:
@ -176,7 +183,12 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// Providing a non-nil stopCh can be used to stop watching.
func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
prefix = normalize(prefix)
watchCh := make(chan []*KVPair)
// Get the current value
current, err := s.List(prefix)
if err != nil {
return nil, err
}
// Start an etcd watch.
etcdWatchCh := make(chan *etcd.Response)
@ -185,11 +197,13 @@ func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPai
// Adapter goroutine: The goal here is to convert wathever format etcd is
// using into our interface.
watchCh := make(chan []*KVPair)
go func() {
defer close(watchCh)
// Push the current value through the channel.
if list, err := s.List(prefix); err != nil {
watchCh <- list
}
watchCh <- current
for {
select {
case <-etcdWatchCh:

View File

@ -114,6 +114,8 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, e
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan *KVPair)
go func() {
defer close(watchCh)
// GetW returns the current value before setting the watch.
watchCh <- &KVPair{key, resp, uint64(meta.Mzxid)}
for {
@ -148,6 +150,8 @@ func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan []*KVPair)
go func() {
defer close(watchCh)
// GetW returns the current value before setting the watch.
kv := []*KVPair{}
for _, item := range entries {