diff --git a/cluster/mesos/cluster.go b/cluster/mesos/cluster.go index 03f9f352e5..662d6559d5 100644 --- a/cluster/mesos/cluster.go +++ b/cluster/mesos/cluster.go @@ -514,7 +514,9 @@ func (c *Cluster) scheduleTask(t *task) bool { if data != nil && json.Unmarshal(data, &inspect) == nil && len(inspect) == 1 { container := &cluster.Container{Container: dockerclient.Container{Id: inspect[0].Id}, Engine: s.engine} if container, err := container.Refresh(); err == nil { - t.container <- container + if !t.done { + t.container <- container + } return true } } @@ -527,12 +529,16 @@ func (c *Cluster) scheduleTask(t *task) bool { for _, container := range s.engine.Containers() { if container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] == taskID { - t.container <- container + if !t.done { + t.container <- container + } return true } } - t.error <- fmt.Errorf("Container failed to create") + if !t.done { + t.error <- fmt.Errorf("Container failed to create") + } return true } diff --git a/cluster/mesos/queue/queue.go b/cluster/mesos/queue/queue.go index c1684ab976..1eedf9e195 100644 --- a/cluster/mesos/queue/queue.go +++ b/cluster/mesos/queue/queue.go @@ -6,6 +6,7 @@ import "sync" type Item interface { ID() string Do() bool + Stop() } // Queue is a simple item queue @@ -51,6 +52,7 @@ func (q *Queue) Process() { func (q *Queue) remove(items ...Item) { for _, item := range items { + item.Stop() delete(q.items, item.ID()) } } diff --git a/cluster/mesos/queue/queue_test.go b/cluster/mesos/queue/queue_test.go index 3ae415730b..cf52044215 100644 --- a/cluster/mesos/queue/queue_test.go +++ b/cluster/mesos/queue/queue_test.go @@ -15,6 +15,8 @@ func (i *item) ID() string { return i.id } +func (i *item) Stop() {} + func (i *item) Do() bool { i.count = i.count - 1 return i.count == 0 diff --git a/cluster/mesos/task.go b/cluster/mesos/task.go index 54cbe9218a..05b9435918 100644 --- a/cluster/mesos/task.go +++ b/cluster/mesos/task.go @@ -24,6 +24,7 @@ type task struct { config *cluster.ContainerConfig error chan error container chan *cluster.Container + done bool } func (t *task) ID() string { @@ -34,6 +35,10 @@ func (t *task) Do() bool { return t.cluster.scheduleTask(t) } +func (t *task) Stop() { + t.done = true +} + func (t *task) build(slaveID string, offers map[string]*mesosproto.Offer) { t.Command = &mesosproto.CommandInfo{Shell: proto.Bool(false)}