From 97984881c349543dc00ee7bb8a09c3c1c2652be3 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Fri, 15 May 2015 20:27:01 -0700 Subject: [PATCH 01/16] discovery: New channel based API. - Watch() issues updates by channel rather than by callback - Fetch() is gone - Watch() can be stopped at any time by closing the stop channel - Watch() is now resilient to errors and will try over and over Signed-off-by: Andrea Luzzardi --- cli/cli.go | 12 +++- cli/join.go | 6 +- cluster/swarm/cluster.go | 129 +++++++++++++++++++--------------- discovery/discovery.go | 37 +++++++--- discovery/discovery_test.go | 2 +- discovery/file/file.go | 41 +++++++++-- discovery/kv/kv.go | 78 +++++++++++++------- discovery/nodes/nodes.go | 15 ++-- discovery/token/token.go | 45 +++++++++--- discovery/token/token_test.go | 27 +++++-- 10 files changed, 264 insertions(+), 128 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index b1b31a673e..9227ac1d96 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path" + "time" log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" @@ -88,12 +89,17 @@ func Run() { log.Fatal(err) } - nodes, err := d.Fetch() + ch, err := d.Watch(nil) if err != nil { log.Fatal(err) } - for _, node := range nodes { - fmt.Println(node) + select { + case entries := <-ch: + for _, entry := range entries { + fmt.Println(entry) + } + case <-time.After(10 * time.Second): + log.Fatal("Timed out") } }, }, diff --git a/cli/join.go b/cli/join.go index 3c5a6c7594..dc7fe31801 100644 --- a/cli/join.go +++ b/cli/join.go @@ -41,10 +41,10 @@ func join(c *cli.Context) { log.Fatal(err) } - hbval := time.Duration(hb) + hbval := time.Duration(hb) * time.Second for { - log.WithFields(log.Fields{"addr": addr, "discovery": dflag}).Infof("Registering on the discovery service every %d seconds...", hbval) - time.Sleep(hbval * time.Second) + log.WithFields(log.Fields{"addr": addr, "discovery": dflag}).Infof("Registering on the discovery service every %s...", hbval) + 
time.Sleep(hbval) if err := d.Register(addr); err != nil { log.Error(err) } diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 440066ce7b..58c188149b 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -28,10 +28,9 @@ type Cluster struct { engines map[string]*cluster.Engine scheduler *scheduler.Scheduler store *state.Store + discovery discovery.Discovery overcommitRatio float64 - discovery string - heartbeat uint64 TLSConfig *tls.Config } @@ -39,13 +38,15 @@ type Cluster struct { func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *tls.Config, dflag string, options cluster.DriverOpts) (cluster.Cluster, error) { log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster") + var ( + err error + ) + cluster := &Cluster{ engines: make(map[string]*cluster.Engine), scheduler: scheduler, store: store, overcommitRatio: 0.05, - heartbeat: 25, - discovery: dflag, TLSConfig: TLSConfig, } @@ -53,29 +54,25 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t cluster.overcommitRatio = val } - if heartbeat, ok := options.Uint("swarm.discovery.heartbeat"); ok { - cluster.heartbeat = heartbeat - if cluster.heartbeat < 1 { + heartbeat := uint64(25) + if opt, ok := options.Uint("swarm.discovery.heartbeat"); ok { + if opt < 1 { return nil, errors.New("heartbeat should be an unsigned integer and greater than 0") } + heartbeat = opt } + log.Errorf("chb: %d", heartbeat) - // get the list of entries from the discovery service - go func() { - d, err := discovery.New(cluster.discovery, cluster.heartbeat) - if err != nil { - log.Fatal(err) - } - - entries, err := d.Fetch() - if err != nil { - log.Fatal(err) - - } - cluster.newEntries(entries) - - go d.Watch(cluster.newEntries) - }() + // Set up discovery. 
+ cluster.discovery, err = discovery.New(dflag, heartbeat) + if err != nil { + log.Fatal(err) + } + discoveryCh, err := cluster.discovery.Watch(nil) + if err != nil { + log.Fatal(err) + } + go cluster.monitorDiscovery(discoveryCh) return cluster, nil } @@ -165,40 +162,6 @@ func (c *Cluster) RemoveContainer(container *cluster.Container, force bool) erro return nil } -// Entries are Docker Engines -func (c *Cluster) newEntries(entries []*discovery.Entry) { - for _, entry := range entries { - go func(m *discovery.Entry) { - if !c.hasEngine(m.String()) { - engine := cluster.NewEngine(m.String(), c.overcommitRatio) - if err := engine.Connect(c.TLSConfig); err != nil { - log.Error(err) - return - } - c.Lock() - - if old, exists := c.engines[engine.ID]; exists { - c.Unlock() - if old.Addr != engine.Addr { - log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr) - } else { - log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr) - } - return - } - c.engines[engine.ID] = engine - if err := engine.RegisterEventHandler(c); err != nil { - log.Error(err) - c.Unlock() - return - } - c.Unlock() - - } - }(entry) - } -} - func (c *Cluster) hasEngine(addr string) bool { c.RLock() defer c.RUnlock() @@ -211,6 +174,58 @@ func (c *Cluster) hasEngine(addr string) bool { return false } +func (c *Cluster) addEngine(addr string) bool { + // Check the engine is already registered by address. + if c.hasEngine(addr) { + return false + } + + // Attempt a connection to the engine. Since this is slow, don't get a hold + // of the lock yet. + engine := cluster.NewEngine(addr, c.overcommitRatio) + if err := engine.Connect(c.TLSConfig); err != nil { + log.Error(err) + return false + } + + // The following is critical and fast. Grab a lock. + c.Lock() + defer c.Unlock() + + // Make sure the engine ID is unique. 
+ if old, exists := c.engines[engine.ID]; exists { + if old.Addr != engine.Addr { + log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr) + } else { + log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr) + } + return false + } + + // Finally register the engine. + c.engines[engine.ID] = engine + if err := engine.RegisterEventHandler(c); err != nil { + log.Error(err) + } + return true +} + +// Entries are Docker Engines +func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries) { + // Watch for changes in the discovery channel. + log.Error("starting monitor") + for entries := range ch { + log.Errorf("got %v from monitor", entries) + + // Attempt to add every engine. `addEngine` will take care of duplicates. + // Since `addEngine` can be very slow (it has to connect to the + // engine), we are going to launch them in parallel. + for _, entry := range entries { + go c.addEngine(entry.String()) + } + } +} + // Images returns all the images in the cluster. func (c *Cluster) Images() []*cluster.Image { c.RLock() diff --git a/discovery/discovery.go b/discovery/discovery.go index f5f391d158..00c35bfbc7 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -25,20 +25,41 @@ func NewEntry(url string) (*Entry, error) { } // String returns the string form of an entry. -func (m Entry) String() string { +func (m *Entry) String() string { return fmt.Sprintf("%s:%s", m.Host, m.Port) } -// WatchCallback is the type of the function called to monitor entries -// on a discovery endpoint. -type WatchCallback func(entries []*Entry) +func (a *Entry) Equals(b *Entry) bool { + return a.Host == b.Host && a.Port == b.Port +} + +type Entries []*Entry + +func (a Entries) Equals(b Entries) bool { + // Check if the file has really changed. 
+ if len(a) != len(b) { + return false + } + for i, _ := range a { + if !a[i].Equals(b[i]) { + return false + } + } + return true +} // The Discovery interface is implemented by Discovery backends which // manage swarm host entries. type Discovery interface { + // Initialize the discovery with URIs and a heartbeat. Initialize(string, uint64) error - Fetch() ([]*Entry, error) - Watch(WatchCallback) + + // Watch the discovery for entry changes. + // Returns a channel that will receive changes or an error. + // Providing a non-nil stopCh can be used to stop watching. + Watch(stopCh <-chan struct{}) (<-chan Entries, error) + + // Register to the discovery Register(string) error } @@ -92,8 +113,8 @@ func New(rawurl string, heartbeat uint64) (Discovery, error) { } // CreateEntries returns an array of entries based on the given addresses. -func CreateEntries(addrs []string) ([]*Entry, error) { - entries := []*Entry{} +func CreateEntries(addrs []string) (Entries, error) { + entries := Entries{} if addrs == nil { return entries, nil } diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 154086e3c3..5cbf3ffb86 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -40,7 +40,7 @@ func TestParse(t *testing.T) { func TestCreateEntries(t *testing.T) { entries, err := CreateEntries(nil) - assert.Equal(t, entries, []*Entry{}) + assert.Equal(t, entries, Entries{}) assert.NoError(t, err) entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""}) diff --git a/discovery/file/file.go b/discovery/file/file.go index ba43aee077..162e0c0309 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -5,6 +5,7 @@ import ( "strings" "time" + log "github.com/Sirupsen/logrus" "github.com/docker/swarm/discovery" ) @@ -46,23 +47,49 @@ func parseFileContent(content []byte) []string { return result } -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { +func (s *Discovery) fetch() 
(discovery.Entries, error) { fileContent, err := ioutil.ReadFile(s.path) if err != nil { + log.WithField("discovery", "file").Errorf("Failed to read '%s': %v", s.path, err) return nil, err } return discovery.CreateEntries(parseFileContent(fileContent)) } // Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { - entries, err := s.Fetch() +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) { + ch := make(chan discovery.Entries) + ticker := time.NewTicker(time.Duration(s.heartbeat) * time.Second) + + go func() { + // Send the initial entries if available. + currentEntries, err := s.fetch() if err == nil { - callback(entries) + ch <- currentEntries } - } + + // Periodically send updates. + for { + select { + case <-ticker.C: + newEntries, err := s.fetch() + if err != nil { + continue + } + + // Check if the file has really changed. + if !newEntries.Equals(currentEntries) { + ch <- newEntries + } + currentEntries = newEntries + case <-stopCh: + ticker.Stop() + return + } + } + }() + + return ch, nil } // Register is exported diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 7882bfa1c7..da2602fe97 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -61,39 +61,63 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64) error { return nil } -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - addrs, err := s.store.List(s.prefix) - if err != nil { - return nil, err +// Watch the store until either there's a store error or we receive a stop request. +// Returns false if we shouldn't attempt watching the store anymore (stop request received). 
+func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries) bool { + for { + select { + case pairs := <-watchCh: + if pairs == nil { + return true + } + + log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs)) + + // Convert `KVPair` into `discovery.Entry`. + addrs := make([]string, len(pairs)) + for _, pair := range pairs { + addrs = append(addrs, string(pair.Value)) + } + + if entries, err := discovery.CreateEntries(addrs); err == nil { + discoveryCh <- entries + } + case <-stopCh: + // We were requested to stop watching. + return false + } } - return discovery.CreateEntries(convertToStringArray(addrs)) } // Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - ch, err := s.store.WatchTree(s.prefix, nil) - if err != nil { - log.WithField("discovery", s.backend).Errorf("Watch failed: %v", err) - return - } - for kv := range ch { - log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(kv)) - // Traduce byte array entries to discovery.Entry - entries, _ := discovery.CreateEntries(convertToStringArray(kv)) - callback(entries) - } +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) { + ch := make(chan discovery.Entries) + go func() { + // Forever: Create a store watch, watch until we get an error and then try again. + // Will only stop if we receive a stopCh request. + for { + // Set up a watch. + watchCh, err := s.store.WatchTree(s.prefix, stopCh) + if err != nil { + log.WithField("discovery", s.backend).Errorf("Unable to set up a watch: %v", err) + } else { + if !s.watchOnce(stopCh, watchCh, ch) { + log.WithField("discovery", s.backend).Infof("Shutting down") + return + } + } + + // If we get here it means the store watch channel was closed. This + // is unexpected so let's retry later. + log.WithField("discovery", s.backend).Errorf("Unexpected watch error. 
Retrying in %s", time.Duration(s.heartbeat)) + time.Sleep(s.heartbeat) + } + }() + + return ch, nil } // Register is exported func (s *Discovery) Register(addr string) error { - err := s.store.Put(path.Join(s.prefix, addr), []byte(addr)) - return err -} - -func convertToStringArray(entries []*store.KVPair) (addrs []string) { - for _, entry := range entries { - addrs = append(addrs, string(entry.Value)) - } - return addrs + return s.store.Put(path.Join(s.prefix, addr), []byte(addr)) } diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go index 009d275ff2..7f76d151dd 100644 --- a/discovery/nodes/nodes.go +++ b/discovery/nodes/nodes.go @@ -8,7 +8,7 @@ import ( // Discovery is exported type Discovery struct { - entries []*discovery.Entry + entries discovery.Entries } func init() { @@ -30,13 +30,14 @@ func (s *Discovery) Initialize(uris string, _ uint64) error { return nil } -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - return s.entries, nil -} - // Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) { + ch := make(chan discovery.Entries) + go func() { + ch <- s.entries + <-stopCh + }() + return ch, nil } // Register is exported diff --git a/discovery/token/token.go b/discovery/token/token.go index bada10ce52..793d24621b 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -9,6 +9,7 @@ import ( "strings" "time" + log "github.com/Sirupsen/logrus" "github.com/docker/swarm/discovery" ) @@ -45,8 +46,7 @@ func (s *Discovery) Initialize(urltoken string, heartbeat uint64) error { } // Fetch returns the list of entries for the discovery service at the specified endpoint -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - +func (s *Discovery) fetch() (discovery.Entries, error) { resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) if err != nil { return nil, err @@ 
-57,23 +57,52 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) { var addrs []string if resp.StatusCode == http.StatusOK { if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil { + log.WithField("discovery", "token").Errorf("Failed to decode response: %v", err) return nil, err } } else { - return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode) + err := fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode) + log.WithField("discovery", "token").Error(err) + return nil, err } return discovery.CreateEntries(addrs) } // Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { - entries, err := s.Fetch() +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) { + ch := make(chan discovery.Entries) + ticker := time.NewTicker(time.Duration(s.heartbeat) * time.Second) + + go func() { + // Send the initial entries if available. + currentEntries, err := s.fetch() if err == nil { - callback(entries) + ch <- currentEntries } - } + + // Periodically send updates. + for { + select { + case <-ticker.C: + newEntries, err := s.fetch() + if err != nil { + continue + } + + // Check if the file has really changed. 
+ if !newEntries.Equals(currentEntries) { + ch <- newEntries + } + currentEntries = newEntries + case <-stopCh: + ticker.Stop() + return + } + } + }() + + return ch, nil } // Register adds a new entry identified by the into the discovery service diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index a4e1f42edb..14d7019110 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -1,8 +1,11 @@ package token import ( + "log" "testing" + "time" + "github.com/docker/swarm/discovery" "github.com/stretchr/testify/assert" ) @@ -23,14 +26,24 @@ func TestInitialize(t *testing.T) { } func TestRegister(t *testing.T) { - discovery := &Discovery{token: "TEST_TOKEN", url: DiscoveryURL} + d := &Discovery{token: "TEST_TOKEN", url: DiscoveryURL, heartbeat: 1} expected := "127.0.0.1:2675" - assert.NoError(t, discovery.Register(expected)) - - addrs, err := discovery.Fetch() + expectedEntries, err := discovery.CreateEntries([]string{expected}) assert.NoError(t, err) - assert.Equal(t, len(addrs), 1) - assert.Equal(t, addrs[0].String(), expected) - assert.NoError(t, discovery.Register(expected)) + // Register + assert.NoError(t, d.Register(expected)) + + // Watch + ch, err := d.Watch(nil) + assert.NoError(t, err) + select { + case entries := <-ch: + log.Printf("%v %v", entries, expectedEntries) + assert.True(t, entries.Equals(expectedEntries)) + case <-time.After(2 * time.Second): + t.Fatal("Timed out") + } + + assert.NoError(t, d.Register(expected)) } From 3de0b5f56710cf3d90a2f24d0c947c90f0d11413 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 12:28:55 -0700 Subject: [PATCH 02/16] join: Don't give up if registration fails, just retry. 
Signed-off-by: Andrea Luzzardi --- cli/join.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cli/join.go b/cli/join.go index dc7fe31801..9e6c9d9511 100644 --- a/cli/join.go +++ b/cli/join.go @@ -37,16 +37,12 @@ func join(c *cli.Context) { log.Fatal("--addr should be of the form ip:port or hostname:port") } - if err := d.Register(addr); err != nil { - log.Fatal(err) - } - hbval := time.Duration(hb) * time.Second for { log.WithFields(log.Fields{"addr": addr, "discovery": dflag}).Infof("Registering on the discovery service every %s...", hbval) - time.Sleep(hbval) if err := d.Register(addr); err != nil { log.Error(err) } + time.Sleep(hbval) } } From 2a27adaea1bf2b9934a351d67f0ac4cda45fd3aa Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 12:30:31 -0700 Subject: [PATCH 03/16] discovery integration: No need to wait anymore Signed-off-by: Andrea Luzzardi --- test/integration/discovery-consul.bats | 9 --------- test/integration/discovery-etcd.bats | 9 --------- test/integration/discovery-zk.bats | 12 ++---------- test/integration/helpers.bash | 2 +- 4 files changed, 3 insertions(+), 29 deletions(-) diff --git a/test/integration/discovery-consul.bats b/test/integration/discovery-consul.bats index 1d5c42535b..b5848da8f7 100644 --- a/test/integration/discovery-consul.bats +++ b/test/integration/discovery-consul.bats @@ -8,17 +8,8 @@ CONSUL_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 8000 )) # Container name for integration test CONTAINER_NAME=swarm_consul -function check_leader() { - # Confirm Cluster leader election - docker_host logs $CONTAINER_NAME | grep "New leader elected" - # Check member state - docker_host logs $CONTAINER_NAME | grep "consul: member '$CONTAINER_NAME' joined, marking health alive" -} - function start_consul() { docker_host run --name=$CONTAINER_NAME -h $CONTAINER_NAME -p $CONSUL_HOST:8500 -d progrium/consul -server -bootstrap-expect 1 -data-dir /test - # Check if consul cluster leader is elected - retry 30 1 
check_leader } function stop_consul() { diff --git a/test/integration/discovery-etcd.bats b/test/integration/discovery-etcd.bats index 19c2ba9301..49e55d9da3 100644 --- a/test/integration/discovery-etcd.bats +++ b/test/integration/discovery-etcd.bats @@ -8,17 +8,8 @@ ETCD_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 9000 )) # Container name for integration test CONTAINER_NAME=swarm_etcd -function check_leader() { - # Confirm Cluster leader election - docker_host logs $CONTAINER_NAME | grep "state changed from 'follower' to 'leader'" - # Check leader event - docker_host logs $CONTAINER_NAME | grep "leader changed from '' to" -} - function start_etcd() { docker_host run -p $ETCD_HOST:4001 --name=$CONTAINER_NAME -d coreos/etcd - # Check if etcd cluster leader is elected - retry 30 1 check_leader } function stop_etcd() { diff --git a/test/integration/discovery-zk.bats b/test/integration/discovery-zk.bats index bfe78a9c84..b8c6ef1800 100644 --- a/test/integration/discovery-zk.bats +++ b/test/integration/discovery-zk.bats @@ -8,20 +8,12 @@ ZK_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 7000 )) # Container name for integration test ZK_CONTAINER_NAME=swarm_integration_zk -function check_zk_started() { - docker_host logs $ZK_CONTAINER_NAME | grep "binding to port 0.0.0.0/0.0.0.0:2181" -} - function start_zk() { - run docker_host run --name $ZK_CONTAINER_NAME -p $ZK_HOST:2181 -d jplock/zookeeper - [ "$status" -eq 0 ] - - retry 30 1 check_zk_started + docker_host run --name $ZK_CONTAINER_NAME -p $ZK_HOST:2181 -d jplock/zookeeper } function stop_zk() { - run docker_host rm -f -v $ZK_CONTAINER_NAME - [ "$status" -eq 0 ] + docker_host rm -f -v $ZK_CONTAINER_NAME } function setup() { diff --git a/test/integration/helpers.bash b/test/integration/helpers.bash index d183072570..39ca6224cb 100644 --- a/test/integration/helpers.bash +++ b/test/integration/helpers.bash @@ -124,7 +124,7 @@ function swarm_join() { for ((i=current; i < nodes; i++)); do local h="${HOSTS[$i]}" echo "Swarm join #${i}: 
$h $addr" - "$SWARM_BINARY" join --addr="$h" "$addr" & + "$SWARM_BINARY" join --heartbeat=1 --addr="$h" "$addr" & SWARM_JOIN_PID[$i]=$! done retry 10 0.5 check_discovery_nodes "$addr" From 5756e83fcb1b047af21047e4c6fad15a04f4c1e7 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 13:37:00 -0700 Subject: [PATCH 04/16] store/consul: Watch: Use a WaitTime so we can check stopCh. Signed-off-by: Andrea Luzzardi --- pkg/store/consul.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 32cddf0a93..9932f3cb15 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -10,6 +10,10 @@ import ( api "github.com/hashicorp/consul/api" ) +const ( + DefaultWatchWaitTime = 15 * time.Second +) + // Consul embeds the client and watches type Consul struct { config *api.Config @@ -145,7 +149,9 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro go func() { defer close(watchCh) - opts := &api.QueryOptions{} + // Use a wait time in order to check if we should quit from time to + // time. + opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} for { // Check if we should quit select { @@ -158,6 +164,11 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro log.Errorf("consul: %v", err) return } + // If LastIndex didn't change then it means `Get` returned because + // of the WaitTime and the key didn't change. + if opts.WaitIndex == meta.LastIndex { + continue + } opts.WaitIndex = meta.LastIndex // FIXME: What happens when a key is deleted? if pair != nil { @@ -181,7 +192,9 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP go func() { defer close(watchCh) - opts := &api.QueryOptions{} + // Use a wait time in order to check if we should quit from time to + // time. 
+ opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} for { // Check if we should quit select { @@ -195,6 +208,12 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP log.Errorf("consul: %v", err) return } + // If LastIndex didn't change then it means `Get` returned because + // of the WaitTime and the key didn't change. + if opts.WaitIndex == meta.LastIndex { + continue + } + opts.WaitIndex = meta.LastIndex kv := []*KVPair{} for _, pair := range pairs { if pair.Key == prefix { @@ -202,7 +221,6 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP } kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex}) } - opts.WaitIndex = meta.LastIndex watchCh <- kv } }() From f33c03af939a9d4f5eab8e79a2cb75bb3f4eb990 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 13:47:15 -0700 Subject: [PATCH 05/16] discovery: Push watch errors to a channel Signed-off-by: Andrea Luzzardi --- cli/cli.go | 23 ++++++++++++++++------- cli/flags.go | 2 +- cluster/swarm/cluster.go | 31 +++++++++++++++---------------- discovery/discovery.go | 2 +- discovery/file/file.go | 11 ++++++----- discovery/kv/kv.go | 21 ++++++++++++--------- discovery/nodes/nodes.go | 2 +- discovery/token/token.go | 16 ++++++++-------- discovery/token/token_test.go | 9 ++++----- 9 files changed, 64 insertions(+), 53 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index 9227ac1d96..b8ac5c601b 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path" + "strconv" "time" log "github.com/Sirupsen/logrus" @@ -77,28 +78,36 @@ func Run() { Name: "list", ShortName: "l", Usage: "list nodes in a cluster", + Flags: []cli.Flag{ + cli.IntFlag{ + Name: "timeout", + Value: 10, + }, + }, Action: func(c *cli.Context) { dflag := getDiscovery(c) if dflag == "" { log.Fatalf("discovery required to list a cluster. 
See '%s list --help'.", c.App.Name) } + timeout, err := strconv.ParseUint(c.String("timeout"), 0, 32) + if timeout < 1 || err != nil { + log.Fatal("--timeout should be an unsigned integer and greater than 0") + } - // FIXME Add and use separate timeout flag instead of forcing it - d, err := discovery.New(dflag, 10) + d, err := discovery.New(dflag, timeout) if err != nil { log.Fatal(err) } - ch, err := d.Watch(nil) - if err != nil { - log.Fatal(err) - } + ch, errCh := d.Watch(nil) select { case entries := <-ch: for _, entry := range entries { fmt.Println(entry) } - case <-time.After(10 * time.Second): + case err := <-errCh: + log.Fatal(err) + case <-time.After(time.Duration(timeout) * time.Second): log.Fatal("Timed out") } }, diff --git a/cli/flags.go b/cli/flags.go index 9b2485d944..2690fcefe9 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -51,7 +51,7 @@ var ( flHeartBeat = cli.IntFlag{ Name: "heartbeat, hb", Value: 25, - Usage: "time in second between each heartbeat", + Usage: "time in seconds between each heartbeat", } flEnableCors = cli.BoolFlag{ Name: "api-enable-cors, cors", diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 58c188149b..0e5ad422b7 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -68,11 +68,8 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t if err != nil { log.Fatal(err) } - discoveryCh, err := cluster.discovery.Watch(nil) - if err != nil { - log.Fatal(err) - } - go cluster.monitorDiscovery(discoveryCh) + discoveryCh, errCh := cluster.discovery.Watch(nil) + go cluster.monitorDiscovery(discoveryCh, errCh) return cluster, nil } @@ -211,17 +208,19 @@ func (c *Cluster) addEngine(addr string) bool { } // Entries are Docker Engines -func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries) { - // Watch for changes in the discovery channel. 
- log.Error("starting monitor") - for entries := range ch { - log.Errorf("got %v from monitor", entries) - - // Attempt to add every engine. `addEngine` will take care of duplicates. - // Since `addEngine` can be very slow (it has to connect to the - // engine), we are going to launch them in parallel. - for _, entry := range entries { - go c.addEngine(entry.String()) +func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error) { + // Watch changes on the discovery channel. + for { + select { + case entries := <-ch: + // Attempt to add every engine. `addEngine` will take care of duplicates. + // Since `addEngine` can be very slow (it has to connect to the + // engine), we are going to launch them in parallel. + for _, entry := range entries { + go c.addEngine(entry.String()) + } + case err := <-errCh: + log.Errorf("Discovery error: %v", err) } } } diff --git a/discovery/discovery.go b/discovery/discovery.go index 00c35bfbc7..0baf68fc01 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -57,7 +57,7 @@ type Discovery interface { // Watch the discovery for entry changes. // Returns a channel that will receive changes or an error. // Providing a non-nil stopCh can be used to stop watching. 
- Watch(stopCh <-chan struct{}) (<-chan Entries, error) + Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error) // Register to the discovery Register(string) error diff --git a/discovery/file/file.go b/discovery/file/file.go index 162e0c0309..12d46f0610 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -1,11 +1,11 @@ package file import ( + "fmt" "io/ioutil" "strings" "time" - log "github.com/Sirupsen/logrus" "github.com/docker/swarm/discovery" ) @@ -50,15 +50,15 @@ func parseFileContent(content []byte) []string { func (s *Discovery) fetch() (discovery.Entries, error) { fileContent, err := ioutil.ReadFile(s.path) if err != nil { - log.WithField("discovery", "file").Errorf("Failed to read '%s': %v", s.path, err) - return nil, err + return nil, fmt.Errorf("failed to read '%s': %v", s.path, err) } return discovery.CreateEntries(parseFileContent(fileContent)) } // Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) { +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { ch := make(chan discovery.Entries) + errCh := make(chan error) ticker := time.NewTicker(time.Duration(s.heartbeat) * time.Second) go func() { @@ -74,6 +74,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, err case <-ticker.C: newEntries, err := s.fetch() if err != nil { + errCh <- err continue } @@ -89,7 +90,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, err } }() - return ch, nil + return ch, errCh } // Register is exported diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index da2602fe97..a754f825d1 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -63,7 +63,7 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64) error { // Watch the store until either there's a store error or we receive a stop request. // Returns false if we shouldn't attempt watching the store anymore (stop request received). 
-func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries) bool { +func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool { for { select { case pairs := <-watchCh: @@ -79,7 +79,10 @@ func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KV addrs = append(addrs, string(pair.Value)) } - if entries, err := discovery.CreateEntries(addrs); err == nil { + entries, err := discovery.CreateEntries(addrs) + if err != nil { + errCh <- err + } else { discoveryCh <- entries } case <-stopCh: @@ -90,8 +93,10 @@ func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KV } // Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) { +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { ch := make(chan discovery.Entries) + errCh := make(chan error) + go func() { // Forever: Create a store watch, watch until we get an error and then try again. // Will only stop if we receive a stopCh request. @@ -99,22 +104,20 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, err // Set up a watch. watchCh, err := s.store.WatchTree(s.prefix, stopCh) if err != nil { - log.WithField("discovery", s.backend).Errorf("Unable to set up a watch: %v", err) + errCh <- err } else { - if !s.watchOnce(stopCh, watchCh, ch) { - log.WithField("discovery", s.backend).Infof("Shutting down") + if !s.watchOnce(stopCh, watchCh, ch, errCh) { return } } // If we get here it means the store watch channel was closed. This // is unexpected so let's retry later. - log.WithField("discovery", s.backend).Errorf("Unexpected watch error. 
Retrying in %s", time.Duration(s.heartbeat)) + errCh <- fmt.Errorf("Unexpected watch error") time.Sleep(s.heartbeat) } }() - - return ch, nil + return ch, errCh } // Register is exported diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go index 7f76d151dd..07f963135c 100644 --- a/discovery/nodes/nodes.go +++ b/discovery/nodes/nodes.go @@ -31,7 +31,7 @@ func (s *Discovery) Initialize(uris string, _ uint64) error { } // Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) { +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { ch := make(chan discovery.Entries) go func() { ch <- s.entries diff --git a/discovery/token/token.go b/discovery/token/token.go index 793d24621b..f784f36ecf 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -9,7 +9,6 @@ import ( "strings" "time" - log "github.com/Sirupsen/logrus" "github.com/docker/swarm/discovery" ) @@ -57,27 +56,27 @@ func (s *Discovery) fetch() (discovery.Entries, error) { var addrs []string if resp.StatusCode == http.StatusOK { if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil { - log.WithField("discovery", "token").Errorf("Failed to decode response: %v", err) - return nil, err + return nil, fmt.Errorf("Failed to decode response: %v", err) } } else { - err := fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode) - log.WithField("discovery", "token").Error(err) - return nil, err + return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode) } return discovery.CreateEntries(addrs) } // Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) { +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { ch := make(chan discovery.Entries) ticker := time.NewTicker(time.Duration(s.heartbeat) * time.Second) + errCh 
:= make(chan error) go func() { // Send the initial entries if available. currentEntries, err := s.fetch() - if err == nil { + if err != nil { + errCh <- err + } else { ch <- currentEntries } @@ -87,6 +86,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, err case <-ticker.C: newEntries, err := s.fetch() if err != nil { + errCh <- err continue } diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index 14d7019110..47d9d8856c 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -1,7 +1,6 @@ package token import ( - "log" "testing" "time" @@ -35,13 +34,13 @@ func TestRegister(t *testing.T) { assert.NoError(t, d.Register(expected)) // Watch - ch, err := d.Watch(nil) - assert.NoError(t, err) + ch, errCh := d.Watch(nil) select { case entries := <-ch: - log.Printf("%v %v", entries, expectedEntries) assert.True(t, entries.Equals(expectedEntries)) - case <-time.After(2 * time.Second): + case err := <-errCh: + t.Fatal(err) + case <-time.After(5 * time.Second): t.Fatal("Timed out") } From 0c72bcaf40c426196a7e20738b97619bf602a2b5 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 14:04:56 -0700 Subject: [PATCH 06/16] discovery cleanup: heartbeat is a time.Duration. 
- Use a time.Duration instead of a uint64 for hb - Flags can accept durations: --heartbeat 25s Signed-off-by: Andrea Luzzardi --- cli/cli.go | 11 +++++------ cli/flags.go | 6 +++--- cli/help.go | 2 +- cli/join.go | 11 ++++++----- cluster/swarm/cluster.go | 13 +++++++------ discovery/discovery.go | 5 +++-- discovery/file/file.go | 6 +++--- discovery/kv/kv.go | 4 ++-- discovery/nodes/nodes.go | 3 ++- discovery/token/token.go | 6 +++--- 10 files changed, 35 insertions(+), 32 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index b8ac5c601b..c069b0d98a 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -4,7 +4,6 @@ import ( "fmt" "os" "path" - "strconv" "time" log "github.com/Sirupsen/logrus" @@ -79,9 +78,9 @@ func Run() { ShortName: "l", Usage: "list nodes in a cluster", Flags: []cli.Flag{ - cli.IntFlag{ + cli.StringFlag{ Name: "timeout", - Value: 10, + Value: "10s", }, }, Action: func(c *cli.Context) { @@ -89,9 +88,9 @@ func Run() { if dflag == "" { log.Fatalf("discovery required to list a cluster. 
See '%s list --help'.", c.App.Name) } - timeout, err := strconv.ParseUint(c.String("timeout"), 0, 32) - if timeout < 1 || err != nil { - log.Fatal("--timeout should be an unsigned integer and greater than 0") + timeout, err := time.ParseDuration(c.String("timeout")) + if err != nil { + log.Fatalf("invalid --timeout: %v", err) } d, err := discovery.New(dflag, timeout) diff --git a/cli/flags.go b/cli/flags.go index 2690fcefe9..822b937020 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -48,10 +48,10 @@ var ( Usage: "ip/socket to listen on", EnvVar: "SWARM_HOST", } - flHeartBeat = cli.IntFlag{ + flHeartBeat = cli.StringFlag{ Name: "heartbeat, hb", - Value: 25, - Usage: "time in seconds between each heartbeat", + Value: "25s", + Usage: "period between each heartbeat", } flEnableCors = cli.BoolFlag{ Name: "api-enable-cors, cors", diff --git a/cli/help.go b/cli/help.go index 6c13e4fff2..9f4fe25bf8 100644 --- a/cli/help.go +++ b/cli/help.go @@ -26,7 +26,7 @@ ARGUMENTS: OPTIONS: {{range .Flags}}{{.}} {{end}}{{if (eq .Name "manage")}}{{printf "\t * swarm.overcommit=0.05\tovercommit to apply on resources"}} - {{printf "\t * swarm.discovery.heartbeat=25\ttime in second between each heartbeat"}}{{end}}{{ end }} + {{printf "\t * swarm.discovery.heartbeat=25s\tperiod between each heartbeat"}}{{end}}{{ end }} ` } diff --git a/cli/join.go b/cli/join.go index 9e6c9d9511..bd021ce1ee 100644 --- a/cli/join.go +++ b/cli/join.go @@ -2,7 +2,6 @@ package cli import ( "regexp" - "strconv" "time" log "github.com/Sirupsen/logrus" @@ -21,11 +20,13 @@ func join(c *cli.Context) { log.Fatalf("discovery required to join a cluster. 
See '%s join --help'.", c.App.Name) } - hb, err := strconv.ParseUint(c.String("heartbeat"), 0, 32) - if hb < 1 || err != nil { - log.Fatal("--heartbeat should be an unsigned integer and greater than 0") + hb, err := time.ParseDuration(c.String("heartbeat")) + if err != nil { + log.Fatalf("invalid --heartbeat: %v", err) + } + if hb < 1*time.Second { + log.Fatal("--heartbeat should be at least one second") } - d, err := discovery.New(dflag, hb) if err != nil { log.Fatal(err) diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 0e5ad422b7..3cc8b3fb80 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -8,6 +8,7 @@ import ( "sort" "strings" "sync" + "time" log "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/stringid" @@ -54,14 +55,14 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t cluster.overcommitRatio = val } - heartbeat := uint64(25) - if opt, ok := options.Uint("swarm.discovery.heartbeat"); ok { - if opt < 1 { - return nil, errors.New("heartbeat should be an unsigned integer and greater than 0") + heartbeat := 25 * time.Second + if opt, ok := options.String("swarm.discovery.heartbeat"); ok { + h, err := time.ParseDuration(opt) + if err != nil { + return nil, err } - heartbeat = opt + heartbeat = h } - log.Errorf("chb: %d", heartbeat) // Set up discovery. cluster.discovery, err = discovery.New(dflag, heartbeat) diff --git a/discovery/discovery.go b/discovery/discovery.go index 0baf68fc01..4923b5e800 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "strings" + "time" log "github.com/Sirupsen/logrus" ) @@ -52,7 +53,7 @@ func (a Entries) Equals(b Entries) bool { // manage swarm host entries. type Discovery interface { // Initialize the discovery with URIs and a heartbeat. - Initialize(string, uint64) error + Initialize(string, time.Duration) error // Watch the discovery for entry changes. 
// Returns a channel that will receive changes or an error. @@ -100,7 +101,7 @@ func parse(rawurl string) (string, string) { // New returns a new Discovery given a URL and heartbeat settings. // Returns an error if the URL scheme is not supported. -func New(rawurl string, heartbeat uint64) (Discovery, error) { +func New(rawurl string, heartbeat time.Duration) (Discovery, error) { scheme, uri := parse(rawurl) if discovery, exists := discoveries[scheme]; exists { diff --git a/discovery/file/file.go b/discovery/file/file.go index 12d46f0610..3e867800b0 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -11,7 +11,7 @@ import ( // Discovery is exported type Discovery struct { - heartbeat uint64 + heartbeat time.Duration path string } @@ -20,7 +20,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(path string, heartbeat uint64) error { +func (s *Discovery) Initialize(path string, heartbeat time.Duration) error { s.path = path s.heartbeat = heartbeat return nil @@ -59,7 +59,7 @@ func (s *Discovery) fetch() (discovery.Entries, error) { func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { ch := make(chan discovery.Entries) errCh := make(chan error) - ticker := time.NewTicker(time.Duration(s.heartbeat) * time.Second) + ticker := time.NewTicker(s.heartbeat) go func() { // Send the initial entries if available. 
diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index a754f825d1..1318681d0d 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -26,7 +26,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64) error { +func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error { var ( parts = strings.SplitN(uris, "/", 2) ips = strings.Split(parts[0], ",") @@ -42,7 +42,7 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64) error { addrs = append(addrs, ip) } - s.heartbeat = time.Duration(heartbeat) * time.Second + s.heartbeat = heartbeat s.prefix = parts[1] // Creates a new store, will ignore options given diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go index 07f963135c..e56b027ce1 100644 --- a/discovery/nodes/nodes.go +++ b/discovery/nodes/nodes.go @@ -2,6 +2,7 @@ package nodes import ( "strings" + "time" "github.com/docker/swarm/discovery" ) @@ -16,7 +17,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, _ uint64) error { +func (s *Discovery) Initialize(uris string, _ time.Duration) error { for _, input := range strings.Split(uris, ",") { for _, ip := range discovery.Generate(input) { entry, err := discovery.NewEntry(ip) diff --git a/discovery/token/token.go b/discovery/token/token.go index f784f36ecf..c1a7c5e99c 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -17,7 +17,7 @@ const DiscoveryURL = "https://discovery-stage.hub.docker.com/v1" // Discovery is exported type Discovery struct { - heartbeat uint64 + heartbeat time.Duration url string token string } @@ -27,7 +27,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(urltoken string, heartbeat uint64) error { +func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration) error { if i := strings.LastIndex(urltoken, "/"); i != -1 { s.url = "https://" + urltoken[:i] s.token = urltoken[i+1:] @@ -68,7 +68,7 @@ func (s 
*Discovery) fetch() (discovery.Entries, error) { // Watch is exported func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { ch := make(chan discovery.Entries) - ticker := time.NewTicker(time.Duration(s.heartbeat) * time.Second) + ticker := time.NewTicker(s.heartbeat) errCh := make(chan error) go func() { From 7430fe5141adac1456090c2f1cf570c7707cf365 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 14:39:31 -0700 Subject: [PATCH 07/16] fix hb/timeout being time.Duration in a few places Signed-off-by: Andrea Luzzardi --- cli/cli.go | 2 +- cli/join.go | 5 ++--- test/integration/helpers.bash | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index c069b0d98a..2accbd149a 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -106,7 +106,7 @@ func Run() { } case err := <-errCh: log.Fatal(err) - case <-time.After(time.Duration(timeout) * time.Second): + case <-time.After(timeout): log.Fatal("Timed out") } }, diff --git a/cli/join.go b/cli/join.go index bd021ce1ee..7a151a5215 100644 --- a/cli/join.go +++ b/cli/join.go @@ -38,12 +38,11 @@ func join(c *cli.Context) { log.Fatal("--addr should be of the form ip:port or hostname:port") } - hbval := time.Duration(hb) * time.Second for { - log.WithFields(log.Fields{"addr": addr, "discovery": dflag}).Infof("Registering on the discovery service every %s...", hbval) + log.WithFields(log.Fields{"addr": addr, "discovery": dflag}).Infof("Registering on the discovery service every %s...", hb) if err := d.Register(addr); err != nil { log.Error(err) } - time.Sleep(hbval) + time.Sleep(hb) } } diff --git a/test/integration/helpers.bash b/test/integration/helpers.bash index 39ca6224cb..daef4e7669 100644 --- a/test/integration/helpers.bash +++ b/test/integration/helpers.bash @@ -99,7 +99,7 @@ function swarm_manage() { discovery="$@" fi - "$SWARM_BINARY" manage -H "$SWARM_HOST" --cluster-opt "swarm.discovery.heartbeat=1" "$discovery" & + 
"$SWARM_BINARY" manage -H "$SWARM_HOST" --cluster-opt "swarm.discovery.heartbeat=1s" "$discovery" & SWARM_PID=$! wait_until_reachable "$SWARM_HOST" retry 10 1 check_swarm_nodes @@ -124,7 +124,7 @@ function swarm_join() { for ((i=current; i < nodes; i++)); do local h="${HOSTS[$i]}" echo "Swarm join #${i}: $h $addr" - "$SWARM_BINARY" join --heartbeat=1 --addr="$h" "$addr" & + "$SWARM_BINARY" join --heartbeat=1s --addr="$h" "$addr" & SWARM_JOIN_PID[$i]=$! done retry 10 0.5 check_discovery_nodes "$addr" From 9179ed3d34d598b1c9ace6a3bd7f6bb0ba781efb Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 15:47:03 -0700 Subject: [PATCH 08/16] integration: move discovery into its own folder Signed-off-by: Andrea Luzzardi --- .../{discovery-consul.bats => discovery/consul.bats} | 2 +- test/integration/{discovery-etcd.bats => discovery/etcd.bats} | 2 +- test/integration/{discovery-file.bats => discovery/file.bats} | 2 +- test/integration/{discovery-token.bats => discovery/token.bats} | 2 +- test/integration/{discovery-zk.bats => discovery/zk.bats} | 2 +- test/integration/test_runner.sh | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) rename test/integration/{discovery-consul.bats => discovery/consul.bats} (98%) rename test/integration/{discovery-etcd.bats => discovery/etcd.bats} (98%) rename test/integration/{discovery-file.bats => discovery/file.bats} (97%) rename test/integration/{discovery-token.bats => discovery/token.bats} (98%) rename test/integration/{discovery-zk.bats => discovery/zk.bats} (98%) diff --git a/test/integration/discovery-consul.bats b/test/integration/discovery/consul.bats similarity index 98% rename from test/integration/discovery-consul.bats rename to test/integration/discovery/consul.bats index b5848da8f7..3df457d324 100644 --- a/test/integration/discovery-consul.bats +++ b/test/integration/discovery/consul.bats @@ -1,6 +1,6 @@ #!/usr/bin/env bats -load helpers +load ../helpers # Address on which Consul will listen (random port 
between 8000 and 9000). CONSUL_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 8000 )) diff --git a/test/integration/discovery-etcd.bats b/test/integration/discovery/etcd.bats similarity index 98% rename from test/integration/discovery-etcd.bats rename to test/integration/discovery/etcd.bats index 49e55d9da3..a591a7d05f 100644 --- a/test/integration/discovery-etcd.bats +++ b/test/integration/discovery/etcd.bats @@ -1,6 +1,6 @@ #!/usr/bin/env bats -load helpers +load ../helpers # Address on which Etcd will listen (random port between 9000 and 10,000). ETCD_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 9000 )) diff --git a/test/integration/discovery-file.bats b/test/integration/discovery/file.bats similarity index 97% rename from test/integration/discovery-file.bats rename to test/integration/discovery/file.bats index 98b91557c9..1641aa39f4 100644 --- a/test/integration/discovery-file.bats +++ b/test/integration/discovery/file.bats @@ -1,6 +1,6 @@ #!/usr/bin/env bats -load helpers +load ../helpers # create a blank temp file for discovery DISCOVERY_FILE=$(mktemp) diff --git a/test/integration/discovery-token.bats b/test/integration/discovery/token.bats similarity index 98% rename from test/integration/discovery-token.bats rename to test/integration/discovery/token.bats index 877954fbf9..e44e45ce9e 100644 --- a/test/integration/discovery-token.bats +++ b/test/integration/discovery/token.bats @@ -1,6 +1,6 @@ #!/usr/bin/env bats -load helpers +load ../helpers TOKEN="" diff --git a/test/integration/discovery-zk.bats b/test/integration/discovery/zk.bats similarity index 98% rename from test/integration/discovery-zk.bats rename to test/integration/discovery/zk.bats index b8c6ef1800..0a0837edfd 100644 --- a/test/integration/discovery-zk.bats +++ b/test/integration/discovery/zk.bats @@ -1,6 +1,6 @@ #!/usr/bin/env bats -load helpers +load ../helpers # Address on which Zookeeper will listen (random port between 7000 and 8000). 
ZK_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 7000 )) diff --git a/test/integration/test_runner.sh b/test/integration/test_runner.sh index 07396d172e..65a2c7b316 100755 --- a/test/integration/test_runner.sh +++ b/test/integration/test_runner.sh @@ -12,7 +12,7 @@ function execute() { } # Tests to run. Defaults to all. -TESTS=${@:-. api} +TESTS=${@:-. discovery api} # Generate a temporary binary for the tests. export SWARM_BINARY=`mktemp` From ac18ef381da5bef4d93f7892a722dd0e56004763 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 16:34:18 -0700 Subject: [PATCH 09/16] integration: cover more grounds in discovery testing. Signed-off-by: Andrea Luzzardi --- test/integration/discovery/consul.bats | 80 +++++++++++++----- .../discovery/discovery_helpers.bash | 17 ++++ test/integration/discovery/etcd.bats | 81 +++++++++++++----- test/integration/discovery/file.bats | 73 ++++++++++++---- test/integration/discovery/token.bats | 47 +++++++---- test/integration/discovery/zk.bats | 84 ++++++++++++++----- test/integration/helpers.bash | 21 +---- 7 files changed, 287 insertions(+), 116 deletions(-) create mode 100644 test/integration/discovery/discovery_helpers.bash diff --git a/test/integration/discovery/consul.bats b/test/integration/discovery/consul.bats index 3df457d324..8701c8440c 100644 --- a/test/integration/discovery/consul.bats +++ b/test/integration/discovery/consul.bats @@ -1,43 +1,81 @@ #!/usr/bin/env bats -load ../helpers +load discovery_helpers -# Address on which Consul will listen (random port between 8000 and 9000). -CONSUL_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 8000 )) +# Address on which the store will listen (random port between 8000 and 9000). 
+STORE_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 8000 )) + +# Discovery parameter for Swarm +DISCOVERY="consul://${STORE_HOST}/test" # Container name for integration test CONTAINER_NAME=swarm_consul -function start_consul() { - docker_host run --name=$CONTAINER_NAME -h $CONTAINER_NAME -p $CONSUL_HOST:8500 -d progrium/consul -server -bootstrap-expect 1 -data-dir /test +function start_store() { + docker_host run --name=$CONTAINER_NAME -h $CONTAINER_NAME -p $STORE_HOST:8500 -d progrium/consul -server -bootstrap-expect 1 -data-dir /test } -function stop_consul() { +function stop_store() { docker_host rm -f -v $CONTAINER_NAME } -function setup() { - start_consul -} - function teardown() { swarm_manage_cleanup swarm_join_cleanup stop_docker - stop_consul + stop_store } -@test "consul discovery" { +@test "consul discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. + + # Start the store + start_store + # Start 2 engines and make them join the cluster. start_docker 2 - swarm_join "consul://${CONSUL_HOST}/test" + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" - # Start a manager and ensure it sees all the engines. - swarm_manage "consul://${CONSUL_HOST}/test" - check_swarm_nodes - - # Add another engine to the cluster and make sure it's picked up by swarm. - start_docker 1 - swarm_join "consul://${CONSUL_HOST}/test" - retry 30 1 check_swarm_nodes + # Then, start a manager and ensure it sees all the engines. + swarm_manage "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "consul discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + start_store + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. 
+ start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "consul discovery: failure" { + # The goal of this test is to simulate a store failure and ensure discovery + # is resilient to it. + + # At this point, the store is not yet started. + + # Start 2 engines and join the cluster. They should keep retrying + start_docker 2 + swarm_join "$DISCOVERY" + + # Start a manager. It should keep retrying + swarm_manage "$DISCOVERY" + + # Now start the store + start_store + + # After a while, `join` and `manage` should reach the store. + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info } diff --git a/test/integration/discovery/discovery_helpers.bash b/test/integration/discovery/discovery_helpers.bash new file mode 100644 index 0000000000..13597a6aa9 --- /dev/null +++ b/test/integration/discovery/discovery_helpers.bash @@ -0,0 +1,17 @@ +#!/bin/bash + +load ../helpers + +# Returns true if all nodes have joined the swarm. +function discovery_check_swarm_info() { + docker_swarm info | grep -q "Nodes: ${#HOSTS[@]}" +} + +# Returns true if all nodes have joined the discovery. +function discovery_check_swarm_list() { + local joined=`swarm list "$1" | wc -l` + local total=${#HOSTS[@]} + + echo "${joined} out of ${total} hosts joined discovery" + [ "$joined" -eq "$total" ] +} diff --git a/test/integration/discovery/etcd.bats b/test/integration/discovery/etcd.bats index a591a7d05f..a49001d996 100644 --- a/test/integration/discovery/etcd.bats +++ b/test/integration/discovery/etcd.bats @@ -1,43 +1,82 @@ #!/usr/bin/env bats -load ../helpers +load discovery_helpers -# Address on which Etcd will listen (random port between 9000 and 10,000). -ETCD_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 9000 )) +# Address on which the store will listen (random port between 9000 and 10,000).
+STORE_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 9000 )) + +# Discovery parameter for Swarm +DISCOVERY="etcd://${STORE_HOST}/test" # Container name for integration test CONTAINER_NAME=swarm_etcd -function start_etcd() { - docker_host run -p $ETCD_HOST:4001 --name=$CONTAINER_NAME -d coreos/etcd +function start_store() { + docker_host run -p $STORE_HOST:4001 --name=$CONTAINER_NAME -d coreos/etcd } -function stop_etcd() { +function stop_store() { docker_host rm -f -v $CONTAINER_NAME } -function setup() { - start_etcd -} - function teardown() { swarm_manage_cleanup swarm_join_cleanup stop_docker - stop_etcd + stop_store } -@test "etcd discovery" { +@test "etcd discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. + + # Start the store + start_store + + docker_host ps -a # Start 2 engines and make them join the cluster. start_docker 2 - swarm_join "etcd://${ETCD_HOST}/test" + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" - # Start a manager and ensure it sees all the engines. - swarm_manage "etcd://${ETCD_HOST}/test" - check_swarm_nodes - - # Add another engine to the cluster and make sure it's picked up by swarm. - start_docker 1 - swarm_join "etcd://${ETCD_HOST}/test" - retry 30 1 check_swarm_nodes + # Then, start a manager and ensure it sees all the engines. + swarm_manage "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "etcd discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + start_store + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. 
+ start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "etcd discovery: failure" { + # The goal of this test is to simulate a store failure and ensure discovery + # is resilient to it. + + # At this point, the store is not yet started. + + # Start 2 engines and join the cluster. They should keep retrying + start_docker 2 + swarm_join "$DISCOVERY" + + # Start a manager. It should keep retrying + swarm_manage "$DISCOVERY" + + # Now start the store + start_store + + # After a while, `join` and `manage` should reach the store. + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info } diff --git a/test/integration/discovery/file.bats b/test/integration/discovery/file.bats index 1641aa39f4..5fa5989b99 100644 --- a/test/integration/discovery/file.bats +++ b/test/integration/discovery/file.bats @@ -1,9 +1,15 @@ #!/usr/bin/env bats -load ../helpers +load discovery_helpers -# create a blank temp file for discovery -DISCOVERY_FILE=$(mktemp) +DISCOVERY_FILE="" +DISCOVERY="" + +function setup() { + # create a blank temp file for discovery + DISCOVERY_FILE=$(mktemp) + DISCOVERY="file://$DISCOVERY_FILE" +} function teardown() { swarm_manage_cleanup @@ -11,24 +17,59 @@ function teardown() { rm -f "$DISCOVERY_FILE" } -function setup_file_discovery() { +function setup_discovery_file() { rm -f "$DISCOVERY_FILE" for host in ${HOSTS[@]}; do echo "$host" >> $DISCOVERY_FILE done } -@test "file discovery" { - # Start 2 engines, register them in a file, then start swarm and make sure - # it sees them. - start_docker 2 - setup_file_discovery - swarm_manage "file://$DISCOVERY_FILE" - check_swarm_nodes +@test "file discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. - # Add another engine to the cluster, update the discovery file and make - # sure it's picked up by swarm. 
- start_docker 1 - setup_file_discovery - retry 10 1 check_swarm_nodes + # Start 2 engines and register them in the file. + start_docker 2 + setup_discovery_file + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + + # Then, start a manager and ensure it sees all the engines. + swarm_manage "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "file discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. + start_docker 2 + setup_discovery_file + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "file discovery: failure" { + # The goal of this test is to simulate a failure (file not available) and ensure discovery + # is resilient to it. + + # Wipe out the discovery file. + rm -f "$DISCOVERY_FILE" + + # Start 2 engines. + start_docker 2 + + # Start a manager. It should keep retrying + swarm_manage "$DISCOVERY" + + # Now create the discovery file. + setup_discovery_file + + # After a while, `join` and `manage` should see the file. 
+ retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info } diff --git a/test/integration/discovery/token.bats b/test/integration/discovery/token.bats index e44e45ce9e..894fa3ec76 100644 --- a/test/integration/discovery/token.bats +++ b/test/integration/discovery/token.bats @@ -1,8 +1,9 @@ #!/usr/bin/env bats -load ../helpers +load discovery_helpers TOKEN="" +DISCOVERY="" function token_cleanup() { [ -z "$TOKEN" ] && return @@ -10,6 +11,12 @@ function token_cleanup() { curl -X DELETE "https://discovery-stage.hub.docker.com/v1/clusters/$TOKEN" } +function setup() { + TOKEN=$(swarm create) + [[ "$TOKEN" =~ ^[0-9a-f]{32}$ ]] + DISCOVERY="token://$TOKEN" +} + function teardown() { swarm_manage_cleanup swarm_join_cleanup @@ -17,23 +24,31 @@ function teardown() { token_cleanup } -@test "token discovery" { - # Create a cluster and validate the token. - run swarm create - [ "$status" -eq 0 ] - [[ "$output" =~ ^[0-9a-f]{32}$ ]] - TOKEN="$output" +@test "token discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. # Start 2 engines and make them join the cluster. start_docker 2 - swarm_join "token://$TOKEN" + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" - # Start a manager and ensure it sees all the engines. - swarm_manage "token://$TOKEN" - check_swarm_nodes - - # Add another engine to the cluster and make sure it's picked up by swarm. - start_docker 1 - swarm_join "token://$TOKEN" - retry 10 1 check_swarm_nodes + # Then, start a manager and ensure it sees all the engines. + swarm_manage "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "token discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + + # Start a manager with no engines. 
+ swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info } diff --git a/test/integration/discovery/zk.bats b/test/integration/discovery/zk.bats index 0a0837edfd..2c933630ee 100644 --- a/test/integration/discovery/zk.bats +++ b/test/integration/discovery/zk.bats @@ -1,43 +1,81 @@ #!/usr/bin/env bats -load ../helpers +load discovery_helpers -# Address on which Zookeeper will listen (random port between 7000 and 8000). -ZK_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 7000 )) +# Address on which the store will listen (random port between 7000 and 8000). +STORE_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 7000 )) + +# Discovery parameter for Swarm +DISCOVERY="zk://${STORE_HOST}/test" # Container name for integration test -ZK_CONTAINER_NAME=swarm_integration_zk +CONTAINER_NAME=swarm_integration_zk -function start_zk() { - docker_host run --name $ZK_CONTAINER_NAME -p $ZK_HOST:2181 -d jplock/zookeeper +function start_store() { + docker_host run --name $CONTAINER_NAME -p $STORE_HOST:2181 -d jplock/zookeeper } -function stop_zk() { - docker_host rm -f -v $ZK_CONTAINER_NAME -} - -function setup() { - start_zk +function stop_store() { + docker_host rm -f -v $CONTAINER_NAME } function teardown() { swarm_manage_cleanup swarm_join_cleanup stop_docker - stop_zk + stop_store } -@test "zookeeper discovery" { +@test "zk discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. + + # Start the store + start_store + # Start 2 engines and make them join the cluster. start_docker 2 - swarm_join "zk://${ZK_HOST}/test" + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" - # Start a manager and ensure it sees all the engines.
- swarm_manage "zk://${ZK_HOST}/test" - check_swarm_nodes - - # Add another engine to the cluster and make sure it's picked up by swarm. - start_docker 1 - swarm_join "zk://${ZK_HOST}/test" - retry 30 1 check_swarm_nodes + # Then, start a manager and ensure it sees all the engines. + swarm_manage "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "zk discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + start_store + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "zk discovery: failure" { + # The goal of this test is to simulate a store failure and ensure discovery + # is resilient to it. + + # At this point, the store is not yet started. + + # Start 2 engines and join the cluster. They should keep retrying + start_docker 2 + swarm_join "$DISCOVERY" + + # Start a manager. It should keep retrying + swarm_manage "$DISCOVERY" + + # Now start the store + start_store + + # After a while, `join` and `manage` should reach the store. + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info } diff --git a/test/integration/helpers.bash b/test/integration/helpers.bash index daef4e7669..231b900b5f 100644 --- a/test/integration/helpers.bash +++ b/test/integration/helpers.bash @@ -85,11 +85,6 @@ function wait_until_reachable() { retry 10 1 docker -H $1 info } -# Returns true if all nodes have joined the swarm. -function check_swarm_nodes() { - docker_swarm info | grep -q "Nodes: ${#HOSTS[@]}" -} - # Start the swarm manager in background. 
function swarm_manage() { local discovery @@ -99,10 +94,9 @@ function swarm_manage() { discovery="$@" fi - "$SWARM_BINARY" manage -H "$SWARM_HOST" --cluster-opt "swarm.discovery.heartbeat=1s" "$discovery" & + "$SWARM_BINARY" -l debug manage -H "$SWARM_HOST" --cluster-opt "swarm.discovery.heartbeat=1s" "$discovery" & SWARM_PID=$! wait_until_reachable "$SWARM_HOST" - retry 10 1 check_swarm_nodes } # swarm join every engine created with `start_docker`. @@ -120,23 +114,12 @@ function swarm_join() { # Start the engines. local i - echo "current: $current | nodes: $nodes" > log for ((i=current; i < nodes; i++)); do local h="${HOSTS[$i]}" echo "Swarm join #${i}: $h $addr" - "$SWARM_BINARY" join --heartbeat=1s --addr="$h" "$addr" & + "$SWARM_BINARY" -l debug join --heartbeat=1s --addr="$h" "$addr" & SWARM_JOIN_PID[$i]=$! done - retry 10 0.5 check_discovery_nodes "$addr" -} - -# Returns true if all nodes have joined the discovery. -function check_discovery_nodes() { - local joined=`swarm list "$1" | wc -l` - local total=${#HOSTS[@]} - - echo "${joined} out of ${total} hosts joined discovery" - [ "$joined" -eq "$total" ] } # Stops the manager. From 9399a8835c27f19f26207dcb96af026e20c0889e Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 17:01:30 -0700 Subject: [PATCH 10/16] golint fixes. Signed-off-by: Andrea Luzzardi --- discovery/discovery.go | 19 +++++++++++-------- pkg/store/consul.go | 3 +++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/discovery/discovery.go b/discovery/discovery.go index 4923b5e800..e9e44adb77 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -26,23 +26,26 @@ func NewEntry(url string) (*Entry, error) { } // String returns the string form of an entry. 
-func (m *Entry) String() string { - return fmt.Sprintf("%s:%s", m.Host, m.Port) +func (e *Entry) String() string { + return fmt.Sprintf("%s:%s", e.Host, e.Port) } -func (a *Entry) Equals(b *Entry) bool { - return a.Host == b.Host && a.Port == b.Port +// Equals returns true if cmp contains the same data. +func (e *Entry) Equals(cmp *Entry) bool { + return e.Host == cmp.Host && e.Port == cmp.Port } +// Entries is a list of *Entry with some helpers. type Entries []*Entry -func (a Entries) Equals(b Entries) bool { +// Equals returns true if cmp contains the same data. +func (e Entries) Equals(cmp Entries) bool { // Check if the file has really changed. - if len(a) != len(b) { + if len(e) != len(cmp) { return false } - for i, _ := range a { - if !a[i].Equals(b[i]) { + for i := range e { + if !e[i].Equals(cmp[i]) { return false } } diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 9932f3cb15..f2696453b1 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -11,6 +11,9 @@ import ( ) const ( + // DefaultWatchWaitTime is how long we block for at a time to check if the + // watched key has changed. This affects the minimum time it takes to + // cancel a watch. DefaultWatchWaitTime = 15 * time.Second ) From b459f7c7f80e92ae6baff79133491a0bf146501b Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 17:30:17 -0700 Subject: [PATCH 11/16] store: Add Mock store. Signed-off-by: Andrea Luzzardi --- pkg/store/mock.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 pkg/store/mock.go diff --git a/pkg/store/mock.go b/pkg/store/mock.go new file mode 100644 index 0000000000..1ca1b95be3 --- /dev/null +++ b/pkg/store/mock.go @@ -0,0 +1,85 @@ +package store + +import "github.com/stretchr/testify/mock" + +// Mock store. Mocks all Store functions using testify.Mock. +type Mock struct { + mock.Mock + + endpoints []string + options *Config +} + +// InitializeMock creates a Mock store. 
+func InitializeMock(endpoints []string, options *Config) (Store, error) { + s := &Mock{} + s.endpoints = endpoints + s.options = options + return s, nil +} + +// Put mock +func (s *Mock) Put(key string, value []byte) error { + args := s.Mock.Called(key, value) + return args.Error(0) +} + +// Get mock +func (s *Mock) Get(key string) (*KVPair, error) { + args := s.Mock.Called(key) + return args.Get(0).(*KVPair), args.Error(1) +} + +// Delete mock +func (s *Mock) Delete(key string) error { + args := s.Mock.Called(key) + return args.Error(0) +} + +// Exists mock +func (s *Mock) Exists(key string) (bool, error) { + args := s.Mock.Called(key) + return args.Bool(0), args.Error(1) +} + +// Watch mock +func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { + args := s.Mock.Called(key, stopCh) + return args.Get(0).(<-chan *KVPair), args.Error(1) +} + +// WatchTree mock +func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { + args := s.Mock.Called(prefix, stopCh) + return args.Get(0).(<-chan []*KVPair), args.Error(1) +} + +// CreateLock mock +func (s *Mock) CreateLock(key string, value []byte) (Locker, error) { + args := s.Mock.Called(key, value) + return args.Get(0).(Locker), args.Error(1) +} + +// List mock +func (s *Mock) List(prefix string) ([]*KVPair, error) { + args := s.Mock.Called(prefix) + return args.Get(0).([]*KVPair), args.Error(1) +} + +// DeleteTree mock +func (s *Mock) DeleteTree(prefix string) error { + args := s.Mock.Called(prefix) + return args.Error(0) +} + +// AtomicPut mock +func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) { + args := s.Mock.Called(key, value, previous) + return args.Bool(0), args.Error(1) +} + +// AtomicDelete mock +func (s *Mock) AtomicDelete(key string, previous *KVPair) (bool, error) { + args := s.Mock.Called(key, previous) + return args.Bool(0), args.Error(1) +} From f49ca7e20ff6abbfab888b6ae51662e38cc05c89 Mon Sep 17 00:00:00 2001 
From: Andrea Luzzardi Date: Sat, 16 May 2015 17:45:56 -0700 Subject: [PATCH 12/16] store: Mock stores can now be created Signed-off-by: Andrea Luzzardi --- pkg/store/mock.go | 10 ++++++---- pkg/store/store.go | 5 ++++- pkg/store/zookeeper.go | 1 - 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/store/mock.go b/pkg/store/mock.go index 1ca1b95be3..f262f03ec0 100644 --- a/pkg/store/mock.go +++ b/pkg/store/mock.go @@ -6,15 +6,17 @@ import "github.com/stretchr/testify/mock" type Mock struct { mock.Mock - endpoints []string - options *Config + // Endpoints passed to InitializeMock + Endpoints []string + // Options passed to InitializeMock + Options *Config } // InitializeMock creates a Mock store. func InitializeMock(endpoints []string, options *Config) (Store, error) { s := &Mock{} - s.endpoints = endpoints - s.options = options + s.Endpoints = endpoints + s.Options = options return s, nil } diff --git a/pkg/store/store.go b/pkg/store/store.go index 442ee548f1..d10ecab503 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -12,8 +12,10 @@ import ( type Backend string const ( + // MOCK backend + MOCK Backend = "mock" // CONSUL backend - CONSUL Backend = "consul" + CONSUL = "consul" // ETCD backend ETCD = "etcd" // ZK backend @@ -114,6 +116,7 @@ type Initialize func(addrs []string, options *Config) (Store, error) var ( // Backend initializers initializers = map[Backend]Initialize{ + MOCK: InitializeMock, CONSUL: InitializeConsul, ETCD: InitializeEtcd, ZK: InitializeZookeeper, diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index fa324036ae..b55e7f42f7 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -9,7 +9,6 @@ import ( ) // Zookeeper embeds the zookeeper client -// and list of watches type Zookeeper struct { timeout time.Duration client *zk.Conn From 2106966d54dcc629e063424d9da943e1877e280b Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 18:17:46 -0700 Subject: [PATCH 13/16] discovery: 
Watch tests for file and some other tests. Signed-off-by: Andrea Luzzardi --- discovery/discovery_test.go | 38 ++++++++++++++++++--- discovery/file/file.go | 7 +++- discovery/file/file_test.go | 64 ++++++++++++++++++++++++++++++++--- discovery/kv/kv.go | 9 +++-- discovery/kv/kv_test.go | 25 +++++++++----- discovery/nodes/nodes.go | 1 + discovery/nodes/nodes_test.go | 46 +++++++++++++++---------- discovery/token/token.go | 3 ++ pkg/store/mock.go | 2 +- 9 files changed, 154 insertions(+), 41 deletions(-) diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 5cbf3ffb86..0c5127473c 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -8,9 +8,9 @@ import ( func TestNewEntry(t *testing.T) { entry, err := NewEntry("127.0.0.1:2375") - assert.Equal(t, entry.Host, "127.0.0.1") - assert.Equal(t, entry.Port, "2375") assert.NoError(t, err) + assert.True(t, entry.Equals(&Entry{Host: "127.0.0.1", Port: "2375"})) + assert.Equal(t, entry.String(), "127.0.0.1:2375") _, err = NewEntry("127.0.0.1") assert.Error(t, err) @@ -44,11 +44,39 @@ func TestCreateEntries(t *testing.T) { assert.NoError(t, err) entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""}) - assert.Equal(t, len(entries), 2) - assert.Equal(t, entries[0].String(), "127.0.0.1:2375") - assert.Equal(t, entries[1].String(), "127.0.0.2:2375") assert.NoError(t, err) + expected := Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + } + assert.True(t, entries.Equals(expected)) _, err = CreateEntries([]string{"127.0.0.1", "127.0.0.2"}) assert.Error(t, err) } + +func TestEntriesEquality(t *testing.T) { + entries := Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + } + + // Same + assert.True(t, entries.Equals(Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + })) + + // Different size + assert.False(t, entries.Equals(Entries{ 
+ &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + &Entry{Host: "127.0.0.3", Port: "2375"}, + })) + + // Different content + assert.False(t, entries.Equals(Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.42", Port: "2375"}, + })) +} diff --git a/discovery/file/file.go b/discovery/file/file.go index 3e867800b0..4eb146ca7a 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -62,9 +62,14 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c ticker := time.NewTicker(s.heartbeat) go func() { + defer close(errCh) + defer close(ch) + // Send the initial entries if available. currentEntries, err := s.fetch() - if err == nil { + if err != nil { + errCh <- err + } else { ch <- currentEntries } diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go index 14ac918b82..0ec05972dc 100644 --- a/discovery/file/file_test.go +++ b/discovery/file/file_test.go @@ -1,15 +1,24 @@ package file import ( + "io/ioutil" + "os" "testing" + "github.com/docker/swarm/discovery" "github.com/stretchr/testify/assert" ) func TestInitialize(t *testing.T) { - discovery := &Discovery{} - discovery.Initialize("/path/to/file", 0) - assert.Equal(t, discovery.path, "/path/to/file") + d := &Discovery{} + d.Initialize("/path/to/file", 0) + assert.Equal(t, d.path, "/path/to/file") +} + +func TestNew(t *testing.T) { + d, err := discovery.New("file:///path/to/file", 0) + assert.NoError(t, err) + assert.Equal(t, d.(*Discovery).path, "/path/to/file") } func TestContent(t *testing.T) { @@ -18,6 +27,7 @@ func TestContent(t *testing.T) { 2.2.2.[2:4]:2222 ` ips := parseFileContent([]byte(data)) + assert.Len(t, ips, 5) assert.Equal(t, ips[0], "1.1.1.1:1111") assert.Equal(t, ips[1], "1.1.1.2:1111") assert.Equal(t, ips[2], "2.2.2.2:2222") @@ -40,7 +50,53 @@ func TestParsingContentsWithComments(t *testing.T) { ### test ### ` ips := parseFileContent([]byte(data)) - assert.Equal(t, 2, len(ips)) + 
assert.Len(t, ips, 2) assert.Equal(t, "1.1.1.1:1111", ips[0]) assert.Equal(t, "3.3.3.3:3333", ips[1]) } + +func TestWatch(t *testing.T) { + data := ` +1.1.1.1:1111 +2.2.2.2:2222 +` + expected := discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + + // Create a temporary file and remove it. + tmp, err := ioutil.TempFile(os.TempDir(), "discovery-file-test") + assert.NoError(t, err) + assert.NoError(t, tmp.Close()) + assert.NoError(t, os.Remove(tmp.Name())) + + // Set up file discovery. + d := &Discovery{} + d.Initialize(tmp.Name(), 1) + stopCh := make(chan struct{}) + ch, errCh := d.Watch(stopCh) + + // Make sure it fires errors since the file doesn't exist. + assert.Error(t, <-errCh) + // We have to drain the error channel otherwise Watch will get stuck. + go func() { + for _ = range errCh { + } + }() + + // Write the file and make sure we get the expected value back. + assert.NoError(t, ioutil.WriteFile(tmp.Name(), []byte(data), 0600)) + assert.Equal(t, <-ch, expected) + + // Add a new entry and look it up. + data += "\n3.3.3.3:3333\n" + expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) + assert.NoError(t, ioutil.WriteFile(tmp.Name(), []byte(data), 0600)) + assert.Equal(t, <-ch, expected) + + // Stop and make sure it closes all channels. + close(stopCh) + assert.Nil(t, <-ch) + assert.Nil(t, <-errCh) +} diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 1318681d0d..3bffb105c9 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -54,11 +54,7 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error { Timeout: s.heartbeat, }, ) - if err != nil { - return err - } - - return nil + return err } // Watch the store until either there's a store error or we receive a stop request. 
@@ -98,6 +94,9 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c errCh := make(chan error) go func() { + defer close(ch) + defer close(errCh) + // Forever: Create a store watch, watch until we get an error and then try again. // Will only stop if we receive a stopCh request. for { diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go index 6a2b0c7fef..d8b6685ddd 100644 --- a/discovery/kv/kv_test.go +++ b/discovery/kv/kv_test.go @@ -3,18 +3,27 @@ package kv import ( "testing" + "github.com/docker/swarm/pkg/store" "github.com/stretchr/testify/assert" ) func TestInitialize(t *testing.T) { - discoveryService := &Discovery{} + d := &Discovery{backend: store.MOCK} + assert.EqualError(t, d.Initialize("127.0.0.1", 0), "invalid format \"127.0.0.1\", missing ") - assert.Equal(t, discoveryService.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") - - assert.Error(t, discoveryService.Initialize("127.0.0.1/path", 0)) - assert.Equal(t, discoveryService.prefix, "path") - - assert.Error(t, discoveryService.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) - assert.Equal(t, discoveryService.prefix, "path") + d = &Discovery{backend: store.MOCK} + assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0)) + s := d.store.(*store.Mock) + assert.Len(t, s.Endpoints, 1) + assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") + assert.Equal(t, d.prefix, "path") + d = &Discovery{backend: store.MOCK} + assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0)) + s = d.store.(*store.Mock) + assert.Len(t, s.Endpoints, 3) + assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") + assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234") + assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234") + assert.Equal(t, d.prefix, "path") } diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go index e56b027ce1..68d8b0ec0b 100644 --- a/discovery/nodes/nodes.go +++ b/discovery/nodes/nodes.go @@ -35,6 +35,7 @@ func (s 
*Discovery) Initialize(uris string, _ time.Duration) error { func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { ch := make(chan discovery.Entries) go func() { + defer close(ch) ch <- s.entries <-stopCh }() diff --git a/discovery/nodes/nodes_test.go b/discovery/nodes/nodes_test.go index 04f682cabb..4c0583cbae 100644 --- a/discovery/nodes/nodes_test.go +++ b/discovery/nodes/nodes_test.go @@ -3,29 +3,41 @@ package nodes import ( "testing" + "github.com/docker/swarm/discovery" "github.com/stretchr/testify/assert" ) -func TestInitialise(t *testing.T) { - discovery := &Discovery{} - discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) - assert.Equal(t, len(discovery.entries), 2) - assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111") - assert.Equal(t, discovery.entries[1].String(), "2.2.2.2:2222") +func TestInitialize(t *testing.T) { + d := &Discovery{} + d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) + assert.Equal(t, len(d.entries), 2) + assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") + assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222") } -func TestInitialiseWithPattern(t *testing.T) { - discovery := &Discovery{} - discovery.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0) - assert.Equal(t, len(discovery.entries), 5) - assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111") - assert.Equal(t, discovery.entries[1].String(), "1.1.1.2:1111") - assert.Equal(t, discovery.entries[2].String(), "2.2.2.2:2222") - assert.Equal(t, discovery.entries[3].String(), "2.2.2.3:2222") - assert.Equal(t, discovery.entries[4].String(), "2.2.2.4:2222") +func TestInitializeWithPattern(t *testing.T) { + d := &Discovery{} + d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0) + assert.Equal(t, len(d.entries), 5) + assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") + assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111") + assert.Equal(t, d.entries[2].String(), "2.2.2.2:2222") + assert.Equal(t, d.entries[3].String(), 
"2.2.2.3:2222") + assert.Equal(t, d.entries[4].String(), "2.2.2.4:2222") +} + +func TestWatch(t *testing.T) { + d := &Discovery{} + d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0) + expected := discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + ch, _ := d.Watch(nil) + assert.True(t, expected.Equals(<-ch)) } func TestRegister(t *testing.T) { - discovery := &Discovery{} - assert.Error(t, discovery.Register("0.0.0.0")) + d := &Discovery{} + assert.Error(t, d.Register("0.0.0.0")) } diff --git a/discovery/token/token.go b/discovery/token/token.go index c1a7c5e99c..84ec0be8cd 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -72,6 +72,9 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c errCh := make(chan error) go func() { + defer close(ch) + defer close(errCh) + // Send the initial entries if available. currentEntries, err := s.fetch() if err != nil { diff --git a/pkg/store/mock.go b/pkg/store/mock.go index f262f03ec0..3b70955628 100644 --- a/pkg/store/mock.go +++ b/pkg/store/mock.go @@ -53,7 +53,7 @@ func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) // WatchTree mock func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { args := s.Mock.Called(prefix, stopCh) - return args.Get(0).(<-chan []*KVPair), args.Error(1) + return args.Get(0).(chan []*KVPair), args.Error(1) } // CreateLock mock From 0c9e8c8803e5d5caa83dc517c057b6154c1b2381 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Sat, 16 May 2015 19:35:00 -0700 Subject: [PATCH 14/16] discovery: Added kv tests using the mock Store. 
Signed-off-by: Andrea Luzzardi --- discovery/kv/kv_test.go | 60 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go index d8b6685ddd..381171f415 100644 --- a/discovery/kv/kv_test.go +++ b/discovery/kv/kv_test.go @@ -1,10 +1,14 @@ package kv import ( + "errors" "testing" + "time" + "github.com/docker/swarm/discovery" "github.com/docker/swarm/pkg/store" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestInitialize(t *testing.T) { @@ -27,3 +31,59 @@ func TestInitialize(t *testing.T) { assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234") assert.Equal(t, d.prefix, "path") } + +func TestWatch(t *testing.T) { + d := &Discovery{backend: store.MOCK} + assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0)) + s := d.store.(*store.Mock) + + mockCh := make(chan []*store.KVPair) + + // The first watch will fail. + s.On("WatchTree", "path", mock.Anything).Return(mockCh, errors.New("test error")).Once() + // The second one will succeed. + s.On("WatchTree", "path", mock.Anything).Return(mockCh, nil).Once() + expected := discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + kvs := []*store.KVPair{ + {Key: "path/1.1.1.1", Value: []byte("1.1.1.1:1111")}, + {Key: "path/1.1.1.1", Value: []byte("2.2.2.2:2222")}, + } + + stopCh := make(chan struct{}) + ch, errCh := d.Watch(stopCh) + + // It should fire an error since the first WatchTree call failed. + assert.EqualError(t, <-errCh, "test error") + // We have to drain the error channel otherwise Watch will get stuck. + go func() { + for _ = range errCh { + } + }() + + // Push the entries into the store channel and make sure discovery emits. + mockCh <- kvs + assert.Equal(t, <-ch, expected) + + // Add a new entry.
+ expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) + kvs = append(kvs, &store.KVPair{Key: "path/3.3.3.3", Value: []byte("3.3.3.3:3333")}) + mockCh <- kvs + assert.Equal(t, <-ch, expected) + + // Make sure that if an error occurs it retries. + // This third call to WatchTree will be checked later by AssertExpectations. + s.On("WatchTree", "path", mock.Anything).Return(mockCh, nil) + close(mockCh) + // Give it enough time to call WatchTree. + time.Sleep(3) + + // Stop and make sure it closes all channels. + close(stopCh) + assert.Nil(t, <-ch) + assert.Nil(t, <-errCh) + + s.AssertExpectations(t) +} From 3d6f833f6f653c3a56e1582ee2573b7ff505f5d1 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Mon, 18 May 2015 13:13:53 -0700 Subject: [PATCH 15/16] fix --heartbeat flag. Signed-off-by: Andrea Luzzardi --- cli/cli.go | 7 +------ cli/flags.go | 5 +++++ cluster/swarm/cluster.go | 3 +++ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index 2accbd149a..442370d4de 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -77,12 +77,7 @@ func Run() { Name: "list", ShortName: "l", Usage: "list nodes in a cluster", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "timeout", - Value: "10s", - }, - }, + Flags: []cli.Flag{flTimeout}, Action: func(c *cli.Context) { dflag := getDiscovery(c) if dflag == "" { diff --git a/cli/flags.go b/cli/flags.go index 822b937020..b24362b73c 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -53,6 +53,11 @@ var ( Value: "25s", Usage: "period between each heartbeat", } + flTimeout = cli.StringFlag{ + Name: "timeout", + Value: "10s", + Usage: "timeout period", + } flEnableCors = cli.BoolFlag{ Name: "api-enable-cors, cors", Usage: "enable CORS headers in the remote API", diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 3cc8b3fb80..4819258bf2 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -61,6 +61,9 @@ func NewCluster(scheduler *scheduler.Scheduler, store 
*state.Store, TLSConfig *t if err != nil { return nil, err } + if h < 1*time.Second { + return nil, fmt.Errorf("invalid heartbeat %s: must be at least 1s", opt) + } heartbeat = h } From 6fbbfd7287efbea3335ae8a378caad15c0451c88 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Mon, 18 May 2015 13:45:11 -0700 Subject: [PATCH 16/16] integration: give zk some extra time. Signed-off-by: Andrea Luzzardi --- test/integration/discovery/zk.bats | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/integration/discovery/zk.bats b/test/integration/discovery/zk.bats index 2c933630ee..0ab9d0bbca 100644 --- a/test/integration/discovery/zk.bats +++ b/test/integration/discovery/zk.bats @@ -36,11 +36,11 @@ function teardown() { # Start 2 engines and make them join the cluster. start_docker 2 swarm_join "$DISCOVERY" - retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 10 1 discovery_check_swarm_list "$DISCOVERY" # Then, start a manager and ensure it sees all the engines. swarm_manage "$DISCOVERY" - retry 5 1 discovery_check_swarm_info + retry 10 1 discovery_check_swarm_info } @test "zk discovery: watch for changes" { @@ -55,8 +55,8 @@ function teardown() { # Add engines to the cluster and make sure it's picked up by swarm. start_docker 2 swarm_join "$DISCOVERY" - retry 5 1 discovery_check_swarm_list "$DISCOVERY" - retry 5 1 discovery_check_swarm_info + retry 10 1 discovery_check_swarm_list "$DISCOVERY" + retry 10 1 discovery_check_swarm_info } @test "zk discovery: failure" { @@ -76,6 +76,6 @@ function teardown() { start_store # After a while, `join` and `manage` should reach the store. - retry 5 1 discovery_check_swarm_list "$DISCOVERY" - retry 5 1 discovery_check_swarm_info + retry 10 1 discovery_check_swarm_list "$DISCOVERY" + retry 10 1 discovery_check_swarm_info }