Restructuring mesos scheduler driver outside of Cluster

Signed-off-by: Isabel Jimenez <contact@isabeljimenez.com>
This commit is contained in:
Isabel Jimenez 2016-01-07 18:10:11 -05:00
parent 8cc26f56f2
commit 443d49167a
2 changed files with 56 additions and 31 deletions

View File

@ -27,12 +27,11 @@ import (
type Cluster struct { type Cluster struct {
sync.RWMutex sync.RWMutex
driver *mesosscheduler.MesosSchedulerDriver
dockerEnginePort string dockerEnginePort string
eventHandler cluster.EventHandler eventHandler cluster.EventHandler
master string master string
agents map[string]*agent agents map[string]*agent
scheduler *scheduler.Scheduler scheduler *Scheduler
TLSConfig *tls.Config TLSConfig *tls.Config
options *cluster.DriverOpts options *cluster.DriverOpts
offerTimeout time.Duration offerTimeout time.Duration
@ -69,7 +68,6 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
dockerEnginePort: defaultDockerEnginePort, dockerEnginePort: defaultDockerEnginePort,
master: master, master: master,
agents: make(map[string]*agent), agents: make(map[string]*agent),
scheduler: scheduler,
TLSConfig: TLSConfig, TLSConfig: TLSConfig,
options: &options, options: &options,
offerTimeout: defaultOfferTimeout, offerTimeout: defaultOfferTimeout,
@ -89,7 +87,6 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
hostname, _ := os.Hostname() hostname, _ := os.Hostname()
driverConfig := mesosscheduler.DriverConfig{ driverConfig := mesosscheduler.DriverConfig{
Scheduler: cluster,
Framework: &mesosproto.FrameworkInfo{Name: proto.String(frameworkName), User: &user}, Framework: &mesosproto.FrameworkInfo{Name: proto.String(frameworkName), User: &user},
Master: cluster.master, Master: cluster.master,
HostnameOverride: hostname, HostnameOverride: hostname,
@ -137,14 +134,13 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
cluster.refuseTimeout = d cluster.refuseTimeout = d
} }
driver, err := mesosscheduler.NewMesosSchedulerDriver(driverConfig) sched, err := NewScheduler(driverConfig, cluster, scheduler)
if err != nil { if err != nil {
return nil, err return nil, err
} }
cluster.driver = driver cluster.scheduler = sched
status, err := sched.driver.Start()
status, err := driver.Start()
if err != nil { if err != nil {
log.Debugf("Mesos driver started, status/err %v: %v", status, err) log.Debugf("Mesos driver started, status/err %v: %v", status, err)
return nil, err return nil, err
@ -430,7 +426,7 @@ func (c *Cluster) addOffer(offer *mesosproto.Offer) {
time.Sleep(c.offerTimeout) time.Sleep(c.offerTimeout)
// declining Mesos offers to make them available to other Mesos services // declining Mesos offers to make them available to other Mesos services
if c.removeOffer(offer) { if c.removeOffer(offer) {
if _, err := c.driver.DeclineOffer(offer.Id, &mesosproto.Filters{}); err != nil { if _, err := c.scheduler.driver.DeclineOffer(offer.Id, &mesosproto.Filters{}); err != nil {
log.WithFields(log.Fields{"name": "mesos"}).Errorf("Error while declining offer %q: %v", offer.Id.GetValue(), err) log.WithFields(log.Fields{"name": "mesos"}).Errorf("Error while declining offer %q: %v", offer.Id.GetValue(), err)
} else { } else {
log.WithFields(log.Fields{"name": "mesos"}).Debugf("Offer %q declined successfully", offer.Id.GetValue()) log.WithFields(log.Fields{"name": "mesos"}).Debugf("Offer %q declined successfully", offer.Id.GetValue())
@ -485,7 +481,7 @@ func (c *Cluster) scheduleTask(t *task) bool {
refuseSeconds := c.refuseTimeout.Seconds() refuseSeconds := c.refuseTimeout.Seconds()
offerFilters.RefuseSeconds = &refuseSeconds offerFilters.RefuseSeconds = &refuseSeconds
if _, err := c.driver.LaunchTasks(offerIDs, []*mesosproto.TaskInfo{&t.TaskInfo}, offerFilters); err != nil { if _, err := c.scheduler.driver.LaunchTasks(offerIDs, []*mesosproto.TaskInfo{&t.TaskInfo}, offerFilters); err != nil {
// TODO: Do not erase all the offers, only the one used // TODO: Do not erase all the offers, only the one used
for _, offer := range s.offers { for _, offer := range s.offers {
c.removeOffer(offer) c.removeOffer(offer)

View File

@ -5,32 +5,58 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster" "github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler"
"github.com/mesos/mesos-go/mesosproto" "github.com/mesos/mesos-go/mesosproto"
mesosscheduler "github.com/mesos/mesos-go/scheduler" mesosscheduler "github.com/mesos/mesos-go/scheduler"
) )
// Scheduler structure for mesos driver
type Scheduler struct {
scheduler.Scheduler
driver *mesosscheduler.MesosSchedulerDriver
cluster *Cluster
}
// NewScheduler for Scheduler mesos driver creation
func NewScheduler(config mesosscheduler.DriverConfig, cluster *Cluster, sched *scheduler.Scheduler) (*Scheduler, error) {
scheduler := Scheduler{
Scheduler: *sched,
cluster: cluster,
}
config.Scheduler = &scheduler
driver, err := mesosscheduler.NewMesosSchedulerDriver(config)
if err != nil {
return nil, err
}
scheduler.driver = driver
return &scheduler, nil
}
// Registered method for registered mesos framework // Registered method for registered mesos framework
func (c *Cluster) Registered(driver mesosscheduler.SchedulerDriver, fwID *mesosproto.FrameworkID, masterInfo *mesosproto.MasterInfo) { func (s *Scheduler) Registered(driver mesosscheduler.SchedulerDriver, fwID *mesosproto.FrameworkID, masterInfo *mesosproto.MasterInfo) {
log.WithFields(log.Fields{"name": "mesos", "frameworkId": fwID.GetValue()}).Debug("Framework registered") log.WithFields(log.Fields{"name": "mesos", "frameworkId": fwID.GetValue()}).Debug("Framework registered")
} }
// Reregistered method for registered mesos framework // Reregistered method for registered mesos framework
func (c *Cluster) Reregistered(mesosscheduler.SchedulerDriver, *mesosproto.MasterInfo) { func (s *Scheduler) Reregistered(mesosscheduler.SchedulerDriver, *mesosproto.MasterInfo) {
log.WithFields(log.Fields{"name": "mesos"}).Debug("Framework re-registered") log.WithFields(log.Fields{"name": "mesos"}).Debug("Framework re-registered")
} }
// Disconnected method // Disconnected method
func (c *Cluster) Disconnected(mesosscheduler.SchedulerDriver) { func (s *Scheduler) Disconnected(mesosscheduler.SchedulerDriver) {
log.WithFields(log.Fields{"name": "mesos"}).Debug("Framework disconnected") log.WithFields(log.Fields{"name": "mesos"}).Debug("Framework disconnected")
} }
// ResourceOffers method // ResourceOffers method
func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mesosproto.Offer) { func (s *Scheduler) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mesosproto.Offer) {
log.WithFields(log.Fields{"name": "mesos", "offers": len(offers)}).Debug("Offers received") log.WithFields(log.Fields{"name": "mesos", "offers": len(offers)}).Debug("Offers received")
for _, offer := range offers { for _, offer := range offers {
agentID := offer.SlaveId.GetValue() agentID := offer.SlaveId.GetValue()
dockerPort := c.dockerEnginePort dockerPort := s.cluster.dockerEnginePort
for _, attribute := range offer.GetAttributes() { for _, attribute := range offer.GetAttributes() {
if attribute.GetName() == dockerPortAttribute { if attribute.GetName() == dockerPortAttribute {
switch attribute.GetType() { switch attribute.GetType() {
@ -41,40 +67,43 @@ func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mes
} }
} }
} }
s, ok := c.agents[agentID]
a, ok := s.cluster.agents[agentID]
if !ok { if !ok {
engine := cluster.NewEngine(*offer.Hostname+":"+dockerPort, 0, c.engineOpts) engine := cluster.NewEngine(*offer.Hostname+":"+dockerPort, 0, s.cluster.engineOpts)
if err := engine.Connect(c.TLSConfig); err != nil { if err := engine.Connect(s.cluster.TLSConfig); err != nil {
log.Error(err) log.Error(err)
} else { } else {
// Set engine state to healthy and start refresh loop // Set engine state to healthy and start refresh loop
engine.ValidationComplete() engine.ValidationComplete()
s = newAgent(agentID, engine) a = newAgent(agentID, engine)
c.agents[agentID] = s s.cluster.agents[agentID] = a
if err := s.engine.RegisterEventHandler(c); err != nil { if err := a.engine.RegisterEventHandler(s.cluster); err != nil {
log.Error(err) log.Error(err)
} }
} }
} }
c.addOffer(offer) s.cluster.addOffer(offer)
} }
go c.pendingTasks.Process() go s.cluster.pendingTasks.Process()
} }
// OfferRescinded method // OfferRescinded method
func (c *Cluster) OfferRescinded(mesosscheduler.SchedulerDriver, *mesosproto.OfferID) { func (s *Scheduler) OfferRescinded(mesosscheduler.SchedulerDriver, *mesosproto.OfferID) {
} }
// StatusUpdate method // StatusUpdate method
func (c *Cluster) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *mesosproto.TaskStatus) { func (s *Scheduler) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *mesosproto.TaskStatus) {
log.WithFields(log.Fields{"name": "mesos", "state": taskStatus.State.String()}).Debug("Status update") log.WithFields(log.Fields{"name": "mesos", "state": taskStatus.State.String()}).Debug("Status update")
taskID := taskStatus.TaskId.GetValue() taskID := taskStatus.TaskId.GetValue()
agentID := taskStatus.SlaveId.GetValue() agentID := taskStatus.SlaveId.GetValue()
s, ok := c.agents[agentID] a, ok := s.cluster.agents[agentID]
if !ok { if !ok {
return return
} }
if task, ok := s.tasks[taskID]; ok { if task, ok := a.tasks[taskID]; ok {
task.sendStatus(taskStatus) task.sendStatus(taskStatus)
} else { } else {
var reason = "" var reason = ""
@ -92,18 +121,18 @@ func (c *Cluster) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *mes
} }
// FrameworkMessage method // FrameworkMessage method
func (c *Cluster) FrameworkMessage(mesosscheduler.SchedulerDriver, *mesosproto.ExecutorID, *mesosproto.SlaveID, string) { func (s *Scheduler) FrameworkMessage(mesosscheduler.SchedulerDriver, *mesosproto.ExecutorID, *mesosproto.SlaveID, string) {
} }
// SlaveLost method // SlaveLost method
func (c *Cluster) SlaveLost(mesosscheduler.SchedulerDriver, *mesosproto.SlaveID) { func (s *Scheduler) SlaveLost(mesosscheduler.SchedulerDriver, *mesosproto.SlaveID) {
} }
// ExecutorLost method // ExecutorLost method
func (c *Cluster) ExecutorLost(mesosscheduler.SchedulerDriver, *mesosproto.ExecutorID, *mesosproto.SlaveID, int) { func (s *Scheduler) ExecutorLost(mesosscheduler.SchedulerDriver, *mesosproto.ExecutorID, *mesosproto.SlaveID, int) {
} }
// Error method // Error method
func (c *Cluster) Error(d mesosscheduler.SchedulerDriver, msg string) { func (s *Scheduler) Error(d mesosscheduler.SchedulerDriver, msg string) {
log.WithFields(log.Fields{"name": "mesos"}).Error(msg) log.WithFields(log.Fields{"name": "mesos"}).Error(msg)
} }