use inspect sent by mesos 0.23

Signed-off-by: Victor Vieux <victorvieux@gmail.com>
This commit is contained in:
Victor Vieux 2015-06-01 15:16:23 -07:00
parent 3cc53769ec
commit 4212b69c96
2 changed files with 26 additions and 14 deletions

View File

@ -2,6 +2,7 @@ package mesos
import ( import (
"crypto/tls" "crypto/tls"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -397,7 +398,7 @@ func (c *Cluster) scheduleTask(t *task) bool {
} }
c.Unlock() c.Unlock()
// block until we get the container // block until we get the container
finished, err := t.monitor() finished, data, err := t.monitor()
taskID := t.TaskInfo.TaskId.GetValue() taskID := t.TaskInfo.TaskId.GetValue()
if err != nil { if err != nil {
//remove task //remove task
@ -408,7 +409,7 @@ func (c *Cluster) scheduleTask(t *task) bool {
if !finished { if !finished {
go func() { go func() {
for { for {
finished, err := t.monitor() finished, _, err := t.monitor()
if err != nil { if err != nil {
// TODO do a better log by sending proper error message // TODO do a better log by sending proper error message
log.Error(err) log.Error(err)
@ -423,18 +424,29 @@ func (c *Cluster) scheduleTask(t *task) bool {
} }
// Register the container immediately while waiting for a state refresh. // Register the container immediately while waiting for a state refresh.
// Force a state refresh to pick up the newly created container.
// FIXME: unexport this method, see FIXME in engine.go // In mesos 0.23+ the docker inspect will be sent back in the taskStatus.Data
// We can use this to find the right container.
inspect := []dockerclient.ContainerInfo{}
if data != nil && json.Unmarshal(data, &inspect) != nil && len(inspect) == 1 {
container := &cluster.Container{Container: dockerclient.Container{Id: inspect[0].Id}, Engine: s.engine}
if container.Refresh() == nil {
t.container <- container
return true
}
}
log.Debug("Cannot parse docker info from task status, please upgrade Mesos to the last version")
// For mesos <= 0.22 we fall back to a full refresh and match the container by its labels.
// TODO: once 0.23 or 0.24 is released, remove this whole block of code, as it
// doesn't scale very well.
s.engine.RefreshContainers(true) s.engine.RefreshContainers(true)
// TODO: We have to return the right container that was just created.
// Once we receive the ContainerID from the executor.
for _, container := range s.engine.Containers() { for _, container := range s.engine.Containers() {
if container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] == taskID { if container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] == taskID {
t.container <- container t.container <- container
return true return true
} }
// TODO save in store
} }
t.error <- fmt.Errorf("Container failed to create") t.error <- fmt.Errorf("Container failed to create")

View File

@ -140,7 +140,7 @@ func (t *task) getStatus() *mesosproto.TaskStatus {
return <-t.updates return <-t.updates
} }
func (t *task) monitor() (bool, error) { func (t *task) monitor() (bool, []byte, error) {
taskStatus := t.getStatus() taskStatus := t.getStatus()
switch taskStatus.GetState() { switch taskStatus.GetState() {
@ -148,16 +148,16 @@ func (t *task) monitor() (bool, error) {
case mesosproto.TaskState_TASK_STARTING: case mesosproto.TaskState_TASK_STARTING:
case mesosproto.TaskState_TASK_RUNNING: case mesosproto.TaskState_TASK_RUNNING:
case mesosproto.TaskState_TASK_FINISHED: case mesosproto.TaskState_TASK_FINISHED:
return true, nil return true, taskStatus.Data, nil
case mesosproto.TaskState_TASK_FAILED: case mesosproto.TaskState_TASK_FAILED:
return true, errors.New(taskStatus.GetMessage()) return true, nil, errors.New(taskStatus.GetMessage())
case mesosproto.TaskState_TASK_KILLED: case mesosproto.TaskState_TASK_KILLED:
return true, nil return true, taskStatus.Data, nil
case mesosproto.TaskState_TASK_LOST: case mesosproto.TaskState_TASK_LOST:
return true, errors.New(taskStatus.GetMessage()) return true, nil, errors.New(taskStatus.GetMessage())
case mesosproto.TaskState_TASK_ERROR: case mesosproto.TaskState_TASK_ERROR:
return true, errors.New(taskStatus.GetMessage()) return true, nil, errors.New(taskStatus.GetMessage())
} }
return false, nil return false, taskStatus.Data, nil
} }