use 1.5*heartbeat as TTL in etcd

Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
Victor Vieux 2014-12-11 22:18:54 +00:00
parent 2282fc89ef
commit f95f943b4a
8 changed files with 31 additions and 29 deletions

View File

@ -10,7 +10,7 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
) )
type InitFunc func(url string) (DiscoveryService, error) type InitFunc func(url string, heartbeat int) (DiscoveryService, error)
type Node struct { type Node struct {
url string url string
@ -29,7 +29,7 @@ func (n Node) String() string {
type DiscoveryService interface { type DiscoveryService interface {
Fetch() ([]*Node, error) Fetch() ([]*Node, error)
Watch(int) <-chan time.Time Watch() <-chan time.Time
Register(string) error Register(string) error
} }
@ -52,7 +52,7 @@ func Register(scheme string, initFunc InitFunc) error {
return nil return nil
} }
func New(rawurl string) (DiscoveryService, error) { func New(rawurl string, heartbeat int) (DiscoveryService, error) {
url, err := url.Parse(rawurl) url, err := url.Parse(rawurl)
if err != nil { if err != nil {
return nil, err return nil, err
@ -60,7 +60,7 @@ func New(rawurl string) (DiscoveryService, error) {
if initFct, exists := discoveries[url.Scheme]; exists { if initFct, exists := discoveries[url.Scheme]; exists {
log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host+url.Path) log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host+url.Path)
return initFct(url.Host + url.Path) return initFct(url.Host+url.Path, heartbeat)
} }
return nil, ErrNotSupported return nil, ErrNotSupported

View File

@ -10,9 +10,8 @@ import (
"github.com/docker/swarm/discovery" "github.com/docker/swarm/discovery"
) )
const DEFAULT_TTL = 30
type EtcdDiscoveryService struct { type EtcdDiscoveryService struct {
ttl uint64
client *etcd.Client client *etcd.Client
path string path string
} }
@ -21,7 +20,7 @@ func init() {
discovery.Register("etcd", Init) discovery.Register("etcd", Init)
} }
func Init(uris string) (discovery.DiscoveryService, error) { func Init(uris string, heartbeat int) (discovery.DiscoveryService, error) {
var ( var (
// split here because uris can contain multiples ips // split here because uris can contain multiples ips
// like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path` // like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path`
@ -35,8 +34,9 @@ func Init(uris string) (discovery.DiscoveryService, error) {
} }
client := etcd.NewClient(machines) client := etcd.NewClient(machines)
client.CreateDir(path, DEFAULT_TTL) // skip error check error because it might already exists ttl := uint64(heartbeat * 3 / 2)
return EtcdDiscoveryService{client: client, path: path}, nil client.CreateDir(path, ttl) // skip error check error because it might already exists
return EtcdDiscoveryService{client: client, path: path, ttl: ttl}, nil
} }
func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
resp, err := s.client.Get(s.path, true, true) resp, err := s.client.Get(s.path, true, true)
@ -52,7 +52,7 @@ func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil return nodes, nil
} }
func (s EtcdDiscoveryService) Watch(heartbeat int) <-chan time.Time { func (s EtcdDiscoveryService) Watch() <-chan time.Time {
watchChan := make(chan *etcd.Response) watchChan := make(chan *etcd.Response)
timeChan := make(chan time.Time) timeChan := make(chan time.Time)
go s.client.Watch(s.path, 0, true, watchChan, nil) go s.client.Watch(s.path, 0, true, watchChan, nil)
@ -67,6 +67,6 @@ func (s EtcdDiscoveryService) Watch(heartbeat int) <-chan time.Time {
} }
func (s EtcdDiscoveryService) Register(addr string) error { func (s EtcdDiscoveryService) Register(addr string) error {
_, err := s.client.Set(path.Join(s.path, addr), addr, DEFAULT_TTL) _, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl)
return err return err
} }

View File

@ -10,15 +10,16 @@ import (
) )
type FileDiscoveryService struct { type FileDiscoveryService struct {
path string heartbeat int
path string
} }
func init() { func init() {
discovery.Register("file", Init) discovery.Register("file", Init)
} }
func Init(file string) (discovery.DiscoveryService, error) { func Init(file string, heartbeat int) (discovery.DiscoveryService, error) {
return FileDiscoveryService{path: file}, nil return FileDiscoveryService{path: file, heartbeat: heartbeat}, nil
} }
func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) { func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
@ -37,8 +38,8 @@ func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil return nodes, nil
} }
func (s FileDiscoveryService) Watch(heartbeat int) <-chan time.Time { func (s FileDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(heartbeat) * time.Second) return time.Tick(time.Duration(s.heartbeat) * time.Second)
} }
func (s FileDiscoveryService) Register(addr string) error { func (s FileDiscoveryService) Register(addr string) error {

View File

@ -14,17 +14,18 @@ import (
const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1" const DISCOVERY_URL = "https://discovery-stage.hub.docker.com/v1"
type TokenDiscoveryService struct { type TokenDiscoveryService struct {
url string heartbeat int
token string url string
token string
} }
func init() { func init() {
discovery.Register("token", Init) discovery.Register("token", Init)
} }
func Init(urltoken string) (discovery.DiscoveryService, error) { func Init(urltoken string, heartbeat int) (discovery.DiscoveryService, error) {
if i := strings.LastIndex(urltoken, "/"); i != -1 { if i := strings.LastIndex(urltoken, "/"); i != -1 {
return TokenDiscoveryService{url: "https://" + urltoken[:i], token: urltoken[i+1:]}, nil return TokenDiscoveryService{url: "https://" + urltoken[:i], token: urltoken[i+1:], heartbeat: heartbeat}, nil
} }
return TokenDiscoveryService{url: DISCOVERY_URL, token: urltoken}, nil return TokenDiscoveryService{url: DISCOVERY_URL, token: urltoken}, nil
@ -62,8 +63,8 @@ func (s TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil return nodes, nil
} }
func (s TokenDiscoveryService) Watch(heartbeat int) <-chan time.Time { func (s TokenDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(heartbeat) * time.Second) return time.Tick(time.Duration(s.heartbeat) * time.Second)
} }
// RegisterNode adds a new node identified by the into the discovery service // RegisterNode adds a new node identified by the into the discovery service

View File

@ -7,13 +7,13 @@ import (
) )
func TestInit(t *testing.T) { func TestInit(t *testing.T) {
discovery, _ := Init("token") discovery, _ := Init("token", 0)
if dtoken, ok := discovery.(TokenDiscoveryService); ok { if dtoken, ok := discovery.(TokenDiscoveryService); ok {
assert.Equal(t, dtoken.token, "token") assert.Equal(t, dtoken.token, "token")
assert.Equal(t, dtoken.url, DISCOVERY_URL) assert.Equal(t, dtoken.url, DISCOVERY_URL)
} }
discovery, _ = Init("custom/path/token") discovery, _ = Init("custom/path/token", 0)
if dtoken, ok := discovery.(TokenDiscoveryService); ok { if dtoken, ok := discovery.(TokenDiscoveryService); ok {
assert.Equal(t, dtoken.token, "token") assert.Equal(t, dtoken.token, "token")
assert.Equal(t, dtoken.url, "https://custom/path") assert.Equal(t, dtoken.url, "https://custom/path")

View File

@ -14,7 +14,7 @@ func join(c *cli.Context) {
log.Fatal("--discovery required to join a cluster") log.Fatal("--discovery required to join a cluster")
} }
d, err := discovery.New(c.String("discovery")) d, err := discovery.New(c.String("discovery"), c.Int("heartbeat"))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -23,7 +23,7 @@ func join(c *cli.Context) {
log.Fatal(err) log.Fatal(err)
} }
hb := time.Duration(c.Int("heartbeat")) hb := c.Duration("heartbeat")
for { for {
time.Sleep(hb * time.Second) time.Sleep(hb * time.Second)
if err := d.Register(c.String("addr")); err != nil { if err := d.Register(c.String("addr")); err != nil {

View File

@ -102,7 +102,7 @@ func main() {
log.Fatal("--discovery required to list a cluster") log.Fatal("--discovery required to list a cluster")
} }
d, err := discovery.New(c.String("discovery")) d, err := discovery.New(c.String("discovery"), 0)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -94,7 +94,7 @@ func manage(c *cli.Context) {
go func() { go func() {
if c.String("discovery") != "" { if c.String("discovery") != "" {
d, err := discovery.New(c.String("discovery")) d, err := discovery.New(c.String("discovery"), c.Int("heartbeat"))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -107,7 +107,7 @@ func manage(c *cli.Context) {
refresh(cluster, nodes) refresh(cluster, nodes)
go func() { go func() {
for _ = range d.Watch(c.Int("heartbeat")) { for _ = range d.Watch() {
nodes, err = d.Fetch() nodes, err = d.Fetch()
if err == nil { if err == nil {
refresh(cluster, nodes) refresh(cluster, nodes)