mirror of https://github.com/docker/docs.git
Removing Queue package and regrouping task logic
Signed-off-by: Isabel Jimenez <contact@isabeljimenez.com>
This commit is contained in:
parent 254e095f77
commit fe8da8fe80
cluster/mesos/agent.go
@@ -4,6 +4,7 @@ import (
 	"sync"
 
 	"github.com/docker/swarm/cluster"
+	"github.com/docker/swarm/cluster/mesos/task"
 	"github.com/mesos/mesos-go/mesosproto"
 )
@@ -12,7 +13,7 @@ type agent struct {
 
 	id     string
 	offers map[string]*mesosproto.Offer
-	tasks  map[string]*task
+	tasks  map[string]*task.Task
 	engine *cluster.Engine
 }
 
@@ -20,7 +21,7 @@ func newAgent(sid string, e *cluster.Engine) *agent {
 	return &agent{
 		id:     sid,
 		offers: make(map[string]*mesosproto.Offer),
-		tasks:  make(map[string]*task),
+		tasks:  make(map[string]*task.Task),
 		engine: e,
 	}
 }
@@ -31,7 +32,7 @@ func (s *agent) addOffer(offer *mesosproto.Offer) {
 	s.Unlock()
 }
 
-func (s *agent) addTask(task *task) {
+func (s *agent) addTask(task *task.Task) {
 	s.Lock()
 	s.tasks[task.TaskInfo.TaskId.GetValue()] = task
 	s.Unlock()
@@ -71,7 +72,7 @@ func (s *agent) getOffers() map[string]*mesosproto.Offer {
 	return s.offers
 }
 
-func (s *agent) getTasks() map[string]*task {
+func (s *agent) getTasks() map[string]*task.Task {
 	s.RLock()
 	defer s.RUnlock()
 	return s.tasks
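The methods in this diff call Lock, RLock, and Unlock directly on the agent receiver, which implies the struct embeds a sync.RWMutex; that field sits outside the hunk context above, so the following is an inferred sketch of the full type, not verbatim source:

package mesos

import (
	"sync"

	"github.com/docker/swarm/cluster"
	"github.com/docker/swarm/cluster/mesos/task"
	"github.com/mesos/mesos-go/mesosproto"
)

// Inferred shape of the agent type: the embedded RWMutex is what lets
// addTask call s.Lock() and getTasks call s.RLock() on the receiver.
type agent struct {
	sync.RWMutex

	id     string
	offers map[string]*mesosproto.Offer
	tasks  map[string]*task.Task
	engine *cluster.Engine
}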
cluster/mesos/agent_test.go
@@ -4,6 +4,7 @@ import (
 	"testing"
 
 	"github.com/docker/swarm/cluster"
+	"github.com/docker/swarm/cluster/mesos/task"
 	"github.com/mesos/mesos-go/mesosutil"
 	"github.com/samalba/dockerclient"
 	"github.com/stretchr/testify/assert"
@@ -41,11 +42,11 @@ func TestAddTask(t *testing.T) {
 	assert.Empty(t, s.tasks)
 	assert.True(t, s.empty())
 
-	t1, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
+	t1, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
 	assert.NoError(t, err)
 	s.addTask(t1)
 
-	t2, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
+	t2, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
 	assert.NoError(t, err)
 	s.addTask(t2)
 	assert.Equal(t, len(s.tasks), 2)
@@ -79,11 +80,11 @@ func TestRemoveTask(t *testing.T) {
 
 	assert.Empty(t, s.tasks)
 
-	t1, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
+	t1, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
 	assert.NoError(t, err)
 	s.addTask(t1)
 
-	t2, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
+	t2, err := task.NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "task1")
 	assert.NoError(t, err)
 	s.addTask(t2)
 	assert.Equal(t, len(s.tasks), 2)
cluster/mesos/cluster.go
@@ -14,7 +14,7 @@ import (
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/swarm/cluster"
-	"github.com/docker/swarm/cluster/mesos/queue"
+	"github.com/docker/swarm/cluster/mesos/task"
 	"github.com/docker/swarm/scheduler"
 	"github.com/docker/swarm/scheduler/node"
 	"github.com/gogo/protobuf/proto"
@@ -37,7 +37,7 @@ type Cluster struct {
 	offerTimeout        time.Duration
 	refuseTimeout       time.Duration
 	taskCreationTimeout time.Duration
-	pendingTasks        *queue.Queue
+	pendingTasks        *task.Tasks
 	engineOpts          *cluster.EngineOpts
 }
 
@@ -77,7 +77,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
 		refuseTimeout: defaultRefuseTimeout,
 	}
 
-	cluster.pendingTasks = queue.NewQueue()
+	cluster.pendingTasks = task.NewTasks(cluster)
 
 	// Empty string is accepted by the scheduler.
 	user, _ := options.String("mesos.user", "SWARM_MESOS_USER")
@@ -173,7 +173,7 @@ func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string,
 		return nil, errResourcesNeeded
 	}
 
-	task, err := newTask(c, config, name)
+	task, err := task.NewTask(config, name)
 	if err != nil {
 		return nil, err
 	}
@@ -181,9 +181,9 @@ func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string,
 	go c.pendingTasks.Add(task)
 
 	select {
-	case container := <-task.container:
+	case container := <-task.GetContainer():
 		return formatContainer(container), nil
-	case err := <-task.error:
+	case err := <-task.Error:
 		return nil, err
 	case <-time.After(c.taskCreationTimeout):
 		c.pendingTasks.Remove(task)
@@ -446,11 +446,12 @@ func (c *Cluster) removeOffer(offer *mesosproto.Offer) bool {
 	return found
 }
 
-func (c *Cluster) scheduleTask(t *task) bool {
+// LaunchTask method selects node and calls driver to launch a task
+func (c *Cluster) LaunchTask(t *task.Task) bool {
 	c.scheduler.Lock()
 	//change to explicit lock defer c.scheduler.Unlock()
 
-	nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), t.config)
+	nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), t.GetConfig())
 	if err != nil {
 		c.scheduler.Unlock()
 		return false
@@ -458,7 +459,7 @@ func (c *Cluster) scheduleTask(t *task) bool {
 	n := nodes[0]
 	s, ok := c.agents[n.ID]
 	if !ok {
-		t.error <- fmt.Errorf("Unable to create on agent %q", n.ID)
+		t.Error <- fmt.Errorf("Unable to create on agent %q", n.ID)
 		c.scheduler.Unlock()
 		return true
 	}
@@ -472,7 +473,7 @@ func (c *Cluster) scheduleTask(t *task) bool {
 		offerIDs = append(offerIDs, offer.Id)
 	}
 
-	t.build(n.ID, c.agents[n.ID].offers)
+	t.Build(n.ID, c.agents[n.ID].offers)
 
 	offerFilters := &mesosproto.Filters{}
 	refuseSeconds := c.refuseTimeout.Seconds()
@@ -485,7 +486,7 @@ func (c *Cluster) scheduleTask(t *task) bool {
 		}
 		c.Unlock()
 		c.scheduler.Unlock()
-		t.error <- err
+		t.Error <- err
 		return true
 	}
 
@@ -498,18 +499,18 @@ func (c *Cluster) scheduleTask(t *task) bool {
 	c.Unlock()
 	c.scheduler.Unlock()
 	// block until we get the container
-	finished, data, err := t.monitor()
+	finished, data, err := t.Monitor()
 	taskID := t.TaskInfo.TaskId.GetValue()
 	if err != nil {
 		//remove task
 		s.removeTask(taskID)
-		t.error <- err
+		t.Error <- err
 		return true
 	}
 	if !finished {
 		go func() {
 			for {
-				finished, _, err := t.monitor()
+				finished, _, err := t.Monitor()
 				if err != nil {
 					// TODO do a better log by sending proper error message
 					log.Error(err)
@@ -531,8 +532,8 @@ func (c *Cluster) scheduleTask(t *task) bool {
 	if data != nil && json.Unmarshal(data, &inspect) == nil && len(inspect) == 1 {
 		container := &cluster.Container{Container: dockerclient.Container{Id: inspect[0].Id}, Engine: s.engine}
 		if container, err := container.Refresh(); err == nil {
-			if !t.done {
-				t.container <- container
+			if !t.Stopped() {
+				t.SetContainer(container)
 			}
 			return true
 		}
@@ -546,15 +547,15 @@ func (c *Cluster) scheduleTask(t *task) bool {
 
 	for _, container := range s.engine.Containers() {
 		if container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] == taskID {
-			if !t.done {
-				t.container <- container
+			if !t.Stopped() {
+				t.SetContainer(container)
 			}
 			return true
 		}
 	}
 
-	if !t.done {
-		t.error <- fmt.Errorf("Container failed to create")
+	if !t.Stopped() {
+		t.Error <- fmt.Errorf("Container failed to create")
 	}
 	return true
 }
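CreateContainer (above) queues the task and then blocks on the channels the new accessors expose. A minimal, self-contained sketch of that wait pattern; waitForTask is a hypothetical helper, not part of the commit:

package mesos

import (
	"fmt"
	"time"

	"github.com/docker/swarm/cluster"
	"github.com/docker/swarm/cluster/mesos/task"
)

// waitForTask illustrates the select that CreateContainer performs
// after handing the task to pendingTasks.Add.
func waitForTask(t *task.Task, timeout time.Duration) (*cluster.Container, error) {
	select {
	case container := <-t.GetContainer():
		// Delivered by Monitor via SetContainer once the engine created it.
		return container, nil
	case err := <-t.Error:
		// LaunchTask reported a scheduling or creation failure.
		return nil, err
	case <-time.After(timeout):
		// Nobody answered in time; the real caller then removes the
		// pending task with pendingTasks.Remove(t).
		return nil, fmt.Errorf("timed out waiting for task %s", t.ID())
	}
}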
cluster/mesos/queue/queue.go (deleted)
@@ -1,58 +0,0 @@
-package queue
-
-import "sync"
-
-// Item represents a simple item in the queue
-type Item interface {
-	ID() string
-	Do() bool
-	Stop()
-}
-
-// Queue is a simple item queue
-type Queue struct {
-	sync.Mutex
-	items map[string]Item
-}
-
-// NewQueue returns a new queue
-func NewQueue() *Queue {
-	return &Queue{items: make(map[string]Item)}
-}
-
-// Add tries to Do the item, if it's not possible, add the item to the queue for future tries
-func (q *Queue) Add(item Item) {
-	if !item.Do() {
-		q.Lock()
-		q.items[item.ID()] = item
-		q.Unlock()
-	}
-}
-
-// Remove an item from the queue
-func (q *Queue) Remove(items ...Item) {
-	q.Lock()
-	q.remove(items...)
-	q.Unlock()
-}
-
-// Process tries to Do all the items in the queue and remove the items successfully done
-func (q *Queue) Process() {
-	q.Lock()
-	toRemove := []Item{}
-	for _, item := range q.items {
-		if item.Do() {
-			toRemove = append(toRemove, item)
-		}
-	}
-
-	q.remove(toRemove...)
-	q.Unlock()
-}
-
-func (q *Queue) remove(items ...Item) {
-	for _, item := range items {
-		item.Stop()
-		delete(q.items, item.ID())
-	}
-}
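Before this commit the mesos task type plugged into this queue by satisfying Item; the removed ID, Do, and Stop methods in the task.go diff below show the adapter, with Do delegating to cluster.scheduleTask. A toy, runnable illustration of the retry contract Item encoded, using hypothetical names:

package main

import "fmt"

// Item mirrors the interface the deleted queue package defined.
type Item interface {
	ID() string
	Do() bool
	Stop()
}

// job is a toy Item whose Do succeeds on the nth attempt, mimicking
// how a mesos task retried scheduling until resources appeared.
type job struct {
	id      string
	retries int
	stopped bool
}

func (j *job) ID() string { return j.id }
func (j *job) Do() bool   { j.retries--; return j.retries <= 0 }
func (j *job) Stop()      { j.stopped = true }

func main() {
	j := &job{id: "task1", retries: 2}
	fmt.Println(j.Do()) // false: Queue.Add would park it for a later Process pass
	fmt.Println(j.Do()) // true: done, so Process would Stop and remove it
}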
cluster/mesos/queue/queue_test.go (deleted)
@@ -1,62 +0,0 @@
-package queue
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-type item struct {
-	id    string
-	count int
-}
-
-func (i *item) ID() string {
-	return i.id
-}
-
-func (i *item) Stop() {}
-
-func (i *item) Do() bool {
-	i.count = i.count - 1
-	return i.count == 0
-}
-
-func TestAdd(t *testing.T) {
-	q := NewQueue()
-
-	q.Add(&item{"id1", 1})
-	assert.Equal(t, len(q.items), 0)
-
-	q.Add(&item{"id2", 2})
-	assert.Equal(t, len(q.items), 1)
-
-}
-
-func TestRemove(t *testing.T) {
-	q := NewQueue()
-
-	i := &item{"id1", 2}
-	q.Add(i)
-	assert.Equal(t, len(q.items), 1)
-	q.Remove(i)
-	assert.Equal(t, len(q.items), 0)
-
-}
-
-func TestProcess(t *testing.T) {
-	q := NewQueue()
-
-	q.Add(&item{"id1", 2})
-	assert.Equal(t, len(q.items), 1)
-	q.Process()
-	assert.Equal(t, len(q.items), 0)
-
-	q.Add(&item{"id2", 3})
-	assert.Equal(t, len(q.items), 1)
-	q.Process()
-	assert.Equal(t, len(q.items), 1)
-	q.Process()
-	assert.Equal(t, len(q.items), 0)
-
-}
cluster/mesos/scheduler.go
@@ -104,7 +104,7 @@ func (s *Scheduler) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *m
 		return
 	}
 	if task, ok := a.tasks[taskID]; ok {
-		task.sendStatus(taskStatus)
+		task.SendStatus(taskStatus)
 	} else {
 		var reason = ""
 		if taskStatus.Reason != nil {
cluster/mesos/task/task.go (moved from cluster/mesos/task.go)
@@ -1,4 +1,4 @@
-package mesos
+package task
 
 import (
 	"errors"
@@ -14,32 +14,52 @@ import (
 	"github.com/mesos/mesos-go/mesosutil"
 )
 
-type task struct {
+// Task struct inherits from TaskInfo and represents a mesos task
+type Task struct {
 	mesosproto.TaskInfo
 
-	cluster *Cluster
-
 	updates chan *mesosproto.TaskStatus
 
 	config    *cluster.ContainerConfig
-	error     chan error
+	Error     chan error
 	container chan *cluster.Container
 	done      bool
 }
 
-func (t *task) ID() string {
+// GetContainer returns the container channel from the task
+// where the Swarm API sends the created container
+func (t *Task) GetContainer() chan *cluster.Container {
+	return t.container
+}
+
+// SetContainer writes on the container channel from the task
+func (t *Task) SetContainer(container *cluster.Container) {
+	t.container <- container
+}
+
+// GetConfig returns the container configuration of the task
+func (t *Task) GetConfig() *cluster.ContainerConfig {
+	return t.config
+}
+
+// ID method returns the taskId
+func (t *Task) ID() string {
 	return t.TaskId.GetValue()
 }
 
-func (t *task) Do() bool {
-	return t.cluster.scheduleTask(t)
+// Stopped method returns a boolean determining if the task
+// is done
+func (t *Task) Stopped() bool {
+	return t.done
 }
 
-func (t *task) Stop() {
+// Stop method sets the boolean determining if the task is done
+func (t *Task) Stop() {
 	t.done = true
 }
 
-func (t *task) build(slaveID string, offers map[string]*mesosproto.Offer) {
+// Build method builds the task
+func (t *Task) Build(slaveID string, offers map[string]*mesosproto.Offer) {
 	t.Command = &mesosproto.CommandInfo{Shell: proto.Bool(false)}
 
 	t.Container = &mesosproto.ContainerInfo{
@@ -141,7 +161,8 @@ func (t *task) build(slaveID string, offers map[string]*mesosproto.Offer) {
 	t.SlaveId = &mesosproto.SlaveID{Value: &slaveID}
 }
 
-func newTask(c *Cluster, config *cluster.ContainerConfig, name string) (*task, error) {
+// NewTask function creates a task
+func NewTask(config *cluster.ContainerConfig, name string) (*Task, error) {
 	id := stringid.TruncateID(stringid.GenerateRandomID())
 
 	if name != "" {
@@ -152,11 +173,10 @@ func newTask(c *Cluster, config *cluster.ContainerConfig, name string) (*task, e
 	// FIXME: once Mesos changes merged no need to save the task id to know which container we launched
 	config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] = id
 
-	task := &task{
-		cluster:   c,
+	task := &Task{
 		config:    config,
 		container: make(chan *cluster.Container),
-		error:     make(chan error),
+		Error:     make(chan error),
 		updates:   make(chan *mesosproto.TaskStatus),
 	}
 
@@ -166,16 +186,19 @@ func newTask(c *Cluster, config *cluster.ContainerConfig, name string) (*task, e
 	return task, nil
 }
 
-func (t *task) sendStatus(status *mesosproto.TaskStatus) {
+// SendStatus method writes the task status in the updates channel
+func (t *Task) SendStatus(status *mesosproto.TaskStatus) {
 	t.updates <- status
 }
 
-func (t *task) getStatus() *mesosproto.TaskStatus {
+// GetStatus method reads the task status on the updates channel
+func (t *Task) GetStatus() *mesosproto.TaskStatus {
 	return <-t.updates
 }
 
-func (t *task) monitor() (bool, []byte, error) {
-	taskStatus := t.getStatus()
+// Monitor method monitors task statuses
+func (t *Task) Monitor() (bool, []byte, error) {
+	taskStatus := t.GetStatus()
 
 	switch taskStatus.GetState() {
 	case mesosproto.TaskState_TASK_STAGING:
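SendStatus and GetStatus form a rendezvous over the unbuffered updates channel: the scheduler's StatusUpdate callback pushes each TaskStatus and Monitor blocks until one arrives. A toy, self-contained sketch of that handshake; the status type here is a hypothetical stand-in for *mesosproto.TaskStatus:

package main

import "fmt"

// status stands in for *mesosproto.TaskStatus in this toy version.
type status struct{ state string }

type task struct{ updates chan *status }

// SendStatus mirrors Task.SendStatus: the scheduler callback goroutine
// blocks here until Monitor is ready to receive.
func (t *task) SendStatus(s *status) { t.updates <- s }

// GetStatus mirrors Task.GetStatus: blocks until a status is pushed.
func (t *task) GetStatus() *status { return <-t.updates }

func main() {
	t := &task{updates: make(chan *status)}
	go t.SendStatus(&status{state: "TASK_RUNNING"}) // scheduler side
	fmt.Println(t.GetStatus().state)                // monitor side
}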
cluster/mesos/task/task_test.go (moved from cluster/mesos/task_test.go)
@@ -1,4 +1,4 @@
-package mesos
+package task
 
 import (
 	"sort"
@@ -15,7 +15,7 @@ import (
 const name = "mesos-swarm-task-name"
 
 func TestBuild(t *testing.T) {
-	task, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{
+	task, err := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
 		Image:     "test-image",
 		CpuShares: 42,
 		Memory:    2097152,
@@ -23,7 +23,7 @@ func TestBuild(t *testing.T) {
 	}), name)
 	assert.NoError(t, err)
 
-	task.build("slave-id", nil)
+	task.Build("slave-id", nil)
 
 	assert.Equal(t, task.Container.GetType(), mesosproto.ContainerInfo_DOCKER)
 	assert.Equal(t, task.Container.Docker.GetImage(), "test-image")
@@ -47,7 +47,7 @@ func TestBuild(t *testing.T) {
 }
 
 func TestNewTask(t *testing.T) {
-	task, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), name)
+	task, err := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), name)
 	assert.NoError(t, err)
 
 	assert.Equal(t, *task.Name, name)
@@ -56,13 +56,13 @@ func TestNewTask(t *testing.T) {
 }
 
 func TestSendGetStatus(t *testing.T) {
-	task, err := newTask(nil, cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "")
+	task, err := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), "")
 	assert.NoError(t, err)
 
 	status := mesosutil.NewTaskStatus(nil, mesosproto.TaskState_TASK_RUNNING)
 
-	go func() { task.sendStatus(status) }()
-	s := task.getStatus()
+	go func() { task.SendStatus(status) }()
+	s := task.GetStatus()
 
 	assert.Equal(t, s, status)
 }
cluster/mesos/task/tasks.go (new file)
@@ -0,0 +1,61 @@
+package task
+
+import "sync"
+
+type launcher interface {
+	LaunchTask(t *Task) bool
+}
+
+// Tasks is a simple map of tasks
+type Tasks struct {
+	sync.Mutex
+
+	cluster launcher
+	Tasks   map[string]*Task
+}
+
+// NewTasks returns a new tasks
+func NewTasks(cluster launcher) *Tasks {
+	return &Tasks{
+		Tasks:   make(map[string]*Task),
+		cluster: cluster,
+	}
+
+}
+
+// Add tries to launch the Task; if that is not yet possible, it adds the Task to the map for future tries
+func (t *Tasks) Add(task *Task) {
+	if !t.cluster.LaunchTask(task) {
+		t.Lock()
+		t.Tasks[task.ID()] = task
+		t.Unlock()
+	}
+}
+
+// Remove a Task from the map
+func (t *Tasks) Remove(tasks ...*Task) {
+	t.Lock()
+	t.remove(tasks...)
+	t.Unlock()
+}
+
+// Process tries to launch all the Tasks in the map and removes the Tasks successfully done
+func (t *Tasks) Process() {
+	t.Lock()
+	toRemove := []*Task{}
+	for _, task := range t.Tasks {
+		if t.cluster.LaunchTask(task) {
+			toRemove = append(toRemove, task)
+		}
+	}
+
+	t.remove(toRemove...)
+	t.Unlock()
+}
+
+func (t *Tasks) remove(tasks ...*Task) {
+	for _, task := range tasks {
+		task.Stop()
+		delete(t.Tasks, task.ID())
+	}
+}
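The unexported launcher interface lets this package call back into the mesos Cluster without importing it, which would otherwise create an import cycle now that cluster.go imports cluster/mesos/task. Any type with a matching LaunchTask method satisfies it implicitly; a toy, self-contained sketch of the pattern with hypothetical names:

package main

import "fmt"

type task struct{ id string }

// launcher mirrors the unexported interface in tasks.go: the consumer
// declares the capability it needs instead of importing the provider.
type launcher interface {
	LaunchTask(t *task) bool
}

// cluster stands in for mesos.Cluster; it satisfies launcher
// implicitly, just by having the right method set.
type cluster struct{ slots int }

func (c *cluster) LaunchTask(t *task) bool {
	if c.slots == 0 {
		return false // no resources yet; the caller queues the task
	}
	c.slots--
	return true
}

func main() {
	var l launcher = &cluster{slots: 1}
	fmt.Println(l.LaunchTask(&task{id: "t1"})) // true: launched
	fmt.Println(l.LaunchTask(&task{id: "t2"})) // false: would be queued
}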
cluster/mesos/task/tasks_test.go (new file)
@@ -0,0 +1,76 @@
+package task
+
+import (
+	"testing"
+
+	"github.com/docker/swarm/cluster"
+	"github.com/samalba/dockerclient"
+	"github.com/stretchr/testify/assert"
+)
+
+type testLauncher struct {
+	count int
+}
+
+func (t *testLauncher) LaunchTask(_ *Task) bool {
+	t.count = t.count - 1
+	return t.count == 0
+}
+
+func TestAdd(t *testing.T) {
+	q := NewTasks(&testLauncher{count: 1})
+
+	task1, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
+		Image:     "test-image",
+		CpuShares: 42,
+		Memory:    2097152,
+		Cmd:       []string{"ls", "foo", "bar"},
+	}), "name1")
+
+	task2, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
+		Image:     "test-image",
+		CpuShares: 42,
+		Memory:    2097152,
+		Cmd:       []string{"ls", "foo", "bar"},
+	}), "name2")
+	q.Add(task1)
+	assert.Equal(t, len(q.Tasks), 0)
+
+	q.Add(task2)
+	assert.Equal(t, len(q.Tasks), 1)
+
+}
+
+func TestRemove(t *testing.T) {
+	q := NewTasks(&testLauncher{count: 2})
+	task1, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
+		Image:     "test-image",
+		CpuShares: 42,
+		Memory:    2097152,
+		Cmd:       []string{"ls", "foo", "bar"},
+	}), "name1")
+
+	q.Add(task1)
+	assert.Equal(t, len(q.Tasks), 1)
+	q.Remove(task1)
+	assert.Equal(t, len(q.Tasks), 0)
+
+}
+
+func TestProcess(t *testing.T) {
+	q := NewTasks(&testLauncher{count: 3})
+	task1, _ := NewTask(cluster.BuildContainerConfig(dockerclient.ContainerConfig{
+		Image:     "test-image",
+		CpuShares: 42,
+		Memory:    2097152,
+		Cmd:       []string{"ls", "foo", "bar"},
+	}), "name1")
+
+	q.Add(task1)
+	assert.Equal(t, len(q.Tasks), 1)
+	q.Process()
+	assert.Equal(t, len(q.Tasks), 1)
+	q.Process()
+	assert.Equal(t, len(q.Tasks), 0)
+
+}
cluster/mesos/task/utils.go (new file)
@@ -0,0 +1,16 @@
+package task
+
+import "github.com/mesos/mesos-go/mesosproto"
+
+func getPorts(offer *mesosproto.Offer) (ports []uint64) {
+	for _, resource := range offer.Resources {
+		if resource.GetName() == "ports" {
+			for _, rang := range resource.GetRanges().GetRange() {
+				for i := rang.GetBegin(); i <= rang.GetEnd(); i++ {
+					ports = append(ports, i)
+				}
+			}
+		}
+	}
+	return ports
+}
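getPorts flattens every advertised "ports" range in an offer into individual port numbers. A hypothetical test, not part of the commit, assuming mesos-go's mesosutil.NewRangesResource and mesosutil.NewValueRange helpers (mesosutil is already imported by the tests above):

package task

import (
	"testing"

	"github.com/mesos/mesos-go/mesosproto"
	"github.com/mesos/mesos-go/mesosutil"
	"github.com/stretchr/testify/assert"
)

// Hypothetical test: shows getPorts expanding the advertised ranges
// 8000-8002 and 9000-9000 into individual port numbers.
func TestGetPorts(t *testing.T) {
	offer := &mesosproto.Offer{
		Resources: []*mesosproto.Resource{
			mesosutil.NewRangesResource("ports", []*mesosproto.Value_Range{
				mesosutil.NewValueRange(8000, 8002),
				mesosutil.NewValueRange(9000, 9000),
			}),
		},
	}

	assert.Equal(t, getPorts(offer), []uint64{8000, 8001, 8002, 9000})
}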
cluster/mesos/utils.go
@@ -37,16 +37,3 @@ func sumScalarResourceValue(offers map[string]*mesosproto.Offer, name string) fl
 	}
 	return value
 }
-
-func getPorts(offer *mesosproto.Offer) (ports []uint64) {
-	for _, resource := range offer.Resources {
-		if resource.GetName() == "ports" {
-			for _, rang := range resource.GetRanges().GetRange() {
-				for i := rang.GetBegin(); i <= rang.GetEnd(); i++ {
-					ports = append(ports, i)
-				}
-			}
-		}
-	}
-	return ports
-}