diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 07dacd313b..fb14805902 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -16,6 +16,20 @@ type Etcd struct { ephemeralTTL time.Duration } +type etcdLock struct { + client *etcd.Client + stopLock chan struct{} + key string + value string + last *etcd.Response + ttl uint64 +} + +const ( + defaultLockTTL = 20 * time.Second + defaultUpdateTime = 5 * time.Second +) + // InitializeEtcd creates a new Etcd client given // a list of endpoints and optional tls config func InitializeEtcd(addrs []string, options *Config) (Store, error) { @@ -280,5 +294,122 @@ func (s *Etcd) DeleteTree(prefix string) error { // NewLock returns a handle to a lock struct which can be used to acquire and // release the mutex. func (s *Etcd) NewLock(key string, options *LockOptions) (Locker, error) { - return nil, ErrNotImplemented + var value string + ttl := uint64(time.Duration(defaultLockTTL).Seconds()) + + // Apply options + if options != nil { + if options.Value != nil { + value = string(options.Value) + } + if options.TTL != 0 { + ttl = uint64(options.TTL.Seconds()) + } + } + + // Create lock object + lock := &etcdLock{ + client: s.client, + key: key, + value: value, + ttl: ttl, + } + + return lock, nil +} + +// Lock attempts to acquire the lock and blocks while doing so. +// Returns a channel that is closed if our lock is lost or an error. +func (l *etcdLock) Lock() (<-chan struct{}, error) { + + key := normalize(l.key) + + // Lock holder channels + lockHeld := make(chan struct{}) + stopLocking := make(chan struct{}) + + var lastIndex uint64 + + for { + resp, err := l.client.Create(key, l.value, l.ttl) + if err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + // Key already exists + if etcdError.ErrorCode != 105 { + lastIndex = ^uint64(0) + } + } + } else { + lastIndex = resp.Node.ModifiedIndex + } + + _, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) + + if err == nil { + // Leader section + l.stopLock = stopLocking + go l.holdLock(key, lockHeld, stopLocking) + break + } else { + // Seeker section + chW := make(chan *etcd.Response) + chWStop := make(chan bool) + l.waitLock(key, chW, chWStop) + + // Delete or Expire event occured + // Retry + } + } + + return lockHeld, nil +} + +// Hold the lock as long as we can +// Updates the key ttl periodically until we receive +// an explicit stop signal from the Unlock method +func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan struct{}) { + defer close(lockHeld) + + update := time.NewTicker(defaultUpdateTime) + defer update.Stop() + + var err error + + for { + select { + case <-update.C: + l.last, err = l.client.Update(key, l.value, l.ttl) + if err != nil { + return + } + + case <-stopLocking: + return + } + } +} + +// WaitLock simply waits for the key to be available for creation +func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool) { + go l.client.Watch(key, 0, false, eventCh, stopWatchCh) + for event := range eventCh { + if event.Action == "delete" || event.Action == "expire" { + return + } + } +} + +// Unlock released the lock. It is an error to call this +// if the lock is not currently held. +func (l *etcdLock) Unlock() error { + if l.stopLock != nil { + l.stopLock <- struct{}{} + } + if l.last != nil { + _, err := l.client.CompareAndDelete(normalize(l.key), l.value, l.last.Node.ModifiedIndex) + if err != nil { + return err + } + } + return nil } diff --git a/pkg/store/store.go b/pkg/store/store.go index e3cbdebe0a..0fc390a232 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -108,7 +108,8 @@ type WriteOptions struct { // LockOptions contains optional request parameters type LockOptions struct { - Value []byte // Optional, value to associate with the lock + Value []byte // Optional, value to associate with the lock + TTL time.Duration // Optional, expiration ttl associated with the lock } // WatchCallback is used for watch methods on keys