diff --git a/leadership/README.md b/leadership/README.md index 854803d927..34675be577 100644 --- a/leadership/README.md +++ b/leadership/README.md @@ -19,11 +19,12 @@ if err != nil { underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood") underwood.RunForElection() -for elected := range underwood.ElectedCh { +electedCh := underwood.ElectedCh() +for isElected := range rlectedCh { // This loop will run every time there is a change in our leadership // status. - if elected { + if isElected { // We won the election - we are now the leader. // Let's do leader stuff, for example, sleep for a while. log.Printf("I won the election! I'm now the leader") @@ -48,7 +49,8 @@ there is a change in leadership: ```go follower := leadership.NewFollower(client, "service/swarm/leader") follower.FollowElection() -for leader := <-follower.LeaderCh { +leaderCh := follower.LeaderCh() +for leader := <-leaderCh { // Leader is a string containing the value passed to `NewCandidate`. log.Printf("%s is now the leader", leader) } diff --git a/leadership/candidate.go b/leadership/candidate.go index 95d80773cf..9e140b57c8 100644 --- a/leadership/candidate.go +++ b/leadership/candidate.go @@ -9,16 +9,15 @@ import ( // Candidate runs the leader election algorithm asynchronously type Candidate struct { - ElectedCh chan bool - client store.Store key string node string - lock sync.Mutex - leader bool - stopCh chan struct{} - resignCh chan bool + electedCh chan bool + lock sync.Mutex + leader bool + stopCh chan struct{} + resignCh chan bool } // NewCandidate creates a new Candidate @@ -28,13 +27,20 @@ func NewCandidate(client store.Store, key, node string) *Candidate { key: key, node: node, - ElectedCh: make(chan bool), + electedCh: make(chan bool), leader: false, resignCh: make(chan bool), stopCh: make(chan struct{}), } } +// ElectedCh is used to get a channel which delivers signals on +// acquiring or losing leadership. It sends true if we become +// the leader, and false if we lose it. +func (c *Candidate) ElectedCh() <-chan bool { + return c.electedCh +} + // RunForElection starts the leader election algorithm. Updates in status are // pushed through the ElectedCh channel. func (c *Candidate) RunForElection() error { @@ -70,12 +76,12 @@ func (c *Candidate) update(status bool) { c.lock.Lock() defer c.lock.Unlock() - c.ElectedCh <- status + c.electedCh <- status c.leader = status } func (c *Candidate) campaign(lock store.Locker) { - defer close(c.ElectedCh) + defer close(c.electedCh) for { // Start as a follower. diff --git a/leadership/candidate_test.go b/leadership/candidate_test.go index f1cea11fdd..f2758cfbbf 100644 --- a/leadership/candidate_test.go +++ b/leadership/candidate_test.go @@ -24,30 +24,31 @@ func TestCandidate(t *testing.T) { candidate := NewCandidate(store, "test_key", "test_node") candidate.RunForElection() + electedCh := candidate.ElectedCh() // Should issue a false upon start, no matter what. - assert.False(t, <-candidate.ElectedCh) + assert.False(t, <-electedCh) // Since the lock always succeeeds, we should get elected. - assert.True(t, <-candidate.ElectedCh) + assert.True(t, <-electedCh) // Signaling a lost lock should get us de-elected... close(lostCh) - assert.False(t, <-candidate.ElectedCh) + assert.False(t, <-electedCh) // And we should attempt to get re-elected again. - assert.True(t, <-candidate.ElectedCh) + assert.True(t, <-electedCh) // When we resign, unlock will get called, we'll be notified of the // de-election and we'll try to get the lock again. go candidate.Resign() - assert.False(t, <-candidate.ElectedCh) - assert.True(t, <-candidate.ElectedCh) + assert.False(t, <-electedCh) + assert.True(t, <-electedCh) // After stopping the candidate, the ElectedCh should be closed. candidate.Stop() select { - case <-candidate.ElectedCh: + case <-electedCh: assert.True(t, false) // we should not get here. default: assert.True(t, true) diff --git a/leadership/follower.go b/leadership/follower.go index e8191fe7a3..1a3a74483a 100644 --- a/leadership/follower.go +++ b/leadership/follower.go @@ -5,26 +5,30 @@ import "github.com/docker/swarm/pkg/store" // Follower can folow an election in real-time and push notifications whenever // there is a change in leadership. type Follower struct { - LeaderCh chan string - client store.Store key string - stopCh chan struct{} + leaderCh chan string + stopCh chan struct{} } // NewFollower creates a new follower. func NewFollower(client store.Store, key string) *Follower { return &Follower{ - LeaderCh: make(chan string), client: client, key: key, + leaderCh: make(chan string), stopCh: make(chan struct{}), } } -// FollowElection starts monitoring the election. The current leader is updated -// in real-time and pushed through `LeaderCh`. +// LeaderCh is used to get a channel which delivers the currently elected +// leader. +func (f *Follower) LeaderCh() <-chan string { + return f.leaderCh +} + +// FollowElection starts monitoring the election. func (f *Follower) FollowElection() error { ch, err := f.client.Watch(f.key, f.stopCh) if err != nil { @@ -42,7 +46,7 @@ func (f *Follower) Stop() { } func (f *Follower) follow(<-chan *store.KVPair) { - defer close(f.LeaderCh) + defer close(f.leaderCh) // FIXME: We should pass `RequireConsistent: true` to Consul. ch, err := f.client.Watch(f.key, f.stopCh) @@ -57,6 +61,6 @@ func (f *Follower) follow(<-chan *store.KVPair) { continue } prev = curr - f.LeaderCh <- string(curr) + f.leaderCh <- string(curr) } } diff --git a/leadership/follower_test.go b/leadership/follower_test.go index f065e49ff9..bd0827b81c 100644 --- a/leadership/follower_test.go +++ b/leadership/follower_test.go @@ -20,6 +20,7 @@ func TestFollower(t *testing.T) { follower := NewFollower(store, "test_key") follower.FollowElection() + leaderCh := follower.LeaderCh() // Simulate leader updates go func() { @@ -30,14 +31,14 @@ func TestFollower(t *testing.T) { }() // We shouldn't see duplicate events. - assert.Equal(t, <-follower.LeaderCh, "leader1") - assert.Equal(t, <-follower.LeaderCh, "leader2") - assert.Equal(t, <-follower.LeaderCh, "leader1") + assert.Equal(t, <-leaderCh, "leader1") + assert.Equal(t, <-leaderCh, "leader2") + assert.Equal(t, <-leaderCh, "leader1") // Once stopped, iteration over the leader channel should stop. follower.Stop() close(kvCh) - assert.Equal(t, "", <-follower.LeaderCh) + assert.Equal(t, "", <-leaderCh) mockStore.AssertExpectations(t) }