From bb7f69fd9c84730b12e2f4c9c402d7c0edeb8842 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 10 Dec 2014 01:39:33 +0000 Subject: [PATCH 01/13] First step toward modular discoevery services Signed-off-by: Victor Vieux --- discovery/discovery.go | 49 ++++++++++++++++++ discovery/{ => token}/README.md | 0 discovery/{client.go => token/token.go} | 50 ++++++++++++------- .../{client_test.go => token/token_test.go} | 9 ++-- join.go | 13 +++-- main.go | 28 +++++++---- manage.go | 12 +++-- 7 files changed, 121 insertions(+), 40 deletions(-) create mode 100644 discovery/discovery.go rename discovery/{ => token}/README.md (100%) rename discovery/{client.go => token/token.go} (53%) rename discovery/{client_test.go => token/token_test.go} (60%) diff --git a/discovery/discovery.go b/discovery/discovery.go new file mode 100644 index 0000000000..3d1f578d88 --- /dev/null +++ b/discovery/discovery.go @@ -0,0 +1,49 @@ +package discovery + +import ( + "errors" + "fmt" + "net/url" + + log "github.com/Sirupsen/logrus" +) + +type InitFunc func(url string) (DiscoveryService, error) + +type DiscoveryService interface { + FetchNodes() ([]string, error) + RegisterNode(addr string) error +} + +var ( + discoveries map[string]InitFunc + ErrNotSupported = errors.New("discovery service not supported") +) + +func init() { + discoveries = make(map[string]InitFunc) +} + +func Register(scheme string, initFunc InitFunc) error { + if _, exists := discoveries[scheme]; exists { + return fmt.Errorf("scheme already registered %s", scheme) + } + log.Debugf("Registering %q discovery service", scheme) + discoveries[scheme] = initFunc + + return nil +} + +func New(rawurl string) (DiscoveryService, error) { + url, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + + if initFct, exists := discoveries[url.Scheme]; exists { + log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host) + return initFct(url.Host) + } + + return nil, ErrNotSupported +} diff --git a/discovery/README.md b/discovery/token/README.md similarity index 100% rename from discovery/README.md rename to discovery/token/README.md diff --git a/discovery/client.go b/discovery/token/token.go similarity index 53% rename from discovery/client.go rename to discovery/token/token.go index b852bfe6ca..d3483dce28 100644 --- a/discovery/client.go +++ b/discovery/token/token.go @@ -1,4 +1,4 @@ -package discovery +package token import ( "encoding/json" @@ -6,13 +6,37 @@ import ( "io/ioutil" "net/http" "strings" + + "github.com/docker/swarm/discovery" ) const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1" -// FetchSlaves returns the slaves for the discovery service at the specified endpoint -func FetchSlaves(token string) ([]string, error) { - resp, err := http.Get(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", token)) +type TokenDiscoveryService struct { + token string +} + +func init() { + discovery.Register("token", Init) +} + +func Init(token string) (discovery.DiscoveryService, error) { + return TokenDiscoveryService{token: token}, nil +} + +// CreateCluster returns a unique cluster token +func CreateCluster() (string, error) { + resp, err := http.Post(fmt.Sprintf("%s/%s", DISCOVERY_URL, "clusters"), "", nil) + if err != nil { + return "", err + } + token, err := ioutil.ReadAll(resp.Body) + return string(token), err +} + +// FetchNodes returns the node for the discovery service at the specified endpoint +func (s TokenDiscoveryService) FetchNodes() ([]string, error) { + resp, err := http.Get(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", s.token)) if err != nil { return nil, err } @@ -31,21 +55,11 @@ func FetchSlaves(token string) ([]string, error) { return addrs, nil } -// RegisterSlave adds a new slave identified by the slaveID into the discovery service -// the default TTL is 30 secs -func RegisterSlave(addr, token string) error { +// RegisterNode adds a new node identified by the into the discovery service +func (s TokenDiscoveryService) RegisterNode(addr string) error { buf := strings.NewReader(addr) - _, err := http.Post(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", token), "application/json", buf) + _, err := http.Post(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, + "clusters", s.token), "application/json", buf) return err } - -// CreateCluster returns a unique cluster token -func CreateCluster() (string, error) { - resp, err := http.Post(fmt.Sprintf("%s/%s", DISCOVERY_URL, "clusters"), "", nil) - if err != nil { - return "", err - } - token, err := ioutil.ReadAll(resp.Body) - return string(token), err -} diff --git a/discovery/client_test.go b/discovery/token/token_test.go similarity index 60% rename from discovery/client_test.go rename to discovery/token/token_test.go index dd12a3f7b7..4e940da841 100644 --- a/discovery/client_test.go +++ b/discovery/token/token_test.go @@ -1,14 +1,15 @@ -package discovery +package token import "testing" func TestRegister(t *testing.T) { + discovery := TokenDiscoveryService{token: "TEST_TOKEN"} expected := "127.0.0.1:2675" - if err := RegisterSlave(expected, "TEST_TOKEN"); err != nil { + if err := discovery.RegisterNode(expected); err != nil { t.Fatal(err) } - addrs, err := FetchSlaves("TEST_TOKEN") + addrs, err := discovery.FetchNodes() if err != nil { t.Fatal(err) } @@ -21,7 +22,7 @@ func TestRegister(t *testing.T) { t.Fatalf("expected addr %q but received %q", expected, addrs[0]) } - if err = RegisterSlave(expected, "TEST_TOKEN"); err != nil { + if err = discovery.RegisterNode(expected); err != nil { t.Fatal(err) } } diff --git a/join.go b/join.go index f4d0cd1294..549cf0a96b 100644 --- a/join.go +++ b/join.go @@ -10,18 +10,23 @@ import ( func join(c *cli.Context) { - if c.String("token") == "" { - log.Fatal("--token required to join a cluster") + if c.String("discovery") == "" { + log.Fatal("--discovery required to list a cluster") } - if err := discovery.RegisterSlave(c.String("addr"), c.String("token")); err != nil { + d, err := discovery.New(c.String("discovery")) + if err != nil { + log.Fatal(err) + } + + if err := d.RegisterNode(c.String("addr")); err != nil { log.Fatal(err) } hb := time.Duration(c.Int("heartbeat")) for { time.Sleep(hb * time.Second) - if err := discovery.RegisterSlave(c.String("addr"), c.String("token")); err != nil { + if err := d.RegisterNode(c.String("addr")); err != nil { log.Error(err) } } diff --git a/main.go b/main.go index 5673a10194..a173b1c3f3 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" "github.com/docker/swarm/discovery" + "github.com/docker/swarm/discovery/token" ) func main() { @@ -33,11 +34,11 @@ func main() { } // flags - flToken := cli.StringFlag{ - Name: "token", + flDiscovery := cli.StringFlag{ + Name: "discovery", Value: "", - Usage: "cluster token", - EnvVar: "SWARM_TOKEN", + Usage: "token://, file://path/to/file", + EnvVar: "SWARM_DISCOVERY", } flAddr := cli.StringFlag{ Name: "addr", @@ -81,7 +82,7 @@ func main() { ShortName: "c", Usage: "create a cluster", Action: func(c *cli.Context) { - token, err := discovery.CreateCluster() + token, err := token.CreateCluster() if err != nil { log.Fatal(err) } @@ -92,13 +93,18 @@ func main() { Name: "list", ShortName: "l", Usage: "list nodes in a cluster", - Flags: []cli.Flag{flToken}, + Flags: []cli.Flag{flDiscovery}, Action: func(c *cli.Context) { - if c.String("token") == "" { - log.Fatal("--token required to list a cluster") + if c.String("discovery") == "" { + log.Fatal("--discovery required to list a cluster") } - nodes, err := discovery.FetchSlaves(c.String("token")) + d, err := discovery.New(c.String("discovery")) + if err != nil { + log.Fatal(err) + } + + nodes, err := d.FetchNodes() if err != nil { log.Fatal(err) } @@ -112,7 +118,7 @@ func main() { ShortName: "m", Usage: "manage a docker cluster", Flags: []cli.Flag{ - flToken, flAddr, flHeartBeat, + flDiscovery, flAddr, flHeartBeat, flTls, flTlsCaCert, flTlsCert, flTlsKey, flTlsVerify, flEnableCors}, Action: manage, @@ -121,7 +127,7 @@ func main() { Name: "join", ShortName: "j", Usage: "join a docker cluster", - Flags: []cli.Flag{flToken, flAddr, flHeartBeat}, + Flags: []cli.Flag{flDiscovery, flAddr, flHeartBeat}, Action: join, }, } diff --git a/manage.go b/manage.go index 8375152250..3acace1241 100644 --- a/manage.go +++ b/manage.go @@ -98,8 +98,14 @@ func manage(c *cli.Context) { cluster.Events(&logHandler{}) go func() { - if c.String("token") != "" { - nodes, err := discovery.FetchSlaves(c.String("token")) + fmt.Println(c.String("discovery")) + if c.String("discovery") != "" { + d, err := discovery.New(c.String("discovery")) + if err != nil { + log.Fatal(err) + } + + nodes, err := d.FetchNodes() if err != nil { log.Fatal(err) @@ -110,7 +116,7 @@ func manage(c *cli.Context) { go func() { for { time.Sleep(hb * time.Second) - nodes, err = discovery.FetchSlaves(c.String("token")) + nodes, err = d.FetchNodes() if err == nil { refresh(cluster, nodes) } From ef9b509d07183122c5e1d2e3300e9ff56214b7b0 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 10 Dec 2014 02:01:59 +0000 Subject: [PATCH 02/13] add file Signed-off-by: Victor Vieux --- discovery/discovery.go | 2 +- discovery/file/file.go | 34 ++++++++++++++++++++++++++++++++++ discovery/token/token.go | 20 ++++++++++---------- main.go | 2 ++ manage.go | 1 - 5 files changed, 47 insertions(+), 12 deletions(-) create mode 100644 discovery/file/file.go diff --git a/discovery/discovery.go b/discovery/discovery.go index 3d1f578d88..444df0a439 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -28,7 +28,7 @@ func Register(scheme string, initFunc InitFunc) error { if _, exists := discoveries[scheme]; exists { return fmt.Errorf("scheme already registered %s", scheme) } - log.Debugf("Registering %q discovery service", scheme) + fmt.Printf("Registering %q discovery service", scheme) discoveries[scheme] = initFunc return nil diff --git a/discovery/file/file.go b/discovery/file/file.go new file mode 100644 index 0000000000..6fa87c91d7 --- /dev/null +++ b/discovery/file/file.go @@ -0,0 +1,34 @@ +package file + +import ( + "io/ioutil" + "strings" + + "github.com/docker/swarm/discovery" +) + +type FileDiscoveryService struct { + path string +} + +func init() { + discovery.Register("file", Init) +} + +func Init(file string) (discovery.DiscoveryService, error) { + return FileDiscoveryService{path: file}, nil +} + +func (s FileDiscoveryService) FetchNodes() ([]string, error) { + data, err := ioutil.ReadFile(s.path) + if err != nil { + return nil, err + } + + return strings.Split(string(data), "\n"), nil +} + +func (s FileDiscoveryService) RegisterNode(addr string) error { + + return nil +} diff --git a/discovery/token/token.go b/discovery/token/token.go index d3483dce28..727b485c22 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -24,16 +24,6 @@ func Init(token string) (discovery.DiscoveryService, error) { return TokenDiscoveryService{token: token}, nil } -// CreateCluster returns a unique cluster token -func CreateCluster() (string, error) { - resp, err := http.Post(fmt.Sprintf("%s/%s", DISCOVERY_URL, "clusters"), "", nil) - if err != nil { - return "", err - } - token, err := ioutil.ReadAll(resp.Body) - return string(token), err -} - // FetchNodes returns the node for the discovery service at the specified endpoint func (s TokenDiscoveryService) FetchNodes() ([]string, error) { resp, err := http.Get(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", s.token)) @@ -63,3 +53,13 @@ func (s TokenDiscoveryService) RegisterNode(addr string) error { "clusters", s.token), "application/json", buf) return err } + +// CreateCluster returns a unique cluster token +func CreateCluster() (string, error) { + resp, err := http.Post(fmt.Sprintf("%s/%s", DISCOVERY_URL, "clusters"), "", nil) + if err != nil { + return "", err + } + token, err := ioutil.ReadAll(resp.Body) + return string(token), err +} diff --git a/main.go b/main.go index a173b1c3f3..e62e39a647 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,9 @@ import ( log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" + "github.com/docker/swarm/discovery" + _ "github.com/docker/swarm/discovery/file" "github.com/docker/swarm/discovery/token" ) diff --git a/manage.go b/manage.go index 3acace1241..8cd32eec3e 100644 --- a/manage.go +++ b/manage.go @@ -98,7 +98,6 @@ func manage(c *cli.Context) { cluster.Events(&logHandler{}) go func() { - fmt.Println(c.String("discovery")) if c.String("discovery") != "" { d, err := discovery.New(c.String("discovery")) if err != nil { From 0aeb1771bd8e9d6c0be8880f58a1052db4acd7d3 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 10 Dec 2014 02:34:27 +0000 Subject: [PATCH 03/13] add watch interface --- discovery/discovery.go | 6 ++++-- discovery/file/file.go | 13 +++++++++---- discovery/token/token.go | 9 +++++++-- join.go | 6 +++--- main.go | 2 +- manage.go | 9 +++------ 6 files changed, 27 insertions(+), 18 deletions(-) diff --git a/discovery/discovery.go b/discovery/discovery.go index 444df0a439..34c8eb3089 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net/url" + "time" log "github.com/Sirupsen/logrus" ) @@ -11,8 +12,9 @@ import ( type InitFunc func(url string) (DiscoveryService, error) type DiscoveryService interface { - FetchNodes() ([]string, error) - RegisterNode(addr string) error + Fetch() ([]string, error) + Watch(int) <-chan time.Time + Register(string) error } var ( diff --git a/discovery/file/file.go b/discovery/file/file.go index 6fa87c91d7..af51141723 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -1,8 +1,10 @@ package file import ( + "errors" "io/ioutil" "strings" + "time" "github.com/docker/swarm/discovery" ) @@ -19,7 +21,7 @@ func Init(file string) (discovery.DiscoveryService, error) { return FileDiscoveryService{path: file}, nil } -func (s FileDiscoveryService) FetchNodes() ([]string, error) { +func (s FileDiscoveryService) Fetch() ([]string, error) { data, err := ioutil.ReadFile(s.path) if err != nil { return nil, err @@ -28,7 +30,10 @@ func (s FileDiscoveryService) FetchNodes() ([]string, error) { return strings.Split(string(data), "\n"), nil } -func (s FileDiscoveryService) RegisterNode(addr string) error { - - return nil +func (s FileDiscoveryService) Watch(heartbeat int) <-chan time.Time { + return time.Tick(time.Duration(heartbeat) * time.Second) +} + +func (s FileDiscoveryService) Register(addr string) error { + return errors.New("unimplemented") } diff --git a/discovery/token/token.go b/discovery/token/token.go index 727b485c22..866ff79a21 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "net/http" "strings" + "time" "github.com/docker/swarm/discovery" ) @@ -25,7 +26,7 @@ func Init(token string) (discovery.DiscoveryService, error) { } // FetchNodes returns the node for the discovery service at the specified endpoint -func (s TokenDiscoveryService) FetchNodes() ([]string, error) { +func (s TokenDiscoveryService) Fetch() ([]string, error) { resp, err := http.Get(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", s.token)) if err != nil { return nil, err @@ -45,8 +46,12 @@ func (s TokenDiscoveryService) FetchNodes() ([]string, error) { return addrs, nil } +func (s TokenDiscoveryService) Watch(heartbeat int) <-chan time.Time { + return time.Tick(time.Duration(heartbeat) * time.Second) +} + // RegisterNode adds a new node identified by the into the discovery service -func (s TokenDiscoveryService) RegisterNode(addr string) error { +func (s TokenDiscoveryService) Register(addr string) error { buf := strings.NewReader(addr) _, err := http.Post(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, diff --git a/join.go b/join.go index 549cf0a96b..432381b16c 100644 --- a/join.go +++ b/join.go @@ -11,7 +11,7 @@ import ( func join(c *cli.Context) { if c.String("discovery") == "" { - log.Fatal("--discovery required to list a cluster") + log.Fatal("--discovery required to join a cluster") } d, err := discovery.New(c.String("discovery")) @@ -19,14 +19,14 @@ func join(c *cli.Context) { log.Fatal(err) } - if err := d.RegisterNode(c.String("addr")); err != nil { + if err := d.Register(c.String("addr")); err != nil { log.Fatal(err) } hb := time.Duration(c.Int("heartbeat")) for { time.Sleep(hb * time.Second) - if err := d.RegisterNode(c.String("addr")); err != nil { + if err := d.Register(c.String("addr")); err != nil { log.Error(err) } } diff --git a/main.go b/main.go index e62e39a647..0a4a7057b6 100644 --- a/main.go +++ b/main.go @@ -106,7 +106,7 @@ func main() { log.Fatal(err) } - nodes, err := d.FetchNodes() + nodes, err := d.Fetch() if err != nil { log.Fatal(err) } diff --git a/manage.go b/manage.go index 8cd32eec3e..8ac40b05ca 100644 --- a/manage.go +++ b/manage.go @@ -6,7 +6,6 @@ import ( "fmt" "io/ioutil" "strings" - "time" log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" @@ -104,18 +103,16 @@ func manage(c *cli.Context) { log.Fatal(err) } - nodes, err := d.FetchNodes() + nodes, err := d.Fetch() if err != nil { log.Fatal(err) } refresh(cluster, nodes) - hb := time.Duration(c.Int("heartbeat")) go func() { - for { - time.Sleep(hb * time.Second) - nodes, err = d.FetchNodes() + for _ = range d.Watch(c.Int("heartbeat")) { + nodes, err = d.Fetch() if err == nil { refresh(cluster, nodes) } From 2e6c8cc00c540e221ee59a5b7ce4a747f7684ee4 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 10 Dec 2014 02:39:29 +0000 Subject: [PATCH 04/13] update README.md --- README.md | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index eecb26082c..525efc6b22 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ This can be achieved by starting Docker with the `-H` flag (e.g. `-H 0.0.0.0:237 Currently, nodes must be running the Docker **master** version. Master binaries are available here: https://master.dockerproject.com/ -### Example usage +### Example usage: using the hosted discovery service ```bash # create a cluster @@ -34,10 +34,10 @@ $ swarm create # on each of your nodes, start the swarm agent # doesn't have to be public (eg. 192.168.0.X), # as long as the other nodes can reach it, it is fine. -$ swarm join --token=6856663cdefdec325839a4b7e1de38e8 --addr= +$ swarm join --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr= # start the manager on any machine or your laptop -$ swarm manage --token=6856663cdefdec325839a4b7e1de38e8 --addr= +$ swarm manage --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr= # use the regular docker cli $ docker -H info @@ -47,7 +47,30 @@ $ docker -H logs ... ... # list nodes in your cluster -$ swarm list --token=6856663cdefdec325839a4b7e1de38e8 +$ swarm list --discovery token://6856663cdefdec325839a4b7e1de38e8 +http:// +``` + +### Example usage: using a static file describing the cluster + +```bash +# for each of your nodes, add a line to a file +# doesn't have to be public (eg. 192.168.0.X), +# as long as the other nodes can reach it, it is fine. +$ echo >> /tmp/my_cluster + +# start the manager on any machine or your laptop +$ swarm manage --discovery file:///tmp/my_cluster --addr= + +# use the regular docker cli +$ docker -H info +$ docker -H run ... +$ docker -H ps +$ docker -H logs ... +... + +# list nodes in your cluster +$ swarm list --discovery file:///tmp/my_cluster http:// ``` From f04af68d3f70b6d8d6d65dfd96cc9a956e7153f3 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 10 Dec 2014 21:28:45 +0000 Subject: [PATCH 05/13] update file --- discovery/discovery.go | 6 +++--- discovery/file/file.go | 9 ++++++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/discovery/discovery.go b/discovery/discovery.go index 34c8eb3089..190021e778 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -30,7 +30,7 @@ func Register(scheme string, initFunc InitFunc) error { if _, exists := discoveries[scheme]; exists { return fmt.Errorf("scheme already registered %s", scheme) } - fmt.Printf("Registering %q discovery service", scheme) + log.Debugf("Registering %q discovery service", scheme) discoveries[scheme] = initFunc return nil @@ -43,8 +43,8 @@ func New(rawurl string) (DiscoveryService, error) { } if initFct, exists := discoveries[url.Scheme]; exists { - log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host) - return initFct(url.Host) + log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host+url.Path) + return initFct(url.Host + url.Path) } return nil, ErrNotSupported diff --git a/discovery/file/file.go b/discovery/file/file.go index af51141723..f9cce05587 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -27,7 +27,14 @@ func (s FileDiscoveryService) Fetch() ([]string, error) { return nil, err } - return strings.Split(string(data), "\n"), nil + lines := []string{} + + for _, line := range strings.Split(string(data), "\n") { + if line != "" { + lines = append(lines, line) + } + } + return lines, nil } func (s FileDiscoveryService) Watch(heartbeat int) <-chan time.Time { From 3d39101ff7a4c46296606b6afc0949e19b31c2a1 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 10 Dec 2014 22:15:52 +0000 Subject: [PATCH 06/13] allow custom url on token Signed-off-by: Victor Vieux --- discovery/token/token.go | 23 ++++++++++++----- discovery/token/token_test.go | 48 ++++++++++++++++++++--------------- main.go | 2 +- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/discovery/token/token.go b/discovery/token/token.go index 866ff79a21..e0379002c5 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -14,6 +14,7 @@ import ( const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1" type TokenDiscoveryService struct { + url string token string } @@ -21,13 +22,23 @@ func init() { discovery.Register("token", Init) } -func Init(token string) (discovery.DiscoveryService, error) { - return TokenDiscoveryService{token: token}, nil +func Init(urltoken string) (discovery.DiscoveryService, error) { + if i := strings.LastIndex(urltoken, "/"); i != -1 { + return TokenDiscoveryService{url: "https://" + urltoken[:i], token: urltoken[i+1:]}, nil + } + + return TokenDiscoveryService{url: DISCOVERY_URL, token: urltoken}, nil +} +func New(url string) *TokenDiscoveryService { + if url != "" { + return &TokenDiscoveryService{url: url} + } + return &TokenDiscoveryService{url: DISCOVERY_URL} } // FetchNodes returns the node for the discovery service at the specified endpoint func (s TokenDiscoveryService) Fetch() ([]string, error) { - resp, err := http.Get(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", s.token)) + resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) if err != nil { return nil, err } @@ -54,14 +65,14 @@ func (s TokenDiscoveryService) Watch(heartbeat int) <-chan time.Time { func (s TokenDiscoveryService) Register(addr string) error { buf := strings.NewReader(addr) - _, err := http.Post(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, + _, err := http.Post(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token), "application/json", buf) return err } // CreateCluster returns a unique cluster token -func CreateCluster() (string, error) { - resp, err := http.Post(fmt.Sprintf("%s/%s", DISCOVERY_URL, "clusters"), "", nil) +func (s TokenDiscoveryService) CreateCluster() (string, error) { + resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil) if err != nil { return "", err } diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index 4e940da841..a15d118539 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -1,28 +1,34 @@ package token -import "testing" +import ( + "testing" -func TestRegister(t *testing.T) { - discovery := TokenDiscoveryService{token: "TEST_TOKEN"} - expected := "127.0.0.1:2675" - if err := discovery.RegisterNode(expected); err != nil { - t.Fatal(err) + "github.com/stretchr/testify/assert" +) + +func TestInit(t *testing.T) { + discovery, _ := Init("token") + if dtoken, ok := discovery.(TokenDiscoveryService); ok { + assert.Equal(t, dtoken.token, "token") + assert.Equal(t, dtoken.url, DISCOVERY_URL) } - addrs, err := discovery.FetchNodes() - if err != nil { - t.Fatal(err) - } - - if len(addrs) != 1 { - t.Fatalf("expected addr len == 1, got len = %d", len(addrs)) - } - - if addrs[0] != expected { - t.Fatalf("expected addr %q but received %q", expected, addrs[0]) - } - - if err = discovery.RegisterNode(expected); err != nil { - t.Fatal(err) + discovery, _ = Init("custom/path/token") + if dtoken, ok := discovery.(TokenDiscoveryService); ok { + assert.Equal(t, dtoken.token, "token") + assert.Equal(t, dtoken.url, "https://custom/path") } } + +func TestRegister(t *testing.T) { + discovery := TokenDiscoveryService{token: "TEST_TOKEN", url: DISCOVERY_URL} + expected := "127.0.0.1:2675" + assert.NoError(t, discovery.Register(expected)) + + addrs, err := discovery.Fetch() + assert.NoError(t, err) + assert.Equal(t, len(addrs), 1) + assert.Equal(t, addrs[0], expected) + + assert.NoError(t, discovery.Register(expected)) +} diff --git a/main.go b/main.go index 0a4a7057b6..ac384645fa 100644 --- a/main.go +++ b/main.go @@ -84,7 +84,7 @@ func main() { ShortName: "c", Usage: "create a cluster", Action: func(c *cli.Context) { - token, err := token.CreateCluster() + token, err := token.New("").CreateCluster() if err != nil { log.Fatal(err) } From c38f621425f78492254301ab8597ab7fe8b2f424 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 10 Dec 2014 23:59:03 +0000 Subject: [PATCH 07/13] add etcd Signed-off-by: Victor Vieux --- README.md | 23 ++++++++++++++ discovery/etcd/etcd.go | 69 ++++++++++++++++++++++++++++++++++++++++++ main.go | 1 + 3 files changed, 93 insertions(+) create mode 100644 discovery/etcd/etcd.go diff --git a/README.md b/README.md index 525efc6b22..ec232fbcea 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,29 @@ $ swarm list --discovery file:///tmp/my_cluster http:// ``` +### Example usage: using etcd + +```bash +# on each of your nodes, start the swarm agent +# doesn't have to be public (eg. 192.168.0.X), +# as long as the other nodes can reach it, it is fine. +$ swarm join --discovery etcd:///>path> --addr= + +# start the manager on any machine or your laptop +$ swarm manage --discovery etcd:///>path> --addr= + +# use the regular docker cli +$ docker -H info +$ docker -H run ... +$ docker -H ps +$ docker -H logs ... +... + +# list nodes in your cluster +$ swarm list --discovery etcd:///>path> +http:// +``` + ### TLS Swarm supports TLS authentication between the CLI and Swarm but also between Swarm and the Docker nodes. diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go new file mode 100644 index 0000000000..33afafc54c --- /dev/null +++ b/discovery/etcd/etcd.go @@ -0,0 +1,69 @@ +package etcd + +import ( + "path" + "strings" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/coreos/go-etcd/etcd" + "github.com/docker/swarm/discovery" +) + +const DEFAULT_TTL = 30 + +type EtcdDiscoveryService struct { + client *etcd.Client + path string +} + +func init() { + discovery.Register("etcd", Init) +} + +func Init(uris string) (discovery.DiscoveryService, error) { + var ( + parts = strings.SplitN(uris, "/", 2) + ips = strings.Split(parts[0], ",") + machines []string + path = "/" + parts[1] + "/" + ) + for _, ip := range ips { + machines = append(machines, "http://"+ip) + } + + client := etcd.NewClient(machines) + client.CreateDir(path, DEFAULT_TTL) // skip error check error because it might already exists + return EtcdDiscoveryService{client: client, path: path}, nil +} +func (s EtcdDiscoveryService) Fetch() ([]string, error) { + resp, err := s.client.Get(s.path, true, true) + if err != nil { + return nil, err + } + nodes := []string{} + + for _, n := range resp.Node.Nodes { + nodes = append(nodes, n.Value) + } + return nodes, nil +} + +func (s EtcdDiscoveryService) Watch(heartbeat int) <-chan time.Time { + watchChan := make(chan *etcd.Response) + timeChan := make(chan time.Time) + go s.client.Watch(s.path, 0, true, watchChan, nil) + go func() { + for { + <-watchChan + log.Debugf("[ETCD] Watch triggered") + timeChan <- time.Now() + } + }() + return timeChan +} + +func (s EtcdDiscoveryService) Register(addr string) error { + _, err := s.client.Set(path.Join(s.path, addr), addr, DEFAULT_TTL) + return err +} diff --git a/main.go b/main.go index ac384645fa..9a0fce159d 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "github.com/codegangsta/cli" "github.com/docker/swarm/discovery" + _ "github.com/docker/swarm/discovery/etcd" _ "github.com/docker/swarm/discovery/file" "github.com/docker/swarm/discovery/token" ) From 875e7c7e774fd5f2708cd9a1de9143144da9a960 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 11 Dec 2014 01:39:20 +0000 Subject: [PATCH 08/13] add discovery/README.md --- discovery/README.md | 24 ++++++++++++++++++++++++ discovery/etcd/etcd.go | 2 ++ 2 files changed, 26 insertions(+) create mode 100644 discovery/README.md diff --git a/discovery/README.md b/discovery/README.md new file mode 100644 index 0000000000..34c1a78ea0 --- /dev/null +++ b/discovery/README.md @@ -0,0 +1,24 @@ +Discovery +========= + +Contributing a new discovery backend is easy, +simply implements this interface: + +```go +type DiscoveryService interface { + Fetch() ([]string, error) + Watch(int) <-chan time.Time + Register(string) error +} +``` + +######Fetch +returns the list of all the nodes from the discovery + +######Watch +triggers when you need to update (`Fetch`) the list of nodes, +it can happen either via un timer (like `token`) or use +backend specific features (like `etcd`) + +######Register +add a new node to the discovery diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go index 33afafc54c..a8cbfb4a53 100644 --- a/discovery/etcd/etcd.go +++ b/discovery/etcd/etcd.go @@ -23,6 +23,8 @@ func init() { func Init(uris string) (discovery.DiscoveryService, error) { var ( + // split here because uris can contain multiples ips + // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` parts = strings.SplitN(uris, "/", 2) ips = strings.Split(parts[0], ",") machines []string From 2282fc89ef249b6b4cd085cc0c4673f5d0a6f8a0 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 11 Dec 2014 21:59:01 +0000 Subject: [PATCH 09/13] add node type Signed-off-by: Victor Vieux --- discovery/discovery.go | 18 +++++++++++++++++- discovery/etcd/etcd.go | 7 ++++--- discovery/file/file.go | 8 ++++---- discovery/token/token.go | 9 +++++++-- discovery/token/token_test.go | 2 +- manage.go | 18 +++++++++--------- 6 files changed, 42 insertions(+), 20 deletions(-) diff --git a/discovery/discovery.go b/discovery/discovery.go index 190021e778..0ae35705b5 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net/url" + "strings" "time" log "github.com/Sirupsen/logrus" @@ -11,8 +12,23 @@ import ( type InitFunc func(url string) (DiscoveryService, error) +type Node struct { + url string +} + +func NewNode(url string) *Node { + if !strings.Contains(url, "://") { + url = "http://" + url + } + return &Node{url: url} +} + +func (n Node) String() string { + return n.url +} + type DiscoveryService interface { - Fetch() ([]string, error) + Fetch() ([]*Node, error) Watch(int) <-chan time.Time Register(string) error } diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go index a8cbfb4a53..a32c95197a 100644 --- a/discovery/etcd/etcd.go +++ b/discovery/etcd/etcd.go @@ -38,15 +38,16 @@ func Init(uris string) (discovery.DiscoveryService, error) { client.CreateDir(path, DEFAULT_TTL) // skip error check error because it might already exists return EtcdDiscoveryService{client: client, path: path}, nil } -func (s EtcdDiscoveryService) Fetch() ([]string, error) { +func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { resp, err := s.client.Get(s.path, true, true) if err != nil { return nil, err } - nodes := []string{} + + var nodes []*discovery.Node for _, n := range resp.Node.Nodes { - nodes = append(nodes, n.Value) + nodes = append(nodes, discovery.NewNode(n.Value)) } return nodes, nil } diff --git a/discovery/file/file.go b/discovery/file/file.go index f9cce05587..9e037d963b 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -21,20 +21,20 @@ func Init(file string) (discovery.DiscoveryService, error) { return FileDiscoveryService{path: file}, nil } -func (s FileDiscoveryService) Fetch() ([]string, error) { +func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) { data, err := ioutil.ReadFile(s.path) if err != nil { return nil, err } - lines := []string{} + var nodes []*discovery.Node for _, line := range strings.Split(string(data), "\n") { if line != "" { - lines = append(lines, line) + nodes = append(nodes, discovery.NewNode(line)) } } - return lines, nil + return nodes, nil } func (s FileDiscoveryService) Watch(heartbeat int) <-chan time.Time { diff --git a/discovery/token/token.go b/discovery/token/token.go index e0379002c5..36c70aede5 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -37,7 +37,7 @@ func New(url string) *TokenDiscoveryService { } // FetchNodes returns the node for the discovery service at the specified endpoint -func (s TokenDiscoveryService) Fetch() ([]string, error) { +func (s TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) if err != nil { return nil, err @@ -54,7 +54,12 @@ func (s TokenDiscoveryService) Fetch() ([]string, error) { } } - return addrs, nil + var nodes []*discovery.Node + for _, addr := range addrs { + nodes = append(nodes, discovery.NewNode(addr)) + } + + return nodes, nil } func (s TokenDiscoveryService) Watch(heartbeat int) <-chan time.Time { diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index a15d118539..8b8a7ae945 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -28,7 +28,7 @@ func TestRegister(t *testing.T) { addrs, err := discovery.Fetch() assert.NoError(t, err) assert.Equal(t, len(addrs), 1) - assert.Equal(t, addrs[0], expected) + assert.Equal(t, addrs[0].String(), "http://"+expected) assert.NoError(t, discovery.Register(expected)) } diff --git a/manage.go b/manage.go index 8ac40b05ca..0ce8b847d8 100644 --- a/manage.go +++ b/manage.go @@ -5,7 +5,6 @@ import ( "crypto/x509" "fmt" "io/ioutil" - "strings" log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" @@ -72,14 +71,11 @@ func manage(c *cli.Context) { } } - refresh := func(c *cluster.Cluster, nodes []string) { + refresh := func(c *cluster.Cluster, nodes []*discovery.Node) { for _, addr := range nodes { - go func(addr string) { - if !strings.Contains(addr, "://") { - addr = "http://" + addr - } - if c.Node(addr) == nil { - n := cluster.NewNode(addr) + go func(node *discovery.Node) { + if c.Node(node.String()) == nil { + n := cluster.NewNode(node.String()) if err := n.Connect(tlsConfig); err != nil { log.Error(err) return @@ -119,7 +115,11 @@ func manage(c *cli.Context) { } }() } else { - refresh(cluster, c.Args()) + var nodes []*discovery.Node + for _, arg := range c.Args() { + nodes = append(nodes, discovery.NewNode(arg)) + } + refresh(cluster, nodes) } }() From f95f943b4a22956439b54e4ab5a62e54c98a33fa Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 11 Dec 2014 22:18:54 +0000 Subject: [PATCH 10/13] use 1.5*heartbeat as TTL in etcd Signed-off-by: Victor Vieux --- discovery/discovery.go | 8 ++++---- discovery/etcd/etcd.go | 14 +++++++------- discovery/file/file.go | 11 ++++++----- discovery/token/token.go | 13 +++++++------ discovery/token/token_test.go | 4 ++-- join.go | 4 ++-- main.go | 2 +- manage.go | 4 ++-- 8 files changed, 31 insertions(+), 29 deletions(-) diff --git a/discovery/discovery.go b/discovery/discovery.go index 0ae35705b5..cda76167b9 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -10,7 +10,7 @@ import ( log "github.com/Sirupsen/logrus" ) -type InitFunc func(url string) (DiscoveryService, error) +type InitFunc func(url string, heartbeat int) (DiscoveryService, error) type Node struct { url string @@ -29,7 +29,7 @@ func (n Node) String() string { type DiscoveryService interface { Fetch() ([]*Node, error) - Watch(int) <-chan time.Time + Watch() <-chan time.Time Register(string) error } @@ -52,7 +52,7 @@ func Register(scheme string, initFunc InitFunc) error { return nil } -func New(rawurl string) (DiscoveryService, error) { +func New(rawurl string, heartbeat int) (DiscoveryService, error) { url, err := url.Parse(rawurl) if err != nil { return nil, err @@ -60,7 +60,7 @@ func New(rawurl string) (DiscoveryService, error) { if initFct, exists := discoveries[url.Scheme]; exists { log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host+url.Path) - return initFct(url.Host + url.Path) + return initFct(url.Host+url.Path, heartbeat) } return nil, ErrNotSupported diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go index a32c95197a..3257a70c5d 100644 --- a/discovery/etcd/etcd.go +++ b/discovery/etcd/etcd.go @@ -10,9 +10,8 @@ import ( "github.com/docker/swarm/discovery" ) -const DEFAULT_TTL = 30 - type EtcdDiscoveryService struct { + ttl uint64 client *etcd.Client path string } @@ -21,7 +20,7 @@ func init() { discovery.Register("etcd", Init) } -func Init(uris string) (discovery.DiscoveryService, error) { +func Init(uris string, heartbeat int) (discovery.DiscoveryService, error) { var ( // split here because uris can contain multiples ips // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` @@ -35,8 +34,9 @@ func Init(uris string) (discovery.DiscoveryService, error) { } client := etcd.NewClient(machines) - client.CreateDir(path, DEFAULT_TTL) // skip error check error because it might already exists - return EtcdDiscoveryService{client: client, path: path}, nil + ttl := uint64(heartbeat * 3 / 2) + client.CreateDir(path, ttl) // skip error check error because it might already exists + return EtcdDiscoveryService{client: client, path: path, ttl: ttl}, nil } func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { resp, err := s.client.Get(s.path, true, true) @@ -52,7 +52,7 @@ func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s EtcdDiscoveryService) Watch(heartbeat int) <-chan time.Time { +func (s EtcdDiscoveryService) Watch() <-chan time.Time { watchChan := make(chan *etcd.Response) timeChan := make(chan time.Time) go s.client.Watch(s.path, 0, true, watchChan, nil) @@ -67,6 +67,6 @@ func (s EtcdDiscoveryService) Watch(heartbeat int) <-chan time.Time { } func (s EtcdDiscoveryService) Register(addr string) error { - _, err := s.client.Set(path.Join(s.path, addr), addr, DEFAULT_TTL) + _, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl) return err } diff --git a/discovery/file/file.go b/discovery/file/file.go index 9e037d963b..4f86db19bc 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -10,15 +10,16 @@ import ( ) type FileDiscoveryService struct { - path string + heartbeat int + path string } func init() { discovery.Register("file", Init) } -func Init(file string) (discovery.DiscoveryService, error) { - return FileDiscoveryService{path: file}, nil +func Init(file string, heartbeat int) (discovery.DiscoveryService, error) { + return FileDiscoveryService{path: file, heartbeat: heartbeat}, nil } func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) { @@ -37,8 +38,8 @@ func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s FileDiscoveryService) Watch(heartbeat int) <-chan time.Time { - return time.Tick(time.Duration(heartbeat) * time.Second) +func (s FileDiscoveryService) Watch() <-chan time.Time { + return time.Tick(time.Duration(s.heartbeat) * time.Second) } func (s FileDiscoveryService) Register(addr string) error { diff --git a/discovery/token/token.go b/discovery/token/token.go index 36c70aede5..fa53047941 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -14,17 +14,18 @@ import ( const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1" type TokenDiscoveryService struct { - url string - token string + heartbeat int + url string + token string } func init() { discovery.Register("token", Init) } -func Init(urltoken string) (discovery.DiscoveryService, error) { +func Init(urltoken string, heartbeat int) (discovery.DiscoveryService, error) { if i := strings.LastIndex(urltoken, "/"); i != -1 { - return TokenDiscoveryService{url: "https://" + urltoken[:i], token: urltoken[i+1:]}, nil + return TokenDiscoveryService{url: "https://" + urltoken[:i], token: urltoken[i+1:], heartbeat: heartbeat}, nil } return TokenDiscoveryService{url: DISCOVERY_URL, token: urltoken}, nil @@ -62,8 +63,8 @@ func (s TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s TokenDiscoveryService) Watch(heartbeat int) <-chan time.Time { - return time.Tick(time.Duration(heartbeat) * time.Second) +func (s TokenDiscoveryService) Watch() <-chan time.Time { + return time.Tick(time.Duration(s.heartbeat) * time.Second) } // RegisterNode adds a new node identified by the into the discovery service diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index 8b8a7ae945..22aa2cc179 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -7,13 +7,13 @@ import ( ) func TestInit(t *testing.T) { - discovery, _ := Init("token") + discovery, _ := Init("token", 0) if dtoken, ok := discovery.(TokenDiscoveryService); ok { assert.Equal(t, dtoken.token, "token") assert.Equal(t, dtoken.url, DISCOVERY_URL) } - discovery, _ = Init("custom/path/token") + discovery, _ = Init("custom/path/token", 0) if dtoken, ok := discovery.(TokenDiscoveryService); ok { assert.Equal(t, dtoken.token, "token") assert.Equal(t, dtoken.url, "https://custom/path") diff --git a/join.go b/join.go index 432381b16c..077c830b2a 100644 --- a/join.go +++ b/join.go @@ -14,7 +14,7 @@ func join(c *cli.Context) { log.Fatal("--discovery required to join a cluster") } - d, err := discovery.New(c.String("discovery")) + d, err := discovery.New(c.String("discovery"), c.Int("heartbeat")) if err != nil { log.Fatal(err) } @@ -23,7 +23,7 @@ func join(c *cli.Context) { log.Fatal(err) } - hb := time.Duration(c.Int("heartbeat")) + hb := c.Duration("heartbeat") for { time.Sleep(hb * time.Second) if err := d.Register(c.String("addr")); err != nil { diff --git a/main.go b/main.go index 9a0fce159d..3734766d50 100644 --- a/main.go +++ b/main.go @@ -102,7 +102,7 @@ func main() { log.Fatal("--discovery required to list a cluster") } - d, err := discovery.New(c.String("discovery")) + d, err := discovery.New(c.String("discovery"), 0) if err != nil { log.Fatal(err) } diff --git a/manage.go b/manage.go index 0ce8b847d8..ce131e4ff7 100644 --- a/manage.go +++ b/manage.go @@ -94,7 +94,7 @@ func manage(c *cli.Context) { go func() { if c.String("discovery") != "" { - d, err := discovery.New(c.String("discovery")) + d, err := discovery.New(c.String("discovery"), c.Int("heartbeat")) if err != nil { log.Fatal(err) } @@ -107,7 +107,7 @@ func manage(c *cli.Context) { refresh(cluster, nodes) go func() { - for _ = range d.Watch(c.Int("heartbeat")) { + for _ = range d.Watch() { nodes, err = d.Fetch() if err == nil { refresh(cluster, nodes) From 1f9eac7fd1062b9fb6d6e0d31c4c7bac53f6f928 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 11 Dec 2014 23:23:10 +0000 Subject: [PATCH 11/13] Initialize in the interface Signed-off-by: Victor Vieux --- discovery/discovery.go | 17 +++++++++-------- discovery/etcd/etcd.go | 32 ++++++++++++++++++++++---------- discovery/file/file.go | 18 ++++++++++++------ discovery/token/token.go | 31 +++++++++++++++++-------------- discovery/token/token_test.go | 17 +++++++---------- join.go | 2 +- main.go | 2 +- 7 files changed, 69 insertions(+), 50 deletions(-) diff --git a/discovery/discovery.go b/discovery/discovery.go index cda76167b9..479b339052 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -10,8 +10,6 @@ import ( log "github.com/Sirupsen/logrus" ) -type InitFunc func(url string, heartbeat int) (DiscoveryService, error) - type Node struct { url string } @@ -28,26 +26,27 @@ func (n Node) String() string { } type DiscoveryService interface { + Initialize(string, int) error Fetch() ([]*Node, error) Watch() <-chan time.Time Register(string) error } var ( - discoveries map[string]InitFunc + discoveries map[string]func() DiscoveryService ErrNotSupported = errors.New("discovery service not supported") ) func init() { - discoveries = make(map[string]InitFunc) + discoveries = make(map[string]func() DiscoveryService) } -func Register(scheme string, initFunc InitFunc) error { +func Register(scheme string, f func() DiscoveryService) error { if _, exists := discoveries[scheme]; exists { return fmt.Errorf("scheme already registered %s", scheme) } log.Debugf("Registering %q discovery service", scheme) - discoveries[scheme] = initFunc + discoveries[scheme] = f return nil } @@ -58,9 +57,11 @@ func New(rawurl string, heartbeat int) (DiscoveryService, error) { return nil, err } - if initFct, exists := discoveries[url.Scheme]; exists { + if f, exists := discoveries[url.Scheme]; exists { log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host+url.Path) - return initFct(url.Host+url.Path, heartbeat) + discovery := f() + err := discovery.Initialize(url.Host+url.Path, heartbeat) + return discovery, err } return nil, ErrNotSupported diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go index 3257a70c5d..0ba65fddb2 100644 --- a/discovery/etcd/etcd.go +++ b/discovery/etcd/etcd.go @@ -17,28 +17,40 @@ type EtcdDiscoveryService struct { } func init() { - discovery.Register("etcd", Init) + discovery.Register("etcd", + func() discovery.DiscoveryService { + return &EtcdDiscoveryService{} + }, + ) } -func Init(uris string, heartbeat int) (discovery.DiscoveryService, error) { +func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error { var ( // split here because uris can contain multiples ips // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` parts = strings.SplitN(uris, "/", 2) ips = strings.Split(parts[0], ",") machines []string - path = "/" + parts[1] + "/" ) for _, ip := range ips { machines = append(machines, "http://"+ip) } - client := etcd.NewClient(machines) - ttl := uint64(heartbeat * 3 / 2) - client.CreateDir(path, ttl) // skip error check error because it might already exists - return EtcdDiscoveryService{client: client, path: path, ttl: ttl}, nil + s.client = etcd.NewClient(machines) + s.ttl = uint64(heartbeat * 3 / 2) + s.path = "/" + parts[1] + "/" + if _, err := s.client.CreateDir(s.path, s.ttl); err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError.ErrorCode != 105 { // skip key already exists + return err + } + } else { + return err + } + } + return nil } -func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { +func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { resp, err := s.client.Get(s.path, true, true) if err != nil { return nil, err @@ -52,7 +64,7 @@ func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s EtcdDiscoveryService) Watch() <-chan time.Time { +func (s *EtcdDiscoveryService) Watch() <-chan time.Time { watchChan := make(chan *etcd.Response) timeChan := make(chan time.Time) go s.client.Watch(s.path, 0, true, watchChan, nil) @@ -66,7 +78,7 @@ func (s EtcdDiscoveryService) Watch() <-chan time.Time { return timeChan } -func (s EtcdDiscoveryService) Register(addr string) error { +func (s *EtcdDiscoveryService) Register(addr string) error { _, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl) return err } diff --git a/discovery/file/file.go b/discovery/file/file.go index 4f86db19bc..d7c1a7cf8a 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -15,14 +15,20 @@ type FileDiscoveryService struct { } func init() { - discovery.Register("file", Init) + discovery.Register("file", + func() discovery.DiscoveryService { + return &FileDiscoveryService{} + }, + ) } -func Init(file string, heartbeat int) (discovery.DiscoveryService, error) { - return FileDiscoveryService{path: file, heartbeat: heartbeat}, nil +func (s *FileDiscoveryService) Initialize(path string, heartbeat int) error { + s.path = path + s.heartbeat = heartbeat + return nil } -func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) { +func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) { data, err := ioutil.ReadFile(s.path) if err != nil { return nil, err @@ -38,10 +44,10 @@ func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s FileDiscoveryService) Watch() <-chan time.Time { +func (s *FileDiscoveryService) Watch() <-chan time.Time { return time.Tick(time.Duration(s.heartbeat) * time.Second) } -func (s FileDiscoveryService) Register(addr string) error { +func (s *FileDiscoveryService) Register(addr string) error { return errors.New("unimplemented") } diff --git a/discovery/token/token.go b/discovery/token/token.go index fa53047941..5c270df0bb 100644 --- a/discovery/token/token.go +++ b/discovery/token/token.go @@ -20,25 +20,28 @@ type TokenDiscoveryService struct { } func init() { - discovery.Register("token", Init) + discovery.Register("token", + func() discovery.DiscoveryService { + return &TokenDiscoveryService{} + }, + ) } -func Init(urltoken string, heartbeat int) (discovery.DiscoveryService, error) { +func (s *TokenDiscoveryService) Initialize(urltoken string, heartbeat int) error { if i := strings.LastIndex(urltoken, "/"); i != -1 { - return TokenDiscoveryService{url: "https://" + urltoken[:i], token: urltoken[i+1:], heartbeat: heartbeat}, nil + s.url = "https://" + urltoken[:i] + s.token = urltoken[i+1:] + } else { + s.url = DISCOVERY_URL + s.token = urltoken } + s.heartbeat = heartbeat - return TokenDiscoveryService{url: DISCOVERY_URL, token: urltoken}, nil -} -func New(url string) *TokenDiscoveryService { - if url != "" { - return &TokenDiscoveryService{url: url} - } - return &TokenDiscoveryService{url: DISCOVERY_URL} + return nil } // FetchNodes returns the node for the discovery service at the specified endpoint -func (s TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { +func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) if err != nil { return nil, err @@ -63,12 +66,12 @@ func (s TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { return nodes, nil } -func (s TokenDiscoveryService) Watch() <-chan time.Time { +func (s *TokenDiscoveryService) Watch() <-chan time.Time { return time.Tick(time.Duration(s.heartbeat) * time.Second) } // RegisterNode adds a new node identified by the into the discovery service -func (s TokenDiscoveryService) Register(addr string) error { +func (s *TokenDiscoveryService) Register(addr string) error { buf := strings.NewReader(addr) _, err := http.Post(fmt.Sprintf("%s/%s/%s", s.url, @@ -77,7 +80,7 @@ func (s TokenDiscoveryService) Register(addr string) error { } // CreateCluster returns a unique cluster token -func (s TokenDiscoveryService) CreateCluster() (string, error) { +func (s *TokenDiscoveryService) CreateCluster() (string, error) { resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil) if err != nil { return "", err diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go index 22aa2cc179..c5d49ec6c6 100644 --- a/discovery/token/token_test.go +++ b/discovery/token/token_test.go @@ -7,17 +7,14 @@ import ( ) func TestInit(t *testing.T) { - discovery, _ := Init("token", 0) - if dtoken, ok := discovery.(TokenDiscoveryService); ok { - assert.Equal(t, dtoken.token, "token") - assert.Equal(t, dtoken.url, DISCOVERY_URL) - } + discovery := &TokenDiscoveryService{} + discovery.Initialize("token", 0) + assert.Equal(t, discovery.token, "token") + assert.Equal(t, discovery.url, DISCOVERY_URL) - discovery, _ = Init("custom/path/token", 0) - if dtoken, ok := discovery.(TokenDiscoveryService); ok { - assert.Equal(t, dtoken.token, "token") - assert.Equal(t, dtoken.url, "https://custom/path") - } + discovery.Initialize("custom/path/token", 0) + assert.Equal(t, discovery.token, "token") + assert.Equal(t, discovery.url, "https://custom/path") } func TestRegister(t *testing.T) { diff --git a/join.go b/join.go index 077c830b2a..5caca2d8e7 100644 --- a/join.go +++ b/join.go @@ -23,7 +23,7 @@ func join(c *cli.Context) { log.Fatal(err) } - hb := c.Duration("heartbeat") + hb := time.Duration(c.Int("heartbeat")) for { time.Sleep(hb * time.Second) if err := d.Register(c.String("addr")); err != nil { diff --git a/main.go b/main.go index 3734766d50..332dbada1d 100644 --- a/main.go +++ b/main.go @@ -85,7 +85,7 @@ func main() { ShortName: "c", Usage: "create a cluster", Action: func(c *cli.Context) { - token, err := token.New("").CreateCluster() + token, err := (&token.TokenDiscoveryService{}).CreateCluster() if err != nil { log.Fatal(err) } From 008398cc996e75982d2a9a2962e24159eb26790a Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 11 Dec 2014 23:30:35 +0000 Subject: [PATCH 12/13] update READMEs Signed-off-by: Victor Vieux --- README.md | 49 ++------------------------ discovery/README.md | 85 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index ec232fbcea..7ae5ec2756 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ This can be achieved by starting Docker with the `-H` flag (e.g. `-H 0.0.0.0:237 Currently, nodes must be running the Docker **master** version. Master binaries are available here: https://master.dockerproject.com/ -### Example usage: using the hosted discovery service +### Example usage ```bash # create a cluster @@ -51,51 +51,8 @@ $ swarm list --discovery token://6856663cdefdec325839a4b7e1de38e8 http:// ``` -### Example usage: using a static file describing the cluster - -```bash -# for each of your nodes, add a line to a file -# doesn't have to be public (eg. 192.168.0.X), -# as long as the other nodes can reach it, it is fine. -$ echo >> /tmp/my_cluster - -# start the manager on any machine or your laptop -$ swarm manage --discovery file:///tmp/my_cluster --addr= - -# use the regular docker cli -$ docker -H info -$ docker -H run ... -$ docker -H ps -$ docker -H logs ... -... - -# list nodes in your cluster -$ swarm list --discovery file:///tmp/my_cluster -http:// -``` - -### Example usage: using etcd - -```bash -# on each of your nodes, start the swarm agent -# doesn't have to be public (eg. 192.168.0.X), -# as long as the other nodes can reach it, it is fine. -$ swarm join --discovery etcd:///>path> --addr= - -# start the manager on any machine or your laptop -$ swarm manage --discovery etcd:///>path> --addr= - -# use the regular docker cli -$ docker -H info -$ docker -H run ... -$ docker -H ps -$ docker -H logs ... -... - -# list nodes in your cluster -$ swarm list --discovery etcd:///>path> -http:// -``` +See [here](https://github.com/docker/swarm/discovery) for more information about +other discovery services. ### TLS diff --git a/discovery/README.md b/discovery/README.md index 34c1a78ea0..7d3a69c4a3 100644 --- a/discovery/README.md +++ b/discovery/README.md @@ -1,17 +1,100 @@ Discovery ========= +`Docker Swarm` comes with multiple Discovery backends + +## Examples + +##### Using the hosted discovery service + +```bash +# create a cluster +$ swarm create +6856663cdefdec325839a4b7e1de38e8 + +# on each of your nodes, start the swarm agent +# doesn't have to be public (eg. 192.168.0.X), +# as long as the other nodes can reach it, it is fine. +$ swarm join --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr= + +# start the manager on any machine or your laptop +$ swarm manage --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr= + +# use the regular docker cli +$ docker -H info +$ docker -H run ... +$ docker -H ps +$ docker -H logs ... +... + +# list nodes in your cluster +$ swarm list --discovery token://6856663cdefdec325839a4b7e1de38e8 +http:// +``` + +###### Using a static file describing the cluster + +```bash +# for each of your nodes, add a line to a file +# doesn't have to be public (eg. 192.168.0.X), +# as long as the other nodes can reach it, it is fine. +$ echo >> /tmp/my_cluster + +# start the manager on any machine or your laptop +$ swarm manage --discovery file:///tmp/my_cluster --addr= + +# use the regular docker cli +$ docker -H info +$ docker -H run ... +$ docker -H ps +$ docker -H logs ... +... + +# list nodes in your cluster +$ swarm list --discovery file:///tmp/my_cluster +http:// +``` + +###### Using etcd + +```bash +# on each of your nodes, start the swarm agent +# doesn't have to be public (eg. 192.168.0.X), +# as long as the other nodes can reach it, it is fine. +$ swarm join --discovery etcd:///>path> --addr= + +# start the manager on any machine or your laptop +$ swarm manage --discovery etcd:///>path> --addr= + +# use the regular docker cli +$ docker -H info +$ docker -H run ... +$ docker -H ps +$ docker -H logs ... +... + +# list nodes in your cluster +$ swarm list --discovery etcd:///>path> +http:// +``` + +## Contributing + Contributing a new discovery backend is easy, simply implements this interface: ```go type DiscoveryService interface { + Initialize(string, int) error Fetch() ([]string, error) - Watch(int) <-chan time.Time + Watch() <-chan time.Time Register(string) error } ``` +######Initialize +take the `--dicovery` withtout the scheme and a heartbeat (in seconds) + ######Fetch returns the list of all the nodes from the discovery From 4cd12777c401abc33527e0882e1812e6d6b5ddd8 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Fri, 12 Dec 2014 18:50:56 +0000 Subject: [PATCH 13/13] update ROADMAP.md Signed-off-by: Victor Vieux --- ROADMAP.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ROADMAP.md b/ROADMAP.md index 79af95eace..4afadaea0c 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -18,4 +18,8 @@ Docker Swarm Roadmap ####Extensibility * [ ] pluggable scheduler -* [ ] discovery backends (etcd / zookeeper / hub...) +* [ ] discovery backends + * [x] etcd + * [ ] zookeeper + * [x] hub + * [x] file \ No newline at end of file