Cleanup events handler.

- Renamed Events() to RegisterEventHandler() in Engine.
- Added RegisterEventHandler() to the Cluster interface.
- Removed EventHandler requirement from swarm's Cluster constructor.
- Make API's events handler private.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-04-15 15:59:55 -07:00
parent 35c6eec263
commit 442217d6dc
8 changed files with 43 additions and 23 deletions

View File

@ -28,7 +28,7 @@ const APIVERSION = "1.16"
type context struct {
cluster cluster.Cluster
eventsHandler *EventsHandler
eventsHandler *eventsHandler
debug bool
tlsConfig *tls.Config
}

View File

@ -10,7 +10,7 @@ import (
)
// EventsHandler broadcasts events to multiple client listeners.
type EventsHandler struct {
type eventsHandler struct {
sync.RWMutex
ws map[string]io.Writer
cs map[string]chan struct{}
@ -18,15 +18,15 @@ type EventsHandler struct {
// NewEventsHandler creates a new EventsHandler for a cluster.
// The new eventsHandler is initialized with no writers or channels.
func NewEventsHandler() *EventsHandler {
return &EventsHandler{
func newEventsHandler() *eventsHandler {
return &eventsHandler{
ws: make(map[string]io.Writer),
cs: make(map[string]chan struct{}),
}
}
// Add adds the writer and a new channel for the remote address.
func (eh *EventsHandler) Add(remoteAddr string, w io.Writer) {
func (eh *eventsHandler) Add(remoteAddr string, w io.Writer) {
eh.Lock()
eh.ws[remoteAddr] = w
eh.cs[remoteAddr] = make(chan struct{})
@ -34,13 +34,13 @@ func (eh *EventsHandler) Add(remoteAddr string, w io.Writer) {
}
// Wait waits on a signal from the remote address.
func (eh *EventsHandler) Wait(remoteAddr string) {
func (eh *eventsHandler) Wait(remoteAddr string) {
<-eh.cs[remoteAddr]
}
// Handle writes information about a cluster event to each remote address in the cluster that has been added to the events handler.
// After a successful write to a remote address, the associated channel is closed and the address is removed from the events handler.
func (eh *EventsHandler) Handle(e *cluster.Event) error {
func (eh *eventsHandler) Handle(e *cluster.Event) error {
eh.RLock()
str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d,%q:{%q:%q,%q:%q,%q:%q,%q:%q}}",
@ -72,7 +72,7 @@ func (eh *EventsHandler) Handle(e *cluster.Event) error {
}
// Size returns the number of remote addresses that the events handler currently contains.
func (eh *EventsHandler) Size() int {
func (eh *eventsHandler) Size() int {
eh.RLock()
defer eh.RUnlock()
return len(eh.ws)

View File

@ -18,7 +18,7 @@ func (fw *FakeWriter) Write(p []byte) (n int, err error) {
}
func TestHandle(t *testing.T) {
eh := NewEventsHandler()
eh := newEventsHandler()
assert.Equal(t, eh.Size(), 0)
fw := &FakeWriter{Tmp: []byte{}}

View File

@ -35,7 +35,11 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error
//
// The expected format for a host string is [protocol://]address. The protocol
// must be either "tcp" or "unix", with "tcp" used by default if not specified.
func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config, eventsHandler *EventsHandler) error {
func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config) error {
// Register the API events handler in the cluster.
eventsHandler := newEventsHandler()
c.RegisterEventHandler(eventsHandler)
context := &context{
cluster: c,
eventsHandler: eventsHandler,

View File

@ -1,6 +1,8 @@
package cluster
import "github.com/samalba/dockerclient"
import (
"github.com/samalba/dockerclient"
)
// Cluster is exported
type Cluster interface {
@ -34,4 +36,7 @@ type Cluster interface {
// Return some info about the cluster, like nb or containers / images
// It is pretty open, so the implementation decides what to return.
Info() [][2]string
// Register an event handler for cluster-wide events.
RegisterEventHandler(h EventHandler) error
}

View File

@ -411,8 +411,8 @@ func (e *Engine) Pull(image string) error {
return nil
}
// Events register an event handler.
func (e *Engine) Events(h EventHandler) error {
// RegisterEventHandler registers an event handler.
func (e *Engine) RegisterEventHandler(h EventHandler) error {
if e.eventHandler != nil {
return errors.New("event handler already set")
}

View File

@ -1,6 +1,7 @@
package swarm
import (
"errors"
"fmt"
"sort"
"sync"
@ -27,15 +28,14 @@ type Cluster struct {
}
// NewCluster is exported
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, eventhandler cluster.EventHandler, options *cluster.Options) cluster.Cluster {
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *cluster.Options) cluster.Cluster {
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
cluster := &Cluster{
eventHandler: eventhandler,
engines: make(map[string]*cluster.Engine),
scheduler: scheduler,
options: options,
store: store,
engines: make(map[string]*cluster.Engine),
scheduler: scheduler,
options: options,
store: store,
}
// get the list of entries from the discovery service
@ -60,12 +60,24 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, eventhandler
// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
if c.eventHandler == nil {
return nil
}
if err := c.eventHandler.Handle(e); err != nil {
log.Error(err)
}
return nil
}
// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
if c.eventHandler != nil {
return errors.New("event handler already set")
}
c.eventHandler = h
return nil
}
// CreateContainer aka schedule a brand new container into the cluster.
func (c *Cluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
c.scheduler.Lock()
@ -135,7 +147,7 @@ func (c *Cluster) newEntries(entries []*discovery.Entry) {
return
}
c.engines[engine.ID] = engine
if err := engine.Events(c); err != nil {
if err := engine.RegisterEventHandler(c); err != nil {
log.Error(err)
c.Unlock()
return

View File

@ -120,7 +120,6 @@ func manage(c *cli.Context) {
sched := scheduler.New(s, fs)
eventsHandler := api.NewEventsHandler()
hb, err := strconv.ParseUint(c.String("heartbeat"), 0, 32)
if hb < 1 || err != nil {
log.Fatal("--heartbeat should be an unsigned integer and greater than 0")
@ -132,12 +131,12 @@ func manage(c *cli.Context) {
Heartbeat: hb,
}
cluster := swarm.NewCluster(sched, store, eventsHandler, options)
cluster := swarm.NewCluster(sched, store, options)
// see https://github.com/codegangsta/cli/issues/160
hosts := c.StringSlice("host")
if c.IsSet("host") || c.IsSet("H") {
hosts = hosts[1:]
}
log.Fatal(api.ListenAndServe(cluster, hosts, c.Bool("cors"), tlsConfig, eventsHandler))
log.Fatal(api.ListenAndServe(cluster, hosts, c.Bool("cors"), tlsConfig))
}