leadership: API cleanup.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-22 19:59:42 -07:00
parent 10a7abd89e
commit bbf57d0724
5 changed files with 45 additions and 31 deletions

View File

@ -19,11 +19,12 @@ if err != nil {
underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood") underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood")
underwood.RunForElection() underwood.RunForElection()
for elected := range underwood.ElectedCh { electedCh := underwood.ElectedCh()
for isElected := range rlectedCh {
// This loop will run every time there is a change in our leadership // This loop will run every time there is a change in our leadership
// status. // status.
if elected { if isElected {
// We won the election - we are now the leader. // We won the election - we are now the leader.
// Let's do leader stuff, for example, sleep for a while. // Let's do leader stuff, for example, sleep for a while.
log.Printf("I won the election! I'm now the leader") log.Printf("I won the election! I'm now the leader")
@ -48,7 +49,8 @@ there is a change in leadership:
```go ```go
follower := leadership.NewFollower(client, "service/swarm/leader") follower := leadership.NewFollower(client, "service/swarm/leader")
follower.FollowElection() follower.FollowElection()
for leader := <-follower.LeaderCh { leaderCh := follower.LeaderCh()
for leader := <-leaderCh {
// Leader is a string containing the value passed to `NewCandidate`. // Leader is a string containing the value passed to `NewCandidate`.
log.Printf("%s is now the leader", leader) log.Printf("%s is now the leader", leader)
} }

View File

@ -9,16 +9,15 @@ import (
// Candidate runs the leader election algorithm asynchronously // Candidate runs the leader election algorithm asynchronously
type Candidate struct { type Candidate struct {
ElectedCh chan bool
client store.Store client store.Store
key string key string
node string node string
lock sync.Mutex electedCh chan bool
leader bool lock sync.Mutex
stopCh chan struct{} leader bool
resignCh chan bool stopCh chan struct{}
resignCh chan bool
} }
// NewCandidate creates a new Candidate // NewCandidate creates a new Candidate
@ -28,13 +27,20 @@ func NewCandidate(client store.Store, key, node string) *Candidate {
key: key, key: key,
node: node, node: node,
ElectedCh: make(chan bool), electedCh: make(chan bool),
leader: false, leader: false,
resignCh: make(chan bool), resignCh: make(chan bool),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
} }
// ElectedCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it.
func (c *Candidate) ElectedCh() <-chan bool {
return c.electedCh
}
// RunForElection starts the leader election algorithm. Updates in status are // RunForElection starts the leader election algorithm. Updates in status are
// pushed through the ElectedCh channel. // pushed through the ElectedCh channel.
func (c *Candidate) RunForElection() error { func (c *Candidate) RunForElection() error {
@ -70,12 +76,12 @@ func (c *Candidate) update(status bool) {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
c.ElectedCh <- status c.electedCh <- status
c.leader = status c.leader = status
} }
func (c *Candidate) campaign(lock store.Locker) { func (c *Candidate) campaign(lock store.Locker) {
defer close(c.ElectedCh) defer close(c.electedCh)
for { for {
// Start as a follower. // Start as a follower.

View File

@ -24,30 +24,31 @@ func TestCandidate(t *testing.T) {
candidate := NewCandidate(store, "test_key", "test_node") candidate := NewCandidate(store, "test_key", "test_node")
candidate.RunForElection() candidate.RunForElection()
electedCh := candidate.ElectedCh()
// Should issue a false upon start, no matter what. // Should issue a false upon start, no matter what.
assert.False(t, <-candidate.ElectedCh) assert.False(t, <-electedCh)
// Since the lock always succeeeds, we should get elected. // Since the lock always succeeeds, we should get elected.
assert.True(t, <-candidate.ElectedCh) assert.True(t, <-electedCh)
// Signaling a lost lock should get us de-elected... // Signaling a lost lock should get us de-elected...
close(lostCh) close(lostCh)
assert.False(t, <-candidate.ElectedCh) assert.False(t, <-electedCh)
// And we should attempt to get re-elected again. // And we should attempt to get re-elected again.
assert.True(t, <-candidate.ElectedCh) assert.True(t, <-electedCh)
// When we resign, unlock will get called, we'll be notified of the // When we resign, unlock will get called, we'll be notified of the
// de-election and we'll try to get the lock again. // de-election and we'll try to get the lock again.
go candidate.Resign() go candidate.Resign()
assert.False(t, <-candidate.ElectedCh) assert.False(t, <-electedCh)
assert.True(t, <-candidate.ElectedCh) assert.True(t, <-electedCh)
// After stopping the candidate, the ElectedCh should be closed. // After stopping the candidate, the ElectedCh should be closed.
candidate.Stop() candidate.Stop()
select { select {
case <-candidate.ElectedCh: case <-electedCh:
assert.True(t, false) // we should not get here. assert.True(t, false) // we should not get here.
default: default:
assert.True(t, true) assert.True(t, true)

View File

@ -5,26 +5,30 @@ import "github.com/docker/swarm/pkg/store"
// Follower can folow an election in real-time and push notifications whenever // Follower can folow an election in real-time and push notifications whenever
// there is a change in leadership. // there is a change in leadership.
type Follower struct { type Follower struct {
LeaderCh chan string
client store.Store client store.Store
key string key string
stopCh chan struct{} leaderCh chan string
stopCh chan struct{}
} }
// NewFollower creates a new follower. // NewFollower creates a new follower.
func NewFollower(client store.Store, key string) *Follower { func NewFollower(client store.Store, key string) *Follower {
return &Follower{ return &Follower{
LeaderCh: make(chan string),
client: client, client: client,
key: key, key: key,
leaderCh: make(chan string),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
} }
// FollowElection starts monitoring the election. The current leader is updated // LeaderCh is used to get a channel which delivers the currently elected
// in real-time and pushed through `LeaderCh`. // leader.
func (f *Follower) LeaderCh() <-chan string {
return f.leaderCh
}
// FollowElection starts monitoring the election.
func (f *Follower) FollowElection() error { func (f *Follower) FollowElection() error {
ch, err := f.client.Watch(f.key, f.stopCh) ch, err := f.client.Watch(f.key, f.stopCh)
if err != nil { if err != nil {
@ -42,7 +46,7 @@ func (f *Follower) Stop() {
} }
func (f *Follower) follow(<-chan *store.KVPair) { func (f *Follower) follow(<-chan *store.KVPair) {
defer close(f.LeaderCh) defer close(f.leaderCh)
// FIXME: We should pass `RequireConsistent: true` to Consul. // FIXME: We should pass `RequireConsistent: true` to Consul.
ch, err := f.client.Watch(f.key, f.stopCh) ch, err := f.client.Watch(f.key, f.stopCh)
@ -57,6 +61,6 @@ func (f *Follower) follow(<-chan *store.KVPair) {
continue continue
} }
prev = curr prev = curr
f.LeaderCh <- string(curr) f.leaderCh <- string(curr)
} }
} }

View File

@ -20,6 +20,7 @@ func TestFollower(t *testing.T) {
follower := NewFollower(store, "test_key") follower := NewFollower(store, "test_key")
follower.FollowElection() follower.FollowElection()
leaderCh := follower.LeaderCh()
// Simulate leader updates // Simulate leader updates
go func() { go func() {
@ -30,14 +31,14 @@ func TestFollower(t *testing.T) {
}() }()
// We shouldn't see duplicate events. // We shouldn't see duplicate events.
assert.Equal(t, <-follower.LeaderCh, "leader1") assert.Equal(t, <-leaderCh, "leader1")
assert.Equal(t, <-follower.LeaderCh, "leader2") assert.Equal(t, <-leaderCh, "leader2")
assert.Equal(t, <-follower.LeaderCh, "leader1") assert.Equal(t, <-leaderCh, "leader1")
// Once stopped, iteration over the leader channel should stop. // Once stopped, iteration over the leader channel should stop.
follower.Stop() follower.Stop()
close(kvCh) close(kvCh)
assert.Equal(t, "", <-follower.LeaderCh) assert.Equal(t, "", <-leaderCh)
mockStore.AssertExpectations(t) mockStore.AssertExpectations(t)
} }