mirror of https://github.com/docker/docs.git
120 lines
2.5 KiB
Go
120 lines
2.5 KiB
Go
package leadership
|
|
|
|
import (
|
|
"sync"
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
"github.com/docker/libkv/store"
|
|
)
|
|
|
|
// Candidate runs the leader election algorithm asynchronously
|
|
type Candidate struct {
|
|
client store.Store
|
|
key string
|
|
node string
|
|
|
|
electedCh chan bool
|
|
lock sync.Mutex
|
|
leader bool
|
|
stopCh chan struct{}
|
|
resignCh chan bool
|
|
}
|
|
|
|
// NewCandidate creates a new Candidate
|
|
func NewCandidate(client store.Store, key, node string) *Candidate {
|
|
return &Candidate{
|
|
client: client,
|
|
key: key,
|
|
node: node,
|
|
|
|
electedCh: make(chan bool),
|
|
leader: false,
|
|
resignCh: make(chan bool),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// ElectedCh is used to get a channel which delivers signals on
|
|
// acquiring or losing leadership. It sends true if we become
|
|
// the leader, and false if we lose it.
|
|
func (c *Candidate) ElectedCh() <-chan bool {
|
|
return c.electedCh
|
|
}
|
|
|
|
// IsLeader returns true if the candidate is currently a leader.
|
|
func (c *Candidate) IsLeader() bool {
|
|
return c.leader
|
|
}
|
|
|
|
// RunForElection starts the leader election algorithm. Updates in status are
|
|
// pushed through the ElectedCh channel.
|
|
func (c *Candidate) RunForElection() error {
|
|
// Need a `SessionTTL` (keep-alive) and a stop channel.
|
|
lock, err := c.client.NewLock(c.key, &store.LockOptions{Value: []byte(c.node)})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go c.campaign(lock)
|
|
return nil
|
|
}
|
|
|
|
// Stop running for election.
|
|
func (c *Candidate) Stop() {
|
|
close(c.stopCh)
|
|
}
|
|
|
|
// Resign forces the candidate to step-down and try again.
|
|
// If the candidate is not a leader, it doesn't have any effect.
|
|
// Candidate will retry immediately to acquire the leadership. If no-one else
|
|
// took it, then the Candidate will end up being a leader again.
|
|
func (c *Candidate) Resign() {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
if c.leader {
|
|
c.resignCh <- true
|
|
}
|
|
}
|
|
|
|
func (c *Candidate) update(status bool) {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
c.leader = status
|
|
c.electedCh <- status
|
|
}
|
|
|
|
func (c *Candidate) campaign(lock store.Locker) {
|
|
defer close(c.electedCh)
|
|
|
|
for {
|
|
// Start as a follower.
|
|
c.update(false)
|
|
|
|
lostCh, err := lock.Lock()
|
|
if err != nil {
|
|
log.Error(err)
|
|
return
|
|
}
|
|
|
|
// Hooray! We acquired the lock therefore we are the new leader.
|
|
c.update(true)
|
|
|
|
select {
|
|
case <-c.resignCh:
|
|
// We were asked to resign, give up the lock and go back
|
|
// campaigning.
|
|
lock.Unlock()
|
|
case <-c.stopCh:
|
|
// Give up the leadership and quit.
|
|
if c.leader {
|
|
lock.Unlock()
|
|
}
|
|
return
|
|
case <-lostCh:
|
|
// We lost the lock. Someone else is the leader, try again.
|
|
}
|
|
}
|
|
}
|