store: Watch: Use channels instead of callbacks.

This gets rid of `CancelWatch*` functions and its usage is much simpler.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-15 00:20:19 -07:00
parent a30c2ae680
commit 7d7e4aee13
5 changed files with 211 additions and 231 deletions

View File

@ -72,12 +72,17 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
// Watch is exported // Watch is exported
func (s *Discovery) Watch(callback discovery.WatchCallback) { func (s *Discovery) Watch(callback discovery.WatchCallback) {
s.store.WatchTree(s.prefix, func(kv ...*store.KVPair) { ch, err := s.store.WatchTree(s.prefix, nil)
log.WithField("name", s.backend).Debug("Discovery watch triggered") if err != nil {
log.WithField("discovery", s.backend).Errorf("Watch failed: %v", err)
return
}
for kv := range ch {
log.WithField("discovery", s.backend).Debug("Watch triggered")
// Traduce byte array entries to discovery.Entry // Traduce byte array entries to discovery.Entry
entries, _ := discovery.CreateEntries(convertToStringArray(kv)) entries, _ := discovery.CreateEntries(convertToStringArray(kv))
callback(entries) callback(entries)
}) }
} }
// Register is exported // Register is exported

View File

@ -14,24 +14,16 @@ import (
type Consul struct { type Consul struct {
config *api.Config config *api.Config
client *api.Client client *api.Client
watches map[string]*Watch
} }
type consulLock struct { type consulLock struct {
lock *api.Lock lock *api.Lock
} }
// Watch embeds the event channel and the
// refresh interval
type Watch struct {
LastIndex uint64
}
// InitializeConsul creates a new Consul client given // InitializeConsul creates a new Consul client given
// a list of endpoints and optional tls config // a list of endpoints and optional tls config
func InitializeConsul(endpoints []string, options *Config) (Store, error) { func InitializeConsul(endpoints []string, options *Config) (Store, error) {
s := &Consul{} s := &Consul{}
s.watches = make(map[string]*Watch)
// Create Consul client // Create Consul client
config := api.DefaultConfig() config := api.DefaultConfig()
@ -116,7 +108,7 @@ func (s *Consul) Exists(key string) (bool, error) {
return true, nil return true, nil
} }
// List values at "directory" // List the content of a given prefix
func (s *Consul) List(prefix string) ([]*KVPair, error) { func (s *Consul) List(prefix string) ([]*KVPair, error) {
pairs, _, err := s.client.KV().List(s.normalize(prefix), nil) pairs, _, err := s.client.KV().List(s.normalize(prefix), nil)
if err != nil { if err != nil {
@ -135,109 +127,89 @@ func (s *Consul) List(prefix string) ([]*KVPair, error) {
return kv, nil return kv, nil
} }
// DeleteTree deletes a range of values at "directory" // DeleteTree deletes a range of keys based on prefix
func (s *Consul) DeleteTree(prefix string) error { func (s *Consul) DeleteTree(prefix string) error {
_, err := s.client.KV().DeleteTree(s.normalize(prefix), nil) _, err := s.client.KV().DeleteTree(s.normalize(prefix), nil)
return err return err
} }
// Watch a single key for modifications // Watch changes on a key.
func (s *Consul) Watch(key string, callback WatchCallback) error { // Returns a channel that will receive changes or an error.
fkey := s.normalize(key) // Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
// We get the last index first func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
_, meta, err := s.client.KV().Get(fkey, nil)
if err != nil {
return err
}
// Add watch to map
s.watches[fkey] = &Watch{LastIndex: meta.LastIndex}
eventChan := s.waitForChange(fkey)
for _ = range eventChan {
entry, err := s.Get(key)
if err != nil {
log.Error("Cannot refresh the key: ", fkey, ", cancelling watch")
s.watches[fkey] = nil
return err
}
callback(entry)
}
return nil
}
// CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel
func (s *Consul) CancelWatch(key string) error {
key = s.normalize(key) key = s.normalize(key)
if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist
}
s.watches[key] = nil
return nil
}
// Internal function to check if a key has changed
func (s *Consul) waitForChange(key string) <-chan uint64 {
ch := make(chan uint64)
kv := s.client.KV() kv := s.client.KV()
watchCh := make(chan *KVPair)
go func() { go func() {
opts := &api.QueryOptions{}
for { for {
watch, ok := s.watches[key] // Check if we should quit
if !ok { select {
log.Error("Cannot access last index for key: ", key, " closing channel") case <-stopCh:
break return
default:
} }
option := &api.QueryOptions{
WaitIndex: watch.LastIndex, pair, meta, err := kv.Get(key, opts)
}
_, meta, err := kv.List(key, option)
if err != nil { if err != nil {
log.WithField("name", "consul").Error(err) log.WithField("name", "consul").Error(err)
break return
} }
watch.LastIndex = meta.LastIndex if pair == nil {
ch <- watch.LastIndex log.WithField("name", "consul").Errorf("Key %s does not exist", key)
return
}
opts.WaitIndex = meta.LastIndex
watchCh <- &KVPair{pair.Key, pair.Value, pair.ModifyIndex}
} }
close(ch)
}() }()
return ch
return watchCh, nil
} }
// WatchTree triggers a watch on a range of values at "directory" // WatchTree watches changes on a "directory"
func (s *Consul) WatchTree(prefix string, callback WatchCallback) error { // Returns a channel that will receive changes or an error.
fprefix := s.normalize(prefix) // Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
prefix = s.normalize(prefix)
kv := s.client.KV()
watchCh := make(chan []*KVPair)
// We get the last index first go func() {
_, meta, err := s.client.KV().Get(prefix, nil) opts := &api.QueryOptions{}
for {
// Check if we should quit
select {
case <-stopCh:
return
default:
}
pairs, meta, err := kv.List(prefix, opts)
if err != nil { if err != nil {
return err log.WithField("name", "consul").Error(err)
return
} }
if len(pairs) == 0 {
// Add watch to map log.WithField("name", "consul").Errorf("Key %s does not exist", prefix)
s.watches[fprefix] = &Watch{LastIndex: meta.LastIndex} return
eventChan := s.waitForChange(fprefix)
for _ = range eventChan {
kvi, err := s.List(prefix)
if err != nil {
log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch")
s.watches[fprefix] = nil
return err
} }
callback(kvi...) kv := []*KVPair{}
for _, pair := range pairs {
if pair.Key == prefix {
continue
} }
kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex})
}
opts.WaitIndex = meta.LastIndex
watchCh <- kv
}
}()
return nil return watchCh, nil
}
// CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel
func (s *Consul) CancelWatchRange(prefix string) error {
return s.CancelWatch(prefix)
} }
// CreateLock returns a handle to a lock struct which can be used // CreateLock returns a handle to a lock struct which can be used

View File

@ -7,21 +7,18 @@ import (
"strings" "strings"
"time" "time"
log "github.com/Sirupsen/logrus"
etcd "github.com/coreos/go-etcd/etcd" etcd "github.com/coreos/go-etcd/etcd"
) )
// Etcd embeds the client // Etcd embeds the client
type Etcd struct { type Etcd struct {
client *etcd.Client client *etcd.Client
watches map[string]chan<- bool
} }
// InitializeEtcd creates a new Etcd client given // InitializeEtcd creates a new Etcd client given
// a list of endpoints and optional tls config // a list of endpoints and optional tls config
func InitializeEtcd(addrs []string, options *Config) (Store, error) { func InitializeEtcd(addrs []string, options *Config) (Store, error) {
s := &Etcd{} s := &Etcd{}
s.watches = make(map[string]chan<- bool)
entries := createEndpoints(addrs, "http") entries := createEndpoints(addrs, "http")
s.client = etcd.NewClient(entries) s.client = etcd.NewClient(entries)
@ -135,42 +132,72 @@ func (s *Etcd) Exists(key string) (bool, error) {
return true, nil return true, nil
} }
// Watch a single key for modifications // Watch changes on a key.
func (s *Etcd) Watch(key string, callback WatchCallback) error { // Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
key = normalize(key) key = normalize(key)
watchChan := make(chan *etcd.Response) watchCh := make(chan *KVPair)
stopChan := make(chan bool)
// Create new Watch entry // Start an etcd watch.
s.watches[key] = stopChan // Note: etcd will send the current value through the channel.
etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool)
go s.client.Watch(key, 0, false, etcdWatchCh, etcdStopCh)
// Start watch // Adapter goroutine: The goal here is to convert wathever format etcd is
go s.client.Watch(key, 0, false, watchChan, stopChan) // using into our interface.
go func() {
for _ = range watchChan { for {
entry, err := s.Get(key) select {
if err != nil { case result := <-etcdWatchCh:
log.Error("Cannot refresh the key: ", key, ", cancelling watch") watchCh <- &KVPair{
s.watches[key] = nil result.Node.Key,
return err []byte(result.Node.Value),
result.Node.ModifiedIndex,
} }
callback(entry) case <-stopCh:
etcdStopCh <- true
return
} }
return nil }
}()
return watchCh, nil
} }
// CancelWatch cancels a watch, sends a signal to the appropriate // WatchTree watches changes on a "directory"
// stop channel // Returns a channel that will receive changes or an error.
func (s *Etcd) CancelWatch(key string) error { // Upon creating a watch, the current value will be sent to the channel.
key = normalize(key) // Providing a non-nil stopCh can be used to stop watching.
if _, ok := s.watches[key]; !ok { func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
log.Error("Chan does not exist for key: ", key) prefix = normalize(prefix)
return ErrWatchDoesNotExist watchCh := make(chan []*KVPair)
// Start an etcd watch.
// Note: etcd will send the current value through the channel.
etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool)
go s.client.Watch(prefix, 0, true, etcdWatchCh, etcdStopCh)
// Adapter goroutine: The goal here is to convert wathever format etcd is
// using into our interface.
go func() {
for {
select {
case result := <-etcdWatchCh:
kv := []*KVPair{}
for _, n := range result.Node.Nodes {
kv = append(kv, &KVPair{n.Key, []byte(n.Value), n.ModifiedIndex})
} }
// Send stop signal to event chan watchCh <- kv
s.watches[key] <- true case <-stopCh:
s.watches[key] = nil etcdStopCh <- true
return nil return
}
}
}()
return watchCh, nil
} }
// AtomicPut put a value at "key" if the key has not been // AtomicPut put a value at "key" if the key has not been
@ -193,7 +220,7 @@ func (s *Etcd) AtomicDelete(key string, previous *KVPair) (bool, error) {
return true, nil return true, nil
} }
// List a range of values at "directory" // List the content of a given prefix
func (s *Etcd) List(prefix string) ([]*KVPair, error) { func (s *Etcd) List(prefix string) ([]*KVPair, error) {
resp, err := s.client.Get(normalize(prefix), true, true) resp, err := s.client.Get(normalize(prefix), true, true)
if err != nil { if err != nil {
@ -206,7 +233,7 @@ func (s *Etcd) List(prefix string) ([]*KVPair, error) {
return kv, nil return kv, nil
} }
// DeleteTree deletes a range of values at "directory" // DeleteTree deletes a range of keys based on prefix
func (s *Etcd) DeleteTree(prefix string) error { func (s *Etcd) DeleteTree(prefix string) error {
if _, err := s.client.Delete(normalize(prefix), true); err != nil { if _, err := s.client.Delete(normalize(prefix), true); err != nil {
return err return err
@ -214,35 +241,6 @@ func (s *Etcd) DeleteTree(prefix string) error {
return nil return nil
} }
// WatchTree triggers a watch on a range of values at "directory"
func (s *Etcd) WatchTree(prefix string, callback WatchCallback) error {
prefix = normalize(prefix)
watchChan := make(chan *etcd.Response)
stopChan := make(chan bool)
// Create new Watch entry
s.watches[prefix] = stopChan
// Start watch
go s.client.Watch(prefix, 0, true, watchChan, stopChan)
for _ = range watchChan {
kvi, err := s.List(prefix)
if err != nil {
log.Error("Cannot refresh the key: ", prefix, ", cancelling watch")
s.watches[prefix] = nil
return err
}
callback(kvi...)
}
return nil
}
// CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel
func (s *Etcd) CancelWatchRange(prefix string) error {
return s.CancelWatch(normalize(prefix))
}
// CreateLock returns a handle to a lock struct which can be used // CreateLock returns a handle to a lock struct which can be used
// to acquire and release the mutex. // to acquire and release the mutex.
func (s *Etcd) CreateLock(key string, value []byte) (Locker, error) { func (s *Etcd) CreateLock(key string, value []byte) (Locker, error) {

View File

@ -60,29 +60,29 @@ type Store interface {
// Verify if a Key exists in the store // Verify if a Key exists in the store
Exists(key string) (bool, error) Exists(key string) (bool, error)
// Watch changes on a key // Watch changes on a key.
Watch(key string, callback WatchCallback) error // Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// Cancel watch key // WatchTree watches changes on a "directory"
CancelWatch(key string) error // Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
// CreateLock for a given key. // CreateLock for a given key.
// The returned Locker is not held and must be acquired with `.Lock`. // The returned Locker is not held and must be acquired with `.Lock`.
// value is optional. // value is optional.
CreateLock(key string, value []byte) (Locker, error) CreateLock(key string, value []byte) (Locker, error)
// Get range of keys based on prefix // List the content of a given prefix
List(prefix string) ([]*KVPair, error) List(prefix string) ([]*KVPair, error)
// Delete range of keys based on prefix // DeleteTree deletes a range of keys based on prefix
DeleteTree(prefix string) error DeleteTree(prefix string) error
// Watch key namespaces
WatchTree(prefix string, callback WatchCallback) error
// Cancel watch key range
CancelWatchRange(prefix string) error
// Atomic operation on a single value // Atomic operation on a single value
AtomicPut(key string, value []byte, previous *KVPair) (bool, error) AtomicPut(key string, value []byte, previous *KVPair) (bool, error)

View File

@ -13,7 +13,6 @@ import (
type Zookeeper struct { type Zookeeper struct {
timeout time.Duration timeout time.Duration
client *zk.Conn client *zk.Conn
watches map[string]<-chan zk.Event
} }
type zookeeperLock struct { type zookeeperLock struct {
@ -24,7 +23,6 @@ type zookeeperLock struct {
// given a list of endpoints and optional tls config // given a list of endpoints and optional tls config
func InitializeZookeeper(endpoints []string, options *Config) (Store, error) { func InitializeZookeeper(endpoints []string, options *Config) (Store, error) {
s := &Zookeeper{} s := &Zookeeper{}
s.watches = make(map[string]<-chan zk.Event)
s.timeout = 5 * time.Second // default timeout s.timeout = 5 * time.Second // default timeout
if options.Timeout != 0 { if options.Timeout != 0 {
@ -102,48 +100,84 @@ func (s *Zookeeper) Exists(key string) (bool, error) {
return exists, nil return exists, nil
} }
// Watch a single key for modifications // Watch changes on a key.
func (s *Zookeeper) Watch(key string, callback WatchCallback) error { // Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
fkey := normalize(key) fkey := normalize(key)
_, _, eventChan, err := s.client.GetW(fkey) resp, meta, eventCh, err := s.client.GetW(fkey)
if err != nil { if err != nil {
return err return nil, err
} }
// Create a new Watch entry with eventChan // Catch zk notifications and fire changes into the channel.
s.watches[fkey] = eventChan watchCh := make(chan *KVPair)
go func() {
for e := range eventChan { // GetW returns the current value before setting the watch.
watchCh <- &KVPair{key, resp, uint64(meta.Mzxid)}
for {
select {
case e := <-eventCh:
if e.Type == zk.EventNodeChildrenChanged { if e.Type == zk.EventNodeChildrenChanged {
entry, err := s.Get(key) if entry, err := s.Get(key); err != nil {
if err == nil { watchCh <- entry
callback(entry)
} }
} }
case <-stopCh:
// There is no way to stop GetW so just quit
return
} }
}
}()
return nil return watchCh, nil
} }
// CancelWatch cancels a watch, sends a signal to the appropriate // WatchTree watches changes on a "directory"
// stop channel // Returns a channel that will receive changes or an error.
func (s *Zookeeper) CancelWatch(key string) error { // Upon creating a watch, the current value will be sent to the channel.
key = normalize(key) // Providing a non-nil stopCh can be used to stop watching.
if _, ok := s.watches[key]; !ok { func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
log.Error("Chan does not exist for key: ", key) fprefix := normalize(prefix)
return ErrWatchDoesNotExist entries, stat, eventCh, err := s.client.ChildrenW(fprefix)
if err != nil {
return nil, err
} }
// Just remove the entry on watches key
s.watches[key] = nil // Catch zk notifications and fire changes into the channel.
return nil watchCh := make(chan []*KVPair)
go func() {
// GetW returns the current value before setting the watch.
kv := []*KVPair{}
for _, item := range entries {
kv = append(kv, &KVPair{prefix, []byte(item), uint64(stat.Mzxid)})
}
watchCh <- kv
for {
select {
case e := <-eventCh:
if e.Type == zk.EventNodeChildrenChanged {
if kv, err := s.List(prefix); err != nil {
watchCh <- kv
}
}
case <-stopCh:
// There is no way to stop GetW so just quit
return
}
}
}()
return watchCh, nil
} }
// List a range of values at "directory" // List the content of a given prefix
func (s *Zookeeper) List(prefix string) ([]*KVPair, error) { func (s *Zookeeper) List(prefix string) ([]*KVPair, error) {
prefix = normalize(prefix) prefix = normalize(prefix)
entries, stat, err := s.client.Children(prefix) entries, stat, err := s.client.Children(prefix)
if err != nil { if err != nil {
log.Error("Cannot fetch range of keys beginning with prefix: ", prefix)
return nil, err return nil, err
} }
kv := []*KVPair{} kv := []*KVPair{}
@ -153,41 +187,12 @@ func (s *Zookeeper) List(prefix string) ([]*KVPair, error) {
return kv, err return kv, err
} }
// DeleteTree deletes a range of values at "directory" // DeleteTree deletes a range of keys based on prefix
func (s *Zookeeper) DeleteTree(prefix string) error { func (s *Zookeeper) DeleteTree(prefix string) error {
err := s.client.Delete(normalize(prefix), -1) err := s.client.Delete(normalize(prefix), -1)
return err return err
} }
// WatchTree triggers a watch on a range of values at "directory"
func (s *Zookeeper) WatchTree(prefix string, callback WatchCallback) error {
fprefix := normalize(prefix)
_, _, eventChan, err := s.client.ChildrenW(fprefix)
if err != nil {
return err
}
// Create a new Watch entry with eventChan
s.watches[fprefix] = eventChan
for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged {
kvi, err := s.List(prefix)
if err == nil {
callback(kvi...)
}
}
}
return nil
}
// CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel
func (s *Zookeeper) CancelWatchRange(prefix string) error {
return s.CancelWatch(prefix)
}
// AtomicPut put a value at "key" if the key has not been // AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case // modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) { func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {