mirror of https://github.com/docker/docs.git
Merge pull request #801 from abronan/add_ephemeral_support
store: Add node Ephemeral support
This commit is contained in:
commit
71a5fd944a
|
|
@ -15,8 +15,6 @@ import (
|
|||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
|
||||
)
|
||||
|
||||
// See SetConsistency for how to use these constants.
|
||||
|
|
@ -44,10 +42,17 @@ type Config struct {
|
|||
Consistency string `json:"consistency"`
|
||||
}
|
||||
|
||||
type credentials struct {
|
||||
username string
|
||||
password string
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
config Config `json:"config"`
|
||||
cluster *Cluster `json:"cluster"`
|
||||
httpClient *http.Client
|
||||
credentials *credentials
|
||||
transport *http.Transport
|
||||
persistence io.Writer
|
||||
cURLch chan string
|
||||
// CheckRetry can be used to control the policy for failed requests
|
||||
|
|
@ -172,17 +177,27 @@ func NewClientFromReader(reader io.Reader) (*Client, error) {
|
|||
// Override the Client's HTTP Transport object
|
||||
func (c *Client) SetTransport(tr *http.Transport) {
|
||||
c.httpClient.Transport = tr
|
||||
c.transport = tr
|
||||
}
|
||||
|
||||
func (c *Client) SetCredentials(username, password string) {
|
||||
c.credentials = &credentials{username, password}
|
||||
}
|
||||
|
||||
func (c *Client) Close() {
|
||||
c.transport.DisableKeepAlives = true
|
||||
c.transport.CloseIdleConnections()
|
||||
}
|
||||
|
||||
// initHTTPClient initializes a HTTP client for etcd client
|
||||
func (c *Client) initHTTPClient() {
|
||||
tr := &http.Transport{
|
||||
c.transport = &http.Transport{
|
||||
Dial: c.dial,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
}
|
||||
c.httpClient = &http.Client{Transport: tr}
|
||||
c.httpClient = &http.Client{Transport: c.transport}
|
||||
}
|
||||
|
||||
// initHTTPClient initializes a HTTPS client for etcd client
|
||||
|
|
@ -305,31 +320,49 @@ func (c *Client) internalSyncCluster(machines []string) bool {
|
|||
continue
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
// try another machine in the cluster
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK { // fall-back to old endpoint
|
||||
httpPath := c.createHttpPath(machine, path.Join(version, "machines"))
|
||||
resp, err := c.httpClient.Get(httpPath)
|
||||
if err != nil {
|
||||
// try another machine in the cluster
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
// try another machine in the cluster
|
||||
continue
|
||||
}
|
||||
// update Machines List
|
||||
c.cluster.updateFromStr(string(b))
|
||||
} else {
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
// try another machine in the cluster
|
||||
continue
|
||||
}
|
||||
|
||||
var mCollection httptypes.MemberCollection
|
||||
if err := json.Unmarshal(b, &mCollection); err != nil {
|
||||
// try another machine
|
||||
continue
|
||||
}
|
||||
var mCollection memberCollection
|
||||
if err := json.Unmarshal(b, &mCollection); err != nil {
|
||||
// try another machine
|
||||
continue
|
||||
}
|
||||
|
||||
urls := make([]string, 0)
|
||||
for _, m := range mCollection {
|
||||
urls = append(urls, m.ClientURLs...)
|
||||
}
|
||||
urls := make([]string, 0)
|
||||
for _, m := range mCollection {
|
||||
urls = append(urls, m.ClientURLs...)
|
||||
}
|
||||
|
||||
// update Machines List
|
||||
c.cluster.updateFromStr(strings.Join(urls, ","))
|
||||
// update Machines List
|
||||
c.cluster.updateFromStr(strings.Join(urls, ","))
|
||||
}
|
||||
|
||||
logger.Debug("sync.machines ", c.cluster.Machines)
|
||||
c.saveConfig()
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -94,3 +94,15 @@ func TestPersistence(t *testing.T) {
|
|||
t.Fatalf("The two configs should be equal!")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientRetry(t *testing.T) {
|
||||
c := NewClient([]string{"http://strange", "http://127.0.0.1:4001"})
|
||||
// use first endpoint as the picked url
|
||||
c.cluster.picked = 0
|
||||
if _, err := c.Set("foo", "bar", 5); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := c.Delete("foo", true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,5 +30,8 @@ func (cl *Cluster) pick() string { return cl.Machines[cl.picked] }
|
|||
|
||||
func (cl *Cluster) updateFromStr(machines string) {
|
||||
cl.Machines = strings.Split(machines, ",")
|
||||
cl.picked = rand.Intn(len(machines))
|
||||
for i := range cl.Machines {
|
||||
cl.Machines[i] = strings.TrimSpace(cl.Machines[i])
|
||||
}
|
||||
cl.picked = rand.Intn(len(cl.Machines))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
ErrCodeEtcdNotReachable = 501
|
||||
ErrCodeEtcdNotReachable = 501
|
||||
ErrCodeUnhandledHTTPStatus = 502
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
|
|||
|
|
@ -0,0 +1,30 @@
|
|||
package etcd
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
type Member struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
PeerURLs []string `json:"peerURLs"`
|
||||
ClientURLs []string `json:"clientURLs"`
|
||||
}
|
||||
|
||||
type memberCollection []Member
|
||||
|
||||
func (c *memberCollection) UnmarshalJSON(data []byte) error {
|
||||
d := struct {
|
||||
Members []Member
|
||||
}{}
|
||||
|
||||
if err := json.Unmarshal(data, &d); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if d.Members == nil {
|
||||
*c = make([]Member, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
*c = d.Members
|
||||
return nil
|
||||
}
|
||||
71
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go
generated
vendored
Normal file
71
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go
generated
vendored
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMemberCollectionUnmarshal(t *testing.T) {
|
||||
tests := []struct {
|
||||
body []byte
|
||||
want memberCollection
|
||||
}{
|
||||
{
|
||||
body: []byte(`{"members":[]}`),
|
||||
want: memberCollection([]Member{}),
|
||||
},
|
||||
{
|
||||
body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
|
||||
want: memberCollection(
|
||||
[]Member{
|
||||
{
|
||||
ID: "2745e2525fce8fe",
|
||||
Name: "node3",
|
||||
PeerURLs: []string{
|
||||
"http://127.0.0.1:7003",
|
||||
},
|
||||
ClientURLs: []string{
|
||||
"http://127.0.0.1:4003",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "42134f434382925",
|
||||
Name: "node1",
|
||||
PeerURLs: []string{
|
||||
"http://127.0.0.1:2380",
|
||||
"http://127.0.0.1:7001",
|
||||
},
|
||||
ClientURLs: []string{
|
||||
"http://127.0.0.1:2379",
|
||||
"http://127.0.0.1:4001",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "94088180e21eb87b",
|
||||
Name: "node2",
|
||||
PeerURLs: []string{
|
||||
"http://127.0.0.1:7002",
|
||||
},
|
||||
ClientURLs: []string{
|
||||
"http://127.0.0.1:4002",
|
||||
},
|
||||
},
|
||||
},
|
||||
),
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
var got memberCollection
|
||||
err := json.Unmarshal(tt.body, &got)
|
||||
if err != nil {
|
||||
t.Errorf("#%d: unexpected error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(tt.want, got) {
|
||||
t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.want, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -188,7 +188,10 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
|
||||
logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
|
||||
|
||||
httpPath = c.getHttpPath(rr.RelativePath)
|
||||
// get httpPath if not set
|
||||
if httpPath == "" {
|
||||
httpPath = c.getHttpPath(rr.RelativePath)
|
||||
}
|
||||
|
||||
// Return a cURL command if curlChan is set
|
||||
if c.cURLch != nil {
|
||||
|
|
@ -196,6 +199,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
for key, value := range rr.Values {
|
||||
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
||||
}
|
||||
if c.credentials != nil {
|
||||
command += fmt.Sprintf(" -u %s", c.credentials.username)
|
||||
}
|
||||
c.sendCURL(command)
|
||||
}
|
||||
|
||||
|
|
@ -225,7 +231,13 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if c.credentials != nil {
|
||||
req.SetBasicAuth(c.credentials.username, c.credentials.password)
|
||||
}
|
||||
|
||||
resp, err = c.httpClient.Do(req)
|
||||
// clear previous httpPath
|
||||
httpPath = ""
|
||||
defer func() {
|
||||
if resp != nil {
|
||||
resp.Body.Close()
|
||||
|
|
@ -280,6 +292,19 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
}
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusTemporaryRedirect {
|
||||
u, err := resp.Location()
|
||||
|
||||
if err != nil {
|
||||
logger.Warning(err)
|
||||
} else {
|
||||
// set httpPath for following redirection
|
||||
httpPath = u.String()
|
||||
}
|
||||
resp.Body.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
if checkErr := checkRetry(c.cluster, numReqs, *resp,
|
||||
errors.New("Unexpected HTTP status code")); checkErr != nil {
|
||||
return nil, checkErr
|
||||
|
|
@ -302,21 +327,40 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
|
||||
err error) error {
|
||||
|
||||
if numReqs >= 2*len(cluster.Machines) {
|
||||
return newError(ErrCodeEtcdNotReachable,
|
||||
"Tried to connect to each peer twice and failed", 0)
|
||||
if numReqs > 2*len(cluster.Machines) {
|
||||
errStr := fmt.Sprintf("failed to propose on members %v twice [last error: %v]", cluster.Machines, err)
|
||||
return newError(ErrCodeEtcdNotReachable, errStr, 0)
|
||||
}
|
||||
|
||||
code := lastResp.StatusCode
|
||||
if code == http.StatusInternalServerError {
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
|
||||
if isEmptyResponse(lastResp) {
|
||||
// always retry if it failed to get response from one machine
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Warning("bad response status code", code)
|
||||
if !shouldRetry(lastResp) {
|
||||
body := []byte("nil")
|
||||
if lastResp.Body != nil {
|
||||
if b, err := ioutil.ReadAll(lastResp.Body); err == nil {
|
||||
body = b
|
||||
}
|
||||
}
|
||||
errStr := fmt.Sprintf("unhandled http status [%s] with body [%s]", http.StatusText(lastResp.StatusCode), body)
|
||||
return newError(ErrCodeUnhandledHTTPStatus, errStr, 0)
|
||||
}
|
||||
// sleep some time and expect leader election finish
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
logger.Warning("bad response status code", lastResp.StatusCode)
|
||||
return nil
|
||||
}
|
||||
|
||||
func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 }
|
||||
|
||||
// shouldRetry returns whether the reponse deserves retry.
|
||||
func shouldRetry(r http.Response) bool {
|
||||
// TODO: only retry when the cluster is in leader election
|
||||
// We cannot do it exactly because etcd doesn't support it well.
|
||||
return r.StatusCode == http.StatusInternalServerError
|
||||
}
|
||||
|
||||
func (c *Client) getHttpPath(s ...string) string {
|
||||
fullPath := c.cluster.pick() + "/" + version
|
||||
for _, seg := range s {
|
||||
|
|
|
|||
|
|
@ -1,3 +1,6 @@
|
|||
package etcd
|
||||
|
||||
const version = "v2"
|
||||
const (
|
||||
version = "v2"
|
||||
packageVersion = "v2.0.0+git"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ func Run() {
|
|||
log.Fatalf("the `create` command takes no arguments. See '%s create --help'.", c.App.Name)
|
||||
}
|
||||
discovery := &token.Discovery{}
|
||||
discovery.Initialize("", 0)
|
||||
discovery.Initialize("", 0, 0)
|
||||
token, err := discovery.CreateCluster()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
|
@ -88,7 +88,7 @@ func Run() {
|
|||
log.Fatalf("invalid --timeout: %v", err)
|
||||
}
|
||||
|
||||
d, err := discovery.New(dflag, timeout)
|
||||
d, err := discovery.New(dflag, timeout, 0)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
@ -122,8 +122,8 @@ func Run() {
|
|||
{
|
||||
Name: "join",
|
||||
ShortName: "j",
|
||||
Usage: "Join a docker cluster",
|
||||
Flags: []cli.Flag{flAddr, flHeartBeat},
|
||||
Usage: "join a docker cluster",
|
||||
Flags: []cli.Flag{flAddr, flHeartBeat, flTTL},
|
||||
Action: join,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,10 +49,15 @@ var (
|
|||
EnvVar: "SWARM_HOST",
|
||||
}
|
||||
flHeartBeat = cli.StringFlag{
|
||||
Name: "heartbeat, hb",
|
||||
Value: "25s",
|
||||
Name: "heartbeat",
|
||||
Value: "20s",
|
||||
Usage: "period between each heartbeat",
|
||||
}
|
||||
flTTL = cli.StringFlag{
|
||||
Name: "ttl",
|
||||
Value: "60s",
|
||||
Usage: "sets the expiration of an ephemeral node",
|
||||
}
|
||||
flTimeout = cli.StringFlag{
|
||||
Name: "timeout",
|
||||
Value: "10s",
|
||||
|
|
|
|||
|
|
@ -27,7 +27,14 @@ func join(c *cli.Context) {
|
|||
if hb < 1*time.Second {
|
||||
log.Fatal("--heartbeat should be at least one second")
|
||||
}
|
||||
d, err := discovery.New(dflag, hb)
|
||||
ttl, err := time.ParseDuration(c.String("ttl"))
|
||||
if err != nil {
|
||||
log.Fatalf("invalid --ttl: %v", err)
|
||||
}
|
||||
if ttl <= hb {
|
||||
log.Fatal("--ttl must be strictly superior to the heartbeat value")
|
||||
}
|
||||
d, err := discovery.New(dflag, hb, ttl)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t
|
|||
}
|
||||
|
||||
// Set up discovery.
|
||||
cluster.discovery, err = discovery.New(dflag, heartbeat)
|
||||
cluster.discovery, err = discovery.New(dflag, heartbeat, 0)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,8 +84,8 @@ func (e Entries) Diff(cmp Entries) (Entries, Entries) {
|
|||
// The Discovery interface is implemented by Discovery backends which
|
||||
// manage swarm host entries.
|
||||
type Discovery interface {
|
||||
// Initialize the discovery with URIs and a heartbeat.
|
||||
Initialize(string, time.Duration) error
|
||||
// Initialize the discovery with URIs, a heartbeat and a ttl.
|
||||
Initialize(string, time.Duration, time.Duration) error
|
||||
|
||||
// Watch the discovery for entry changes.
|
||||
// Returns a channel that will receive changes or an error.
|
||||
|
|
@ -131,14 +131,14 @@ func parse(rawurl string) (string, string) {
|
|||
return parts[0], parts[1]
|
||||
}
|
||||
|
||||
// New returns a new Discovery given a URL and heartbeat settings.
|
||||
// New returns a new Discovery given a URL, heartbeat and ttl settings.
|
||||
// Returns an error if the URL scheme is not supported.
|
||||
func New(rawurl string, heartbeat time.Duration) (Discovery, error) {
|
||||
func New(rawurl string, heartbeat time.Duration, ttl time.Duration) (Discovery, error) {
|
||||
scheme, uri := parse(rawurl)
|
||||
|
||||
if discovery, exists := discoveries[scheme]; exists {
|
||||
log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service")
|
||||
err := discovery.Initialize(uri, heartbeat)
|
||||
err := discovery.Initialize(uri, heartbeat, ttl)
|
||||
return discovery, err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ func init() {
|
|||
}
|
||||
|
||||
// Initialize is exported
|
||||
func (s *Discovery) Initialize(path string, heartbeat time.Duration) error {
|
||||
func (s *Discovery) Initialize(path string, heartbeat time.Duration, ttl time.Duration) error {
|
||||
s.path = path
|
||||
s.heartbeat = heartbeat
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -11,12 +11,12 @@ import (
|
|||
|
||||
func TestInitialize(t *testing.T) {
|
||||
d := &Discovery{}
|
||||
d.Initialize("/path/to/file", 0)
|
||||
d.Initialize("/path/to/file", 0, 0)
|
||||
assert.Equal(t, d.path, "/path/to/file")
|
||||
}
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
d, err := discovery.New("file:///path/to/file", 0)
|
||||
d, err := discovery.New("file:///path/to/file", 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, d.(*Discovery).path, "/path/to/file")
|
||||
}
|
||||
|
|
@ -73,7 +73,7 @@ func TestWatch(t *testing.T) {
|
|||
|
||||
// Set up file discovery.
|
||||
d := &Discovery{}
|
||||
d.Initialize(tmp.Name(), 1)
|
||||
d.Initialize(tmp.Name(), 1, 0)
|
||||
stopCh := make(chan struct{})
|
||||
ch, errCh := d.Watch(stopCh)
|
||||
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ type Discovery struct {
|
|||
backend store.Backend
|
||||
store store.Store
|
||||
heartbeat time.Duration
|
||||
ttl time.Duration
|
||||
prefix string
|
||||
}
|
||||
|
||||
|
|
@ -26,7 +27,7 @@ func init() {
|
|||
}
|
||||
|
||||
// Initialize is exported
|
||||
func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error {
|
||||
func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error {
|
||||
var (
|
||||
parts = strings.SplitN(uris, "/", 2)
|
||||
ips = strings.Split(parts[0], ",")
|
||||
|
|
@ -43,6 +44,7 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error {
|
|||
}
|
||||
|
||||
s.heartbeat = heartbeat
|
||||
s.ttl = ttl
|
||||
s.prefix = parts[1]
|
||||
|
||||
// Creates a new store, will ignore options given
|
||||
|
|
@ -51,7 +53,7 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error {
|
|||
s.backend,
|
||||
addrs,
|
||||
&store.Config{
|
||||
Timeout: s.heartbeat,
|
||||
EphemeralTTL: s.ttl,
|
||||
},
|
||||
)
|
||||
return err
|
||||
|
|
@ -121,5 +123,6 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c
|
|||
|
||||
// Register is exported
|
||||
func (s *Discovery) Register(addr string) error {
|
||||
return s.store.Put(path.Join(s.prefix, addr), []byte(addr))
|
||||
opts := &store.WriteOptions{Ephemeral: true, Heartbeat: s.heartbeat}
|
||||
return s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,17 +13,17 @@ import (
|
|||
|
||||
func TestInitialize(t *testing.T) {
|
||||
d := &Discovery{backend: store.MOCK}
|
||||
assert.EqualError(t, d.Initialize("127.0.0.1", 0), "invalid format \"127.0.0.1\", missing <path>")
|
||||
assert.EqualError(t, d.Initialize("127.0.0.1", 0, 0), "invalid format \"127.0.0.1\", missing <path>")
|
||||
|
||||
d = &Discovery{backend: store.MOCK}
|
||||
assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0))
|
||||
assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0))
|
||||
s := d.store.(*store.Mock)
|
||||
assert.Len(t, s.Endpoints, 1)
|
||||
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
|
||||
assert.Equal(t, d.prefix, "path")
|
||||
|
||||
d = &Discovery{backend: store.MOCK}
|
||||
assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0))
|
||||
assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0))
|
||||
s = d.store.(*store.Mock)
|
||||
assert.Len(t, s.Endpoints, 3)
|
||||
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
|
||||
|
|
@ -34,7 +34,7 @@ func TestInitialize(t *testing.T) {
|
|||
|
||||
func TestWatch(t *testing.T) {
|
||||
d := &Discovery{backend: store.MOCK}
|
||||
assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0))
|
||||
assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0))
|
||||
s := d.store.(*store.Mock)
|
||||
|
||||
mockCh := make(chan []*store.KVPair)
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ func init() {
|
|||
}
|
||||
|
||||
// Initialize is exported
|
||||
func (s *Discovery) Initialize(uris string, _ time.Duration) error {
|
||||
func (s *Discovery) Initialize(uris string, _ time.Duration, _ time.Duration) error {
|
||||
for _, input := range strings.Split(uris, ",") {
|
||||
for _, ip := range discovery.Generate(input) {
|
||||
entry, err := discovery.NewEntry(ip)
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import (
|
|||
|
||||
func TestInitialize(t *testing.T) {
|
||||
d := &Discovery{}
|
||||
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
|
||||
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0)
|
||||
assert.Equal(t, len(d.entries), 2)
|
||||
assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
|
||||
assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222")
|
||||
|
|
@ -17,7 +17,7 @@ func TestInitialize(t *testing.T) {
|
|||
|
||||
func TestInitializeWithPattern(t *testing.T) {
|
||||
d := &Discovery{}
|
||||
d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0)
|
||||
d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, 0)
|
||||
assert.Equal(t, len(d.entries), 5)
|
||||
assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
|
||||
assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111")
|
||||
|
|
@ -28,7 +28,7 @@ func TestInitializeWithPattern(t *testing.T) {
|
|||
|
||||
func TestWatch(t *testing.T) {
|
||||
d := &Discovery{}
|
||||
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
|
||||
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0)
|
||||
expected := discovery.Entries{
|
||||
&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
|
||||
&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ const DiscoveryURL = "https://discovery-stage.hub.docker.com/v1"
|
|||
// Discovery is exported
|
||||
type Discovery struct {
|
||||
heartbeat time.Duration
|
||||
ttl time.Duration
|
||||
url string
|
||||
token string
|
||||
}
|
||||
|
|
@ -27,7 +28,7 @@ func init() {
|
|||
}
|
||||
|
||||
// Initialize is exported
|
||||
func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration) error {
|
||||
func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration, ttl time.Duration) error {
|
||||
if i := strings.LastIndex(urltoken, "/"); i != -1 {
|
||||
s.url = "https://" + urltoken[:i]
|
||||
s.token = urltoken[i+1:]
|
||||
|
|
@ -40,6 +41,7 @@ func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration) error {
|
|||
return errors.New("token is empty")
|
||||
}
|
||||
s.heartbeat = heartbeat
|
||||
s.ttl = ttl
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,17 +10,17 @@ import (
|
|||
|
||||
func TestInitialize(t *testing.T) {
|
||||
discovery := &Discovery{}
|
||||
err := discovery.Initialize("token", 0)
|
||||
err := discovery.Initialize("token", 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, discovery.token, "token")
|
||||
assert.Equal(t, discovery.url, DiscoveryURL)
|
||||
|
||||
err = discovery.Initialize("custom/path/token", 0)
|
||||
err = discovery.Initialize("custom/path/token", 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, discovery.token, "token")
|
||||
assert.Equal(t, discovery.url, "https://custom/path")
|
||||
|
||||
err = discovery.Initialize("", 0)
|
||||
err = discovery.Initialize("", 0, 0)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/tls"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
|
|
@ -19,8 +20,11 @@ const (
|
|||
|
||||
// Consul embeds the client and watches
|
||||
type Consul struct {
|
||||
config *api.Config
|
||||
client *api.Client
|
||||
sync.Mutex
|
||||
config *api.Config
|
||||
client *api.Client
|
||||
ephemeralTTL time.Duration
|
||||
ephemeralSession string
|
||||
}
|
||||
|
||||
type consulLock struct {
|
||||
|
|
@ -39,12 +43,17 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) {
|
|||
config.Address = endpoints[0]
|
||||
config.Scheme = "http"
|
||||
|
||||
if options.TLS != nil {
|
||||
s.setTLS(options.TLS)
|
||||
}
|
||||
|
||||
if options.Timeout != 0 {
|
||||
s.setTimeout(options.Timeout)
|
||||
// Set options
|
||||
if options != nil {
|
||||
if options.TLS != nil {
|
||||
s.setTLS(options.TLS)
|
||||
}
|
||||
if options.ConnectionTimeout != 0 {
|
||||
s.setTimeout(options.ConnectionTimeout)
|
||||
}
|
||||
if options.EphemeralTTL != 0 {
|
||||
s.setEphemeralTTL(options.EphemeralTTL)
|
||||
}
|
||||
}
|
||||
|
||||
// Creates a new client
|
||||
|
|
@ -71,6 +80,31 @@ func (s *Consul) setTimeout(time time.Duration) {
|
|||
s.config.WaitTime = time
|
||||
}
|
||||
|
||||
// SetEphemeralTTL sets the ttl for ephemeral nodes
|
||||
func (s *Consul) setEphemeralTTL(time time.Duration) {
|
||||
s.ephemeralTTL = time
|
||||
}
|
||||
|
||||
// CreateEphemeralSession creates the a global session
|
||||
// once that is used to delete keys at node failure
|
||||
func (s *Consul) createEphemeralSession() error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if s.ephemeralSession == "" {
|
||||
entry := &api.SessionEntry{
|
||||
Behavior: api.SessionBehaviorDelete,
|
||||
TTL: s.ephemeralTTL.String(),
|
||||
}
|
||||
// Create global ephemeral keys session
|
||||
session, _, err := s.client.Session().Create(entry, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.ephemeralSession = session
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Normalize the key for usage in Consul
|
||||
func (s *Consul) normalize(key string) string {
|
||||
key = normalize(key)
|
||||
|
|
@ -91,11 +125,48 @@ func (s *Consul) Get(key string) (*KVPair, error) {
|
|||
}
|
||||
|
||||
// Put a value at "key"
|
||||
func (s *Consul) Put(key string, value []byte) error {
|
||||
p := &api.KVPair{Key: s.normalize(key), Value: value}
|
||||
if s.client == nil {
|
||||
log.Error("Error initializing client")
|
||||
func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error {
|
||||
|
||||
key = s.normalize(key)
|
||||
|
||||
p := &api.KVPair{
|
||||
Key: key,
|
||||
Value: value,
|
||||
}
|
||||
|
||||
if opts != nil && opts.Ephemeral {
|
||||
// Creates the global ephemeral session
|
||||
// if it does not exist
|
||||
if s.ephemeralSession == "" {
|
||||
s.createEphemeralSession()
|
||||
}
|
||||
|
||||
// Create lock option with the
|
||||
// EphemeralSession
|
||||
lockOpts := &api.LockOptions{
|
||||
Key: key,
|
||||
Session: s.ephemeralSession,
|
||||
}
|
||||
|
||||
// Lock and ignore if lock is held
|
||||
// It's just a placeholder for the
|
||||
// ephemeral behavior
|
||||
lock, _ := s.client.LockOpts(lockOpts)
|
||||
if lock != nil {
|
||||
lock.Lock(nil)
|
||||
}
|
||||
|
||||
// Place the session on key
|
||||
p.Session = s.ephemeralSession
|
||||
|
||||
// Renew the session
|
||||
_, _, err := s.client.Session().Renew(p.Session, nil)
|
||||
if err != nil {
|
||||
s.ephemeralSession = ""
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err := s.client.KV().Put(p, nil)
|
||||
return err
|
||||
}
|
||||
|
|
@ -258,7 +329,7 @@ func (l *consulLock) Unlock() error {
|
|||
|
||||
// AtomicPut put a value at "key" if the key has not been
|
||||
// modified in the meantime, throws an error if this is the case
|
||||
func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
||||
func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) {
|
||||
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
|
||||
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
|
||||
return false, err
|
||||
|
|
|
|||
|
|
@ -12,7 +12,8 @@ import (
|
|||
|
||||
// Etcd embeds the client
|
||||
type Etcd struct {
|
||||
client *etcd.Client
|
||||
client *etcd.Client
|
||||
ephemeralTTL time.Duration
|
||||
}
|
||||
|
||||
// InitializeEtcd creates a new Etcd client given
|
||||
|
|
@ -23,16 +24,19 @@ func InitializeEtcd(addrs []string, options *Config) (Store, error) {
|
|||
entries := createEndpoints(addrs, "http")
|
||||
s.client = etcd.NewClient(entries)
|
||||
|
||||
if options.TLS != nil {
|
||||
s.setTLS(options.TLS)
|
||||
// Set options
|
||||
if options != nil {
|
||||
if options.TLS != nil {
|
||||
s.setTLS(options.TLS)
|
||||
}
|
||||
if options.ConnectionTimeout != 0 {
|
||||
s.setTimeout(options.ConnectionTimeout)
|
||||
}
|
||||
if options.EphemeralTTL != 0 {
|
||||
s.setEphemeralTTL(options.EphemeralTTL)
|
||||
}
|
||||
}
|
||||
|
||||
if options.Timeout != 0 {
|
||||
s.setTimeout(options.Timeout)
|
||||
}
|
||||
|
||||
// FIXME sync on each operation?
|
||||
s.client.SyncCluster()
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
|
@ -64,9 +68,13 @@ func (s *Etcd) setTimeout(time time.Duration) {
|
|||
s.client.SetDialTimeout(time)
|
||||
}
|
||||
|
||||
// SetHeartbeat sets the heartbeat value to notify we are alive
|
||||
func (s *Etcd) setEphemeralTTL(time time.Duration) {
|
||||
s.ephemeralTTL = time
|
||||
}
|
||||
|
||||
// Create the entire path for a directory that does not exist
|
||||
func (s *Etcd) createDirectory(path string) error {
|
||||
// TODO Handle TTL at key/dir creation -> use K/V struct for key infos?
|
||||
if _, err := s.client.CreateDir(normalize(path), 10); err != nil {
|
||||
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
||||
if etcdError.ErrorCode != 105 { // Skip key already exists
|
||||
|
|
@ -96,13 +104,20 @@ func (s *Etcd) Get(key string) (*KVPair, error) {
|
|||
}
|
||||
|
||||
// Put a value at "key"
|
||||
func (s *Etcd) Put(key string, value []byte) error {
|
||||
if _, err := s.client.Set(key, string(value), 0); err != nil {
|
||||
func (s *Etcd) Put(key string, value []byte, opts *WriteOptions) error {
|
||||
|
||||
// Default TTL = 0 means no expiration
|
||||
var ttl uint64
|
||||
if opts != nil && opts.Ephemeral {
|
||||
ttl = uint64(s.ephemeralTTL.Seconds())
|
||||
}
|
||||
|
||||
if _, err := s.client.Set(key, string(value), ttl); err != nil {
|
||||
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
||||
if etcdError.ErrorCode == 104 { // Not a directory
|
||||
// Remove the last element (the actual key) and set the prefix as a dir
|
||||
err = s.createDirectory(getDirectory(key))
|
||||
if _, err := s.client.Set(key, string(value), 0); err != nil {
|
||||
if _, err := s.client.Set(key, string(value), ttl); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -223,7 +238,7 @@ func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPai
|
|||
|
||||
// AtomicPut put a value at "key" if the key has not been
|
||||
// modified in the meantime, throws an error if this is the case
|
||||
func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
||||
func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) {
|
||||
_, err := s.client.CompareAndSwap(normalize(key), string(value), 0, "", previous.LastIndex)
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
|
|
|||
|
|
@ -21,8 +21,8 @@ func InitializeMock(endpoints []string, options *Config) (Store, error) {
|
|||
}
|
||||
|
||||
// Put mock
|
||||
func (s *Mock) Put(key string, value []byte) error {
|
||||
args := s.Mock.Called(key, value)
|
||||
func (s *Mock) Put(key string, value []byte, opts *WriteOptions) error {
|
||||
args := s.Mock.Called(key, value, opts)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
|
|
@ -75,8 +75,8 @@ func (s *Mock) DeleteTree(prefix string) error {
|
|||
}
|
||||
|
||||
// AtomicPut mock
|
||||
func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
||||
args := s.Mock.Called(key, value, previous)
|
||||
func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair, opts *WriteOptions) (bool, error) {
|
||||
args := s.Mock.Called(key, value, previous, opts)
|
||||
return args.Bool(0), args.Error(1)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -41,8 +41,9 @@ var (
|
|||
|
||||
// Config contains the options for a storage client
|
||||
type Config struct {
|
||||
TLS *tls.Config
|
||||
Timeout time.Duration
|
||||
TLS *tls.Config
|
||||
ConnectionTimeout time.Duration
|
||||
EphemeralTTL time.Duration
|
||||
}
|
||||
|
||||
// Store represents the backend K/V storage
|
||||
|
|
@ -51,7 +52,7 @@ type Config struct {
|
|||
// backend for libkv
|
||||
type Store interface {
|
||||
// Put a value at the specified key
|
||||
Put(key string, value []byte) error
|
||||
Put(key string, value []byte, options *WriteOptions) error
|
||||
|
||||
// Get a value given its key
|
||||
Get(key string) (*KVPair, error)
|
||||
|
|
@ -86,7 +87,7 @@ type Store interface {
|
|||
DeleteTree(prefix string) error
|
||||
|
||||
// Atomic operation on a single value
|
||||
AtomicPut(key string, value []byte, previous *KVPair) (bool, error)
|
||||
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error)
|
||||
|
||||
// Atomic delete of a single value
|
||||
AtomicDelete(key string, previous *KVPair) (bool, error)
|
||||
|
|
@ -99,6 +100,12 @@ type KVPair struct {
|
|||
LastIndex uint64
|
||||
}
|
||||
|
||||
// WriteOptions contains optional request parameters
|
||||
type WriteOptions struct {
|
||||
Heartbeat time.Duration
|
||||
Ephemeral bool
|
||||
}
|
||||
|
||||
// WatchCallback is used for watch methods on keys
|
||||
// and is triggered on key change
|
||||
type WatchCallback func(entries ...*KVPair)
|
||||
|
|
|
|||
|
|
@ -8,6 +8,8 @@ import (
|
|||
zk "github.com/samuel/go-zookeeper/zk"
|
||||
)
|
||||
|
||||
const defaultTimeout = 10 * time.Second
|
||||
|
||||
// Zookeeper embeds the zookeeper client
|
||||
type Zookeeper struct {
|
||||
timeout time.Duration
|
||||
|
|
@ -22,10 +24,13 @@ type zookeeperLock struct {
|
|||
// given a list of endpoints and optional tls config
|
||||
func InitializeZookeeper(endpoints []string, options *Config) (Store, error) {
|
||||
s := &Zookeeper{}
|
||||
s.timeout = 5 * time.Second // default timeout
|
||||
s.timeout = defaultTimeout
|
||||
|
||||
if options.Timeout != 0 {
|
||||
s.setTimeout(options.Timeout)
|
||||
// Set options
|
||||
if options != nil {
|
||||
if options.ConnectionTimeout != 0 {
|
||||
s.setTimeout(options.ConnectionTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
conn, _, err := zk.Connect(endpoints, s.timeout)
|
||||
|
|
@ -56,9 +61,13 @@ func (s *Zookeeper) Get(key string) (*KVPair, error) {
|
|||
}
|
||||
|
||||
// Create the entire path for a directory that does not exist
|
||||
func (s *Zookeeper) createFullpath(path []string) error {
|
||||
func (s *Zookeeper) createFullpath(path []string, ephemeral bool) error {
|
||||
for i := 1; i <= len(path); i++ {
|
||||
newpath := "/" + strings.Join(path[:i], "/")
|
||||
if i == len(path) && ephemeral {
|
||||
_, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
|
||||
return err
|
||||
}
|
||||
_, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
|
||||
if err != nil {
|
||||
// Skip if node already exists
|
||||
|
|
@ -71,14 +80,18 @@ func (s *Zookeeper) createFullpath(path []string) error {
|
|||
}
|
||||
|
||||
// Put a value at "key"
|
||||
func (s *Zookeeper) Put(key string, value []byte) error {
|
||||
func (s *Zookeeper) Put(key string, value []byte, opts *WriteOptions) error {
|
||||
fkey := normalize(key)
|
||||
exists, err := s.Exists(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
s.createFullpath(splitKey(key))
|
||||
if opts != nil && opts.Ephemeral {
|
||||
s.createFullpath(splitKey(key), opts.Ephemeral)
|
||||
} else {
|
||||
s.createFullpath(splitKey(key), false)
|
||||
}
|
||||
}
|
||||
_, err = s.client.Set(fkey, value, -1)
|
||||
return err
|
||||
|
|
@ -198,7 +211,7 @@ func (s *Zookeeper) DeleteTree(prefix string) error {
|
|||
|
||||
// AtomicPut put a value at "key" if the key has not been
|
||||
// modified in the meantime, throws an error if this is the case
|
||||
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
||||
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) {
|
||||
// Use index of Set method to implement CAS
|
||||
return false, ErrNotImplemented
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,6 +59,30 @@ function teardown() {
|
|||
retry 5 1 discovery_check_swarm_info
|
||||
}
|
||||
|
||||
@test "consul discovery: node removal" {
|
||||
# The goal of this test is to ensure swarm can detect engines that
|
||||
# are removed from the discovery and refresh info accordingly
|
||||
|
||||
# Start the store
|
||||
start_store
|
||||
|
||||
# Start a manager with no engines.
|
||||
swarm_manage "$DISCOVERY"
|
||||
retry 10 1 discovery_check_swarm_info
|
||||
|
||||
# Add Engines to the cluster and make sure it's picked by swarm
|
||||
start_docker 2
|
||||
swarm_join "$DISCOVERY"
|
||||
retry 5 1 discovery_check_swarm_list "$DISCOVERY"
|
||||
retry 5 1 discovery_check_swarm_info
|
||||
|
||||
# Removes all the swarm agents
|
||||
swarm_join_cleanup
|
||||
|
||||
# Check if previously registered engines are all gone
|
||||
retry 30 1 discovery_check_swarm_info 0
|
||||
}
|
||||
|
||||
@test "consul discovery: failure" {
|
||||
# The goal of this test is to simulate a store failure and ensure discovery
|
||||
# is resilient to it.
|
||||
|
|
|
|||
|
|
@ -7,7 +7,12 @@ function discovery_check_swarm_info() {
|
|||
local total="$1"
|
||||
[ -z "$total" ] && total="${#HOSTS[@]}"
|
||||
|
||||
docker_swarm info | grep -q "Nodes: $count"
|
||||
docker_swarm info | grep -q "Nodes: $total"
|
||||
}
|
||||
|
||||
# Returns true if swarm info outputs is empty (0 nodes).
|
||||
function discovery_check_swarm_info_empty() {
|
||||
docker_swarm info | grep -q "Nodes: 0"
|
||||
}
|
||||
|
||||
# Returns true if all nodes have joined the discovery.
|
||||
|
|
|
|||
|
|
@ -12,7 +12,11 @@ DISCOVERY="etcd://${STORE_HOST}/test"
|
|||
CONTAINER_NAME=swarm_etcd
|
||||
|
||||
function start_store() {
|
||||
docker_host run -p $STORE_HOST:4001 --name=$CONTAINER_NAME -d coreos/etcd
|
||||
docker_host run -p $STORE_HOST:4001 \
|
||||
--name=$CONTAINER_NAME -d \
|
||||
quay.io/coreos/etcd:v2.0.11 \
|
||||
--listen-client-urls=http://0.0.0.0:2379,http://0.0.0.0:4001 \
|
||||
--advertise-client-urls=http://$STORE_HOST
|
||||
}
|
||||
|
||||
function stop_store() {
|
||||
|
|
@ -60,6 +64,30 @@ function teardown() {
|
|||
retry 5 1 discovery_check_swarm_info
|
||||
}
|
||||
|
||||
@test "etcd discovery: node removal" {
|
||||
# The goal of this test is to ensure swarm can detect engines that
|
||||
# are removed from the discovery and refresh info accordingly
|
||||
|
||||
# Start the store
|
||||
start_store
|
||||
|
||||
# Start a manager with no engines.
|
||||
swarm_manage "$DISCOVERY"
|
||||
retry 10 1 discovery_check_swarm_info
|
||||
|
||||
# Add Engines to the cluster and make sure it's picked by swarm
|
||||
start_docker 2
|
||||
swarm_join "$DISCOVERY"
|
||||
retry 5 1 discovery_check_swarm_list "$DISCOVERY"
|
||||
retry 5 1 discovery_check_swarm_info
|
||||
|
||||
# Removes all the swarm agents
|
||||
swarm_join_cleanup
|
||||
|
||||
# Check if previously registered engines are all gone
|
||||
retry 15 1 discovery_check_swarm_info 0
|
||||
}
|
||||
|
||||
@test "etcd discovery: failure" {
|
||||
# The goal of this test is to simulate a store failure and ensure discovery
|
||||
# is resilient to it.
|
||||
|
|
|
|||
|
|
@ -59,6 +59,30 @@ function teardown() {
|
|||
retry 10 1 discovery_check_swarm_info
|
||||
}
|
||||
|
||||
@test "zk discovery: node removal" {
|
||||
# The goal of this test is to ensure swarm can detect engines that
|
||||
# are removed from the discovery and refresh info accordingly
|
||||
|
||||
# Start the store
|
||||
start_store
|
||||
|
||||
# Start 2 engines and make them join the cluster.
|
||||
swarm_manage "$DISCOVERY"
|
||||
retry 10 1 discovery_check_swarm_info
|
||||
|
||||
# Add Engines to the cluster and make sure it's picked by swarm
|
||||
start_docker 2
|
||||
swarm_join "$DISCOVERY"
|
||||
retry 10 1 discovery_check_swarm_list "$DISCOVERY"
|
||||
retry 10 1 discovery_check_swarm_info
|
||||
|
||||
# Removes all the swarm agents
|
||||
swarm_join_cleanup
|
||||
|
||||
# Check if previously registered engines are all gone
|
||||
retry 20 1 discovery_check_swarm_info 0
|
||||
}
|
||||
|
||||
@test "zk discovery: failure" {
|
||||
# The goal of this test is to simulate a store failure and ensure discovery
|
||||
# is resilient to it.
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ function swarm_join() {
|
|||
for ((i=current; i < nodes; i++)); do
|
||||
local h="${HOSTS[$i]}"
|
||||
echo "Swarm join #${i}: $h $addr"
|
||||
"$SWARM_BINARY" -l debug join --heartbeat=1s --addr="$h" "$addr" &
|
||||
"$SWARM_BINARY" -l debug join --heartbeat=1s --ttl=10s --addr="$h" "$addr" &
|
||||
SWARM_JOIN_PID[$i]=$!
|
||||
done
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue