leadership: Distributed Leader Election

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-19 19:27:35 -07:00
parent 78839ba4ea
commit 3f01413b75
5 changed files with 328 additions and 0 deletions

58
leadership/README.md Normal file
View File

@ -0,0 +1,58 @@
# Leadership: Distributed Leader Election for Clustered Environments.
Leadership is a library for cluster leader election on top of a distributed
Key/Value store.
It's built using Swarm's `pkg/store` and is designed to work across multiple
storage backends.
Right now only `Consul` is supported but `etcd` and `Zookeeper` will be coming
soon.
```go
// Create a store using pkg/store.
client, err := store.NewStore("consul", []string{"127.0.0.1:8500"}, &store.Config{})
if err != nil {
panic(err)
}
candidate := leadership.NewCandidate(client, "service/swarm/leader", "underwood")
candidate.RunForElection()
for elected := range candidate.ElectedCh {
// This loop will run every time there is a change in our leadership
// status.
if elected {
// We won the election - we are now the leader.
// Let's do leader stuff, for example, sleep for a while.
log.Printf("I won the election! I'm now the leader")
time.Sleep(10 * time.Second)
// Tired of being a leader? You can resign anytime.
candidate.Resign()
} else {
// We lost the election but are still running for leadership.
// `elected == false` is the default state and is the first event
// we'll receive from the channel. After a successful election,
// this event can get triggered if someone else steals the
// leadership or if we resign.
log.Printf("Lost the election, let's try another time")
}
}
```
It is possible to follow an election in real-time and get notified whenever
there is a change in leadership:
```go
follower := leadership.NewFollower(client, "service/swarm/leader")
follower.FollowElection()
for leader := range follower.LeaderCh {
// Leader is a string containing the value passed to `NewCandidate`.
log.Printf("%s is now the leader", leader)
}
```
A typical use case for this is to be able to always send requests to the current
leader.

108
leadership/candidate.go Normal file
View File

@ -0,0 +1,108 @@
package leadership
import (
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/pkg/store"
)
// Candidate runs the leader election algorithm asynchronously.
// It is created with NewCandidate and started with RunForElection.
type Candidate struct {
// ElectedCh carries leadership status changes: true once the election lock
// has been acquired, false each time campaigning starts or restarts. It is
// closed when the campaign goroutine exits.
ElectedCh chan bool
// client is the Key/Value store used to create the election lock.
client store.Store
// key is the store key the election lock is created on.
key string
// node is the value stored alongside the lock.
node string
// lock is held by Resign and update while they read or write leader
// (update also sends on ElectedCh while holding it).
lock sync.Mutex
// leader is the last status pushed through ElectedCh by update.
leader bool
// stopCh is closed by Stop to ask the campaign goroutine to quit.
stopCh chan struct{}
// resignCh is signalled by Resign to make the current leader step down.
resignCh chan bool
}
// NewCandidate builds a Candidate that will compete for the lock stored under
// key in client, publishing node as the lock value. The returned candidate is
// idle (not a leader, nothing running) until RunForElection is called. All of
// its channels are unbuffered.
func NewCandidate(client store.Store, key, node string) *Candidate {
	c := new(Candidate) // zero value: leader == false

	c.client, c.key, c.node = client, key, node
	c.ElectedCh = make(chan bool)
	c.resignCh = make(chan bool)
	c.stopCh = make(chan struct{})

	return c
}
// RunForElection creates the election lock and launches the campaign in a
// background goroutine. Status changes are delivered on ElectedCh.
//
// It returns the error from the store if the lock cannot be created; in that
// case no goroutine is started.
func (c *Candidate) RunForElection() error {
	// Need a `SessionTTL` (keep-alive) and a stop channel.
	opts := &store.LockOptions{Value: []byte(c.node)}

	locker, err := c.client.NewLock(c.key, opts)
	if err != nil {
		return err
	}

	go c.campaign(locker)
	return nil
}
// Stop running for election by closing stopCh.
//
// The campaign goroutine only watches stopCh in the select it enters after
// having acquired the lock, so the stop takes effect the next time the
// candidate is leader.
//
// Stop must be called at most once: closing an already-closed channel panics.
func (c *Candidate) Stop() {
close(c.stopCh)
}
// Resign forces the candidate to step-down and try again.
// If the candidate is not a leader, it doesn't have any effect.
// Candidate will retry immediately to acquire the leadership. If no-one else
// took it, then the Candidate will end up being a leader again.
//
// Resign blocks until the campaign goroutine picks up the request or the
// candidate is stopped. If leadership is lost between the status check and the
// hand-off, the request stays pending and is honoured the next time the
// candidate becomes leader.
func (c *Candidate) Resign() {
	// Only read the status under the lock. The send below must happen with
	// the lock released: campaign calls update (which takes the same lock)
	// right after leaving its select, so sending while holding the lock can
	// deadlock both goroutines when the lock is lost concurrently.
	c.lock.Lock()
	leader := c.leader
	c.lock.Unlock()

	if !leader {
		return
	}

	select {
	case c.resignCh <- true:
	case <-c.stopCh:
		// The campaign has been asked to quit; nobody may be left to receive
		// the resignation, so don't block forever.
	}
}
// update publishes status on ElectedCh and then records it in c.leader, all
// while holding c.lock.
//
// ElectedCh is created unbuffered by NewCandidate, so the send blocks - with
// the lock still held - until a receiver takes the value. Resign therefore
// cannot observe the new status before the consumer has been notified.
func (c *Candidate) update(status bool) {
c.lock.Lock()
defer c.lock.Unlock()
c.ElectedCh <- status
c.leader = status
}
// campaign is the election loop started by RunForElection in its own
// goroutine. Each iteration announces follower status, tries to take the
// lock, announces leader status, and then waits for one of three events:
// a resignation, a stop request, or the loss of the lock.
//
// ElectedCh is closed when the loop exits, which happens on a stop request or
// when acquiring the lock fails.
func (c *Candidate) campaign(lock store.Locker) {
	defer close(c.ElectedCh)

	for {
		// Start as a follower.
		c.update(false)

		// NOTE(review): stopCh is not consulted while waiting here, so a Stop
		// issued before the lock is acquired only takes effect afterwards.
		lostCh, err := lock.Lock()
		if err != nil {
			log.Error(err)
			return
		}

		// Hooray! We acquired the lock therefore we are the new leader.
		c.update(true)

		select {
		case <-c.resignCh:
			// We were asked to resign, give up the lock and go back
			// campaigning.
			if err := lock.Unlock(); err != nil {
				log.Error(err)
			}

		case <-c.stopCh:
			// Give up the leadership and quit. We are necessarily the leader
			// at this point: update(true) ran just above and this goroutine
			// is the only one that changes the status.
			if err := lock.Unlock(); err != nil {
				log.Error(err)
			}
			return

		case <-lostCh:
			// We lost the lock. Someone else is the leader, try again.
		}
	}
}

View File

@ -0,0 +1,57 @@
package leadership
import (
"testing"
kv "github.com/docker/swarm/pkg/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
// TestCandidate drives a Candidate against a mocked store whose lock can
// always be acquired, and checks the sequence of statuses published on
// ElectedCh across a lost lock, a resignation and a stop.
func TestCandidate(t *testing.T) {
	store, err := kv.NewStore("mock", []string{}, nil)
	assert.NoError(t, err)

	mockStore := store.(*kv.Mock)
	mockLock := &kv.MockLock{}
	mockStore.On("NewLock", "test_key", mock.Anything).Return(mockLock, nil)

	// Lock and unlock always succeed.
	lostCh := make(chan struct{})
	var mockLostCh <-chan struct{} = lostCh
	mockLock.On("Lock").Return(mockLostCh, nil)
	mockLock.On("Unlock").Return(nil)

	candidate := NewCandidate(store, "test_key", "test_node")
	assert.NoError(t, candidate.RunForElection())

	// Should issue a false upon start, no matter what.
	assert.False(t, <-candidate.ElectedCh)

	// Since the lock always succeeds, we should get elected.
	assert.True(t, <-candidate.ElectedCh)

	// Signaling a lost lock should get us de-elected...
	// The mock hands out the same channel on every Lock call, so signal with
	// a single send instead of close: a closed channel would stay readable
	// and make every later leadership term end immediately.
	lostCh <- struct{}{}
	assert.False(t, <-candidate.ElectedCh)

	// And we should attempt to get re-elected again.
	assert.True(t, <-candidate.ElectedCh)

	// When we resign, unlock will get called, we'll be notified of the
	// de-election and we'll try to get the lock again.
	go candidate.Resign()
	assert.False(t, <-candidate.ElectedCh)
	assert.True(t, <-candidate.ElectedCh)

	// After stopping the candidate, the ElectedCh should be closed. A
	// blocking receive waits for the campaign goroutine to exit; ok is false
	// only for a closed channel.
	candidate.Stop()
	_, ok := <-candidate.ElectedCh
	assert.False(t, ok)

	mockStore.AssertExpectations(t)
}

62
leadership/follower.go Normal file
View File

@ -0,0 +1,62 @@
package leadership
import "github.com/docker/swarm/pkg/store"
// Follower can follow an election in real-time and push notifications whenever
// there is a change in leadership.
type Follower struct {
// LeaderCh receives the value of the election key each time it changes.
// It is closed when the follow goroutine exits.
LeaderCh chan string
// client is the Key/Value store that is watched.
client store.Store
// key is the store key of the election being followed.
key string
// stopCh is handed to the store's Watch call and closed by Stop.
stopCh chan struct{}
}
// NewFollower builds a Follower for the election stored under key in client.
// Nothing is watched until FollowElection is called. Both of its channels are
// unbuffered.
func NewFollower(client store.Store, key string) *Follower {
	f := &Follower{client: client, key: key}

	f.LeaderCh = make(chan string)
	f.stopCh = make(chan struct{})

	return f
}
// FollowElection begins monitoring the election: it opens a watch on the
// election key and hands the resulting channel to follow in a new goroutine.
// Leader changes are delivered through `LeaderCh`.
//
// If the watch cannot be created the store's error is returned and no
// goroutine is started.
func (f *Follower) FollowElection() error {
	updates, err := f.client.Watch(f.key, f.stopCh)
	if err == nil {
		go f.follow(updates)
	}
	return err
}
// Stop stops monitoring an election by closing stopCh, the channel that was
// passed to the store's Watch call.
// NOTE(review): presumably the store ends the watch (and closes its channel)
// when stopCh is closed - confirm against the store implementation.
//
// Stop must be called at most once: closing an already-closed channel panics.
func (f *Follower) Stop() {
close(f.stopCh)
}
// follow consumes the watch channel ch opened by FollowElection and forwards
// the key's value to LeaderCh every time it differs from the previously
// forwarded one, so consecutive duplicates are suppressed.
//
// It runs until ch is closed or the follower is stopped, and closes LeaderCh
// on exit.
//
// FIXME: the watch feeding ch should be created with
// `RequireConsistent: true` on Consul.
func (f *Follower) follow(ch <-chan *store.KVPair) {
	defer close(f.LeaderCh)

	// Use the watch that was handed in rather than opening a second one:
	// a second Watch would leave ch without a reader.
	prev := ""
	for kv := range ch {
		curr := string(kv.Value)
		if curr == prev {
			continue
		}
		prev = curr

		// Don't stay blocked on a consumer that went away after Stop.
		select {
		case f.LeaderCh <- curr:
		case <-f.stopCh:
			return
		}
	}
}

View File

@ -0,0 +1,43 @@
package leadership
import (
"testing"
kv "github.com/docker/swarm/pkg/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
// TestFollower feeds a Follower a sequence of key updates through a mocked
// watch and checks that only changes in leadership are reported on LeaderCh,
// and that LeaderCh is closed once the watch ends.
func TestFollower(t *testing.T) {
	store, err := kv.NewStore("mock", []string{}, nil)
	assert.NoError(t, err)

	mockStore := store.(*kv.Mock)

	kvCh := make(chan *kv.KVPair)
	var mockKVCh <-chan *kv.KVPair = kvCh
	mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil)

	follower := NewFollower(store, "test_key")
	assert.NoError(t, follower.FollowElection())

	// Simulate leader updates
	go func() {
		kvCh <- &kv.KVPair{Key: "test_key", Value: []byte("leader1")}
		kvCh <- &kv.KVPair{Key: "test_key", Value: []byte("leader1")}
		kvCh <- &kv.KVPair{Key: "test_key", Value: []byte("leader2")}
		kvCh <- &kv.KVPair{Key: "test_key", Value: []byte("leader1")}
	}()

	// We shouldn't see duplicate events.
	// (assert.Equal takes the expected value first, then the actual one.)
	assert.Equal(t, "leader1", <-follower.LeaderCh)
	assert.Equal(t, "leader2", <-follower.LeaderCh)
	assert.Equal(t, "leader1", <-follower.LeaderCh)

	// Once stopped, iteration over the leader channel should stop.
	follower.Stop()
	close(kvCh)

	// ok is false only when the channel has been closed, which distinguishes
	// closure from an empty leader value.
	leader, ok := <-follower.LeaderCh
	assert.False(t, ok)
	assert.Equal(t, "", leader)

	mockStore.AssertExpectations(t)
}