mirror of https://github.com/docker/docs.git
use docker/docker/pkg/discovery
Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
parent
1bfdf55a52
commit
fc1e7bbca2
|
@ -7,7 +7,7 @@ import (
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/codegangsta/cli"
|
"github.com/codegangsta/cli"
|
||||||
"github.com/docker/swarm/discovery"
|
"github.com/docker/docker/pkg/discovery"
|
||||||
)
|
)
|
||||||
|
|
||||||
func checkAddrFormat(addr string) bool {
|
func checkAddrFormat(addr string) bool {
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/codegangsta/cli"
|
"github.com/codegangsta/cli"
|
||||||
"github.com/docker/swarm/discovery"
|
"github.com/docker/docker/pkg/discovery"
|
||||||
)
|
)
|
||||||
|
|
||||||
func list(c *cli.Context) {
|
func list(c *cli.Context) {
|
||||||
|
|
|
@ -11,12 +11,12 @@ import (
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/codegangsta/cli"
|
"github.com/codegangsta/cli"
|
||||||
|
"github.com/docker/docker/pkg/discovery"
|
||||||
|
kvdiscovery "github.com/docker/docker/pkg/discovery/kv"
|
||||||
"github.com/docker/swarm/api"
|
"github.com/docker/swarm/api"
|
||||||
"github.com/docker/swarm/cluster"
|
"github.com/docker/swarm/cluster"
|
||||||
"github.com/docker/swarm/cluster/mesos"
|
"github.com/docker/swarm/cluster/mesos"
|
||||||
"github.com/docker/swarm/cluster/swarm"
|
"github.com/docker/swarm/cluster/swarm"
|
||||||
"github.com/docker/swarm/discovery"
|
|
||||||
kvdiscovery "github.com/docker/swarm/discovery/kv"
|
|
||||||
"github.com/docker/swarm/leadership"
|
"github.com/docker/swarm/leadership"
|
||||||
"github.com/docker/swarm/scheduler"
|
"github.com/docker/swarm/scheduler"
|
||||||
"github.com/docker/swarm/scheduler/filter"
|
"github.com/docker/swarm/scheduler/filter"
|
||||||
|
@ -98,7 +98,7 @@ func loadTLSConfig(ca, cert, key string, verify bool) (*tls.Config, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize the discovery service.
|
// Initialize the discovery service.
|
||||||
func createDiscovery(uri string, c *cli.Context, discoveryOpt []string) discovery.Discovery {
|
func createDiscovery(uri string, c *cli.Context, discoveryOpt []string) discovery.Backend {
|
||||||
hb, err := time.ParseDuration(c.String("heartbeat"))
|
hb, err := time.ParseDuration(c.String("heartbeat"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("invalid --heartbeat: %v", err)
|
log.Fatalf("invalid --heartbeat: %v", err)
|
||||||
|
@ -129,7 +129,7 @@ func getDiscoveryOpt(c *cli.Context) map[string]string {
|
||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Server, discovery discovery.Discovery, addr string, leaderTTL time.Duration, tlsConfig *tls.Config) {
|
func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Server, discovery discovery.Backend, addr string, leaderTTL time.Duration, tlsConfig *tls.Config) {
|
||||||
kvDiscovery, ok := discovery.(*kvdiscovery.Discovery)
|
kvDiscovery, ok := discovery.(*kvdiscovery.Discovery)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Fatal("Leader election is only supported with consul, etcd and zookeeper discovery.")
|
log.Fatal("Leader election is only supported with consul, etcd and zookeeper discovery.")
|
||||||
|
|
|
@ -12,10 +12,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
|
"github.com/docker/docker/pkg/discovery"
|
||||||
"github.com/docker/docker/pkg/stringid"
|
"github.com/docker/docker/pkg/stringid"
|
||||||
"github.com/docker/go-units"
|
"github.com/docker/go-units"
|
||||||
"github.com/docker/swarm/cluster"
|
"github.com/docker/swarm/cluster"
|
||||||
"github.com/docker/swarm/discovery"
|
|
||||||
"github.com/docker/swarm/scheduler"
|
"github.com/docker/swarm/scheduler"
|
||||||
"github.com/docker/swarm/scheduler/node"
|
"github.com/docker/swarm/scheduler/node"
|
||||||
"github.com/samalba/dockerclient"
|
"github.com/samalba/dockerclient"
|
||||||
|
@ -54,7 +54,7 @@ type Cluster struct {
|
||||||
engines map[string]*cluster.Engine
|
engines map[string]*cluster.Engine
|
||||||
pendingEngines map[string]*cluster.Engine
|
pendingEngines map[string]*cluster.Engine
|
||||||
scheduler *scheduler.Scheduler
|
scheduler *scheduler.Scheduler
|
||||||
discovery discovery.Discovery
|
discovery discovery.Backend
|
||||||
pendingContainers map[string]*pendingContainer
|
pendingContainers map[string]*pendingContainer
|
||||||
|
|
||||||
overcommitRatio float64
|
overcommitRatio float64
|
||||||
|
@ -63,7 +63,7 @@ type Cluster struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCluster is exported
|
// NewCluster is exported
|
||||||
func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery discovery.Discovery, options cluster.DriverOpts, engineOptions *cluster.EngineOpts) (cluster.Cluster, error) {
|
func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery discovery.Backend, options cluster.DriverOpts, engineOptions *cluster.EngineOpts) (cluster.Cluster, error) {
|
||||||
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
|
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
|
||||||
|
|
||||||
cluster := &Cluster{
|
cluster := &Cluster{
|
||||||
|
|
|
@ -1,166 +0,0 @@
|
||||||
package discovery
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
// An Entry represents a swarm host.
|
|
||||||
type Entry struct {
|
|
||||||
Host string
|
|
||||||
Port string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewEntry creates a new entry.
|
|
||||||
func NewEntry(url string) (*Entry, error) {
|
|
||||||
host, port, err := net.SplitHostPort(url)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &Entry{host, port}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// String returns the string form of an entry.
|
|
||||||
func (e *Entry) String() string {
|
|
||||||
return fmt.Sprintf("%s:%s", e.Host, e.Port)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Equals returns true if cmp contains the same data.
|
|
||||||
func (e *Entry) Equals(cmp *Entry) bool {
|
|
||||||
return e.Host == cmp.Host && e.Port == cmp.Port
|
|
||||||
}
|
|
||||||
|
|
||||||
// Entries is a list of *Entry with some helpers.
|
|
||||||
type Entries []*Entry
|
|
||||||
|
|
||||||
// Equals returns true if cmp contains the same data.
|
|
||||||
func (e Entries) Equals(cmp Entries) bool {
|
|
||||||
// Check if the file has really changed.
|
|
||||||
if len(e) != len(cmp) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
for i := range e {
|
|
||||||
if !e[i].Equals(cmp[i]) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Contains returns true if the Entries contain a given Entry.
|
|
||||||
func (e Entries) Contains(entry *Entry) bool {
|
|
||||||
for _, curr := range e {
|
|
||||||
if curr.Equals(entry) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Diff compares two entries and returns the added and removed entries.
|
|
||||||
func (e Entries) Diff(cmp Entries) (Entries, Entries) {
|
|
||||||
added := Entries{}
|
|
||||||
for _, entry := range cmp {
|
|
||||||
if !e.Contains(entry) {
|
|
||||||
added = append(added, entry)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
removed := Entries{}
|
|
||||||
for _, entry := range e {
|
|
||||||
if !cmp.Contains(entry) {
|
|
||||||
removed = append(removed, entry)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return added, removed
|
|
||||||
}
|
|
||||||
|
|
||||||
// The Discovery interface is implemented by Discovery backends which
|
|
||||||
// manage swarm host entries.
|
|
||||||
type Discovery interface {
|
|
||||||
// Initialize the discovery with URIs, a heartbeat, a ttl and optional settings.
|
|
||||||
Initialize(string, time.Duration, time.Duration, map[string]string) error
|
|
||||||
|
|
||||||
// Watch the discovery for entry changes.
|
|
||||||
// Returns a channel that will receive changes or an error.
|
|
||||||
// Providing a non-nil stopCh can be used to stop watching.
|
|
||||||
Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error)
|
|
||||||
|
|
||||||
// Register to the discovery
|
|
||||||
Register(string) error
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
discoveries map[string]Discovery
|
|
||||||
// ErrNotSupported is returned when a discovery service is not supported.
|
|
||||||
ErrNotSupported = errors.New("discovery service not supported")
|
|
||||||
// ErrNotImplemented is returned when discovery feature is not implemented
|
|
||||||
// by discovery backend.
|
|
||||||
ErrNotImplemented = errors.New("not implemented in this discovery service")
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
discoveries = make(map[string]Discovery)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register makes a discovery backend available by the provided scheme.
|
|
||||||
// If Register is called twice with the same scheme an error is returned.
|
|
||||||
func Register(scheme string, d Discovery) error {
|
|
||||||
if _, exists := discoveries[scheme]; exists {
|
|
||||||
return fmt.Errorf("scheme already registered %s", scheme)
|
|
||||||
}
|
|
||||||
log.WithField("name", scheme).Debug("Registering discovery service")
|
|
||||||
discoveries[scheme] = d
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func parse(rawurl string) (string, string) {
|
|
||||||
parts := strings.SplitN(rawurl, "://", 2)
|
|
||||||
|
|
||||||
// nodes:port,node2:port => nodes://node1:port,node2:port
|
|
||||||
if len(parts) == 1 {
|
|
||||||
return "nodes", parts[0]
|
|
||||||
}
|
|
||||||
return parts[0], parts[1]
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns a new Discovery given a URL, heartbeat and ttl settings.
|
|
||||||
// Returns an error if the URL scheme is not supported.
|
|
||||||
func New(rawurl string, heartbeat time.Duration, ttl time.Duration, discoveryOpt map[string]string) (Discovery, error) {
|
|
||||||
scheme, uri := parse(rawurl)
|
|
||||||
|
|
||||||
if discovery, exists := discoveries[scheme]; exists {
|
|
||||||
log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service")
|
|
||||||
err := discovery.Initialize(uri, heartbeat, ttl, discoveryOpt)
|
|
||||||
return discovery, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, ErrNotSupported
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateEntries returns an array of entries based on the given addresses.
|
|
||||||
func CreateEntries(addrs []string) (Entries, error) {
|
|
||||||
entries := Entries{}
|
|
||||||
if addrs == nil {
|
|
||||||
return entries, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, addr := range addrs {
|
|
||||||
if len(addr) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
entry, err := NewEntry(addr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
entries = append(entries, entry)
|
|
||||||
}
|
|
||||||
return entries, nil
|
|
||||||
}
|
|
|
@ -1,120 +0,0 @@
|
||||||
package discovery
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestNewEntry(t *testing.T) {
|
|
||||||
entry, err := NewEntry("127.0.0.1:2375")
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.True(t, entry.Equals(&Entry{Host: "127.0.0.1", Port: "2375"}))
|
|
||||||
assert.Equal(t, entry.String(), "127.0.0.1:2375")
|
|
||||||
|
|
||||||
_, err = NewEntry("127.0.0.1")
|
|
||||||
assert.Error(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestParse(t *testing.T) {
|
|
||||||
scheme, uri := parse("127.0.0.1:2375")
|
|
||||||
assert.Equal(t, scheme, "nodes")
|
|
||||||
assert.Equal(t, uri, "127.0.0.1:2375")
|
|
||||||
|
|
||||||
scheme, uri = parse("localhost:2375")
|
|
||||||
assert.Equal(t, scheme, "nodes")
|
|
||||||
assert.Equal(t, uri, "localhost:2375")
|
|
||||||
|
|
||||||
scheme, uri = parse("scheme://127.0.0.1:2375")
|
|
||||||
assert.Equal(t, scheme, "scheme")
|
|
||||||
assert.Equal(t, uri, "127.0.0.1:2375")
|
|
||||||
|
|
||||||
scheme, uri = parse("scheme://localhost:2375")
|
|
||||||
assert.Equal(t, scheme, "scheme")
|
|
||||||
assert.Equal(t, uri, "localhost:2375")
|
|
||||||
|
|
||||||
scheme, uri = parse("")
|
|
||||||
assert.Equal(t, scheme, "nodes")
|
|
||||||
assert.Equal(t, uri, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCreateEntries(t *testing.T) {
|
|
||||||
entries, err := CreateEntries(nil)
|
|
||||||
assert.Equal(t, entries, Entries{})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
expected := Entries{
|
|
||||||
&Entry{Host: "127.0.0.1", Port: "2375"},
|
|
||||||
&Entry{Host: "127.0.0.2", Port: "2375"},
|
|
||||||
}
|
|
||||||
assert.True(t, entries.Equals(expected))
|
|
||||||
|
|
||||||
_, err = CreateEntries([]string{"127.0.0.1", "127.0.0.2"})
|
|
||||||
assert.Error(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestContainsEntry(t *testing.T) {
|
|
||||||
entries, err := CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.True(t, entries.Contains(&Entry{Host: "127.0.0.1", Port: "2375"}))
|
|
||||||
assert.False(t, entries.Contains(&Entry{Host: "127.0.0.3", Port: "2375"}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEntriesEquality(t *testing.T) {
|
|
||||||
entries := Entries{
|
|
||||||
&Entry{Host: "127.0.0.1", Port: "2375"},
|
|
||||||
&Entry{Host: "127.0.0.2", Port: "2375"},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Same
|
|
||||||
assert.True(t, entries.Equals(Entries{
|
|
||||||
&Entry{Host: "127.0.0.1", Port: "2375"},
|
|
||||||
&Entry{Host: "127.0.0.2", Port: "2375"},
|
|
||||||
}))
|
|
||||||
|
|
||||||
// Different size
|
|
||||||
assert.False(t, entries.Equals(Entries{
|
|
||||||
&Entry{Host: "127.0.0.1", Port: "2375"},
|
|
||||||
&Entry{Host: "127.0.0.2", Port: "2375"},
|
|
||||||
&Entry{Host: "127.0.0.3", Port: "2375"},
|
|
||||||
}))
|
|
||||||
|
|
||||||
// Different content
|
|
||||||
assert.False(t, entries.Equals(Entries{
|
|
||||||
&Entry{Host: "127.0.0.1", Port: "2375"},
|
|
||||||
&Entry{Host: "127.0.0.42", Port: "2375"},
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEntriesDiff(t *testing.T) {
|
|
||||||
entry1 := &Entry{Host: "1.1.1.1", Port: "1111"}
|
|
||||||
entry2 := &Entry{Host: "2.2.2.2", Port: "2222"}
|
|
||||||
entry3 := &Entry{Host: "3.3.3.3", Port: "3333"}
|
|
||||||
entries := Entries{entry1, entry2}
|
|
||||||
|
|
||||||
// No diff
|
|
||||||
added, removed := entries.Diff(Entries{entry2, entry1})
|
|
||||||
assert.Empty(t, added)
|
|
||||||
assert.Empty(t, removed)
|
|
||||||
|
|
||||||
// Add
|
|
||||||
added, removed = entries.Diff(Entries{entry2, entry3, entry1})
|
|
||||||
assert.Len(t, added, 1)
|
|
||||||
assert.True(t, added.Contains(entry3))
|
|
||||||
assert.Empty(t, removed)
|
|
||||||
|
|
||||||
// Remove
|
|
||||||
added, removed = entries.Diff(Entries{entry2})
|
|
||||||
assert.Empty(t, added)
|
|
||||||
assert.Len(t, removed, 1)
|
|
||||||
assert.True(t, removed.Contains(entry1))
|
|
||||||
|
|
||||||
// Add and remove
|
|
||||||
added, removed = entries.Diff(Entries{entry1, entry3})
|
|
||||||
assert.Len(t, added, 1)
|
|
||||||
assert.True(t, added.Contains(entry3))
|
|
||||||
assert.Len(t, removed, 1)
|
|
||||||
assert.True(t, removed.Contains(entry2))
|
|
||||||
}
|
|
|
@ -1,109 +0,0 @@
|
||||||
package file
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/docker/swarm/discovery"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Discovery is exported
|
|
||||||
type Discovery struct {
|
|
||||||
heartbeat time.Duration
|
|
||||||
path string
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
Init()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init is exported
|
|
||||||
func Init() {
|
|
||||||
discovery.Register("file", &Discovery{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize is exported
|
|
||||||
func (s *Discovery) Initialize(path string, heartbeat time.Duration, ttl time.Duration, _ map[string]string) error {
|
|
||||||
s.path = path
|
|
||||||
s.heartbeat = heartbeat
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseFileContent(content []byte) []string {
|
|
||||||
var result []string
|
|
||||||
for _, line := range strings.Split(strings.TrimSpace(string(content)), "\n") {
|
|
||||||
line = strings.TrimSpace(line)
|
|
||||||
// Ignoring line starts with #
|
|
||||||
if strings.HasPrefix(line, "#") {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Inlined # comment also ignored.
|
|
||||||
if strings.Contains(line, "#") {
|
|
||||||
line = line[0:strings.Index(line, "#")]
|
|
||||||
// Trim additional spaces caused by above stripping.
|
|
||||||
line = strings.TrimSpace(line)
|
|
||||||
}
|
|
||||||
for _, ip := range discovery.Generate(line) {
|
|
||||||
result = append(result, ip)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Discovery) fetch() (discovery.Entries, error) {
|
|
||||||
fileContent, err := ioutil.ReadFile(s.path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to read '%s': %v", s.path, err)
|
|
||||||
}
|
|
||||||
return discovery.CreateEntries(parseFileContent(fileContent))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Watch is exported
|
|
||||||
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
|
|
||||||
ch := make(chan discovery.Entries)
|
|
||||||
errCh := make(chan error)
|
|
||||||
ticker := time.NewTicker(s.heartbeat)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer close(errCh)
|
|
||||||
defer close(ch)
|
|
||||||
|
|
||||||
// Send the initial entries if available.
|
|
||||||
currentEntries, err := s.fetch()
|
|
||||||
if err != nil {
|
|
||||||
errCh <- err
|
|
||||||
} else {
|
|
||||||
ch <- currentEntries
|
|
||||||
}
|
|
||||||
|
|
||||||
// Periodically send updates.
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
newEntries, err := s.fetch()
|
|
||||||
if err != nil {
|
|
||||||
errCh <- err
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the file has really changed.
|
|
||||||
if !newEntries.Equals(currentEntries) {
|
|
||||||
ch <- newEntries
|
|
||||||
}
|
|
||||||
currentEntries = newEntries
|
|
||||||
case <-stopCh:
|
|
||||||
ticker.Stop()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return ch, errCh
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register is exported
|
|
||||||
func (s *Discovery) Register(addr string) error {
|
|
||||||
return discovery.ErrNotImplemented
|
|
||||||
}
|
|
|
@ -1,106 +0,0 @@
|
||||||
package file
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/docker/swarm/discovery"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestInitialize(t *testing.T) {
|
|
||||||
d := &Discovery{}
|
|
||||||
d.Initialize("/path/to/file", 1000, 0, nil)
|
|
||||||
assert.Equal(t, d.path, "/path/to/file")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNew(t *testing.T) {
|
|
||||||
d, err := discovery.New("file:///path/to/file", 0, 0, nil)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, d.(*Discovery).path, "/path/to/file")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestContent(t *testing.T) {
|
|
||||||
data := `
|
|
||||||
1.1.1.[1:2]:1111
|
|
||||||
2.2.2.[2:4]:2222
|
|
||||||
`
|
|
||||||
ips := parseFileContent([]byte(data))
|
|
||||||
assert.Len(t, ips, 5)
|
|
||||||
assert.Equal(t, ips[0], "1.1.1.1:1111")
|
|
||||||
assert.Equal(t, ips[1], "1.1.1.2:1111")
|
|
||||||
assert.Equal(t, ips[2], "2.2.2.2:2222")
|
|
||||||
assert.Equal(t, ips[3], "2.2.2.3:2222")
|
|
||||||
assert.Equal(t, ips[4], "2.2.2.4:2222")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRegister(t *testing.T) {
|
|
||||||
discovery := &Discovery{path: "/path/to/file"}
|
|
||||||
assert.Error(t, discovery.Register("0.0.0.0"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestParsingContentsWithComments(t *testing.T) {
|
|
||||||
data := `
|
|
||||||
### test ###
|
|
||||||
1.1.1.1:1111 # inline comment
|
|
||||||
# 2.2.2.2:2222
|
|
||||||
### empty line with comment
|
|
||||||
3.3.3.3:3333
|
|
||||||
### test ###
|
|
||||||
`
|
|
||||||
ips := parseFileContent([]byte(data))
|
|
||||||
assert.Len(t, ips, 2)
|
|
||||||
assert.Equal(t, "1.1.1.1:1111", ips[0])
|
|
||||||
assert.Equal(t, "3.3.3.3:3333", ips[1])
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
|
||||||
data := `
|
|
||||||
1.1.1.1:1111
|
|
||||||
2.2.2.2:2222
|
|
||||||
`
|
|
||||||
expected := discovery.Entries{
|
|
||||||
&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
|
|
||||||
&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a temporary file and remove it.
|
|
||||||
tmp, err := ioutil.TempFile(os.TempDir(), "discovery-file-test")
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.NoError(t, tmp.Close())
|
|
||||||
assert.NoError(t, os.Remove(tmp.Name()))
|
|
||||||
|
|
||||||
// Set up file discovery.
|
|
||||||
d := &Discovery{}
|
|
||||||
d.Initialize(tmp.Name(), 1000, 0, nil)
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
ch, errCh := d.Watch(stopCh)
|
|
||||||
|
|
||||||
// Make sure it fires errors since the file doesn't exist.
|
|
||||||
assert.Error(t, <-errCh)
|
|
||||||
// We have to drain the error channel otherwise Watch will get stuck.
|
|
||||||
go func() {
|
|
||||||
for range errCh {
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Write the file and make sure we get the expected value back.
|
|
||||||
assert.NoError(t, ioutil.WriteFile(tmp.Name(), []byte(data), 0600))
|
|
||||||
assert.Equal(t, expected, <-ch)
|
|
||||||
|
|
||||||
// Add a new entry and look it up.
|
|
||||||
expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
|
|
||||||
f, err := os.OpenFile(tmp.Name(), os.O_APPEND|os.O_WRONLY, 0600)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.NotNil(t, f)
|
|
||||||
_, err = f.WriteString("\n3.3.3.3:3333\n")
|
|
||||||
assert.NoError(t, err)
|
|
||||||
f.Close()
|
|
||||||
assert.Equal(t, expected, <-ch)
|
|
||||||
|
|
||||||
// Stop and make sure it closes all channels.
|
|
||||||
close(stopCh)
|
|
||||||
assert.Nil(t, <-ch)
|
|
||||||
assert.Nil(t, <-errCh)
|
|
||||||
}
|
|
|
@ -1,35 +0,0 @@
|
||||||
package discovery
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"regexp"
|
|
||||||
"strconv"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Generate takes care of IP generation
|
|
||||||
func Generate(pattern string) []string {
|
|
||||||
re, _ := regexp.Compile(`\[(.+):(.+)\]`)
|
|
||||||
submatch := re.FindStringSubmatch(pattern)
|
|
||||||
if submatch == nil {
|
|
||||||
return []string{pattern}
|
|
||||||
}
|
|
||||||
|
|
||||||
from, err := strconv.Atoi(submatch[1])
|
|
||||||
if err != nil {
|
|
||||||
return []string{pattern}
|
|
||||||
}
|
|
||||||
to, err := strconv.Atoi(submatch[2])
|
|
||||||
if err != nil {
|
|
||||||
return []string{pattern}
|
|
||||||
}
|
|
||||||
|
|
||||||
template := re.ReplaceAllString(pattern, "%d")
|
|
||||||
|
|
||||||
var result []string
|
|
||||||
for val := from; val <= to; val++ {
|
|
||||||
entry := fmt.Sprintf(template, val)
|
|
||||||
result = append(result, entry)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
|
@ -1,55 +0,0 @@
|
||||||
package discovery
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestGeneratorNotGenerate(t *testing.T) {
|
|
||||||
ips := Generate("127.0.0.1")
|
|
||||||
assert.Equal(t, len(ips), 1)
|
|
||||||
assert.Equal(t, ips[0], "127.0.0.1")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGeneratorWithPortNotGenerate(t *testing.T) {
|
|
||||||
ips := Generate("127.0.0.1:8080")
|
|
||||||
assert.Equal(t, len(ips), 1)
|
|
||||||
assert.Equal(t, ips[0], "127.0.0.1:8080")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGeneratorMatchFailedNotGenerate(t *testing.T) {
|
|
||||||
ips := Generate("127.0.0.[1]")
|
|
||||||
assert.Equal(t, len(ips), 1)
|
|
||||||
assert.Equal(t, ips[0], "127.0.0.[1]")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGeneratorWithPort(t *testing.T) {
|
|
||||||
ips := Generate("127.0.0.[1:11]:2375")
|
|
||||||
assert.Equal(t, len(ips), 11)
|
|
||||||
assert.Equal(t, ips[0], "127.0.0.1:2375")
|
|
||||||
assert.Equal(t, ips[1], "127.0.0.2:2375")
|
|
||||||
assert.Equal(t, ips[2], "127.0.0.3:2375")
|
|
||||||
assert.Equal(t, ips[3], "127.0.0.4:2375")
|
|
||||||
assert.Equal(t, ips[4], "127.0.0.5:2375")
|
|
||||||
assert.Equal(t, ips[5], "127.0.0.6:2375")
|
|
||||||
assert.Equal(t, ips[6], "127.0.0.7:2375")
|
|
||||||
assert.Equal(t, ips[7], "127.0.0.8:2375")
|
|
||||||
assert.Equal(t, ips[8], "127.0.0.9:2375")
|
|
||||||
assert.Equal(t, ips[9], "127.0.0.10:2375")
|
|
||||||
assert.Equal(t, ips[10], "127.0.0.11:2375")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGenerateWithMalformedInputAtRangeStart(t *testing.T) {
|
|
||||||
malformedInput := "127.0.0.[x:11]:2375"
|
|
||||||
ips := Generate(malformedInput)
|
|
||||||
assert.Equal(t, len(ips), 1)
|
|
||||||
assert.Equal(t, ips[0], malformedInput)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGenerateWithMalformedInputAtRangeEnd(t *testing.T) {
|
|
||||||
malformedInput := "127.0.0.[1:x]:2375"
|
|
||||||
ips := Generate(malformedInput)
|
|
||||||
assert.Equal(t, len(ips), 1)
|
|
||||||
assert.Equal(t, ips[0], malformedInput)
|
|
||||||
}
|
|
|
@ -1,193 +0,0 @@
|
||||||
package kv
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"path"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
|
||||||
"github.com/docker/docker/pkg/tlsconfig"
|
|
||||||
"github.com/docker/libkv"
|
|
||||||
"github.com/docker/libkv/store"
|
|
||||||
"github.com/docker/libkv/store/consul"
|
|
||||||
"github.com/docker/libkv/store/etcd"
|
|
||||||
"github.com/docker/libkv/store/zookeeper"
|
|
||||||
"github.com/docker/swarm/discovery"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultDiscoveryPath = "docker/swarm/nodes"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Discovery is exported
|
|
||||||
type Discovery struct {
|
|
||||||
backend store.Backend
|
|
||||||
store store.Store
|
|
||||||
heartbeat time.Duration
|
|
||||||
ttl time.Duration
|
|
||||||
prefix string
|
|
||||||
path string
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
Init()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init is exported
|
|
||||||
func Init() {
|
|
||||||
// Register to libkv
|
|
||||||
zookeeper.Register()
|
|
||||||
consul.Register()
|
|
||||||
etcd.Register()
|
|
||||||
|
|
||||||
// Register to internal Swarm discovery service
|
|
||||||
discovery.Register("zk", &Discovery{backend: store.ZK})
|
|
||||||
discovery.Register("consul", &Discovery{backend: store.CONSUL})
|
|
||||||
discovery.Register("etcd", &Discovery{backend: store.ETCD})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize is exported
|
|
||||||
func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration, discoveryOpt map[string]string) error {
|
|
||||||
var (
|
|
||||||
parts = strings.SplitN(uris, "/", 2)
|
|
||||||
addrs = strings.Split(parts[0], ",")
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
// A custom prefix to the path can be optionally used.
|
|
||||||
if len(parts) == 2 {
|
|
||||||
s.prefix = parts[1]
|
|
||||||
}
|
|
||||||
|
|
||||||
s.heartbeat = heartbeat
|
|
||||||
s.ttl = ttl
|
|
||||||
|
|
||||||
// Use a custom path if specified in discovery options
|
|
||||||
dpath := defaultDiscoveryPath
|
|
||||||
if discoveryOpt["kv.path"] != "" {
|
|
||||||
dpath = discoveryOpt["kv.path"]
|
|
||||||
}
|
|
||||||
|
|
||||||
s.path = path.Join(s.prefix, dpath)
|
|
||||||
|
|
||||||
var config *store.Config
|
|
||||||
if discoveryOpt["kv.cacertfile"] != "" && discoveryOpt["kv.certfile"] != "" && discoveryOpt["kv.keyfile"] != "" {
|
|
||||||
log.Debug("Initializing discovery with TLS")
|
|
||||||
tlsConfig, err := tlsconfig.Client(tlsconfig.Options{
|
|
||||||
CAFile: discoveryOpt["kv.cacertfile"],
|
|
||||||
CertFile: discoveryOpt["kv.certfile"],
|
|
||||||
KeyFile: discoveryOpt["kv.keyfile"],
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
config = &store.Config{
|
|
||||||
// Set ClientTLS to trigger https (bug in libkv/etcd)
|
|
||||||
ClientTLS: &store.ClientTLSConfig{
|
|
||||||
CACertFile: discoveryOpt["kv.cacertfile"],
|
|
||||||
CertFile: discoveryOpt["kv.certfile"],
|
|
||||||
KeyFile: discoveryOpt["kv.keyfile"],
|
|
||||||
},
|
|
||||||
// The actual TLS config that will be used
|
|
||||||
TLS: tlsConfig,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Debug("Initializing discovery without TLS")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Creates a new store, will ignore options given
|
|
||||||
// if not supported by the chosen store
|
|
||||||
s.store, err = libkv.NewStore(s.backend, addrs, config)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Watch the store until either there's a store error or we receive a stop request.
|
|
||||||
// Returns false if we shouldn't attempt watching the store anymore (stop request received).
|
|
||||||
func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case pairs := <-watchCh:
|
|
||||||
if pairs == nil {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs))
|
|
||||||
|
|
||||||
// Convert `KVPair` into `discovery.Entry`.
|
|
||||||
addrs := make([]string, len(pairs))
|
|
||||||
for _, pair := range pairs {
|
|
||||||
addrs = append(addrs, string(pair.Value))
|
|
||||||
}
|
|
||||||
|
|
||||||
entries, err := discovery.CreateEntries(addrs)
|
|
||||||
if err != nil {
|
|
||||||
errCh <- err
|
|
||||||
} else {
|
|
||||||
discoveryCh <- entries
|
|
||||||
}
|
|
||||||
case <-stopCh:
|
|
||||||
// We were requested to stop watching.
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Watch is exported
|
|
||||||
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
|
|
||||||
ch := make(chan discovery.Entries)
|
|
||||||
errCh := make(chan error)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer close(ch)
|
|
||||||
defer close(errCh)
|
|
||||||
|
|
||||||
// Forever: Create a store watch, watch until we get an error and then try again.
|
|
||||||
// Will only stop if we receive a stopCh request.
|
|
||||||
for {
|
|
||||||
// Create the path to watch if it does not exist yet
|
|
||||||
exists, err := s.store.Exists(s.path)
|
|
||||||
if err != nil {
|
|
||||||
errCh <- err
|
|
||||||
}
|
|
||||||
if !exists {
|
|
||||||
if err := s.store.Put(s.path, []byte(""), &store.WriteOptions{IsDir: true}); err != nil {
|
|
||||||
errCh <- err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set up a watch.
|
|
||||||
watchCh, err := s.store.WatchTree(s.path, stopCh)
|
|
||||||
if err != nil {
|
|
||||||
errCh <- err
|
|
||||||
} else {
|
|
||||||
if !s.watchOnce(stopCh, watchCh, ch, errCh) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we get here it means the store watch channel was closed. This
|
|
||||||
// is unexpected so let's retry later.
|
|
||||||
errCh <- fmt.Errorf("Unexpected watch error")
|
|
||||||
time.Sleep(s.heartbeat)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return ch, errCh
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register is exported
|
|
||||||
func (s *Discovery) Register(addr string) error {
|
|
||||||
opts := &store.WriteOptions{TTL: s.ttl}
|
|
||||||
return s.store.Put(path.Join(s.path, addr), []byte(addr), opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store returns the underlying store used by KV discovery.
|
|
||||||
func (s *Discovery) Store() store.Store {
|
|
||||||
return s.store
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prefix returns the store prefix
|
|
||||||
func (s *Discovery) Prefix() string {
|
|
||||||
return s.prefix
|
|
||||||
}
|
|
|
@ -1,199 +0,0 @@
|
||||||
package kv
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/docker/libkv"
|
|
||||||
"github.com/docker/libkv/store"
|
|
||||||
libkvmock "github.com/docker/libkv/store/mock"
|
|
||||||
"github.com/docker/swarm/discovery"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestInitialize(t *testing.T) {
|
|
||||||
storeMock, err := libkvmock.New([]string{"127.0.0.1"}, nil)
|
|
||||||
assert.NotNil(t, storeMock)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
d := &Discovery{backend: store.CONSUL}
|
|
||||||
d.Initialize("127.0.0.1", 0, 0, nil)
|
|
||||||
d.store = storeMock
|
|
||||||
|
|
||||||
s := d.store.(*libkvmock.Mock)
|
|
||||||
assert.Len(t, s.Endpoints, 1)
|
|
||||||
assert.Equal(t, s.Endpoints[0], "127.0.0.1")
|
|
||||||
assert.Equal(t, d.path, defaultDiscoveryPath)
|
|
||||||
|
|
||||||
storeMock, err = libkvmock.New([]string{"127.0.0.1:1234"}, nil)
|
|
||||||
assert.NotNil(t, storeMock)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
d = &Discovery{backend: store.CONSUL}
|
|
||||||
d.Initialize("127.0.0.1:1234/path", 0, 0, nil)
|
|
||||||
d.store = storeMock
|
|
||||||
|
|
||||||
s = d.store.(*libkvmock.Mock)
|
|
||||||
assert.Len(t, s.Endpoints, 1)
|
|
||||||
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
|
|
||||||
assert.Equal(t, d.path, "path/"+defaultDiscoveryPath)
|
|
||||||
|
|
||||||
storeMock, err = libkvmock.New([]string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"}, nil)
|
|
||||||
assert.NotNil(t, storeMock)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
d = &Discovery{backend: store.CONSUL}
|
|
||||||
d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0, nil)
|
|
||||||
d.store = storeMock
|
|
||||||
|
|
||||||
s = d.store.(*libkvmock.Mock)
|
|
||||||
if assert.Len(t, s.Endpoints, 3) {
|
|
||||||
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
|
|
||||||
assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234")
|
|
||||||
assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234")
|
|
||||||
}
|
|
||||||
assert.Equal(t, d.path, "path/"+defaultDiscoveryPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInitializeWithCerts(t *testing.T) {
|
|
||||||
cert := `-----BEGIN CERTIFICATE-----
|
|
||||||
MIIDCDCCAfKgAwIBAgIICifG7YeiQOEwCwYJKoZIhvcNAQELMBIxEDAOBgNVBAMT
|
|
||||||
B1Rlc3QgQ0EwHhcNMTUxMDAxMjMwMDAwWhcNMjAwOTI5MjMwMDAwWjASMRAwDgYD
|
|
||||||
VQQDEwdUZXN0IENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1wRC
|
|
||||||
O+flnLTK5ImjTurNRHwSejuqGbc4CAvpB0hS+z0QlSs4+zE9h80aC4hz+6caRpds
|
|
||||||
+J908Q+RvAittMHbpc7VjbZP72G6fiXk7yPPl6C10HhRSoSi3nY+B7F2E8cuz14q
|
|
||||||
V2e+ejhWhSrBb/keyXpcyjoW1BOAAJ2TIclRRkICSCZrpXUyXxAvzXfpFXo1RhSb
|
|
||||||
UywN11pfiCQzDUN7sPww9UzFHuAHZHoyfTr27XnJYVUerVYrCPq8vqfn//01qz55
|
|
||||||
Xs0hvzGdlTFXhuabFtQnKFH5SNwo/fcznhB7rePOwHojxOpXTBepUCIJLbtNnWFT
|
|
||||||
V44t9gh5IqIWtoBReQIDAQABo2YwZDAOBgNVHQ8BAf8EBAMCAAYwEgYDVR0TAQH/
|
|
||||||
BAgwBgEB/wIBAjAdBgNVHQ4EFgQUZKUI8IIjIww7X/6hvwggQK4bD24wHwYDVR0j
|
|
||||||
BBgwFoAUZKUI8IIjIww7X/6hvwggQK4bD24wCwYJKoZIhvcNAQELA4IBAQDES2cz
|
|
||||||
7sCQfDCxCIWH7X8kpi/JWExzUyQEJ0rBzN1m3/x8ySRxtXyGekimBqQwQdFqlwMI
|
|
||||||
xzAQKkh3ue8tNSzRbwqMSyH14N1KrSxYS9e9szJHfUasoTpQGPmDmGIoRJuq1h6M
|
|
||||||
ej5x1SCJ7GWCR6xEXKUIE9OftXm9TdFzWa7Ja3OHz/mXteii8VXDuZ5ACq6EE5bY
|
|
||||||
8sP4gcICfJ5fTrpTlk9FIqEWWQrCGa5wk95PGEj+GJpNogjXQ97wVoo/Y3p1brEn
|
|
||||||
t5zjN9PAq4H1fuCMdNNA+p1DHNwd+ELTxcMAnb2ajwHvV6lKPXutrTFc4umJToBX
|
|
||||||
FpTxDmJHEV4bzUzh
|
|
||||||
-----END CERTIFICATE-----
|
|
||||||
`
|
|
||||||
key := `-----BEGIN RSA PRIVATE KEY-----
|
|
||||||
MIIEpQIBAAKCAQEA1wRCO+flnLTK5ImjTurNRHwSejuqGbc4CAvpB0hS+z0QlSs4
|
|
||||||
+zE9h80aC4hz+6caRpds+J908Q+RvAittMHbpc7VjbZP72G6fiXk7yPPl6C10HhR
|
|
||||||
SoSi3nY+B7F2E8cuz14qV2e+ejhWhSrBb/keyXpcyjoW1BOAAJ2TIclRRkICSCZr
|
|
||||||
pXUyXxAvzXfpFXo1RhSbUywN11pfiCQzDUN7sPww9UzFHuAHZHoyfTr27XnJYVUe
|
|
||||||
rVYrCPq8vqfn//01qz55Xs0hvzGdlTFXhuabFtQnKFH5SNwo/fcznhB7rePOwHoj
|
|
||||||
xOpXTBepUCIJLbtNnWFTV44t9gh5IqIWtoBReQIDAQABAoIBAHSWipORGp/uKFXj
|
|
||||||
i/mut776x8ofsAxhnLBARQr93ID+i49W8H7EJGkOfaDjTICYC1dbpGrri61qk8sx
|
|
||||||
qX7p3v/5NzKwOIfEpirgwVIqSNYe/ncbxnhxkx6tXtUtFKmEx40JskvSpSYAhmmO
|
|
||||||
1XSx0E/PWaEN/nLgX/f1eWJIlxlQkk3QeqL+FGbCXI48DEtlJ9+MzMu4pAwZTpj5
|
|
||||||
5qtXo5JJ0jRGfJVPAOznRsYqv864AhMdMIWguzk6EGnbaCWwPcfcn+h9a5LMdony
|
|
||||||
MDHfBS7bb5tkF3+AfnVY3IBMVx7YlsD9eAyajlgiKu4zLbwTRHjXgShy+4Oussz0
|
|
||||||
ugNGnkECgYEA/hi+McrZC8C4gg6XqK8+9joD8tnyDZDz88BQB7CZqABUSwvjDqlP
|
|
||||||
L8hcwo/lzvjBNYGkqaFPUICGWKjeCtd8pPS2DCVXxDQX4aHF1vUur0uYNncJiV3N
|
|
||||||
XQz4Iemsa6wnKf6M67b5vMXICw7dw0HZCdIHD1hnhdtDz0uVpeevLZ8CgYEA2KCT
|
|
||||||
Y43lorjrbCgMqtlefkr3GJA9dey+hTzCiWEOOqn9RqGoEGUday0sKhiLofOgmN2B
|
|
||||||
LEukpKIey8s+Q/cb6lReajDVPDsMweX8i7hz3Wa4Ugp4Xa5BpHqu8qIAE2JUZ7bU
|
|
||||||
t88aQAYE58pUF+/Lq1QzAQdrjjzQBx6SrBxieecCgYEAvukoPZEC8mmiN1VvbTX+
|
|
||||||
QFHmlZha3QaDxChB+QUe7bMRojEUL/fVnzkTOLuVFqSfxevaI/km9n0ac5KtAchV
|
|
||||||
xjp2bTnBb5EUQFqjopYktWA+xO07JRJtMfSEmjZPbbay1kKC7rdTfBm961EIHaRj
|
|
||||||
xZUf6M+rOE8964oGrdgdLlECgYEA046GQmx6fh7/82FtdZDRQp9tj3SWQUtSiQZc
|
|
||||||
qhO59Lq8mjUXz+MgBuJXxkiwXRpzlbaFB0Bca1fUoYw8o915SrDYf/Zu2OKGQ/qa
|
|
||||||
V81sgiVmDuEgycR7YOlbX6OsVUHrUlpwhY3hgfMe6UtkMvhBvHF/WhroBEIJm1pV
|
|
||||||
PXZ/CbMCgYEApNWVktFBjOaYfY6SNn4iSts1jgsQbbpglg3kT7PLKjCAhI6lNsbk
|
|
||||||
dyT7ut01PL6RaW4SeQWtrJIVQaM6vF3pprMKqlc5XihOGAmVqH7rQx9rtQB5TicL
|
|
||||||
BFrwkQE4HQtQBV60hYQUzzlSk44VFDz+jxIEtacRHaomDRh2FtOTz+I=
|
|
||||||
-----END RSA PRIVATE KEY-----
|
|
||||||
`
|
|
||||||
certFile, err := ioutil.TempFile("", "cert")
|
|
||||||
assert.Nil(t, err)
|
|
||||||
defer os.Remove(certFile.Name())
|
|
||||||
certFile.Write([]byte(cert))
|
|
||||||
certFile.Close()
|
|
||||||
keyFile, err := ioutil.TempFile("", "key")
|
|
||||||
assert.Nil(t, err)
|
|
||||||
defer os.Remove(keyFile.Name())
|
|
||||||
keyFile.Write([]byte(key))
|
|
||||||
keyFile.Close()
|
|
||||||
|
|
||||||
libkv.AddStore("mock", libkvmock.New)
|
|
||||||
d := &Discovery{backend: "mock"}
|
|
||||||
err = d.Initialize("127.0.0.3:1234", 0, 0, map[string]string{
|
|
||||||
"kv.cacertfile": certFile.Name(),
|
|
||||||
"kv.certfile": certFile.Name(),
|
|
||||||
"kv.keyfile": keyFile.Name(),
|
|
||||||
})
|
|
||||||
assert.Nil(t, err)
|
|
||||||
s := d.store.(*libkvmock.Mock)
|
|
||||||
assert.Equal(t, s.Options.ClientTLS.CACertFile, certFile.Name())
|
|
||||||
assert.Equal(t, s.Options.ClientTLS.CertFile, certFile.Name())
|
|
||||||
assert.Equal(t, s.Options.ClientTLS.KeyFile, keyFile.Name())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
|
||||||
storeMock, err := libkvmock.New([]string{"127.0.0.1:1234"}, nil)
|
|
||||||
assert.NotNil(t, storeMock)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
d := &Discovery{backend: store.CONSUL}
|
|
||||||
d.Initialize("127.0.0.1:1234/path", 0, 0, nil)
|
|
||||||
d.store = storeMock
|
|
||||||
|
|
||||||
s := d.store.(*libkvmock.Mock)
|
|
||||||
mockCh := make(chan []*store.KVPair)
|
|
||||||
|
|
||||||
// The first watch will fail on those three calls
|
|
||||||
s.On("Exists", "path/"+defaultDiscoveryPath).Return(false, errors.New("test error"))
|
|
||||||
s.On("Put", "path/"+defaultDiscoveryPath, mock.Anything, mock.Anything).Return(errors.New("test error"))
|
|
||||||
s.On("WatchTree", "path/"+defaultDiscoveryPath, mock.Anything).Return(mockCh, errors.New("test error")).Once()
|
|
||||||
|
|
||||||
// The second one will succeed.
|
|
||||||
s.On("WatchTree", "path/"+defaultDiscoveryPath, mock.Anything).Return(mockCh, nil).Once()
|
|
||||||
expected := discovery.Entries{
|
|
||||||
&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
|
|
||||||
&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
|
|
||||||
}
|
|
||||||
kvs := []*store.KVPair{
|
|
||||||
{Key: path.Join("path", defaultDiscoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")},
|
|
||||||
{Key: path.Join("path", defaultDiscoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")},
|
|
||||||
}
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
ch, errCh := d.Watch(stopCh)
|
|
||||||
|
|
||||||
// It should fire an error since the first WatchTree call failed.
|
|
||||||
assert.EqualError(t, <-errCh, "test error")
|
|
||||||
// We have to drain the error channel otherwise Watch will get stuck.
|
|
||||||
go func() {
|
|
||||||
for range errCh {
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Push the entries into the store channel and make sure discovery emits.
|
|
||||||
mockCh <- kvs
|
|
||||||
assert.Equal(t, <-ch, expected)
|
|
||||||
|
|
||||||
// Add a new entry.
|
|
||||||
expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
|
|
||||||
kvs = append(kvs, &store.KVPair{Key: path.Join("path", defaultDiscoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")})
|
|
||||||
mockCh <- kvs
|
|
||||||
assert.Equal(t, <-ch, expected)
|
|
||||||
|
|
||||||
// Make sure that if an error occurs it retries.
|
|
||||||
// This third call to WatchTree will be checked later by AssertExpectations.
|
|
||||||
s.On("WatchTree", "path/"+defaultDiscoveryPath, mock.Anything).Return(mockCh, nil)
|
|
||||||
close(mockCh)
|
|
||||||
// Give it enough time to call WatchTree.
|
|
||||||
time.Sleep(3)
|
|
||||||
|
|
||||||
// Stop and make sure it closes all channels.
|
|
||||||
close(stopCh)
|
|
||||||
assert.Nil(t, <-ch)
|
|
||||||
assert.Nil(t, <-errCh)
|
|
||||||
|
|
||||||
s.AssertExpectations(t)
|
|
||||||
}
|
|
|
@ -1,54 +0,0 @@
|
||||||
package nodes
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/docker/swarm/discovery"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Discovery is exported
|
|
||||||
type Discovery struct {
|
|
||||||
entries discovery.Entries
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
Init()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init is exported
|
|
||||||
func Init() {
|
|
||||||
discovery.Register("nodes", &Discovery{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize is exported
|
|
||||||
func (s *Discovery) Initialize(uris string, _ time.Duration, _ time.Duration, _ map[string]string) error {
|
|
||||||
for _, input := range strings.Split(uris, ",") {
|
|
||||||
for _, ip := range discovery.Generate(input) {
|
|
||||||
entry, err := discovery.NewEntry(ip)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("%s, please check you are using the correct discovery (missing token:// ?)", err.Error())
|
|
||||||
}
|
|
||||||
s.entries = append(s.entries, entry)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Watch is exported
|
|
||||||
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
|
|
||||||
ch := make(chan discovery.Entries)
|
|
||||||
go func() {
|
|
||||||
defer close(ch)
|
|
||||||
ch <- s.entries
|
|
||||||
<-stopCh
|
|
||||||
}()
|
|
||||||
return ch, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register is exported
|
|
||||||
func (s *Discovery) Register(addr string) error {
|
|
||||||
return discovery.ErrNotImplemented
|
|
||||||
}
|
|
|
@ -1,43 +0,0 @@
|
||||||
package nodes
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/docker/swarm/discovery"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestInitialize(t *testing.T) {
|
|
||||||
d := &Discovery{}
|
|
||||||
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0, nil)
|
|
||||||
assert.Equal(t, len(d.entries), 2)
|
|
||||||
assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
|
|
||||||
assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInitializeWithPattern(t *testing.T) {
|
|
||||||
d := &Discovery{}
|
|
||||||
d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, 0, nil)
|
|
||||||
assert.Equal(t, len(d.entries), 5)
|
|
||||||
assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
|
|
||||||
assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111")
|
|
||||||
assert.Equal(t, d.entries[2].String(), "2.2.2.2:2222")
|
|
||||||
assert.Equal(t, d.entries[3].String(), "2.2.2.3:2222")
|
|
||||||
assert.Equal(t, d.entries[4].String(), "2.2.2.4:2222")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
|
||||||
d := &Discovery{}
|
|
||||||
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0, nil)
|
|
||||||
expected := discovery.Entries{
|
|
||||||
&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
|
|
||||||
&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
|
|
||||||
}
|
|
||||||
ch, _ := d.Watch(nil)
|
|
||||||
assert.True(t, expected.Equals(<-ch))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRegister(t *testing.T) {
|
|
||||||
d := &Discovery{}
|
|
||||||
assert.Error(t, d.Register("0.0.0.0"))
|
|
||||||
}
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/swarm/discovery"
|
"github.com/docker/docker/pkg/discovery"
|
||||||
)
|
)
|
||||||
|
|
||||||
const discoveryURL = "https://discovery.hub.docker.com/v1"
|
const discoveryURL = "https://discovery.hub.docker.com/v1"
|
||||||
|
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/swarm/discovery"
|
"github.com/docker/docker/pkg/discovery"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -209,8 +209,9 @@ swarm is connected to the public internet. To create your swarm:
|
||||||
|
|
||||||
You can contribute a new discovery backend to Swarm. For information on how to
|
You can contribute a new discovery backend to Swarm. For information on how to
|
||||||
do this, see <a
|
do this, see <a
|
||||||
href="https://github.com/docker/swarm/blob/master/discovery/README.md">our
|
href="https://github.com/docker/docker/tree/master/pkg/discovery">
|
||||||
discovery README in the Docker Swarm repository</a>.
|
github.com/docker/docker/pkg/discovery</a>.
|
||||||
|
|
||||||
|
|
||||||
## Docker Swarm documentation index
|
## Docker Swarm documentation index
|
||||||
|
|
||||||
|
|
6
main.go
6
main.go
|
@ -1,9 +1,9 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
_ "github.com/docker/swarm/discovery/file"
|
_ "github.com/docker/docker/pkg/discovery/file"
|
||||||
_ "github.com/docker/swarm/discovery/kv"
|
_ "github.com/docker/docker/pkg/discovery/kv"
|
||||||
_ "github.com/docker/swarm/discovery/nodes"
|
_ "github.com/docker/docker/pkg/discovery/nodes"
|
||||||
_ "github.com/docker/swarm/discovery/token"
|
_ "github.com/docker/swarm/discovery/token"
|
||||||
|
|
||||||
"github.com/docker/swarm/cli"
|
"github.com/docker/swarm/cli"
|
||||||
|
|
Loading…
Reference in New Issue