Merge pull request #2050 from abronan/fix_session_consul_leader_failure

Fix Consul Leader Election Failure on multi-server and leader soft-restart

commit 34d4cae503
@@ -1,7 +1,6 @@
 {
     "ImportPath": "github.com/docker/swarm",
     "GoVersion": "go1.5",
-    "GodepVersion": "v60",
     "Packages": [
         "./..."
     ],

@@ -166,32 +165,32 @@
         },
         {
             "ImportPath": "github.com/docker/leadership",
-            "Rev": "b545f2df5f5cd35a54c5a6bf154dcfe1f29d125b"
+            "Rev": "bfc7753dd48af19513b29deec23c364bf0f274eb"
         },
         {
             "ImportPath": "github.com/docker/libkv",
-            "Comment": "v0.1.0-6-g2f2380c",
-            "Rev": "2f2380c8698abff4eb662f33b0e088e520ec416e"
+            "Comment": "v0.1.0-33-g2a3d365",
+            "Rev": "2a3d365c64a1cdda570493123392c8d800edf766"
         },
         {
             "ImportPath": "github.com/docker/libkv/store",
-            "Comment": "v0.1.0-6-g2f2380c",
-            "Rev": "2f2380c8698abff4eb662f33b0e088e520ec416e"
+            "Comment": "v0.1.0-33-g2a3d365",
+            "Rev": "2a3d365c64a1cdda570493123392c8d800edf766"
         },
         {
             "ImportPath": "github.com/docker/libkv/store/consul",
-            "Comment": "v0.1.0-6-g2f2380c",
-            "Rev": "2f2380c8698abff4eb662f33b0e088e520ec416e"
+            "Comment": "v0.1.0-33-g2a3d365",
+            "Rev": "2a3d365c64a1cdda570493123392c8d800edf766"
         },
         {
             "ImportPath": "github.com/docker/libkv/store/etcd",
-            "Comment": "v0.1.0-6-g2f2380c",
-            "Rev": "2f2380c8698abff4eb662f33b0e088e520ec416e"
+            "Comment": "v0.1.0-33-g2a3d365",
+            "Rev": "2a3d365c64a1cdda570493123392c8d800edf766"
         },
         {
             "ImportPath": "github.com/docker/libkv/store/zookeeper",
-            "Comment": "v0.1.0-6-g2f2380c",
-            "Rev": "2f2380c8698abff4eb662f33b0e088e520ec416e"
+            "Comment": "v0.1.0-33-g2a3d365",
+            "Rev": "2a3d365c64a1cdda570493123392c8d800edf766"
         },
         {
             "ImportPath": "github.com/gogo/protobuf/proto",

@@ -16,10 +16,7 @@ if err != nil {
 }
 
 underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood", 15*time.Second)
-electedCh, _, err := underwood.RunForElection()
-if err != nil {
-    log.Fatal("Cannot run for election, store is probably down")
-}
+electedCh, _ := underwood.RunForElection()
 
 for isElected := range electedCh {
     // This loop will run every time there is a change in our leadership
@@ -49,14 +46,13 @@ It is possible to follow an election in real-time and get notified whenever
 there is a change in leadership:
 ```go
 follower := leadership.NewFollower(client, "service/swarm/leader")
-leaderCh, _, err := follower.FollowElection()
-if err != nil {
-    log.Fatal("Cannot follow the election, store is probably down")
-}
+leaderCh, _ := follower.FollowElection()
 for leader := range leaderCh {
     // Leader is a string containing the value passed to `NewCandidate`.
     log.Printf("%s is now the leader", leader)
 }
+log.Fatal("Cannot follow the election, store is probably down")
+// Recovery code or exit
 ```
 
 A typical use case for this is to be able to always send requests to the current
@@ -85,17 +81,14 @@ func participate() {
             time.Sleep(waitTime)
             // retry
         }
-    }
+    }()
 }
 
 func run(candidate *leadership.Candidate) {
-    electedCh, errCh, err := candidate.RunForElection()
-    if err != nil {
-        return
-    }
+    electedCh, errCh := candidate.RunForElection()
     for {
         select {
-        case elected := <-electedCh:
+        case isElected := <-electedCh:
             if isElected {
                 // Do something
             } else {
@@ -106,6 +99,7 @@ func run(candidate *leadership.Candidate) {
             log.Error(err)
             return
         }
+    }
 }
 ```
 

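The README hunks above summarize the API change shipped by this commit: `RunForElection` and `FollowElection` lose their error return, and store failures are delivered on the error channel instead. A minimal sketch of a candidate loop against the reworked API follows; it only uses calls shown in this diff (`libkv.NewStore`, `leadership.NewCandidate`, the two-value `RunForElection`), while the Consul address, key, and log output are illustrative.

```go
// Sketch, not part of the commit: consuming the reworked candidate API.
package main

import (
    "log"
    "time"

    "github.com/docker/leadership"
    "github.com/docker/libkv"
    "github.com/docker/libkv/store"
)

func main() {
    // Assumption: a local Consul agent, as in the integration tests below.
    client, err := libkv.NewStore(store.CONSUL, []string{"127.0.0.1:8500"}, nil)
    if err != nil {
        log.Fatal(err)
    }

    candidate := leadership.NewCandidate(client, "service/swarm/leader", "underwood", 15*time.Second)
    electedCh, errCh := candidate.RunForElection()

    for {
        select {
        case isElected := <-electedCh:
            if isElected {
                log.Println("elected leader")
            } else {
                log.Println("lost leadership")
            }
        case err := <-errCh:
            // Store failures now surface here instead of as a third
            // return value; the library closes both channels afterwards.
            log.Println("election error:", err)
            return
        }
    }
}
```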
@@ -8,7 +8,7 @@ import (
 )
 
 const (
-    defaultLockTTL = 15 * time.Second
+    defaultLockTTL = 20 * time.Second
 )
 
 // Candidate runs the leader election algorithm asynchronously
|
||||||
lockTTL time.Duration
|
lockTTL time.Duration
|
||||||
leader bool
|
leader bool
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
|
stopRenew chan struct{}
|
||||||
resignCh chan bool
|
resignCh chan bool
|
||||||
errCh chan error
|
errCh chan error
|
||||||
}
|
}
|
||||||
|
@@ -51,28 +52,13 @@ func (c *Candidate) IsLeader() bool {
 // ElectedCh is used to get a channel which delivers signals on
 // acquiring or losing leadership. It sends true if we become
 // the leader, and false if we lose it.
-func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) {
+func (c *Candidate) RunForElection() (<-chan bool, <-chan error) {
     c.electedCh = make(chan bool)
     c.errCh = make(chan error)
 
-    lockOpts := &store.LockOptions{
-        Value: []byte(c.node),
-    }
-
-    if c.lockTTL != defaultLockTTL {
-        lockOpts.TTL = c.lockTTL
-        lockOpts.RenewLock = make(chan struct{})
-    }
-
-    lock, err := c.client.NewLock(c.key, lockOpts)
-
-    if err != nil {
-        return nil, nil, err
-    }
-
-    go c.campaign(lock)
-
-    return c.electedCh, c.errCh, nil
+    go c.campaign()
 
+    return c.electedCh, c.errCh
 }
 
 // Stop running for election.
|
||||||
c.electedCh <- status
|
c.electedCh <- status
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Candidate) campaign(lock store.Locker) {
|
func (c *Candidate) initLock() (store.Locker, error) {
|
||||||
|
// Give up on the lock session if
|
||||||
|
// we recovered from a store failure
|
||||||
|
if c.stopRenew != nil {
|
||||||
|
close(c.stopRenew)
|
||||||
|
}
|
||||||
|
|
||||||
|
lockOpts := &store.LockOptions{
|
||||||
|
Value: []byte(c.node),
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.lockTTL != defaultLockTTL {
|
||||||
|
lockOpts.TTL = c.lockTTL
|
||||||
|
}
|
||||||
|
|
||||||
|
lockOpts.RenewLock = make(chan struct{})
|
||||||
|
c.stopRenew = lockOpts.RenewLock
|
||||||
|
|
||||||
|
lock, err := c.client.NewLock(c.key, lockOpts)
|
||||||
|
return lock, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Candidate) campaign() {
|
||||||
defer close(c.electedCh)
|
defer close(c.electedCh)
|
||||||
defer close(c.errCh)
|
defer close(c.errCh)
|
||||||
|
|
||||||
|
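The new `initLock` helper carries the "leader soft-restart" part of the fix: on every (re-)campaign it closes the previous `RenewLock` channel, abandoning the old store session, and then builds a fresh lock. A compressed sketch of that hand-over pattern follows; the `store.LockOptions` fields (`Value`, `TTL`, `RenewLock`) and `NewLock` come from the hunk above, while the helper name and signature are illustrative.

```go
// Sketch of the session hand-over performed by Candidate.initLock.
package sketch

import (
    "time"

    "github.com/docker/libkv/store"
)

func newLock(client store.Store, key, node string, ttl time.Duration, prevRenew chan struct{}) (store.Locker, chan struct{}, error) {
    if prevRenew != nil {
        // Give up the previous lock session: closing the renew channel stops
        // renewal and, for Consul, triggers an explicit session destroy.
        close(prevRenew)
    }

    renew := make(chan struct{})
    lock, err := client.NewLock(key, &store.LockOptions{
        Value:     []byte(node),
        TTL:       ttl,
        RenewLock: renew,
    })
    return lock, renew, err
}
```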
@@ -109,6 +117,12 @@ func (c *Candidate) campaign(lock store.Locker) {
     // Start as a follower.
     c.update(false)
 
+    lock, err := c.initLock()
+    if err != nil {
+        c.errCh <- err
+        return
+    }
+
     lostCh, err := lock.Lock(nil)
     if err != nil {
         c.errCh <- err

@@ -33,18 +33,13 @@ func (f *Follower) Leader() string {
 }
 
 // FollowElection starts monitoring the election.
-func (f *Follower) FollowElection() (<-chan string, <-chan error, error) {
+func (f *Follower) FollowElection() (<-chan string, <-chan error) {
     f.leaderCh = make(chan string)
     f.errCh = make(chan error)
 
-    ch, err := f.client.Watch(f.key, f.stopCh)
-    if err != nil {
-        return nil, nil, err
-    }
-
-    go f.follow(ch)
+    go f.follow()
 
-    return f.leaderCh, f.errCh, nil
+    return f.leaderCh, f.errCh
 }
 
 // Stop stops monitoring an election.
@@ -52,10 +47,15 @@ func (f *Follower) Stop() {
     close(f.stopCh)
 }
 
-func (f *Follower) follow(ch <-chan *store.KVPair) {
+func (f *Follower) follow() {
     defer close(f.leaderCh)
     defer close(f.errCh)
 
+    ch, err := f.client.Watch(f.key, f.stopCh)
+    if err != nil {
+        f.errCh <- err
+    }
+
     f.leader = ""
     for kv := range ch {
         if kv == nil {
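The follower side gets the same reshuffle: `FollowElection` now returns only the leader channel and the error channel, and the store watch is started inside `follow()`. A hedged sketch of a consumer, mirroring Swarm's `follow()` loop further down in this diff; apart from `NewFollower` and `FollowElection`, the names are illustrative.

```go
// Sketch, not part of the commit: following an election with the new API.
package sketch

import (
    "log"

    "github.com/docker/leadership"
    "github.com/docker/libkv/store"
)

func watchLeader(client store.Store) {
    follower := leadership.NewFollower(client, "service/swarm/leader")
    leaderCh, errCh := follower.FollowElection()

    for {
        select {
        case leader, ok := <-leaderCh:
            if !ok {
                return
            }
            // leader holds the value the winning candidate passed to NewCandidate.
            log.Printf("%s is now the leader", leader)
        case err := <-errCh:
            // Watch errors now arrive here rather than as a return value.
            log.Printf("cannot follow the election: %v", err)
            return
        }
    }
}
```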
@@ -1,9 +1,7 @@
 language: go
 
 go:
-  - 1.3
-# - 1.4
-# see https://github.com/moovweb/gvm/pull/116 for why Go 1.4 is currently disabled
+  - 1.5.3
 
 # let us have speedy Docker-based Travis workers
 sudo: false
@@ -11,15 +9,14 @@ sudo: false
 before_install:
   # Symlink below is needed for Travis CI to work correctly on personal forks of libkv
   - ln -s $HOME/gopath/src/github.com/${TRAVIS_REPO_SLUG///libkv/} $HOME/gopath/src/github.com/docker
-  - go get golang.org/x/tools/cmd/vet
   - go get golang.org/x/tools/cmd/cover
   - go get github.com/mattn/goveralls
   - go get github.com/golang/lint/golint
   - go get github.com/GeertJohan/fgt
 
 before_script:
-  - script/travis_consul.sh 0.6.0
-  - script/travis_etcd.sh 2.2.0
+  - script/travis_consul.sh 0.6.3
+  - script/travis_etcd.sh 2.2.5
   - script/travis_zk.sh 3.5.1-alpha
 
 script:

@@ -176,7 +176,7 @@
 
    END OF TERMS AND CONDITIONS
 
-   Copyright 2014-2015 Docker, Inc.
+   Copyright 2014-2016 Docker, Inc.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.

@@ -3,6 +3,7 @@
 [](https://godoc.org/github.com/docker/libkv)
 [](https://travis-ci.org/docker/libkv)
 [](https://coveralls.io/r/docker/libkv)
+[](https://goreportcard.com/report/github.com/docker/libkv)
 
 `libkv` provides a `Go` native library to store metadata.
 
@@ -10,7 +11,7 @@ The goal of `libkv` is to abstract common store operations for multiple distribu
 
 For example, you can use it to store your metadata or for service discovery to register machines and endpoints inside your cluster.
 
-You can also easily implement a generic *Leader Election* on top of it (see the [swarm/leadership](https://github.com/docker/swarm/tree/master/leadership) package).
+You can also easily implement a generic *Leader Election* on top of it (see the [docker/leadership](https://github.com/docker/leadership) repository).
 
 As of now, `libkv` offers support for `Consul`, `Etcd`, `Zookeeper` (**Distributed** store) and `BoltDB` (**Local** store).
 
@@ -30,7 +31,7 @@ You can find examples of usage for `libkv` under in `docs/examples.go`. Optional
 
 `libkv` supports:
 - Consul versions >= `0.5.1` because it uses Sessions with `Delete` behavior for the use of `TTLs` (mimics zookeeper's Ephemeral node support), If you don't plan to use `TTLs`: you can use Consul version `0.4.0+`.
-- Etcd versions >= `2.0` because it uses the new `coreos/etcd/client`, this might change in the future as the support for `APIv3` comes along and adds mor capabilities.
+- Etcd versions >= `2.0` because it uses the new `coreos/etcd/client`, this might change in the future as the support for `APIv3` comes along and adds more capabilities.
 - Zookeeper versions >= `3.4.5`. Although this might work with previous version but this remains untested as of now.
 - Boltdb, which shouldn't be subject to any version dependencies.
 
@@ -83,7 +84,7 @@ Please refer to the `docs/compatibility.md` to see what are the special cases fo
 
 Other than those special cases, you should expect the same experience for basic operations like `Get`/`Put`, etc.
 
-Calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). Although you should be able to use it successfully to watch on events in an interchangeable way (see the **swarm/leadership** or **swarm/discovery** packages in **docker/swarm**).
+Calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). Although you should be able to use it successfully to watch on events in an interchangeable way (see the **docker/leadership** repository or the **pkg/discovery/kv** package in **docker/docker**).
 
 ## TLS
 
@@ -103,4 +104,4 @@ Want to hack on libkv? [Docker's contributions guidelines](https://github.com/do
 
 ##Copyright and license
 
-Copyright © 2014-2015 Docker, Inc. All rights reserved, except as follows. Code is released under the Apache 2.0 license. The README.md file, and files in the "docs" folder are licensed under the Creative Commons Attribution 4.0 International License under the terms and conditions set forth in the file "LICENSE.docs". You may obtain a duplicate copy of the same license, titled CC-BY-SA-4.0, at http://creativecommons.org/licenses/by/4.0/.
+Copyright © 2014-2016 Docker, Inc. All rights reserved, except as follows. Code is released under the Apache 2.0 license. The README.md file, and files in the "docs" folder are licensed under the Creative Commons Attribution 4.0 International License under the terms and conditions set forth in the file "LICENSE.docs". You may obtain a duplicate copy of the same license, titled CC-BY-SA-4.0, at http://creativecommons.org/licenses/by/4.0/.

@@ -25,7 +25,7 @@ var (
     }()
 )
 
-// NewStore creates a an instance of store
+// NewStore creates an instance of store
 func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) {
     if init, exists := initializers[backend]; exists {
         return init(addrs, options)

@@ -22,6 +22,14 @@ const (
     // RenewSessionRetryMax is the number of time we should try
     // to renew the session before giving up and throwing an error
     RenewSessionRetryMax = 5
+
+    // MaxSessionDestroyAttempts is the maximum times we will try
+    // to explicitely destroy the session attached to a lock after
+    // the connectivity to the store has been lost
+    MaxSessionDestroyAttempts = 5
+
+    // defaultLockTTL is the default ttl for the consul lock
+    defaultLockTTL = 20 * time.Second
 )
 
 var (
@@ -186,6 +194,7 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error {
     p := &api.KVPair{
         Key:   key,
         Value: value,
+        Flags: api.LockFlagValue,
     }
 
     if opts != nil && opts.TTL > 0 {
@@ -378,12 +387,22 @@ func (s *Consul) NewLock(key string, options *store.LockOptions) (store.Locker,
 
     lock := &consulLock{}
 
+    ttl := defaultLockTTL
+
     if options != nil {
         // Set optional TTL on Lock
         if options.TTL != 0 {
+            ttl = options.TTL
+        }
+        // Set optional value on Lock
+        if options.Value != nil {
+            lockOpts.Value = options.Value
+        }
+    }
+
     entry := &api.SessionEntry{
         Behavior:  api.SessionBehaviorRelease, // Release the lock when the session expires
-        TTL:       (options.TTL / 2).String(), // Consul multiplies the TTL by 2x
+        TTL:       (ttl / 2).String(),         // Consul multiplies the TTL by 2x
         LockDelay: 1 * time.Millisecond,       // Virtually disable lock delay
     }
 
@@ -393,29 +412,74 @@ func (s *Consul) NewLock(key string, options *store.LockOptions) (store.Locker,
         return nil, err
     }
 
-    // Place the session on lock
+    // Place the session and renew chan on lock
     lockOpts.Session = session
 
-    // Renew the session ttl lock periodically
-    go s.client.Session().RenewPeriodic(entry.TTL, session, nil, options.RenewLock)
     lock.renewCh = options.RenewLock
-    }
 
-    // Set optional value on Lock
-    if options.Value != nil {
-        lockOpts.Value = options.Value
-    }
-}
-
     l, err := s.client.LockOpts(lockOpts)
     if err != nil {
         return nil, err
     }
 
+    // Renew the session ttl lock periodically
+    s.renewLockSession(entry.TTL, session, options.RenewLock)
+
     lock.lock = l
     return lock, nil
 }
 
+// renewLockSession is used to renew a session Lock, it takes
+// a stopRenew chan which is used to explicitely stop the session
+// renew process. The renew routine never stops until a signal is
+// sent to this channel. If deleting the session fails because the
+// connection to the store is lost, it keeps trying to delete the
+// session periodically until it can contact the store, this ensures
+// that the lock is not maintained indefinitely which ensures liveness
+// over safety for the lock when the store becomes unavailable.
+func (s *Consul) renewLockSession(initialTTL string, id string, stopRenew chan struct{}) {
+    sessionDestroyAttempts := 0
+    ttl, err := time.ParseDuration(initialTTL)
+    if err != nil {
+        return
+    }
+    go func() {
+        for {
+            select {
+            case <-time.After(ttl / 2):
+                entry, _, err := s.client.Session().Renew(id, nil)
+                if err != nil {
+                    // If an error occurs, continue until the
+                    // session gets destroyed explicitely or
+                    // the session ttl times out
+                    continue
+                }
+                if entry == nil {
+                    return
+                }
+
+                // Handle the server updating the TTL
+                ttl, _ = time.ParseDuration(entry.TTL)
+
+            case <-stopRenew:
+                // Attempt a session destroy
+                _, err := s.client.Session().Destroy(id, nil)
+                if err == nil {
+                    return
+                }
+
+                if sessionDestroyAttempts >= MaxSessionDestroyAttempts {
+                    return
+                }
+
+                // We can't destroy the session because the store
+                // is unavailable, wait for the session renew period
+                sessionDestroyAttempts++
+                time.Sleep(ttl / 2)
+            }
+        }
+    }()
+}
+
 // Lock attempts to acquire the lock and blocks while
 // doing so. It returns a channel that is closed if our
 // lock is lost or if an error occurs

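The comment block above is the heart of the Consul fix: the renew goroutine only ever exits through the `stopRenew` channel (or a dead session), and on stop it keeps retrying the explicit session destroy so a stale lock cannot linger once the store comes back. A hedged sketch of how a libkv caller opts into this behaviour follows; `NewStore`, `NewLock`, `Lock`, `Unlock`, and the `RenewLock` option all appear elsewhere in this diff, while the address, key, and helper are illustrative.

```go
// Sketch, not part of the commit: driving the renew/destroy cycle from a caller.
package sketch

import (
    "github.com/docker/libkv"
    "github.com/docker/libkv/store"
)

func acquireWithRenew(addr, key string, value []byte) (release func(), err error) {
    kv, err := libkv.NewStore(store.CONSUL, []string{addr}, nil)
    if err != nil {
        return nil, err
    }

    renew := make(chan struct{})
    lock, err := kv.NewLock(key, &store.LockOptions{Value: value, RenewLock: renew})
    if err != nil {
        return nil, err
    }

    if _, err := lock.Lock(nil); err != nil {
        return nil, err
    }

    // Closing renew asks renewLockSession to destroy the Consul session,
    // retrying up to MaxSessionDestroyAttempts while the store is unreachable.
    return func() {
        _ = lock.Unlock()
        close(renew)
    }, nil
}
```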
@@ -436,7 +500,7 @@ func (l *consulLock) Unlock() error {
 // modified in the meantime, throws an error if this is the case
 func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
 
-    p := &api.KVPair{Key: s.normalize(key), Value: value}
+    p := &api.KVPair{Key: s.normalize(key), Value: value, Flags: api.LockFlagValue}
 
     if previous == nil {
         // Consul interprets ModifyIndex = 0 as new key.

@@ -471,7 +535,7 @@ func (s *Consul) AtomicDelete(key string, previous *store.KVPair) (bool, error)
         return false, store.ErrPreviousNotSpecified
     }
 
-    p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex}
+    p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex, Flags: api.LockFlagValue}
 
     // Extra Get operation to check on the key
     _, err := s.Get(key)

@@ -75,6 +75,9 @@ func New(addrs []string, options *store.Config) (store.Store, error) {
         if options.ConnectionTimeout != 0 {
             setTimeout(cfg, options.ConnectionTimeout)
         }
+        if options.Username != "" {
+            setCredentials(cfg, options.Username, options.Password)
+        }
     }
 
     c, err := etcd.New(*cfg)

|
||||||
cfg.HeaderTimeoutPerRequest = time
|
cfg.HeaderTimeoutPerRequest = time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setCredentials sets the username/password credentials for connecting to Etcd
|
||||||
|
func setCredentials(cfg *etcd.Config, username, password string) {
|
||||||
|
cfg.Username = username
|
||||||
|
cfg.Password = password
|
||||||
|
}
|
||||||
|
|
||||||
// Normalize the key for usage in Etcd
|
// Normalize the key for usage in Etcd
|
||||||
func (s *Etcd) normalize(key string) string {
|
func (s *Etcd) normalize(key string) string {
|
||||||
key = store.Normalize(key)
|
key = store.Normalize(key)
|
||||||
|
@ -512,15 +521,15 @@ func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
|
||||||
// Wait for the key to be available or for
|
// Wait for the key to be available or for
|
||||||
// a signal to stop trying to lock the key
|
// a signal to stop trying to lock the key
|
||||||
select {
|
select {
|
||||||
case _ = <-free:
|
case <-free:
|
||||||
break
|
break
|
||||||
case err := <-errorCh:
|
case err := <-errorCh:
|
||||||
return nil, err
|
return nil, err
|
||||||
case _ = <-stopChan:
|
case <-stopChan:
|
||||||
return nil, ErrAbortTryLock
|
return nil, ErrAbortTryLock
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete or Expire event occured
|
// Delete or Expire event occurred
|
||||||
// Retry
|
// Retry
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ var (
|
||||||
// ErrPreviousNotSpecified is thrown when the previous value is not specified for an atomic operation
|
// ErrPreviousNotSpecified is thrown when the previous value is not specified for an atomic operation
|
||||||
ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation")
|
ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation")
|
||||||
// ErrKeyExists is thrown when the previous value exists in the case of an AtomicPut
|
// ErrKeyExists is thrown when the previous value exists in the case of an AtomicPut
|
||||||
ErrKeyExists = errors.New("Previous K/V pair exists, cannnot complete Atomic operation")
|
ErrKeyExists = errors.New("Previous K/V pair exists, cannot complete Atomic operation")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config contains the options for a storage client
|
// Config contains the options for a storage client
|
||||||
|
@ -46,6 +46,8 @@ type Config struct {
|
||||||
ConnectionTimeout time.Duration
|
ConnectionTimeout time.Duration
|
||||||
Bucket string
|
Bucket string
|
||||||
PersistConnection bool
|
PersistConnection bool
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClientTLSConfig contains data for a Client TLS configuration in the form
|
// ClientTLSConfig contains data for a Client TLS configuration in the form
|
||||||
|
|
|
@ -148,7 +148,7 @@ var (
|
||||||
}
|
}
|
||||||
flLeaderTTL = cli.StringFlag{
|
flLeaderTTL = cli.StringFlag{
|
||||||
Name: "replication-ttl",
|
Name: "replication-ttl",
|
||||||
Value: "15s",
|
Value: "20s",
|
||||||
Usage: "Leader lock release time on failure",
|
Usage: "Leader lock release time on failure",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
|
@ -164,10 +164,7 @@ func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Serve
|
||||||
}
|
}
|
||||||
|
|
||||||
func run(cl cluster.Cluster, candidate *leadership.Candidate, server *api.Server, primary *mux.Router, replica *api.Replica) {
|
func run(cl cluster.Cluster, candidate *leadership.Candidate, server *api.Server, primary *mux.Router, replica *api.Replica) {
|
||||||
electedCh, errCh, err := candidate.RunForElection()
|
electedCh, errCh := candidate.RunForElection()
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var watchdog *cluster.Watchdog
|
var watchdog *cluster.Watchdog
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -190,10 +187,7 @@ func run(cl cluster.Cluster, candidate *leadership.Candidate, server *api.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
func follow(follower *leadership.Follower, replica *api.Replica, addr string) {
|
func follow(follower *leadership.Follower, replica *api.Replica, addr string) {
|
||||||
leaderCh, errCh, err := follower.FollowElection()
|
leaderCh, errCh := follower.FollowElection()
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case leader := <-leaderCh:
|
case leader := <-leaderCh:
|
||||||
|
|
|
@@ -3,49 +3,75 @@
 load helpers
 
 # Address on which the store will listen
-STORE_HOST=127.0.0.1:8500
-
-# Discovery parameter for Swarm
-DISCOVERY="consul://${STORE_HOST}/test"
+STORE_HOST_1=127.0.0.1:8500
+STORE_HOST_2=127.0.0.1:8501
+STORE_HOST_3=127.0.0.1:8502
 
 # Container name for integration test
 CONTAINER_NAME=swarm_leader
 
-function start_store() {
-    docker_host run -v $(pwd)/discovery/consul/config:/config --name=$CONTAINER_NAME -h $CONTAINER_NAME -p $STORE_HOST:8500 -d progrium/consul -server -bootstrap-expect 1 -config-file=/config/consul.json
-    # Wait a few seconds for the store to come up.
-    sleep 3
+# Names for store cluster nodes
+NODE_1="node1"
+NODE_2="node2"
+NODE_3="node3"
+
+# Urls of store cluster nodes
+NODE_1_URL="consul://${STORE_HOST_1}/test"
+NODE_2_URL="consul://${STORE_HOST_2}/test"
+NODE_3_URL="consul://${STORE_HOST_3}/test"
+
+function start_store_cluster() {
+    docker_host run -v $(pwd)/discovery/consul/config:/config --name=$NODE_1 -h $NODE_1 -p $STORE_HOST_1:8500 -d progrium/consul -server -bootstrap-expect 3 -config-file=/config/consul.json
+
+    # Grab node_1 address required for other nodes to join the cluster
+    JOIN_ENDPOINT=$(docker_host inspect -f '{{.NetworkSettings.IPAddress}}' $NODE_1)
+
+    docker_host run -v $(pwd)/discovery/consul/config:/config --name=$NODE_2 -h $NODE_2 -p $STORE_HOST_2:8500 -d progrium/consul -server -join $JOIN_ENDPOINT -config-file=/config/consul.json
+
+    docker_host run -v $(pwd)/discovery/consul/config:/config --name=$NODE_3 -h $NODE_3 -p $STORE_HOST_3:8500 -d progrium/consul -server -join $JOIN_ENDPOINT -config-file=/config/consul.json
+
+    # Wait for the cluster to be available.
+    sleep 2
+}
+
+function restart_leader() {
+    # TODO find out who is the leader
+    docker_host restart -t 5 $NODE_1
 }
 
 function stop_store() {
     docker_host rm -f -v $CONTAINER_NAME
 }
 
+function stop_store_cluster() {
+    docker_host rm -f -v $NODE_1 $NODE_2 $NODE_3
+}
+
 function setup() {
-    start_store
+    start_store_cluster
 }
 
 function teardown() {
     swarm_manage_cleanup
     swarm_join_cleanup
     stop_docker
-    stop_store
+    stop_store_cluster
 }
 
 @test "replication options" {
     # Bring up one manager
     # --advertise
-    run swarm manage --replication --replication-ttl "4s" --advertise "" "$DISCOVERY"
+    run swarm manage --replication --replication-ttl "4s" --advertise "" "$NODE_1_URL"
     [ "$status" -ne 0 ]
     [[ "${output}" == *"--advertise address must be provided when using --leader-election"* ]]
 
     # --advertise
-    run swarm manage --replication --replication-ttl "4s" --advertise 127.0.0.1ab:1bcde "$DISCOVERY"
+    run swarm manage --replication --replication-ttl "4s" --advertise 127.0.0.1ab:1bcde "$NODE_1_URL"
     [ "$status" -ne 0 ]
     [[ "${output}" == *"--advertise should be of the form ip:port or hostname:port"* ]]
 
     # --replication-ttl
-    run swarm manage --replication --replication-ttl "-20s" --advertise 127.0.0.1:$SWARM_BASE_PORT "$DISCOVERY"
+    run swarm manage --replication --replication-ttl "-20s" --advertise 127.0.0.1:$SWARM_BASE_PORT "$NODE_1_URL"
     [ "$status" -ne 0 ]
     [[ "${output}" == *"--replication-ttl should be a positive number"* ]]
 }

|
||||||
local host=127.0.0.1:$port
|
local host=127.0.0.1:$port
|
||||||
|
|
||||||
# Bring up one manager, make sure it becomes primary.
|
# Bring up one manager, make sure it becomes primary.
|
||||||
swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$SWARM_BASE_PORT "$DISCOVERY"
|
swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$SWARM_BASE_PORT "$NODE_1_URL"
|
||||||
run docker -H ${SWARM_HOSTS[0]} info
|
run docker -H ${SWARM_HOSTS[0]} info
|
||||||
[[ "${output}" == *"Role: primary"* ]]
|
[[ "${output}" == *"Role: primary"* ]]
|
||||||
|
|
||||||
# Fire up a second manager. Ensure it's a replica forwarding to the right primary.
|
# Fire up a second manager. Ensure it's a replica forwarding to the right primary.
|
||||||
swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 1)) "$DISCOVERY"
|
swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 1)) "$NODE_1_URL"
|
||||||
run docker -H ${SWARM_HOSTS[1]} info
|
run docker -H ${SWARM_HOSTS[1]} info
|
||||||
[[ "${output}" == *"Role: replica"* ]]
|
[[ "${output}" == *"Role: replica"* ]]
|
||||||
[[ "${output}" == *"Primary: ${SWARM_HOSTS[0]}"* ]]
|
[[ "${output}" == *"Primary: ${SWARM_HOSTS[0]}"* ]]
|
||||||
|
@@ -71,7 +97,7 @@ function teardown() {
     retry 20 1 eval "docker -H ${SWARM_HOSTS[1]} info | grep -q 'Role: primary'"
 
     # Add a new replica and make sure it sees the new leader as primary.
-    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 2)) "$DISCOVERY"
+    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 2)) "$NODE_1_URL"
     run docker -H ${SWARM_HOSTS[2]} info
     [[ "${output}" == *"Role: replica"* ]]
     [[ "${output}" == *"Primary: ${SWARM_HOSTS[1]}"* ]]

@@ -92,15 +118,15 @@ function containerRunning() {
     local host=127.0.0.1:$port
 
     start_docker_with_busybox 2
-    swarm_join "$DISCOVERY"
+    swarm_join "$NODE_1_URL"
 
     # Bring up one manager, make sure it becomes primary.
-    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$SWARM_BASE_PORT --engine-refresh-min-interval=1s --engine-refresh-max-interval=1s --engine-failure-retry=1 "$DISCOVERY"
+    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$SWARM_BASE_PORT --engine-refresh-min-interval=1s --engine-refresh-max-interval=1s --engine-failure-retry=1 "$NODE_1_URL"
     run docker -H ${SWARM_HOSTS[0]} info
     [[ "${output}" == *"Role: primary"* ]]
 
     # Fire up a second manager. Ensure it's a replica forwarding to the right primary.
-    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 1)) --engine-refresh-min-interval=1s --engine-refresh-max-interval=1s --engine-failure-retry=1 "$DISCOVERY"
+    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 1)) --engine-refresh-min-interval=1s --engine-refresh-max-interval=1s --engine-failure-retry=1 "$NODE_1_URL"
     run docker -H ${SWARM_HOSTS[1]} info
     [[ "${output}" == *"Role: replica"* ]]
     [[ "${output}" == *"Primary: ${SWARM_HOSTS[0]}"* ]]

@@ -156,30 +182,84 @@ function containerRunning() {
 
 @test "leader election - store failure" {
     # Bring up one manager, make sure it becomes primary.
-    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$SWARM_BASE_PORT "$DISCOVERY"
+    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$SWARM_BASE_PORT "$NODE_1_URL"
     run docker -H ${SWARM_HOSTS[0]} info
     [[ "${output}" == *"Role: primary"* ]]
 
     # Fire up a second manager. Ensure it's a replica forwarding to the right primary.
-    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 1)) "$DISCOVERY"
+    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 1)) "$NODE_1_URL"
     run docker -H ${SWARM_HOSTS[1]} info
     [[ "${output}" == *"Role: replica"* ]]
     [[ "${output}" == *"Primary: ${SWARM_HOSTS[0]}"* ]]
 
     # Fire up a third manager. Ensure it's a replica forwarding to the right primary.
-    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 2)) "$DISCOVERY"
+    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 2)) "$NODE_1_URL"
     run docker -H ${SWARM_HOSTS[2]} info
     [[ "${output}" == *"Role: replica"* ]]
     [[ "${output}" == *"Primary: ${SWARM_HOSTS[0]}"* ]]
 
     # Stop and start the store holding the leader metadata
-    stop_store
+    stop_store_cluster
     sleep 3
-    start_store
+    start_store_cluster
 
     # Wait a little bit for the re-election to occur
     # This is specific to Consul (liveness over safety)
-    sleep 6
+    sleep 10
 
+    # Make sure the managers are either in the 'primary' or the 'replica' state.
+    for host in "${SWARM_HOSTS[@]}"; do
+        retry 120 1 eval "docker -H ${host} info | grep -Eq 'Role: primary|Role: replica'"
+    done
+
+    # Find out which one of the node is the Primary and
+    # the ones that are Replicas after the store failure
+    primary=${SWARM_HOSTS[0]}
+    declare -a replicas
+    i=0
+    for host in "${SWARM_HOSTS[@]}"; do
+        run docker -H $host info
+        if [[ "${output}" == *"Role: primary"* ]]; then
+            primary=$host
+        else
+            replicas[$((i=i+1))]=$host
+        fi
+    done
+
+    # Check if we have indeed 2 replicas
+    [[ "${#replicas[@]}" -eq 2 ]]
+
+    # Check if the replicas are pointing to the right Primary
+    for host in "${replicas[@]}"; do
+        run docker -H $host info
+        [[ "${output}" == *"Primary: ${primary}"* ]]
+    done
+}
+
+@test "leader election - dispatched discovery urls - leader failure" {
+    # Bring up one manager, make sure it becomes primary.
+    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$SWARM_BASE_PORT "$NODE_1_URL"
+    run docker -H ${SWARM_HOSTS[0]} info
+    [[ "${output}" == *"Role: primary"* ]]
+
+    # Fire up a second manager. Ensure it's a replica forwarding to the right primary.
+    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 1)) "$NODE_2_URL"
+    run docker -H ${SWARM_HOSTS[1]} info
+    [[ "${output}" == *"Role: replica"* ]]
+    [[ "${output}" == *"Primary: ${SWARM_HOSTS[0]}"* ]]
+
+    # Fire up a third manager. Ensure it's a replica forwarding to the right primary.
+    swarm_manage --replication --replication-ttl "4s" --advertise 127.0.0.1:$(($SWARM_BASE_PORT + 2)) "$NODE_3_URL"
+    run docker -H ${SWARM_HOSTS[2]} info
+    [[ "${output}" == *"Role: replica"* ]]
+    [[ "${output}" == *"Primary: ${SWARM_HOSTS[0]}"* ]]
+
+    # Stop and start the store leader
+    restart_leader
+
+    # Wait a little bit for the re-election to occur
+    # This is specific to Consul (liveness over safety)
+    sleep 15
+
     # Make sure the managers are either in the 'primary' or the 'replica' state.
     for host in "${SWARM_HOSTS[@]}"; do