Merge pull request #825 from abronan/etcd_lock

store: Implement locking mechanisms with Etcd
This commit is contained in:
Andrea Luzzardi 2015-05-21 18:40:07 -07:00
commit b5c8062933
2 changed files with 134 additions and 2 deletions

View File

@ -16,6 +16,20 @@ type Etcd struct {
ephemeralTTL time.Duration
}
type etcdLock struct {
client *etcd.Client
stopLock chan struct{}
key string
value string
last *etcd.Response
ttl uint64
}
const (
defaultLockTTL = 20 * time.Second
defaultUpdateTime = 5 * time.Second
)
// InitializeEtcd creates a new Etcd client given
// a list of endpoints and optional tls config
func InitializeEtcd(addrs []string, options *Config) (Store, error) {
@ -280,5 +294,122 @@ func (s *Etcd) DeleteTree(prefix string) error {
// NewLock returns a handle to a lock struct which can be used to acquire and
// release the mutex.
func (s *Etcd) NewLock(key string, options *LockOptions) (Locker, error) {
return nil, ErrNotImplemented
var value string
ttl := uint64(time.Duration(defaultLockTTL).Seconds())
// Apply options
if options != nil {
if options.Value != nil {
value = string(options.Value)
}
if options.TTL != 0 {
ttl = uint64(options.TTL.Seconds())
}
}
// Create lock object
lock := &etcdLock{
client: s.client,
key: key,
value: value,
ttl: ttl,
}
return lock, nil
}
// Lock attempts to acquire the lock and blocks while doing so.
// Returns a channel that is closed if our lock is lost or an error.
func (l *etcdLock) Lock() (<-chan struct{}, error) {
key := normalize(l.key)
// Lock holder channels
lockHeld := make(chan struct{})
stopLocking := make(chan struct{})
var lastIndex uint64
for {
resp, err := l.client.Create(key, l.value, l.ttl)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Key already exists
if etcdError.ErrorCode != 105 {
lastIndex = ^uint64(0)
}
}
} else {
lastIndex = resp.Node.ModifiedIndex
}
_, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex)
if err == nil {
// Leader section
l.stopLock = stopLocking
go l.holdLock(key, lockHeld, stopLocking)
break
} else {
// Seeker section
chW := make(chan *etcd.Response)
chWStop := make(chan bool)
l.waitLock(key, chW, chWStop)
// Delete or Expire event occured
// Retry
}
}
return lockHeld, nil
}
// Hold the lock as long as we can
// Updates the key ttl periodically until we receive
// an explicit stop signal from the Unlock method
func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan struct{}) {
defer close(lockHeld)
update := time.NewTicker(defaultUpdateTime)
defer update.Stop()
var err error
for {
select {
case <-update.C:
l.last, err = l.client.Update(key, l.value, l.ttl)
if err != nil {
return
}
case <-stopLocking:
return
}
}
}
// WaitLock simply waits for the key to be available for creation
func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool) {
go l.client.Watch(key, 0, false, eventCh, stopWatchCh)
for event := range eventCh {
if event.Action == "delete" || event.Action == "expire" {
return
}
}
}
// Unlock released the lock. It is an error to call this
// if the lock is not currently held.
func (l *etcdLock) Unlock() error {
if l.stopLock != nil {
l.stopLock <- struct{}{}
}
if l.last != nil {
_, err := l.client.CompareAndDelete(normalize(l.key), l.value, l.last.Node.ModifiedIndex)
if err != nil {
return err
}
}
return nil
}

View File

@ -108,7 +108,8 @@ type WriteOptions struct {
// LockOptions contains optional request parameters
type LockOptions struct {
Value []byte // Optional, value to associate with the lock
Value []byte // Optional, value to associate with the lock
TTL time.Duration // Optional, expiration ttl associated with the lock
}
// WatchCallback is used for watch methods on keys