From fe8da8fe80a88f404bd36c9b1eaf3edf86eae401 Mon Sep 17 00:00:00 2001 From: Isabel Jimenez Date: Tue, 12 Jan 2016 16:40:26 -0500 Subject: [PATCH] Removing Queue package and regrouping task logic Signed-off-by: Isabel Jimenez --- cluster/mesos/agent.go | 9 ++-- cluster/mesos/agent_test.go | 9 ++-- cluster/mesos/cluster.go | 41 ++++++++------- cluster/mesos/queue/queue.go | 58 -------------------- cluster/mesos/queue/queue_test.go | 62 ---------------------- cluster/mesos/scheduler.go | 2 +- cluster/mesos/{ => task}/task.go | 59 ++++++++++++++------- cluster/mesos/{ => task}/task_test.go | 14 ++--- cluster/mesos/task/tasks.go | 61 +++++++++++++++++++++ cluster/mesos/task/tasks_test.go | 76 +++++++++++++++++++++++++++ cluster/mesos/task/utils.go | 16 ++++++ cluster/mesos/utils.go | 13 ----- 12 files changed, 233 insertions(+), 187 deletions(-) delete mode 100644 cluster/mesos/queue/queue.go delete mode 100644 cluster/mesos/queue/queue_test.go rename cluster/mesos/{ => task}/task.go (78%) rename cluster/mesos/{ => task}/task_test.go (82%) create mode 100644 cluster/mesos/task/tasks.go create mode 100644 cluster/mesos/task/tasks_test.go create mode 100644 cluster/mesos/task/utils.go diff --git a/cluster/mesos/agent.go b/cluster/mesos/agent.go index 0b8e2efa40..48c51a9682 100644 --- a/cluster/mesos/agent.go +++ b/cluster/mesos/agent.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/docker/swarm/cluster" + "github.com/docker/swarm/cluster/mesos/task" "github.com/mesos/mesos-go/mesosproto" ) @@ -12,7 +13,7 @@ type agent struct { id string offers map[string]*mesosproto.Offer - tasks map[string]*task + tasks map[string]*task.Task engine *cluster.Engine } @@ -20,7 +21,7 @@ func newAgent(sid string, e *cluster.Engine) *agent { return &agent{ id: sid, offers: make(map[string]*mesosproto.Offer), - tasks: make(map[string]*task), + tasks: make(map[string]*task.Task), engine: e, } } @@ -31,7 +32,7 @@ func (s *agent) addOffer(offer *mesosproto.Offer) { s.Unlock() } -func (s *agent) 
addTask(task *task) { +func (s *agent) addTask(task *task.Task) { s.Lock() s.tasks[task.TaskInfo.TaskId.GetValue()] = task s.Unlock() @@ -71,7 +72,7 @@ func (s *agent) getOffers() map[string]*mesosproto.Offer { return s.offers } -func (s *agent) getTasks() map[string]*task { +func (s *agent) getTasks() map[string]*task.Task { s.RLock() defer s.RUnlock() return s.tasks diff --git a/cluster/mesos/agent_test.go b/cluster/mesos/agent_test.go index 353fa4e7c5..60aea81ed0 100644 --- a/cluster/mesos/agent_test.go +++ b/cluster/mesos/agent_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/docker/swarm/cluster" + "github.com/docker/swarm/cluster/mesos/task" "github.com/mesos/mesos-go/mesosutil" "github.com/samalba/dockerclient" "github.com/stretchr/testify/assert" @@ -41,11 +42,11 @@ func TestAddTask(t *testing.T) { assert.Empty(t, s.tasks) assert.True(t, s.empty()) - t1, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1") + t1, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1") assert.NoError(t, err) s.addTask(t1) - t2, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1") + t2, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1") assert.NoError(t, err) s.addTask(t2) assert.Equal(t, len(s.tasks), 2) @@ -79,11 +80,11 @@ func TestRemoveTask(t *testing.T) { assert.Empty(t, s.tasks) - t1, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1") + t1, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1") assert.NoError(t, err) s.addTask(t1) - t2, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1") + t2, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1") assert.NoError(t, err) s.addTask(t2) assert.Equal(t, len(s.tasks), 2) diff --git a/cluster/mesos/cluster.go 
b/cluster/mesos/cluster.go index 18f3c4cae8..34d6d4932d 100644 --- a/cluster/mesos/cluster.go +++ b/cluster/mesos/cluster.go @@ -14,7 +14,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/swarm/cluster" - "github.com/docker/swarm/cluster/mesos/queue" + "github.com/docker/swarm/cluster/mesos/task" "github.com/docker/swarm/scheduler" "github.com/docker/swarm/scheduler/node" "github.com/gogo/protobuf/proto" @@ -37,7 +37,7 @@ type Cluster struct { offerTimeout time.Duration refuseTimeout time.Duration taskCreationTimeout time.Duration - pendingTasks *queue.Queue + pendingTasks *task.Tasks engineOpts *cluster.EngineOpts } @@ -77,7 +77,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st refuseTimeout: defaultRefuseTimeout, } - cluster.pendingTasks = queue.NewQueue() + cluster.pendingTasks = task.NewTasks(cluster) // Empty string is accepted by the scheduler. user, _ := options.String("mesos.user", "SWARM_MESOS_USER") @@ -173,7 +173,7 @@ func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string, return nil, errResourcesNeeded } - task, err := newTask(c, config, name) + task, err := task.NewTask(config, name) if err != nil { return nil, err } @@ -181,9 +181,9 @@ func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string, go c.pendingTasks.Add(task) select { - case container := <-task.container: + case container := <-task.GetContainer(): return formatContainer(container), nil - case err := <-task.error: + case err := <-task.Error: return nil, err case <-time.After(c.taskCreationTimeout): c.pendingTasks.Remove(task) @@ -446,11 +446,12 @@ func (c *Cluster) removeOffer(offer *mesosproto.Offer) bool { return found } -func (c *Cluster) scheduleTask(t *task) bool { +// LaunchTask method selects node and calls driver to launch a task +func (c *Cluster) LaunchTask(t *task.Task) bool { c.scheduler.Lock() //change to explicit lock defer c.scheduler.Unlock() - nodes, err := 
c.scheduler.SelectNodesForContainer(c.listNodes(), t.config) + nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), t.GetConfig()) if err != nil { c.scheduler.Unlock() return false @@ -458,7 +459,7 @@ func (c *Cluster) scheduleTask(t *task) bool { n := nodes[0] s, ok := c.agents[n.ID] if !ok { - t.error <- fmt.Errorf("Unable to create on agent %q", n.ID) + t.Error <- fmt.Errorf("Unable to create on agent %q", n.ID) c.scheduler.Unlock() return true } @@ -472,7 +473,7 @@ func (c *Cluster) scheduleTask(t *task) bool { offerIDs = append(offerIDs, offer.Id) } - t.build(n.ID, c.agents[n.ID].offers) + t.Build(n.ID, c.agents[n.ID].offers) offerFilters := &mesosproto.Filters{} refuseSeconds := c.refuseTimeout.Seconds() @@ -485,7 +486,7 @@ func (c *Cluster) scheduleTask(t *task) bool { } c.Unlock() c.scheduler.Unlock() - t.error <- err + t.Error <- err return true } @@ -498,18 +499,18 @@ func (c *Cluster) scheduleTask(t *task) bool { c.Unlock() c.scheduler.Unlock() // block until we get the container - finished, data, err := t.monitor() + finished, data, err := t.Monitor() taskID := t.TaskInfo.TaskId.GetValue() if err != nil { //remove task s.removeTask(taskID) - t.error <- err + t.Error <- err return true } if !finished { go func() { for { - finished, _, err := t.monitor() + finished, _, err := t.Monitor() if err != nil { // TODO do a better log by sending proper error message log.Error(err) @@ -531,8 +532,8 @@ func (c *Cluster) scheduleTask(t *task) bool { if data != nil && json.Unmarshal(data, &inspect) == nil && len(inspect) == 1 { container := &cluster.Container{Container: dockerclient.Container{Id: inspect[0].Id}, Engine: s.engine} if container, err := container.Refresh(); err == nil { - if !t.done { - t.container <- container + if !t.Stopped() { + t.SetContainer(container) } return true } @@ -546,15 +547,15 @@ func (c *Cluster) scheduleTask(t *task) bool { for _, container := range s.engine.Containers() { if 
container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] == taskID { - if !t.done { - t.container <- container + if !t.Stopped() { + t.SetContainer(container) } return true } } - if !t.done { - t.error <- fmt.Errorf("Container failed to create") + if !t.Stopped() { + t.Error <- fmt.Errorf("Container failed to create") } return true } diff --git a/cluster/mesos/queue/queue.go b/cluster/mesos/queue/queue.go deleted file mode 100644 index 1eedf9e195..0000000000 --- a/cluster/mesos/queue/queue.go +++ /dev/null @@ -1,58 +0,0 @@ -package queue - -import "sync" - -// Item represents a simple item in the queue -type Item interface { - ID() string - Do() bool - Stop() -} - -// Queue is a simple item queue -type Queue struct { - sync.Mutex - items map[string]Item -} - -// NewQueue returns a new queue -func NewQueue() *Queue { - return &Queue{items: make(map[string]Item)} -} - -// Add tries to Do the item, if it's not possible, add the item to the queue for future tries -func (q *Queue) Add(item Item) { - if !item.Do() { - q.Lock() - q.items[item.ID()] = item - q.Unlock() - } -} - -// Remove an item from the queue -func (q *Queue) Remove(items ...Item) { - q.Lock() - q.remove(items...) - q.Unlock() -} - -// Process tries to Do all the items in the queue and remove the items successfully done -func (q *Queue) Process() { - q.Lock() - toRemove := []Item{} - for _, item := range q.items { - if item.Do() { - toRemove = append(toRemove, item) - } - } - - q.remove(toRemove...) 
- q.Unlock() -} - -func (q *Queue) remove(items ...Item) { - for _, item := range items { - item.Stop() - delete(q.items, item.ID()) - } -} diff --git a/cluster/mesos/queue/queue_test.go b/cluster/mesos/queue/queue_test.go deleted file mode 100644 index cf52044215..0000000000 --- a/cluster/mesos/queue/queue_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package queue - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -type item struct { - id string - count int -} - -func (i *item) ID() string { - return i.id -} - -func (i *item) Stop() {} - -func (i *item) Do() bool { - i.count = i.count - 1 - return i.count == 0 -} - -func TestAdd(t *testing.T) { - q := NewQueue() - - q.Add(&item{"id1", 1}) - assert.Equal(t, len(q.items), 0) - - q.Add(&item{"id2", 2}) - assert.Equal(t, len(q.items), 1) - -} - -func TestRemove(t *testing.T) { - q := NewQueue() - - i := &item{"id1", 2} - q.Add(i) - assert.Equal(t, len(q.items), 1) - q.Remove(i) - assert.Equal(t, len(q.items), 0) - -} - -func TestProcess(t *testing.T) { - q := NewQueue() - - q.Add(&item{"id1", 2}) - assert.Equal(t, len(q.items), 1) - q.Process() - assert.Equal(t, len(q.items), 0) - - q.Add(&item{"id2", 3}) - assert.Equal(t, len(q.items), 1) - q.Process() - assert.Equal(t, len(q.items), 1) - q.Process() - assert.Equal(t, len(q.items), 0) - -} diff --git a/cluster/mesos/scheduler.go b/cluster/mesos/scheduler.go index d58b80fc39..6d17be04f0 100644 --- a/cluster/mesos/scheduler.go +++ b/cluster/mesos/scheduler.go @@ -104,7 +104,7 @@ func (s *Scheduler) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *m return } if task, ok := a.tasks[taskID]; ok { - task.sendStatus(taskStatus) + task.SendStatus(taskStatus) } else { var reason = "" if taskStatus.Reason != nil { diff --git a/cluster/mesos/task.go b/cluster/mesos/task/task.go similarity index 78% rename from cluster/mesos/task.go rename to cluster/mesos/task/task.go index bc7f381346..ebdeb7f3e0 100644 --- a/cluster/mesos/task.go +++ 
b/cluster/mesos/task/task.go @@ -1,4 +1,4 @@ -package mesos +package task import ( "errors" @@ -14,32 +14,52 @@ import ( "github.com/mesos/mesos-go/mesosutil" ) -type task struct { +// Task struct embeds TaskInfo and represents a mesos task +type Task struct { mesosproto.TaskInfo - cluster *Cluster - updates chan *mesosproto.TaskStatus config *cluster.ContainerConfig - error chan error + Error chan error container chan *cluster.Container done bool } -func (t *task) ID() string { +// GetContainer returns the container channel from the task +// where the Swarm API sends the created container +func (t *Task) GetContainer() chan *cluster.Container { + return t.container +} + +// SetContainer writes on the container channel from the task +func (t *Task) SetContainer(container *cluster.Container) { + t.container <- container +} + +// GetConfig returns the container configuration of the task +func (t *Task) GetConfig() *cluster.ContainerConfig { + return t.config +} + +// ID method returns the taskId +func (t *Task) ID() string { return t.TaskId.GetValue() } -func (t *task) Do() bool { - return t.cluster.scheduleTask(t) +// Stopped method returns a boolean determining if the task +// is done +func (t *Task) Stopped() bool { + return t.done } -func (t *task) Stop() { +// Stop method sets the boolean determining if the task is done +func (t *Task) Stop() { t.done = true } -func (t *task) build(slaveID string, offers map[string]*mesosproto.Offer) { +// Build method builds the task +func (t *Task) Build(slaveID string, offers map[string]*mesosproto.Offer) { t.Command = &mesosproto.CommandInfo{Shell: proto.Bool(false)} t.Container = &mesosproto.ContainerInfo{ @@ -141,7 +161,8 @@ func (t *task) build(slaveID string, offers map[string]*mesosproto.Offer) { t.SlaveId = &mesosproto.SlaveID{Value: &slaveID} } -func newTask(c *Cluster, config *cluster.ContainerConfig, name string) (*task, error) { +// NewTask function creates a task +func NewTask(config 
*cluster.ContainerConfig, name string) (*Task, error) { id := stringid.TruncateID(stringid.GenerateRandomID()) if name != "" { @@ -152,11 +173,10 @@ func newTask(c *Cluster, config *cluster.ContainerConfig, name string) (*task, e // FIXME: once Mesos changes merged no need to save the task id to know which container we launched config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] = id - task := &task{ - cluster: c, + task := &Task{ config: config, container: make(chan *cluster.Container), - error: make(chan error), + Error: make(chan error), updates: make(chan *mesosproto.TaskStatus), } @@ -166,16 +186,19 @@ func newTask(c *Cluster, config *cluster.ContainerConfig, name string) (*task, e return task, nil } -func (t *task) sendStatus(status *mesosproto.TaskStatus) { +// SendStatus method writes the task status in the updates channel +func (t *Task) SendStatus(status *mesosproto.TaskStatus) { t.updates <- status } -func (t *task) getStatus() *mesosproto.TaskStatus { +// GetStatus method reads the task status on the updates channel +func (t *Task) GetStatus() *mesosproto.TaskStatus { return <-t.updates } -func (t *task) monitor() (bool, []byte, error) { - taskStatus := t.getStatus() +// Monitor method monitors task statuses +func (t *Task) Monitor() (bool, []byte, error) { + taskStatus := t.GetStatus() switch taskStatus.GetState() { case mesosproto.TaskState_TASK_STAGING: diff --git a/cluster/mesos/task_test.go b/cluster/mesos/task/task_test.go similarity index 82% rename from cluster/mesos/task_test.go rename to cluster/mesos/task/task_test.go index 86d82b7e79..bd4154a840 100644 --- a/cluster/mesos/task_test.go +++ b/cluster/mesos/task/task_test.go @@ -1,4 +1,4 @@ -package mesos +package task import ( "sort" @@ -15,7 +15,7 @@ import ( const name = "mesos-swarm-task-name" func TestBuild(t *testing.T) { - task, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{ + task, err := 
NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{ Image: "test-image", CpuShares: 42, Memory: 2097152, @@ -23,7 +23,7 @@ func TestBuild(t *testing.T) { }), name) assert.NoError(t, err) - task.build("slave-id", nil) + task.Build("slave-id", nil) assert.Equal(t, task.Container.GetType(), mesosproto.ContainerInfo_DOCKER) assert.Equal(t, task.Container.Docker.GetImage(), "test-image") @@ -47,7 +47,7 @@ func TestBuild(t *testing.T) { } func TestNewTask(t *testing.T) { - task, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), name) + task, err := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), name) assert.NoError(t, err) assert.Equal(t, *task.Name, name) @@ -56,13 +56,13 @@ func TestNewTask(t *testing.T) { } func TestSendGetStatus(t *testing.T) { - task, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "") + task, err := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "") assert.NoError(t, err) status := mesosutil.NewTaskStatus(nil, mesosproto.TaskState_TASK_RUNNING) - go func() { task.sendStatus(status) }() - s := task.getStatus() + go func() { task.SendStatus(status) }() + s := task.GetStatus() assert.Equal(t, s, status) } diff --git a/cluster/mesos/task/tasks.go b/cluster/mesos/task/tasks.go new file mode 100644 index 0000000000..bfdd495b14 --- /dev/null +++ b/cluster/mesos/task/tasks.go @@ -0,0 +1,61 @@ +package task + +import "sync" + +type launcher interface { + LaunchTask(t *Task) bool +} + +// Tasks is a simple map of tasks +type Tasks struct { + sync.Mutex + + cluster launcher + Tasks map[string]*Task +} + +// NewTasks returns a new, empty Tasks that launches through the given launcher +func NewTasks(cluster launcher) *Tasks { + return &Tasks{ + Tasks: make(map[string]*Task), + cluster: cluster, + } + +} + +// Add tries to launch the Task; if LaunchTask returns false, the Task is added to the map for future tries +func (t *Tasks) Add(task *Task) { + if !t.cluster.LaunchTask(task) { + t.Lock() + 
t.Tasks[task.ID()] = task + t.Unlock() + } +} + +// Remove stops the given Tasks and deletes them from the map +func (t *Tasks) Remove(tasks ...*Task) { + t.Lock() + t.remove(tasks...) + t.Unlock() +} + +// Process tries to launch every Task in the map and removes the Tasks for which LaunchTask returned true +func (t *Tasks) Process() { + t.Lock() + toRemove := []*Task{} + for _, task := range t.Tasks { + if t.cluster.LaunchTask(task) { + toRemove = append(toRemove, task) + } + } + + t.remove(toRemove...) + t.Unlock() +} + +func (t *Tasks) remove(tasks ...*Task) { + for _, task := range tasks { + task.Stop() + delete(t.Tasks, task.ID()) + } +} diff --git a/cluster/mesos/task/tasks_test.go b/cluster/mesos/task/tasks_test.go new file mode 100644 index 0000000000..3a92480996 --- /dev/null +++ b/cluster/mesos/task/tasks_test.go @@ -0,0 +1,76 @@ +package task + +import ( + "testing" + + "github.com/docker/swarm/cluster" + "github.com/samalba/dockerclient" + "github.com/stretchr/testify/assert" +) + +type testLauncher struct { + count int +} + +func (t *testLauncher) LaunchTask(_ *Task) bool { + t.count = t.count - 1 + return t.count == 0 +} + +func TestAdd(t *testing.T) { + q := NewTasks(&testLauncher{count: 1}) + + task1, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{ + Image: "test-image", + CpuShares: 42, + Memory: 2097152, + Cmd: []string{"ls", "foo", "bar"}, + }), "name1") + + task2, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{ + Image: "test-image", + CpuShares: 42, + Memory: 2097152, + Cmd: []string{"ls", "foo", "bar"}, + }), "name2") + q.Add(task1) + assert.Equal(t, len(q.Tasks), 0) + + q.Add(task2) + assert.Equal(t, len(q.Tasks), 1) + +} + +func TestRemove(t *testing.T) { + q := NewTasks(&testLauncher{count: 2}) + task1, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{ + Image: "test-image", + CpuShares: 42, + Memory: 2097152, + Cmd: []string{"ls", "foo", "bar"}, + }), "name1") + + q.Add(task1) + assert.Equal(t, len(q.Tasks), 1) + 
q.Remove(task1) + assert.Equal(t, len(q.Tasks), 0) + +} + +func TestProcess(t *testing.T) { + q := NewTasks(&testLauncher{count: 3}) + task1, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{ + Image: "test-image", + CpuShares: 42, + Memory: 2097152, + Cmd: []string{"ls", "foo", "bar"}, + }), "name1") + + q.Add(task1) + assert.Equal(t, len(q.Tasks), 1) + q.Process() + assert.Equal(t, len(q.Tasks), 1) + q.Process() + assert.Equal(t, len(q.Tasks), 0) + +} diff --git a/cluster/mesos/task/utils.go b/cluster/mesos/task/utils.go new file mode 100644 index 0000000000..1a8d4a79f4 --- /dev/null +++ b/cluster/mesos/task/utils.go @@ -0,0 +1,16 @@ +package task + +import "github.com/mesos/mesos-go/mesosproto" + +func getPorts(offer *mesosproto.Offer) (ports []uint64) { + for _, resource := range offer.Resources { + if resource.GetName() == "ports" { + for _, rang := range resource.GetRanges().GetRange() { + for i := rang.GetBegin(); i <= rang.GetEnd(); i++ { + ports = append(ports, i) + } + } + } + } + return ports +} diff --git a/cluster/mesos/utils.go b/cluster/mesos/utils.go index f25c66ae05..9055a3e980 100644 --- a/cluster/mesos/utils.go +++ b/cluster/mesos/utils.go @@ -37,16 +37,3 @@ func sumScalarResourceValue(offers map[string]*mesosproto.Offer, name string) fl } return value } - -func getPorts(offer *mesosproto.Offer) (ports []uint64) { - for _, resource := range offer.Resources { - if resource.GetName() == "ports" { - for _, rang := range resource.GetRanges().GetRange() { - for i := rang.GetBegin(); i <= rang.GetEnd(); i++ { - ports = append(ports, i) - } - } - } - } - return ports -}