Fix Consul and etcd with latest changes, use etcd v2.0.11 for integration tests, remove call to SyncCluster for now (breaks the integration tests)

Signed-off-by: Alexandre Beslic <abronan@docker.com>
This commit is contained in:
Alexandre Beslic 2015-05-18 19:39:10 -07:00
parent 4408a9cc8c
commit f81de46ab4
19 changed files with 143 additions and 76 deletions

View File

@ -65,7 +65,7 @@ func Run() {
log.Fatalf("the `create` command takes no arguments. See '%s create --help'.", c.App.Name)
}
discovery := &token.Discovery{}
discovery.Initialize("", 0)
discovery.Initialize("", 0, 0)
token, err := discovery.CreateCluster()
if err != nil {
log.Fatal(err)
@ -88,7 +88,7 @@ func Run() {
log.Fatalf("invalid --timeout: %v", err)
}
d, err := discovery.New(dflag, timeout)
d, err := discovery.New(dflag, timeout, 0)
if err != nil {
log.Fatal(err)
}
@ -122,8 +122,8 @@ func Run() {
{
Name: "join",
ShortName: "j",
Usage: "Join a docker cluster",
Flags: []cli.Flag{flAddr, flHeartBeat},
Usage: "join a docker cluster",
Flags: []cli.Flag{flAddr, flHeartBeat, flTTL},
Action: join,
},
}

View File

@ -53,6 +53,11 @@ var (
Value: "25s",
Usage: "period between each heartbeat",
}
flTTL = cli.StringFlag{
Name: "time-to-live, ttl",
Value: "75s",
Usage: "sets the expiration of an ephemeral node",
}
flTimeout = cli.StringFlag{
Name: "timeout",
Value: "10s",

View File

@ -45,6 +45,7 @@ Options:
{{range .Flags}}{{.}}
{{end}}{{if (eq .Name "manage")}}{{printf "\t * swarm.overcommit=0.05\tovercommit to apply on resources"}}
{{printf "\t * swarm.discovery.heartbeat=25s\tperiod between each heartbeat"}}{{end}}{{ end }}
{{printf "\t * swarm.discovery.ttl=75s\ttime limit for a key to expire if ephemeral"}}{{end}}{{ end }}
`
}

View File

@ -27,7 +27,14 @@ func join(c *cli.Context) {
if hb < 1*time.Second {
log.Fatal("--heartbeat should be at least one second")
}
d, err := discovery.New(dflag, hb)
ttl, err := time.ParseDuration(c.String("ttl"))
if err != nil {
log.Fatalf("invalid --ttl: %v", err)
}
if ttl <= hb {
log.Fatal("--ttl must be strictly superior to the heartbeat value")
}
d, err := discovery.New(dflag, hb, ttl)
if err != nil {
log.Fatal(err)
}

View File

@ -56,6 +56,7 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t
}
heartbeat := 25 * time.Second
ttl := 75 * time.Second
if opt, ok := options.String("swarm.discovery.heartbeat", ""); ok {
h, err := time.ParseDuration(opt)
if err != nil {
@ -67,8 +68,19 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t
heartbeat = h
}
if opt, ok := options.String("swarm.discovery.ttl", ""); ok {
t, err := time.ParseDuration(opt)
if err != nil {
return nil, err
}
if t <= heartbeat {
return nil, fmt.Errorf("invalid ttl %s: must be strictly superior to heartbeat value", opt)
}
ttl = t
}
// Set up discovery.
cluster.discovery, err = discovery.New(dflag, heartbeat)
cluster.discovery, err = discovery.New(dflag, heartbeat, ttl)
if err != nil {
log.Fatal(err)
}

View File

@ -85,7 +85,7 @@ func (e Entries) Diff(cmp Entries) (Entries, Entries) {
// manage swarm host entries.
type Discovery interface {
// Initialize the discovery with URIs and a heartbeat.
Initialize(string, time.Duration) error
Initialize(string, time.Duration, time.Duration) error
// Watch the discovery for entry changes.
// Returns a channel that will receive changes or an error.
@ -133,12 +133,12 @@ func parse(rawurl string) (string, string) {
// New returns a new Discovery given a URL and heartbeat settings.
// Returns an error if the URL scheme is not supported.
func New(rawurl string, heartbeat time.Duration) (Discovery, error) {
func New(rawurl string, heartbeat time.Duration, ttl time.Duration) (Discovery, error) {
scheme, uri := parse(rawurl)
if discovery, exists := discoveries[scheme]; exists {
log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service")
err := discovery.Initialize(uri, heartbeat)
err := discovery.Initialize(uri, heartbeat, ttl)
return discovery, err
}

View File

@ -20,7 +20,7 @@ func init() {
}
// Initialize is exported
func (s *Discovery) Initialize(path string, heartbeat time.Duration) error {
func (s *Discovery) Initialize(path string, heartbeat time.Duration, ttl time.Duration) error {
s.path = path
s.heartbeat = heartbeat
return nil

View File

@ -11,12 +11,12 @@ import (
func TestInitialize(t *testing.T) {
d := &Discovery{}
d.Initialize("/path/to/file", 0)
d.Initialize("/path/to/file", 0, 0)
assert.Equal(t, d.path, "/path/to/file")
}
func TestNew(t *testing.T) {
d, err := discovery.New("file:///path/to/file", 0)
d, err := discovery.New("file:///path/to/file", 0, 0)
assert.NoError(t, err)
assert.Equal(t, d.(*Discovery).path, "/path/to/file")
}
@ -73,7 +73,7 @@ func TestWatch(t *testing.T) {
// Set up file discovery.
d := &Discovery{}
d.Initialize(tmp.Name(), 1)
d.Initialize(tmp.Name(), 1, 0)
stopCh := make(chan struct{})
ch, errCh := d.Watch(stopCh)

View File

@ -16,6 +16,7 @@ type Discovery struct {
backend store.Backend
store store.Store
heartbeat time.Duration
ttl time.Duration
prefix string
}
@ -26,7 +27,7 @@ func init() {
}
// Initialize is exported
func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error {
func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error {
var (
parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",")
@ -43,11 +44,19 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error {
}
s.heartbeat = heartbeat
s.ttl = ttl
s.prefix = parts[1]
// Creates a new store, will ignore options given
// if not supported by the chosen store
s.store, err = store.CreateStore(s.backend, addrs, nil)
s.store, err = store.CreateStore(
s.backend,
addrs,
&store.Config{
Heartbeat: s.heartbeat,
EphemeralTTL: s.ttl,
},
)
return err
}
@ -116,13 +125,5 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c
// Register is exported
func (s *Discovery) Register(addr string) error {
opts := &store.WriteOptions{Ephemeral: true}
err := s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts)
return err
}
func convertToStringArray(entries []*store.KVPair) (addrs []string) {
for _, entry := range entries {
addrs = append(addrs, string(entry.Value))
}
return addrs
return s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts)
}

View File

@ -13,17 +13,17 @@ import (
func TestInitialize(t *testing.T) {
d := &Discovery{backend: store.MOCK}
assert.EqualError(t, d.Initialize("127.0.0.1", 0), "invalid format \"127.0.0.1\", missing <path>")
assert.EqualError(t, d.Initialize("127.0.0.1", 0, 0), "invalid format \"127.0.0.1\", missing <path>")
d = &Discovery{backend: store.MOCK}
assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0))
assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0))
s := d.store.(*store.Mock)
assert.Len(t, s.Endpoints, 1)
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
assert.Equal(t, d.prefix, "path")
d = &Discovery{backend: store.MOCK}
assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0))
assert.NoError(t, d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0))
s = d.store.(*store.Mock)
assert.Len(t, s.Endpoints, 3)
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
@ -34,7 +34,7 @@ func TestInitialize(t *testing.T) {
func TestWatch(t *testing.T) {
d := &Discovery{backend: store.MOCK}
assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0))
assert.NoError(t, d.Initialize("127.0.0.1:1234/path", 0, 0))
s := d.store.(*store.Mock)
mockCh := make(chan []*store.KVPair)

View File

@ -17,7 +17,7 @@ func init() {
}
// Initialize is exported
func (s *Discovery) Initialize(uris string, _ time.Duration) error {
func (s *Discovery) Initialize(uris string, _ time.Duration, _ time.Duration) error {
for _, input := range strings.Split(uris, ",") {
for _, ip := range discovery.Generate(input) {
entry, err := discovery.NewEntry(ip)

View File

@ -9,7 +9,7 @@ import (
func TestInitialize(t *testing.T) {
d := &Discovery{}
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0)
assert.Equal(t, len(d.entries), 2)
assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222")
@ -17,7 +17,7 @@ func TestInitialize(t *testing.T) {
func TestInitializeWithPattern(t *testing.T) {
d := &Discovery{}
d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0)
d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, 0)
assert.Equal(t, len(d.entries), 5)
assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111")
assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111")
@ -28,7 +28,7 @@ func TestInitializeWithPattern(t *testing.T) {
func TestWatch(t *testing.T) {
d := &Discovery{}
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0)
d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0)
expected := discovery.Entries{
&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
&discovery.Entry{Host: "2.2.2.2", Port: "2222"},

View File

@ -18,6 +18,7 @@ const DiscoveryURL = "https://discovery-stage.hub.docker.com/v1"
// Discovery is exported
type Discovery struct {
heartbeat time.Duration
ttl time.Duration
url string
token string
}
@ -27,7 +28,7 @@ func init() {
}
// Initialize is exported
func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration) error {
func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration, ttl time.Duration) error {
if i := strings.LastIndex(urltoken, "/"); i != -1 {
s.url = "https://" + urltoken[:i]
s.token = urltoken[i+1:]
@ -40,6 +41,7 @@ func (s *Discovery) Initialize(urltoken string, heartbeat time.Duration) error {
return errors.New("token is empty")
}
s.heartbeat = heartbeat
s.ttl = ttl
return nil
}

View File

@ -10,17 +10,17 @@ import (
func TestInitialize(t *testing.T) {
discovery := &Discovery{}
err := discovery.Initialize("token", 0)
err := discovery.Initialize("token", 0, 0)
assert.NoError(t, err)
assert.Equal(t, discovery.token, "token")
assert.Equal(t, discovery.url, DiscoveryURL)
err = discovery.Initialize("custom/path/token", 0)
err = discovery.Initialize("custom/path/token", 0, 0)
assert.NoError(t, err)
assert.Equal(t, discovery.token, "token")
assert.Equal(t, discovery.url, "https://custom/path")
err = discovery.Initialize("", 0)
err = discovery.Initialize("", 0, 0)
assert.Error(t, err)
}

View File

@ -21,7 +21,9 @@ const (
type Consul struct {
config *api.Config
client *api.Client
ephemeralTTL time.Duration
ephemeralSession string
sessions map[string]string
}
type consulLock struct {
@ -32,6 +34,7 @@ type consulLock struct {
// a list of endpoints and optional tls config
func InitializeConsul(endpoints []string, options *Config) (Store, error) {
s := &Consul{}
s.sessions = make(map[string]string)
// Create Consul client
config := api.DefaultConfig()
@ -48,6 +51,9 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) {
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
if options.EphemeralTTL != 0 {
s.setEphemeralTTL(options.EphemeralTTL)
}
}
// Creates a new client
@ -58,17 +64,6 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) {
}
s.client = client
// Create global ephemeral keys session
entry := &api.SessionEntry{
Behavior: api.SessionBehaviorDelete,
TTL: time.Duration(EphemeralTTL * time.Second).String(),
}
session, _, err := s.client.Session().Create(entry, nil)
if err != nil {
return nil, err
}
s.ephemeralSession = session
return s, nil
}
@ -85,6 +80,11 @@ func (s *Consul) setTimeout(time time.Duration) {
s.config.WaitTime = time
}
// SetEphemeralTTL sets the ttl for ephemeral nodes
func (s *Consul) setEphemeralTTL(time time.Duration) {
s.ephemeralTTL = time
}
// Normalize the key for usage in Consul
func (s *Consul) normalize(key string) string {
key = normalize(key)
@ -106,34 +106,61 @@ func (s *Consul) Get(key string) (*KVPair, error) {
// Put a value at "key"
func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error {
key = s.normalize(key)
p := &api.KVPair{
Key: s.normalize(key),
Key: key,
Value: value,
}
if opts != nil && opts.Ephemeral {
p.Session = s.ephemeralSession
if _, ok := s.sessions[key]; !ok {
entry := &api.SessionEntry{
Behavior: api.SessionBehaviorDelete,
TTL: time.Duration(60 * time.Second).String(),
}
// Create lock option with the
// EphemeralSession
lockOpts := &api.LockOptions{
Key: key,
Session: s.ephemeralSession,
}
// Create global ephemeral keys session
session, _, err := s.client.Session().Create(entry, nil)
if err != nil {
return err
}
// Lock and ignore if lock is held
// It's just a placeholder for the
// ephemeral behavior
lock, _ := s.client.LockOpts(lockOpts)
if lock != nil {
lock.Lock(nil)
}
// Create lock option with the
// EphemeralSession
lockOpts := &api.LockOptions{
Key: key,
Session: session,
}
// Try to renew the session
_, _, err := s.client.Session().Renew(p.Session, nil)
if err != nil {
return err
// Lock and ignore if lock is held
// It's just a placeholder for the
// ephemeral behavior
lock, _ := s.client.LockOpts(lockOpts)
if lock != nil {
lock.Lock(nil)
}
// Register in sessions map
s.sessions[key] = session
// Renew the session periodically
go func() {
ticker := time.NewTicker(20 * time.Second)
for {
select {
case <-ticker.C:
_, _, err := s.client.Session().Renew(p.Session, nil)
if err != nil {
delete(s.sessions, key)
return
}
}
}
}()
}
p.Session = s.sessions[key]
}
_, err := s.client.KV().Put(p, nil)

View File

@ -12,7 +12,8 @@ import (
// Etcd embeds the client
type Etcd struct {
client *etcd.Client
client *etcd.Client
ephemeralTTL time.Duration
}
// InitializeEtcd creates a new Etcd client given
@ -31,10 +32,11 @@ func InitializeEtcd(addrs []string, options *Config) (Store, error) {
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
if options.EphemeralTTL != 0 {
s.setEphemeralTTL(options.EphemeralTTL)
}
}
// FIXME sync on each operation?
s.client.SyncCluster()
return s, nil
}
@ -66,6 +68,11 @@ func (s *Etcd) setTimeout(time time.Duration) {
s.client.SetDialTimeout(time)
}
// SetHeartbeat sets the heartbeat value to notify we are alive
func (s *Etcd) setEphemeralTTL(time time.Duration) {
s.ephemeralTTL = time
}
// Create the entire path for a directory that does not exist
func (s *Etcd) createDirectory(path string) error {
if _, err := s.client.CreateDir(normalize(path), 10); err != nil {
@ -100,24 +107,23 @@ func (s *Etcd) Get(key string) (*KVPair, error) {
func (s *Etcd) Put(key string, value []byte, opts *WriteOptions) error {
// Default TTL = 0 means no expiration
ttl := DefaultTTL
var ttl uint64
if opts != nil && opts.Ephemeral {
ttl = EphemeralTTL
ttl = uint64(s.ephemeralTTL.Seconds())
}
if _, err := s.client.Set(key, string(value), uint64(ttl)); err != nil {
if _, err := s.client.Set(key, string(value), ttl); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode == 104 { // Not a directory
// Remove the last element (the actual key) and set the prefix as a dir
err = s.createDirectory(getDirectory(key))
if _, err := s.client.Set(key, string(value), uint64(ttl)); err != nil {
if _, err := s.client.Set(key, string(value), ttl); err != nil {
return err
}
}
}
return err
}
return nil
}

View File

@ -43,6 +43,8 @@ var (
type Config struct {
TLS *tls.Config
ConnectionTimeout time.Duration
Heartbeat time.Duration
EphemeralTTL time.Duration
}
// Store represents the backend K/V storage

View File

@ -22,7 +22,7 @@ type zookeeperLock struct {
// given a list of endpoints and optional tls config
func InitializeZookeeper(endpoints []string, options *Config) (Store, error) {
s := &Zookeeper{}
s.timeout = 5 * time.Second // default timeout
s.timeout = 10 * time.Second // default timeout
// Set options
if options != nil {

View File

@ -12,7 +12,11 @@ DISCOVERY="etcd://${STORE_HOST}/test"
CONTAINER_NAME=swarm_etcd
function start_store() {
docker_host run -p $STORE_HOST:4001 --name=$CONTAINER_NAME -d coreos/etcd
docker_host run -p $STORE_HOST:4001 \
--name=$CONTAINER_NAME -d \
quay.io/coreos/etcd:v2.0.11 \
--listen-client-urls=http://0.0.0.0:2379,http://0.0.0.0:4001 \
--advertise-client-urls=http://$STORE_HOST
}
function stop_store() {