diff --git a/cli/cli.go b/cli/cli.go index b1b31a673e..442370d4de 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path" + "time" log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" @@ -76,24 +77,32 @@ func Run() { Name: "list", ShortName: "l", Usage: "list nodes in a cluster", + Flags: []cli.Flag{flTimeout}, Action: func(c *cli.Context) { dflag := getDiscovery(c) if dflag == "" { log.Fatalf("discovery required to list a cluster. See '%s list --help'.", c.App.Name) } + timeout, err := time.ParseDuration(c.String("timeout")) + if err != nil { + log.Fatalf("invalid --timeout: %v", err) + } - // FIXME Add and use separate timeout flag instead of forcing it - d, err := discovery.New(dflag, 10) + d, err := discovery.New(dflag, timeout) if err != nil { log.Fatal(err) } - nodes, err := d.Fetch() - if err != nil { + ch, errCh := d.Watch(nil) + select { + case entries := <-ch: + for _, entry := range entries { + fmt.Println(entry) + } + case err := <-errCh: log.Fatal(err) - } - for _, node := range nodes { - fmt.Println(node) + case <-time.After(timeout): + log.Fatal("Timed out") } }, }, diff --git a/cli/flags.go b/cli/flags.go index 9b2485d944..b24362b73c 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -48,10 +48,15 @@ var ( Usage: "ip/socket to listen on", EnvVar: "SWARM_HOST", } - flHeartBeat = cli.IntFlag{ + flHeartBeat = cli.StringFlag{ Name: "heartbeat, hb", - Value: 25, - Usage: "time in second between each heartbeat", + Value: "25s", + Usage: "period between each heartbeat", + } + flTimeout = cli.StringFlag{ + Name: "timeout", + Value: "10s", + Usage: "timeout period", } flEnableCors = cli.BoolFlag{ Name: "api-enable-cors, cors", diff --git a/cli/help.go b/cli/help.go index 6c13e4fff2..9f4fe25bf8 100644 --- a/cli/help.go +++ b/cli/help.go @@ -26,7 +26,7 @@ ARGUMENTS: OPTIONS: {{range .Flags}}{{.}} {{end}}{{if (eq .Name "manage")}}{{printf "\t * swarm.overcommit=0.05\tovercommit to apply on resources"}} - {{printf "\t * swarm.discovery.heartbeat=25\ttime in second between each heartbeat"}}{{end}}{{ end }} + {{printf "\t * swarm.discovery.heartbeat=25s\tperiod between each heartbeat"}}{{end}}{{ end }} ` } diff --git a/cli/join.go b/cli/join.go index 3c5a6c7594..7a151a5215 100644 --- a/cli/join.go +++ b/cli/join.go @@ -2,7 +2,6 @@ package cli import ( "regexp" - "strconv" "time" log "github.com/Sirupsen/logrus" @@ -21,11 +20,13 @@ func join(c *cli.Context) { log.Fatalf("discovery required to join a cluster. 
See '%s join --help'.", c.App.Name) } - hb, err := strconv.ParseUint(c.String("heartbeat"), 0, 32) - if hb < 1 || err != nil { - log.Fatal("--heartbeat should be an unsigned integer and greater than 0") + hb, err := time.ParseDuration(c.String("heartbeat")) + if err != nil { + log.Fatalf("invalid --heartbeat: %v", err) + } + if hb < 1*time.Second { + log.Fatal("--heartbeat should be at least one second") } - d, err := discovery.New(dflag, hb) if err != nil { log.Fatal(err) @@ -37,16 +38,11 @@ func join(c *cli.Context) { log.Fatal("--addr should be of the form ip:port or hostname:port") } - if err := d.Register(addr); err != nil { - log.Fatal(err) - } - - hbval := time.Duration(hb) for { - log.WithFields(log.Fields{"addr": addr, "discovery": dflag}).Infof("Registering on the discovery service every %d seconds...", hbval) - time.Sleep(hbval * time.Second) + log.WithFields(log.Fields{"addr": addr, "discovery": dflag}).Infof("Registering on the discovery service every %s...", hb) if err := d.Register(addr); err != nil { log.Error(err) } + time.Sleep(hb) } } diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 440066ce7b..4819258bf2 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -8,6 +8,7 @@ import ( "sort" "strings" "sync" + "time" log "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/stringid" @@ -28,10 +29,9 @@ type Cluster struct { engines map[string]*cluster.Engine scheduler *scheduler.Scheduler store *state.Store + discovery discovery.Discovery overcommitRatio float64 - discovery string - heartbeat uint64 TLSConfig *tls.Config } @@ -39,13 +39,15 @@ type Cluster struct { func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *tls.Config, dflag string, options cluster.DriverOpts) (cluster.Cluster, error) { log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster") + var ( + err error + ) + cluster := &Cluster{ engines: make(map[string]*cluster.Engine), scheduler: scheduler, store: store, overcommitRatio: 0.05, - heartbeat: 25, - discovery: dflag, TLSConfig: TLSConfig, } @@ -53,29 +55,25 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t cluster.overcommitRatio = val } - if heartbeat, ok := options.Uint("swarm.discovery.heartbeat"); ok { - cluster.heartbeat = heartbeat - if cluster.heartbeat < 1 { - return nil, errors.New("heartbeat should be an unsigned integer and greater than 0") + heartbeat := 25 * time.Second + if opt, ok := options.String("swarm.discovery.heartbeat"); ok { + h, err := time.ParseDuration(opt) + if err != nil { + return nil, err } + if h < 1*time.Second { + return nil, fmt.Errorf("invalid heartbeat %s: must be at least 1s", opt) + } + heartbeat = h } - // get the list of entries from the discovery service - go func() { - d, err := discovery.New(cluster.discovery, cluster.heartbeat) - if err != nil { - log.Fatal(err) - } - - entries, err := d.Fetch() - if err != nil { - log.Fatal(err) - - } - cluster.newEntries(entries) - - go d.Watch(cluster.newEntries) - }() + // Set up discovery. 
+ cluster.discovery, err = discovery.New(dflag, heartbeat) + if err != nil { + log.Fatal(err) + } + discoveryCh, errCh := cluster.discovery.Watch(nil) + go cluster.monitorDiscovery(discoveryCh, errCh) return cluster, nil } @@ -165,40 +163,6 @@ func (c *Cluster) RemoveContainer(container *cluster.Container, force bool) erro return nil } -// Entries are Docker Engines -func (c *Cluster) newEntries(entries []*discovery.Entry) { - for _, entry := range entries { - go func(m *discovery.Entry) { - if !c.hasEngine(m.String()) { - engine := cluster.NewEngine(m.String(), c.overcommitRatio) - if err := engine.Connect(c.TLSConfig); err != nil { - log.Error(err) - return - } - c.Lock() - - if old, exists := c.engines[engine.ID]; exists { - c.Unlock() - if old.Addr != engine.Addr { - log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr) - } else { - log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr) - } - return - } - c.engines[engine.ID] = engine - if err := engine.RegisterEventHandler(c); err != nil { - log.Error(err) - c.Unlock() - return - } - c.Unlock() - - } - }(entry) - } -} - func (c *Cluster) hasEngine(addr string) bool { c.RLock() defer c.RUnlock() @@ -211,6 +175,60 @@ func (c *Cluster) hasEngine(addr string) bool { return false } +func (c *Cluster) addEngine(addr string) bool { + // Check the engine is already registered by address. + if c.hasEngine(addr) { + return false + } + + // Attempt a connection to the engine. Since this is slow, don't get a hold + // of the lock yet. + engine := cluster.NewEngine(addr, c.overcommitRatio) + if err := engine.Connect(c.TLSConfig); err != nil { + log.Error(err) + return false + } + + // The following is critical and fast. Grab a lock. + c.Lock() + defer c.Unlock() + + // Make sure the engine ID is unique. + if old, exists := c.engines[engine.ID]; exists { + if old.Addr != engine.Addr { + log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr) + } else { + log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr) + } + return false + } + + // Finally register the engine. + c.engines[engine.ID] = engine + if err := engine.RegisterEventHandler(c); err != nil { + log.Error(err) + } + return true +} + +// Entries are Docker Engines +func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error) { + // Watch changes on the discovery channel. + for { + select { + case entries := <-ch: + // Attempt to add every engine. `addEngine` will take care of duplicates. + // Since `addEngine` can be very slow (it has to connect to the + // engine), we are going to launch them in parallel. + for _, entry := range entries { + go c.addEngine(entry.String()) + } + case err := <-errCh: + log.Errorf("Discovery error: %v", err) + } + } +} + // Images returns all the images in the cluster. func (c *Cluster) Images() []*cluster.Image { c.RLock() diff --git a/discovery/discovery.go b/discovery/discovery.go index f5f391d158..e9e44adb77 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "strings" + "time" log "github.com/Sirupsen/logrus" ) @@ -25,20 +26,44 @@ func NewEntry(url string) (*Entry, error) { } // String returns the string form of an entry. 
-func (m Entry) String() string { - return fmt.Sprintf("%s:%s", m.Host, m.Port) +func (e *Entry) String() string { + return fmt.Sprintf("%s:%s", e.Host, e.Port) } -// WatchCallback is the type of the function called to monitor entries -// on a discovery endpoint. -type WatchCallback func(entries []*Entry) +// Equals returns true if cmp contains the same data. +func (e *Entry) Equals(cmp *Entry) bool { + return e.Host == cmp.Host && e.Port == cmp.Port +} + +// Entries is a list of *Entry with some helpers. +type Entries []*Entry + +// Equals returns true if cmp contains the same data. +func (e Entries) Equals(cmp Entries) bool { + // Check if the file has really changed. + if len(e) != len(cmp) { + return false + } + for i := range e { + if !e[i].Equals(cmp[i]) { + return false + } + } + return true +} // The Discovery interface is implemented by Discovery backends which // manage swarm host entries. type Discovery interface { - Initialize(string, uint64) error - Fetch() ([]*Entry, error) - Watch(WatchCallback) + // Initialize the discovery with URIs and a heartbeat. + Initialize(string, time.Duration) error + + // Watch the discovery for entry changes. + // Returns a channel that will receive changes or an error. + // Providing a non-nil stopCh can be used to stop watching. + Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error) + + // Register to the discovery Register(string) error } @@ -79,7 +104,7 @@ func parse(rawurl string) (string, string) { // New returns a new Discovery given a URL and heartbeat settings. // Returns an error if the URL scheme is not supported. -func New(rawurl string, heartbeat uint64) (Discovery, error) { +func New(rawurl string, heartbeat time.Duration) (Discovery, error) { scheme, uri := parse(rawurl) if discovery, exists := discoveries[scheme]; exists { @@ -92,8 +117,8 @@ func New(rawurl string, heartbeat uint64) (Discovery, error) { } // CreateEntries returns an array of entries based on the given addresses. 
-func CreateEntries(addrs []string) ([]*Entry, error) { - entries := []*Entry{} +func CreateEntries(addrs []string) (Entries, error) { + entries := Entries{} if addrs == nil { return entries, nil } diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 154086e3c3..0c5127473c 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -8,9 +8,9 @@ import ( func TestNewEntry(t *testing.T) { entry, err := NewEntry("127.0.0.1:2375") - assert.Equal(t, entry.Host, "127.0.0.1") - assert.Equal(t, entry.Port, "2375") assert.NoError(t, err) + assert.True(t, entry.Equals(&Entry{Host: "127.0.0.1", Port: "2375"})) + assert.Equal(t, entry.String(), "127.0.0.1:2375") _, err = NewEntry("127.0.0.1") assert.Error(t, err) @@ -40,15 +40,43 @@ func TestParse(t *testing.T) { func TestCreateEntries(t *testing.T) { entries, err := CreateEntries(nil) - assert.Equal(t, entries, []*Entry{}) + assert.Equal(t, entries, Entries{}) assert.NoError(t, err) entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""}) - assert.Equal(t, len(entries), 2) - assert.Equal(t, entries[0].String(), "127.0.0.1:2375") - assert.Equal(t, entries[1].String(), "127.0.0.2:2375") assert.NoError(t, err) + expected := Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + } + assert.True(t, entries.Equals(expected)) _, err = CreateEntries([]string{"127.0.0.1", "127.0.0.2"}) assert.Error(t, err) } + +func TestEntriesEquality(t *testing.T) { + entries := Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + } + + // Same + assert.True(t, entries.Equals(Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + })) + + // Different size + assert.False(t, entries.Equals(Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + &Entry{Host: "127.0.0.3", Port: "2375"}, + })) + + // Different content + assert.False(t, entries.Equals(Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.42", Port: "2375"}, + })) +} diff --git a/discovery/file/file.go b/discovery/file/file.go index ba43aee077..4eb146ca7a 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -1,6 +1,7 @@ package file import ( + "fmt" "io/ioutil" "strings" "time" @@ -10,7 +11,7 @@ import ( // Discovery is exported type Discovery struct { - heartbeat uint64 + heartbeat time.Duration path string } @@ -19,7 +20,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(path string, heartbeat uint64) error { +func (s *Discovery) Initialize(path string, heartbeat time.Duration) error { s.path = path s.heartbeat = heartbeat return nil @@ -46,23 +47,55 @@ func parseFileContent(content []byte) []string { return result } -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { +func (s *Discovery) fetch() (discovery.Entries, error) { fileContent, err := ioutil.ReadFile(s.path) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to read '%s': %v", s.path, err) } return discovery.CreateEntries(parseFileContent(fileContent)) } // Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { - entries, err := s.Fetch() - if err == nil { - callback(entries) +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { + ch := make(chan discovery.Entries) + errCh := 
make(chan error) + ticker := time.NewTicker(s.heartbeat) + + go func() { + defer close(errCh) + defer close(ch) + + // Send the initial entries if available. + currentEntries, err := s.fetch() + if err != nil { + errCh <- err + } else { + ch <- currentEntries } - } + + // Periodically send updates. + for { + select { + case <-ticker.C: + newEntries, err := s.fetch() + if err != nil { + errCh <- err + continue + } + + // Check if the file has really changed. + if !newEntries.Equals(currentEntries) { + ch <- newEntries + } + currentEntries = newEntries + case <-stopCh: + ticker.Stop() + return + } + } + }() + + return ch, errCh } // Register is exported diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go index 14ac918b82..0ec05972dc 100644 --- a/discovery/file/file_test.go +++ b/discovery/file/file_test.go @@ -1,15 +1,24 @@ package file import ( + "io/ioutil" + "os" "testing" + "github.com/docker/swarm/discovery" "github.com/stretchr/testify/assert" ) func TestInitialize(t *testing.T) { - discovery := &Discovery{} - discovery.Initialize("/path/to/file", 0) - assert.Equal(t, discovery.path, "/path/to/file") + d := &Discovery{} + d.Initialize("/path/to/file", 0) + assert.Equal(t, d.path, "/path/to/file") +} + +func TestNew(t *testing.T) { + d, err := discovery.New("file:///path/to/file", 0) + assert.NoError(t, err) + assert.Equal(t, d.(*Discovery).path, "/path/to/file") } func TestContent(t *testing.T) { @@ -18,6 +27,7 @@ func TestContent(t *testing.T) { 2.2.2.[2:4]:2222 ` ips := parseFileContent([]byte(data)) + assert.Len(t, ips, 5) assert.Equal(t, ips[0], "1.1.1.1:1111") assert.Equal(t, ips[1], "1.1.1.2:1111") assert.Equal(t, ips[2], "2.2.2.2:2222") @@ -40,7 +50,53 @@ func TestParsingContentsWithComments(t *testing.T) { ### test ### ` ips := parseFileContent([]byte(data)) - assert.Equal(t, 2, len(ips)) + assert.Len(t, ips, 2) assert.Equal(t, "1.1.1.1:1111", ips[0]) assert.Equal(t, "3.3.3.3:3333", ips[1]) } + +func TestWatch(t *testing.T) { + data := ` +1.1.1.1:1111 +2.2.2.2:2222 +` + expected := discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + + // Create a temporary file and remove it. + tmp, err := ioutil.TempFile(os.TempDir(), "discovery-file-test") + assert.NoError(t, err) + assert.NoError(t, tmp.Close()) + assert.NoError(t, os.Remove(tmp.Name())) + + // Set up file discovery. + d := &Discovery{} + d.Initialize(tmp.Name(), 1) + stopCh := make(chan struct{}) + ch, errCh := d.Watch(stopCh) + + // Make sure it fires errors since the file doesn't exist. + assert.Error(t, <-errCh) + // We have to drain the error channel otherwise Watch will get stuck. + go func() { + for _ = range errCh { + } + }() + + // Write the file and make sure we get the expected value back. + assert.NoError(t, ioutil.WriteFile(tmp.Name(), []byte(data), 0600)) + assert.Equal(t, <-ch, expected) + + // Add a new entry and look it up. + data += "\n3.3.3.3:3333\n" + expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) + assert.NoError(t, ioutil.WriteFile(tmp.Name(), []byte(data), 0600)) + assert.Equal(t, <-ch, expected) + + // Stop and make sure it closes all channels. 
+ close(stopCh) + assert.Nil(t, <-ch) + assert.Nil(t, <-errCh) +} diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 7882bfa1c7..3bffb105c9 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -26,7 +26,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat uint64) error { +func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error { var ( parts = strings.SplitN(uris, "/", 2) ips = strings.Split(parts[0], ",") @@ -42,7 +42,7 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64) error { addrs = append(addrs, ip) } - s.heartbeat = time.Duration(heartbeat) * time.Second + s.heartbeat = heartbeat s.prefix = parts[1] // Creates a new store, will ignore options given @@ -54,46 +54,72 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64) error { Timeout: s.heartbeat, }, ) - if err != nil { - return err - } - - return nil + return err } -// Fetch is exported -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - addrs, err := s.store.List(s.prefix) - if err != nil { - return nil, err +// Watch the store until either there's a store error or we receive a stop request. +// Returns false if we shouldn't attempt watching the store anymore (stop request received). +func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool { + for { + select { + case pairs := <-watchCh: + if pairs == nil { + return true + } + + log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs)) + + // Convert `KVPair` into `discovery.Entry`. + addrs := make([]string, len(pairs)) + for _, pair := range pairs { + addrs = append(addrs, string(pair.Value)) + } + + entries, err := discovery.CreateEntries(addrs) + if err != nil { + errCh <- err + } else { + discoveryCh <- entries + } + case <-stopCh: + // We were requested to stop watching. + return false + } } - return discovery.CreateEntries(convertToStringArray(addrs)) } // Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - ch, err := s.store.WatchTree(s.prefix, nil) - if err != nil { - log.WithField("discovery", s.backend).Errorf("Watch failed: %v", err) - return - } - for kv := range ch { - log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(kv)) - // Traduce byte array entries to discovery.Entry - entries, _ := discovery.CreateEntries(convertToStringArray(kv)) - callback(entries) - } +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { + ch := make(chan discovery.Entries) + errCh := make(chan error) + + go func() { + defer close(ch) + defer close(errCh) + + // Forever: Create a store watch, watch until we get an error and then try again. + // Will only stop if we receive a stopCh request. + for { + // Set up a watch. + watchCh, err := s.store.WatchTree(s.prefix, stopCh) + if err != nil { + errCh <- err + } else { + if !s.watchOnce(stopCh, watchCh, ch, errCh) { + return + } + } + + // If we get here it means the store watch channel was closed. This + // is unexpected so let's retry later. 
+ errCh <- fmt.Errorf("Unexpected watch error") + time.Sleep(s.heartbeat) + } + }() + return ch, errCh } // Register is exported func (s *Discovery) Register(addr string) error { - err := s.store.Put(path.Join(s.prefix, addr), []byte(addr)) - return err -} - -func convertToStringArray(entries []*store.KVPair) (addrs []string) { - for _, entry := range entries { - addrs = append(addrs, string(entry.Value)) - } - return addrs + return s.store.Put(path.Join(s.prefix, addr), []byte(addr)) } diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go index 6a2b0c7fef..381171f415 100644 --- a/discovery/kv/kv_test.go +++ b/discovery/kv/kv_test.go @@ -1,20 +1,89 @@ package kv import ( + "errors" "testing" + "time" + "github.com/docker/swarm/discovery" + "github.com/docker/swarm/pkg/store" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestInitialize(t *testing.T) { - discoveryService := &Discovery{} + d := &Discovery{backend: store.MOCK} + assert.EqualError(t, d.Initialize("127.0.0.1", 0), "invalid format \"127.0.0.1\", missing ") - assert.Equal(t, discoveryService.Initialize("127.0.0.1", 0).Error(), "invalid format \"127.0.0.1\", missing ") - - assert.Error(t, discoveryService.Initialize("127.0.0.1/path", 0)) - assert.Equal(t, discoveryService.prefix, "path") - - assert.Error(t, discoveryService.Initialize("127.0.0.1,127.0.0.2,127.0.0.3/path", 0)) - assert.Equal(t, discoveryService.prefix, "path") + d = &Discovery{backend: store.MOCK} + assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0)) + s := d.store.(*store.Mock) + assert.Len(t, s.Endpoints, 1) + assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") + assert.Equal(t, d.prefix, "path") + d = &Discovery{backend: store.MOCK} + assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0)) + s = d.store.(*store.Mock) + assert.Len(t, s.Endpoints, 3) + assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") + assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234") + assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234") + assert.Equal(t, d.prefix, "path") +} + +func TestWatch(t *testing.T) { + d := &Discovery{backend: store.MOCK} + assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0)) + s := d.store.(*store.Mock) + + mockCh := make(chan []*store.KVPair) + + // The first watch will fail. + s.On("WatchTree", "path", mock.Anything).Return(mockCh, errors.New("test error")).Once() + // The second one will succeed. + s.On("WatchTree", "path", mock.Anything).Return(mockCh, nil).Once() + expected := discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + kvs := []*store.KVPair{ + {Key: "path/1.1.1.1", Value: []byte("1.1.1.1:1111")}, + {Key: "path/1.1.1.1", Value: []byte("2.2.2.2:2222")}, + } + + stopCh := make(chan struct{}) + ch, errCh := d.Watch(stopCh) + + // It should fire an error since the first WatchRange call failed. + assert.EqualError(t, <-errCh, "test error") + // We have to drain the error channel otherwise Watch will get stuck. + go func() { + for _ = range errCh { + } + }() + + // Push the entries into the store channel and make sure discovery emits. + mockCh <- kvs + assert.Equal(t, <-ch, expected) + + // Add a new entry. + expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) + kvs = append(kvs, &store.KVPair{Key: "path/3.3.3.3", Value: []byte("3.3.3.3:3333")}) + mockCh <- kvs + assert.Equal(t, <-ch, expected) + + // Make sure that if an error occurs it retries. 
+    // This third call to WatchTree will be checked later by AssertExpectations.
+    s.On("WatchTree", "path", mock.Anything).Return(mockCh, nil)
+    close(mockCh)
+    // Give it enough time to call WatchTree.
+    time.Sleep(3 * time.Second)
+
+    // Stop and make sure it closes all channels.
+    close(stopCh)
+    assert.Nil(t, <-ch)
+    assert.Nil(t, <-errCh)
+
+    s.AssertExpectations(t)
 }
diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go
index 009d275ff2..68d8b0ec0b 100644
--- a/discovery/nodes/nodes.go
+++ b/discovery/nodes/nodes.go
@@ -2,13 +2,14 @@ package nodes
 
 import (
     "strings"
+    "time"
 
     "github.com/docker/swarm/discovery"
 )
 
 // Discovery is exported
 type Discovery struct {
-    entries []*discovery.Entry
+    entries discovery.Entries
 }
 
 func init() {
@@ -16,7 +17,7 @@ func init() {
 }
 
 // Initialize is exported
-func (s *Discovery) Initialize(uris string, _ uint64) error {
+func (s *Discovery) Initialize(uris string, _ time.Duration) error {
     for _, input := range strings.Split(uris, ",") {
         for _, ip := range discovery.Generate(input) {
             entry, err := discovery.NewEntry(ip)
@@ -30,13 +31,15 @@ func (s *Discovery) Initialize(uris string, _ uint64) error {
     return nil
 }
 
-// Fetch is exported
-func (s *Discovery) Fetch() ([]*discovery.Entry, error) {
-    return s.entries, nil
-}
-
 // Watch is exported
-func (s *Discovery) Watch(callback discovery.WatchCallback) {
+func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
+    ch := make(chan discovery.Entries)
+    go func() {
+        defer close(ch)
+        ch <- s.entries
+        <-stopCh
+    }()
+    return ch, nil
 }
 
 // Register is exported
diff --git a/discovery/nodes/nodes_test.go b/discovery/nodes/nodes_test.go
index 04f682cabb..4c0583cbae 100644
--- a/discovery/nodes/nodes_test.go
+++ b/discovery/nodes/nodes_test.go
@@ -3,29 +3,41 @@ package nodes
 import (
     "testing"
 
+    "github.com/docker/swarm/discovery"
     "github.com/stretchr/testify/assert"
 )
 
-func TestInitialise(t *testing.T) {
-    discovery := &Discovery{}
-    discovery.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
-    assert.Equal(t, len(discovery.entries), 2)
-    assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111")
-    assert.Equal(t, discovery.entries[1].String(), "2.2.2.2:2222")
+func TestInitialize(t *testing.T) {
+    d := &Discovery{}
+    d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
+    assert.Equal(t, len(d.entries), 2)
+    assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
+    assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222")
 }
 
-func TestInitialiseWithPattern(t *testing.T) {
-    discovery := &Discovery{}
-    discovery.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0)
-    assert.Equal(t, len(discovery.entries), 5)
-    assert.Equal(t, discovery.entries[0].String(), "1.1.1.1:1111")
-    assert.Equal(t, discovery.entries[1].String(), "1.1.1.2:1111")
-    assert.Equal(t, discovery.entries[2].String(), "2.2.2.2:2222")
-    assert.Equal(t, discovery.entries[3].String(), "2.2.2.3:2222")
-    assert.Equal(t, discovery.entries[4].String(), "2.2.2.4:2222")
+func TestInitializeWithPattern(t *testing.T) {
+    d := &Discovery{}
+    d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0)
+    assert.Equal(t, len(d.entries), 5)
+    assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
+    assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111")
+    assert.Equal(t, d.entries[2].String(), "2.2.2.2:2222")
+    assert.Equal(t, d.entries[3].String(), "2.2.2.3:2222")
+    assert.Equal(t, d.entries[4].String(), "2.2.2.4:2222")
+}
+
+func TestWatch(t *testing.T) {
+    d := &Discovery{}
+    d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
+    expected := discovery.Entries{
+ &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + ch, _ := d.Watch(nil) + assert.True(t, expected.Equals(<-ch)) } func TestRegister(t *testing.T) { - discovery := &Discovery{} - assert.Error(t, discovery.Register("0.0.0.0")) + d := &Discovery{} + assert.Error(t, d.Register("0.0.0.0")) } diff --git a/discovery/token/token.go b/discovery/token/token.go index bada10ce52..84ec0be8cd 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -17,7 +17,7 @@ const DiscoveryURL = "https://discovery-stage.hub.docker.com/v1" // Discovery is exported type Discovery struct { - heartbeat uint64 + heartbeat time.Duration url string token string } @@ -27,7 +27,7 @@ func init() { } // Initialize is exported -func (s *Discovery) Initialize(urltoken string, heartbeat uint64) error { +func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration) error { if i := strings.LastIndex(urltoken, "/"); i != -1 { s.url = "https://" + urltoken[:i] s.token = urltoken[i+1:] @@ -45,8 +45,7 @@ func (s *Discovery) Initialize(urltoken string, heartbeat uint64) error { } // Fetch returns the list of entries for the discovery service at the specified endpoint -func (s *Discovery) Fetch() ([]*discovery.Entry, error) { - +func (s *Discovery) fetch() (discovery.Entries, error) { resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) if err != nil { return nil, err @@ -57,7 +56,7 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) { var addrs []string if resp.StatusCode == http.StatusOK { if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil { - return nil, err + return nil, fmt.Errorf("Failed to decode response: %v", err) } } else { return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode) @@ -67,13 +66,46 @@ func (s *Discovery) Fetch() ([]*discovery.Entry, error) { } // Watch is exported -func (s *Discovery) Watch(callback discovery.WatchCallback) { - for _ = range time.Tick(time.Duration(s.heartbeat) * time.Second) { - entries, err := s.Fetch() - if err == nil { - callback(entries) +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { + ch := make(chan discovery.Entries) + ticker := time.NewTicker(s.heartbeat) + errCh := make(chan error) + + go func() { + defer close(ch) + defer close(errCh) + + // Send the initial entries if available. + currentEntries, err := s.fetch() + if err != nil { + errCh <- err + } else { + ch <- currentEntries } - } + + // Periodically send updates. + for { + select { + case <-ticker.C: + newEntries, err := s.fetch() + if err != nil { + errCh <- err + continue + } + + // Check if the file has really changed. 
+                if !newEntries.Equals(currentEntries) {
+                    ch <- newEntries
+                }
+                currentEntries = newEntries
+            case <-stopCh:
+                ticker.Stop()
+                return
+            }
+        }
+    }()
+
+    return ch, errCh
 }
 
 // Register adds a new entry identified by the <addr> into the discovery service
diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go
index a4e1f42edb..47d9d8856c 100644
--- a/discovery/token/token_test.go
+++ b/discovery/token/token_test.go
@@ -2,7 +2,9 @@ package token
 
 import (
     "testing"
+    "time"
 
+    "github.com/docker/swarm/discovery"
     "github.com/stretchr/testify/assert"
 )
 
@@ -23,14 +25,24 @@ func TestInitialize(t *testing.T) {
 }
 
 func TestRegister(t *testing.T) {
-    discovery := &Discovery{token: "TEST_TOKEN", url: DiscoveryURL}
+    d := &Discovery{token: "TEST_TOKEN", url: DiscoveryURL, heartbeat: 1}
     expected := "127.0.0.1:2675"
-    assert.NoError(t, discovery.Register(expected))
-
-    addrs, err := discovery.Fetch()
+    expectedEntries, err := discovery.CreateEntries([]string{expected})
     assert.NoError(t, err)
-    assert.Equal(t, len(addrs), 1)
-    assert.Equal(t, addrs[0].String(), expected)
-    assert.NoError(t, discovery.Register(expected))
+    // Register
+    assert.NoError(t, d.Register(expected))
+
+    // Watch
+    ch, errCh := d.Watch(nil)
+    select {
+    case entries := <-ch:
+        assert.True(t, entries.Equals(expectedEntries))
+    case err := <-errCh:
+        t.Fatal(err)
+    case <-time.After(5 * time.Second):
+        t.Fatal("Timed out")
+    }
+
+    assert.NoError(t, d.Register(expected))
 }
diff --git a/pkg/store/consul.go b/pkg/store/consul.go
index 32cddf0a93..f2696453b1 100644
--- a/pkg/store/consul.go
+++ b/pkg/store/consul.go
@@ -10,6 +10,13 @@ import (
     api "github.com/hashicorp/consul/api"
 )
 
+const (
+    // DefaultWatchWaitTime is how long we block for at a time to check if the
+    // watched key has changed. This affects the minimum time it takes to
+    // cancel a watch.
+    DefaultWatchWaitTime = 15 * time.Second
+)
+
 // Consul embeds the client and watches
 type Consul struct {
     config *api.Config
@@ -145,7 +152,9 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro
 
     go func() {
         defer close(watchCh)
-        opts := &api.QueryOptions{}
+        // Use a wait time in order to check if we should quit from time to
+        // time.
+        opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
         for {
             // Check if we should quit
             select {
@@ -158,6 +167,11 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro
                 log.Errorf("consul: %v", err)
                 return
             }
+            // If LastIndex didn't change then it means `Get` returned because
+            // of the WaitTime and the key didn't change.
+            if opts.WaitIndex == meta.LastIndex {
+                continue
+            }
             opts.WaitIndex = meta.LastIndex
             // FIXME: What happens when a key is deleted?
             if pair != nil {
@@ -181,7 +195,9 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP
 
     go func() {
         defer close(watchCh)
-        opts := &api.QueryOptions{}
+        // Use a wait time in order to check if we should quit from time to
+        // time.
+        opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
         for {
             // Check if we should quit
             select {
@@ -195,6 +211,12 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP
                 log.Errorf("consul: %v", err)
                 return
             }
+            // If LastIndex didn't change then it means `Get` returned because
+            // of the WaitTime and the key didn't change.
+ if opts.WaitIndex == meta.LastIndex { + continue + } + opts.WaitIndex = meta.LastIndex kv := []*KVPair{} for _, pair := range pairs { if pair.Key == prefix { @@ -202,7 +224,6 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP } kv = append(kv, &KVPair{pair.Key, pair.Value, pair.ModifyIndex}) } - opts.WaitIndex = meta.LastIndex watchCh <- kv } }() diff --git a/pkg/store/mock.go b/pkg/store/mock.go new file mode 100644 index 0000000000..3b70955628 --- /dev/null +++ b/pkg/store/mock.go @@ -0,0 +1,87 @@ +package store + +import "github.com/stretchr/testify/mock" + +// Mock store. Mocks all Store functions using testify.Mock. +type Mock struct { + mock.Mock + + // Endpoints passed to InitializeMock + Endpoints []string + // Options passed to InitializeMock + Options *Config +} + +// InitializeMock creates a Mock store. +func InitializeMock(endpoints []string, options *Config) (Store, error) { + s := &Mock{} + s.Endpoints = endpoints + s.Options = options + return s, nil +} + +// Put mock +func (s *Mock) Put(key string, value []byte) error { + args := s.Mock.Called(key, value) + return args.Error(0) +} + +// Get mock +func (s *Mock) Get(key string) (*KVPair, error) { + args := s.Mock.Called(key) + return args.Get(0).(*KVPair), args.Error(1) +} + +// Delete mock +func (s *Mock) Delete(key string) error { + args := s.Mock.Called(key) + return args.Error(0) +} + +// Exists mock +func (s *Mock) Exists(key string) (bool, error) { + args := s.Mock.Called(key) + return args.Bool(0), args.Error(1) +} + +// Watch mock +func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { + args := s.Mock.Called(key, stopCh) + return args.Get(0).(<-chan *KVPair), args.Error(1) +} + +// WatchTree mock +func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { + args := s.Mock.Called(prefix, stopCh) + return args.Get(0).(chan []*KVPair), args.Error(1) +} + +// CreateLock mock +func (s *Mock) CreateLock(key string, value []byte) (Locker, error) { + args := s.Mock.Called(key, value) + return args.Get(0).(Locker), args.Error(1) +} + +// List mock +func (s *Mock) List(prefix string) ([]*KVPair, error) { + args := s.Mock.Called(prefix) + return args.Get(0).([]*KVPair), args.Error(1) +} + +// DeleteTree mock +func (s *Mock) DeleteTree(prefix string) error { + args := s.Mock.Called(prefix) + return args.Error(0) +} + +// AtomicPut mock +func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) { + args := s.Mock.Called(key, value, previous) + return args.Bool(0), args.Error(1) +} + +// AtomicDelete mock +func (s *Mock) AtomicDelete(key string, previous *KVPair) (bool, error) { + args := s.Mock.Called(key, previous) + return args.Bool(0), args.Error(1) +} diff --git a/pkg/store/store.go b/pkg/store/store.go index 442ee548f1..d10ecab503 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -12,8 +12,10 @@ import ( type Backend string const ( + // MOCK backend + MOCK Backend = "mock" // CONSUL backend - CONSUL Backend = "consul" + CONSUL = "consul" // ETCD backend ETCD = "etcd" // ZK backend @@ -114,6 +116,7 @@ type Initialize func(addrs []string, options *Config) (Store, error) var ( // Backend initializers initializers = map[Backend]Initialize{ + MOCK: InitializeMock, CONSUL: InitializeConsul, ETCD: InitializeEtcd, ZK: InitializeZookeeper, diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index fa324036ae..b55e7f42f7 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -9,7 +9,6 
@@ import ( ) // Zookeeper embeds the zookeeper client -// and list of watches type Zookeeper struct { timeout time.Duration client *zk.Conn diff --git a/test/integration/discovery-consul.bats b/test/integration/discovery-consul.bats deleted file mode 100644 index 1d5c42535b..0000000000 --- a/test/integration/discovery-consul.bats +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env bats - -load helpers - -# Address on which Consul will listen (random port between 8000 and 9000). -CONSUL_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 8000 )) - -# Container name for integration test -CONTAINER_NAME=swarm_consul - -function check_leader() { - # Confirm Cluster leader election - docker_host logs $CONTAINER_NAME | grep "New leader elected" - # Check member state - docker_host logs $CONTAINER_NAME | grep "consul: member '$CONTAINER_NAME' joined, marking health alive" -} - -function start_consul() { - docker_host run --name=$CONTAINER_NAME -h $CONTAINER_NAME -p $CONSUL_HOST:8500 -d progrium/consul -server -bootstrap-expect 1 -data-dir /test - # Check if consul cluster leader is elected - retry 30 1 check_leader -} - -function stop_consul() { - docker_host rm -f -v $CONTAINER_NAME -} - -function setup() { - start_consul -} - -function teardown() { - swarm_manage_cleanup - swarm_join_cleanup - stop_docker - stop_consul -} - -@test "consul discovery" { - # Start 2 engines and make them join the cluster. - start_docker 2 - swarm_join "consul://${CONSUL_HOST}/test" - - # Start a manager and ensure it sees all the engines. - swarm_manage "consul://${CONSUL_HOST}/test" - check_swarm_nodes - - # Add another engine to the cluster and make sure it's picked up by swarm. - start_docker 1 - swarm_join "consul://${CONSUL_HOST}/test" - retry 30 1 check_swarm_nodes -} diff --git a/test/integration/discovery-etcd.bats b/test/integration/discovery-etcd.bats deleted file mode 100644 index 19c2ba9301..0000000000 --- a/test/integration/discovery-etcd.bats +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env bats - -load helpers - -# Address on which Etcd will listen (random port between 9000 and 10,000). -ETCD_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 9000 )) - -# Container name for integration test -CONTAINER_NAME=swarm_etcd - -function check_leader() { - # Confirm Cluster leader election - docker_host logs $CONTAINER_NAME | grep "state changed from 'follower' to 'leader'" - # Check leader event - docker_host logs $CONTAINER_NAME | grep "leader changed from '' to" -} - -function start_etcd() { - docker_host run -p $ETCD_HOST:4001 --name=$CONTAINER_NAME -d coreos/etcd - # Check if etcd cluster leader is elected - retry 30 1 check_leader -} - -function stop_etcd() { - docker_host rm -f -v $CONTAINER_NAME -} - -function setup() { - start_etcd -} - -function teardown() { - swarm_manage_cleanup - swarm_join_cleanup - stop_docker - stop_etcd -} - -@test "etcd discovery" { - # Start 2 engines and make them join the cluster. - start_docker 2 - swarm_join "etcd://${ETCD_HOST}/test" - - # Start a manager and ensure it sees all the engines. - swarm_manage "etcd://${ETCD_HOST}/test" - check_swarm_nodes - - # Add another engine to the cluster and make sure it's picked up by swarm. 
- start_docker 1 - swarm_join "etcd://${ETCD_HOST}/test" - retry 30 1 check_swarm_nodes -} diff --git a/test/integration/discovery-file.bats b/test/integration/discovery-file.bats deleted file mode 100644 index 98b91557c9..0000000000 --- a/test/integration/discovery-file.bats +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env bats - -load helpers - -# create a blank temp file for discovery -DISCOVERY_FILE=$(mktemp) - -function teardown() { - swarm_manage_cleanup - stop_docker - rm -f "$DISCOVERY_FILE" -} - -function setup_file_discovery() { - rm -f "$DISCOVERY_FILE" - for host in ${HOSTS[@]}; do - echo "$host" >> $DISCOVERY_FILE - done -} - -@test "file discovery" { - # Start 2 engines, register them in a file, then start swarm and make sure - # it sees them. - start_docker 2 - setup_file_discovery - swarm_manage "file://$DISCOVERY_FILE" - check_swarm_nodes - - # Add another engine to the cluster, update the discovery file and make - # sure it's picked up by swarm. - start_docker 1 - setup_file_discovery - retry 10 1 check_swarm_nodes -} diff --git a/test/integration/discovery-token.bats b/test/integration/discovery-token.bats deleted file mode 100644 index 877954fbf9..0000000000 --- a/test/integration/discovery-token.bats +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env bats - -load helpers - -TOKEN="" - -function token_cleanup() { - [ -z "$TOKEN" ] && return - echo "Removing $TOKEN" - curl -X DELETE "https://discovery-stage.hub.docker.com/v1/clusters/$TOKEN" -} - -function teardown() { - swarm_manage_cleanup - swarm_join_cleanup - stop_docker - token_cleanup -} - -@test "token discovery" { - # Create a cluster and validate the token. - run swarm create - [ "$status" -eq 0 ] - [[ "$output" =~ ^[0-9a-f]{32}$ ]] - TOKEN="$output" - - # Start 2 engines and make them join the cluster. - start_docker 2 - swarm_join "token://$TOKEN" - - # Start a manager and ensure it sees all the engines. - swarm_manage "token://$TOKEN" - check_swarm_nodes - - # Add another engine to the cluster and make sure it's picked up by swarm. - start_docker 1 - swarm_join "token://$TOKEN" - retry 10 1 check_swarm_nodes -} diff --git a/test/integration/discovery-zk.bats b/test/integration/discovery-zk.bats deleted file mode 100644 index bfe78a9c84..0000000000 --- a/test/integration/discovery-zk.bats +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env bats - -load helpers - -# Address on which Zookeeper will listen (random port between 7000 and 8000). -ZK_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 7000 )) - -# Container name for integration test -ZK_CONTAINER_NAME=swarm_integration_zk - -function check_zk_started() { - docker_host logs $ZK_CONTAINER_NAME | grep "binding to port 0.0.0.0/0.0.0.0:2181" -} - -function start_zk() { - run docker_host run --name $ZK_CONTAINER_NAME -p $ZK_HOST:2181 -d jplock/zookeeper - [ "$status" -eq 0 ] - - retry 30 1 check_zk_started -} - -function stop_zk() { - run docker_host rm -f -v $ZK_CONTAINER_NAME - [ "$status" -eq 0 ] -} - -function setup() { - start_zk -} - -function teardown() { - swarm_manage_cleanup - swarm_join_cleanup - stop_docker - stop_zk -} - -@test "zookeeper discovery" { - # Start 2 engines and make them join the cluster. - start_docker 2 - swarm_join "zk://${ZK_HOST}/test" - - # Start a manager and ensure it sees all the engines. - swarm_manage "zk://${ZK_HOST}/test" - check_swarm_nodes - - # Add another engine to the cluster and make sure it's picked up by swarm. 
- start_docker 1 - swarm_join "zk://${ZK_HOST}/test" - retry 30 1 check_swarm_nodes -} diff --git a/test/integration/discovery/consul.bats b/test/integration/discovery/consul.bats new file mode 100644 index 0000000000..8701c8440c --- /dev/null +++ b/test/integration/discovery/consul.bats @@ -0,0 +1,81 @@ +#!/usr/bin/env bats + +load discovery_helpers + +# Address on which the store will listen (random port between 8000 and 9000). +STORE_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 8000 )) + +# Discovery parameter for Swarm +DISCOVERY="consul://${STORE_HOST}/test" + +# Container name for integration test +CONTAINER_NAME=swarm_consul + +function start_store() { + docker_host run --name=$CONTAINER_NAME -h $CONTAINER_NAME -p $STORE_HOST:8500 -d progrium/consul -server -bootstrap-expect 1 -data-dir /test +} + +function stop_store() { + docker_host rm -f -v $CONTAINER_NAME +} + +function teardown() { + swarm_manage_cleanup + swarm_join_cleanup + stop_docker + stop_store +} + +@test "consul discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. + + # Start the store + start_store + + # Start 2 engines and make them join the cluster. + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + + # Then, start a manager and ensure it sees all the engines. + swarm_manage "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "consul discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + start_store + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "consul discovery: failure" { + # The goal of this test is to simulate a store failure and ensure discovery + # is resilient to it. + + # At this point, the store is not yet started. + + # Start 2 engines and join the cluster. They should keep retrying + start_docker 2 + swarm_join "$DISCOVERY" + + # Start a manager. It should keep retrying + swarm_manage "$DISCOVERY" + + # Now start the store + start_store + + # After a while, `join` and `manage` should reach the store. + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} diff --git a/test/integration/discovery/discovery_helpers.bash b/test/integration/discovery/discovery_helpers.bash new file mode 100644 index 0000000000..13597a6aa9 --- /dev/null +++ b/test/integration/discovery/discovery_helpers.bash @@ -0,0 +1,17 @@ +#!/bin/bash + +load ../helpers + +# Returns true if all nodes have joined the swarm. +function discovery_check_swarm_info() { + docker_swarm info | grep -q "Nodes: ${#HOSTS[@]}" +} + +# Returns true if all nodes have joined the discovery. +function discovery_check_swarm_list() { + local joined=`swarm list "$1" | wc -l` + local total=${#HOSTS[@]} + + echo "${joined} out of ${total} hosts joined discovery" + [ "$joined" -eq "$total" ] +} diff --git a/test/integration/discovery/etcd.bats b/test/integration/discovery/etcd.bats new file mode 100644 index 0000000000..a49001d996 --- /dev/null +++ b/test/integration/discovery/etcd.bats @@ -0,0 +1,82 @@ +#!/usr/bin/env bats + +load discovery_helpers + +# Address on which the store will listen (random port between 8000 and 9000). 
+STORE_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 9000 )) + +# Discovery parameter for Swarm +DISCOVERY="etcd://${STORE_HOST}/test" + +# Container name for integration test +CONTAINER_NAME=swarm_etcd + +function start_store() { + docker_host run -p $STORE_HOST:4001 --name=$CONTAINER_NAME -d coreos/etcd +} + +function stop_store() { + docker_host rm -f -v $CONTAINER_NAME +} + +function teardown() { + swarm_manage_cleanup + swarm_join_cleanup + stop_docker + stop_store +} + +@test "etcd discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. + + # Start the store + start_store + + docker_host ps -a + # Start 2 engines and make them join the cluster. + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + + # Then, start a manager and ensure it sees all the engines. + swarm_manage "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "etcd discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + start_store + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "etcd discovery: failure" { + # The goal of this test is to simulate a store failure and ensure discovery + # is resilient to it. + + # At this point, the store is not yet started. + + # Start 2 engines and join the cluster. They should keep retrying + start_docker 2 + swarm_join "$DISCOVERY" + + # Start a manager. It should keep retrying + swarm_manage "$DISCOVERY" + + # Now start the store + start_store + + # After a while, `join` and `manage` should reach the store. + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} diff --git a/test/integration/discovery/file.bats b/test/integration/discovery/file.bats new file mode 100644 index 0000000000..5fa5989b99 --- /dev/null +++ b/test/integration/discovery/file.bats @@ -0,0 +1,75 @@ +#!/usr/bin/env bats + +load discovery_helpers + +DISCOVERY_FILE="" +DISCOVERY="" + +function setup() { + # create a blank temp file for discovery + DISCOVERY_FILE=$(mktemp) + DISCOVERY="file://$DISCOVERY_FILE" +} + +function teardown() { + swarm_manage_cleanup + stop_docker + rm -f "$DISCOVERY_FILE" +} + +function setup_discovery_file() { + rm -f "$DISCOVERY_FILE" + for host in ${HOSTS[@]}; do + echo "$host" >> $DISCOVERY_FILE + done +} + +@test "file discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. + + # Start 2 engines and register them in the file. + start_docker 2 + setup_discovery_file + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + + # Then, start a manager and ensure it sees all the engines. + swarm_manage "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "file discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. 
+ start_docker 2 + setup_discovery_file + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "file discovery: failure" { + # The goal of this test is to simulate a failure (file not available) and ensure discovery + # is resilient to it. + + # Wipe out the discovery file. + rm -f "$DISCOVERY_FILE" + + # Start 2 engines. + start_docker 2 + + # Start a manager. It should keep retrying + swarm_manage "$DISCOVERY" + + # Now create the discovery file. + setup_discovery_file + + # After a while, `join` and `manage` should see the file. + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} diff --git a/test/integration/discovery/token.bats b/test/integration/discovery/token.bats new file mode 100644 index 0000000000..894fa3ec76 --- /dev/null +++ b/test/integration/discovery/token.bats @@ -0,0 +1,54 @@ +#!/usr/bin/env bats + +load discovery_helpers + +TOKEN="" +DISCOVERY="" + +function token_cleanup() { + [ -z "$TOKEN" ] && return + echo "Removing $TOKEN" + curl -X DELETE "https://discovery-stage.hub.docker.com/v1/clusters/$TOKEN" +} + +function setup() { + TOKEN=$(swarm create) + [[ "$TOKEN" =~ ^[0-9a-f]{32}$ ]] + DISCOVERY="token://$TOKEN" +} + +function teardown() { + swarm_manage_cleanup + swarm_join_cleanup + stop_docker + token_cleanup +} + +@test "token discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. + + # Start 2 engines and make them join the cluster. + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + + # Then, start a manager and ensure it sees all the engines. + swarm_manage "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} + +@test "token discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. + start_docker 2 + swarm_join "$DISCOVERY" + retry 5 1 discovery_check_swarm_list "$DISCOVERY" + retry 5 1 discovery_check_swarm_info +} diff --git a/test/integration/discovery/zk.bats b/test/integration/discovery/zk.bats new file mode 100644 index 0000000000..0ab9d0bbca --- /dev/null +++ b/test/integration/discovery/zk.bats @@ -0,0 +1,81 @@ +#!/usr/bin/env bats + +load discovery_helpers + +# Address on which the store will listen (random port between 8000 and 9000). +STORE_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 7000 )) + +# Discovery parameter for Swarm +DISCOVERY="zk://${STORE_HOST}/test" + +# Container name for integration test +CONTAINER_NAME=swarm_integration_zk + +function start_store() { + docker_host run --name $CONTAINER_NAME -p $STORE_HOST:2181 -d jplock/zookeeper +} + +function stop_store() { + docker_host rm -f -v $CONTAINER_NAME +} + +function teardown() { + swarm_manage_cleanup + swarm_join_cleanup + stop_docker + stop_store +} + +@test "zk discovery: recover engines" { + # The goal of this test is to ensure swarm can see engines that joined + # while the manager was stopped. + + # Start the store + start_store + + # Start 2 engines and make them join the cluster. + start_docker 2 + swarm_join "$DISCOVERY" + retry 10 1 discovery_check_swarm_list "$DISCOVERY" + + # Then, start a manager and ensure it sees all the engines. 
+ swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info +} + +@test "zk discovery: watch for changes" { + # The goal of this test is to ensure swarm can see new nodes as they join + # the cluster. + start_store + + # Start a manager with no engines. + swarm_manage "$DISCOVERY" + retry 10 1 discovery_check_swarm_info + + # Add engines to the cluster and make sure it's picked up by swarm. + start_docker 2 + swarm_join "$DISCOVERY" + retry 10 1 discovery_check_swarm_list "$DISCOVERY" + retry 10 1 discovery_check_swarm_info +} + +@test "zk discovery: failure" { + # The goal of this test is to simulate a store failure and ensure discovery + # is resilient to it. + + # At this point, the store is not yet started. + + # Start 2 engines and join the cluster. They should keep retrying + start_docker 2 + swarm_join "$DISCOVERY" + + # Start a manager. It should keep retrying + swarm_manage "$DISCOVERY" + + # Now start the store + start_store + + # After a while, `join` and `manage` should reach the store. + retry 10 1 discovery_check_swarm_list "$DISCOVERY" + retry 10 1 discovery_check_swarm_info +} diff --git a/test/integration/helpers.bash b/test/integration/helpers.bash index d183072570..231b900b5f 100644 --- a/test/integration/helpers.bash +++ b/test/integration/helpers.bash @@ -85,11 +85,6 @@ function wait_until_reachable() { retry 10 1 docker -H $1 info } -# Returns true if all nodes have joined the swarm. -function check_swarm_nodes() { - docker_swarm info | grep -q "Nodes: ${#HOSTS[@]}" -} - # Start the swarm manager in background. function swarm_manage() { local discovery @@ -99,10 +94,9 @@ function swarm_manage() { discovery="$@" fi - "$SWARM_BINARY" manage -H "$SWARM_HOST" --cluster-opt "swarm.discovery.heartbeat=1" "$discovery" & + "$SWARM_BINARY" -l debug manage -H "$SWARM_HOST" --cluster-opt "swarm.discovery.heartbeat=1s" "$discovery" & SWARM_PID=$! wait_until_reachable "$SWARM_HOST" - retry 10 1 check_swarm_nodes } # swarm join every engine created with `start_docker`. @@ -120,23 +114,12 @@ function swarm_join() { # Start the engines. local i - echo "current: $current | nodes: $nodes" > log for ((i=current; i < nodes; i++)); do local h="${HOSTS[$i]}" echo "Swarm join #${i}: $h $addr" - "$SWARM_BINARY" join --addr="$h" "$addr" & + "$SWARM_BINARY" -l debug join --heartbeat=1s --addr="$h" "$addr" & SWARM_JOIN_PID[$i]=$! done - retry 10 0.5 check_discovery_nodes "$addr" -} - -# Returns true if all nodes have joined the discovery. -function check_discovery_nodes() { - local joined=`swarm list "$1" | wc -l` - local total=${#HOSTS[@]} - - echo "${joined} out of ${total} hosts joined discovery" - [ "$joined" -eq "$total" ] } # Stops the manager. diff --git a/test/integration/test_runner.sh b/test/integration/test_runner.sh index 07396d172e..65a2c7b316 100755 --- a/test/integration/test_runner.sh +++ b/test/integration/test_runner.sh @@ -12,7 +12,7 @@ function execute() { } # Tests to run. Defaults to all. -TESTS=${@:-. api} +TESTS=${@:-. discovery api} # Generate a temporary binary for the tests. export SWARM_BINARY=`mktemp`