mirror of https://github.com/docker/docs.git
GetRange and WatchRange to return {key,value,index} tuple
Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
parent
7dd0cde819
commit
0130c2262d
|
@ -71,7 +71,7 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
|
||||||
|
|
||||||
// Watch is exported
|
// Watch is exported
|
||||||
func (s *Discovery) Watch(callback discovery.WatchCallback) {
|
func (s *Discovery) Watch(callback discovery.WatchCallback) {
|
||||||
s.store.WatchRange(s.prefix, "", s.heartbeat, func(kvalues [][]byte) {
|
s.store.WatchRange(s.prefix, "", s.heartbeat, func(kvalues []store.KVEntry) {
|
||||||
// Traduce byte array entries to discovery.Entry
|
// Traduce byte array entries to discovery.Entry
|
||||||
entries, _ := discovery.CreateEntries(convertToStringArray(kvalues))
|
entries, _ := discovery.CreateEntries(convertToStringArray(kvalues))
|
||||||
callback(entries)
|
callback(entries)
|
||||||
|
@ -84,9 +84,9 @@ func (s *Discovery) Register(addr string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func convertToStringArray(entries [][]byte) (addrs []string) {
|
func convertToStringArray(entries []store.KVEntry) (addrs []string) {
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
addrs = append(addrs, string(entry))
|
addrs = append(addrs, string(entry.Value()))
|
||||||
}
|
}
|
||||||
return addrs
|
return addrs
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,7 +115,7 @@ func (s *Consul) Exists(key string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRange gets a range of values at "directory"
|
// GetRange gets a range of values at "directory"
|
||||||
func (s *Consul) GetRange(prefix string) (values [][]byte, err error) {
|
func (s *Consul) GetRange(prefix string) (kvi []KVEntry, err error) {
|
||||||
pairs, _, err := s.client.KV().List(partialFormat(prefix), nil)
|
pairs, _, err := s.client.KV().List(partialFormat(prefix), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -127,9 +127,9 @@ func (s *Consul) GetRange(prefix string) (values [][]byte, err error) {
|
||||||
if pair.Key == prefix {
|
if pair.Key == prefix {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
values = append(values, pair.Value)
|
kvi = append(kvi, &kviTuple{pair.Key, pair.Value, pair.ModifyIndex})
|
||||||
}
|
}
|
||||||
return values, nil
|
return kvi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteRange deletes a range of values at "directory"
|
// DeleteRange deletes a range of values at "directory"
|
||||||
|
@ -154,14 +154,14 @@ func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallba
|
||||||
|
|
||||||
for _ = range eventChan {
|
for _ = range eventChan {
|
||||||
log.WithField("name", "consul").Debug("Key watch triggered")
|
log.WithField("name", "consul").Debug("Key watch triggered")
|
||||||
entry, _, err := s.Get(key)
|
entry, index, err := s.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Cannot refresh the key: ", fkey, ", cancelling watch")
|
log.Error("Cannot refresh the key: ", fkey, ", cancelling watch")
|
||||||
s.watches[fkey] = nil
|
s.watches[fkey] = nil
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
value := [][]byte{[]byte(entry)}
|
value := []KVEntry{&kviTuple{key, entry, index}}
|
||||||
callback(value)
|
callback(value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,13 +224,13 @@ func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duratio
|
||||||
|
|
||||||
for _ = range eventChan {
|
for _ = range eventChan {
|
||||||
log.WithField("name", "consul").Debug("Key watch triggered")
|
log.WithField("name", "consul").Debug("Key watch triggered")
|
||||||
values, err := s.GetRange(prefix)
|
kvi, err := s.GetRange(prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch")
|
log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch")
|
||||||
s.watches[fprefix] = nil
|
s.watches[fprefix] = nil
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
callback(values)
|
callback(kvi)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -147,14 +147,14 @@ func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error
|
||||||
|
|
||||||
for _ = range watchChan {
|
for _ = range watchChan {
|
||||||
log.WithField("name", "etcd").Debug("Discovery watch triggered")
|
log.WithField("name", "etcd").Debug("Discovery watch triggered")
|
||||||
entry, _, err := s.Get(key)
|
entry, index, err := s.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Cannot refresh the key: ", key, ", cancelling watch")
|
log.Error("Cannot refresh the key: ", key, ", cancelling watch")
|
||||||
s.watches[key] = nil
|
s.watches[key] = nil
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
value := [][]byte{[]byte(entry)}
|
kvi := []KVEntry{&kviTuple{key, entry, index}}
|
||||||
callback(value)
|
callback(kvi)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -203,16 +203,16 @@ func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, er
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRange gets a range of values at "directory"
|
// GetRange gets a range of values at "directory"
|
||||||
func (s *Etcd) GetRange(prefix string) (value [][]byte, err error) {
|
func (s *Etcd) GetRange(prefix string) ([]KVEntry, error) {
|
||||||
resp, err := s.client.Get(format(prefix), true, true)
|
resp, err := s.client.Get(format(prefix), true, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
values := [][]byte{}
|
kvi := make([]KVEntry, len(resp.Node.Nodes))
|
||||||
for _, n := range resp.Node.Nodes {
|
for i, n := range resp.Node.Nodes {
|
||||||
values = append(values, []byte(n.Value))
|
kvi[i] = &kviTuple{n.Key, []byte(n.Value), n.ModifiedIndex}
|
||||||
}
|
}
|
||||||
return values, nil
|
return kvi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteRange deletes a range of values at "directory"
|
// DeleteRange deletes a range of values at "directory"
|
||||||
|
@ -236,13 +236,13 @@ func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callbac
|
||||||
go s.client.Watch(prefix, 0, true, watchChan, stopChan)
|
go s.client.Watch(prefix, 0, true, watchChan, stopChan)
|
||||||
for _ = range watchChan {
|
for _ = range watchChan {
|
||||||
log.WithField("name", "etcd").Debug("Discovery watch triggered")
|
log.WithField("name", "etcd").Debug("Discovery watch triggered")
|
||||||
values, err := s.GetRange(prefix)
|
kvi, err := s.GetRange(prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Cannot refresh the key: ", prefix, ", cancelling watch")
|
log.Error("Cannot refresh the key: ", prefix, ", cancelling watch")
|
||||||
s.watches[prefix] = nil
|
s.watches[prefix] = nil
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
callback(values)
|
callback(kvi)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
|
|
||||||
// WatchCallback is used for watch methods on keys
|
// WatchCallback is used for watch methods on keys
|
||||||
// and is triggered on key change
|
// and is triggered on key change
|
||||||
type WatchCallback func(value [][]byte)
|
type WatchCallback func(kviTuple []KVEntry)
|
||||||
|
|
||||||
// Initialize creates a new Store object, initializing the client
|
// Initialize creates a new Store object, initializing the client
|
||||||
type Initialize func(addrs []string, options Config) (Store, error)
|
type Initialize func(addrs []string, options Config) (Store, error)
|
||||||
|
@ -43,7 +43,7 @@ type Store interface {
|
||||||
Release(session string) error
|
Release(session string) error
|
||||||
|
|
||||||
// Get range of keys based on prefix
|
// Get range of keys based on prefix
|
||||||
GetRange(prefix string) (value [][]byte, err error)
|
GetRange(prefix string) ([]KVEntry, error)
|
||||||
|
|
||||||
// Delete range of keys based on prefix
|
// Delete range of keys based on prefix
|
||||||
DeleteRange(prefix string) error
|
DeleteRange(prefix string) error
|
||||||
|
@ -61,6 +61,13 @@ type Store interface {
|
||||||
AtomicDelete(key string, oldValue []byte, index uint64) (bool, error)
|
AtomicDelete(key string, oldValue []byte, index uint64) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVEntry represents {Key, Value, Lastindex} tuple
|
||||||
|
type KVEntry interface {
|
||||||
|
Key() string
|
||||||
|
Value() []byte
|
||||||
|
LastIndex() uint64
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// List of Store services
|
// List of Store services
|
||||||
stores map[string]Initialize
|
stores map[string]Initialize
|
||||||
|
|
|
@ -40,3 +40,21 @@ type Config struct {
|
||||||
TLS *tls.Config
|
TLS *tls.Config
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type kviTuple struct {
|
||||||
|
key string
|
||||||
|
value []byte
|
||||||
|
lastIndex uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kvi *kviTuple) Key() string {
|
||||||
|
return kvi.key
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kvi *kviTuple) Value() []byte {
|
||||||
|
return kvi.value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kvi *kviTuple) LastIndex() uint64 {
|
||||||
|
return kvi.lastIndex
|
||||||
|
}
|
||||||
|
|
|
@ -112,10 +112,10 @@ func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) e
|
||||||
for e := range eventChan {
|
for e := range eventChan {
|
||||||
if e.Type == zk.EventNodeChildrenChanged {
|
if e.Type == zk.EventNodeChildrenChanged {
|
||||||
log.WithField("name", "zk").Debug("Discovery watch triggered")
|
log.WithField("name", "zk").Debug("Discovery watch triggered")
|
||||||
entry, _, err := s.Get(key)
|
entry, index, err := s.Get(key)
|
||||||
value := [][]byte{[]byte(entry)}
|
kvi := []KVEntry{&kviTuple{key, []byte(entry), index}}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
callback(value)
|
callback(kvi)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -137,17 +137,17 @@ func (s *Zookeeper) CancelWatch(key string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRange gets a range of values at "directory"
|
// GetRange gets a range of values at "directory"
|
||||||
func (s *Zookeeper) GetRange(prefix string) (values [][]byte, err error) {
|
func (s *Zookeeper) GetRange(prefix string) (kvi []KVEntry, err error) {
|
||||||
prefix = format(prefix)
|
prefix = format(prefix)
|
||||||
entries, _, err := s.client.Children(prefix)
|
entries, stat, err := s.client.Children(prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Cannot fetch range of keys beginning with prefix: ", prefix)
|
log.Error("Cannot fetch range of keys beginning with prefix: ", prefix)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, item := range entries {
|
for _, item := range entries {
|
||||||
values = append(values, []byte(item))
|
kvi = append(kvi, &kviTuple{prefix, []byte(item), uint64(stat.Mzxid)})
|
||||||
}
|
}
|
||||||
return values, err
|
return kvi, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteRange deletes a range of values at "directory"
|
// DeleteRange deletes a range of values at "directory"
|
||||||
|
@ -170,9 +170,9 @@ func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, ca
|
||||||
for e := range eventChan {
|
for e := range eventChan {
|
||||||
if e.Type == zk.EventNodeChildrenChanged {
|
if e.Type == zk.EventNodeChildrenChanged {
|
||||||
log.WithField("name", "zk").Debug("Discovery watch triggered")
|
log.WithField("name", "zk").Debug("Discovery watch triggered")
|
||||||
values, err := s.GetRange(prefix)
|
kvi, err := s.GetRange(prefix)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
callback(values)
|
callback(kvi)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue