From 7d7e4aee132f7dba7b28bd41ee5c2abbe3c19f99 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Fri, 15 May 2015 00:20:19 -0700 Subject: [PATCH] store: Watch: Use channels instead of callbacks. This gets rid of `CancelWatch*` functions and its usage is much simpler. Signed-off-by: Andrea Luzzardi --- discovery/kv/kv.go | 11 ++- pkg/store/consul.go | 158 +++++++++++++++++------------------------ pkg/store/etcd.go | 128 ++++++++++++++++----------------- pkg/store/store.go | 24 +++---- pkg/store/zookeeper.go | 121 ++++++++++++++++--------------- 5 files changed, 211 insertions(+), 231 deletions(-) diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index d8158f5e53..db5ae5e2fc 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -72,12 +72,17 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) { // Watch is exported func (s *Discovery) Watch(callback discovery.WatchCallback) { - s.store.WatchTree(s.prefix, func(kv ...*store.KVPair) { - log.WithField("name", s.backend).Debug("Discovery watch triggered") + ch, err := s.store.WatchTree(s.prefix, nil) + if err != nil { + log.WithField("discovery", s.backend).Errorf("Watch failed: %v", err) + return + } + for kv := range ch { + log.WithField("discovery", s.backend).Debug("Watch triggered") // Traduce byte array entries to discovery.Entry entries, _ := discovery.CreateEntries(convertToStringArray(kv)) callback(entries) - }) + } } // Register is exported diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 72271b05f0..217d0e40ea 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -12,26 +12,18 @@ import ( // Consul embeds the client and watches type Consul struct { - config *api.Config - client *api.Client - watches map[string]*Watch + config *api.Config + client *api.Client } type consulLock struct { lock *api.Lock } -// Watch embeds the event channel and the -// refresh interval -type Watch struct { - LastIndex uint64 -} - // InitializeConsul creates a new Consul client given // a list of endpoints and optional tls config func InitializeConsul(endpoints []string, options *Config) (Store, error) { s := &Consul{} - s.watches = make(map[string]*Watch) // Create Consul client config := api.DefaultConfig() @@ -116,7 +108,7 @@ func (s *Consul) Exists(key string) (bool, error) { return true, nil } -// List values at "directory" +// List the content of a given prefix func (s *Consul) List(prefix string) ([]*KVPair, error) { pairs, _, err := s.client.KV().List(s.normalize(prefix), nil) if err != nil { @@ -135,109 +127,89 @@ func (s *Consul) List(prefix string) ([]*KVPair, error) { return kv, nil } -// DeleteTree deletes a range of values at "directory" +// DeleteTree deletes a range of keys based on prefix func (s *Consul) DeleteTree(prefix string) error { _, err := s.client.KV().DeleteTree(s.normalize(prefix), nil) return err } -// Watch a single key for modifications -func (s *Consul) Watch(key string, callback WatchCallback) error { - fkey := s.normalize(key) - - // We get the last index first - _, meta, err := s.client.KV().Get(fkey, nil) - if err != nil { - return err - } - - // Add watch to map - s.watches[fkey] = &Watch{LastIndex: meta.LastIndex} - eventChan := s.waitForChange(fkey) - - for _ = range eventChan { - entry, err := s.Get(key) - if err != nil { - log.Error("Cannot refresh the key: ", fkey, ", cancelling watch") - s.watches[fkey] = nil - return err - } - callback(entry) - } - - return nil -} - -// CancelWatch cancels a watch, sends a signal to the appropriate -// stop channel -func (s *Consul) CancelWatch(key string) error { +// Watch changes on a key. +// Returns a channel that will receive changes or an error. +// Upon creating a watch, the current value will be sent to the channel. +// Providing a non-nil stopCh can be used to stop watching. +func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { key = s.normalize(key) - if _, ok := s.watches[key]; !ok { - log.Error("Chan does not exist for key: ", key) - return ErrWatchDoesNotExist - } - s.watches[key] = nil - return nil -} - -// Internal function to check if a key has changed -func (s *Consul) waitForChange(key string) <-chan uint64 { - ch := make(chan uint64) kv := s.client.KV() + watchCh := make(chan *KVPair) + go func() { + opts := &api.QueryOptions{} for { - watch, ok := s.watches[key] - if !ok { - log.Error("Cannot access last index for key: ", key, " closing channel") - break + // Check if we should quit + select { + case <-stopCh: + return + default: } - option := &api.QueryOptions{ - WaitIndex: watch.LastIndex, - } - _, meta, err := kv.List(key, option) + + pair, meta, err := kv.Get(key, opts) if err != nil { log.WithField("name", "consul").Error(err) - break + return } - watch.LastIndex = meta.LastIndex - ch <- watch.LastIndex + if pair == nil { + log.WithField("name", "consul").Errorf("Key %s does not exist", key) + return + } + opts.WaitIndex = meta.LastIndex + watchCh <- &KVPair{pair.Key, pair.Value, pair.ModifyIndex} } - close(ch) }() - return ch + + return watchCh, nil } -// WatchTree triggers a watch on a range of values at "directory" -func (s *Consul) WatchTree(prefix string, callback WatchCallback) error { - fprefix := s.normalize(prefix) +// WatchTree watches changes on a "directory" +// Returns a channel that will receive changes or an error. +// Upon creating a watch, the current value will be sent to the channel. +// Providing a non-nil stopCh can be used to stop watching. +func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { + prefix = s.normalize(prefix) + kv := s.client.KV() + watchCh := make(chan []*KVPair) - // We get the last index first - _, meta, err := s.client.KV().Get(prefix, nil) - if err != nil { - return err - } + go func() { + opts := &api.QueryOptions{} + for { + // Check if we should quit + select { + case <-stopCh: + return + default: + } - // Add watch to map - s.watches[fprefix] = &Watch{LastIndex: meta.LastIndex} - eventChan := s.waitForChange(fprefix) - - for _ = range eventChan { - kvi, err := s.List(prefix) - if err != nil { - log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch") - s.watches[fprefix] = nil - return err + pairs, meta, err := kv.List(prefix, opts) + if err != nil { + log.WithField("name", "consul").Error(err) + return + } + if len(pairs) == 0 { + log.WithField("name", "consul").Errorf("Key %s does not exist", prefix) + return + } + kv := []*KVPair{} + for _, pair := range pairs { + if pair.Key == prefix { + continue + } + kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex}) + } + opts.WaitIndex = meta.LastIndex + watchCh <- kv } - callback(kvi...) - } + }() - return nil -} - -// CancelWatchRange stops the watch on the range of values, sends -// a signal to the appropriate stop channel -func (s *Consul) CancelWatchRange(prefix string) error { - return s.CancelWatch(prefix) + return watchCh, nil } // CreateLock returns a handle to a lock struct which can be used diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 92d77ed615..e2d5595d27 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -7,21 +7,18 @@ import ( "strings" "time" - log "github.com/Sirupsen/logrus" etcd "github.com/coreos/go-etcd/etcd" ) // Etcd embeds the client type Etcd struct { - client *etcd.Client - watches map[string]chan<- bool + client *etcd.Client } // InitializeEtcd creates a new Etcd client given // a list of endpoints and optional tls config func InitializeEtcd(addrs []string, options *Config) (Store, error) { s := &Etcd{} - s.watches = make(map[string]chan<- bool) entries := createEndpoints(addrs, "http") s.client = etcd.NewClient(entries) @@ -135,42 +132,72 @@ func (s *Etcd) Exists(key string) (bool, error) { return true, nil } -// Watch a single key for modifications -func (s *Etcd) Watch(key string, callback WatchCallback) error { +// Watch changes on a key. +// Returns a channel that will receive changes or an error. +// Upon creating a watch, the current value will be sent to the channel. +// Providing a non-nil stopCh can be used to stop watching. +func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { key = normalize(key) - watchChan := make(chan *etcd.Response) - stopChan := make(chan bool) + watchCh := make(chan *KVPair) - // Create new Watch entry - s.watches[key] = stopChan + // Start an etcd watch. + // Note: etcd will send the current value through the channel. + etcdWatchCh := make(chan *etcd.Response) + etcdStopCh := make(chan bool) + go s.client.Watch(key, 0, false, etcdWatchCh, etcdStopCh) - // Start watch - go s.client.Watch(key, 0, false, watchChan, stopChan) - - for _ = range watchChan { - entry, err := s.Get(key) - if err != nil { - log.Error("Cannot refresh the key: ", key, ", cancelling watch") - s.watches[key] = nil - return err + // Adapter goroutine: The goal here is to convert wathever format etcd is + // using into our interface. + go func() { + for { + select { + case result := <-etcdWatchCh: + watchCh <- &KVPair{ + result.Node.Key, + []byte(result.Node.Value), + result.Node.ModifiedIndex, + } + case <-stopCh: + etcdStopCh <- true + return + } } - callback(entry) - } - return nil + }() + return watchCh, nil } -// CancelWatch cancels a watch, sends a signal to the appropriate -// stop channel -func (s *Etcd) CancelWatch(key string) error { - key = normalize(key) - if _, ok := s.watches[key]; !ok { - log.Error("Chan does not exist for key: ", key) - return ErrWatchDoesNotExist - } - // Send stop signal to event chan - s.watches[key] <- true - s.watches[key] = nil - return nil +// WatchTree watches changes on a "directory" +// Returns a channel that will receive changes or an error. +// Upon creating a watch, the current value will be sent to the channel. +// Providing a non-nil stopCh can be used to stop watching. +func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { + prefix = normalize(prefix) + watchCh := make(chan []*KVPair) + + // Start an etcd watch. + // Note: etcd will send the current value through the channel. + etcdWatchCh := make(chan *etcd.Response) + etcdStopCh := make(chan bool) + go s.client.Watch(prefix, 0, true, etcdWatchCh, etcdStopCh) + + // Adapter goroutine: The goal here is to convert wathever format etcd is + // using into our interface. + go func() { + for { + select { + case result := <-etcdWatchCh: + kv := []*KVPair{} + for _, n := range result.Node.Nodes { + kv = append(kv, &KVPair{n.Key, []byte(n.Value), n.ModifiedIndex}) + } + watchCh <- kv + case <-stopCh: + etcdStopCh <- true + return + } + } + }() + return watchCh, nil } // AtomicPut put a value at "key" if the key has not been @@ -193,7 +220,7 @@ func (s *Etcd) AtomicDelete(key string, previous *KVPair) (bool, error) { return true, nil } -// List a range of values at "directory" +// List the content of a given prefix func (s *Etcd) List(prefix string) ([]*KVPair, error) { resp, err := s.client.Get(normalize(prefix), true, true) if err != nil { @@ -206,7 +233,7 @@ func (s *Etcd) List(prefix string) ([]*KVPair, error) { return kv, nil } -// DeleteTree deletes a range of values at "directory" +// DeleteTree deletes a range of keys based on prefix func (s *Etcd) DeleteTree(prefix string) error { if _, err := s.client.Delete(normalize(prefix), true); err != nil { return err @@ -214,35 +241,6 @@ func (s *Etcd) DeleteTree(prefix string) error { return nil } -// WatchTree triggers a watch on a range of values at "directory" -func (s *Etcd) WatchTree(prefix string, callback WatchCallback) error { - prefix = normalize(prefix) - watchChan := make(chan *etcd.Response) - stopChan := make(chan bool) - - // Create new Watch entry - s.watches[prefix] = stopChan - - // Start watch - go s.client.Watch(prefix, 0, true, watchChan, stopChan) - for _ = range watchChan { - kvi, err := s.List(prefix) - if err != nil { - log.Error("Cannot refresh the key: ", prefix, ", cancelling watch") - s.watches[prefix] = nil - return err - } - callback(kvi...) - } - return nil -} - -// CancelWatchRange stops the watch on the range of values, sends -// a signal to the appropriate stop channel -func (s *Etcd) CancelWatchRange(prefix string) error { - return s.CancelWatch(normalize(prefix)) -} - // CreateLock returns a handle to a lock struct which can be used // to acquire and release the mutex. func (s *Etcd) CreateLock(key string, value []byte) (Locker, error) { diff --git a/pkg/store/store.go b/pkg/store/store.go index 4a77b79ba5..442ee548f1 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -60,29 +60,29 @@ type Store interface { // Verify if a Key exists in the store Exists(key string) (bool, error) - // Watch changes on a key - Watch(key string, callback WatchCallback) error + // Watch changes on a key. + // Returns a channel that will receive changes or an error. + // Upon creating a watch, the current value will be sent to the channel. + // Providing a non-nil stopCh can be used to stop watching. + Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) - // Cancel watch key - CancelWatch(key string) error + // WatchTree watches changes on a "directory" + // Returns a channel that will receive changes or an error. + // Upon creating a watch, the current value will be sent to the channel. + // Providing a non-nil stopCh can be used to stop watching. + WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) // CreateLock for a given key. // The returned Locker is not held and must be acquired with `.Lock`. // value is optional. CreateLock(key string, value []byte) (Locker, error) - // Get range of keys based on prefix + // List the content of a given prefix List(prefix string) ([]*KVPair, error) - // Delete range of keys based on prefix + // DeleteTree deletes a range of keys based on prefix DeleteTree(prefix string) error - // Watch key namespaces - WatchTree(prefix string, callback WatchCallback) error - - // Cancel watch key range - CancelWatchRange(prefix string) error - // Atomic operation on a single value AtomicPut(key string, value []byte, previous *KVPair) (bool, error) diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index 62cf5e3387..024a657dfa 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -13,7 +13,6 @@ import ( type Zookeeper struct { timeout time.Duration client *zk.Conn - watches map[string]<-chan zk.Event } type zookeeperLock struct { @@ -24,7 +23,6 @@ type zookeeperLock struct { // given a list of endpoints and optional tls config func InitializeZookeeper(endpoints []string, options *Config) (Store, error) { s := &Zookeeper{} - s.watches = make(map[string]<-chan zk.Event) s.timeout = 5 * time.Second // default timeout if options.Timeout != 0 { @@ -102,48 +100,84 @@ func (s *Zookeeper) Exists(key string) (bool, error) { return exists, nil } -// Watch a single key for modifications -func (s *Zookeeper) Watch(key string, callback WatchCallback) error { +// Watch changes on a key. +// Returns a channel that will receive changes or an error. +// Upon creating a watch, the current value will be sent to the channel. +// Providing a non-nil stopCh can be used to stop watching. +func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { fkey := normalize(key) - _, _, eventChan, err := s.client.GetW(fkey) + resp, meta, eventCh, err := s.client.GetW(fkey) if err != nil { - return err + return nil, err } - // Create a new Watch entry with eventChan - s.watches[fkey] = eventChan - - for e := range eventChan { - if e.Type == zk.EventNodeChildrenChanged { - entry, err := s.Get(key) - if err == nil { - callback(entry) + // Catch zk notifications and fire changes into the channel. + watchCh := make(chan *KVPair) + go func() { + // GetW returns the current value before setting the watch. + watchCh <- &KVPair{key, resp, uint64(meta.Mzxid)} + for { + select { + case e := <-eventCh: + if e.Type == zk.EventNodeChildrenChanged { + if entry, err := s.Get(key); err != nil { + watchCh <- entry + } + } + case <-stopCh: + // There is no way to stop GetW so just quit + return } } - } + }() - return nil + return watchCh, nil } -// CancelWatch cancels a watch, sends a signal to the appropriate -// stop channel -func (s *Zookeeper) CancelWatch(key string) error { - key = normalize(key) - if _, ok := s.watches[key]; !ok { - log.Error("Chan does not exist for key: ", key) - return ErrWatchDoesNotExist +// WatchTree watches changes on a "directory" +// Returns a channel that will receive changes or an error. +// Upon creating a watch, the current value will be sent to the channel. +// Providing a non-nil stopCh can be used to stop watching. +func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { + fprefix := normalize(prefix) + entries, stat, eventCh, err := s.client.ChildrenW(fprefix) + if err != nil { + return nil, err } - // Just remove the entry on watches key - s.watches[key] = nil - return nil + + // Catch zk notifications and fire changes into the channel. + watchCh := make(chan []*KVPair) + go func() { + // GetW returns the current value before setting the watch. + kv := []*KVPair{} + for _, item := range entries { + kv = append(kv, &KVPair{prefix, []byte(item), uint64(stat.Mzxid)}) + } + watchCh <- kv + + for { + select { + case e := <-eventCh: + if e.Type == zk.EventNodeChildrenChanged { + if kv, err := s.List(prefix); err != nil { + watchCh <- kv + } + } + case <-stopCh: + // There is no way to stop GetW so just quit + return + } + } + }() + + return watchCh, nil } -// List a range of values at "directory" +// List the content of a given prefix func (s *Zookeeper) List(prefix string) ([]*KVPair, error) { prefix = normalize(prefix) entries, stat, err := s.client.Children(prefix) if err != nil { - log.Error("Cannot fetch range of keys beginning with prefix: ", prefix) return nil, err } kv := []*KVPair{} @@ -153,41 +187,12 @@ func (s *Zookeeper) List(prefix string) ([]*KVPair, error) { return kv, err } -// DeleteTree deletes a range of values at "directory" +// DeleteTree deletes a range of keys based on prefix func (s *Zookeeper) DeleteTree(prefix string) error { err := s.client.Delete(normalize(prefix), -1) return err } -// WatchTree triggers a watch on a range of values at "directory" -func (s *Zookeeper) WatchTree(prefix string, callback WatchCallback) error { - fprefix := normalize(prefix) - _, _, eventChan, err := s.client.ChildrenW(fprefix) - if err != nil { - return err - } - - // Create a new Watch entry with eventChan - s.watches[fprefix] = eventChan - - for e := range eventChan { - if e.Type == zk.EventNodeChildrenChanged { - kvi, err := s.List(prefix) - if err == nil { - callback(kvi...) - } - } - } - - return nil -} - -// CancelWatchRange stops the watch on the range of values, sends -// a signal to the appropriate stop channel -func (s *Zookeeper) CancelWatchRange(prefix string) error { - return s.CancelWatch(prefix) -} - // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {