cluster: Support multiple event handlers.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-11-25 09:20:53 -08:00 committed by Victor Vieux
parent 8cfc984869
commit 56941d02a8
3 changed files with 46 additions and 16 deletions

View File

@ -83,6 +83,9 @@ type Cluster interface {
// Register an event handler for cluster-wide events.
RegisterEventHandler(h EventHandler) error
// Unregister an event handler.
UnregisterEventHandler(h EventHandler)
// FIXME: remove this method
// Return a random engine
RANDOMENGINE() (*Engine, error)

View File

@ -29,7 +29,7 @@ type Cluster struct {
driver *mesosscheduler.MesosSchedulerDriver
dockerEnginePort string
eventHandler cluster.EventHandler
eventHandlers map[cluster.EventHandler]struct{}
master string
agents map[string]*agent
scheduler *scheduler.Scheduler
@ -156,24 +156,37 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
if c.eventHandler == nil {
return nil
}
if err := c.eventHandler.Handle(e); err != nil {
log.Error(err)
c.RLock()
defer c.RUnlock()
for h, _ := range c.eventHandlers {
if err := h.Handle(e); err != nil {
log.Error(err)
}
}
return nil
}
// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
if c.eventHandler != nil {
c.Lock()
defer c.Unlock()
if _, ok := c.eventHandlers[h]; ok {
return errors.New("event handler already set")
}
c.eventHandler = h
c.eventHandlers[h] = struct{}{}
return nil
}
// UnregisterEventHandler unregisters a previously registered event handler.
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
c.Lock()
defer c.Unlock()
delete(c.eventHandlers, h)
}
// CreateContainer for container creation in Mesos task
func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (*cluster.Container, error) {
if config.Memory == 0 && config.CpuShares == 0 {

View File

@ -50,7 +50,7 @@ func (p *pendingContainer) ToContainer() *cluster.Container {
type Cluster struct {
sync.RWMutex
eventHandler cluster.EventHandler
eventHandlers map[cluster.EventHandler]struct{}
engines map[string]*cluster.Engine
pendingEngines map[string]*cluster.Engine
scheduler *scheduler.Scheduler
@ -67,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
cluster := &Cluster{
eventHandlers: make(map[cluster.EventHandler]struct{}),
engines: make(map[string]*cluster.Engine),
pendingEngines: make(map[string]*cluster.Engine),
scheduler: scheduler,
@ -90,24 +91,37 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
if c.eventHandler == nil {
return nil
}
if err := c.eventHandler.Handle(e); err != nil {
log.Error(err)
c.RLock()
defer c.RUnlock()
for h, _ := range c.eventHandlers {
if err := h.Handle(e); err != nil {
log.Error(err)
}
}
return nil
}
// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
if c.eventHandler != nil {
c.Lock()
defer c.Unlock()
if _, ok := c.eventHandlers[h]; ok {
return errors.New("event handler already set")
}
c.eventHandler = h
c.eventHandlers[h] = struct{}{}
return nil
}
// UnregisterEventHandler unregisters a previously registered event handler.
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
c.Lock()
defer c.Unlock()
delete(c.eventHandlers, h)
}
// Generate a globally (across the cluster) unique ID.
func (c *Cluster) generateUniqueID() string {
for {