Merge pull request #800 from aluzzardi/watch-channels

store: Use channels for Watch/WatchTree
This commit is contained in:
Andrea Luzzardi 2015-05-16 16:36:02 -07:00
commit 9d914b16b1
5 changed files with 233 additions and 230 deletions

View File

@ -72,12 +72,17 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
// Watch is exported
func (s *Discovery) Watch(callback discovery.WatchCallback) {
s.store.WatchTree(s.prefix, func(kv ...*store.KVPair) {
log.WithField("name", s.backend).Debug("Discovery watch triggered")
ch, err := s.store.WatchTree(s.prefix, nil)
if err != nil {
log.WithField("discovery", s.backend).Errorf("Watch failed: %v", err)
return
}
for kv := range ch {
log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(kv))
// Traduce byte array entries to discovery.Entry
entries, _ := discovery.CreateEntries(convertToStringArray(kv))
callback(entries)
})
}
}
// Register is exported

View File

@ -12,26 +12,18 @@ import (
// Consul embeds the client and watches
type Consul struct {
config *api.Config
client *api.Client
watches map[string]*Watch
config *api.Config
client *api.Client
}
type consulLock struct {
lock *api.Lock
}
// Watch embeds the event channel and the
// refresh interval
type Watch struct {
LastIndex uint64
}
// InitializeConsul creates a new Consul client given
// a list of endpoints and optional tls config
func InitializeConsul(endpoints []string, options *Config) (Store, error) {
s := &Consul{}
s.watches = make(map[string]*Watch)
// Create Consul client
config := api.DefaultConfig()
@ -116,7 +108,7 @@ func (s *Consul) Exists(key string) (bool, error) {
return true, nil
}
// List values at "directory"
// List the content of a given prefix
func (s *Consul) List(prefix string) ([]*KVPair, error) {
pairs, _, err := s.client.KV().List(s.normalize(prefix), nil)
if err != nil {
@ -135,109 +127,87 @@ func (s *Consul) List(prefix string) ([]*KVPair, error) {
return kv, nil
}
// DeleteTree deletes a range of values at "directory"
// DeleteTree deletes a range of keys based on prefix
func (s *Consul) DeleteTree(prefix string) error {
_, err := s.client.KV().DeleteTree(s.normalize(prefix), nil)
return err
}
// Watch a single key for modifications
func (s *Consul) Watch(key string, callback WatchCallback) error {
fkey := s.normalize(key)
// We get the last index first
_, meta, err := s.client.KV().Get(fkey, nil)
if err != nil {
return err
}
// Add watch to map
s.watches[fkey] = &Watch{LastIndex: meta.LastIndex}
eventChan := s.waitForChange(fkey)
for _ = range eventChan {
entry, err := s.Get(key)
if err != nil {
log.Error("Cannot refresh the key: ", fkey, ", cancelling watch")
s.watches[fkey] = nil
return err
}
callback(entry)
}
return nil
}
// CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel
func (s *Consul) CancelWatch(key string) error {
// Watch changes on a key.
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
key = s.normalize(key)
if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist
}
s.watches[key] = nil
return nil
}
// Internal function to check if a key has changed
func (s *Consul) waitForChange(key string) <-chan uint64 {
ch := make(chan uint64)
kv := s.client.KV()
watchCh := make(chan *KVPair)
go func() {
defer close(watchCh)
opts := &api.QueryOptions{}
for {
watch, ok := s.watches[key]
if !ok {
log.Error("Cannot access last index for key: ", key, " closing channel")
break
// Check if we should quit
select {
case <-stopCh:
return
default:
}
option := &api.QueryOptions{
WaitIndex: watch.LastIndex,
}
_, meta, err := kv.List(key, option)
pair, meta, err := kv.Get(key, opts)
if err != nil {
log.WithField("name", "consul").Error(err)
break
log.Errorf("consul: %v", err)
return
}
opts.WaitIndex = meta.LastIndex
// FIXME: What happens when a key is deleted?
if pair != nil {
watchCh <- &KVPair{pair.Key, pair.Value, pair.ModifyIndex}
}
watch.LastIndex = meta.LastIndex
ch <- watch.LastIndex
}
close(ch)
}()
return ch
return watchCh, nil
}
// WatchTree triggers a watch on a range of values at "directory"
func (s *Consul) WatchTree(prefix string, callback WatchCallback) error {
fprefix := s.normalize(prefix)
// WatchTree watches changes on a "directory"
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
prefix = s.normalize(prefix)
kv := s.client.KV()
watchCh := make(chan []*KVPair)
// We get the last index first
_, meta, err := s.client.KV().Get(prefix, nil)
if err != nil {
return err
}
go func() {
defer close(watchCh)
// Add watch to map
s.watches[fprefix] = &Watch{LastIndex: meta.LastIndex}
eventChan := s.waitForChange(fprefix)
opts := &api.QueryOptions{}
for {
// Check if we should quit
select {
case <-stopCh:
return
default:
}
for _ = range eventChan {
kvi, err := s.List(prefix)
if err != nil {
log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch")
s.watches[fprefix] = nil
return err
pairs, meta, err := kv.List(prefix, opts)
if err != nil {
log.Errorf("consul: %v", err)
return
}
kv := []*KVPair{}
for _, pair := range pairs {
if pair.Key == prefix {
continue
}
kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex})
}
opts.WaitIndex = meta.LastIndex
watchCh <- kv
}
callback(kvi...)
}
}()
return nil
}
// CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel
func (s *Consul) CancelWatchRange(prefix string) error {
return s.CancelWatch(prefix)
return watchCh, nil
}
// CreateLock returns a handle to a lock struct which can be used

View File

@ -7,21 +7,18 @@ import (
"strings"
"time"
log "github.com/Sirupsen/logrus"
etcd "github.com/coreos/go-etcd/etcd"
)
// Etcd embeds the client
type Etcd struct {
client *etcd.Client
watches map[string]chan<- bool
client *etcd.Client
}
// InitializeEtcd creates a new Etcd client given
// a list of endpoints and optional tls config
func InitializeEtcd(addrs []string, options *Config) (Store, error) {
s := &Etcd{}
s.watches = make(map[string]chan<- bool)
entries := createEndpoints(addrs, "http")
s.client = etcd.NewClient(entries)
@ -135,42 +132,93 @@ func (s *Etcd) Exists(key string) (bool, error) {
return true, nil
}
// Watch a single key for modifications
func (s *Etcd) Watch(key string, callback WatchCallback) error {
// Watch changes on a key.
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
key = normalize(key)
watchChan := make(chan *etcd.Response)
stopChan := make(chan bool)
// Create new Watch entry
s.watches[key] = stopChan
// Start watch
go s.client.Watch(key, 0, false, watchChan, stopChan)
for _ = range watchChan {
entry, err := s.Get(key)
if err != nil {
log.Error("Cannot refresh the key: ", key, ", cancelling watch")
s.watches[key] = nil
return err
}
callback(entry)
// Get the current value
current, err := s.Get(key)
if err != nil {
return nil, err
}
return nil
// Start an etcd watch.
// Note: etcd will send the current value through the channel.
etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool)
go s.client.Watch(key, 0, false, etcdWatchCh, etcdStopCh)
// Adapter goroutine: The goal here is to convert wathever format etcd is
// using into our interface.
watchCh := make(chan *KVPair)
go func() {
defer close(watchCh)
// Push the current value through the channel.
watchCh <- current
for {
select {
case result := <-etcdWatchCh:
watchCh <- &KVPair{
result.Node.Key,
[]byte(result.Node.Value),
result.Node.ModifiedIndex,
}
case <-stopCh:
etcdStopCh <- true
return
}
}
}()
return watchCh, nil
}
// CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel
func (s *Etcd) CancelWatch(key string) error {
key = normalize(key)
if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist
// WatchTree watches changes on a "directory"
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
prefix = normalize(prefix)
// Get the current value
current, err := s.List(prefix)
if err != nil {
return nil, err
}
// Send stop signal to event chan
s.watches[key] <- true
s.watches[key] = nil
return nil
// Start an etcd watch.
etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool)
go s.client.Watch(prefix, 0, true, etcdWatchCh, etcdStopCh)
// Adapter goroutine: The goal here is to convert wathever format etcd is
// using into our interface.
watchCh := make(chan []*KVPair)
go func() {
defer close(watchCh)
// Push the current value through the channel.
watchCh <- current
for {
select {
case <-etcdWatchCh:
// FIXME: We should probably use the value pushed by the channel.
// However, .Node.Nodes seems to be empty.
if list, err := s.List(prefix); err == nil {
watchCh <- list
}
case <-stopCh:
etcdStopCh <- true
return
}
}
}()
return watchCh, nil
}
// AtomicPut put a value at "key" if the key has not been
@ -193,7 +241,7 @@ func (s *Etcd) AtomicDelete(key string, previous *KVPair) (bool, error) {
return true, nil
}
// List a range of values at "directory"
// List the content of a given prefix
func (s *Etcd) List(prefix string) ([]*KVPair, error) {
resp, err := s.client.Get(normalize(prefix), true, true)
if err != nil {
@ -206,7 +254,7 @@ func (s *Etcd) List(prefix string) ([]*KVPair, error) {
return kv, nil
}
// DeleteTree deletes a range of values at "directory"
// DeleteTree deletes a range of keys based on prefix
func (s *Etcd) DeleteTree(prefix string) error {
if _, err := s.client.Delete(normalize(prefix), true); err != nil {
return err
@ -214,35 +262,6 @@ func (s *Etcd) DeleteTree(prefix string) error {
return nil
}
// WatchTree triggers a watch on a range of values at "directory"
func (s *Etcd) WatchTree(prefix string, callback WatchCallback) error {
prefix = normalize(prefix)
watchChan := make(chan *etcd.Response)
stopChan := make(chan bool)
// Create new Watch entry
s.watches[prefix] = stopChan
// Start watch
go s.client.Watch(prefix, 0, true, watchChan, stopChan)
for _ = range watchChan {
kvi, err := s.List(prefix)
if err != nil {
log.Error("Cannot refresh the key: ", prefix, ", cancelling watch")
s.watches[prefix] = nil
return err
}
callback(kvi...)
}
return nil
}
// CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel
func (s *Etcd) CancelWatchRange(prefix string) error {
return s.CancelWatch(normalize(prefix))
}
// CreateLock returns a handle to a lock struct which can be used
// to acquire and release the mutex.
func (s *Etcd) CreateLock(key string, value []byte) (Locker, error) {

View File

@ -60,29 +60,29 @@ type Store interface {
// Verify if a Key exists in the store
Exists(key string) (bool, error)
// Watch changes on a key
Watch(key string, callback WatchCallback) error
// Watch changes on a key.
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// Cancel watch key
CancelWatch(key string) error
// WatchTree watches changes on a "directory"
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
// CreateLock for a given key.
// The returned Locker is not held and must be acquired with `.Lock`.
// value is optional.
CreateLock(key string, value []byte) (Locker, error)
// Get range of keys based on prefix
// List the content of a given prefix
List(prefix string) ([]*KVPair, error)
// Delete range of keys based on prefix
// DeleteTree deletes a range of keys based on prefix
DeleteTree(prefix string) error
// Watch key namespaces
WatchTree(prefix string, callback WatchCallback) error
// Cancel watch key range
CancelWatchRange(prefix string) error
// Atomic operation on a single value
AtomicPut(key string, value []byte, previous *KVPair) (bool, error)

View File

@ -13,7 +13,6 @@ import (
type Zookeeper struct {
timeout time.Duration
client *zk.Conn
watches map[string]<-chan zk.Event
}
type zookeeperLock struct {
@ -24,7 +23,6 @@ type zookeeperLock struct {
// given a list of endpoints and optional tls config
func InitializeZookeeper(endpoints []string, options *Config) (Store, error) {
s := &Zookeeper{}
s.watches = make(map[string]<-chan zk.Event)
s.timeout = 5 * time.Second // default timeout
if options.Timeout != 0 {
@ -102,48 +100,88 @@ func (s *Zookeeper) Exists(key string) (bool, error) {
return exists, nil
}
// Watch a single key for modifications
func (s *Zookeeper) Watch(key string, callback WatchCallback) error {
// Watch changes on a key.
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) {
fkey := normalize(key)
_, _, eventChan, err := s.client.GetW(fkey)
resp, meta, eventCh, err := s.client.GetW(fkey)
if err != nil {
return err
return nil, err
}
// Create a new Watch entry with eventChan
s.watches[fkey] = eventChan
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan *KVPair)
go func() {
defer close(watchCh)
for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged {
entry, err := s.Get(key)
if err == nil {
callback(entry)
// GetW returns the current value before setting the watch.
watchCh <- &KVPair{key, resp, uint64(meta.Mzxid)}
for {
select {
case e := <-eventCh:
if e.Type == zk.EventNodeDataChanged {
if entry, err := s.Get(key); err == nil {
watchCh <- entry
}
}
case <-stopCh:
// There is no way to stop GetW so just quit
return
}
}
}
}()
return nil
return watchCh, nil
}
// CancelWatch cancels a watch, sends a signal to the appropriate
// stop channel
func (s *Zookeeper) CancelWatch(key string) error {
key = normalize(key)
if _, ok := s.watches[key]; !ok {
log.Error("Chan does not exist for key: ", key)
return ErrWatchDoesNotExist
// WatchTree watches changes on a "directory"
// Returns a channel that will receive changes or an error.
// Upon creating a watch, the current value will be sent to the channel.
// Providing a non-nil stopCh can be used to stop watching.
func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) {
fprefix := normalize(prefix)
entries, stat, eventCh, err := s.client.ChildrenW(fprefix)
if err != nil {
return nil, err
}
// Just remove the entry on watches key
s.watches[key] = nil
return nil
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan []*KVPair)
go func() {
defer close(watchCh)
// GetW returns the current value before setting the watch.
kv := []*KVPair{}
for _, item := range entries {
kv = append(kv, &KVPair{prefix, []byte(item), uint64(stat.Mzxid)})
}
watchCh <- kv
for {
select {
case e := <-eventCh:
if e.Type == zk.EventNodeChildrenChanged {
if kv, err := s.List(prefix); err == nil {
watchCh <- kv
}
}
case <-stopCh:
// There is no way to stop GetW so just quit
return
}
}
}()
return watchCh, nil
}
// List a range of values at "directory"
// List the content of a given prefix
func (s *Zookeeper) List(prefix string) ([]*KVPair, error) {
prefix = normalize(prefix)
entries, stat, err := s.client.Children(prefix)
if err != nil {
log.Error("Cannot fetch range of keys beginning with prefix: ", prefix)
return nil, err
}
kv := []*KVPair{}
@ -153,41 +191,12 @@ func (s *Zookeeper) List(prefix string) ([]*KVPair, error) {
return kv, err
}
// DeleteTree deletes a range of values at "directory"
// DeleteTree deletes a range of keys based on prefix
func (s *Zookeeper) DeleteTree(prefix string) error {
err := s.client.Delete(normalize(prefix), -1)
return err
}
// WatchTree triggers a watch on a range of values at "directory"
func (s *Zookeeper) WatchTree(prefix string, callback WatchCallback) error {
fprefix := normalize(prefix)
_, _, eventChan, err := s.client.ChildrenW(fprefix)
if err != nil {
return err
}
// Create a new Watch entry with eventChan
s.watches[fprefix] = eventChan
for e := range eventChan {
if e.Type == zk.EventNodeChildrenChanged {
kvi, err := s.List(prefix)
if err == nil {
callback(kvi...)
}
}
}
return nil
}
// CancelWatchRange stops the watch on the range of values, sends
// a signal to the appropriate stop channel
func (s *Zookeeper) CancelWatchRange(prefix string) error {
return s.CancelWatch(prefix)
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {