From 442217d6dcaa91c91bf4fcea015222b29299b764 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Wed, 15 Apr 2015 15:59:55 -0700 Subject: [PATCH] Cleanup events handler. - Renamed Events() to RegisterEventHandler() in Engine. - Added RegisterEventHandler() to the Cluster interface. - Removed EventHandler requirement from swarm's Cluster constructor. - Make API's events handler private. Signed-off-by: Andrea Luzzardi --- api/api.go | 2 +- api/events.go | 14 +++++++------- api/events_test.go | 2 +- api/server.go | 6 +++++- cluster/cluster.go | 7 ++++++- cluster/engine.go | 4 ++-- cluster/swarm/cluster.go | 26 +++++++++++++++++++------- manage.go | 5 ++--- 8 files changed, 43 insertions(+), 23 deletions(-) diff --git a/api/api.go b/api/api.go index 748c603134..d20f31ad73 100644 --- a/api/api.go +++ b/api/api.go @@ -28,7 +28,7 @@ const APIVERSION = "1.16" type context struct { cluster cluster.Cluster - eventsHandler *EventsHandler + eventsHandler *eventsHandler debug bool tlsConfig *tls.Config } diff --git a/api/events.go b/api/events.go index 8b3558788e..7ea0c99ac0 100644 --- a/api/events.go +++ b/api/events.go @@ -10,7 +10,7 @@ import ( ) // EventsHandler broadcasts events to multiple client listeners. -type EventsHandler struct { +type eventsHandler struct { sync.RWMutex ws map[string]io.Writer cs map[string]chan struct{} @@ -18,15 +18,15 @@ type EventsHandler struct { // NewEventsHandler creates a new EventsHandler for a cluster. // The new eventsHandler is initialized with no writers or channels. -func NewEventsHandler() *EventsHandler { - return &EventsHandler{ +func newEventsHandler() *eventsHandler { + return &eventsHandler{ ws: make(map[string]io.Writer), cs: make(map[string]chan struct{}), } } // Add adds the writer and a new channel for the remote address. -func (eh *EventsHandler) Add(remoteAddr string, w io.Writer) { +func (eh *eventsHandler) Add(remoteAddr string, w io.Writer) { eh.Lock() eh.ws[remoteAddr] = w eh.cs[remoteAddr] = make(chan struct{}) @@ -34,13 +34,13 @@ func (eh *EventsHandler) Add(remoteAddr string, w io.Writer) { } // Wait waits on a signal from the remote address. -func (eh *EventsHandler) Wait(remoteAddr string) { +func (eh *eventsHandler) Wait(remoteAddr string) { <-eh.cs[remoteAddr] } // Handle writes information about a cluster event to each remote address in the cluster that has been added to the events handler. // After a successful write to a remote address, the associated channel is closed and the address is removed from the events handler. -func (eh *EventsHandler) Handle(e *cluster.Event) error { +func (eh *eventsHandler) Handle(e *cluster.Event) error { eh.RLock() str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d,%q:{%q:%q,%q:%q,%q:%q,%q:%q}}", @@ -72,7 +72,7 @@ func (eh *EventsHandler) Handle(e *cluster.Event) error { } // Size returns the number of remote addresses that the events handler currently contains. -func (eh *EventsHandler) Size() int { +func (eh *eventsHandler) Size() int { eh.RLock() defer eh.RUnlock() return len(eh.ws) diff --git a/api/events_test.go b/api/events_test.go index e8ecd7281b..51dcf6effb 100644 --- a/api/events_test.go +++ b/api/events_test.go @@ -18,7 +18,7 @@ func (fw *FakeWriter) Write(p []byte) (n int, err error) { } func TestHandle(t *testing.T) { - eh := NewEventsHandler() + eh := newEventsHandler() assert.Equal(t, eh.Size(), 0) fw := &FakeWriter{Tmp: []byte{}} diff --git a/api/server.go b/api/server.go index 8400500373..303f60ada5 100644 --- a/api/server.go +++ b/api/server.go @@ -35,7 +35,11 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error // // The expected format for a host string is [protocol://]address. The protocol // must be either "tcp" or "unix", with "tcp" used by default if not specified. -func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config, eventsHandler *EventsHandler) error { +func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config) error { + // Register the API events handler in the cluster. + eventsHandler := newEventsHandler() + c.RegisterEventHandler(eventsHandler) + context := &context{ cluster: c, eventsHandler: eventsHandler, diff --git a/cluster/cluster.go b/cluster/cluster.go index 2a50edfa3a..b258164266 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1,6 +1,8 @@ package cluster -import "github.com/samalba/dockerclient" +import ( + "github.com/samalba/dockerclient" +) // Cluster is exported type Cluster interface { @@ -34,4 +36,7 @@ type Cluster interface { // Return some info about the cluster, like nb or containers / images // It is pretty open, so the implementation decides what to return. Info() [][2]string + + // Register an event handler for cluster-wide events. + RegisterEventHandler(h EventHandler) error } diff --git a/cluster/engine.go b/cluster/engine.go index ef5e76de61..856e0ce758 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -411,8 +411,8 @@ func (e *Engine) Pull(image string) error { return nil } -// Events register an event handler. -func (e *Engine) Events(h EventHandler) error { +// RegisterEventHandler registers an event handler. +func (e *Engine) RegisterEventHandler(h EventHandler) error { if e.eventHandler != nil { return errors.New("event handler already set") } diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 828fd31ec3..7c7f4549ed 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -1,6 +1,7 @@ package swarm import ( + "errors" "fmt" "sort" "sync" @@ -27,15 +28,14 @@ type Cluster struct { } // NewCluster is exported -func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, eventhandler cluster.EventHandler, options *cluster.Options) cluster.Cluster { +func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *cluster.Options) cluster.Cluster { log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster") cluster := &Cluster{ - eventHandler: eventhandler, - engines: make(map[string]*cluster.Engine), - scheduler: scheduler, - options: options, - store: store, + engines: make(map[string]*cluster.Engine), + scheduler: scheduler, + options: options, + store: store, } // get the list of entries from the discovery service @@ -60,12 +60,24 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, eventhandler // Handle callbacks for the events func (c *Cluster) Handle(e *cluster.Event) error { + if c.eventHandler == nil { + return nil + } if err := c.eventHandler.Handle(e); err != nil { log.Error(err) } return nil } +// RegisterEventHandler registers an event handler. +func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error { + if c.eventHandler != nil { + return errors.New("event handler already set") + } + c.eventHandler = h + return nil +} + // CreateContainer aka schedule a brand new container into the cluster. func (c *Cluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) { c.scheduler.Lock() @@ -135,7 +147,7 @@ func (c *Cluster) newEntries(entries []*discovery.Entry) { return } c.engines[engine.ID] = engine - if err := engine.Events(c); err != nil { + if err := engine.RegisterEventHandler(c); err != nil { log.Error(err) c.Unlock() return diff --git a/manage.go b/manage.go index d37ac94666..8a8dda12cf 100644 --- a/manage.go +++ b/manage.go @@ -120,7 +120,6 @@ func manage(c *cli.Context) { sched := scheduler.New(s, fs) - eventsHandler := api.NewEventsHandler() hb, err := strconv.ParseUint(c.String("heartbeat"), 0, 32) if hb < 1 || err != nil { log.Fatal("--heartbeat should be an unsigned integer and greater than 0") @@ -132,12 +131,12 @@ func manage(c *cli.Context) { Heartbeat: hb, } - cluster := swarm.NewCluster(sched, store, eventsHandler, options) + cluster := swarm.NewCluster(sched, store, options) // see https://github.com/codegangsta/cli/issues/160 hosts := c.StringSlice("host") if c.IsSet("host") || c.IsSet("H") { hosts = hosts[1:] } - log.Fatal(api.ListenAndServe(cluster, hosts, c.Bool("cors"), tlsConfig, eventsHandler)) + log.Fatal(api.ListenAndServe(cluster, hosts, c.Bool("cors"), tlsConfig)) }