diff --git a/cli/join.go b/cli/join.go index e302e15f73..df8c64d245 100644 --- a/cli/join.go +++ b/cli/join.go @@ -7,7 +7,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" - "github.com/docker/swarm/discovery" + "github.com/docker/docker/pkg/discovery" ) func checkAddrFormat(addr string) bool { diff --git a/cli/list.go b/cli/list.go index 24e9db5467..8e9bc23a12 100644 --- a/cli/list.go +++ b/cli/list.go @@ -6,7 +6,7 @@ import ( "time" "github.com/codegangsta/cli" - "github.com/docker/swarm/discovery" + "github.com/docker/docker/pkg/discovery" ) func list(c *cli.Context) { diff --git a/cli/manage.go b/cli/manage.go index 550baa07db..37a2566f56 100644 --- a/cli/manage.go +++ b/cli/manage.go @@ -11,12 +11,12 @@ import ( log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" + "github.com/docker/docker/pkg/discovery" + kvdiscovery "github.com/docker/docker/pkg/discovery/kv" "github.com/docker/swarm/api" "github.com/docker/swarm/cluster" "github.com/docker/swarm/cluster/mesos" "github.com/docker/swarm/cluster/swarm" - "github.com/docker/swarm/discovery" - kvdiscovery "github.com/docker/swarm/discovery/kv" "github.com/docker/swarm/leadership" "github.com/docker/swarm/scheduler" "github.com/docker/swarm/scheduler/filter" @@ -98,7 +98,7 @@ func loadTLSConfig(ca, cert, key string, verify bool) (*tls.Config, error) { } // Initialize the discovery service. -func createDiscovery(uri string, c *cli.Context, discoveryOpt []string) discovery.Discovery { +func createDiscovery(uri string, c *cli.Context, discoveryOpt []string) discovery.Backend { hb, err := time.ParseDuration(c.String("heartbeat")) if err != nil { log.Fatalf("invalid --heartbeat: %v", err) @@ -129,7 +129,7 @@ func getDiscoveryOpt(c *cli.Context) map[string]string { return options } -func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Server, discovery discovery.Discovery, addr string, leaderTTL time.Duration, tlsConfig *tls.Config) { +func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Server, discovery discovery.Backend, addr string, leaderTTL time.Duration, tlsConfig *tls.Config) { kvDiscovery, ok := discovery.(*kvdiscovery.Discovery) if !ok { log.Fatal("Leader election is only supported with consul, etcd and zookeeper discovery.") diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 969d28f497..96985b7807 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -12,10 +12,10 @@ import ( "time" log "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/discovery" "github.com/docker/docker/pkg/stringid" "github.com/docker/go-units" "github.com/docker/swarm/cluster" - "github.com/docker/swarm/discovery" "github.com/docker/swarm/scheduler" "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" @@ -54,7 +54,7 @@ type Cluster struct { engines map[string]*cluster.Engine pendingEngines map[string]*cluster.Engine scheduler *scheduler.Scheduler - discovery discovery.Discovery + discovery discovery.Backend pendingContainers map[string]*pendingContainer overcommitRatio float64 @@ -63,7 +63,7 @@ type Cluster struct { } // NewCluster is exported -func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery discovery.Discovery, options cluster.DriverOpts, engineOptions *cluster.EngineOpts) (cluster.Cluster, error) { +func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery discovery.Backend, options cluster.DriverOpts, engineOptions *cluster.EngineOpts) (cluster.Cluster, error) { log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster") cluster := &Cluster{ diff --git a/discovery/discovery.go b/discovery/discovery.go deleted file mode 100644 index 1888d1a0f4..0000000000 --- a/discovery/discovery.go +++ /dev/null @@ -1,166 +0,0 @@ -package discovery - -import ( - "errors" - "fmt" - "net" - "strings" - "time" - - log "github.com/Sirupsen/logrus" -) - -// An Entry represents a swarm host. -type Entry struct { - Host string - Port string -} - -// NewEntry creates a new entry. -func NewEntry(url string) (*Entry, error) { - host, port, err := net.SplitHostPort(url) - if err != nil { - return nil, err - } - return &Entry{host, port}, nil -} - -// String returns the string form of an entry. -func (e *Entry) String() string { - return fmt.Sprintf("%s:%s", e.Host, e.Port) -} - -// Equals returns true if cmp contains the same data. -func (e *Entry) Equals(cmp *Entry) bool { - return e.Host == cmp.Host && e.Port == cmp.Port -} - -// Entries is a list of *Entry with some helpers. -type Entries []*Entry - -// Equals returns true if cmp contains the same data. -func (e Entries) Equals(cmp Entries) bool { - // Check if the file has really changed. - if len(e) != len(cmp) { - return false - } - for i := range e { - if !e[i].Equals(cmp[i]) { - return false - } - } - return true -} - -// Contains returns true if the Entries contain a given Entry. -func (e Entries) Contains(entry *Entry) bool { - for _, curr := range e { - if curr.Equals(entry) { - return true - } - } - return false -} - -// Diff compares two entries and returns the added and removed entries. -func (e Entries) Diff(cmp Entries) (Entries, Entries) { - added := Entries{} - for _, entry := range cmp { - if !e.Contains(entry) { - added = append(added, entry) - } - } - - removed := Entries{} - for _, entry := range e { - if !cmp.Contains(entry) { - removed = append(removed, entry) - } - } - - return added, removed -} - -// The Discovery interface is implemented by Discovery backends which -// manage swarm host entries. -type Discovery interface { - // Initialize the discovery with URIs, a heartbeat, a ttl and optional settings. - Initialize(string, time.Duration, time.Duration, map[string]string) error - - // Watch the discovery for entry changes. - // Returns a channel that will receive changes or an error. - // Providing a non-nil stopCh can be used to stop watching. - Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error) - - // Register to the discovery - Register(string) error -} - -var ( - discoveries map[string]Discovery - // ErrNotSupported is returned when a discovery service is not supported. - ErrNotSupported = errors.New("discovery service not supported") - // ErrNotImplemented is returned when discovery feature is not implemented - // by discovery backend. - ErrNotImplemented = errors.New("not implemented in this discovery service") -) - -func init() { - discoveries = make(map[string]Discovery) -} - -// Register makes a discovery backend available by the provided scheme. -// If Register is called twice with the same scheme an error is returned. -func Register(scheme string, d Discovery) error { - if _, exists := discoveries[scheme]; exists { - return fmt.Errorf("scheme already registered %s", scheme) - } - log.WithField("name", scheme).Debug("Registering discovery service") - discoveries[scheme] = d - - return nil -} - -func parse(rawurl string) (string, string) { - parts := strings.SplitN(rawurl, "://", 2) - - // nodes:port,node2:port => nodes://node1:port,node2:port - if len(parts) == 1 { - return "nodes", parts[0] - } - return parts[0], parts[1] -} - -// New returns a new Discovery given a URL, heartbeat and ttl settings. -// Returns an error if the URL scheme is not supported. -func New(rawurl string, heartbeat time.Duration, ttl time.Duration, discoveryOpt map[string]string) (Discovery, error) { - scheme, uri := parse(rawurl) - - if discovery, exists := discoveries[scheme]; exists { - log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service") - err := discovery.Initialize(uri, heartbeat, ttl, discoveryOpt) - return discovery, err - } - - return nil, ErrNotSupported -} - -// CreateEntries returns an array of entries based on the given addresses. -func CreateEntries(addrs []string) (Entries, error) { - entries := Entries{} - if addrs == nil { - return entries, nil - } - - for _, addr := range addrs { - if len(addr) == 0 { - continue - } - entry, err := NewEntry(addr) - if err != nil { - return nil, err - } - entries = append(entries, entry) - } - return entries, nil -} diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go deleted file mode 100644 index b7128ff258..0000000000 --- a/discovery/discovery_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package discovery - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestNewEntry(t *testing.T) { - entry, err := NewEntry("127.0.0.1:2375") - assert.NoError(t, err) - assert.True(t, entry.Equals(&Entry{Host: "127.0.0.1", Port: "2375"})) - assert.Equal(t, entry.String(), "127.0.0.1:2375") - - _, err = NewEntry("127.0.0.1") - assert.Error(t, err) -} - -func TestParse(t *testing.T) { - scheme, uri := parse("127.0.0.1:2375") - assert.Equal(t, scheme, "nodes") - assert.Equal(t, uri, "127.0.0.1:2375") - - scheme, uri = parse("localhost:2375") - assert.Equal(t, scheme, "nodes") - assert.Equal(t, uri, "localhost:2375") - - scheme, uri = parse("scheme://127.0.0.1:2375") - assert.Equal(t, scheme, "scheme") - assert.Equal(t, uri, "127.0.0.1:2375") - - scheme, uri = parse("scheme://localhost:2375") - assert.Equal(t, scheme, "scheme") - assert.Equal(t, uri, "localhost:2375") - - scheme, uri = parse("") - assert.Equal(t, scheme, "nodes") - assert.Equal(t, uri, "") -} - -func TestCreateEntries(t *testing.T) { - entries, err := CreateEntries(nil) - assert.Equal(t, entries, Entries{}) - assert.NoError(t, err) - - entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""}) - assert.NoError(t, err) - expected := Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.2", Port: "2375"}, - } - assert.True(t, entries.Equals(expected)) - - _, err = CreateEntries([]string{"127.0.0.1", "127.0.0.2"}) - assert.Error(t, err) -} - -func TestContainsEntry(t *testing.T) { - entries, err := CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""}) - assert.NoError(t, err) - assert.True(t, entries.Contains(&Entry{Host: "127.0.0.1", Port: "2375"})) - assert.False(t, entries.Contains(&Entry{Host: "127.0.0.3", Port: "2375"})) -} - -func TestEntriesEquality(t *testing.T) { - entries := Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.2", Port: "2375"}, - } - - // Same - assert.True(t, entries.Equals(Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.2", Port: "2375"}, - })) - - // Different size - assert.False(t, entries.Equals(Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.2", Port: "2375"}, - &Entry{Host: "127.0.0.3", Port: "2375"}, - })) - - // Different content - assert.False(t, entries.Equals(Entries{ - &Entry{Host: "127.0.0.1", Port: "2375"}, - &Entry{Host: "127.0.0.42", Port: "2375"}, - })) -} - -func TestEntriesDiff(t *testing.T) { - entry1 := &Entry{Host: "1.1.1.1", Port: "1111"} - entry2 := &Entry{Host: "2.2.2.2", Port: "2222"} - entry3 := &Entry{Host: "3.3.3.3", Port: "3333"} - entries := Entries{entry1, entry2} - - // No diff - added, removed := entries.Diff(Entries{entry2, entry1}) - assert.Empty(t, added) - assert.Empty(t, removed) - - // Add - added, removed = entries.Diff(Entries{entry2, entry3, entry1}) - assert.Len(t, added, 1) - assert.True(t, added.Contains(entry3)) - assert.Empty(t, removed) - - // Remove - added, removed = entries.Diff(Entries{entry2}) - assert.Empty(t, added) - assert.Len(t, removed, 1) - assert.True(t, removed.Contains(entry1)) - - // Add and remove - added, removed = entries.Diff(Entries{entry1, entry3}) - assert.Len(t, added, 1) - assert.True(t, added.Contains(entry3)) - assert.Len(t, removed, 1) - assert.True(t, removed.Contains(entry2)) -} diff --git a/discovery/file/file.go b/discovery/file/file.go deleted file mode 100644 index 1ec72ad398..0000000000 --- a/discovery/file/file.go +++ /dev/null @@ -1,109 +0,0 @@ -package file - -import ( - "fmt" - "io/ioutil" - "strings" - "time" - - "github.com/docker/swarm/discovery" -) - -// Discovery is exported -type Discovery struct { - heartbeat time.Duration - path string -} - -func init() { - Init() -} - -// Init is exported -func Init() { - discovery.Register("file", &Discovery{}) -} - -// Initialize is exported -func (s *Discovery) Initialize(path string, heartbeat time.Duration, ttl time.Duration, _ map[string]string) error { - s.path = path - s.heartbeat = heartbeat - return nil -} - -func parseFileContent(content []byte) []string { - var result []string - for _, line := range strings.Split(strings.TrimSpace(string(content)), "\n") { - line = strings.TrimSpace(line) - // Ignoring line starts with # - if strings.HasPrefix(line, "#") { - continue - } - // Inlined # comment also ignored. - if strings.Contains(line, "#") { - line = line[0:strings.Index(line, "#")] - // Trim additional spaces caused by above stripping. - line = strings.TrimSpace(line) - } - for _, ip := range discovery.Generate(line) { - result = append(result, ip) - } - } - return result -} - -func (s *Discovery) fetch() (discovery.Entries, error) { - fileContent, err := ioutil.ReadFile(s.path) - if err != nil { - return nil, fmt.Errorf("failed to read '%s': %v", s.path, err) - } - return discovery.CreateEntries(parseFileContent(fileContent)) -} - -// Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { - ch := make(chan discovery.Entries) - errCh := make(chan error) - ticker := time.NewTicker(s.heartbeat) - - go func() { - defer close(errCh) - defer close(ch) - - // Send the initial entries if available. - currentEntries, err := s.fetch() - if err != nil { - errCh <- err - } else { - ch <- currentEntries - } - - // Periodically send updates. - for { - select { - case <-ticker.C: - newEntries, err := s.fetch() - if err != nil { - errCh <- err - continue - } - - // Check if the file has really changed. - if !newEntries.Equals(currentEntries) { - ch <- newEntries - } - currentEntries = newEntries - case <-stopCh: - ticker.Stop() - return - } - } - }() - - return ch, errCh -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - return discovery.ErrNotImplemented -} diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go deleted file mode 100644 index 05f248fb87..0000000000 --- a/discovery/file/file_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package file - -import ( - "io/ioutil" - "os" - "testing" - - "github.com/docker/swarm/discovery" - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - d := &Discovery{} - d.Initialize("/path/to/file", 1000, 0, nil) - assert.Equal(t, d.path, "/path/to/file") -} - -func TestNew(t *testing.T) { - d, err := discovery.New("file:///path/to/file", 0, 0, nil) - assert.NoError(t, err) - assert.Equal(t, d.(*Discovery).path, "/path/to/file") -} - -func TestContent(t *testing.T) { - data := ` -1.1.1.[1:2]:1111 -2.2.2.[2:4]:2222 -` - ips := parseFileContent([]byte(data)) - assert.Len(t, ips, 5) - assert.Equal(t, ips[0], "1.1.1.1:1111") - assert.Equal(t, ips[1], "1.1.1.2:1111") - assert.Equal(t, ips[2], "2.2.2.2:2222") - assert.Equal(t, ips[3], "2.2.2.3:2222") - assert.Equal(t, ips[4], "2.2.2.4:2222") -} - -func TestRegister(t *testing.T) { - discovery := &Discovery{path: "/path/to/file"} - assert.Error(t, discovery.Register("0.0.0.0")) -} - -func TestParsingContentsWithComments(t *testing.T) { - data := ` -### test ### -1.1.1.1:1111 # inline comment -# 2.2.2.2:2222 - ### empty line with comment - 3.3.3.3:3333 -### test ### -` - ips := parseFileContent([]byte(data)) - assert.Len(t, ips, 2) - assert.Equal(t, "1.1.1.1:1111", ips[0]) - assert.Equal(t, "3.3.3.3:3333", ips[1]) -} - -func TestWatch(t *testing.T) { - data := ` -1.1.1.1:1111 -2.2.2.2:2222 -` - expected := discovery.Entries{ - &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, - &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, - } - - // Create a temporary file and remove it. - tmp, err := ioutil.TempFile(os.TempDir(), "discovery-file-test") - assert.NoError(t, err) - assert.NoError(t, tmp.Close()) - assert.NoError(t, os.Remove(tmp.Name())) - - // Set up file discovery. - d := &Discovery{} - d.Initialize(tmp.Name(), 1000, 0, nil) - stopCh := make(chan struct{}) - ch, errCh := d.Watch(stopCh) - - // Make sure it fires errors since the file doesn't exist. - assert.Error(t, <-errCh) - // We have to drain the error channel otherwise Watch will get stuck. - go func() { - for range errCh { - } - }() - - // Write the file and make sure we get the expected value back. - assert.NoError(t, ioutil.WriteFile(tmp.Name(), []byte(data), 0600)) - assert.Equal(t, expected, <-ch) - - // Add a new entry and look it up. - expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) - f, err := os.OpenFile(tmp.Name(), os.O_APPEND|os.O_WRONLY, 0600) - assert.NoError(t, err) - assert.NotNil(t, f) - _, err = f.WriteString("\n3.3.3.3:3333\n") - assert.NoError(t, err) - f.Close() - assert.Equal(t, expected, <-ch) - - // Stop and make sure it closes all channels. - close(stopCh) - assert.Nil(t, <-ch) - assert.Nil(t, <-errCh) -} diff --git a/discovery/generator.go b/discovery/generator.go deleted file mode 100644 index d22298298f..0000000000 --- a/discovery/generator.go +++ /dev/null @@ -1,35 +0,0 @@ -package discovery - -import ( - "fmt" - "regexp" - "strconv" -) - -// Generate takes care of IP generation -func Generate(pattern string) []string { - re, _ := regexp.Compile(`\[(.+):(.+)\]`) - submatch := re.FindStringSubmatch(pattern) - if submatch == nil { - return []string{pattern} - } - - from, err := strconv.Atoi(submatch[1]) - if err != nil { - return []string{pattern} - } - to, err := strconv.Atoi(submatch[2]) - if err != nil { - return []string{pattern} - } - - template := re.ReplaceAllString(pattern, "%d") - - var result []string - for val := from; val <= to; val++ { - entry := fmt.Sprintf(template, val) - result = append(result, entry) - } - - return result -} diff --git a/discovery/generator_test.go b/discovery/generator_test.go deleted file mode 100644 index 747334452f..0000000000 --- a/discovery/generator_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package discovery - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestGeneratorNotGenerate(t *testing.T) { - ips := Generate("127.0.0.1") - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], "127.0.0.1") -} - -func TestGeneratorWithPortNotGenerate(t *testing.T) { - ips := Generate("127.0.0.1:8080") - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], "127.0.0.1:8080") -} - -func TestGeneratorMatchFailedNotGenerate(t *testing.T) { - ips := Generate("127.0.0.[1]") - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], "127.0.0.[1]") -} - -func TestGeneratorWithPort(t *testing.T) { - ips := Generate("127.0.0.[1:11]:2375") - assert.Equal(t, len(ips), 11) - assert.Equal(t, ips[0], "127.0.0.1:2375") - assert.Equal(t, ips[1], "127.0.0.2:2375") - assert.Equal(t, ips[2], "127.0.0.3:2375") - assert.Equal(t, ips[3], "127.0.0.4:2375") - assert.Equal(t, ips[4], "127.0.0.5:2375") - assert.Equal(t, ips[5], "127.0.0.6:2375") - assert.Equal(t, ips[6], "127.0.0.7:2375") - assert.Equal(t, ips[7], "127.0.0.8:2375") - assert.Equal(t, ips[8], "127.0.0.9:2375") - assert.Equal(t, ips[9], "127.0.0.10:2375") - assert.Equal(t, ips[10], "127.0.0.11:2375") -} - -func TestGenerateWithMalformedInputAtRangeStart(t *testing.T) { - malformedInput := "127.0.0.[x:11]:2375" - ips := Generate(malformedInput) - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], malformedInput) -} - -func TestGenerateWithMalformedInputAtRangeEnd(t *testing.T) { - malformedInput := "127.0.0.[1:x]:2375" - ips := Generate(malformedInput) - assert.Equal(t, len(ips), 1) - assert.Equal(t, ips[0], malformedInput) -} diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go deleted file mode 100644 index fb578e9957..0000000000 --- a/discovery/kv/kv.go +++ /dev/null @@ -1,193 +0,0 @@ -package kv - -import ( - "fmt" - "path" - "strings" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/docker/pkg/tlsconfig" - "github.com/docker/libkv" - "github.com/docker/libkv/store" - "github.com/docker/libkv/store/consul" - "github.com/docker/libkv/store/etcd" - "github.com/docker/libkv/store/zookeeper" - "github.com/docker/swarm/discovery" -) - -const ( - defaultDiscoveryPath = "docker/swarm/nodes" -) - -// Discovery is exported -type Discovery struct { - backend store.Backend - store store.Store - heartbeat time.Duration - ttl time.Duration - prefix string - path string -} - -func init() { - Init() -} - -// Init is exported -func Init() { - // Register to libkv - zookeeper.Register() - consul.Register() - etcd.Register() - - // Register to internal Swarm discovery service - discovery.Register("zk", &Discovery{backend: store.ZK}) - discovery.Register("consul", &Discovery{backend: store.CONSUL}) - discovery.Register("etcd", &Discovery{backend: store.ETCD}) -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration, discoveryOpt map[string]string) error { - var ( - parts = strings.SplitN(uris, "/", 2) - addrs = strings.Split(parts[0], ",") - err error - ) - - // A custom prefix to the path can be optionally used. - if len(parts) == 2 { - s.prefix = parts[1] - } - - s.heartbeat = heartbeat - s.ttl = ttl - - // Use a custom path if specified in discovery options - dpath := defaultDiscoveryPath - if discoveryOpt["kv.path"] != "" { - dpath = discoveryOpt["kv.path"] - } - - s.path = path.Join(s.prefix, dpath) - - var config *store.Config - if discoveryOpt["kv.cacertfile"] != "" && discoveryOpt["kv.certfile"] != "" && discoveryOpt["kv.keyfile"] != "" { - log.Debug("Initializing discovery with TLS") - tlsConfig, err := tlsconfig.Client(tlsconfig.Options{ - CAFile: discoveryOpt["kv.cacertfile"], - CertFile: discoveryOpt["kv.certfile"], - KeyFile: discoveryOpt["kv.keyfile"], - }) - if err != nil { - return err - } - config = &store.Config{ - // Set ClientTLS to trigger https (bug in libkv/etcd) - ClientTLS: &store.ClientTLSConfig{ - CACertFile: discoveryOpt["kv.cacertfile"], - CertFile: discoveryOpt["kv.certfile"], - KeyFile: discoveryOpt["kv.keyfile"], - }, - // The actual TLS config that will be used - TLS: tlsConfig, - } - } else { - log.Debug("Initializing discovery without TLS") - } - - // Creates a new store, will ignore options given - // if not supported by the chosen store - s.store, err = libkv.NewStore(s.backend, addrs, config) - return err -} - -// Watch the store until either there's a store error or we receive a stop request. -// Returns false if we shouldn't attempt watching the store anymore (stop request received). -func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool { - for { - select { - case pairs := <-watchCh: - if pairs == nil { - return true - } - - log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs)) - - // Convert `KVPair` into `discovery.Entry`. - addrs := make([]string, len(pairs)) - for _, pair := range pairs { - addrs = append(addrs, string(pair.Value)) - } - - entries, err := discovery.CreateEntries(addrs) - if err != nil { - errCh <- err - } else { - discoveryCh <- entries - } - case <-stopCh: - // We were requested to stop watching. - return false - } - } -} - -// Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { - ch := make(chan discovery.Entries) - errCh := make(chan error) - - go func() { - defer close(ch) - defer close(errCh) - - // Forever: Create a store watch, watch until we get an error and then try again. - // Will only stop if we receive a stopCh request. - for { - // Create the path to watch if it does not exist yet - exists, err := s.store.Exists(s.path) - if err != nil { - errCh <- err - } - if !exists { - if err := s.store.Put(s.path, []byte(""), &store.WriteOptions{IsDir: true}); err != nil { - errCh <- err - } - } - - // Set up a watch. - watchCh, err := s.store.WatchTree(s.path, stopCh) - if err != nil { - errCh <- err - } else { - if !s.watchOnce(stopCh, watchCh, ch, errCh) { - return - } - } - - // If we get here it means the store watch channel was closed. This - // is unexpected so let's retry later. - errCh <- fmt.Errorf("Unexpected watch error") - time.Sleep(s.heartbeat) - } - }() - - return ch, errCh -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - opts := &store.WriteOptions{TTL: s.ttl} - return s.store.Put(path.Join(s.path, addr), []byte(addr), opts) -} - -// Store returns the underlying store used by KV discovery. -func (s *Discovery) Store() store.Store { - return s.store -} - -// Prefix returns the store prefix -func (s *Discovery) Prefix() string { - return s.prefix -} diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go deleted file mode 100644 index 045559c276..0000000000 --- a/discovery/kv/kv_test.go +++ /dev/null @@ -1,199 +0,0 @@ -package kv - -import ( - "errors" - "io/ioutil" - "os" - "path" - "testing" - "time" - - "github.com/docker/libkv" - "github.com/docker/libkv/store" - libkvmock "github.com/docker/libkv/store/mock" - "github.com/docker/swarm/discovery" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -func TestInitialize(t *testing.T) { - storeMock, err := libkvmock.New([]string{"127.0.0.1"}, nil) - assert.NotNil(t, storeMock) - assert.NoError(t, err) - - d := &Discovery{backend: store.CONSUL} - d.Initialize("127.0.0.1", 0, 0, nil) - d.store = storeMock - - s := d.store.(*libkvmock.Mock) - assert.Len(t, s.Endpoints, 1) - assert.Equal(t, s.Endpoints[0], "127.0.0.1") - assert.Equal(t, d.path, defaultDiscoveryPath) - - storeMock, err = libkvmock.New([]string{"127.0.0.1:1234"}, nil) - assert.NotNil(t, storeMock) - assert.NoError(t, err) - - d = &Discovery{backend: store.CONSUL} - d.Initialize("127.0.0.1:1234/path", 0, 0, nil) - d.store = storeMock - - s = d.store.(*libkvmock.Mock) - assert.Len(t, s.Endpoints, 1) - assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") - assert.Equal(t, d.path, "path/"+defaultDiscoveryPath) - - storeMock, err = libkvmock.New([]string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"}, nil) - assert.NotNil(t, storeMock) - assert.NoError(t, err) - - d = &Discovery{backend: store.CONSUL} - d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0, nil) - d.store = storeMock - - s = d.store.(*libkvmock.Mock) - if assert.Len(t, s.Endpoints, 3) { - assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") - assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234") - assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234") - } - assert.Equal(t, d.path, "path/"+defaultDiscoveryPath) -} - -func TestInitializeWithCerts(t *testing.T) { - cert := `-----BEGIN CERTIFICATE----- -MIIDCDCCAfKgAwIBAgIICifG7YeiQOEwCwYJKoZIhvcNAQELMBIxEDAOBgNVBAMT -B1Rlc3QgQ0EwHhcNMTUxMDAxMjMwMDAwWhcNMjAwOTI5MjMwMDAwWjASMRAwDgYD -VQQDEwdUZXN0IENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1wRC -O+flnLTK5ImjTurNRHwSejuqGbc4CAvpB0hS+z0QlSs4+zE9h80aC4hz+6caRpds -+J908Q+RvAittMHbpc7VjbZP72G6fiXk7yPPl6C10HhRSoSi3nY+B7F2E8cuz14q -V2e+ejhWhSrBb/keyXpcyjoW1BOAAJ2TIclRRkICSCZrpXUyXxAvzXfpFXo1RhSb -UywN11pfiCQzDUN7sPww9UzFHuAHZHoyfTr27XnJYVUerVYrCPq8vqfn//01qz55 -Xs0hvzGdlTFXhuabFtQnKFH5SNwo/fcznhB7rePOwHojxOpXTBepUCIJLbtNnWFT -V44t9gh5IqIWtoBReQIDAQABo2YwZDAOBgNVHQ8BAf8EBAMCAAYwEgYDVR0TAQH/ -BAgwBgEB/wIBAjAdBgNVHQ4EFgQUZKUI8IIjIww7X/6hvwggQK4bD24wHwYDVR0j -BBgwFoAUZKUI8IIjIww7X/6hvwggQK4bD24wCwYJKoZIhvcNAQELA4IBAQDES2cz -7sCQfDCxCIWH7X8kpi/JWExzUyQEJ0rBzN1m3/x8ySRxtXyGekimBqQwQdFqlwMI -xzAQKkh3ue8tNSzRbwqMSyH14N1KrSxYS9e9szJHfUasoTpQGPmDmGIoRJuq1h6M -ej5x1SCJ7GWCR6xEXKUIE9OftXm9TdFzWa7Ja3OHz/mXteii8VXDuZ5ACq6EE5bY -8sP4gcICfJ5fTrpTlk9FIqEWWQrCGa5wk95PGEj+GJpNogjXQ97wVoo/Y3p1brEn -t5zjN9PAq4H1fuCMdNNA+p1DHNwd+ELTxcMAnb2ajwHvV6lKPXutrTFc4umJToBX -FpTxDmJHEV4bzUzh ------END CERTIFICATE----- -` - key := `-----BEGIN RSA PRIVATE KEY----- -MIIEpQIBAAKCAQEA1wRCO+flnLTK5ImjTurNRHwSejuqGbc4CAvpB0hS+z0QlSs4 -+zE9h80aC4hz+6caRpds+J908Q+RvAittMHbpc7VjbZP72G6fiXk7yPPl6C10HhR -SoSi3nY+B7F2E8cuz14qV2e+ejhWhSrBb/keyXpcyjoW1BOAAJ2TIclRRkICSCZr -pXUyXxAvzXfpFXo1RhSbUywN11pfiCQzDUN7sPww9UzFHuAHZHoyfTr27XnJYVUe -rVYrCPq8vqfn//01qz55Xs0hvzGdlTFXhuabFtQnKFH5SNwo/fcznhB7rePOwHoj -xOpXTBepUCIJLbtNnWFTV44t9gh5IqIWtoBReQIDAQABAoIBAHSWipORGp/uKFXj -i/mut776x8ofsAxhnLBARQr93ID+i49W8H7EJGkOfaDjTICYC1dbpGrri61qk8sx -qX7p3v/5NzKwOIfEpirgwVIqSNYe/ncbxnhxkx6tXtUtFKmEx40JskvSpSYAhmmO -1XSx0E/PWaEN/nLgX/f1eWJIlxlQkk3QeqL+FGbCXI48DEtlJ9+MzMu4pAwZTpj5 -5qtXo5JJ0jRGfJVPAOznRsYqv864AhMdMIWguzk6EGnbaCWwPcfcn+h9a5LMdony -MDHfBS7bb5tkF3+AfnVY3IBMVx7YlsD9eAyajlgiKu4zLbwTRHjXgShy+4Oussz0 -ugNGnkECgYEA/hi+McrZC8C4gg6XqK8+9joD8tnyDZDz88BQB7CZqABUSwvjDqlP -L8hcwo/lzvjBNYGkqaFPUICGWKjeCtd8pPS2DCVXxDQX4aHF1vUur0uYNncJiV3N -XQz4Iemsa6wnKf6M67b5vMXICw7dw0HZCdIHD1hnhdtDz0uVpeevLZ8CgYEA2KCT -Y43lorjrbCgMqtlefkr3GJA9dey+hTzCiWEOOqn9RqGoEGUday0sKhiLofOgmN2B -LEukpKIey8s+Q/cb6lReajDVPDsMweX8i7hz3Wa4Ugp4Xa5BpHqu8qIAE2JUZ7bU -t88aQAYE58pUF+/Lq1QzAQdrjjzQBx6SrBxieecCgYEAvukoPZEC8mmiN1VvbTX+ -QFHmlZha3QaDxChB+QUe7bMRojEUL/fVnzkTOLuVFqSfxevaI/km9n0ac5KtAchV -xjp2bTnBb5EUQFqjopYktWA+xO07JRJtMfSEmjZPbbay1kKC7rdTfBm961EIHaRj -xZUf6M+rOE8964oGrdgdLlECgYEA046GQmx6fh7/82FtdZDRQp9tj3SWQUtSiQZc -qhO59Lq8mjUXz+MgBuJXxkiwXRpzlbaFB0Bca1fUoYw8o915SrDYf/Zu2OKGQ/qa -V81sgiVmDuEgycR7YOlbX6OsVUHrUlpwhY3hgfMe6UtkMvhBvHF/WhroBEIJm1pV -PXZ/CbMCgYEApNWVktFBjOaYfY6SNn4iSts1jgsQbbpglg3kT7PLKjCAhI6lNsbk -dyT7ut01PL6RaW4SeQWtrJIVQaM6vF3pprMKqlc5XihOGAmVqH7rQx9rtQB5TicL -BFrwkQE4HQtQBV60hYQUzzlSk44VFDz+jxIEtacRHaomDRh2FtOTz+I= ------END RSA PRIVATE KEY----- -` - certFile, err := ioutil.TempFile("", "cert") - assert.Nil(t, err) - defer os.Remove(certFile.Name()) - certFile.Write([]byte(cert)) - certFile.Close() - keyFile, err := ioutil.TempFile("", "key") - assert.Nil(t, err) - defer os.Remove(keyFile.Name()) - keyFile.Write([]byte(key)) - keyFile.Close() - - libkv.AddStore("mock", libkvmock.New) - d := &Discovery{backend: "mock"} - err = d.Initialize("127.0.0.3:1234", 0, 0, map[string]string{ - "kv.cacertfile": certFile.Name(), - "kv.certfile": certFile.Name(), - "kv.keyfile": keyFile.Name(), - }) - assert.Nil(t, err) - s := d.store.(*libkvmock.Mock) - assert.Equal(t, s.Options.ClientTLS.CACertFile, certFile.Name()) - assert.Equal(t, s.Options.ClientTLS.CertFile, certFile.Name()) - assert.Equal(t, s.Options.ClientTLS.KeyFile, keyFile.Name()) -} - -func TestWatch(t *testing.T) { - storeMock, err := libkvmock.New([]string{"127.0.0.1:1234"}, nil) - assert.NotNil(t, storeMock) - assert.NoError(t, err) - - d := &Discovery{backend: store.CONSUL} - d.Initialize("127.0.0.1:1234/path", 0, 0, nil) - d.store = storeMock - - s := d.store.(*libkvmock.Mock) - mockCh := make(chan []*store.KVPair) - - // The first watch will fail on those three calls - s.On("Exists", "path/"+defaultDiscoveryPath).Return(false, errors.New("test error")) - s.On("Put", "path/"+defaultDiscoveryPath, mock.Anything, mock.Anything).Return(errors.New("test error")) - s.On("WatchTree", "path/"+defaultDiscoveryPath, mock.Anything).Return(mockCh, errors.New("test error")).Once() - - // The second one will succeed. - s.On("WatchTree", "path/"+defaultDiscoveryPath, mock.Anything).Return(mockCh, nil).Once() - expected := discovery.Entries{ - &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, - &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, - } - kvs := []*store.KVPair{ - {Key: path.Join("path", defaultDiscoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")}, - {Key: path.Join("path", defaultDiscoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")}, - } - - stopCh := make(chan struct{}) - ch, errCh := d.Watch(stopCh) - - // It should fire an error since the first WatchTree call failed. - assert.EqualError(t, <-errCh, "test error") - // We have to drain the error channel otherwise Watch will get stuck. - go func() { - for range errCh { - } - }() - - // Push the entries into the store channel and make sure discovery emits. - mockCh <- kvs - assert.Equal(t, <-ch, expected) - - // Add a new entry. - expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) - kvs = append(kvs, &store.KVPair{Key: path.Join("path", defaultDiscoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")}) - mockCh <- kvs - assert.Equal(t, <-ch, expected) - - // Make sure that if an error occurs it retries. - // This third call to WatchTree will be checked later by AssertExpectations. - s.On("WatchTree", "path/"+defaultDiscoveryPath, mock.Anything).Return(mockCh, nil) - close(mockCh) - // Give it enough time to call WatchTree. - time.Sleep(3) - - // Stop and make sure it closes all channels. - close(stopCh) - assert.Nil(t, <-ch) - assert.Nil(t, <-errCh) - - s.AssertExpectations(t) -} diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go deleted file mode 100644 index 181a15879f..0000000000 --- a/discovery/nodes/nodes.go +++ /dev/null @@ -1,54 +0,0 @@ -package nodes - -import ( - "fmt" - "strings" - "time" - - "github.com/docker/swarm/discovery" -) - -// Discovery is exported -type Discovery struct { - entries discovery.Entries -} - -func init() { - Init() -} - -// Init is exported -func Init() { - discovery.Register("nodes", &Discovery{}) -} - -// Initialize is exported -func (s *Discovery) Initialize(uris string, _ time.Duration, _ time.Duration, _ map[string]string) error { - for _, input := range strings.Split(uris, ",") { - for _, ip := range discovery.Generate(input) { - entry, err := discovery.NewEntry(ip) - if err != nil { - return fmt.Errorf("%s, please check you are using the correct discovery (missing token:// ?)", err.Error()) - } - s.entries = append(s.entries, entry) - } - } - - return nil -} - -// Watch is exported -func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { - ch := make(chan discovery.Entries) - go func() { - defer close(ch) - ch <- s.entries - <-stopCh - }() - return ch, nil -} - -// Register is exported -func (s *Discovery) Register(addr string) error { - return discovery.ErrNotImplemented -} diff --git a/discovery/nodes/nodes_test.go b/discovery/nodes/nodes_test.go deleted file mode 100644 index dd1ed4fe26..0000000000 --- a/discovery/nodes/nodes_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package nodes - -import ( - "testing" - - "github.com/docker/swarm/discovery" - "github.com/stretchr/testify/assert" -) - -func TestInitialize(t *testing.T) { - d := &Discovery{} - d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0, nil) - assert.Equal(t, len(d.entries), 2) - assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") - assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222") -} - -func TestInitializeWithPattern(t *testing.T) { - d := &Discovery{} - d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, 0, nil) - assert.Equal(t, len(d.entries), 5) - assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") - assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111") - assert.Equal(t, d.entries[2].String(), "2.2.2.2:2222") - assert.Equal(t, d.entries[3].String(), "2.2.2.3:2222") - assert.Equal(t, d.entries[4].String(), "2.2.2.4:2222") -} - -func TestWatch(t *testing.T) { - d := &Discovery{} - d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0, nil) - expected := discovery.Entries{ - &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, - &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, - } - ch, _ := d.Watch(nil) - assert.True(t, expected.Equals(<-ch)) -} - -func TestRegister(t *testing.T) { - d := &Discovery{} - assert.Error(t, d.Register("0.0.0.0")) -} diff --git a/discovery/token/token.go b/discovery/token/token.go index 1f08e516a3..a416fe2035 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -9,7 +9,7 @@ import ( "strings" "time" - "github.com/docker/swarm/discovery" + "github.com/docker/docker/pkg/discovery" ) const discoveryURL = "https://discovery.hub.docker.com/v1" diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index d3c08d8f88..d3c29e32aa 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/docker/swarm/discovery" + "github.com/docker/docker/pkg/discovery" "github.com/stretchr/testify/assert" ) diff --git a/docs/discovery.md b/docs/discovery.md index f111281c75..7670fe8d30 100644 --- a/docs/discovery.md +++ b/docs/discovery.md @@ -209,8 +209,9 @@ swarm is connected to the public internet. To create your swarm: You can contribute a new discovery backend to Swarm. For information on how to do this, see our -discovery README in the Docker Swarm repository. +href="https://github.com/docker/docker/tree/master/pkg/discovery"> +github.com/docker/docker/pkg/discovery. + ## Docker Swarm documentation index diff --git a/main.go b/main.go index f90922793d..e59a0637cc 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,9 @@ package main import ( - _ "github.com/docker/swarm/discovery/file" - _ "github.com/docker/swarm/discovery/kv" - _ "github.com/docker/swarm/discovery/nodes" + _ "github.com/docker/docker/pkg/discovery/file" + _ "github.com/docker/docker/pkg/discovery/kv" + _ "github.com/docker/docker/pkg/discovery/nodes" _ "github.com/docker/swarm/discovery/token" "github.com/docker/swarm/cli"