diff --git a/README.md b/README.md index 00bc8535d0..db72ab557d 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,10 @@ $ swarm create # on each of your nodes, start the swarm agent # doesn't have to be public (eg. 192.168.0.X), # as long as the other nodes can reach it, it is fine. -$ swarm join --token=6856663cdefdec325839a4b7e1de38e8 --addr= +$ swarm join --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr= # start the manager on any machine or your laptop -$ swarm manage --token=6856663cdefdec325839a4b7e1de38e8 --addr= +$ swarm manage --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr= # use the regular docker cli $ docker -H info @@ -44,10 +44,13 @@ $ docker -H logs ... ... # list nodes in your cluster -$ swarm list --token=6856663cdefdec325839a4b7e1de38e8 +$ swarm list --discovery token://6856663cdefdec325839a4b7e1de38e8 http:// ``` +See [here](https://github.com/docker/swarm/discovery) for more information about +other discovery services. + ### TLS Swarm supports TLS authentication between the CLI and Swarm but also between Swarm and the Docker nodes. diff --git a/ROADMAP.md b/ROADMAP.md index 79af95eace..4afadaea0c 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -18,4 +18,8 @@ Docker Swarm Roadmap ####Extensibility * [ ] pluggable scheduler -* [ ] discovery backends (etcd / zookeeper / hub...) +* [ ] discovery backends + * [x] etcd + * [ ] zookeeper + * [x] hub + * [x] file \ No newline at end of file diff --git a/discovery/README.md b/discovery/README.md index e322c5a1be..7d3a69c4a3 100644 --- a/discovery/README.md +++ b/discovery/README.md @@ -1,31 +1,107 @@ -#discovery.hub.docker.com +Discovery +========= -Docker Swarm comes with a simple discovery service built into the [Docker Hub](http://hub.docker.com) +`Docker Swarm` comes with multiple Discovery backends -The discovery service is still in alpha stage and currently hosted at `http://discovery-stage.hub.docker.com` +## Examples -#####Create a new cluster -`-> POST http://discovery.hub.docker.com/v1/clusters (data="")` +##### Using the hosted discovery service -`<- ` +```bash +# create a cluster +$ swarm create +6856663cdefdec325839a4b7e1de38e8 -#####Add new nodes to a cluster -`-> POST http://discovery.hub.docker.com/v1/clusters/ (data="")` +# on each of your nodes, start the swarm agent +# doesn't have to be public (eg. 192.168.0.X), +# as long as the other nodes can reach it, it is fine. +$ swarm join --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr= -`<- OK` +# start the manager on any machine or your laptop +$ swarm manage --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr= -`-> POST http://discovery.hub.docker.com/v1/clusters/token (data="")` +# use the regular docker cli +$ docker -H info +$ docker -H run ... +$ docker -H ps +$ docker -H logs ... +... -`<- OK` +# list nodes in your cluster +$ swarm list --discovery token://6856663cdefdec325839a4b7e1de38e8 +http:// +``` +###### Using a static file describing the cluster -#####List nodes in a cluster -`-> GET http://discovery.hub.docker.com/v1/clusters/token` +```bash +# for each of your nodes, add a line to a file +# doesn't have to be public (eg. 192.168.0.X), +# as long as the other nodes can reach it, it is fine. +$ echo >> /tmp/my_cluster -`<- ["", ""]` +# start the manager on any machine or your laptop +$ swarm manage --discovery file:///tmp/my_cluster --addr= +# use the regular docker cli +$ docker -H info +$ docker -H run ... +$ docker -H ps +$ docker -H logs ... +... -#####Delete a cluster (all the nodes in a cluster) -`-> DELETE http://discovery.hub.docker.com/v1/clusters/token` +# list nodes in your cluster +$ swarm list --discovery file:///tmp/my_cluster +http:// +``` -`<- OK` +###### Using etcd + +```bash +# on each of your nodes, start the swarm agent +# doesn't have to be public (eg. 192.168.0.X), +# as long as the other nodes can reach it, it is fine. +$ swarm join --discovery etcd:///>path> --addr= + +# start the manager on any machine or your laptop +$ swarm manage --discovery etcd:///>path> --addr= + +# use the regular docker cli +$ docker -H info +$ docker -H run ... +$ docker -H ps +$ docker -H logs ... +... + +# list nodes in your cluster +$ swarm list --discovery etcd:///>path> +http:// +``` + +## Contributing + +Contributing a new discovery backend is easy, +simply implements this interface: + +```go +type DiscoveryService interface { + Initialize(string, int) error + Fetch() ([]string, error) + Watch() <-chan time.Time + Register(string) error +} +``` + +######Initialize +take the `--dicovery` withtout the scheme and a heartbeat (in seconds) + +######Fetch +returns the list of all the nodes from the discovery + +######Watch +triggers when you need to update (`Fetch`) the list of nodes, +it can happen either via un timer (like `token`) or use +backend specific features (like `etcd`) + +######Register +add a new node to the discovery diff --git a/discovery/client.go b/discovery/client.go deleted file mode 100644 index b852bfe6ca..0000000000 --- a/discovery/client.go +++ /dev/null @@ -1,51 +0,0 @@ -package discovery - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "strings" -) - -const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1" - -// FetchSlaves returns the slaves for the discovery service at the specified endpoint -func FetchSlaves(token string) ([]string, error) { - resp, err := http.Get(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", token)) - if err != nil { - return nil, err - } - - if resp.Body != nil { - defer resp.Body.Close() - } - - var addrs []string - if resp.StatusCode == http.StatusOK { - if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil { - return nil, err - } - } - - return addrs, nil -} - -// RegisterSlave adds a new slave identified by the slaveID into the discovery service -// the default TTL is 30 secs -func RegisterSlave(addr, token string) error { - buf := strings.NewReader(addr) - - _, err := http.Post(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", token), "application/json", buf) - return err -} - -// CreateCluster returns a unique cluster token -func CreateCluster() (string, error) { - resp, err := http.Post(fmt.Sprintf("%s/%s", DISCOVERY_URL, "clusters"), "", nil) - if err != nil { - return "", err - } - token, err := ioutil.ReadAll(resp.Body) - return string(token), err -} diff --git a/discovery/client_test.go b/discovery/client_test.go deleted file mode 100644 index dd12a3f7b7..0000000000 --- a/discovery/client_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package discovery - -import "testing" - -func TestRegister(t *testing.T) { - expected := "127.0.0.1:2675" - if err := RegisterSlave(expected, "TEST_TOKEN"); err != nil { - t.Fatal(err) - } - - addrs, err := FetchSlaves("TEST_TOKEN") - if err != nil { - t.Fatal(err) - } - - if len(addrs) != 1 { - t.Fatalf("expected addr len == 1, got len = %d", len(addrs)) - } - - if addrs[0] != expected { - t.Fatalf("expected addr %q but received %q", expected, addrs[0]) - } - - if err = RegisterSlave(expected, "TEST_TOKEN"); err != nil { - t.Fatal(err) - } -} diff --git a/discovery/discovery.go b/discovery/discovery.go new file mode 100644 index 0000000000..479b339052 --- /dev/null +++ b/discovery/discovery.go @@ -0,0 +1,68 @@ +package discovery + +import ( + "errors" + "fmt" + "net/url" + "strings" + "time" + + log "github.com/Sirupsen/logrus" +) + +type Node struct { + url string +} + +func NewNode(url string) *Node { + if !strings.Contains(url, "://") { + url = "http://" + url + } + return &Node{url: url} +} + +func (n Node) String() string { + return n.url +} + +type DiscoveryService interface { + Initialize(string, int) error + Fetch() ([]*Node, error) + Watch() <-chan time.Time + Register(string) error +} + +var ( + discoveries map[string]func() DiscoveryService + ErrNotSupported = errors.New("discovery service not supported") +) + +func init() { + discoveries = make(map[string]func() DiscoveryService) +} + +func Register(scheme string, f func() DiscoveryService) error { + if _, exists := discoveries[scheme]; exists { + return fmt.Errorf("scheme already registered %s", scheme) + } + log.Debugf("Registering %q discovery service", scheme) + discoveries[scheme] = f + + return nil +} + +func New(rawurl string, heartbeat int) (DiscoveryService, error) { + url, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + + if f, exists := discoveries[url.Scheme]; exists { + log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host+url.Path) + discovery := f() + err := discovery.Initialize(url.Host+url.Path, heartbeat) + return discovery, err + } + + return nil, ErrNotSupported +} diff --git a/discovery/etcd/etcd.go b/discovery/etcd/etcd.go new file mode 100644 index 0000000000..0ba65fddb2 --- /dev/null +++ b/discovery/etcd/etcd.go @@ -0,0 +1,84 @@ +package etcd + +import ( + "path" + "strings" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/coreos/go-etcd/etcd" + "github.com/docker/swarm/discovery" +) + +type EtcdDiscoveryService struct { + ttl uint64 + client *etcd.Client + path string +} + +func init() { + discovery.Register("etcd", + func() discovery.DiscoveryService { + return &EtcdDiscoveryService{} + }, + ) +} + +func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error { + var ( + // split here because uris can contain multiples ips + // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` + parts = strings.SplitN(uris, "/", 2) + ips = strings.Split(parts[0], ",") + machines []string + ) + for _, ip := range ips { + machines = append(machines, "http://"+ip) + } + + s.client = etcd.NewClient(machines) + s.ttl = uint64(heartbeat * 3 / 2) + s.path = "/" + parts[1] + "/" + if _, err := s.client.CreateDir(s.path, s.ttl); err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError.ErrorCode != 105 { // skip key already exists + return err + } + } else { + return err + } + } + return nil +} +func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { + resp, err := s.client.Get(s.path, true, true) + if err != nil { + return nil, err + } + + var nodes []*discovery.Node + + for _, n := range resp.Node.Nodes { + nodes = append(nodes, discovery.NewNode(n.Value)) + } + return nodes, nil +} + +func (s *EtcdDiscoveryService) Watch() <-chan time.Time { + watchChan := make(chan *etcd.Response) + timeChan := make(chan time.Time) + go s.client.Watch(s.path, 0, true, watchChan, nil) + go func() { + for { + <-watchChan + log.Debugf("[ETCD] Watch triggered") + timeChan <- time.Now() + } + }() + return timeChan +} + +func (s *EtcdDiscoveryService) Register(addr string) error { + _, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl) + return err +} diff --git a/discovery/file/file.go b/discovery/file/file.go new file mode 100644 index 0000000000..d7c1a7cf8a --- /dev/null +++ b/discovery/file/file.go @@ -0,0 +1,53 @@ +package file + +import ( + "errors" + "io/ioutil" + "strings" + "time" + + "github.com/docker/swarm/discovery" +) + +type FileDiscoveryService struct { + heartbeat int + path string +} + +func init() { + discovery.Register("file", + func() discovery.DiscoveryService { + return &FileDiscoveryService{} + }, + ) +} + +func (s *FileDiscoveryService) Initialize(path string, heartbeat int) error { + s.path = path + s.heartbeat = heartbeat + return nil +} + +func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) { + data, err := ioutil.ReadFile(s.path) + if err != nil { + return nil, err + } + + var nodes []*discovery.Node + + for _, line := range strings.Split(string(data), "\n") { + if line != "" { + nodes = append(nodes, discovery.NewNode(line)) + } + } + return nodes, nil +} + +func (s *FileDiscoveryService) Watch() <-chan time.Time { + return time.Tick(time.Duration(s.heartbeat) * time.Second) +} + +func (s *FileDiscoveryService) Register(addr string) error { + return errors.New("unimplemented") +} diff --git a/discovery/token/README.md b/discovery/token/README.md new file mode 100644 index 0000000000..e322c5a1be --- /dev/null +++ b/discovery/token/README.md @@ -0,0 +1,31 @@ +#discovery.hub.docker.com + +Docker Swarm comes with a simple discovery service built into the [Docker Hub](http://hub.docker.com) + +The discovery service is still in alpha stage and currently hosted at `http://discovery-stage.hub.docker.com` + +#####Create a new cluster +`-> POST http://discovery.hub.docker.com/v1/clusters (data="")` + +`<- ` + +#####Add new nodes to a cluster +`-> POST http://discovery.hub.docker.com/v1/clusters/ (data="")` + +`<- OK` + +`-> POST http://discovery.hub.docker.com/v1/clusters/token (data="")` + +`<- OK` + + +#####List nodes in a cluster +`-> GET http://discovery.hub.docker.com/v1/clusters/token` + +`<- ["", ""]` + + +#####Delete a cluster (all the nodes in a cluster) +`-> DELETE http://discovery.hub.docker.com/v1/clusters/token` + +`<- OK` diff --git a/discovery/token/token.go b/discovery/token/token.go new file mode 100644 index 0000000000..5c270df0bb --- /dev/null +++ b/discovery/token/token.go @@ -0,0 +1,90 @@ +package token + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + "github.com/docker/swarm/discovery" +) + +const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1" + +type TokenDiscoveryService struct { + heartbeat int + url string + token string +} + +func init() { + discovery.Register("token", + func() discovery.DiscoveryService { + return &TokenDiscoveryService{} + }, + ) +} + +func (s *TokenDiscoveryService) Initialize(urltoken string, heartbeat int) error { + if i := strings.LastIndex(urltoken, "/"); i != -1 { + s.url = "https://" + urltoken[:i] + s.token = urltoken[i+1:] + } else { + s.url = DISCOVERY_URL + s.token = urltoken + } + s.heartbeat = heartbeat + + return nil +} + +// FetchNodes returns the node for the discovery service at the specified endpoint +func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) { + resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token)) + if err != nil { + return nil, err + } + + if resp.Body != nil { + defer resp.Body.Close() + } + + var addrs []string + if resp.StatusCode == http.StatusOK { + if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil { + return nil, err + } + } + + var nodes []*discovery.Node + for _, addr := range addrs { + nodes = append(nodes, discovery.NewNode(addr)) + } + + return nodes, nil +} + +func (s *TokenDiscoveryService) Watch() <-chan time.Time { + return time.Tick(time.Duration(s.heartbeat) * time.Second) +} + +// RegisterNode adds a new node identified by the into the discovery service +func (s *TokenDiscoveryService) Register(addr string) error { + buf := strings.NewReader(addr) + + _, err := http.Post(fmt.Sprintf("%s/%s/%s", s.url, + "clusters", s.token), "application/json", buf) + return err +} + +// CreateCluster returns a unique cluster token +func (s *TokenDiscoveryService) CreateCluster() (string, error) { + resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil) + if err != nil { + return "", err + } + token, err := ioutil.ReadAll(resp.Body) + return string(token), err +} diff --git a/discovery/token/token_test.go b/discovery/token/token_test.go new file mode 100644 index 0000000000..c5d49ec6c6 --- /dev/null +++ b/discovery/token/token_test.go @@ -0,0 +1,31 @@ +package token + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInit(t *testing.T) { + discovery := &TokenDiscoveryService{} + discovery.Initialize("token", 0) + assert.Equal(t, discovery.token, "token") + assert.Equal(t, discovery.url, DISCOVERY_URL) + + discovery.Initialize("custom/path/token", 0) + assert.Equal(t, discovery.token, "token") + assert.Equal(t, discovery.url, "https://custom/path") +} + +func TestRegister(t *testing.T) { + discovery := TokenDiscoveryService{token: "TEST_TOKEN", url: DISCOVERY_URL} + expected := "127.0.0.1:2675" + assert.NoError(t, discovery.Register(expected)) + + addrs, err := discovery.Fetch() + assert.NoError(t, err) + assert.Equal(t, len(addrs), 1) + assert.Equal(t, addrs[0].String(), "http://"+expected) + + assert.NoError(t, discovery.Register(expected)) +} diff --git a/join.go b/join.go index f4d0cd1294..5caca2d8e7 100644 --- a/join.go +++ b/join.go @@ -10,18 +10,23 @@ import ( func join(c *cli.Context) { - if c.String("token") == "" { - log.Fatal("--token required to join a cluster") + if c.String("discovery") == "" { + log.Fatal("--discovery required to join a cluster") } - if err := discovery.RegisterSlave(c.String("addr"), c.String("token")); err != nil { + d, err := discovery.New(c.String("discovery"), c.Int("heartbeat")) + if err != nil { + log.Fatal(err) + } + + if err := d.Register(c.String("addr")); err != nil { log.Fatal(err) } hb := time.Duration(c.Int("heartbeat")) for { time.Sleep(hb * time.Second) - if err := discovery.RegisterSlave(c.String("addr"), c.String("token")); err != nil { + if err := d.Register(c.String("addr")); err != nil { log.Error(err) } } diff --git a/main.go b/main.go index 5673a10194..332dbada1d 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,11 @@ import ( log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" + "github.com/docker/swarm/discovery" + _ "github.com/docker/swarm/discovery/etcd" + _ "github.com/docker/swarm/discovery/file" + "github.com/docker/swarm/discovery/token" ) func main() { @@ -33,11 +37,11 @@ func main() { } // flags - flToken := cli.StringFlag{ - Name: "token", + flDiscovery := cli.StringFlag{ + Name: "discovery", Value: "", - Usage: "cluster token", - EnvVar: "SWARM_TOKEN", + Usage: "token://, file://path/to/file", + EnvVar: "SWARM_DISCOVERY", } flAddr := cli.StringFlag{ Name: "addr", @@ -81,7 +85,7 @@ func main() { ShortName: "c", Usage: "create a cluster", Action: func(c *cli.Context) { - token, err := discovery.CreateCluster() + token, err := (&token.TokenDiscoveryService{}).CreateCluster() if err != nil { log.Fatal(err) } @@ -92,13 +96,18 @@ func main() { Name: "list", ShortName: "l", Usage: "list nodes in a cluster", - Flags: []cli.Flag{flToken}, + Flags: []cli.Flag{flDiscovery}, Action: func(c *cli.Context) { - if c.String("token") == "" { - log.Fatal("--token required to list a cluster") + if c.String("discovery") == "" { + log.Fatal("--discovery required to list a cluster") } - nodes, err := discovery.FetchSlaves(c.String("token")) + d, err := discovery.New(c.String("discovery"), 0) + if err != nil { + log.Fatal(err) + } + + nodes, err := d.Fetch() if err != nil { log.Fatal(err) } @@ -112,7 +121,7 @@ func main() { ShortName: "m", Usage: "manage a docker cluster", Flags: []cli.Flag{ - flToken, flAddr, flHeartBeat, + flDiscovery, flAddr, flHeartBeat, flTls, flTlsCaCert, flTlsCert, flTlsKey, flTlsVerify, flEnableCors}, Action: manage, @@ -121,7 +130,7 @@ func main() { Name: "join", ShortName: "j", Usage: "join a docker cluster", - Flags: []cli.Flag{flToken, flAddr, flHeartBeat}, + Flags: []cli.Flag{flDiscovery, flAddr, flHeartBeat}, Action: join, }, } diff --git a/manage.go b/manage.go index 8375152250..ce131e4ff7 100644 --- a/manage.go +++ b/manage.go @@ -5,8 +5,6 @@ import ( "crypto/x509" "fmt" "io/ioutil" - "strings" - "time" log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" @@ -73,14 +71,11 @@ func manage(c *cli.Context) { } } - refresh := func(c *cluster.Cluster, nodes []string) { + refresh := func(c *cluster.Cluster, nodes []*discovery.Node) { for _, addr := range nodes { - go func(addr string) { - if !strings.Contains(addr, "://") { - addr = "http://" + addr - } - if c.Node(addr) == nil { - n := cluster.NewNode(addr) + go func(node *discovery.Node) { + if c.Node(node.String()) == nil { + n := cluster.NewNode(node.String()) if err := n.Connect(tlsConfig); err != nil { log.Error(err) return @@ -98,26 +93,33 @@ func manage(c *cli.Context) { cluster.Events(&logHandler{}) go func() { - if c.String("token") != "" { - nodes, err := discovery.FetchSlaves(c.String("token")) + if c.String("discovery") != "" { + d, err := discovery.New(c.String("discovery"), c.Int("heartbeat")) + if err != nil { + log.Fatal(err) + } + + nodes, err := d.Fetch() if err != nil { log.Fatal(err) } refresh(cluster, nodes) - hb := time.Duration(c.Int("heartbeat")) go func() { - for { - time.Sleep(hb * time.Second) - nodes, err = discovery.FetchSlaves(c.String("token")) + for _ = range d.Watch() { + nodes, err = d.Fetch() if err == nil { refresh(cluster, nodes) } } }() } else { - refresh(cluster, c.Args()) + var nodes []*discovery.Node + for _, arg := range c.Args() { + nodes = append(nodes, discovery.NewNode(arg)) + } + refresh(cluster, nodes) } }()