Merge pull request #1552 from abronan/fix_lock_ttl_store_failure

Fix Consul Lock TTL with store failure
This commit is contained in:
Victor Vieux 2015-12-17 13:27:42 -08:00
commit 5050e79f83
7 changed files with 51 additions and 25 deletions

View File

@ -138,7 +138,7 @@ var (
}
flLeaderTTL = cli.StringFlag{
Name: "replication-ttl",
Value: "30s",
Value: "15s",
Usage: "Leader lock release time on failure",
}
)

View File

@ -161,7 +161,10 @@ func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Serve
}
func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Router, replica *api.Replica) {
electedCh, errCh := candidate.RunForElection()
electedCh, errCh, err := candidate.RunForElection()
if err != nil {
return
}
for {
select {
case isElected := <-electedCh:
@ -181,7 +184,10 @@ func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Route
}
func follow(follower *leadership.Follower, replica *api.Replica, addr string) {
leaderCh, errCh := follower.FollowElection()
leaderCh, errCh, err := follower.FollowElection()
if err != nil {
return
}
for {
select {
case leader := <-leaderCh:

View File

@ -16,7 +16,10 @@ if err != nil {
}
underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood")
electedCh, _ := underwood.RunForElection()
electedCh, _, err := underwood.RunForElection()
if err != nil {
log.Fatal("Cannot run for election, store is probably down")
}
for isElected := range electedCh {
// This loop will run every time there is a change in our leadership
@ -46,7 +49,10 @@ It is possible to follow an election in real-time and get notified whenever
there is a change in leadership:
```go
follower := leadership.NewFollower(client, "service/swarm/leader")
leaderCh, _ := follower.FollowElection()
leaderCh, _, err := follower.FollowElection()
if err != nil {
log.Fatal("Cannot follow the election, store is probably down")
}
for leader := <-leaderCh {
// Leader is a string containing the value passed to `NewCandidate`.
log.Printf("%s is now the leader", leader)
@ -83,7 +89,10 @@ func participate() {
}
func run(candidate *leadership.Candidate) {
electedCh, errCh := candidate.RunForElection()
electedCh, errCh, err := candidate.RunForElection()
if err != nil {
return
}
for {
select {
case elected := <-electedCh:

View File

@ -7,6 +7,10 @@ import (
"github.com/docker/libkv/store"
)
const (
defaultLockTTL = 15 * time.Second
)
// Candidate runs the leader election algorithm asynchronously
type Candidate struct {
client store.Store
@ -47,23 +51,28 @@ func (c *Candidate) IsLeader() bool {
// ElectedCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it.
func (c *Candidate) RunForElection() (<-chan bool, <-chan error) {
func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) {
c.electedCh = make(chan bool)
c.errCh = make(chan error)
lock, err := c.client.NewLock(c.key, &store.LockOptions{
Value: []byte(c.node),
TTL: c.lockTTL,
RenewLock: make(chan struct{}),
})
if err != nil {
c.errCh <- err
} else {
go c.campaign(lock)
lockOpts := &store.LockOptions{
Value: []byte(c.node),
}
return c.electedCh, c.errCh
if c.lockTTL != defaultLockTTL {
lockOpts.TTL = c.lockTTL
lockOpts.RenewLock = make(chan struct{})
}
lock, err := c.client.NewLock(c.key, lockOpts)
if err != nil {
return nil, nil, err
}
go c.campaign(lock)
return c.electedCh, c.errCh, nil
}
// Stop running for election.

View File

@ -25,7 +25,8 @@ func TestCandidate(t *testing.T) {
mockLock.On("Unlock").Return(nil)
candidate := NewCandidate(kv, "test_key", "test_node", 0)
electedCh, _ := candidate.RunForElection()
electedCh, _, err := candidate.RunForElection()
assert.Nil(t, err)
// Should issue a false upon start, no matter what.
assert.False(t, <-electedCh)

View File

@ -33,18 +33,18 @@ func (f *Follower) Leader() string {
}
// FollowElection starts monitoring the election.
func (f *Follower) FollowElection() (<-chan string, <-chan error) {
func (f *Follower) FollowElection() (<-chan string, <-chan error, error) {
f.leaderCh = make(chan string)
f.errCh = make(chan error)
ch, err := f.client.Watch(f.key, f.stopCh)
if err != nil {
f.errCh <- err
} else {
go f.follow(ch)
return nil, nil, err
}
return f.leaderCh, f.errCh
go f.follow(ch)
return f.leaderCh, f.errCh, nil
}
// Stop stops monitoring an election.

View File

@ -21,7 +21,8 @@ func TestFollower(t *testing.T) {
mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil)
follower := NewFollower(kv, "test_key")
leaderCh, errCh := follower.FollowElection()
leaderCh, errCh, err := follower.FollowElection()
assert.Nil(t, err)
// Simulate leader updates
go func() {