diff --git a/cli/flags.go b/cli/flags.go index 6d783189e9..b464d95691 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -138,7 +138,7 @@ var ( } flLeaderTTL = cli.StringFlag{ Name: "replication-ttl", - Value: "30s", + Value: "15s", Usage: "Leader lock release time on failure", } ) diff --git a/cli/manage.go b/cli/manage.go index 260cc8bc67..ccd94639fd 100644 --- a/cli/manage.go +++ b/cli/manage.go @@ -161,7 +161,10 @@ func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Serve } func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Router, replica *api.Replica) { - electedCh, errCh := candidate.RunForElection() + electedCh, errCh, err := candidate.RunForElection() + if err != nil { + return + } for { select { case isElected := <-electedCh: @@ -181,7 +184,10 @@ func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Route } func follow(follower *leadership.Follower, replica *api.Replica, addr string) { - leaderCh, errCh := follower.FollowElection() + leaderCh, errCh, err := follower.FollowElection() + if err != nil { + return + } for { select { case leader := <-leaderCh: diff --git a/leadership/README.md b/leadership/README.md index 519409e11d..d20d777551 100644 --- a/leadership/README.md +++ b/leadership/README.md @@ -16,7 +16,10 @@ if err != nil { } underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood") -electedCh, _ := underwood.RunForElection() +electedCh, _, err := underwood.RunForElection() +if err != nil { + log.Fatal("Cannot run for election, store is probably down") +} for isElected := range electedCh { // This loop will run every time there is a change in our leadership @@ -46,7 +49,10 @@ It is possible to follow an election in real-time and get notified whenever there is a change in leadership: ```go follower := leadership.NewFollower(client, "service/swarm/leader") -leaderCh, _ := follower.FollowElection() +leaderCh, _, err := follower.FollowElection() +if err != nil { + log.Fatal("Cannot follow the election, store is probably down") +} for leader := <-leaderCh { // Leader is a string containing the value passed to `NewCandidate`. log.Printf("%s is now the leader", leader) @@ -83,7 +89,10 @@ func participate() { } func run(candidate *leadership.Candidate) { - electedCh, errCh := candidate.RunForElection() + electedCh, errCh, err := candidate.RunForElection() + if err != nil { + return + } for { select { case elected := <-electedCh: diff --git a/leadership/candidate.go b/leadership/candidate.go index 3ea1f17f82..b7af7990ed 100644 --- a/leadership/candidate.go +++ b/leadership/candidate.go @@ -7,6 +7,10 @@ import ( "github.com/docker/libkv/store" ) +const ( + defaultLockTTL = 15 * time.Second +) + // Candidate runs the leader election algorithm asynchronously type Candidate struct { client store.Store @@ -47,23 +51,28 @@ func (c *Candidate) IsLeader() bool { // ElectedCh is used to get a channel which delivers signals on // acquiring or losing leadership. It sends true if we become // the leader, and false if we lose it. -func (c *Candidate) RunForElection() (<-chan bool, <-chan error) { +func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) { c.electedCh = make(chan bool) c.errCh = make(chan error) - lock, err := c.client.NewLock(c.key, &store.LockOptions{ - Value: []byte(c.node), - TTL: c.lockTTL, - RenewLock: make(chan struct{}), - }) - - if err != nil { - c.errCh <- err - } else { - go c.campaign(lock) + lockOpts := &store.LockOptions{ + Value: []byte(c.node), } - return c.electedCh, c.errCh + if c.lockTTL != defaultLockTTL { + lockOpts.TTL = c.lockTTL + lockOpts.RenewLock = make(chan struct{}) + } + + lock, err := c.client.NewLock(c.key, lockOpts) + + if err != nil { + return nil, nil, err + } + + go c.campaign(lock) + + return c.electedCh, c.errCh, nil } // Stop running for election. diff --git a/leadership/candidate_test.go b/leadership/candidate_test.go index 274fcce43c..d29b433874 100644 --- a/leadership/candidate_test.go +++ b/leadership/candidate_test.go @@ -25,7 +25,8 @@ func TestCandidate(t *testing.T) { mockLock.On("Unlock").Return(nil) candidate := NewCandidate(kv, "test_key", "test_node", 0) - electedCh, _ := candidate.RunForElection() + electedCh, _, err := candidate.RunForElection() + assert.Nil(t, err) // Should issue a false upon start, no matter what. assert.False(t, <-electedCh) diff --git a/leadership/follower.go b/leadership/follower.go index 8d5134f2b6..12f5f46b35 100644 --- a/leadership/follower.go +++ b/leadership/follower.go @@ -33,18 +33,18 @@ func (f *Follower) Leader() string { } // FollowElection starts monitoring the election. -func (f *Follower) FollowElection() (<-chan string, <-chan error) { +func (f *Follower) FollowElection() (<-chan string, <-chan error, error) { f.leaderCh = make(chan string) f.errCh = make(chan error) ch, err := f.client.Watch(f.key, f.stopCh) if err != nil { - f.errCh <- err - } else { - go f.follow(ch) + return nil, nil, err } - return f.leaderCh, f.errCh + go f.follow(ch) + + return f.leaderCh, f.errCh, nil } // Stop stops monitoring an election. diff --git a/leadership/follower_test.go b/leadership/follower_test.go index de2066b95f..9483252512 100644 --- a/leadership/follower_test.go +++ b/leadership/follower_test.go @@ -21,7 +21,8 @@ func TestFollower(t *testing.T) { mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil) follower := NewFollower(kv, "test_key") - leaderCh, errCh := follower.FollowElection() + leaderCh, errCh, err := follower.FollowElection() + assert.Nil(t, err) // Simulate leader updates go func() {