From f81de46ab4556a330584a847f92d87ffe5e58b13 Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Mon, 18 May 2015 19:39:10 -0700 Subject: [PATCH] Fix Consul and etcd with latest changes, use etcd v2.0.11 for integration tests, remove call to SyncCluster for now (breaks the integration tests) Signed-off-by: Alexandre Beslic --- cli/cli.go | 8 +-- cli/flags.go | 5 ++ cli/help.go | 1 + cli/join.go | 9 ++- cluster/swarm/cluster.go | 14 ++++- discovery/discovery.go | 6 +- discovery/file/file.go | 2 +- discovery/file/file_test.go | 6 +- discovery/kv/kv.go | 23 ++++---- discovery/kv/kv_test.go | 8 +-- discovery/nodes/nodes.go | 2 +- discovery/nodes/nodes_test.go | 6 +- discovery/token/token.go | 4 +- discovery/token/token_test.go | 6 +- pkg/store/consul.go | 87 ++++++++++++++++++---------- pkg/store/etcd.go | 22 ++++--- pkg/store/store.go | 2 + pkg/store/zookeeper.go | 2 +- test/integration/discovery/etcd.bats | 6 +- 19 files changed, 143 insertions(+), 76 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index e221a7ee7e..d3738790e0 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -65,7 +65,7 @@ func Run() { log.Fatalf("the `create` command takes no arguments. See '%s create --help'.", c.App.Name) } discovery := &token.Discovery{} - discovery.Initialize("", 0) + discovery.Initialize("", 0, 0) token, err := discovery.CreateCluster() if err != nil { log.Fatal(err) @@ -88,7 +88,7 @@ func Run() { log.Fatalf("invalid --timeout: %v", err) } - d, err := discovery.New(dflag, timeout) + d, err := discovery.New(dflag, timeout, 0) if err != nil { log.Fatal(err) } @@ -122,8 +122,8 @@ func Run() { { Name: "join", ShortName: "j", - Usage: "Join a docker cluster", - Flags: []cli.Flag{flAddr, flHeartBeat}, + Usage: "join a docker cluster", + Flags: []cli.Flag{flAddr, flHeartBeat, flTTL}, Action: join, }, } diff --git a/cli/flags.go b/cli/flags.go index b24362b73c..7dc3ab071f 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -53,6 +53,11 @@ var ( Value: "25s", Usage: "period between each heartbeat", } + flTTL = cli.StringFlag{ + Name: "time-to-live, ttl", + Value: "75s", + Usage: "sets the expiration of an ephemeral node", + } flTimeout = cli.StringFlag{ Name: "timeout", Value: "10s", diff --git a/cli/help.go b/cli/help.go index c66dc61323..c229f58ff7 100644 --- a/cli/help.go +++ b/cli/help.go @@ -45,6 +45,7 @@ Options: {{range .Flags}}{{.}} {{end}}{{if (eq .Name "manage")}}{{printf "\t * swarm.overcommit=0.05\tovercommit to apply on resources"}} {{printf "\t * swarm.discovery.heartbeat=25s\tperiod between each heartbeat"}}{{end}}{{ end }} + {{printf "\t * swarm.discovery.ttl=75s\ttime limit for a key to expire if ephemeral"}}{{end}}{{ end }} ` } diff --git a/cli/join.go b/cli/join.go index 7a151a5215..56a4324cb9 100644 --- a/cli/join.go +++ b/cli/join.go @@ -27,7 +27,14 @@ func join(c *cli.Context) { if hb < 1*time.Second { log.Fatal("--heartbeat should be at least one second") } - d, err := discovery.New(dflag, hb) + ttl, err := time.ParseDuration(c.String("ttl")) + if err != nil { + log.Fatalf("invalid --ttl: %v", err) + } + if ttl <= hb { + log.Fatal("--ttl must be strictly superior to the heartbeat value") + } + d, err := discovery.New(dflag, hb, ttl) if err != nil { log.Fatal(err) } diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 5826ccb992..788c03f8ba 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -56,6 +56,7 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t } heartbeat := 25 * time.Second + ttl := 75 * time.Second if opt, ok := options.String("swarm.discovery.heartbeat", ""); ok { h, err := time.ParseDuration(opt) if err != nil { @@ -67,8 +68,19 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t heartbeat = h } + if opt, ok := options.String("swarm.discovery.ttl", ""); ok { + t, err := time.ParseDuration(opt) + if err != nil { + return nil, err + } + if t <= heartbeat { + return nil, fmt.Errorf("invalid ttl %s: must be strictly superior to heartbeat value", opt) + } + ttl = t + } + // Set up discovery. - cluster.discovery, err = discovery.New(dflag, heartbeat) + cluster.discovery, err = discovery.New(dflag, heartbeat, ttl) if err != nil { log.Fatal(err) } diff --git a/discovery/discovery.go b/discovery/discovery.go index f1440e044e..649a9d6007 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -85,7 +85,7 @@ func (e Entries) Diff(cmp Entries) (Entries, Entries) { // manage swarm host entries. type Discovery interface { // Initialize the discovery with URIs and a heartbeat. - Initialize(string, time.Duration) error + Initialize(string, time.Duration, time.Duration) error // Watch the discovery for entry changes. // Returns a channel that will receive changes or an error. @@ -133,12 +133,12 @@ func parse(rawurl string) (string, string) { // New returns a new Discovery given a URL and heartbeat settings. // Returns an error if the URL scheme is not supported. -func New(rawurl string, heartbeat time.Duration) (Discovery, error) { +func New(rawurl string, heartbeat time.Duration, ttl time.Duration) (Discovery, error) { scheme, uri := parse(rawurl) if discovery, exists := discoveries[scheme]; exists { log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service") - err := discovery.Initialize(uri, heartbeat) + err := discovery.Initialize(uri, heartbeat, ttl) return discovery, err } diff --git a/discovery/file/file.go b/discovery/file/file.go index 4eb146ca7a..994f0aefd4 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -20,7 +20,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(path string, heartbeat time.Duration) error { +func (s *Discovery) Initialize(path string, heartbeat time.Duration, ttl time.Duration) error { s.path = path s.heartbeat = heartbeat return nil diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go index 0ec05972dc..d9038bb137 100644 --- a/discovery/file/file_test.go +++ b/discovery/file/file_test.go @@ -11,12 +11,12 @@ import ( func TestInitialize(t *testing.T) { d := &Discovery{} - d.Initialize("/path/to/file", 0) + d.Initialize("/path/to/file", 0, 0) assert.Equal(t, d.path, "/path/to/file") } func TestNew(t *testing.T) { - d, err := discovery.New("file:///path/to/file", 0) + d, err := discovery.New("file:///path/to/file", 0, 0) assert.NoError(t, err) assert.Equal(t, d.(*Discovery).path, "/path/to/file") } @@ -73,7 +73,7 @@ func TestWatch(t *testing.T) { // Set up file discovery. d := &Discovery{} - d.Initialize(tmp.Name(), 1) + d.Initialize(tmp.Name(), 1, 0) stopCh := make(chan struct{}) ch, errCh := d.Watch(stopCh) diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index ca328b95df..351669f8f6 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -16,6 +16,7 @@ type Discovery struct { backend store.Backend store store.Store heartbeat time.Duration + ttl time.Duration prefix string } @@ -26,7 +27,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error { +func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error { var ( parts = strings.SplitN(uris, "/", 2) ips = strings.Split(parts[0], ",") @@ -43,11 +44,19 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error { } s.heartbeat = heartbeat + s.ttl = ttl s.prefix = parts[1] // Creates a new store, will ignore options given // if not supported by the chosen store - s.store, err = store.CreateStore(s.backend, addrs, nil) + s.store, err = store.CreateStore( + s.backend, + addrs, + &store.Config{ + Heartbeat: s.heartbeat, + EphemeralTTL: s.ttl, + }, + ) return err } @@ -116,13 +125,5 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c // Register is exported func (s *Discovery) Register(addr string) error { opts := &store.WriteOptions{Ephemeral: true} - err := s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts) - return err -} - -func convertToStringArray(entries []*store.KVPair) (addrs []string) { - for _, entry := range entries { - addrs = append(addrs, string(entry.Value)) - } - return addrs + return s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts) } diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go index 381171f415..0f349f9255 100644 --- a/discovery/kv/kv_test.go +++ b/discovery/kv/kv_test.go @@ -13,17 +13,17 @@ import ( func TestInitialize(t *testing.T) { d := &Discovery{backend: store.MOCK} - assert.EqualError(t, d.Initialize("127.0.0.1", 0), "invalid format \"127.0.0.1\", missing ") + assert.EqualError(t, d.Initialize("127.0.0.1", 0, 0), "invalid format \"127.0.0.1\", missing ") d = &Discovery{backend: store.MOCK} - assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0)) + assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0)) s := d.store.(*store.Mock) assert.Len(t, s.Endpoints, 1) assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") assert.Equal(t, d.prefix, "path") d = &Discovery{backend: store.MOCK} - assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0)) + assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0)) s = d.store.(*store.Mock) assert.Len(t, s.Endpoints, 3) assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") @@ -34,7 +34,7 @@ func TestInitialize(t *testing.T) { func TestWatch(t *testing.T) { d := &Discovery{backend: store.MOCK} - assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0)) + assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0)) s := d.store.(*store.Mock) mockCh := make(chan []*store.KVPair) diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go index 68d8b0ec0b..b93208186d 100644 --- a/discovery/nodes/nodes.go +++ b/discovery/nodes/nodes.go @@ -17,7 +17,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, _ time.Duration) error { +func (s *Discovery) Initialize(uris string, _ time.Duration, _ time.Duration) error { for _, input := range strings.Split(uris, ",") { for _, ip := range discovery.Generate(input) { entry, err := discovery.NewEntry(ip) diff --git a/discovery/nodes/nodes_test.go b/discovery/nodes/nodes_test.go index 4c0583cbae..d59e38621d 100644 --- a/discovery/nodes/nodes_test.go +++ b/discovery/nodes/nodes_test.go @@ -9,7 +9,7 @@ import ( func TestInitialize(t *testing.T) { d := &Discovery{} - d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) + d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0) assert.Equal(t, len(d.entries), 2) assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222") @@ -17,7 +17,7 @@ func TestInitialize(t *testing.T) { func TestInitializeWithPattern(t *testing.T) { d := &Discovery{} - d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0) + d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, 0) assert.Equal(t, len(d.entries), 5) assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111") @@ -28,7 +28,7 @@ func TestInitializeWithPattern(t *testing.T) { func TestWatch(t *testing.T) { d := &Discovery{} - d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) + d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0) expected := discovery.Entries{ &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, diff --git a/discovery/token/token.go b/discovery/token/token.go index 84ec0be8cd..cca6f27b79 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -18,6 +18,7 @@ const DiscoveryURL = "https://discovery-stage.hub.docker.com/v1" // Discovery is exported type Discovery struct { heartbeat time.Duration + ttl time.Duration url string token string } @@ -27,7 +28,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration) error { +func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration, ttl time.Duration) error { if i := strings.LastIndex(urltoken, "/"); i != -1 { s.url = "https://" + urltoken[:i] s.token = urltoken[i+1:] @@ -40,6 +41,7 @@ func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration) error { return errors.New("token is empty") } s.heartbeat = heartbeat + s.ttl = ttl return nil } diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index 47d9d8856c..16e3fa6ab9 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -10,17 +10,17 @@ import ( func TestInitialize(t *testing.T) { discovery := &Discovery{} - err := discovery.Initialize("token", 0) + err := discovery.Initialize("token", 0, 0) assert.NoError(t, err) assert.Equal(t, discovery.token, "token") assert.Equal(t, discovery.url, DiscoveryURL) - err = discovery.Initialize("custom/path/token", 0) + err = discovery.Initialize("custom/path/token", 0, 0) assert.NoError(t, err) assert.Equal(t, discovery.token, "token") assert.Equal(t, discovery.url, "https://custom/path") - err = discovery.Initialize("", 0) + err = discovery.Initialize("", 0, 0) assert.Error(t, err) } diff --git a/pkg/store/consul.go b/pkg/store/consul.go index b97b04fcd6..51d503c31a 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -21,7 +21,9 @@ const ( type Consul struct { config *api.Config client *api.Client + ephemeralTTL time.Duration ephemeralSession string + sessions map[string]string } type consulLock struct { @@ -32,6 +34,7 @@ type consulLock struct { // a list of endpoints and optional tls config func InitializeConsul(endpoints []string, options *Config) (Store, error) { s := &Consul{} + s.sessions = make(map[string]string) // Create Consul client config := api.DefaultConfig() @@ -48,6 +51,9 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) { if options.ConnectionTimeout != 0 { s.setTimeout(options.ConnectionTimeout) } + if options.EphemeralTTL != 0 { + s.setEphemeralTTL(options.EphemeralTTL) + } } // Creates a new client @@ -58,17 +64,6 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) { } s.client = client - // Create global ephemeral keys session - entry := &api.SessionEntry{ - Behavior: api.SessionBehaviorDelete, - TTL: time.Duration(EphemeralTTL * time.Second).String(), - } - session, _, err := s.client.Session().Create(entry, nil) - if err != nil { - return nil, err - } - s.ephemeralSession = session - return s, nil } @@ -85,6 +80,11 @@ func (s *Consul) setTimeout(time time.Duration) { s.config.WaitTime = time } +// SetEphemeralTTL sets the ttl for ephemeral nodes +func (s *Consul) setEphemeralTTL(time time.Duration) { + s.ephemeralTTL = time +} + // Normalize the key for usage in Consul func (s *Consul) normalize(key string) string { key = normalize(key) @@ -106,34 +106,61 @@ func (s *Consul) Get(key string) (*KVPair, error) { // Put a value at "key" func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error { + + key = s.normalize(key) + p := &api.KVPair{ - Key: s.normalize(key), + Key: key, Value: value, } if opts != nil && opts.Ephemeral { - p.Session = s.ephemeralSession + if _, ok := s.sessions[key]; !ok { + entry := &api.SessionEntry{ + Behavior: api.SessionBehaviorDelete, + TTL: time.Duration(60 * time.Second).String(), + } - // Create lock option with the - // EphemeralSession - lockOpts := &api.LockOptions{ - Key: key, - Session: s.ephemeralSession, - } + // Create global ephemeral keys session + session, _, err := s.client.Session().Create(entry, nil) + if err != nil { + return err + } - // Lock and ignore if lock is held - // It's just a placeholder for the - // ephemeral behavior - lock, _ := s.client.LockOpts(lockOpts) - if lock != nil { - lock.Lock(nil) - } + // Create lock option with the + // EphemeralSession + lockOpts := &api.LockOptions{ + Key: key, + Session: session, + } - // Try to renew the session - _, _, err := s.client.Session().Renew(p.Session, nil) - if err != nil { - return err + // Lock and ignore if lock is held + // It's just a placeholder for the + // ephemeral behavior + lock, _ := s.client.LockOpts(lockOpts) + if lock != nil { + lock.Lock(nil) + } + + // Register in sessions map + s.sessions[key] = session + + // Renew the session periodically + go func() { + ticker := time.NewTicker(20 * time.Second) + for { + select { + case <-ticker.C: + _, _, err := s.client.Session().Renew(p.Session, nil) + if err != nil { + delete(s.sessions, key) + return + } + } + } + }() } + p.Session = s.sessions[key] } _, err := s.client.KV().Put(p, nil) diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 50f6522926..3c33732108 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -12,7 +12,8 @@ import ( // Etcd embeds the client type Etcd struct { - client *etcd.Client + client *etcd.Client + ephemeralTTL time.Duration } // InitializeEtcd creates a new Etcd client given @@ -31,10 +32,11 @@ func InitializeEtcd(addrs []string, options *Config) (Store, error) { if options.ConnectionTimeout != 0 { s.setTimeout(options.ConnectionTimeout) } + if options.EphemeralTTL != 0 { + s.setEphemeralTTL(options.EphemeralTTL) + } } - // FIXME sync on each operation? - s.client.SyncCluster() return s, nil } @@ -66,6 +68,11 @@ func (s *Etcd) setTimeout(time time.Duration) { s.client.SetDialTimeout(time) } +// SetHeartbeat sets the heartbeat value to notify we are alive +func (s *Etcd) setEphemeralTTL(time time.Duration) { + s.ephemeralTTL = time +} + // Create the entire path for a directory that does not exist func (s *Etcd) createDirectory(path string) error { if _, err := s.client.CreateDir(normalize(path), 10); err != nil { @@ -100,24 +107,23 @@ func (s *Etcd) Get(key string) (*KVPair, error) { func (s *Etcd) Put(key string, value []byte, opts *WriteOptions) error { // Default TTL = 0 means no expiration - ttl := DefaultTTL + var ttl uint64 if opts != nil && opts.Ephemeral { - ttl = EphemeralTTL + ttl = uint64(s.ephemeralTTL.Seconds()) } - if _, err := s.client.Set(key, string(value), uint64(ttl)); err != nil { + if _, err := s.client.Set(key, string(value), ttl); err != nil { if etcdError, ok := err.(*etcd.EtcdError); ok { if etcdError.ErrorCode == 104 { // Not a directory // Remove the last element (the actual key) and set the prefix as a dir err = s.createDirectory(getDirectory(key)) - if _, err := s.client.Set(key, string(value), uint64(ttl)); err != nil { + if _, err := s.client.Set(key, string(value), ttl); err != nil { return err } } } return err } - return nil } diff --git a/pkg/store/store.go b/pkg/store/store.go index dfc352d535..b8a3446100 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -43,6 +43,8 @@ var ( type Config struct { TLS *tls.Config ConnectionTimeout time.Duration + Heartbeat time.Duration + EphemeralTTL time.Duration } // Store represents the backend K/V storage diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index 7ae92eb449..68a7846bea 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -22,7 +22,7 @@ type zookeeperLock struct { // given a list of endpoints and optional tls config func InitializeZookeeper(endpoints []string, options *Config) (Store, error) { s := &Zookeeper{} - s.timeout = 5 * time.Second // default timeout + s.timeout = 10 * time.Second // default timeout // Set options if options != nil { diff --git a/test/integration/discovery/etcd.bats b/test/integration/discovery/etcd.bats index a49001d996..e883b12148 100644 --- a/test/integration/discovery/etcd.bats +++ b/test/integration/discovery/etcd.bats @@ -12,7 +12,11 @@ DISCOVERY="etcd://${STORE_HOST}/test" CONTAINER_NAME=swarm_etcd function start_store() { - docker_host run -p $STORE_HOST:4001 --name=$CONTAINER_NAME -d coreos/etcd + docker_host run -p $STORE_HOST:4001 \ + --name=$CONTAINER_NAME -d \ + quay.io/coreos/etcd:v2.0.11 \ + --listen-client-urls=http://0.0.0.0:2379,http://0.0.0.0:4001 \ + --advertise-client-urls=http://$STORE_HOST } function stop_store() {