Fix Consul Lock TTL with store failure

When the Lock TTL feature is used with Consul, the code
path in libkv issues a Put in the background through
the PeriodicRenewal call. Any error from that Put is
swallowed and ignored in the candidate loop. This leaves
the candidate and the followers stuck in their candidate
loop: they never retry to take the lock, and the cluster
ends up in a state with no Leader.

This patch restores an explicit error check: the error is
now returned directly to the caller, instead of being
wrongly sent on the error channel before the channels
are handed back to the caller.

Signed-off-by: Alexandre Beslic <abronan@docker.com>
This commit is contained in:
Alexandre Beslic 2015-12-16 15:46:11 -08:00
parent 7291ec144c
commit 5e8998eb6d
7 changed files with 51 additions and 25 deletions

View File

@ -138,7 +138,7 @@ var (
} }
flLeaderTTL = cli.StringFlag{ flLeaderTTL = cli.StringFlag{
Name: "replication-ttl", Name: "replication-ttl",
Value: "30s", Value: "15s",
Usage: "Leader lock release time on failure", Usage: "Leader lock release time on failure",
} }
) )

View File

@ -161,7 +161,10 @@ func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Serve
} }
func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Router, replica *api.Replica) { func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Router, replica *api.Replica) {
electedCh, errCh := candidate.RunForElection() electedCh, errCh, err := candidate.RunForElection()
if err != nil {
return
}
for { for {
select { select {
case isElected := <-electedCh: case isElected := <-electedCh:
@ -181,7 +184,10 @@ func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Route
} }
func follow(follower *leadership.Follower, replica *api.Replica, addr string) { func follow(follower *leadership.Follower, replica *api.Replica, addr string) {
leaderCh, errCh := follower.FollowElection() leaderCh, errCh, err := follower.FollowElection()
if err != nil {
return
}
for { for {
select { select {
case leader := <-leaderCh: case leader := <-leaderCh:

View File

@ -16,7 +16,10 @@ if err != nil {
} }
underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood") underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood")
electedCh, _ := underwood.RunForElection() electedCh, _, err := underwood.RunForElection()
if err != nil {
log.Fatal("Cannot run for election, store is probably down")
}
for isElected := range electedCh { for isElected := range electedCh {
// This loop will run every time there is a change in our leadership // This loop will run every time there is a change in our leadership
@ -46,7 +49,10 @@ It is possible to follow an election in real-time and get notified whenever
there is a change in leadership: there is a change in leadership:
```go ```go
follower := leadership.NewFollower(client, "service/swarm/leader") follower := leadership.NewFollower(client, "service/swarm/leader")
leaderCh, _ := follower.FollowElection() leaderCh, _, err := follower.FollowElection()
if err != nil {
log.Fatal("Cannot follow the election, store is probably down")
}
for leader := <-leaderCh { for leader := <-leaderCh {
// Leader is a string containing the value passed to `NewCandidate`. // Leader is a string containing the value passed to `NewCandidate`.
log.Printf("%s is now the leader", leader) log.Printf("%s is now the leader", leader)
@ -83,7 +89,10 @@ func participate() {
} }
func run(candidate *leadership.Candidate) { func run(candidate *leadership.Candidate) {
electedCh, errCh := candidate.RunForElection() electedCh, errCh, err := candidate.RunForElection()
if err != nil {
return
}
for { for {
select { select {
case elected := <-electedCh: case elected := <-electedCh:

View File

@ -7,6 +7,10 @@ import (
"github.com/docker/libkv/store" "github.com/docker/libkv/store"
) )
const (
defaultLockTTL = 15 * time.Second
)
// Candidate runs the leader election algorithm asynchronously // Candidate runs the leader election algorithm asynchronously
type Candidate struct { type Candidate struct {
client store.Store client store.Store
@ -47,23 +51,28 @@ func (c *Candidate) IsLeader() bool {
// ElectedCh is used to get a channel which delivers signals on // ElectedCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become // acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it. // the leader, and false if we lose it.
func (c *Candidate) RunForElection() (<-chan bool, <-chan error) { func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) {
c.electedCh = make(chan bool) c.electedCh = make(chan bool)
c.errCh = make(chan error) c.errCh = make(chan error)
lock, err := c.client.NewLock(c.key, &store.LockOptions{ lockOpts := &store.LockOptions{
Value: []byte(c.node), Value: []byte(c.node),
TTL: c.lockTTL,
RenewLock: make(chan struct{}),
})
if err != nil {
c.errCh <- err
} else {
go c.campaign(lock)
} }
return c.electedCh, c.errCh if c.lockTTL != defaultLockTTL {
lockOpts.TTL = c.lockTTL
lockOpts.RenewLock = make(chan struct{})
}
lock, err := c.client.NewLock(c.key, lockOpts)
if err != nil {
return nil, nil, err
}
go c.campaign(lock)
return c.electedCh, c.errCh, nil
} }
// Stop running for election. // Stop running for election.

View File

@ -25,7 +25,8 @@ func TestCandidate(t *testing.T) {
mockLock.On("Unlock").Return(nil) mockLock.On("Unlock").Return(nil)
candidate := NewCandidate(kv, "test_key", "test_node", 0) candidate := NewCandidate(kv, "test_key", "test_node", 0)
electedCh, _ := candidate.RunForElection() electedCh, _, err := candidate.RunForElection()
assert.Nil(t, err)
// Should issue a false upon start, no matter what. // Should issue a false upon start, no matter what.
assert.False(t, <-electedCh) assert.False(t, <-electedCh)

View File

@ -33,18 +33,18 @@ func (f *Follower) Leader() string {
} }
// FollowElection starts monitoring the election. // FollowElection starts monitoring the election.
func (f *Follower) FollowElection() (<-chan string, <-chan error) { func (f *Follower) FollowElection() (<-chan string, <-chan error, error) {
f.leaderCh = make(chan string) f.leaderCh = make(chan string)
f.errCh = make(chan error) f.errCh = make(chan error)
ch, err := f.client.Watch(f.key, f.stopCh) ch, err := f.client.Watch(f.key, f.stopCh)
if err != nil { if err != nil {
f.errCh <- err return nil, nil, err
} else {
go f.follow(ch)
} }
return f.leaderCh, f.errCh go f.follow(ch)
return f.leaderCh, f.errCh, nil
} }
// Stop stops monitoring an election. // Stop stops monitoring an election.

View File

@ -21,7 +21,8 @@ func TestFollower(t *testing.T) {
mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil) mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil)
follower := NewFollower(kv, "test_key") follower := NewFollower(kv, "test_key")
leaderCh, errCh := follower.FollowElection() leaderCh, errCh, err := follower.FollowElection()
assert.Nil(t, err)
// Simulate leader updates // Simulate leader updates
go func() { go func() {