Merge pull request #851 from aluzzardi/discovery-prefix

discovery: KV Path is now an optional prefix.
This commit is contained in:
Victor Vieux 2015-05-26 14:57:56 -07:00
commit 62e46100cb
3 changed files with 39 additions and 33 deletions

View File

@ -11,13 +11,17 @@ import (
"github.com/docker/swarm/pkg/store" "github.com/docker/swarm/pkg/store"
) )
const (
discoveryPath = "docker/swarm/nodes"
)
// Discovery is exported // Discovery is exported
type Discovery struct { type Discovery struct {
backend store.Backend backend store.Backend
store store.Store store store.Store
heartbeat time.Duration heartbeat time.Duration
ttl time.Duration ttl time.Duration
prefix string path string
} }
func init() { func init() {
@ -35,22 +39,19 @@ func Init() {
func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error { func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error {
var ( var (
parts = strings.SplitN(uris, "/", 2) parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",") addrs = strings.Split(parts[0], ",")
addrs []string prefix = ""
err error err error
) )
if len(parts) != 2 { // A custom prefix to the path can be optionally used.
return fmt.Errorf("invalid format %q, missing <path>", uris) if len(parts) == 2 {
} prefix = parts[1]
for _, ip := range ips {
addrs = append(addrs, ip)
} }
s.heartbeat = heartbeat s.heartbeat = heartbeat
s.ttl = ttl s.ttl = ttl
s.prefix = parts[1] s.path = path.Join(prefix, discoveryPath)
// Creates a new store, will ignore options given // Creates a new store, will ignore options given
// if not supported by the chosen store // if not supported by the chosen store
@ -108,7 +109,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c
// Will only stop if we receive a stopCh request. // Will only stop if we receive a stopCh request.
for { for {
// Set up a watch. // Set up a watch.
watchCh, err := s.store.WatchTree(s.prefix, stopCh) watchCh, err := s.store.WatchTree(s.path, stopCh)
if err != nil { if err != nil {
errCh <- err errCh <- err
} else { } else {
@ -129,7 +130,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c
// Register is exported // Register is exported
func (s *Discovery) Register(addr string) error { func (s *Discovery) Register(addr string) error {
opts := &store.WriteOptions{Ephemeral: true, Heartbeat: s.heartbeat} opts := &store.WriteOptions{Ephemeral: true, Heartbeat: s.heartbeat}
return s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts) return s.store.Put(path.Join(s.path, addr), []byte(addr), opts)
} }
// Store returns the underlying store used by KV discovery. // Store returns the underlying store used by KV discovery.

View File

@ -2,6 +2,7 @@ package kv
import ( import (
"errors" "errors"
"path"
"testing" "testing"
"time" "time"
@ -13,14 +14,18 @@ import (
func TestInitialize(t *testing.T) { func TestInitialize(t *testing.T) {
d := &Discovery{backend: store.MOCK} d := &Discovery{backend: store.MOCK}
assert.EqualError(t, d.Initialize("127.0.0.1", 0, 0), "invalid format \"127.0.0.1\", missing <path>") assert.NoError(t, d.Initialize("127.0.0.1", 0, 0))
s := d.store.(*store.Mock)
assert.Len(t, s.Endpoints, 1)
assert.Equal(t, s.Endpoints[0], "127.0.0.1")
assert.Equal(t, d.path, discoveryPath)
d = &Discovery{backend: store.MOCK} d = &Discovery{backend: store.MOCK}
assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0)) assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0))
s := d.store.(*store.Mock) s = d.store.(*store.Mock)
assert.Len(t, s.Endpoints, 1) assert.Len(t, s.Endpoints, 1)
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
assert.Equal(t, d.prefix, "path") assert.Equal(t, d.path, "path/"+discoveryPath)
d = &Discovery{backend: store.MOCK} d = &Discovery{backend: store.MOCK}
assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0)) assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0))
@ -29,7 +34,7 @@ func TestInitialize(t *testing.T) {
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234") assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234")
assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234") assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234")
assert.Equal(t, d.prefix, "path") assert.Equal(t, d.path, "path/"+discoveryPath)
} }
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
@ -40,16 +45,16 @@ func TestWatch(t *testing.T) {
mockCh := make(chan []*store.KVPair) mockCh := make(chan []*store.KVPair)
// The first watch will fail. // The first watch will fail.
s.On("WatchTree", "path", mock.Anything).Return(mockCh, errors.New("test error")).Once() s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, errors.New("test error")).Once()
// The second one will succeed. // The second one will succeed.
s.On("WatchTree", "path", mock.Anything).Return(mockCh, nil).Once() s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil).Once()
expected := discovery.Entries{ expected := discovery.Entries{
&discovery.Entry{Host: "1.1.1.1", Port: "1111"}, &discovery.Entry{Host: "1.1.1.1", Port: "1111"},
&discovery.Entry{Host: "2.2.2.2", Port: "2222"}, &discovery.Entry{Host: "2.2.2.2", Port: "2222"},
} }
kvs := []*store.KVPair{ kvs := []*store.KVPair{
{Key: "path/1.1.1.1", Value: []byte("1.1.1.1:1111")}, {Key: path.Join("path", discoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")},
{Key: "path/1.1.1.1", Value: []byte("2.2.2.2:2222")}, {Key: path.Join("path", discoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")},
} }
stopCh := make(chan struct{}) stopCh := make(chan struct{})
@ -69,13 +74,13 @@ func TestWatch(t *testing.T) {
// Add a new entry. // Add a new entry.
expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
kvs = append(kvs, &store.KVPair{Key: "path/3.3.3.3", Value: []byte("3.3.3.3:3333")}) kvs = append(kvs, &store.KVPair{Key: path.Join("path", discoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")})
mockCh <- kvs mockCh <- kvs
assert.Equal(t, <-ch, expected) assert.Equal(t, <-ch, expected)
// Make sure that if an error occurs it retries. // Make sure that if an error occurs it retries.
// This third call to WatchTree will be checked later by AssertExpectations. // This third call to WatchTree will be checked later by AssertExpectations.
s.On("WatchTree", "path", mock.Anything).Return(mockCh, nil) s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil)
close(mockCh) close(mockCh)
// Give it enough time to call WatchTree. // Give it enough time to call WatchTree.
time.Sleep(3) time.Sleep(3)

View File

@ -95,13 +95,13 @@ On each of your nodes, start the Swarm agent. The node IP address
doesn't have to be public as long as the swarm manager can access it. doesn't have to be public as long as the swarm manager can access it.
```bash ```bash
swarm join --addr=<node_ip:2375> etcd://<etcd_ip>/<path> swarm join --addr=<node_ip:2375> etcd://<etcd_addr1>,<etcd_addr2>/<optional path prefix>
``` ```
Start the manager on any machine or your laptop. Start the manager on any machine or your laptop.
```bash ```bash
swarm manage -H tcp://<swarm_ip:swarm_port> etcd://<etcd_ip>/<path> swarm manage -H tcp://<swarm_ip:swarm_port> etcd://<etcd_addr1>,<etcd_addr2>/<optional path prefix>
``` ```
And then use the regular Docker commands. And then use the regular Docker commands.
@ -117,7 +117,7 @@ docker -H tcp://<swarm_ip:swarm_port> logs ...
You can list the nodes in your cluster. You can list the nodes in your cluster.
```bash ```bash
swarm list etcd://<etcd_ip>/<path> swarm list etcd://<etcd_addr1>,<etcd_addr2>/<optional path prefix>
<node_ip:2375> <node_ip:2375>
``` ```
@ -127,13 +127,13 @@ On each of your nodes, start the Swarm agent. The node IP address
doesn't need to be public as long as the Swarm manager can access it. doesn't need to be public as long as the Swarm manager can access it.
```bash ```bash
swarm join --addr=<node_ip:2375> consul://<consul_addr>/<path> swarm join --addr=<node_ip:2375> consul://<consul_addr>/<optional path prefix>
``` ```
Start the manager on any machine or your laptop. Start the manager on any machine or your laptop.
```bash ```bash
swarm manage -H tcp://<swarm_ip:swarm_port> consul://<consul_addr>/<path> swarm manage -H tcp://<swarm_ip:swarm_port> consul://<consul_addr>/<optional path prefix>
``` ```
And then use the regular Docker commands. And then use the regular Docker commands.
@ -149,7 +149,7 @@ docker -H tcp://<swarm_ip:swarm_port> logs ...
You can list the nodes in your cluster. You can list the nodes in your cluster.
```bash ```bash
swarm list consul://<consul_addr>/<path> swarm list consul://<consul_addr>/<optional path prefix>
<node_ip:2375> <node_ip:2375>
``` ```
@ -159,13 +159,13 @@ On each of your nodes, start the Swarm agent. The node IP doesn't have
to be public as long as the swarm manager can access it. to be public as long as the swarm manager can access it.
```bash ```bash
swarm join --addr=<node_ip:2375> zk://<zookeeper_addr1>,<zookeeper_addr2>/<path> swarm join --addr=<node_ip:2375> zk://<zookeeper_addr1>,<zookeeper_addr2>/<optional path prefix>
``` ```
Start the manager on any machine or your laptop. Start the manager on any machine or your laptop.
```bash ```bash
swarm manage -H tcp://<swarm_ip:swarm_port> zk://<zookeeper_addr1>,<zookeeper_addr2>/<path> swarm manage -H tcp://<swarm_ip:swarm_port> zk://<zookeeper_addr1>,<zookeeper_addr2>/<optional path prefix>
``` ```
You can then use the regular Docker commands. You can then use the regular Docker commands.
@ -181,7 +181,7 @@ docker -H tcp://<swarm_ip:swarm_port> logs ...
You can list the nodes in the cluster. You can list the nodes in the cluster.
```bash ```bash
swarm list zk://<zookeeper_addr1>,<zookeeper_addr2>/<path> swarm list zk://<zookeeper_addr1>,<zookeeper_addr2>/<optional path prefix>
<node_ip:2375> <node_ip:2375>
``` ```