diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 351669f8f6..107bec56b5 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -53,7 +53,6 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Du s.backend, addrs, &store.Config{ - Heartbeat: s.heartbeat, EphemeralTTL: s.ttl, }, ) @@ -124,6 +123,6 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c // Register is exported func (s *Discovery) Register(addr string) error { - opts := &store.WriteOptions{Ephemeral: true} + opts := &store.WriteOptions{Ephemeral: true, Heartbeat: s.heartbeat} return s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts) } diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 51d503c31a..9825fa70df 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -118,7 +118,7 @@ func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error { if _, ok := s.sessions[key]; !ok { entry := &api.SessionEntry{ Behavior: api.SessionBehaviorDelete, - TTL: time.Duration(60 * time.Second).String(), + TTL: s.ephemeralTTL.String(), } // Create global ephemeral keys session @@ -142,25 +142,26 @@ func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error { lock.Lock(nil) } - // Register in sessions map s.sessions[key] = session + p.Session = session - // Renew the session periodically - go func() { - ticker := time.NewTicker(20 * time.Second) - for { - select { - case <-ticker.C: - _, _, err := s.client.Session().Renew(p.Session, nil) - if err != nil { - delete(s.sessions, key) - return + // Renew the session periodically if heartbeat set + if opts.Heartbeat != 0 { + go func() { + ticker := time.NewTicker(opts.Heartbeat) + for { + select { + case <-ticker.C: + _, _, err := s.client.Session().Renew(p.Session, nil) + if err != nil { + delete(s.sessions, key) + return + } } } - } - }() + }() + } } - p.Session = s.sessions[key] } _, err := s.client.KV().Put(p, nil) diff --git a/pkg/store/store.go 
b/pkg/store/store.go index b8a3446100..cb70dff1a2 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -43,7 +43,6 @@ var ( type Config struct { TLS *tls.Config ConnectionTimeout time.Duration - Heartbeat time.Duration EphemeralTTL time.Duration } @@ -99,11 +98,6 @@ const ( // to be removed, it is set to 0 to explain // that there is no expiration DefaultTTL = 0 - - // EphemeralTTL is used for the ephemeral node - // behavior. If the node session is not renewed - // before the ttl expires, the node is removed - EphemeralTTL = 60 ) // KVPair represents {Key, Value, Lastindex} tuple @@ -115,6 +109,7 @@ type KVPair struct { // WriteOptions contains optional request parameters type WriteOptions struct { + Heartbeat time.Duration Ephemeral bool } diff --git a/test/integration/discovery/consul.bats b/test/integration/discovery/consul.bats index 8701c8440c..5ec88c6faf 100644 --- a/test/integration/discovery/consul.bats +++ b/test/integration/discovery/consul.bats @@ -59,6 +59,31 @@ function teardown() { retry 5 1 discovery_check_swarm_info } +@test "consul discovery: check for engines departure" { + # The goal of this test is to ensure swarm can detect engines that + # are removed from the discovery and refresh info accordingly + + # Start the store + start_store + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add Engines to the cluster and make sure it's picked by swarm + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info + + # Removes all the swarm agents + swarm_join_cleanup + + # Check if previously registered engines are all gone + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 30 1 discovery_check_swarm_info_empty +} + @test "consul discovery: failure" { # The goal of this test is to simulate a store failure and ensure discovery # is resilient to it. 
diff --git a/test/integration/discovery/discovery_helpers.bash b/test/integration/discovery/discovery_helpers.bash index e080b90fdc..4b0ba85263 100644 --- a/test/integration/discovery/discovery_helpers.bash +++ b/test/integration/discovery/discovery_helpers.bash @@ -10,6 +10,11 @@ function discovery_check_swarm_info() { docker_swarm info | grep -q "Nodes: $count" } +# Returns true if swarm info reports an empty cluster (0 nodes). +function discovery_check_swarm_info_empty() { + docker_swarm info | grep -q "Nodes: 0" +} + # Returns true if all nodes have joined the discovery. function discovery_check_swarm_list() { local joined=`swarm list "$1" | wc -l` diff --git a/test/integration/discovery/etcd.bats b/test/integration/discovery/etcd.bats index e883b12148..da54afb0ed 100644 --- a/test/integration/discovery/etcd.bats +++ b/test/integration/discovery/etcd.bats @@ -64,6 +64,31 @@ function teardown() { retry 5 1 discovery_check_swarm_info } +@test "etcd discovery: check for engines departure" { + # The goal of this test is to ensure swarm can detect engines that + # are removed from the discovery and refresh info accordingly + + # Start the store + start_store + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add Engines to the cluster and make sure it's picked by swarm + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info + + # Removes all the swarm agents + swarm_join_cleanup + + # Check if nodes are all gone + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 15 1 discovery_check_swarm_info_empty +} + @test "etcd discovery: failure" { # The goal of this test is to simulate a store failure and ensure discovery # is resilient to it. 
diff --git a/test/integration/discovery/zk.bats b/test/integration/discovery/zk.bats index 0ab9d0bbca..850ed46fa2 100644 --- a/test/integration/discovery/zk.bats +++ b/test/integration/discovery/zk.bats @@ -59,6 +59,31 @@ function teardown() { retry 10 1 discovery_check_swarm_info } +@test "zk discovery: check for engines departure" { + # The goal of this test is to ensure swarm can detect engines that + # are removed from the discovery and refresh info accordingly + + # Start the store + start_store + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add Engines to the cluster and make sure it's picked by swarm + start_docker 2 + swarm_join "$DISCOVERY" + retry 10 1 discovery_check_swarm_list "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Removes all the swarm agents + swarm_join_cleanup + + # Check if previously registered engines are all gone + retry 10 1 discovery_check_swarm_list "$DISCOVERY" + retry 20 1 discovery_check_swarm_info_empty +} + @test "zk discovery: failure" { # The goal of this test is to simulate a store failure and ensure discovery # is resilient to it. diff --git a/test/integration/helpers.bash b/test/integration/helpers.bash index 231b900b5f..78bc5feb06 100644 --- a/test/integration/helpers.bash +++ b/test/integration/helpers.bash @@ -117,7 +117,7 @@ function swarm_join() { for ((i=current; i < nodes; i++)); do local h="${HOSTS[$i]}" echo "Swarm join #${i}: $h $addr" - "$SWARM_BINARY" -l debug join --heartbeat=1s --addr="$h" "$addr" & + "$SWARM_BINARY" -l debug join --heartbeat=1s --ttl=10s --addr="$h" "$addr" & SWARM_JOIN_PID[$i]=$! done }