docs/pkg/store/consul.go

291 lines
7.0 KiB
Go

package store
import (
	"crypto/tls"
	"errors"
	"net/http"
	"strings"
	"time"

	log "github.com/Sirupsen/logrus"
	api "github.com/hashicorp/consul/api"
)
// Consul is the Consul-backed store. It holds the API configuration,
// the client built from that configuration, and the watches registered
// through Watch and WatchTree.
type Consul struct {
// config is the API configuration the client is created from.
config *api.Config
// client is the Consul API client used by the store methods.
client *api.Client
// watches maps a normalized key or prefix to its watch state.
watches map[string]*Watch
}
// consulLock wraps a Consul API lock; its Lock and Unlock methods
// delegate to the wrapped value.
type consulLock struct {
// lock is the underlying Consul lock handle.
lock *api.Lock
}
// Watch holds the state of a registered watch.
type Watch struct {
// LastIndex is the most recent index seen for the watched key or
// prefix; it is sent as the WaitIndex of the next blocking query.
LastIndex uint64
}
// InitializeConsul creates a new Consul-backed store.
//
// Only the first entry of endpoints is used as the Consul address. options
// may be nil; when it is set, a non-nil TLS configuration switches the
// client to HTTPS and a non-zero Timeout is applied as the wait time of the
// API configuration.
//
// An error is returned when endpoints is empty or when the Consul API
// client cannot be created.
func InitializeConsul(endpoints []string, options *Config) (Store, error) {
	if len(endpoints) == 0 {
		return nil, errors.New("consul: at least one endpoint is required")
	}

	config := api.DefaultConfig()
	// Use a dedicated HTTP client: setTLS replaces the client's transport,
	// and doing that on http.DefaultClient would change the behavior of
	// every other user of the default client in the process.
	config.HttpClient = &http.Client{}
	config.Address = endpoints[0]
	config.Scheme = "http"

	s := &Consul{
		config:  config,
		watches: make(map[string]*Watch),
	}

	if options != nil {
		if options.TLS != nil {
			s.setTLS(options.TLS)
		}
		if options.Timeout != 0 {
			s.setTimeout(options.Timeout)
		}
	}

	// Build the API client from the finished configuration.
	client, err := api.NewClient(config)
	if err != nil {
		log.Errorf("Couldn't initialize consul client..")
		return nil, err
	}
	s.client = client
	return s, nil
}
// setTLS installs an HTTP transport that uses the given TLS configuration
// on the configured HTTP client and switches the scheme to "https".
func (s *Consul) setTLS(tls *tls.Config) {
	cfg := s.config
	transport := &http.Transport{TLSClientConfig: tls}
	cfg.HttpClient.Transport = transport
	cfg.Scheme = "https"
}
// setTimeout stores the given duration as the WaitTime of the Consul API
// configuration.
func (s *Consul) setTimeout(time time.Duration) {
	cfg := s.config
	cfg.WaitTime = time
}
// normalize runs the key through the package-level normalize function and
// then strips a single leading "/" from the result.
func (s *Consul) normalize(key string) string {
	return strings.TrimPrefix(normalize(key), "/")
}
// Get returns the pair stored at key together with the last index reported
// by the query, which can be passed on to the atomic (CAS) operations.
// ErrKeyNotFound is returned when the query yields no pair.
func (s *Consul) Get(key string) (*KVPair, error) {
	kv := s.client.KV()
	found, meta, err := kv.Get(s.normalize(key), nil)
	switch {
	case err != nil:
		return nil, err
	case found == nil:
		return nil, ErrKeyNotFound
	}
	return &KVPair{found.Key, found.Value, meta.LastIndex}, nil
}
// Put stores value at key, using the normalized form of the key.
//
// An error is returned when the store has no client or when the write
// fails.
func (s *Consul) Put(key string, value []byte) error {
	if s.client == nil {
		log.Error("Error initializing client")
		// Bail out here: carrying on would dereference the nil client.
		return errors.New("consul: client is not initialized")
	}
	p := &api.KVPair{Key: s.normalize(key), Value: value}
	_, err := s.client.KV().Put(p, nil)
	return err
}
// Delete removes the value stored at key, using the normalized form of the
// key, and returns the error reported by the delete call, if any.
func (s *Consul) Delete(key string) error {
	kv := s.client.KV()
	_, err := kv.Delete(s.normalize(key), nil)
	return err
}
// Exists reports whether key is present in the store.
//
// It returns (true, nil) when Get succeeds. When the key is missing it
// returns false together with ErrKeyNotFound. Any other error from Get is
// returned with false as well, so a failed lookup is never reported as an
// existing key.
func (s *Consul) Exists(key string) (bool, error) {
	if _, err := s.Get(key); err != nil {
		return false, err
	}
	return true, nil
}
// List returns the pairs stored under prefix.
//
// The entry whose key equals the normalized prefix itself is left out of
// the result. ErrKeyNotFound is returned when the query yields no pairs at
// all.
func (s *Consul) List(prefix string) ([]*KVPair, error) {
	dir := s.normalize(prefix)
	pairs, _, err := s.client.KV().List(dir, nil)
	if err != nil {
		return nil, err
	}
	if len(pairs) == 0 {
		return nil, ErrKeyNotFound
	}

	kv := make([]*KVPair, 0, len(pairs))
	for _, pair := range pairs {
		// Skip the entry for the directory itself. The query is made with
		// the normalized prefix, so that is the form to compare against;
		// the raw prefix may still carry a leading "/".
		if pair.Key == dir {
			continue
		}
		kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex})
	}
	return kv, nil
}
// DeleteTree removes every value stored under prefix, using the normalized
// form of the prefix, and returns the error reported by the call, if any.
func (s *Consul) DeleteTree(prefix string) error {
	kv := s.client.KV()
	_, err := kv.DeleteTree(s.normalize(prefix), nil)
	return err
}
// Watch watches a single key for modifications and invokes callback with
// the refreshed pair after every change notification.
//
// The call blocks until the watch ends. It returns nil once the event
// channel is closed (for instance after the watch has been cancelled), or
// the error that prevented the key from being read.
func (s *Consul) Watch(key string, callback WatchCallback) error {
	fkey := s.normalize(key)

	// Fetch the current index first so that the polling starts from it.
	_, meta, err := s.client.KV().Get(fkey, nil)
	if err != nil {
		return err
	}

	// Register the watch under the normalized key.
	s.watches[fkey] = &Watch{LastIndex: meta.LastIndex}
	eventChan := s.waitForChange(fkey)

	for range eventChan {
		entry, err := s.Get(key)
		if err != nil {
			log.Error("Cannot refresh the key: ", fkey, ", cancelling watch")
			// Remove the entry rather than storing nil: a nil entry is
			// still "present" in the map and would be dereferenced by the
			// polling goroutine.
			delete(s.watches, fkey)
			// The polling goroutine may still be about to send on the
			// unbuffered channel; keep receiving until it closes the
			// channel so that it is not blocked forever.
			go func() {
				for range eventChan {
				}
			}()
			return err
		}
		callback(entry)
	}
	return nil
}
// CancelWatch cancels the watch registered for key.
//
// The entry is removed from the watch map; the polling goroutine stops the
// next time it looks the key up. ErrWatchDoesNotExist is returned when no
// watch is registered for the key.
func (s *Consul) CancelWatch(key string) error {
	key = s.normalize(key)

	w, ok := s.watches[key]
	// Delete instead of storing nil: a nil entry still counts as present,
	// which would make the polling goroutine dereference it and would let
	// a second cancel succeed. delete is a no-op for a missing key and
	// also clears any stale nil entry.
	delete(s.watches, key)

	if !ok || w == nil {
		log.Error("Chan does not exist for key: ", key)
		return ErrWatchDoesNotExist
	}
	return nil
}
// waitForChange starts a goroutine that repeatedly lists key with the
// watch's last index as WaitIndex and sends the new index on the returned
// channel after every query that returns.
//
// The goroutine stops, and the channel is closed, when the watch entry for
// key is missing (or nil) or when a query fails.
//
// NOTE(review): the watch map is read here without synchronization while
// other methods write to it — confirm whether callers serialize access.
func (s *Consul) waitForChange(key string) <-chan uint64 {
	ch := make(chan uint64)
	kv := s.client.KV()
	go func() {
		// Closing the channel ends the range loop of the consumer.
		defer close(ch)
		for {
			watch, ok := s.watches[key]
			// A nil entry is treated like a missing one so that it is
			// never dereferenced below.
			if !ok || watch == nil {
				log.Error("Cannot access last index for key: ", key, " closing channel")
				return
			}
			opts := &api.QueryOptions{WaitIndex: watch.LastIndex}
			_, meta, err := kv.List(key, opts)
			if err != nil {
				log.WithField("name", "consul").Error(err)
				return
			}
			watch.LastIndex = meta.LastIndex
			ch <- watch.LastIndex
		}
	}()
	return ch
}
// WatchTree watches the values stored under prefix and invokes callback
// with the refreshed list after every change notification.
//
// The call blocks until the watch ends. It returns nil once the event
// channel is closed (for instance after the watch has been cancelled), or
// the error that prevented the prefix from being listed.
func (s *Consul) WatchTree(prefix string, callback WatchCallback) error {
	fprefix := s.normalize(prefix)

	// Fetch the current index first so that the polling starts from it.
	// The normalized prefix is used here, like in every other query.
	_, meta, err := s.client.KV().Get(fprefix, nil)
	if err != nil {
		return err
	}

	// Register the watch under the normalized prefix.
	s.watches[fprefix] = &Watch{LastIndex: meta.LastIndex}
	eventChan := s.waitForChange(fprefix)

	for range eventChan {
		kvi, err := s.List(prefix)
		if err != nil {
			log.Error("Cannot refresh keys with prefix: ", fprefix, ", cancelling watch")
			// Remove the entry rather than storing nil: a nil entry is
			// still "present" in the map and would be dereferenced by the
			// polling goroutine.
			delete(s.watches, fprefix)
			// The polling goroutine may still be about to send on the
			// unbuffered channel; keep receiving until it closes the
			// channel so that it is not blocked forever.
			go func() {
				for range eventChan {
				}
			}()
			return err
		}
		callback(kvi...)
	}
	return nil
}
// CancelWatchRange cancels the watch registered for prefix. It delegates
// to CancelWatch and returns its result.
func (s *Consul) CancelWatchRange(prefix string) error {
	err := s.CancelWatch(prefix)
	return err
}
// CreateLock builds a lock on the normalized key, carrying value, and
// returns a Locker handle for acquiring and releasing it. The error from
// the lock creation call is returned unchanged.
func (s *Consul) CreateLock(key string, value []byte) (Locker, error) {
	opts := &api.LockOptions{
		Key:   s.normalize(key),
		Value: value,
	}
	l, err := s.client.LockOpts(opts)
	if err != nil {
		return nil, err
	}
	return &consulLock{lock: l}, nil
}
// Lock acquires the underlying Consul lock, passing a nil stop channel.
// It returns the channel and error produced by the underlying call.
func (l *consulLock) Lock() (<-chan struct{}, error) {
	lostCh, err := l.lock.Lock(nil)
	return lostCh, err
}
// Unlock releases the underlying Consul lock and returns the error
// reported by the underlying call, if any.
func (l *consulLock) Unlock() error {
	err := l.lock.Unlock()
	return err
}
// AtomicPut writes value at key with a check-and-set operation.
//
// When previous is set, its LastIndex is used as the expected modify
// index. When previous is nil the index is left at zero, which Consul
// treats as "only write if the key does not exist yet".
//
// It returns (true, nil) on success, (false, ErrKeyModified) when the
// check fails, and (false, err) when the request itself fails.
func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
	p := &api.KVPair{Key: s.normalize(key), Value: value}
	// Guard against a nil previous pair instead of dereferencing it.
	if previous != nil {
		p.ModifyIndex = previous.LastIndex
	}

	ok, _, err := s.client.KV().CAS(p, nil)
	if err != nil {
		return false, err
	}
	if !ok {
		return false, ErrKeyModified
	}
	return true, nil
}
// AtomicDelete deletes the value at key with a check-and-set operation,
// using previous.LastIndex as the expected modify index.
//
// It returns (true, nil) on success, (false, ErrKeyModified) when the
// check fails, and (false, err) when previous is nil or the request itself
// fails.
func (s *Consul) AtomicDelete(key string, previous *KVPair) (bool, error) {
	// A previous pair is mandatory: without it there is no index to check
	// against, and a zero index never deletes anything in Consul.
	if previous == nil {
		return false, errors.New("consul: previous key/value pair is required for an atomic delete")
	}

	p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex}
	ok, _, err := s.client.KV().DeleteCAS(p, nil)
	if err != nil {
		return false, err
	}
	if !ok {
		return false, ErrKeyModified
	}
	return true, nil
}