Merge pull request #1585 from jimenez/klaus-jimenez-offer-refuse

Klaus jimenez offer refuse
This commit is contained in:
Dongluo Chen 2016-01-06 13:20:02 -08:00
commit b4a6ad2e56
2 changed files with 18 additions and 2 deletions

View File

@ -48,7 +48,8 @@ Options:
{{printf "\t * mesos.port=\tport to bind on [$SWARM_MESOS_PORT]"}}
{{printf "\t * mesos.offertimeout=30s\ttimeout for offers [$SWARM_MESOS_OFFER_TIMEOUT]"}}
{{printf "\t * mesos.tasktimeout=5s\ttimeout for task creation [$SWARM_MESOS_TASK_TIMEOUT]"}}
{{printf "\t * mesos.user=\tframework user [$SWARM_MESOS_USER]"}}{{end}}{{ end }}
{{printf "\t * mesos.user=\tframework user [$SWARM_MESOS_USER]"}}
{{printf "\t * mesos.offerrefusetimeout=5s\tseconds to consider unused resources refused [$SWARM_MESOS_OFFER_REFUSE_TIMEOUT]"}}{{end}}{{ end }}
`
}

View File

@ -36,6 +36,7 @@ type Cluster struct {
TLSConfig *tls.Config
options *cluster.DriverOpts
offerTimeout time.Duration
refuseTimeout time.Duration
taskCreationTimeout time.Duration
pendingTasks *queue.Queue
engineOpts *cluster.EngineOpts
@ -47,6 +48,7 @@ const (
defaultDockerEngineTLSPort = "2376"
dockerPortAttribute = "docker_port"
defaultOfferTimeout = 30 * time.Second
defaultRefuseTimeout = 5 * time.Second
defaultTaskCreationTimeout = 5 * time.Second
)
@ -73,6 +75,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
offerTimeout: defaultOfferTimeout,
taskCreationTimeout: defaultTaskCreationTimeout,
engineOpts: engineOptions,
refuseTimeout: defaultRefuseTimeout,
}
cluster.pendingTasks = queue.NewQueue()
@ -126,6 +129,14 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
cluster.offerTimeout = d
}
if refuseTimeout, ok := options.String("mesos.offerrefusetimeout", "SWARM_MESOS_OFFER_REFUSE_TIMEOUT"); ok {
d, err := time.ParseDuration(refuseTimeout)
if err != nil {
return nil, err
}
cluster.refuseTimeout = d
}
driver, err := mesosscheduler.NewMesosSchedulerDriver(driverConfig)
if err != nil {
return nil, err
@ -470,7 +481,11 @@ func (c *Cluster) scheduleTask(t *task) bool {
t.build(n.ID, c.agents[n.ID].offers)
if _, err := c.driver.LaunchTasks(offerIDs, []*mesosproto.TaskInfo{&t.TaskInfo}, &mesosproto.Filters{}); err != nil {
offerFilters := &mesosproto.Filters{}
refuseSeconds := c.refuseTimeout.Seconds()
offerFilters.RefuseSeconds = &refuseSeconds
if _, err := c.driver.LaunchTasks(offerIDs, []*mesosproto.TaskInfo{&t.TaskInfo}, offerFilters); err != nil {
// TODO: Do not erase all the offers, only the one used
for _, offer := range s.offers {
c.removeOffer(offer)