Merge pull request #1078 from abronan/leader_election_retry

Leadership: Fault tolerant Leader Election mechanism
This commit is contained in:
Andrea Luzzardi 2015-08-04 11:52:56 -07:00
commit add72eb06a
10 changed files with 183 additions and 88 deletions

2
Godeps/Godeps.json generated
View File

@ -52,7 +52,7 @@
}, },
{ {
"ImportPath": "github.com/docker/libkv", "ImportPath": "github.com/docker/libkv",
"Rev": "057813e38a46ee5951b1fc33f6f749f7cfce2941" "Rev": "261ee167337a70a244e30410080685843b22e184"
}, },
{ {
"ImportPath": "github.com/gogo/protobuf/proto", "ImportPath": "github.com/gogo/protobuf/proto",

View File

@ -194,12 +194,6 @@ func (s *Etcd) Exists(key string) (bool, error) {
// be sent to the channel. Providing a non-nil stopCh can // be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching. // be used to stop watching.
func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
// Get the current value
current, err := s.Get(key)
if err != nil {
return nil, err
}
// Start an etcd watch. // Start an etcd watch.
// Note: etcd will send the current value through the channel. // Note: etcd will send the current value through the channel.
etcdWatchCh := make(chan *etcd.Response) etcdWatchCh := make(chan *etcd.Response)
@ -212,6 +206,12 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair,
go func() { go func() {
defer close(watchCh) defer close(watchCh)
// Get the current value
current, err := s.Get(key)
if err != nil {
return
}
// Push the current value through the channel. // Push the current value through the channel.
watchCh <- current watchCh <- current
@ -243,12 +243,6 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair,
// will be sent to the channel .Providing a non-nil stopCh can // will be sent to the channel .Providing a non-nil stopCh can
// be used to stop watching. // be used to stop watching.
func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
// Get child values
current, err := s.List(directory)
if err != nil {
return nil, err
}
// Start the watch // Start the watch
etcdWatchCh := make(chan *etcd.Response) etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool) etcdStopCh := make(chan bool)
@ -260,6 +254,12 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st
go func() { go func() {
defer close(watchCh) defer close(watchCh)
// Get child values
current, err := s.List(directory)
if err != nil {
return
}
// Push the current value through the channel. // Push the current value through the channel.
watchCh <- current watchCh <- current
@ -432,7 +432,7 @@ func (l *etcdLock) Lock() (<-chan struct{}, error) {
lastIndex = resp.Node.ModifiedIndex lastIndex = resp.Node.ModifiedIndex
} }
_, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex)
if err == nil { if err == nil {
// Leader section // Leader section
@ -467,7 +467,7 @@ func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan
for { for {
select { select {
case <-update.C: case <-update.C:
l.last, err = l.client.Update(key, l.value, l.ttl) l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", l.last.Node.ModifiedIndex)
if err != nil { if err != nil {
return return
} }

View File

@ -107,10 +107,6 @@ type LockOptions struct {
TTL time.Duration // Optional, expiration ttl associated with the lock TTL time.Duration // Optional, expiration ttl associated with the lock
} }
// WatchCallback is used for watch methods on keys
// and is triggered on key change
type WatchCallback func(entries ...*KVPair)
// Locker provides locking mechanism on top of the store. // Locker provides locking mechanism on top of the store.
// Similar to `sync.Lock` except it may return errors. // Similar to `sync.Lock` except it may return errors.
type Locker interface { type Locker interface {

View File

@ -288,6 +288,11 @@ func testLockUnlock(t *testing.T, kv store.Store) {
err = lock.Unlock() err = lock.Unlock()
assert.NoError(t, err) assert.NoError(t, err)
// Lock should succeed again
lockChan, err = lock.Lock()
assert.NoError(t, err)
assert.NotNil(t, lockChan)
// Get should work // Get should work
pair, err = kv.Get(key) pair, err = kv.Get(key)
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -21,10 +21,12 @@ import (
"github.com/docker/swarm/scheduler/filter" "github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/strategy" "github.com/docker/swarm/scheduler/strategy"
"github.com/docker/swarm/state" "github.com/docker/swarm/state"
"github.com/gorilla/mux"
) )
const ( const (
leaderElectionPath = "docker/swarm/leader" leaderElectionPath = "docker/swarm/leader"
defaultRecoverTime = 10 * time.Second
) )
type logHandler struct { type logHandler struct {
@ -129,35 +131,64 @@ func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Serve
replica := api.NewReplica(primary, tlsConfig) replica := api.NewReplica(primary, tlsConfig)
go func() { go func() {
candidate.RunForElection() for {
electedCh := candidate.ElectedCh() run(candidate, server, primary, replica)
for isElected := range electedCh { time.Sleep(defaultRecoverTime)
if isElected {
log.Info("Cluster leadership acquired")
server.SetHandler(primary)
} else {
log.Info("Cluster leadership lost")
server.SetHandler(replica)
}
} }
}() }()
go func() { go func() {
follower.FollowElection() for {
leaderCh := follower.LeaderCh() follow(follower, replica, addr)
for leader := range leaderCh { time.Sleep(defaultRecoverTime)
log.Infof("New leader elected: %s", leader)
if leader == addr {
replica.SetPrimary("")
} else {
replica.SetPrimary(leader)
}
} }
}() }()
server.SetHandler(primary) server.SetHandler(primary)
} }
// run enters the candidate into the leader election and swaps the server's
// handler on every leadership change: the primary router while this node is
// the leader, the replica handler otherwise.
//
// It blocks until the election stops, which is signalled either by a value on
// the error channel or by one of the channels being closed, and then returns
// so that the caller can wait and retry.
func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Router, replica *api.Replica) {
	electedCh, errCh := candidate.RunForElection()
	for {
		select {
		case isElected, ok := <-electedCh:
			if !ok {
				// A closed channel yields false forever; treating that as a
				// status update would report a spurious leadership loss on
				// every iteration instead of ending the loop.
				return
			}
			if isElected {
				log.Info("Leader Election: Cluster leadership acquired")
				server.SetHandler(primary)
			} else {
				log.Info("Leader Election: Cluster leadership lost")
				server.SetHandler(replica)
			}
		case err, ok := <-errCh:
			// Only log a real error; a closed channel delivers a nil one.
			if ok {
				log.Error(err)
			}
			return
		}
	}
}
// follow watches the election and keeps the replica pointed at the current
// leader. addr is this node's own address: when the elected leader is addr,
// the replica's primary is reset to the empty string; otherwise it is set to
// the leader's address.
//
// It blocks until the watch stops, which is signalled either by a value on
// the error channel or by one of the channels being closed, and then returns
// so that the caller can wait and retry.
func follow(follower *leadership.Follower, replica *api.Replica, addr string) {
	leaderCh, errCh := follower.FollowElection()
	for {
		select {
		case leader, ok := <-leaderCh:
			if !ok {
				// A closed channel yields "" forever; without this check the
				// empty-leader branch below would spin on it.
				return
			}
			if leader == "" {
				// No leader is known yet; keep the current primary.
				continue
			}
			if leader == addr {
				replica.SetPrimary("")
			} else {
				log.Infof("New leader elected: %s", leader)
				replica.SetPrimary(leader)
			}
		case err, ok := <-errCh:
			// Only log a real error; a closed channel delivers a nil one.
			if ok {
				log.Error(err)
			}
			return
		}
	}
}
func manage(c *cli.Context) { func manage(c *cli.Context) {
var ( var (
tlsConfig *tls.Config tlsConfig *tls.Config

View File

@ -16,9 +16,8 @@ if err != nil {
} }
underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood") underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood")
underwood.RunForElection() electedCh, _ := underwood.RunForElection()
electedCh := underwood.ElectedCh()
for isElected := range electedCh { for isElected := range electedCh {
// This loop will run every time there is a change in our leadership // This loop will run every time there is a change in our leadership
// status. // status.
@ -47,8 +46,7 @@ It is possible to follow an election in real-time and get notified whenever
there is a change in leadership: there is a change in leadership:
```go ```go
follower := leadership.NewFollower(client, "service/swarm/leader") follower := leadership.NewFollower(client, "service/swarm/leader")
follower.FollowElection() leaderCh, _ := follower.FollowElection()
leaderCh := follower.LeaderCh()
for leader := <-leaderCh { for leader := <-leaderCh {
// Leader is a string containing the value passed to `NewCandidate`. // Leader is a string containing the value passed to `NewCandidate`.
log.Printf("%s is now the leader", leader) log.Printf("%s is now the leader", leader)
@ -57,3 +55,47 @@ for leader := <-leaderCh {
A typical use case for this is to be able to always send requests to the current A typical use case for this is to be able to always send requests to the current
leader. leader.
## Fault tolerance
Both `RunForElection` and `FollowElection` return an error channel alongside their
status channel, which lets Candidates and Followers be resilient to failures. For example,
if the watch on the leader key fails because the store becomes unavailable, an error is sent on that channel and you can retry the process later.
```go
func participate() {
// Create a store using pkg/store.
client, err := store.NewStore("consul", []string{"127.0.0.1:8500"}, &store.Config{})
if err != nil {
panic(err)
}
waitTime := 10 * time.Second
underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood")
go func() {
for {
run(underwood)
time.Sleep(waitTime)
// retry
}
}
}
func run(candidate *leadership.Candidate) {
electedCh, errCh := candidate.RunForElection()
for {
select {
case elected := <-electedCh:
if isElected {
// Do something
} else {
// Do something else
}
case err := <-errCh:
log.Error(err)
return
}
}
```

View File

@ -3,7 +3,6 @@ package leadership
import ( import (
"sync" "sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store" "github.com/docker/libkv/store"
) )
@ -18,6 +17,7 @@ type Candidate struct {
leader bool leader bool
stopCh chan struct{} stopCh chan struct{}
resignCh chan bool resignCh chan bool
errCh chan error
} }
// NewCandidate creates a new Candidate // NewCandidate creates a new Candidate
@ -27,20 +27,12 @@ func NewCandidate(client store.Store, key, node string) *Candidate {
key: key, key: key,
node: node, node: node,
electedCh: make(chan bool),
leader: false, leader: false,
resignCh: make(chan bool), resignCh: make(chan bool),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
} }
// ElectedCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it.
func (c *Candidate) ElectedCh() <-chan bool {
return c.electedCh
}
// IsLeader returns true if the candidate is currently a leader. // IsLeader returns true if the candidate is currently a leader.
func (c *Candidate) IsLeader() bool { func (c *Candidate) IsLeader() bool {
return c.leader return c.leader
@ -48,15 +40,23 @@ func (c *Candidate) IsLeader() bool {
// RunForElection starts the leader election algorithm. Updates in status are // RunForElection starts the leader election algorithm. Updates in status are
// pushed through the ElectedCh channel. // pushed through the ElectedCh channel.
func (c *Candidate) RunForElection() error { //
// ElectedCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it.
func (c *Candidate) RunForElection() (<-chan bool, <-chan error) {
c.electedCh = make(chan bool)
c.errCh = make(chan error)
// Need a `SessionTTL` (keep-alive) and a stop channel. // Need a `SessionTTL` (keep-alive) and a stop channel.
lock, err := c.client.NewLock(c.key, &store.LockOptions{Value: []byte(c.node)}) lock, err := c.client.NewLock(c.key, &store.LockOptions{Value: []byte(c.node)})
if err != nil { if err != nil {
return err c.errCh <- err
} else {
go c.campaign(lock)
} }
go c.campaign(lock) return c.electedCh, c.errCh
return nil
} }
// Stop running for election. // Stop running for election.
@ -87,6 +87,7 @@ func (c *Candidate) update(status bool) {
func (c *Candidate) campaign(lock store.Locker) { func (c *Candidate) campaign(lock store.Locker) {
defer close(c.electedCh) defer close(c.electedCh)
defer close(c.errCh)
for { for {
// Start as a follower. // Start as a follower.
@ -94,7 +95,7 @@ func (c *Candidate) campaign(lock store.Locker) {
lostCh, err := lock.Lock() lostCh, err := lock.Lock()
if err != nil { if err != nil {
log.Error(err) c.errCh <- err
return return
} }

View File

@ -2,6 +2,7 @@ package leadership
import ( import (
"testing" "testing"
"time"
libkvmock "github.com/docker/libkv/store/mock" libkvmock "github.com/docker/libkv/store/mock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -24,8 +25,7 @@ func TestCandidate(t *testing.T) {
mockLock.On("Unlock").Return(nil) mockLock.On("Unlock").Return(nil)
candidate := NewCandidate(kv, "test_key", "test_node") candidate := NewCandidate(kv, "test_key", "test_node")
candidate.RunForElection() electedCh, _ := candidate.RunForElection()
electedCh := candidate.ElectedCh()
// Should issue a false upon start, no matter what. // Should issue a false upon start, no matter what.
assert.False(t, <-electedCh) assert.False(t, <-electedCh)
@ -49,5 +49,16 @@ func TestCandidate(t *testing.T) {
candidate.Stop() candidate.Stop()
// Ensure that the chan closes after some time
for {
select {
case _, open := <-electedCh:
if !open {
mockStore.AssertExpectations(t) mockStore.AssertExpectations(t)
return
}
case <-time.After(1 * time.Second):
t.Fatalf("electedCh not closed correctly")
}
}
} }

View File

@ -1,6 +1,10 @@
package leadership package leadership
import "github.com/docker/libkv/store" import (
"errors"
"github.com/docker/libkv/store"
)
// Follower can follow an election in real-time and push notifications whenever // Follower can follow an election in real-time and push notifications whenever
// there is a change in leadership. // there is a change in leadership.
@ -11,6 +15,7 @@ type Follower struct {
leader string leader string
leaderCh chan string leaderCh chan string
stopCh chan struct{} stopCh chan struct{}
errCh chan error
} }
// NewFollower creates a new follower. // NewFollower creates a new follower.
@ -18,32 +23,28 @@ func NewFollower(client store.Store, key string) *Follower {
return &Follower{ return &Follower{
client: client, client: client,
key: key, key: key,
leaderCh: make(chan string),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
} }
// LeaderCh is used to get a channel which delivers the currently elected
// leader.
func (f *Follower) LeaderCh() <-chan string {
return f.leaderCh
}
// Leader returns the current leader. // Leader returns the current leader.
func (f *Follower) Leader() string { func (f *Follower) Leader() string {
return f.leader return f.leader
} }
// FollowElection starts monitoring the election. // FollowElection starts monitoring the election.
func (f *Follower) FollowElection() error { func (f *Follower) FollowElection() (<-chan string, <-chan error) {
f.leaderCh = make(chan string)
f.errCh = make(chan error)
ch, err := f.client.Watch(f.key, f.stopCh) ch, err := f.client.Watch(f.key, f.stopCh)
if err != nil { if err != nil {
return err f.errCh <- err
} else {
go f.follow(ch)
} }
go f.follow(ch) return f.leaderCh, f.errCh
return nil
} }
// Stop stops monitoring an election. // Stop stops monitoring an election.
@ -51,17 +52,15 @@ func (f *Follower) Stop() {
close(f.stopCh) close(f.stopCh)
} }
func (f *Follower) follow(<-chan *store.KVPair) { func (f *Follower) follow(ch <-chan *store.KVPair) {
defer close(f.leaderCh) defer close(f.leaderCh)
defer close(f.errCh)
// FIXME: We should pass `RequireConsistent: true` to Consul.
ch, err := f.client.Watch(f.key, f.stopCh)
if err != nil {
return
}
f.leader = "" f.leader = ""
for kv := range ch { for kv := range ch {
if kv == nil {
continue
}
curr := string(kv.Value) curr := string(kv.Value)
if curr == f.leader { if curr == f.leader {
continue continue
@ -69,4 +68,7 @@ func (f *Follower) follow(<-chan *store.KVPair) {
f.leader = curr f.leader = curr
f.leaderCh <- f.leader f.leaderCh <- f.leader
} }
// Channel closed, we return an error
f.errCh <- errors.New("Leader Election: watch leader channel closed, the store may be unavailable...")
} }

View File

@ -21,8 +21,7 @@ func TestFollower(t *testing.T) {
mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil) mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil)
follower := NewFollower(kv, "test_key") follower := NewFollower(kv, "test_key")
follower.FollowElection() leaderCh, errCh := follower.FollowElection()
leaderCh := follower.LeaderCh()
// Simulate leader updates // Simulate leader updates
go func() { go func() {
@ -41,7 +40,15 @@ func TestFollower(t *testing.T) {
// Once stopped, iteration over the leader channel should stop. // Once stopped, iteration over the leader channel should stop.
follower.Stop() follower.Stop()
close(kvCh) close(kvCh)
assert.Equal(t, "", <-leaderCh)
// Assert that we receive an error from the error chan to deal with the failover
err, open := <-errCh
assert.True(t, open)
assert.NotNil(t, err)
// Ensure that the chan is closed
_, open = <-leaderCh
assert.False(t, open)
mockStore.AssertExpectations(t) mockStore.AssertExpectations(t)
} }