diff --git a/cluster/event.go b/cluster/event.go index e55ac47dda..034afe356b 100644 --- a/cluster/event.go +++ b/cluster/event.go @@ -1,6 +1,12 @@ package cluster -import "github.com/samalba/dockerclient" +import ( + "errors" + "sync" + + log "github.com/Sirupsen/logrus" + "github.com/samalba/dockerclient" +) // Event is exported type Event struct { @@ -12,3 +18,49 @@ type Event struct { type EventHandler interface { Handle(*Event) error } + +// EventHandlers is a map of EventHandler +type EventHandlers struct { + sync.RWMutex + + eventHandlers map[EventHandler]struct{} +} + +// NewEventHandlers returns a EventHandlers +func NewEventHandlers() *EventHandlers { + return &EventHandlers{ + eventHandlers: make(map[EventHandler]struct{}), + } +} + +// Handle callbacks for the events +func (eh *EventHandlers) Handle(e *Event) { + eh.RLock() + defer eh.RUnlock() + + for h := range eh.eventHandlers { + if err := h.Handle(e); err != nil { + log.Error(err) + } + } +} + +// RegisterEventHandler registers an event handler. +func (eh *EventHandlers) RegisterEventHandler(h EventHandler) error { + eh.Lock() + defer eh.Unlock() + + if _, ok := eh.eventHandlers[h]; ok { + return errors.New("event handler already set") + } + eh.eventHandlers[h] = struct{}{} + return nil +} + +// UnregisterEventHandler unregisters a previously registered event handler. +func (eh *EventHandlers) UnregisterEventHandler(h EventHandler) { + eh.Lock() + defer eh.Unlock() + + delete(eh.eventHandlers, h) +} diff --git a/cluster/mesos/cluster.go b/cluster/mesos/cluster.go index 91f10c3bc0..4157bf02d8 100644 --- a/cluster/mesos/cluster.go +++ b/cluster/mesos/cluster.go @@ -29,7 +29,7 @@ type Cluster struct { driver *mesosscheduler.MesosSchedulerDriver dockerEnginePort string - eventHandlers map[cluster.EventHandler]struct{} + eventHandlers *cluster.EventHandlers master string agents map[string]*agent scheduler *scheduler.Scheduler @@ -67,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st } cluster := &Cluster{ dockerEnginePort: defaultDockerEnginePort, + eventHandlers: cluster.NewEventHandlers(), master: master, agents: make(map[string]*agent), scheduler: scheduler, @@ -156,35 +157,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st // Handle callbacks for the events func (c *Cluster) Handle(e *cluster.Event) error { - c.RLock() - defer c.RUnlock() - - for h := range c.eventHandlers { - if err := h.Handle(e); err != nil { - log.Error(err) - } - } + c.eventHandlers.Handle(e) return nil } // RegisterEventHandler registers an event handler. func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error { - c.Lock() - defer c.Unlock() - - if _, ok := c.eventHandlers[h]; ok { - return errors.New("event handler already set") - } - c.eventHandlers[h] = struct{}{} - return nil + return c.eventHandlers.RegisterEventHandler(h) } // UnregisterEventHandler unregisters a previously registered event handler. func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) { - c.Lock() - defer c.Unlock() - - delete(c.eventHandlers, h) + c.eventHandlers.UnregisterEventHandler(h) } // CreateContainer for container creation in Mesos task diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 0e77c3cd6c..969d28f497 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -50,7 +50,7 @@ func (p *pendingContainer) ToContainer() *cluster.Container { type Cluster struct { sync.RWMutex - eventHandlers map[cluster.EventHandler]struct{} + eventHandlers *cluster.EventHandlers engines map[string]*cluster.Engine pendingEngines map[string]*cluster.Engine scheduler *scheduler.Scheduler @@ -67,7 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster") cluster := &Cluster{ - eventHandlers: make(map[cluster.EventHandler]struct{}), + eventHandlers: cluster.NewEventHandlers(), engines: make(map[string]*cluster.Engine), pendingEngines: make(map[string]*cluster.Engine), scheduler: scheduler, @@ -91,35 +91,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery // Handle callbacks for the events func (c *Cluster) Handle(e *cluster.Event) error { - c.RLock() - defer c.RUnlock() - - for h := range c.eventHandlers { - if err := h.Handle(e); err != nil { - log.Error(err) - } - } + c.eventHandlers.Handle(e) return nil } // RegisterEventHandler registers an event handler. func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error { - c.Lock() - defer c.Unlock() - - if _, ok := c.eventHandlers[h]; ok { - return errors.New("event handler already set") - } - c.eventHandlers[h] = struct{}{} - return nil + return c.eventHandlers.RegisterEventHandler(h) } // UnregisterEventHandler unregisters a previously registered event handler. func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) { - c.Lock() - defer c.Unlock() - - delete(c.eventHandlers, h) + c.eventHandlers.UnregisterEventHandler(h) } // Generate a globally (across the cluster) unique ID.