diff --git a/leadership/README.md b/leadership/README.md
new file mode 100644
index 0000000000..8e50ea6302
--- /dev/null
+++ b/leadership/README.md
@@ -0,0 +1,58 @@
+# Leadership: Distributed Leader Election for Clustered Environments.
+
+Leadership is a library for cluster leader election on top of a distributed
+Key/Value store.
+
+It's built using Swarm's `pkg/store` and is designed to work across multiple
+storage backends.
+
+Right now only `Consul` is supported but `etcd` and `Zookeeper` will be coming
+soon.
+
+```go
+// Create a store using pkg/store.
+client, err := store.NewStore("consul", []string{"127.0.0.1:8500"}, &store.Config{})
+if err != nil {
+	panic(err)
+}
+
+underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood")
+underwood.RunForElection()
+
+for elected := range underwood.ElectedCh {
+	// This loop will run every time there is a change in our leadership
+	// status.
+
+	if elected {
+		// We won the election - we are now the leader.
+		// Let's do leader stuff, for example, sleep for a while.
+		log.Printf("I won the election! I'm now the leader")
+		time.Sleep(10 * time.Second)
+
+		// Tired of being a leader? You can resign anytime.
+		underwood.Resign()
+	} else {
+		// We lost the election but are still running for leadership.
+		// `elected == false` is the default state and is the first event
+		// we'll receive from the channel. After a successful election,
+		// this event can get triggered if someone else steals the
+		// leadership or if we resign.
+
+		log.Printf("Lost the election, let's try another time")
+	}
+}
+```
+
+It is possible to follow an election in real-time and get notified whenever
+there is a change in leadership:
+```go
+follower := leadership.NewFollower(client, "service/swarm/leader")
+follower.FollowElection()
+for leader := range follower.LeaderCh {
+	// Leader is a string containing the value passed to `NewCandidate`.
+	log.Printf("%s is now the leader", leader)
+}
+```
+
+A typical use case for this is to be able to always send requests to the current
+leader.
diff --git a/leadership/candidate.go b/leadership/candidate.go
new file mode 100644
index 0000000000..95d80773cf
--- /dev/null
+++ b/leadership/candidate.go
@@ -0,0 +1,123 @@
+package leadership
+
+import (
+	"sync"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/docker/swarm/pkg/store"
+)
+
+// Candidate runs the leader election algorithm asynchronously.
+type Candidate struct {
+	// ElectedCh receives the leadership status every time it changes and
+	// is closed when the campaign ends.
+	ElectedCh chan bool
+
+	client store.Store
+	key    string
+	node   string
+
+	lock     sync.Mutex // guards leader
+	leader   bool
+	stopCh   chan struct{}
+	resignCh chan bool
+}
+
+// NewCandidate creates a new Candidate.
+func NewCandidate(client store.Store, key, node string) *Candidate {
+	return &Candidate{
+		client: client,
+		key:    key,
+		node:   node,
+
+		ElectedCh: make(chan bool),
+		leader:    false,
+		resignCh:  make(chan bool),
+		stopCh:    make(chan struct{}),
+	}
+}
+
+// RunForElection starts the leader election algorithm. Updates in status are
+// pushed through the ElectedCh channel.
+func (c *Candidate) RunForElection() error {
+	// Need a `SessionTTL` (keep-alive) and a stop channel.
+	lock, err := c.client.NewLock(c.key, &store.LockOptions{Value: []byte(c.node)})
+	if err != nil {
+		return err
+	}
+
+	go c.campaign(lock)
+	return nil
+}
+
+// Stop running for election.
+func (c *Candidate) Stop() {
+	close(c.stopCh)
+}
+
+// Resign forces the candidate to step-down and try again.
+// If the candidate is not a leader, it doesn't have any effect.
+// Candidate will retry immediately to acquire the leadership. If no-one else
+// took it, then the Candidate will end up being a leader again.
+func (c *Candidate) Resign() {
+	// Hold the mutex only to read the status: blocking on resignCh with the
+	// mutex held would deadlock against update(), which needs it too.
+	c.lock.Lock()
+	leader := c.leader
+	c.lock.Unlock()
+
+	if !leader {
+		return
+	}
+
+	select {
+	case c.resignCh <- true:
+	case <-c.stopCh:
+		// The candidate was stopped, there is nothing left to resign from.
+	}
+}
+
+// update records the new leadership status, then publishes it on ElectedCh.
+func (c *Candidate) update(status bool) {
+	c.lock.Lock()
+	c.leader = status
+	c.lock.Unlock()
+
+	// Publish with the mutex released: the send blocks until the status is
+	// consumed, and the consumer is allowed to call Resign() meanwhile.
+	c.ElectedCh <- status
+}
+
+// campaign is the election loop started by RunForElection. It closes ElectedCh
+// when it returns.
+func (c *Candidate) campaign(lock store.Locker) {
+	defer close(c.ElectedCh)
+
+	for {
+		// Start as a follower.
+		c.update(false)
+
+		lostCh, err := lock.Lock()
+		if err != nil {
+			log.Error(err)
+			return
+		}
+
+		// Hooray! We acquired the lock therefore we are the new leader.
+		c.update(true)
+
+		select {
+		case <-c.resignCh:
+			// We were asked to resign, give up the lock and go back
+			// campaigning.
+			lock.Unlock()
+		case <-c.stopCh:
+			// Give up the leadership and quit. c.leader is always true
+			// here, update(true) ran right before this select.
+			lock.Unlock()
+			return
+		case <-lostCh:
+			// We lost the lock. Someone else is the leader, try again.
+		}
+	}
+}
diff --git a/leadership/candidate_test.go b/leadership/candidate_test.go
new file mode 100644
index 0000000000..f1cea11fdd
--- /dev/null
+++ b/leadership/candidate_test.go
@@ -0,0 +1,57 @@
+package leadership
+
+import (
+	"testing"
+
+	kv "github.com/docker/swarm/pkg/store"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+func TestCandidate(t *testing.T) {
+	store, err := kv.NewStore("mock", []string{}, nil)
+	assert.NoError(t, err)
+
+	mockStore := store.(*kv.Mock)
+	mockLock := &kv.MockLock{}
+	mockStore.On("NewLock", "test_key", mock.Anything).Return(mockLock, nil)
+
+	// Lock and unlock always succeeds.
+	lostCh := make(chan struct{})
+	var mockLostCh <-chan struct{} = lostCh
+	mockLock.On("Lock").Return(mockLostCh, nil)
+	mockLock.On("Unlock").Return(nil)
+
+	candidate := NewCandidate(store, "test_key", "test_node")
+	assert.NoError(t, candidate.RunForElection())
+
+	// Should issue a false upon start, no matter what.
+	assert.False(t, <-candidate.ElectedCh)
+
+	// Since the lock always succeeds, we should get elected.
+	assert.True(t, <-candidate.ElectedCh)
+
+	// Signaling a lost lock should get us de-elected...
+	close(lostCh)
+	assert.False(t, <-candidate.ElectedCh)
+
+	// And we should attempt to get re-elected again.
+	assert.True(t, <-candidate.ElectedCh)
+
+	// When we resign, unlock will get called, we'll be notified of the
+	// de-election and we'll try to get the lock again.
+	go candidate.Resign()
+	assert.False(t, <-candidate.ElectedCh)
+	assert.True(t, <-candidate.ElectedCh)
+
+	// After stopping the candidate, the ElectedCh should be closed. The
+	// closed lostCh keeps the campaign cycling, so status updates may still
+	// be pending: drain them until the close ends the loop. A non-blocking
+	// receive cannot be used here, it would fail as soon as the channel is
+	// closed or an update is pending.
+	candidate.Stop()
+	for range candidate.ElectedCh {
+	}
+
+	mockStore.AssertExpectations(t)
+}
diff --git a/leadership/follower.go b/leadership/follower.go
new file mode 100644
index 0000000000..e8191fe7a3
--- /dev/null
+++ b/leadership/follower.go
@@ -0,0 +1,61 @@
+package leadership
+
+import "github.com/docker/swarm/pkg/store"
+
+// Follower can follow an election in real-time and push notifications whenever
+// there is a change in leadership.
+type Follower struct {
+	// LeaderCh receives the value stored under the election key every time
+	// it changes. It is closed once the watch channel is closed.
+	LeaderCh chan string
+
+	client store.Store
+	key    string
+
+	stopCh chan struct{}
+}
+
+// NewFollower creates a new follower.
+func NewFollower(client store.Store, key string) *Follower {
+	return &Follower{
+		LeaderCh: make(chan string),
+		client:   client,
+		key:      key,
+		stopCh:   make(chan struct{}),
+	}
+}
+
+// FollowElection starts monitoring the election. The current leader is updated
+// in real-time and pushed through `LeaderCh`.
+func (f *Follower) FollowElection() error {
+	// FIXME: We should pass `RequireConsistent: true` to Consul.
+	ch, err := f.client.Watch(f.key, f.stopCh)
+	if err != nil {
+		return err
+	}
+
+	go f.follow(ch)
+
+	return nil
+}
+
+// Stop stops monitoring an election.
+func (f *Follower) Stop() {
+	close(f.stopCh)
+}
+
+// follow forwards every change of leader read from ch to LeaderCh, skipping
+// consecutive duplicates, and closes LeaderCh once ch is closed.
+func (f *Follower) follow(ch <-chan *store.KVPair) {
+	defer close(f.LeaderCh)
+
+	prev := ""
+	for kv := range ch {
+		curr := string(kv.Value)
+		if curr == prev {
+			continue
+		}
+		prev = curr
+		f.LeaderCh <- curr
+	}
+}
diff --git a/leadership/follower_test.go b/leadership/follower_test.go
new file mode 100644
index 0000000000..f065e49ff9
--- /dev/null
+++ b/leadership/follower_test.go
@@ -0,0 +1,43 @@
+package leadership
+
+import (
+	"testing"
+
+	kv "github.com/docker/swarm/pkg/store"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+func TestFollower(t *testing.T) {
+	store, err := kv.NewStore("mock", []string{}, nil)
+	assert.NoError(t, err)
+
+	mockStore := store.(*kv.Mock)
+
+	kvCh := make(chan *kv.KVPair)
+	var mockKVCh <-chan *kv.KVPair = kvCh
+	mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil)
+
+	follower := NewFollower(store, "test_key")
+	assert.NoError(t, follower.FollowElection())
+
+	// Simulate leader updates
+	go func() {
+		kvCh <- &kv.KVPair{Key: "test_key", Value: []byte("leader1")}
+		kvCh <- &kv.KVPair{Key: "test_key", Value: []byte("leader1")}
+		kvCh <- &kv.KVPair{Key: "test_key", Value: []byte("leader2")}
+		kvCh <- &kv.KVPair{Key: "test_key", Value: []byte("leader1")}
+	}()
+
+	// We shouldn't see duplicate events.
+	assert.Equal(t, "leader1", <-follower.LeaderCh)
+	assert.Equal(t, "leader2", <-follower.LeaderCh)
+	assert.Equal(t, "leader1", <-follower.LeaderCh)
+
+	// Once stopped, iteration over the leader channel should stop.
+	follower.Stop()
+	close(kvCh)
+	assert.Equal(t, "", <-follower.LeaderCh)
+
+	mockStore.AssertExpectations(t)
+}