diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index a7ac5acbb8..a413aec462 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -71,7 +71,7 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) { // Watch is exported func (s *Discovery) Watch(callback discovery.WatchCallback) { - s.store.WatchRange(s.prefix, "", s.heartbeat, func(kvalues [][]byte) { + s.store.WatchRange(s.prefix, "", s.heartbeat, func(kvalues []store.KVEntry) { // Traduce byte array entries to discovery.Entry entries, _ := discovery.CreateEntries(convertToStringArray(kvalues)) callback(entries) @@ -84,9 +84,9 @@ func (s *Discovery) Register(addr string) error { return err } -func convertToStringArray(entries [][]byte) (addrs []string) { +func convertToStringArray(entries []store.KVEntry) (addrs []string) { for _, entry := range entries { - addrs = append(addrs, string(entry)) + addrs = append(addrs, string(entry.Value())) } return addrs } diff --git a/pkg/store/consul.go b/pkg/store/consul.go index a052d9d556..536dd8e070 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -115,7 +115,7 @@ func (s *Consul) Exists(key string) (bool, error) { } // GetRange gets a range of values at "directory" -func (s *Consul) GetRange(prefix string) (values [][]byte, err error) { +func (s *Consul) GetRange(prefix string) (kvi []KVEntry, err error) { pairs, _, err := s.client.KV().List(partialFormat(prefix), nil) if err != nil { return nil, err @@ -127,9 +127,9 @@ func (s *Consul) GetRange(prefix string) (values [][]byte, err error) { if pair.Key == prefix { continue } - values = append(values, pair.Value) + kvi = append(kvi, &kviTuple{pair.Key, pair.Value, pair.ModifyIndex}) } - return values, nil + return kvi, nil } // DeleteRange deletes a range of values at "directory" @@ -154,14 +154,14 @@ func (s *Consul) Watch(key string, heartbeat time.Duration, callback WatchCallba for _ = range eventChan { log.WithField("name", "consul").Debug("Key watch triggered") - entry, _, err := s.Get(key) + entry, index, err := s.Get(key) if err != nil { log.Error("Cannot refresh the key: ", fkey, ", cancelling watch") s.watches[fkey] = nil return err } - value := [][]byte{[]byte(entry)} + value := []KVEntry{&kviTuple{key, entry, index}} callback(value) } @@ -224,13 +224,13 @@ func (s *Consul) WatchRange(prefix string, filter string, heartbeat time.Duratio for _ = range eventChan { log.WithField("name", "consul").Debug("Key watch triggered") - values, err := s.GetRange(prefix) + kvi, err := s.GetRange(prefix) if err != nil { log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch") s.watches[fprefix] = nil return err } - callback(values) + callback(kvi) } return nil diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 4487ec5578..33689445bb 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -147,14 +147,14 @@ func (s *Etcd) Watch(key string, _ time.Duration, callback WatchCallback) error for _ = range watchChan { log.WithField("name", "etcd").Debug("Discovery watch triggered") - entry, _, err := s.Get(key) + entry, index, err := s.Get(key) if err != nil { log.Error("Cannot refresh the key: ", key, ", cancelling watch") s.watches[key] = nil return err } - value := [][]byte{[]byte(entry)} - callback(value) + kvi := []KVEntry{&kviTuple{key, entry, index}} + callback(kvi) } return nil } @@ -203,16 +203,16 @@ func (s *Etcd) AtomicDelete(key string, oldValue []byte, index uint64) (bool, er } // GetRange gets a range of values at "directory" -func (s *Etcd) GetRange(prefix string) (value [][]byte, err error) { +func (s *Etcd) GetRange(prefix string) ([]KVEntry, error) { resp, err := s.client.Get(format(prefix), true, true) if err != nil { return nil, err } - values := [][]byte{} - for _, n := range resp.Node.Nodes { - values = append(values, []byte(n.Value)) + kvi := make([]KVEntry, len(resp.Node.Nodes)) + for i, n := range resp.Node.Nodes { + kvi[i] = &kviTuple{n.Key, []byte(n.Value), n.ModifiedIndex} } - return values, nil + return kvi, nil } // DeleteRange deletes a range of values at "directory" @@ -236,13 +236,13 @@ func (s *Etcd) WatchRange(prefix string, filter string, _ time.Duration, callbac go s.client.Watch(prefix, 0, true, watchChan, stopChan) for _ = range watchChan { log.WithField("name", "etcd").Debug("Discovery watch triggered") - values, err := s.GetRange(prefix) + kvi, err := s.GetRange(prefix) if err != nil { log.Error("Cannot refresh the key: ", prefix, ", cancelling watch") s.watches[prefix] = nil return err } - callback(values) + callback(kvi) } return nil } diff --git a/pkg/store/store.go b/pkg/store/store.go index 33d17d468e..7a405a69be 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -8,7 +8,7 @@ import ( // WatchCallback is used for watch methods on keys // and is triggered on key change -type WatchCallback func(value [][]byte) +type WatchCallback func(kviTuple []KVEntry) // Initialize creates a new Store object, initializing the client type Initialize func(addrs []string, options Config) (Store, error) @@ -43,7 +43,7 @@ type Store interface { Release(session string) error // Get range of keys based on prefix - GetRange(prefix string) (value [][]byte, err error) + GetRange(prefix string) ([]KVEntry, error) // Delete range of keys based on prefix DeleteRange(prefix string) error @@ -61,6 +61,13 @@ type Store interface { AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) } +// KVEntry represents {Key, Value, Lastindex} tuple +type KVEntry interface { + Key() string + Value() []byte + LastIndex() uint64 +} + var ( // List of Store services stores map[string]Initialize diff --git a/pkg/store/structs.go b/pkg/store/structs.go index 6e827fea4b..03681fc2c8 100644 --- a/pkg/store/structs.go +++ b/pkg/store/structs.go @@ -40,3 +40,21 @@ type Config struct { TLS *tls.Config Timeout time.Duration } + +type kviTuple struct { + key string + value []byte + lastIndex uint64 +} + +func (kvi *kviTuple) Key() string { + return kvi.key +} + +func (kvi *kviTuple) Value() []byte { + return kvi.value +} + +func (kvi *kviTuple) LastIndex() uint64 { + return kvi.lastIndex +} diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index 0ef3c00541..b456507bb5 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -112,10 +112,10 @@ func (s *Zookeeper) Watch(key string, _ time.Duration, callback WatchCallback) e for e := range eventChan { if e.Type == zk.EventNodeChildrenChanged { log.WithField("name", "zk").Debug("Discovery watch triggered") - entry, _, err := s.Get(key) - value := [][]byte{[]byte(entry)} + entry, index, err := s.Get(key) + kvi := []KVEntry{&kviTuple{key, []byte(entry), index}} if err == nil { - callback(value) + callback(kvi) } } } @@ -137,17 +137,17 @@ func (s *Zookeeper) CancelWatch(key string) error { } // GetRange gets a range of values at "directory" -func (s *Zookeeper) GetRange(prefix string) (values [][]byte, err error) { +func (s *Zookeeper) GetRange(prefix string) (kvi []KVEntry, err error) { prefix = format(prefix) - entries, _, err := s.client.Children(prefix) + entries, stat, err := s.client.Children(prefix) if err != nil { log.Error("Cannot fetch range of keys beginning with prefix: ", prefix) return nil, err } for _, item := range entries { - values = append(values, []byte(item)) + kvi = append(kvi, &kviTuple{prefix, []byte(item), uint64(stat.Mzxid)}) } - return values, err + return kvi, err } // DeleteRange deletes a range of values at "directory" @@ -170,9 +170,9 @@ func (s *Zookeeper) WatchRange(prefix string, filter string, _ time.Duration, ca for e := range eventChan { if e.Type == zk.EventNodeChildrenChanged { log.WithField("name", "zk").Debug("Discovery watch triggered") - values, err := s.GetRange(prefix) + kvi, err := s.GetRange(prefix) if err == nil { - callback(values) + callback(kvi) } } }