store: Improved Lock support.

- New interface: `store.CreateLock` returns a `Locker` interface with
  `Lock` and `Unlock` functions.
- Consul: New locking implementation leveraging helper lock
  implementation that comes with the go library.
- ZK: Lock support using the library helpers.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-12 19:03:10 -07:00
parent 8e7447c040
commit f4bd5834fa
4 changed files with 63 additions and 56 deletions

View File

@ -15,12 +15,15 @@ var (
ErrSessionUndefined = errors.New("Session does not exist")
)
// Consul embeds the client and watches/lock sessions
// Consul embeds the client and watches
type Consul struct {
config *api.Config
client *api.Client
sessions map[string]*api.Session
watches map[string]*Watch
config *api.Config
client *api.Client
watches map[string]*Watch
}
type consulLock struct {
lock *api.Lock
}
// Watch embeds the event channel and the
@ -34,7 +37,6 @@ type Watch struct {
// a list of endpoints and optional tls config
func InitializeConsul(endpoints []string, options Config) (Store, error) {
s := &Consul{}
s.sessions = make(map[string]*api.Session)
s.watches = make(map[string]*Watch)
// Create Consul client
@ -240,38 +242,30 @@ func (s *Consul) CancelWatchRange(prefix string) error {
return s.CancelWatch(prefix)
}
// Acquire the lock for "key"/"directory"
func (s *Consul) Acquire(key string, value []byte) (string, error) {
key = partialFormat(key)
session := s.client.Session()
id, _, err := session.CreateNoChecks(nil, nil)
// CreateLock returns a handle to a lock struct which can be used
// to acquire and release the mutex.
func (s *Consul) CreateLock(key string, value []byte) (Locker, error) {
l, err := s.client.LockOpts(&api.LockOptions{
Key: partialFormat(key),
Value: value,
})
if err != nil {
return "", err
return nil, err
}
// Add session to map
s.sessions[id] = session
p := &api.KVPair{Key: key, Value: value, Session: id}
if work, _, err := s.client.KV().Acquire(p, nil); err != nil {
return "", err
} else if !work {
return "", ErrCannotLock
}
return id, nil
return &consulLock{lock: l}, nil
}
// Release the lock for "key"/"directory"
func (s *Consul) Release(id string) error {
if _, ok := s.sessions[id]; !ok {
log.Error("Lock session does not exist")
return ErrSessionUndefined
}
session := s.sessions[id]
session.Destroy(id, nil)
s.sessions[id] = nil
return nil
// Lock attempts to acquire the lock and blocks while doing so.
func (l *consulLock) Lock() error {
// FIXME: Locks may be lost and we should watch for the returned channel.
_, err := l.lock.Lock(nil)
return err
}
// Unlock released the lock. It is an error to call this
// if the lock is not currently held.
func (l *consulLock) Unlock() error {
return l.lock.Unlock()
}
// AtomicPut put a value at "key" if the key has not been

View File

@ -252,12 +252,8 @@ func (s *Etcd) CancelWatchRange(prefix string) error {
return s.CancelWatch(format(prefix))
}
// Acquire the lock for "key"/"directory"
func (s *Etcd) Acquire(key string, value []byte) (string, error) {
return "", ErrNotImplemented
}
// Release the lock for "key"/"directory"
func (s *Etcd) Release(session string) error {
return ErrNotImplemented
// CreateLock returns a handle to a lock struct which can be used
// to acquire and release the mutex.
func (s *Etcd) CreateLock(key string, value []byte) (Locker, error) {
return nil, ErrNotImplemented
}

View File

@ -36,11 +36,10 @@ type Store interface {
// Cancel watch key
CancelWatch(key string) error
// Acquire the lock at key
Acquire(key string, value []byte) (string, error)
// Release the lock at key
Release(session string) error
// CreateLock for a given key.
// The returned Locker is not held and must be acquired with `.Lock`.
// value is optional.
CreateLock(key string, value []byte) (Locker, error)
// Get range of keys based on prefix
GetRange(prefix string) ([]KVEntry, error)
@ -68,6 +67,13 @@ type KVEntry interface {
LastIndex() uint64
}
// Locker provides locking mechanism on top of the store.
// Similar to `sync.Lock` except it may return errors.
type Locker interface {
Lock() error
Unlock() error
}
var (
// List of Store services
stores map[string]Initialize

View File

@ -16,6 +16,10 @@ type Zookeeper struct {
watches map[string]<-chan zk.Event
}
type zookeeperLock struct {
lock *zk.Lock
}
// InitializeZookeeper creates a new Zookeeper client
// given a list of endpoints and optional tls config
func InitializeZookeeper(endpoints []string, options Config) (Store, error) {
@ -198,15 +202,22 @@ func (s *Zookeeper) AtomicDelete(key string, oldValue []byte, index uint64) (boo
return false, ErrNotImplemented
}
// Acquire the lock for "key"/"directory"
func (s *Zookeeper) Acquire(path string, value []byte) (string, error) {
// lock := zk.NewLock(s.client, path, nil)
// locks[path] = lock
// lock.Lock()
return "", ErrNotImplemented
// CreateLock returns a handle to a lock struct which can be used
// to acquire and release the mutex.
func (s *Zookeeper) CreateLock(key string, value []byte) (Locker, error) {
// FIXME: `value` is not being used since there is no support in zk.NewLock().
return &zookeeperLock{
lock: zk.NewLock(s.client, format(key), zk.WorldACL(zk.PermAll)),
}, nil
}
// Release the lock for "key"/"directory"
func (s *Zookeeper) Release(session string) error {
return ErrNotImplemented
// Lock attempts to acquire the lock and blocks while doing so.
func (l *zookeeperLock) Lock() error {
return l.lock.Lock()
}
// Unlock released the lock. It is an error to call this
// if the lock is not currently held.
func (l *zookeeperLock) Unlock() error {
return l.lock.Unlock()
}