diff --git a/pkg/store/consul.go b/pkg/store/consul.go index cf6dd1017a..2be4252f64 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -15,12 +15,15 @@ var ( ErrSessionUndefined = errors.New("Session does not exist") ) -// Consul embeds the client and watches/lock sessions +// Consul embeds the client and watches type Consul struct { - config *api.Config - client *api.Client - sessions map[string]*api.Session - watches map[string]*Watch + config *api.Config + client *api.Client + watches map[string]*Watch +} + +type consulLock struct { + lock *api.Lock } // Watch embeds the event channel and the @@ -34,7 +37,6 @@ type Watch struct { // a list of endpoints and optional tls config func InitializeConsul(endpoints []string, options Config) (Store, error) { s := &Consul{} - s.sessions = make(map[string]*api.Session) s.watches = make(map[string]*Watch) // Create Consul client @@ -240,38 +242,30 @@ func (s *Consul) CancelWatchRange(prefix string) error { return s.CancelWatch(prefix) } -// Acquire the lock for "key"/"directory" -func (s *Consul) Acquire(key string, value []byte) (string, error) { - key = partialFormat(key) - session := s.client.Session() - id, _, err := session.CreateNoChecks(nil, nil) +// CreateLock returns a handle to a lock struct which can be used +// to acquire and release the mutex. +func (s *Consul) CreateLock(key string, value []byte) (Locker, error) { + l, err := s.client.LockOpts(&api.LockOptions{ + Key: partialFormat(key), + Value: value, + }) if err != nil { - return "", err + return nil, err } - - // Add session to map - s.sessions[id] = session - - p := &api.KVPair{Key: key, Value: value, Session: id} - if work, _, err := s.client.KV().Acquire(p, nil); err != nil { - return "", err - } else if !work { - return "", ErrCannotLock - } - - return id, nil + return &consulLock{lock: l}, nil } -// Release the lock for "key"/"directory" -func (s *Consul) Release(id string) error { - if _, ok := s.sessions[id]; !ok { - log.Error("Lock session does not exist") - return ErrSessionUndefined - } - session := s.sessions[id] - session.Destroy(id, nil) - s.sessions[id] = nil - return nil +// Lock attempts to acquire the lock and blocks while doing so. +func (l *consulLock) Lock() error { + // FIXME: Locks may be lost and we should watch for the returned channel. + _, err := l.lock.Lock(nil) + return err +} + +// Unlock released the lock. It is an error to call this +// if the lock is not currently held. +func (l *consulLock) Unlock() error { + return l.lock.Unlock() } // AtomicPut put a value at "key" if the key has not been diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 7b53c38ae7..44f895841d 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -252,12 +252,8 @@ func (s *Etcd) CancelWatchRange(prefix string) error { return s.CancelWatch(format(prefix)) } -// Acquire the lock for "key"/"directory" -func (s *Etcd) Acquire(key string, value []byte) (string, error) { - return "", ErrNotImplemented -} - -// Release the lock for "key"/"directory" -func (s *Etcd) Release(session string) error { - return ErrNotImplemented +// CreateLock returns a handle to a lock struct which can be used +// to acquire and release the mutex. +func (s *Etcd) CreateLock(key string, value []byte) (Locker, error) { + return nil, ErrNotImplemented } diff --git a/pkg/store/store.go b/pkg/store/store.go index b699bd08de..2a38061d04 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -36,11 +36,10 @@ type Store interface { // Cancel watch key CancelWatch(key string) error - // Acquire the lock at key - Acquire(key string, value []byte) (string, error) - - // Release the lock at key - Release(session string) error + // CreateLock for a given key. + // The returned Locker is not held and must be acquired with `.Lock`. + // value is optional. + CreateLock(key string, value []byte) (Locker, error) // Get range of keys based on prefix GetRange(prefix string) ([]KVEntry, error) @@ -68,6 +67,13 @@ type KVEntry interface { LastIndex() uint64 } +// Locker provides locking mechanism on top of the store. +// Similar to `sync.Lock` except it may return errors. +type Locker interface { + Lock() error + Unlock() error +} + var ( // List of Store services stores map[string]Initialize diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index d25d35f306..d12b016a13 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -16,6 +16,10 @@ type Zookeeper struct { watches map[string]<-chan zk.Event } +type zookeeperLock struct { + lock *zk.Lock +} + // InitializeZookeeper creates a new Zookeeper client // given a list of endpoints and optional tls config func InitializeZookeeper(endpoints []string, options Config) (Store, error) { @@ -198,15 +202,22 @@ func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (boo return false, ErrNotImplemented } -// Acquire the lock for "key"/"directory" -func (s *Zookeeper) Acquire(path string, value []byte) (string, error) { - // lock := zk.NewLock(s.client, path, nil) - // locks[path] = lock - // lock.Lock() - return "", ErrNotImplemented +// CreateLock returns a handle to a lock struct which can be used +// to acquire and release the mutex. +func (s *Zookeeper) CreateLock(key string, value []byte) (Locker, error) { + // FIXME: `value` is not being used since there is no support in zk.NewLock(). + return &zookeeperLock{ + lock: zk.NewLock(s.client, format(key), zk.WorldACL(zk.PermAll)), + }, nil } -// Release the lock for "key"/"directory" -func (s *Zookeeper) Release(session string) error { - return ErrNotImplemented +// Lock attempts to acquire the lock and blocks while doing so. +func (l *zookeeperLock) Lock() error { + return l.lock.Lock() +} + +// Unlock released the lock. It is an error to call this +// if the lock is not currently held. +func (l *zookeeperLock) Unlock() error { + return l.lock.Unlock() }