Initialize in the interface

Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
Victor Vieux 2014-12-11 23:23:10 +00:00
parent f95f943b4a
commit 1f9eac7fd1
7 changed files with 69 additions and 50 deletions

View File

@ -10,8 +10,6 @@ import (
log "github.com/Sirupsen/logrus"
)
type InitFunc func(url string, heartbeat int) (DiscoveryService, error)
type Node struct {
url string
}
@ -28,26 +26,27 @@ func (n Node) String() string {
}
type DiscoveryService interface {
Initialize(string, int) error
Fetch() ([]*Node, error)
Watch() <-chan time.Time
Register(string) error
}
var (
discoveries map[string]InitFunc
discoveries map[string]func() DiscoveryService
ErrNotSupported = errors.New("discovery service not supported")
)
func init() {
discoveries = make(map[string]InitFunc)
discoveries = make(map[string]func() DiscoveryService)
}
func Register(scheme string, initFunc InitFunc) error {
func Register(scheme string, f func() DiscoveryService) error {
if _, exists := discoveries[scheme]; exists {
return fmt.Errorf("scheme already registered %s", scheme)
}
log.Debugf("Registering %q discovery service", scheme)
discoveries[scheme] = initFunc
discoveries[scheme] = f
return nil
}
@ -58,9 +57,11 @@ func New(rawurl string, heartbeat int) (DiscoveryService, error) {
return nil, err
}
if initFct, exists := discoveries[url.Scheme]; exists {
if f, exists := discoveries[url.Scheme]; exists {
log.Debugf("Initialising %q discovery service with %q", url.Scheme, url.Host+url.Path)
return initFct(url.Host+url.Path, heartbeat)
discovery := f()
err := discovery.Initialize(url.Host+url.Path, heartbeat)
return discovery, err
}
return nil, ErrNotSupported

View File

@ -17,28 +17,40 @@ type EtcdDiscoveryService struct {
}
func init() {
discovery.Register("etcd", Init)
discovery.Register("etcd",
func() discovery.DiscoveryService {
return &EtcdDiscoveryService{}
},
)
}
func Init(uris string, heartbeat int) (discovery.DiscoveryService, error) {
func (s *EtcdDiscoveryService) Initialize(uris string, heartbeat int) error {
var (
// split here because uris can contain multiples ips
// like `etcd://192.168.0.1,192.168.0.2,192.168.0.3/path`
parts = strings.SplitN(uris, "/", 2)
ips = strings.Split(parts[0], ",")
machines []string
path = "/" + parts[1] + "/"
)
for _, ip := range ips {
machines = append(machines, "http://"+ip)
}
client := etcd.NewClient(machines)
ttl := uint64(heartbeat * 3 / 2)
client.CreateDir(path, ttl) // skip error check error because it might already exists
return EtcdDiscoveryService{client: client, path: path, ttl: ttl}, nil
s.client = etcd.NewClient(machines)
s.ttl = uint64(heartbeat * 3 / 2)
s.path = "/" + parts[1] + "/"
if _, err := s.client.CreateDir(s.path, s.ttl); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
if etcdError.ErrorCode != 105 { // skip key already exists
return err
}
} else {
return err
}
}
return nil
}
func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
func (s *EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
resp, err := s.client.Get(s.path, true, true)
if err != nil {
return nil, err
@ -52,7 +64,7 @@ func (s EtcdDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil
}
func (s EtcdDiscoveryService) Watch() <-chan time.Time {
func (s *EtcdDiscoveryService) Watch() <-chan time.Time {
watchChan := make(chan *etcd.Response)
timeChan := make(chan time.Time)
go s.client.Watch(s.path, 0, true, watchChan, nil)
@ -66,7 +78,7 @@ func (s EtcdDiscoveryService) Watch() <-chan time.Time {
return timeChan
}
func (s EtcdDiscoveryService) Register(addr string) error {
func (s *EtcdDiscoveryService) Register(addr string) error {
_, err := s.client.Set(path.Join(s.path, addr), addr, s.ttl)
return err
}

View File

@ -15,14 +15,20 @@ type FileDiscoveryService struct {
}
func init() {
discovery.Register("file", Init)
discovery.Register("file",
func() discovery.DiscoveryService {
return &FileDiscoveryService{}
},
)
}
func Init(file string, heartbeat int) (discovery.DiscoveryService, error) {
return FileDiscoveryService{path: file, heartbeat: heartbeat}, nil
func (s *FileDiscoveryService) Initialize(path string, heartbeat int) error {
s.path = path
s.heartbeat = heartbeat
return nil
}
func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
func (s *FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
data, err := ioutil.ReadFile(s.path)
if err != nil {
return nil, err
@ -38,10 +44,10 @@ func (s FileDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil
}
func (s FileDiscoveryService) Watch() <-chan time.Time {
func (s *FileDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(s.heartbeat) * time.Second)
}
func (s FileDiscoveryService) Register(addr string) error {
func (s *FileDiscoveryService) Register(addr string) error {
return errors.New("unimplemented")
}

View File

@ -20,25 +20,28 @@ type TokenDiscoveryService struct {
}
func init() {
discovery.Register("token", Init)
discovery.Register("token",
func() discovery.DiscoveryService {
return &TokenDiscoveryService{}
},
)
}
func Init(urltoken string, heartbeat int) (discovery.DiscoveryService, error) {
func (s *TokenDiscoveryService) Initialize(urltoken string, heartbeat int) error {
if i := strings.LastIndex(urltoken, "/"); i != -1 {
return TokenDiscoveryService{url: "https://" + urltoken[:i], token: urltoken[i+1:], heartbeat: heartbeat}, nil
s.url = "https://" + urltoken[:i]
s.token = urltoken[i+1:]
} else {
s.url = DISCOVERY_URL
s.token = urltoken
}
s.heartbeat = heartbeat
return TokenDiscoveryService{url: DISCOVERY_URL, token: urltoken}, nil
}
func New(url string) *TokenDiscoveryService {
if url != "" {
return &TokenDiscoveryService{url: url}
}
return &TokenDiscoveryService{url: DISCOVERY_URL}
return nil
}
// FetchNodes returns the node for the discovery service at the specified endpoint
func (s TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
func (s *TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
resp, err := http.Get(fmt.Sprintf("%s/%s/%s", s.url, "clusters", s.token))
if err != nil {
return nil, err
@ -63,12 +66,12 @@ func (s TokenDiscoveryService) Fetch() ([]*discovery.Node, error) {
return nodes, nil
}
func (s TokenDiscoveryService) Watch() <-chan time.Time {
func (s *TokenDiscoveryService) Watch() <-chan time.Time {
return time.Tick(time.Duration(s.heartbeat) * time.Second)
}
// RegisterNode adds a new node identified by the into the discovery service
func (s TokenDiscoveryService) Register(addr string) error {
func (s *TokenDiscoveryService) Register(addr string) error {
buf := strings.NewReader(addr)
_, err := http.Post(fmt.Sprintf("%s/%s/%s", s.url,
@ -77,7 +80,7 @@ func (s TokenDiscoveryService) Register(addr string) error {
}
// CreateCluster returns a unique cluster token
func (s TokenDiscoveryService) CreateCluster() (string, error) {
func (s *TokenDiscoveryService) CreateCluster() (string, error) {
resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil)
if err != nil {
return "", err

View File

@ -7,17 +7,14 @@ import (
)
func TestInit(t *testing.T) {
discovery, _ := Init("token", 0)
if dtoken, ok := discovery.(TokenDiscoveryService); ok {
assert.Equal(t, dtoken.token, "token")
assert.Equal(t, dtoken.url, DISCOVERY_URL)
}
discovery := &TokenDiscoveryService{}
discovery.Initialize("token", 0)
assert.Equal(t, discovery.token, "token")
assert.Equal(t, discovery.url, DISCOVERY_URL)
discovery, _ = Init("custom/path/token", 0)
if dtoken, ok := discovery.(TokenDiscoveryService); ok {
assert.Equal(t, dtoken.token, "token")
assert.Equal(t, dtoken.url, "https://custom/path")
}
discovery.Initialize("custom/path/token", 0)
assert.Equal(t, discovery.token, "token")
assert.Equal(t, discovery.url, "https://custom/path")
}
func TestRegister(t *testing.T) {

View File

@ -23,7 +23,7 @@ func join(c *cli.Context) {
log.Fatal(err)
}
hb := c.Duration("heartbeat")
hb := time.Duration(c.Int("heartbeat"))
for {
time.Sleep(hb * time.Second)
if err := d.Register(c.String("addr")); err != nil {

View File

@ -85,7 +85,7 @@ func main() {
ShortName: "c",
Usage: "create a cluster",
Action: func(c *cli.Context) {
token, err := token.New("").CreateCluster()
token, err := (&token.TokenDiscoveryService{}).CreateCluster()
if err != nil {
log.Fatal(err)
}