Merge pull request #1635 from jimenez/task_timeout_restructure

Removing Queue package and regrouping task logic
Victor Vieux 2016-01-14 10:18:03 -08:00
commit d3e4ddb0f7
12 changed files with 233 additions and 187 deletions
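In short: the generic queue package (whose Item interface required ID, Do, and Stop, and whose items therefore had to know how to schedule themselves on the cluster) is deleted, and its job moves into a new task.Tasks type that stores *task.Task values directly. The dependency now points the other way: the task package declares a one-method launcher interface, and the mesos Cluster satisfies it with the renamed LaunchTask method. A minimal, self-contained sketch of that wiring, assuming only the shapes visible in the diff below (Task here stands in for task.Task; fakeCluster and main are illustrative, not part of the PR):

package main

import "fmt"

// Task stands in for task.Task in this sketch.
type Task struct{ id string }

// launcher mirrors the interface the new task package declares:
// anything that can launch a task, in practice the mesos Cluster.
type launcher interface {
	LaunchTask(t *Task) bool
}

// Tasks keeps tasks that could not be launched yet, keyed by ID.
type Tasks struct {
	cluster launcher
	tasks   map[string]*Task
}

// Add tries to launch immediately and only queues on failure,
// mirroring Tasks.Add in the diff.
func (t *Tasks) Add(task *Task) {
	if !t.cluster.LaunchTask(task) {
		t.tasks[task.id] = task
	}
}

// fakeCluster plays the role of mesos.Cluster here.
type fakeCluster struct{}

func (c *fakeCluster) LaunchTask(t *Task) bool {
	fmt.Println("launching", t.id)
	return true // pretend an offer was available
}

func main() {
	pending := &Tasks{cluster: &fakeCluster{}, tasks: make(map[string]*Task)}
	pending.Add(&Task{id: "task1"}) // launched right away, never queued
}

Because the task package sees only this small interface, it never has to import the mesos cluster package, which is what lets NewTask drop the *Cluster argument the old newTask required.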

View File

@@ -4,6 +4,7 @@ import (
"sync"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/cluster/mesos/task"
"github.com/mesos/mesos-go/mesosproto"
)
@@ -12,7 +13,7 @@ type agent struct {
id string
offers map[string]*mesosproto.Offer
tasks map[string]*task
tasks map[string]*task.Task
engine *cluster.Engine
}
@@ -20,7 +21,7 @@ func newAgent(sid string, e *cluster.Engine) *agent {
return &agent{
id: sid,
offers: make(map[string]*mesosproto.Offer),
tasks: make(map[string]*task),
tasks: make(map[string]*task.Task),
engine: e,
}
}
@@ -31,7 +32,7 @@ func (s *agent) addOffer(offer *mesosproto.Offer) {
s.Unlock()
}
func (s *agent) addTask(task *task) {
func (s *agent) addTask(task *task.Task) {
s.Lock()
s.tasks[task.TaskInfo.TaskId.GetValue()] = task
s.Unlock()
@@ -71,7 +72,7 @@ func (s *agent) getOffers() map[string]*mesosproto.Offer {
return s.offers
}
func (s *agent) getTasks() map[string]*task {
func (s *agent) getTasks() map[string]*task.Task {
s.RLock()
defer s.RUnlock()
return s.tasks

View File

@@ -4,6 +4,7 @@ import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/cluster/mesos/task"
"github.com/mesos/mesos-go/mesosutil"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
@@ -41,11 +42,11 @@ func TestAddTask(t *testing.T) {
assert.Empty(t, s.tasks)
assert.True(t, s.empty())
t1, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
t1, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
assert.NoError(t, err)
s.addTask(t1)
t2, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
t2, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
assert.NoError(t, err)
s.addTask(t2)
assert.Equal(t, len(s.tasks), 2)
@@ -79,11 +80,11 @@ func TestRemoveTask(t *testing.T) {
assert.Empty(t, s.tasks)
t1, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
t1, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
assert.NoError(t, err)
s.addTask(t1)
t2, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
t2, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
assert.NoError(t, err)
s.addTask(t2)
assert.Equal(t, len(s.tasks), 2)

View File

@@ -14,7 +14,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/cluster/mesos/queue"
"github.com/docker/swarm/cluster/mesos/task"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/scheduler/node"
"github.com/gogo/protobuf/proto"
@@ -37,7 +37,7 @@ type Cluster struct {
offerTimeout time.Duration
refuseTimeout time.Duration
taskCreationTimeout time.Duration
pendingTasks *queue.Queue
pendingTasks *task.Tasks
engineOpts *cluster.EngineOpts
}
@@ -77,7 +77,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master string,
refuseTimeout: defaultRefuseTimeout,
}
cluster.pendingTasks = queue.NewQueue()
cluster.pendingTasks = task.NewTasks(cluster)
// Empty string is accepted by the scheduler.
user, _ := options.String("mesos.user", "SWARM_MESOS_USER")
@@ -179,7 +179,7 @@ func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string,
return nil, errResourcesNeeded
}
task, err := newTask(c, config, name)
task, err := task.NewTask(config, name)
if err != nil {
return nil, err
}
@@ -187,9 +187,9 @@
go c.pendingTasks.Add(task)
select {
case container := <-task.container:
case container := <-task.GetContainer():
return formatContainer(container), nil
case err := <-task.error:
case err := <-task.Error:
return nil, err
case <-time.After(c.taskCreationTimeout):
c.pendingTasks.Remove(task)
@@ -452,11 +452,12 @@ func (c *Cluster) removeOffer(offer *mesosproto.Offer) bool {
return found
}
func (c *Cluster) scheduleTask(t *task) bool {
// LaunchTask selects a node and calls the driver to launch a task
func (c *Cluster) LaunchTask(t *task.Task) bool {
c.scheduler.Lock()
// changed from defer c.scheduler.Unlock() to explicit unlocks
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), t.config)
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), t.GetConfig())
if err != nil {
c.scheduler.Unlock()
return false
@@ -464,7 +465,7 @@ func (c *Cluster) scheduleTask(t *task) bool {
n := nodes[0]
s, ok := c.agents[n.ID]
if !ok {
t.error <- fmt.Errorf("Unable to create on agent %q", n.ID)
t.Error <- fmt.Errorf("Unable to create on agent %q", n.ID)
c.scheduler.Unlock()
return true
}
@@ -478,7 +479,7 @@ func (c *Cluster) scheduleTask(t *task) bool {
offerIDs = append(offerIDs, offer.Id)
}
t.build(n.ID, c.agents[n.ID].offers)
t.Build(n.ID, c.agents[n.ID].offers)
offerFilters := &mesosproto.Filters{}
refuseSeconds := c.refuseTimeout.Seconds()
@@ -491,7 +492,7 @@ func (c *Cluster) scheduleTask(t *task) bool {
}
c.Unlock()
c.scheduler.Unlock()
t.error <- err
t.Error <- err
return true
}
@@ -504,18 +505,18 @@ func (c *Cluster) scheduleTask(t *task) bool {
c.Unlock()
c.scheduler.Unlock()
// block until we get the container
finished, data, err := t.monitor()
finished, data, err := t.Monitor()
taskID := t.TaskInfo.TaskId.GetValue()
if err != nil {
// remove task
s.removeTask(taskID)
t.error <- err
t.Error <- err
return true
}
if !finished {
go func() {
for {
finished, _, err := t.monitor()
finished, _, err := t.Monitor()
if err != nil {
// TODO: improve this log by sending a proper error message
log.Error(err)
@@ -537,8 +538,8 @@ func (c *Cluster) scheduleTask(t *task) bool {
if data != nil && json.Unmarshal(data, &inspect) == nil && len(inspect) == 1 {
container := &cluster.Container{Container: dockerclient.Container{Id: inspect[0].Id}, Engine: s.engine}
if container, err := container.Refresh(); err == nil {
if !t.done {
t.container <- container
if !t.Stopped() {
t.SetContainer(container)
}
return true
}
@@ -552,15 +553,15 @@ func (c *Cluster) scheduleTask(t *task) bool {
for _, container := range s.engine.Containers() {
if container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] == taskID {
if !t.done {
t.container <- container
if !t.Stopped() {
t.SetContainer(container)
}
return true
}
}
if !t.done {
t.error <- fmt.Errorf("Container failed to create")
if !t.Stopped() {
t.Error <- fmt.Errorf("Container failed to create")
}
return true
}
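One detail worth calling out in the cluster.go hunks above: the task's container and error channels are unbuffered, and CreateContainer stops listening once taskCreationTimeout fires (it removes the task, which calls Stop). That is why LaunchTask guards every send with !t.Stopped() — otherwise it could block forever on a channel nobody reads. Here is a stripped-down sketch of that handshake; all names are hypothetical, and it deliberately swaps the PR's done bool for a channel closed by Stop, which expresses the same guard but is race-free under Go's memory model:

package main

import (
	"fmt"
	"time"
)

// result carries the channels a waiting API call selects on,
// like the container/Error channels on task.Task.
type result struct {
	container chan string
	errs      chan error
	done      chan struct{} // closed by Stop; stands in for the done bool
}

func (r *result) Stop() { close(r.done) }

// deliver sends the container unless the waiter already gave up,
// mirroring the !t.Stopped() guard around t.SetContainer(...).
func (r *result) deliver(c string) {
	select {
	case r.container <- c:
	case <-r.done:
	}
}

func main() {
	r := &result{
		container: make(chan string),
		errs:      make(chan error),
		done:      make(chan struct{}),
	}
	go func() {
		time.Sleep(50 * time.Millisecond) // pretend the launch is slow
		r.deliver("container-id")
	}()
	select { // the CreateContainer select, reduced to its shape
	case c := <-r.container:
		fmt.Println("created", c)
	case err := <-r.errs:
		fmt.Println("error:", err)
	case <-time.After(10 * time.Millisecond): // taskCreationTimeout analogue
		r.Stop() // like pendingTasks.Remove(task), which stops the task
		fmt.Println("timed out waiting for container")
	}
	time.Sleep(100 * time.Millisecond) // let deliver observe done and return
}

In the actual PR the guard is the plain done flag flipped under the cluster's locks; the channel version is simply the self-contained way to show the same idea.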

View File

@@ -1,58 +0,0 @@
package queue
import "sync"
// Item represents a simple item in the queue
type Item interface {
ID() string
Do() bool
Stop()
}
// Queue is a simple item queue
type Queue struct {
sync.Mutex
items map[string]Item
}
// NewQueue returns a new queue
func NewQueue() *Queue {
return &Queue{items: make(map[string]Item)}
}
// Add tries to Do the item, if it's not possible, add the item to the queue for future tries
func (q *Queue) Add(item Item) {
if !item.Do() {
q.Lock()
q.items[item.ID()] = item
q.Unlock()
}
}
// Remove an item from the queue
func (q *Queue) Remove(items ...Item) {
q.Lock()
q.remove(items...)
q.Unlock()
}
// Process tries to Do all the items in the queue and remove the items successfully done
func (q *Queue) Process() {
q.Lock()
toRemove := []Item{}
for _, item := range q.items {
if item.Do() {
toRemove = append(toRemove, item)
}
}
q.remove(toRemove...)
q.Unlock()
}
func (q *Queue) remove(items ...Item) {
for _, item := range items {
item.Stop()
delete(q.items, item.ID())
}
}

View File

@@ -1,62 +0,0 @@
package queue
import (
"testing"
"github.com/stretchr/testify/assert"
)
type item struct {
id string
count int
}
func (i *item) ID() string {
return i.id
}
func (i *item) Stop() {}
func (i *item) Do() bool {
i.count = i.count - 1
return i.count == 0
}
func TestAdd(t *testing.T) {
q := NewQueue()
q.Add(&item{"id1", 1})
assert.Equal(t, len(q.items), 0)
q.Add(&item{"id2", 2})
assert.Equal(t, len(q.items), 1)
}
func TestRemove(t *testing.T) {
q := NewQueue()
i := &item{"id1", 2}
q.Add(i)
assert.Equal(t, len(q.items), 1)
q.Remove(i)
assert.Equal(t, len(q.items), 0)
}
func TestProcess(t *testing.T) {
q := NewQueue()
q.Add(&item{"id1", 2})
assert.Equal(t, len(q.items), 1)
q.Process()
assert.Equal(t, len(q.items), 0)
q.Add(&item{"id2", 3})
assert.Equal(t, len(q.items), 1)
q.Process()
assert.Equal(t, len(q.items), 1)
q.Process()
assert.Equal(t, len(q.items), 0)
}

View File

@@ -104,7 +104,7 @@ func (s *Scheduler) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *mesosproto.TaskStatus) {
return
}
if task, ok := a.tasks[taskID]; ok {
task.sendStatus(taskStatus)
task.SendStatus(taskStatus)
} else {
var reason = ""
if taskStatus.Reason != nil {

View File

@@ -1,4 +1,4 @@
package mesos
package task
import (
"errors"
@@ -14,32 +14,52 @@ import (
"github.com/mesos/mesos-go/mesosutil"
)
type task struct {
// Task embeds mesosproto.TaskInfo and represents a Mesos task
type Task struct {
mesosproto.TaskInfo
cluster *Cluster
updates chan *mesosproto.TaskStatus
config *cluster.ContainerConfig
error chan error
Error chan error
container chan *cluster.Container
done bool
}
func (t *task) ID() string {
// GetContainer returns the task's container channel, on which
// the created container is delivered to the Swarm API
func (t *Task) GetContainer() chan *cluster.Container {
return t.container
}
// SetContainer sends the container on the task's container channel
func (t *Task) SetContainer(container *cluster.Container) {
t.container <- container
}
// GetConfig returns the container configuration of the task
func (t *Task) GetConfig() *cluster.ContainerConfig {
return t.config
}
// ID returns the task ID
func (t *Task) ID() string {
return t.TaskId.GetValue()
}
func (t *task) Do() bool {
return t.cluster.scheduleTask(t)
// Stopped reports whether the task is done
func (t *Task) Stopped() bool {
return t.done
}
func (t *task) Stop() {
// Stop marks the task as done
func (t *Task) Stop() {
t.done = true
}
func (t *task) build(slaveID string, offers map[string]*mesosproto.Offer) {
// Build builds the Mesos TaskInfo for the task from the slave ID and the selected offers
func (t *Task) Build(slaveID string, offers map[string]*mesosproto.Offer) {
t.Command = &mesosproto.CommandInfo{Shell: proto.Bool(false)}
t.Container = &mesosproto.ContainerInfo{
@@ -141,7 +161,8 @@ func (t *task) build(slaveID string, offers map[string]*mesosproto.Offer) {
t.SlaveId = &mesosproto.SlaveID{Value: &slaveID}
}
func newTask(c *Cluster, config *cluster.ContainerConfig, name string) (*task, error) {
// NewTask creates a new task
func NewTask(config *cluster.ContainerConfig, name string) (*Task, error) {
id := stringid.TruncateID(stringid.GenerateRandomID())
if name != "" {
@@ -152,11 +173,10 @@ func newTask(c *Cluster, config *cluster.ContainerConfig, name string) (*task, error) {
// FIXME: once the Mesos changes are merged, there will be no need to save the task ID to know which container we launched
config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] = id
task := &task{
cluster: c,
task := &Task{
config: config,
container: make(chan *cluster.Container),
error: make(chan error),
Error: make(chan error),
updates: make(chan *mesosproto.TaskStatus),
}
@@ -166,16 +186,19 @@ func newTask(c *Cluster, config *cluster.ContainerConfig, name string) (*task, error) {
return task, nil
}
func (t *task) sendStatus(status *mesosproto.TaskStatus) {
// SendStatus sends the task status on the updates channel
func (t *Task) SendStatus(status *mesosproto.TaskStatus) {
t.updates <- status
}
func (t *task) getStatus() *mesosproto.TaskStatus {
// GetStatus receives the next task status from the updates channel
func (t *Task) GetStatus() *mesosproto.TaskStatus {
return <-t.updates
}
func (t *task) monitor() (bool, []byte, error) {
taskStatus := t.getStatus()
// Monitor waits for the next status update and reports whether the task has finished
func (t *Task) Monitor() (bool, []byte, error) {
taskStatus := t.GetStatus()
switch taskStatus.GetState() {
case mesosproto.TaskState_TASK_STAGING:

View File

@@ -1,4 +1,4 @@
package mesos
package task
import (
"sort"
@@ -15,7 +15,7 @@ import (
const name = "mesos-swarm-task-name"
func TestBuild(t *testing.T) {
task, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{
task, err := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
Image: "test-image",
CpuShares: 42,
Memory: 2097152,
@@ -23,7 +23,7 @@ }), name)
}), name)
assert.NoError(t, err)
task.build("slave-id", nil)
task.Build("slave-id", nil)
assert.Equal(t, task.Container.GetType(), mesosproto.ContainerInfo_DOCKER)
assert.Equal(t, task.Container.Docker.GetImage(), "test-image")
@@ -47,7 +47,7 @@ func TestBuild(t *testing.T) {
}
func TestNewTask(t *testing.T) {
task, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), name)
task, err := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), name)
assert.NoError(t, err)
assert.Equal(t, *task.Name, name)
@@ -56,13 +56,13 @@ }
}
func TestSendGetStatus(t *testing.T) {
task, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "")
task, err := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "")
assert.NoError(t, err)
status := mesosutil.NewTaskStatus(nil, mesosproto.TaskState_TASK_RUNNING)
go func() { task.sendStatus(status) }()
s := task.getStatus()
go func() { task.SendStatus(status) }()
s := task.GetStatus()
assert.Equal(t, s, status)
}

View File

@@ -0,0 +1,61 @@
package task
import "sync"
type launcher interface {
LaunchTask(t *Task) bool
}
// Tasks is a simple map of tasks
type Tasks struct {
sync.Mutex
cluster launcher
Tasks map[string]*Task
}
// NewTasks returns a new Tasks map using the given launcher
func NewTasks(cluster launcher) *Tasks {
return &Tasks{
Tasks: make(map[string]*Task),
cluster: cluster,
}
}
// Add tries to launch the Task; if that is not possible, the Task is added to the map for future tries
func (t *Tasks) Add(task *Task) {
if !t.cluster.LaunchTask(task) {
t.Lock()
t.Tasks[task.ID()] = task
t.Unlock()
}
}
// Remove removes the given Tasks from the map
func (t *Tasks) Remove(tasks ...*Task) {
t.Lock()
t.remove(tasks...)
t.Unlock()
}
// Process tries to launch all the Tasks in the map and removes the ones that succeed
func (t *Tasks) Process() {
t.Lock()
toRemove := []*Task{}
for _, task := range t.Tasks {
if t.cluster.LaunchTask(task) {
toRemove = append(toRemove, task)
}
}
t.remove(toRemove...)
t.Unlock()
}
func (t *Tasks) remove(tasks ...*Task) {
for _, task := range tasks {
task.Stop()
delete(t.Tasks, task.ID())
}
}

View File

@@ -0,0 +1,76 @@
package task
import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
type testLauncher struct {
count int
}
func (t *testLauncher) LaunchTask(_ *Task) bool {
t.count = t.count - 1
return t.count == 0
}
func TestAdd(t *testing.T) {
q := NewTasks(&testLauncher{count: 1})
task1, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
Image: "test-image",
CpuShares: 42,
Memory: 2097152,
Cmd: []string{"ls", "foo", "bar"},
}), "name1")
task2, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
Image: "test-image",
CpuShares: 42,
Memory: 2097152,
Cmd: []string{"ls", "foo", "bar"},
}), "name2")
q.Add(task1)
assert.Equal(t, len(q.Tasks), 0)
q.Add(task2)
assert.Equal(t, len(q.Tasks), 1)
}
func TestRemove(t *testing.T) {
q := NewTasks(&testLauncher{count: 2})
task1, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
Image: "test-image",
CpuShares: 42,
Memory: 2097152,
Cmd: []string{"ls", "foo", "bar"},
}), "name1")
q.Add(task1)
assert.Equal(t, len(q.Tasks), 1)
q.Remove(task1)
assert.Equal(t, len(q.Tasks), 0)
}
func TestProcess(t *testing.T) {
q := NewTasks(&testLauncher{count: 3})
task1, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
Image: "test-image",
CpuShares: 42,
Memory: 2097152,
Cmd: []string{"ls", "foo", "bar"},
}), "name1")
q.Add(task1)
assert.Equal(t, len(q.Tasks), 1)
q.Process()
assert.Equal(t, len(q.Tasks), 1)
q.Process()
assert.Equal(t, len(q.Tasks), 0)
}

View File

@@ -0,0 +1,16 @@
package task
import "github.com/mesos/mesos-go/mesosproto"
func getPorts(offer *mesosproto.Offer) (ports []uint64) {
for _, resource := range offer.Resources {
if resource.GetName() == "ports" {
for _, rang := range resource.GetRanges().GetRange() {
for i := rang.GetBegin(); i <= rang.GetEnd(); i++ {
ports = append(ports, i)
}
}
}
}
return ports
}

View File

@@ -37,16 +37,3 @@ func sumScalarResourceValue(offers map[string]*mesosproto.Offer, name string) float64 {
}
return value
}
func getPorts(offer *mesosproto.Offer) (ports []uint64) {
for _, resource := range offer.Resources {
if resource.GetName() == "ports" {
for _, rang := range resource.GetRanges().GetRange() {
for i := rang.GetBegin(); i <= rang.GetEnd(); i++ {
ports = append(ports, i)
}
}
}
}
return ports
}