Merge pull request #164 from vieux/discovery

Discovery Service backends token/etcd/file
This commit is contained in:
Andrea Luzzardi 2014-12-12 12:04:47 -08:00
commit c85b0a9c39
14 changed files with 508 additions and 130 deletions

View File

@ -31,10 +31,10 @@ $ swarm create
# on each of your nodes, start the swarm agent
# <node_ip> doesn't have to be public (eg. 192.168.0.X),
# as long as the other nodes can reach it, it is fine.
$ swarm join --token=6856663cdefdec325839a4b7e1de38e8 --addr=<node_ip:2375>
$ swarm join --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr=<node_ip:2375>
# start the manager on any machine or your laptop
$ swarm manage --token=6856663cdefdec325839a4b7e1de38e8 --addr=<swarm_ip:swarm_port>
$ swarm manage --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr=<swarm_ip:swarm_port>
# use the regular docker cli
$ docker -H <swarm_ip:swarm_port> info
@ -44,10 +44,13 @@ $ docker -H <swarm_ip:swarm_port> logs ...
...
# list nodes in your cluster
$ swarm list --token=6856663cdefdec325839a4b7e1de38e8
$ swarm list --discovery token://6856663cdefdec325839a4b7e1de38e8
http://<node_ip:2375>
```
See [here](https://github.com/docker/swarm/discovery) for more information about
other discovery services.
### TLS
Swarm supports TLS authentication between the CLI and Swarm but also between Swarm and the Docker nodes.

View File

@ -18,4 +18,8 @@ Docker Swarm Roadmap
####Extensibility
* [ ] pluggable scheduler
* [ ] discovery backends (etcd / zookeeper / hub...)
* [ ] discovery backends
* [x] etcd
* [ ] zookeeper
* [x] hub
* [x] file

View File

@ -1,31 +1,107 @@
#discovery.hub.docker.com
Discovery
=========
Docker Swarm comes with a simple discovery service built into the [Docker Hub](http://hub.docker.com)
`Docker Swarm` comes with multiple Discovery backends
The discovery service is still in alpha stage and currently hosted at `http://discovery-stage.hub.docker.com`
## Examples
#####Create a new cluster
`-> POST http://discovery.hub.docker.com/v1/clusters (data="")`
##### Using the hosted discovery service
`<- <token>`
```bash
# create a cluster
$ swarm create
6856663cdefdec325839a4b7e1de38e8
#####Add new nodes to a cluster
`-> POST http://discovery.hub.docker.com/v1/clusters/<token> (data="<ip:port1>")`
# on each of your nodes, start the swarm agent
# <node_ip> doesn't have to be public (eg. 192.168.0.X),
# as long as the other nodes can reach it, it is fine.
$ swarm join --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr=<node_ip:2375>
`<- OK`
# start the manager on any machine or your laptop
$ swarm manage --discovery token://6856663cdefdec325839a4b7e1de38e8 --addr=<swarm_ip:swarm_port>
`-> POST http://discovery.hub.docker.com/v1/clusters/token (data="<ip:port2>")`
# use the regular docker cli
$ docker -H <swarm_ip:swarm_port> info
$ docker -H <swarm_ip:swarm_port> run ...
$ docker -H <swarm_ip:swarm_port> ps
$ docker -H <swarm_ip:swarm_port> logs ...
...
`<- OK`
# list nodes in your cluster
$ swarm list --discovery token://6856663cdefdec325839a4b7e1de38e8
http://<node_ip:2375>
```
###### Using a static file describing the cluster
#####List nodes in a cluster
`-> GET http://discovery.hub.docker.com/v1/clusters/token`
```bash
# for each of your nodes, add a line to a file
# <node_ip> doesn't have to be public (eg. 192.168.0.X),
# as long as the other nodes can reach it, it is fine.
$ echo <node_ip:2375> >> /tmp/my_cluster
`<- ["<ip:port1>", "<ip:port2>"]`
# start the manager on any machine or your laptop
$ swarm manage --discovery file:///tmp/my_cluster --addr=<swarm_ip:swarm_port>
# use the regular docker cli
$ docker -H <swarm_ip:swarm_port> info
$ docker -H <swarm_ip:swarm_port> run ...
$ docker -H <swarm_ip:swarm_port> ps
$ docker -H <swarm_ip:swarm_port> logs ...
...
#####Delete a cluster (all the nodes in a cluster)
`-> DELETE http://discovery.hub.docker.com/v1/clusters/token`
# list nodes in your cluster
$ swarm list --discovery file:///tmp/my_cluster
http://<node_ip:2375>
```
`<- OK`
###### Using etcd
```bash
# on each of your nodes, start the swarm agent
# <node_ip> doesn't have to be public (eg. 192.168.0.X),
# as long as the other nodes can reach it, it is fine.
$ swarm join --discovery etcd://<etcd_ip>/>path> --addr=<node_ip:2375>
# start the manager on any machine or your laptop
$ swarm manage --discovery etcd://<etcd_ip>/>path> --addr=<swarm_ip:swarm_port>
# use the regular docker cli
$ docker -H <swarm_ip:swarm_port> info
$ docker -H <swarm_ip:swarm_port> run ...
$ docker -H <swarm_ip:swarm_port> ps
$ docker -H <swarm_ip:swarm_port> logs ...
...
# list nodes in your cluster
$ swarm list --discovery etcd://<etcd_ip>/>path>
http://<node_ip:2375>
```
## Contributing
Contributing a new discovery backend is easy,
simply implements this interface:
```go
type DiscoveryService interface {
Initialize(string, int) error
Fetch() ([]string, error)
Watch() <-chan time.Time
Register(string) error
}
```
######Initialize
take the `--dicovery` withtout the scheme and a heartbeat (in seconds)
######Fetch
returns the list of all the nodes from the discovery
######Watch
triggers when you need to update (`Fetch`) the list of nodes,
it can happen either via un timer (like `token`) or use
backend specific features (like `etcd`)
######Register
add a new node to the discovery

View File

@ -1,51 +0,0 @@
package discovery
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
)
const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1"
// FetchSlaves returns the slaves for the discovery service at the specified endpoint
func FetchSlaves(token string) ([]string, error) {
resp, err := http.Get(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", token))
if err != nil {
return nil, err
}
if resp.Body != nil {
defer resp.Body.Close()
}
var addrs []string
if resp.StatusCode == http.StatusOK {
if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil {
return nil, err
}
}
return addrs, nil
}
// RegisterSlave adds a new slave identified by the slaveID into the discovery service
// the default TTL is 30 secs
func RegisterSlave(addr, token string) error {
buf := strings.NewReader(addr)
_, err := http.Post(fmt.Sprintf("%s/%s/%s", DISCOVERY_URL, "clusters", token), "application/json", buf)
return err
}
// CreateCluster returns a unique cluster token
func CreateCluster() (string, error) {
resp, err := http.Post(fmt.Sprintf("%s/%s", DISCOVERY_URL, "clusters"), "", nil)
if err != nil {
return "", err
}
token, err := ioutil.ReadAll(resp.Body)
return string(token), err
}

View File

@ -1,27 +0,0 @@
package discovery
import "testing"
func TestRegister(t *testing.T) {
expected := "127.0.0.1:2675"
if err := RegisterSlave(expected, "TEST_TOKEN"); err != nil {
t.Fatal(err)
}
addrs, err := FetchSlaves("TEST_TOKEN")
if err != nil {
t.Fatal(err)
}
if len(addrs) != 1 {
t.Fatalf("expected addr len == 1, got len = %d", len(addrs))
}
if addrs[0] != expected {
t.Fatalf("expected addr %q but received %q", expected, addrs[0])
}
if err = RegisterSlave(expected, "TEST_TOKEN"); err != nil {
t.Fatal(err)
}
}

68
discovery/discovery.go Normal file
View File

@ -0,0 +1,68 @@
package discovery
import (
"errors"
"fmt"
"net/url"
"strings"
"time"
log "github.com/Sirupsen/logrus"
)
type Node struct {
url string
}
func NewNode(url string) *Node {
if !strings.Contains(url, "://") {
url = "http://" + url
}
return &Node{url: url}
}
func (n Node) String() string {
return n.url
}
type DiscoveryService interface {
Initialize(string, int) error
Fetch() ([]*Node, error)
Watch() <-chan time.Time
Register(string) error
}
var (
discoveries map[string]func() DiscoveryService
ErrNotSupported = errors.New("discovery service not supported")
)
func init() {
discoveries = make(map[string]func() DiscoveryService)
}
func Register(scheme string, f func() DiscoveryService) error {
if _, exists := discoveries[scheme]; exists {
return fmt.Errorf("scheme already registered %s", scheme)
}
log.Debugf("Registering %q discovery service", scheme)
discoveries[scheme] = f
return nil
}
func New(rawurl string, heartbeat int) (DiscoveryService, error) {
url, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
if f, exists := discoveries[url.Scheme]; exists {
log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host+url.Path)
discovery := f()
err := discovery.Initialize(url.Host+url.Path, heartbeat)
return discovery, err
}
return nil, ErrNotSupported
}

84
discovery/etcd/etcd.go Normal file
View File

@ -0,0 +1,84 @@
package etcd
import (
"path"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/coreos/go-etcd/etcd"
"github.com/docker/swarm/discovery"
)
type EtcdDiscoveryService struct {
ttl uint64
client *etcd.Client
path string
}
func init() {
discovery.Register("etcd",
func() discovery.DiscoveryService {
return &EtcdDiscoveryService{}
},
)
}
func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error {
var (
// split here because uris can contain multiples ips
// like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path`
parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",")
machines []string
)
for _, ip := range ips {
machines = append(machines, "http://"+ip)
}
s.client = etcd.NewClient(machines)
s.ttl = uint64(heartbeat * 3 / 2)
s.path = "/" + parts[1] + "/"
if _, err := s.client.CreateDir(s.path, s.ttl); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode != 105 { // skip key already exists
return err
}
} else {
return err
}
}
return nil
}
func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
resp, err := s.client.Get(s.path, true, true)
if err != nil {
return nil, err
}
var nodes []*discovery.Node
for _, n := range resp.Node.Nodes {
nodes = append(nodes, discovery.NewNode(n.Value))
}
return nodes, nil
}
func (s *EtcdDiscoveryService) Watch() <-chan time.Time {
watchChan := make(chan *etcd.Response)
timeChan := make(chan time.Time)
go s.client.Watch(s.path, 0, true, watchChan, nil)
go func() {
for {
<-watchChan
log.Debugf("[ETCD] Watch triggered")
timeChan <- time.Now()
}
}()
return timeChan
}
func (s *EtcdDiscoveryService) Register(addr string) error {
_, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl)
return err
}

53
discovery/file/file.go Normal file
View File

@ -0,0 +1,53 @@
package file
import (
"errors"
"io/ioutil"
"strings"
"time"
"github.com/docker/swarm/discovery"
)
type FileDiscoveryService struct {
heartbeat int
path string
}
func init() {
discovery.Register("file",
func() discovery.DiscoveryService {
return &FileDiscoveryService{}
},
)
}
func (s *FileDiscoveryService) Initialize(path string, heartbeat int) error {
s.path = path
s.heartbeat = heartbeat
return nil
}
func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
data, err := ioutil.ReadFile(s.path)
if err != nil {
return nil, err
}
var nodes []*discovery.Node
for _, line := range strings.Split(string(data), "\n") {
if line != "" {
nodes = append(nodes, discovery.NewNode(line))
}
}
return nodes, nil
}
func (s *FileDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(s.heartbeat) * time.Second)
}
func (s *FileDiscoveryService) Register(addr string) error {
return errors.New("unimplemented")
}

31
discovery/token/README.md Normal file
View File

@ -0,0 +1,31 @@
#discovery.hub.docker.com
Docker Swarm comes with a simple discovery service built into the [Docker Hub](http://hub.docker.com)
The discovery service is still in alpha stage and currently hosted at `http://discovery-stage.hub.docker.com`
#####Create a new cluster
`-> POST http://discovery.hub.docker.com/v1/clusters (data="")`
`<- <token>`
#####Add new nodes to a cluster
`-> POST http://discovery.hub.docker.com/v1/clusters/<token> (data="<ip:port1>")`
`<- OK`
`-> POST http://discovery.hub.docker.com/v1/clusters/token (data="<ip:port2>")`
`<- OK`
#####List nodes in a cluster
`-> GET http://discovery.hub.docker.com/v1/clusters/token`
`<- ["<ip:port1>", "<ip:port2>"]`
#####Delete a cluster (all the nodes in a cluster)
`-> DELETE http://discovery.hub.docker.com/v1/clusters/token`
`<- OK`

90
discovery/token/token.go Normal file
View File

@ -0,0 +1,90 @@
package token
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/docker/swarm/discovery"
)
const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1"
type TokenDiscoveryService struct {
heartbeat int
url string
token string
}
func init() {
discovery.Register("token",
func() discovery.DiscoveryService {
return &TokenDiscoveryService{}
},
)
}
func (s *TokenDiscoveryService) Initialize(urltoken string, heartbeat int) error {
if i := strings.LastIndex(urltoken, "/"); i != -1 {
s.url = "https://" + urltoken[:i]
s.token = urltoken[i+1:]
} else {
s.url = DISCOVERY_URL
s.token = urltoken
}
s.heartbeat = heartbeat
return nil
}
// FetchNodes returns the node for the discovery service at the specified endpoint
func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token))
if err != nil {
return nil, err
}
if resp.Body != nil {
defer resp.Body.Close()
}
var addrs []string
if resp.StatusCode == http.StatusOK {
if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil {
return nil, err
}
}
var nodes []*discovery.Node
for _, addr := range addrs {
nodes = append(nodes, discovery.NewNode(addr))
}
return nodes, nil
}
func (s *TokenDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(s.heartbeat) * time.Second)
}
// RegisterNode adds a new node identified by the into the discovery service
func (s *TokenDiscoveryService) Register(addr string) error {
buf := strings.NewReader(addr)
_, err := http.Post(fmt.Sprintf("%s/%s/%s", s.url,
"clusters", s.token), "application/json", buf)
return err
}
// CreateCluster returns a unique cluster token
func (s *TokenDiscoveryService) CreateCluster() (string, error) {
resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil)
if err != nil {
return "", err
}
token, err := ioutil.ReadAll(resp.Body)
return string(token), err
}

View File

@ -0,0 +1,31 @@
package token
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestInit(t *testing.T) {
discovery := &TokenDiscoveryService{}
discovery.Initialize("token", 0)
assert.Equal(t, discovery.token, "token")
assert.Equal(t, discovery.url, DISCOVERY_URL)
discovery.Initialize("custom/path/token", 0)
assert.Equal(t, discovery.token, "token")
assert.Equal(t, discovery.url, "https://custom/path")
}
func TestRegister(t *testing.T) {
discovery := TokenDiscoveryService{token: "TEST_TOKEN", url: DISCOVERY_URL}
expected := "127.0.0.1:2675"
assert.NoError(t, discovery.Register(expected))
addrs, err := discovery.Fetch()
assert.NoError(t, err)
assert.Equal(t, len(addrs), 1)
assert.Equal(t, addrs[0].String(), "http://"+expected)
assert.NoError(t, discovery.Register(expected))
}

13
join.go
View File

@ -10,18 +10,23 @@ import (
func join(c *cli.Context) {
if c.String("token") == "" {
log.Fatal("--token required to join a cluster")
if c.String("discovery") == "" {
log.Fatal("--discovery required to join a cluster")
}
if err := discovery.RegisterSlave(c.String("addr"), c.String("token")); err != nil {
d, err := discovery.New(c.String("discovery"), c.Int("heartbeat"))
if err != nil {
log.Fatal(err)
}
if err := d.Register(c.String("addr")); err != nil {
log.Fatal(err)
}
hb := time.Duration(c.Int("heartbeat"))
for {
time.Sleep(hb * time.Second)
if err := discovery.RegisterSlave(c.String("addr"), c.String("token")); err != nil {
if err := d.Register(c.String("addr")); err != nil {
log.Error(err)
}
}

31
main.go
View File

@ -6,7 +6,11 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/docker/swarm/discovery"
_ "github.com/docker/swarm/discovery/etcd"
_ "github.com/docker/swarm/discovery/file"
"github.com/docker/swarm/discovery/token"
)
func main() {
@ -33,11 +37,11 @@ func main() {
}
// flags
flToken := cli.StringFlag{
Name: "token",
flDiscovery := cli.StringFlag{
Name: "discovery",
Value: "",
Usage: "cluster token",
EnvVar: "SWARM_TOKEN",
Usage: "token://<token>, file://path/to/file",
EnvVar: "SWARM_DISCOVERY",
}
flAddr := cli.StringFlag{
Name: "addr",
@ -81,7 +85,7 @@ func main() {
ShortName: "c",
Usage: "create a cluster",
Action: func(c *cli.Context) {
token, err := discovery.CreateCluster()
token, err := (&token.TokenDiscoveryService{}).CreateCluster()
if err != nil {
log.Fatal(err)
}
@ -92,13 +96,18 @@ func main() {
Name: "list",
ShortName: "l",
Usage: "list nodes in a cluster",
Flags: []cli.Flag{flToken},
Flags: []cli.Flag{flDiscovery},
Action: func(c *cli.Context) {
if c.String("token") == "" {
log.Fatal("--token required to list a cluster")
if c.String("discovery") == "" {
log.Fatal("--discovery required to list a cluster")
}
nodes, err := discovery.FetchSlaves(c.String("token"))
d, err := discovery.New(c.String("discovery"), 0)
if err != nil {
log.Fatal(err)
}
nodes, err := d.Fetch()
if err != nil {
log.Fatal(err)
}
@ -112,7 +121,7 @@ func main() {
ShortName: "m",
Usage: "manage a docker cluster",
Flags: []cli.Flag{
flToken, flAddr, flHeartBeat,
flDiscovery, flAddr, flHeartBeat,
flTls, flTlsCaCert, flTlsCert, flTlsKey, flTlsVerify,
flEnableCors},
Action: manage,
@ -121,7 +130,7 @@ func main() {
Name: "join",
ShortName: "j",
Usage: "join a docker cluster",
Flags: []cli.Flag{flToken, flAddr, flHeartBeat},
Flags: []cli.Flag{flDiscovery, flAddr, flHeartBeat},
Action: join,
},
}

View File

@ -5,8 +5,6 @@ import (
"crypto/x509"
"fmt"
"io/ioutil"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
@ -73,14 +71,11 @@ func manage(c *cli.Context) {
}
}
refresh := func(c *cluster.Cluster, nodes []string) {
refresh := func(c *cluster.Cluster, nodes []*discovery.Node) {
for _, addr := range nodes {
go func(addr string) {
if !strings.Contains(addr, "://") {
addr = "http://" + addr
}
if c.Node(addr) == nil {
n := cluster.NewNode(addr)
go func(node *discovery.Node) {
if c.Node(node.String()) == nil {
n := cluster.NewNode(node.String())
if err := n.Connect(tlsConfig); err != nil {
log.Error(err)
return
@ -98,26 +93,33 @@ func manage(c *cli.Context) {
cluster.Events(&logHandler{})
go func() {
if c.String("token") != "" {
nodes, err := discovery.FetchSlaves(c.String("token"))
if c.String("discovery") != "" {
d, err := discovery.New(c.String("discovery"), c.Int("heartbeat"))
if err != nil {
log.Fatal(err)
}
nodes, err := d.Fetch()
if err != nil {
log.Fatal(err)
}
refresh(cluster, nodes)
hb := time.Duration(c.Int("heartbeat"))
go func() {
for {
time.Sleep(hb * time.Second)
nodes, err = discovery.FetchSlaves(c.String("token"))
for _ = range d.Watch() {
nodes, err = d.Fetch()
if err == nil {
refresh(cluster, nodes)
}
}
}()
} else {
refresh(cluster, c.Args())
var nodes []*discovery.Node
for _, arg := range c.Args() {
nodes = append(nodes, discovery.NewNode(arg))
}
refresh(cluster, nodes)
}
}()