From c7513506be16945dfb4c0ff5b9fae32d8ee32f2b Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Fri, 31 Jul 2015 10:06:47 -0700 Subject: [PATCH] Fault tolerant Leader Election process, fixes leader information on docker info, fixes intermittent error on Consul session lock Signed-off-by: Alexandre Beslic --- Godeps/Godeps.json | 2 +- .../docker/libkv/store/etcd/etcd.go | 28 ++++---- .../github.com/docker/libkv/store/store.go | 4 -- .../docker/libkv/testutils/utils.go | 5 ++ cli/manage.go | 69 ++++++++++++++----- leadership/README.md | 50 ++++++++++++-- leadership/candidate.go | 35 +++++----- leadership/candidate_test.go | 17 ++++- leadership/follower.go | 48 ++++++------- leadership/follower_test.go | 13 +++- 10 files changed, 183 insertions(+), 88 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index b77ee0ed2a..7cb1d347ec 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -52,7 +52,7 @@ }, { "ImportPath": "github.com/docker/libkv", - "Rev": "057813e38a46ee5951b1fc33f6f749f7cfce2941" + "Rev": "261ee167337a70a244e30410080685843b22e184" }, { "ImportPath": "github.com/gogo/protobuf/proto", diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go b/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go index 58f6e273ef..8d90b443d9 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go @@ -194,12 +194,6 @@ func (s *Etcd) Exists(key string) (bool, error) { // be sent to the channel. Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - // Get the current value - current, err := s.Get(key) - if err != nil { - return nil, err - } - // Start an etcd watch. // Note: etcd will send the current value through the channel. 
etcdWatchCh := make(chan *etcd.Response) @@ -212,6 +206,12 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, go func() { defer close(watchCh) + // Get the current value + current, err := s.Get(key) + if err != nil { + return + } + // Push the current value through the channel. watchCh <- current @@ -243,12 +243,6 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, // will be sent to the channel .Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - // Get child values - current, err := s.List(directory) - if err != nil { - return nil, err - } - // Start the watch etcdWatchCh := make(chan *etcd.Response) etcdStopCh := make(chan bool) @@ -260,6 +254,12 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st go func() { defer close(watchCh) + // Get child values + current, err := s.List(directory) + if err != nil { + return + } + // Push the current value through the channel. 
watchCh <- current @@ -432,7 +432,7 @@ func (l *etcdLock) Lock() (<-chan struct{}, error) { lastIndex = resp.Node.ModifiedIndex } - _, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) + l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) if err == nil { // Leader section @@ -467,7 +467,7 @@ func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan for { select { case <-update.C: - l.last, err = l.client.Update(key, l.value, l.ttl) + l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", l.last.Node.ModifiedIndex) if err != nil { return } diff --git a/Godeps/_workspace/src/github.com/docker/libkv/store/store.go b/Godeps/_workspace/src/github.com/docker/libkv/store/store.go index 49e2eb9dcd..527cb4a69d 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/store/store.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/store/store.go @@ -107,10 +107,6 @@ type LockOptions struct { TTL time.Duration // Optional, expiration ttl associated with the lock } -// WatchCallback is used for watch methods on keys -// and is triggered on key change -type WatchCallback func(entries ...*KVPair) - // Locker provides locking mechanism on top of the store. // Similar to `sync.Lock` except it may return errors. 
type Locker interface { diff --git a/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go b/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go index 717d9ecdc2..d1ddc0ff22 100644 --- a/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go +++ b/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go @@ -288,6 +288,11 @@ func testLockUnlock(t *testing.T, kv store.Store) { err = lock.Unlock() assert.NoError(t, err) + // Lock should succeed again + lockChan, err = lock.Lock() + assert.NoError(t, err) + assert.NotNil(t, lockChan) + // Get should work pair, err = kv.Get(key) assert.NoError(t, err) diff --git a/cli/manage.go b/cli/manage.go index 41b0db85d3..ddfd3d271f 100644 --- a/cli/manage.go +++ b/cli/manage.go @@ -21,10 +21,12 @@ import ( "github.com/docker/swarm/scheduler/filter" "github.com/docker/swarm/scheduler/strategy" "github.com/docker/swarm/state" + "github.com/gorilla/mux" ) const ( leaderElectionPath = "docker/swarm/leader" + defaultRecoverTime = 10 * time.Second ) type logHandler struct { @@ -129,35 +131,64 @@ func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Serve replica := api.NewReplica(primary, tlsConfig) go func() { - candidate.RunForElection() - electedCh := candidate.ElectedCh() - for isElected := range electedCh { - if isElected { - log.Info("Cluster leadership acquired") - server.SetHandler(primary) - } else { - log.Info("Cluster leadership lost") - server.SetHandler(replica) - } + for { + run(candidate, server, primary, replica) + time.Sleep(defaultRecoverTime) } }() go func() { - follower.FollowElection() - leaderCh := follower.LeaderCh() - for leader := range leaderCh { - log.Infof("New leader elected: %s", leader) - if leader == addr { - replica.SetPrimary("") - } else { - replica.SetPrimary(leader) - } + for { + follow(follower, replica, addr) + time.Sleep(defaultRecoverTime) } }() server.SetHandler(primary) } +func run(candidate *leadership.Candidate, server 
*api.Server, primary *mux.Router, replica *api.Replica) { + electedCh, errCh := candidate.RunForElection() + for { + select { + case isElected := <-electedCh: + if isElected { + log.Info("Leader Election: Cluster leadership acquired") + server.SetHandler(primary) + } else { + log.Info("Leader Election: Cluster leadership lost") + server.SetHandler(replica) + } + + case err := <-errCh: + log.Error(err) + return + } + } +} + +func follow(follower *leadership.Follower, replica *api.Replica, addr string) { + leaderCh, errCh := follower.FollowElection() + for { + select { + case leader := <-leaderCh: + if leader == "" { + continue + } + if leader == addr { + replica.SetPrimary("") + } else { + log.Infof("New leader elected: %s", leader) + replica.SetPrimary(leader) + } + + case err := <-errCh: + log.Error(err) + return + } + } +} + func manage(c *cli.Context) { var ( tlsConfig *tls.Config diff --git a/leadership/README.md b/leadership/README.md index 27ba9d2289..519409e11d 100644 --- a/leadership/README.md +++ b/leadership/README.md @@ -16,9 +16,8 @@ if err != nil { } underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood") -underwood.RunForElection() +electedCh, _ := underwood.RunForElection() -electedCh := underwood.ElectedCh() for isElected := range electedCh { // This loop will run every time there is a change in our leadership // status. @@ -47,8 +46,7 @@ It is possible to follow an election in real-time and get notified whenever there is a change in leadership: ```go follower := leadership.NewFollower(client, "service/swarm/leader") -follower.FollowElection() -leaderCh := follower.LeaderCh() +leaderCh, _ := follower.FollowElection() for leader := <-leaderCh { // Leader is a string containing the value passed to `NewCandidate`. log.Printf("%s is now the leader", leader) @@ -57,3 +55,47 @@ for leader := <-leaderCh { A typical use case for this is to be able to always send requests to the current leader. 
+ +## Fault tolerance + +Leadership returns an error channel for Candidates and Followers that you can use +to be resilient to failures. For example, if the watch on the leader key fails +because the store becomes unavailable, you can retry the process later. + +```go +func participate() { + // Create a store using pkg/store. + client, err := store.NewStore("consul", []string{"127.0.0.1:8500"}, &store.Config{}) + if err != nil { + panic(err) + } + + waitTime := 10 * time.Second + underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood") + + go func() { + for { + run(underwood) + time.Sleep(waitTime) + // retry + } + }() +} + +func run(candidate *leadership.Candidate) { + electedCh, errCh := candidate.RunForElection() + for { + select { + case isElected := <-electedCh: + if isElected { + // Do something + } else { + // Do something else + } + case err := <-errCh: + log.Error(err) + return + } + } +} +``` diff --git a/leadership/candidate.go b/leadership/candidate.go index 614041b2d4..b752fa22a0 100644 --- a/leadership/candidate.go +++ b/leadership/candidate.go @@ -3,7 +3,6 @@ package leadership import ( "sync" - log "github.com/Sirupsen/logrus" "github.com/docker/libkv/store" ) @@ -18,6 +17,7 @@ type Candidate struct { leader bool stopCh chan struct{} resignCh chan bool + errCh chan error } // NewCandidate creates a new Candidate @@ -27,20 +27,12 @@ func NewCandidate(client store.Store, key, node string) *Candidate { key: key, node: node, - electedCh: make(chan bool), - leader: false, - resignCh: make(chan bool), - stopCh: make(chan struct{}), + leader: false, + resignCh: make(chan bool), + stopCh: make(chan struct{}), } } -// ElectedCh is used to get a channel which delivers signals on -// acquiring or losing leadership. It sends true if we become -// the leader, and false if we lose it. -func (c *Candidate) ElectedCh() <-chan bool { - return c.electedCh -} - // IsLeader returns true if the candidate is currently a leader. 
func (c *Candidate) IsLeader() bool { return c.leader @@ -48,15 +40,23 @@ func (c *Candidate) IsLeader() bool { // RunForElection starts the leader election algorithm. Updates in status are // pushed through the ElectedCh channel. -func (c *Candidate) RunForElection() error { +// +// ElectedCh is used to get a channel which delivers signals on +// acquiring or losing leadership. It sends true if we become +// the leader, and false if we lose it. +func (c *Candidate) RunForElection() (<-chan bool, <-chan error) { + c.electedCh = make(chan bool) + c.errCh = make(chan error) + // Need a `SessionTTL` (keep-alive) and a stop channel. lock, err := c.client.NewLock(c.key, &store.LockOptions{Value: []byte(c.node)}) if err != nil { - return err + go func() { c.errCh <- err }() // async: errCh is unbuffered and has no receiver until we return it + } else { + go c.campaign(lock) } - go c.campaign(lock) - return nil + return c.electedCh, c.errCh } // Stop running for election. @@ -87,6 +87,7 @@ func (c *Candidate) update(status bool) { func (c *Candidate) campaign(lock store.Locker) { defer close(c.electedCh) + defer close(c.errCh) for { // Start as a follower. @@ -94,7 +95,7 @@ func (c *Candidate) campaign(lock store.Locker) { lostCh, err := lock.Lock() if err != nil { - log.Error(err) + c.errCh <- err return } diff --git a/leadership/candidate_test.go b/leadership/candidate_test.go index 3240915d22..b8e1ee8b35 100644 --- a/leadership/candidate_test.go +++ b/leadership/candidate_test.go @@ -2,6 +2,7 @@ package leadership import ( "testing" + "time" libkvmock "github.com/docker/libkv/store/mock" "github.com/stretchr/testify/assert" @@ -24,8 +25,7 @@ func TestCandidate(t *testing.T) { mockLock.On("Unlock").Return(nil) candidate := NewCandidate(kv, "test_key", "test_node") - candidate.RunForElection() - electedCh := candidate.ElectedCh() + electedCh, _ := candidate.RunForElection() // Should issue a false upon start, no matter what. 
assert.False(t, <-electedCh) @@ -49,5 +49,16 @@ func TestCandidate(t *testing.T) { candidate.Stop() - mockStore.AssertExpectations(t) + // Ensure that the chan closes after some time + for { + select { + case _, open := <-electedCh: + if !open { + mockStore.AssertExpectations(t) + return + } + case <-time.After(1 * time.Second): + t.Fatalf("electedCh not closed correctly") + } + } } diff --git a/leadership/follower.go b/leadership/follower.go index 50ed3b86d1..43eee814ca 100644 --- a/leadership/follower.go +++ b/leadership/follower.go @@ -1,6 +1,10 @@ package leadership -import "github.com/docker/libkv/store" +import ( + "errors" + + "github.com/docker/libkv/store" +) // Follower can folow an election in real-time and push notifications whenever // there is a change in leadership. @@ -11,39 +15,36 @@ type Follower struct { leader string leaderCh chan string stopCh chan struct{} + errCh chan error } // NewFollower creates a new follower. func NewFollower(client store.Store, key string) *Follower { return &Follower{ - client: client, - key: key, - leaderCh: make(chan string), - stopCh: make(chan struct{}), + client: client, + key: key, + stopCh: make(chan struct{}), } } -// LeaderCh is used to get a channel which delivers the currently elected -// leader. -func (f *Follower) LeaderCh() <-chan string { - return f.leaderCh -} - // Leader returns the current leader. func (f *Follower) Leader() string { return f.leader } // FollowElection starts monitoring the election. -func (f *Follower) FollowElection() error { +func (f *Follower) FollowElection() (<-chan string, <-chan error) { + f.leaderCh = make(chan string) + f.errCh = make(chan error) + ch, err := f.client.Watch(f.key, f.stopCh) if err != nil { - return err + go func() { f.errCh <- err }() // async: errCh is unbuffered and has no receiver until we return it + } else { + go f.follow(ch) } - go f.follow(ch) - - return nil + return f.leaderCh, f.errCh } // Stop stops monitoring an election. 
@@ -51,17 +52,15 @@ func (f *Follower) Stop() { close(f.stopCh) } -func (f *Follower) follow(<-chan *store.KVPair) { +func (f *Follower) follow(ch <-chan *store.KVPair) { defer close(f.leaderCh) - - // FIXME: We should pass `RequireConsistent: true` to Consul. - ch, err := f.client.Watch(f.key, f.stopCh) - if err != nil { - return - } + defer close(f.errCh) f.leader = "" for kv := range ch { + if kv == nil { + continue + } curr := string(kv.Value) if curr == f.leader { continue @@ -69,4 +68,7 @@ func (f *Follower) follow(<-chan *store.KVPair) { f.leader = curr f.leaderCh <- f.leader } + + // Channel closed, we return an error + f.errCh <- errors.New("Leader Election: watch leader channel closed, the store may be unavailable...") } diff --git a/leadership/follower_test.go b/leadership/follower_test.go index c95e307091..de2066b95f 100644 --- a/leadership/follower_test.go +++ b/leadership/follower_test.go @@ -21,8 +21,7 @@ func TestFollower(t *testing.T) { mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil) follower := NewFollower(kv, "test_key") - follower.FollowElection() - leaderCh := follower.LeaderCh() + leaderCh, errCh := follower.FollowElection() // Simulate leader updates go func() { @@ -41,7 +40,15 @@ func TestFollower(t *testing.T) { // Once stopped, iteration over the leader channel should stop. follower.Stop() close(kvCh) - assert.Equal(t, "", <-leaderCh) + + // Assert that we receive an error from the error chan to deal with the failover + err, open := <-errCh + assert.True(t, open) + assert.NotNil(t, err) + + // Ensure that the chan is closed + _, open = <-leaderCh + assert.False(t, open) mockStore.AssertExpectations(t) }