Add integration tests, decline offers after 'SWARM_MESOS_OFFER_TIMEOUT', track tasks and manage offers, refactor the list of slaves, and queue create requests before processing them

Signed-off-by: Isabel Jimenez <contact.isabeljimenez@gmail.com>
Signed-off-by: Victor Vieux <vieux@docker.com>
Isabel Jimenez 2015-04-21 14:58:01 -04:00
parent 20a5a7e588
commit 19c14fde2e
15 changed files with 695 additions and 307 deletions

View File

@ -44,7 +44,11 @@ Arguments:
Options:
{{range .Flags}}{{.}}
{{end}}{{if (eq .Name "manage")}}{{printf "\t * swarm.overcommit=0.05\tovercommit to apply on resources"}}
{{end}}{{ end }}
{{printf "\t * swarm.discovery.heartbeat=25s\tperiod between each heartbeat"}}
{{printf "\t * mesos.address=\taddress to bind on [$SWARM_MESOS_ADDRESS]"}}
{{printf "\t * mesos.port=\tport to bind on [$SWARM_MESOS_PORT]"}}
{{printf "\t * mesos.offertimeout=10m\ttimeout for offers [$SWARM_MESOS_OFFER_TIMEOUT]"}}
{{printf "\t * mesos.user=\tframework user [$SWARM_MESOS_USER]"}}{{end}}{{ end }}
`
}

View File

@ -189,21 +189,18 @@ func manage(c *cli.Context) {
}
sched := scheduler.New(s, fs)
cluster, err := swarm.NewCluster(sched, store, tlsConfig, discovery, c.StringSlice("cluster-opt"))
if err != nil {
log.Fatal(err)
}
var cl cluster.Cluster
switch c.String("cluster") {
switch c.String("cluster-driver") {
case "mesos":
cl = mesos.NewCluster(sched, store, options)
cl, err = mesos.NewCluster(sched, store, tlsConfig, discovery, c.StringSlice("cluster-opt"))
case "swarm":
cl = swarm.NewCluster(sched, store, options)
cl, err = swarm.NewCluster(sched, store, tlsConfig, discovery, c.StringSlice("cluster-opt"))
default:
log.Fatalf("Unsupported cluster %q", c.String("cluster"))
}
if err != nil {
log.Fatal(err)
}
// see https://github.com/codegangsta/cli/issues/160
hosts := c.StringSlice("host")

View File

@ -108,3 +108,14 @@ func (c *ContainerConfig) Affinities() []string {
func (c *ContainerConfig) Constraints() []string {
return c.extractExprs("constraints")
}
// NamespacedLabel returns a label within `namespace` from the Config.
// May return an empty string if not set.
func (c *ContainerConfig) NamespacedLabel(key string) string {
return c.Labels[namespace+"."+key]
}
// SetNamespacedLabel sets or overrides a label within `namespace` in the Config.
func (c *ContainerConfig) SetNamespacedLabel(key, value string) {
c.Labels[namespace+"."+key] = value
}
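
For context on how these helpers are intended to be used, here is a minimal, standalone sketch that mirrors the pattern the Mesos driver follows later in this commit: the requested container name is stashed under the swarm label namespace when the task is created (since the Mesos containerizer overrides the Docker container name) and read back when listing containers. The `namespace` constant value and the stripped-down config type below are stand-ins, not the real definitions from the `cluster` package.

```go
package main

import "fmt"

// namespace stands in for the swarm label namespace; the real constant
// lives in the cluster package (value assumed here).
const namespace = "com.docker.swarm"

// containerConfig is a stripped-down stand-in for cluster.ContainerConfig.
type containerConfig struct {
	Labels map[string]string
}

// NamespacedLabel returns a label within `namespace`, or "" if unset.
func (c *containerConfig) NamespacedLabel(key string) string {
	return c.Labels[namespace+"."+key]
}

// SetNamespacedLabel sets or overrides a label within `namespace`.
func (c *containerConfig) SetNamespacedLabel(key, value string) {
	c.Labels[namespace+"."+key] = value
}

func main() {
	config := &containerConfig{Labels: map[string]string{}}

	// At task-creation time, save the requested name in a namespaced label,
	// as the Mesos containerizer will override the Docker container name.
	config.SetNamespacedLabel("mesos.name", "web-1")

	// Later, when listing containers, recover the name from the label.
	if name := config.NamespacedLabel("mesos.name"); name != "" {
		fmt.Println("exposed as /" + name) // prints: exposed as /web-1
	}
}
```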

View File

@ -182,6 +182,7 @@ func (e *Engine) RefreshImages() error {
// RefreshContainers will refresh the list and status of containers running on the engine. If `full` is
// true, each container will be inspected.
// FIXME: unexport this method after mesos scheduler stops using it directly
func (e *Engine) RefreshContainers(full bool) error {
containers, err := e.client.ListContainers(true, false, "")
if err != nil {
@ -285,7 +286,7 @@ func (e *Engine) refreshLoop() {
return
}
err = e.refreshContainers(false)
err = e.RefreshContainers(false)
if err == nil {
err = e.RefreshImages()
}

View File

@ -1,19 +1,21 @@
package mesos
import (
"crypto/tls"
"errors"
"fmt"
"net"
"os"
"io"
"sort"
"strconv"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/units"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/scheduler/node"
"github.com/docker/swarm/scheduler/strategy"
"github.com/docker/swarm/state"
"github.com/mesos/mesos-go/mesosproto"
mesosscheduler "github.com/mesos/mesos-go/scheduler"
@ -29,77 +31,84 @@ type Cluster struct {
eventHandler cluster.EventHandler
slaves map[string]*slave
scheduler *scheduler.Scheduler
options *cluster.Options
options *cluster.DriverOpts
store *state.Store
TLSConfig *tls.Config
master string
pendingTasks *queue
offerTimeout time.Duration
}
var (
frameworkName = "swarm"
dockerDaemonPort = "2375"
errNotSupported = errors.New("not supported with mesos")
)
// NewCluster for mesos Cluster creation
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *cluster.Options) cluster.Cluster {
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *tls.Config, master string, options cluster.DriverOpts) (cluster.Cluster, error) {
log.WithFields(log.Fields{"name": "mesos"}).Debug("Initializing cluster")
cluster := &Cluster{
slaves: make(map[string]*slave),
scheduler: scheduler,
options: options,
store: store,
slaves: make(map[string]*slave),
scheduler: scheduler,
options: &options,
store: store,
master: master,
TLSConfig: TLSConfig,
offerTimeout: 10 * time.Minute,
}
cluster.pendingTasks = &queue{c: cluster}
// Empty string is accepted by the scheduler.
user := os.Getenv("SWARM_MESOS_USER")
user, _ := options.String("mesos.user", "SWARM_MESOS_USER")
driverConfig := mesosscheduler.DriverConfig{
Scheduler: cluster,
Framework: &mesosproto.FrameworkInfo{Name: &frameworkName, User: &user},
Master: options.Discovery,
Master: cluster.master,
}
// Changing port for https
if options.TLSConfig != nil {
if cluster.TLSConfig != nil {
dockerDaemonPort = "2376"
}
bindingAddressEnv := os.Getenv("SWARM_MESOS_ADDRESS")
bindingPortEnv := os.Getenv("SWARM_MESOS_PORT")
if bindingPortEnv != "" {
log.Debugf("SWARM_MESOS_PORT found, Binding port to %s", bindingPortEnv)
bindingPort, err := strconv.ParseUint(bindingPortEnv, 0, 16)
if err != nil {
log.Errorf("Unable to parse SWARM_MESOS_PORT, error: %s", err)
return nil
}
if bindingPort, ok := options.Uint("mesos.port", "SWARM_MESOS_PORT"); ok {
driverConfig.BindingPort = uint16(bindingPort)
}
if bindingAddressEnv != "" {
log.Debugf("SWARM_MESOS_ADDRESS found, Binding address to %s", bindingAddressEnv)
bindingAddress := net.ParseIP(bindingAddressEnv)
if bindingAddress, ok := options.IP("mesos.address", "SWARM_MESOS_ADDRESS"); ok {
if bindingAddress == nil {
log.Error("Unable to parse SWARM_MESOS_ADDRESS")
return nil
return nil, fmt.Errorf("invalid address %s", bindingAddress)
}
driverConfig.BindingAddress = bindingAddress
}
if offerTimeout, ok := options.String("mesos.offertimeout", "SWARM_MESOS_OFFER_TIMEOUT"); ok {
d, err := time.ParseDuration(offerTimeout)
if err != nil {
return nil, err
}
cluster.offerTimeout = d
}
driver, err := mesosscheduler.NewMesosSchedulerDriver(driverConfig)
if err != nil {
return nil
return nil, err
}
cluster.driver = driver
status, err := driver.Start()
log.Debugf("Mesos driver started, status/err %v: %v", status, err)
if err != nil {
return nil
log.Debugf("Mesos driver started, status/err %v: %v", status, err)
return nil, err
}
log.Debugf("Mesos driver started, status %v", status)
return cluster
return cluster, nil
}
// RegisterEventHandler registers an event handler.
@ -111,34 +120,47 @@ func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
return nil
}
// CreateContainer for container creation
func (c *Cluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
// CreateContainer for container creation in Mesos task
func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string) (*cluster.Container, error) {
task, err := newTask(config, name)
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), config)
if err != nil {
return nil, err
}
if nn, ok := c.slaves[n.ID]; ok {
container, err := nn.create(c.driver, config, name, true)
if err != nil {
return nil, err
}
go c.pendingTasks.add(task)
if container == nil {
return nil, fmt.Errorf("Container failed to create")
}
// TODO: do not store the container as it might be a wrong ContainerID
// see TODO in slave.go
//st := &state.RequestedState{
//ID: container.Id,
//Name: name,
//Config: config,
//}
return container, nil //c.store.Add(container.Id, st)
select {
case container := <-task.container:
return container, nil
case err := <-task.error:
return nil, err
case <-time.After(5 * time.Second):
c.pendingTasks.remove(true, task.TaskId.GetValue())
return nil, strategy.ErrNoResourcesAvailable
}
return nil, nil
}
func (c *Cluster) monitorTask(task *task) (bool, error) {
taskStatus := task.getStatus()
switch taskStatus.GetState() {
case mesosproto.TaskState_TASK_STAGING:
case mesosproto.TaskState_TASK_STARTING:
case mesosproto.TaskState_TASK_RUNNING:
case mesosproto.TaskState_TASK_FINISHED:
return true, nil
case mesosproto.TaskState_TASK_FAILED:
return true, errors.New(taskStatus.GetMessage())
case mesosproto.TaskState_TASK_KILLED:
return true, nil
case mesosproto.TaskState_TASK_LOST:
return true, errors.New(taskStatus.GetMessage())
case mesosproto.TaskState_TASK_ERROR:
return true, errors.New(taskStatus.GetMessage())
}
return false, nil
}
// RemoveContainer to remove containers on mesos cluster
@ -152,8 +174,8 @@ func (c *Cluster) Images() []*cluster.Image {
defer c.RUnlock()
out := []*cluster.Image{}
for _, n := range c.slaves {
out = append(out, n.Images()...)
for _, s := range c.slaves {
out = append(out, s.engine.Images()...)
}
return out
@ -168,8 +190,8 @@ func (c *Cluster) Image(IDOrName string) *cluster.Image {
c.RLock()
defer c.RUnlock()
for _, n := range c.slaves {
if image := n.Image(IDOrName); image != nil {
for _, s := range c.slaves {
if image := s.engine.Image(IDOrName); image != nil {
return image
}
}
@ -183,8 +205,13 @@ func (c *Cluster) Containers() []*cluster.Container {
defer c.RUnlock()
out := []*cluster.Container{}
for _, n := range c.slaves {
out = append(out, n.Containers()...)
for _, s := range c.slaves {
for _, container := range s.engine.Containers() {
if name := container.Config.NamespacedLabel("mesos.name"); name != "" {
container.Names = append([]string{"/" + name}, container.Names...)
}
out = append(out, container)
}
}
return out
@ -199,12 +226,61 @@ func (c *Cluster) Container(IDOrName string) *cluster.Container {
c.RLock()
defer c.RUnlock()
for _, n := range c.slaves {
if container := n.Container(IDOrName); container != nil {
containers := c.Containers()
// Match exact or short Container ID.
for _, container := range containers {
if container.Id == IDOrName || stringid.TruncateID(container.Id) == IDOrName {
return container
}
}
// Match exact Swarm ID.
for _, container := range containers {
if swarmID := container.Config.SwarmID(); swarmID == IDOrName || stringid.TruncateID(swarmID) == IDOrName {
return container
}
}
candidates := []*cluster.Container{}
// Match name, /name or engine/name.
for _, container := range containers {
for _, name := range container.Names {
if name == IDOrName || name == "/"+IDOrName || container.Engine.ID+name == IDOrName || container.Engine.Name+name == IDOrName {
return container
}
}
}
if size := len(candidates); size == 1 {
return candidates[0]
} else if size > 1 {
return nil
}
// Match Container ID prefix.
for _, container := range containers {
if strings.HasPrefix(container.Id, IDOrName) {
candidates = append(candidates, container)
}
}
// Match Swarm ID prefix.
for _, container := range containers {
if strings.HasPrefix(container.Config.SwarmID(), IDOrName) {
candidates = append(candidates, container)
}
}
if len(candidates) == 1 {
if name := candidates[0].Config.NamespacedLabel("mesos.name"); name != "" {
candidates[0].Names = append([]string{"/" + name}, candidates[0].Names...)
}
return candidates[0]
}
return nil
}
@ -214,10 +290,32 @@ func (c *Cluster) RemoveImage(image *cluster.Image) ([]*dockerclient.ImageDelete
}
// Pull will pull images on the cluster nodes
func (c *Cluster) Pull(name string, callback func(what, status string)) {
func (c *Cluster) Pull(name string, authConfig *dockerclient.AuthConfig, callback func(what, status string)) {
}
// Load images
func (c *Cluster) Load(imageReader io.Reader, callback func(what, status string)) {
}
// RenameContainer Rename a container
func (c *Cluster) RenameContainer(container *cluster.Container, newName string) error {
return errNotSupported
}
func scalarResourceValue(offers map[string]*mesosproto.Offer, name string) float64 {
var value float64
for _, offer := range offers {
for _, resource := range offer.Resources {
if *resource.Name == name {
value += *resource.Scalar.Value
}
}
}
return value
}
// listNodes returns all the nodes in the cluster.
func (c *Cluster) listNodes() []*node.Node {
c.RLock()
@ -225,46 +323,42 @@ func (c *Cluster) listNodes() []*node.Node {
out := []*node.Node{}
for _, s := range c.slaves {
out = append(out, s.toNode())
n := node.NewNode(s.engine)
n.ID = s.id
n.TotalCpus = int64(scalarResourceValue(s.offers, "cpus"))
n.UsedCpus = 0
n.TotalMemory = int64(scalarResourceValue(s.offers, "mem")) * 1024 * 1024
n.UsedMemory = 0
out = append(out, n)
}
return out
}
// listSlaves returns all the slaves in the cluster.
func (c *Cluster) listSlaves() []*slave {
c.RLock()
defer c.RUnlock()
out := []*slave{}
func (c *Cluster) listOffers() []*mesosproto.Offer {
list := []*mesosproto.Offer{}
for _, s := range c.slaves {
out = append(out, s)
for _, offer := range s.offers {
list = append(list, offer)
}
}
return out
return list
}
// Info gives minimal information about containers and resources on the mesos cluster
func (c *Cluster) Info() [][2]string {
offers := c.listOffers()
info := [][2]string{
{"\bStrategy", c.scheduler.Strategy()},
{"\bFilters", c.scheduler.Filters()},
{"\bSlaves", fmt.Sprintf("%d", len(c.slaves))},
{"\bOffers", fmt.Sprintf("%d", len(offers))},
}
slaves := c.listSlaves()
sort.Sort(SlaveSorter(slaves))
sort.Sort(offerSorter(offers))
for _, slave := range slaves {
info = append(info, [2]string{slave.Name, slave.Addr})
info = append(info, [2]string{" └ Containers", fmt.Sprintf("%d", len(slave.Containers()))})
info = append(info, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", slave.UsedCpus(), slave.TotalCpus())})
info = append(info, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(slave.UsedMemory())), units.BytesSize(float64(slave.TotalMemory())))})
info = append(info, [2]string{" └ Offers", fmt.Sprintf("%d", len(slave.offers))})
for _, offer := range slave.offers {
info = append(info, [2]string{" Offer", offer.Id.GetValue()})
for _, resource := range offer.Resources {
info = append(info, [2]string{" └ " + *resource.Name, fmt.Sprintf("%v", resource)})
}
for _, offer := range offers {
info = append(info, [2]string{" Offer", offer.Id.GetValue()})
for _, resource := range offer.Resources {
info = append(info, [2]string{" └ " + *resource.Name, fmt.Sprintf("%v", resource)})
}
}
@ -273,17 +367,17 @@ func (c *Cluster) Info() [][2]string {
// Registered method for registered mesos framework
func (c *Cluster) Registered(driver mesosscheduler.SchedulerDriver, fwID *mesosproto.FrameworkID, masterInfo *mesosproto.MasterInfo) {
log.Debugf("Swarm is registered with Mesos with framework id: %s", fwID.GetValue())
log.WithFields(log.Fields{"name": "mesos", "frameworkId": fwID.GetValue()}).Debug("Framework registered")
}
// Reregistered method for registered mesos framework
func (c *Cluster) Reregistered(mesosscheduler.SchedulerDriver, *mesosproto.MasterInfo) {
log.Debug("Swarm is re-registered with Mesos")
log.WithFields(log.Fields{"name": "mesos"}).Debug("Framework re-registered")
}
// Disconnected method
func (c *Cluster) Disconnected(mesosscheduler.SchedulerDriver) {
log.Debug("Swarm is disconnectd with Mesos")
log.WithFields(log.Fields{"name": "mesos"}).Debug("Framework disconnected")
}
// ResourceOffers method
@ -292,18 +386,51 @@ func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mes
for _, offer := range offers {
slaveID := offer.SlaveId.GetValue()
if slave, ok := c.slaves[slaveID]; ok {
slave.addOffer(offer)
} else {
slave := newSlave(*offer.Hostname+":"+dockerDaemonPort, c.options.OvercommitRatio, offer)
err := slave.Connect(c.options.TLSConfig)
if err != nil {
s, ok := c.slaves[slaveID]
if !ok {
engine := cluster.NewEngine(*offer.Hostname+":"+dockerDaemonPort, 0)
if err := engine.Connect(c.TLSConfig); err != nil {
log.Error(err)
} else {
c.slaves[slaveID] = slave
s = newSlave(slaveID, engine)
c.slaves[slaveID] = s
}
}
c.addOffer(offer)
}
c.pendingTasks.resourcesAdded()
}
func (c *Cluster) addOffer(offer *mesosproto.Offer) {
s, ok := c.slaves[offer.SlaveId.GetValue()]
if !ok {
return
}
s.addOffer(offer)
go func(offer *mesosproto.Offer) {
<-time.After(c.offerTimeout)
if c.removeOffer(offer) {
if _, err := c.driver.DeclineOffer(offer.Id, &mesosproto.Filters{}); err != nil {
log.WithFields(log.Fields{"name": "mesos"}).Errorf("Error while declining offer %q: %v", offer.Id.GetValue(), err)
} else {
log.WithFields(log.Fields{"name": "mesos"}).Debugf("Offer %q declined successfully", offer.Id.GetValue())
}
}
}(offer)
}
func (c *Cluster) removeOffer(offer *mesosproto.Offer) bool {
log.WithFields(log.Fields{"name": "mesos", "offerID": offer.Id.String()}).Debug("Removing offer")
s, ok := c.slaves[offer.SlaveId.GetValue()]
if !ok {
return false
}
found := s.removeOffer(offer.Id.GetValue())
if s.empty() {
// Disconnect from engine
delete(c.slaves, offer.SlaveId.GetValue())
}
return found
}
// OfferRescinded method
@ -313,11 +440,14 @@ func (c *Cluster) OfferRescinded(mesosscheduler.SchedulerDriver, *mesosproto.Off
// StatusUpdate method
func (c *Cluster) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *mesosproto.TaskStatus) {
log.WithFields(log.Fields{"name": "mesos", "state": taskStatus.State.String()}).Debug("Status update")
if slave, ok := c.slaves[taskStatus.SlaveId.GetValue()]; ok {
if ch, ok := slave.statuses[taskStatus.TaskId.GetValue()]; ok {
ch <- taskStatus
}
taskID := taskStatus.TaskId.GetValue()
slaveID := taskStatus.SlaveId.GetValue()
s, ok := c.slaves[slaveID]
if !ok {
return
}
if task, ok := s.tasks[taskID]; ok {
task.sendStatus(taskStatus)
} else {
var reason = ""
if taskStatus.Reason != nil {
@ -347,17 +477,17 @@ func (c *Cluster) ExecutorLost(mesosscheduler.SchedulerDriver, *mesosproto.Execu
// Error method
func (c *Cluster) Error(d mesosscheduler.SchedulerDriver, msg string) {
log.Error(msg)
log.WithFields(log.Fields{"name": "mesos"}).Error(msg)
}
// RANDOMENGINE returns a random engine.
func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) {
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), &dockerclient.ContainerConfig{})
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), &cluster.ContainerConfig{})
if err != nil {
return nil, err
}
if n != nil {
return &c.slaves[n.ID].Engine, nil
return c.slaves[n.ID].engine, nil
}
return nil, nil
}

View File

@ -0,0 +1,23 @@
package mesos
import "github.com/mesos/mesos-go/mesosproto"
// offerSorter implements the Sort interface to sort offers.
// It is not guaranteed to be a stable sort.
type offerSorter []*mesosproto.Offer
// Len returns the number of offers to be sorted.
func (s offerSorter) Len() int {
return len(s)
}
// Swap exchanges the offer elements with indices i and j.
func (s offerSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Less reports whether the offer with index i should sort before the offer with index j.
// Offers are sorted by ID.
func (s offerSorter) Less(i, j int) bool {
return s[i].Id.GetValue() < s[j].Id.GetValue()
}

View File

@ -0,0 +1,24 @@
package mesos
import (
"sort"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/assert"
)
func TestOfferSorter(t *testing.T) {
offers := []*mesosproto.Offer{
{Id: &mesosproto.OfferID{Value: proto.String("id1")}},
{Id: &mesosproto.OfferID{Value: proto.String("id3")}},
{Id: &mesosproto.OfferID{Value: proto.String("id2")}},
}
sort.Sort(offerSorter(offers))
assert.Equal(t, offers[0].Id.GetValue(), "id1")
assert.Equal(t, offers[1].Id.GetValue(), "id2")
assert.Equal(t, offers[2].Id.GetValue(), "id3")
}

144
cluster/mesos/queue.go Normal file
View File

@ -0,0 +1,144 @@
package mesos
import (
"fmt"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/mesos/mesos-go/mesosproto"
)
type queue struct {
sync.RWMutex
tasks []*task
c *Cluster
}
func (q *queue) scheduleTask(t *task) bool {
n, err := q.c.scheduler.SelectNodeForContainer(q.c.listNodes(), t.config)
if err != nil {
return false
}
s, ok := q.c.slaves[n.ID]
if !ok {
t.error <- fmt.Errorf("Unable to create on slave %q", n.ID)
return true
}
// build the task from its internal config and set the slaveID
t.build(n.ID)
q.c.Lock()
// TODO: Only use the offer we need
offerIds := []*mesosproto.OfferID{}
for _, offer := range q.c.slaves[n.ID].offers {
offerIds = append(offerIds, offer.Id)
}
if _, err := q.c.driver.LaunchTasks(offerIds, []*mesosproto.TaskInfo{&t.TaskInfo}, &mesosproto.Filters{}); err != nil {
// TODO: Do not erase all the offers, only the one used
for _, offer := range s.offers {
q.c.removeOffer(offer)
}
q.c.Unlock()
t.error <- err
return true
}
s.addTask(t)
// TODO: Do not erase all the offers, only the one used
for _, offer := range s.offers {
q.c.removeOffer(offer)
}
q.c.Unlock()
// block until we get the container
finished, err := q.c.monitorTask(t)
if err != nil {
//remove task
s.removeTask(t.TaskInfo.TaskId.GetValue())
t.error <- err
return true
}
if !finished {
go func() {
for {
finished, err := q.c.monitorTask(t)
if err != nil {
// TODO proper error message
log.Error(err)
break
}
if finished {
break
}
}
//remove task
}()
}
// Register the container immediately while waiting for a state refresh.
// Force a state refresh to pick up the newly created container.
// FIXME: unexport this method, see FIXME in engine.go
s.engine.RefreshContainers(true)
// TODO: We have to return the right container that was just created.
// Once we receive the ContainerID from the executor.
for _, container := range s.engine.Containers() {
t.container <- container
// TODO save in store
return true
}
t.error <- fmt.Errorf("Container failed to create")
return true
}
func (q *queue) add(t *task) {
q.Lock()
defer q.Unlock()
if !q.scheduleTask(t) {
q.tasks = append(q.tasks, t)
}
}
func (q *queue) remove(lock bool, taskIDs ...string) {
if lock {
q.Lock()
defer q.Unlock()
}
new := []*task{}
for _, t := range q.tasks {
found := false
for _, taskID := range taskIDs {
if t.TaskId.GetValue() == taskID {
found = true
}
}
if !found {
new = append(new, t)
}
}
q.tasks = new
}
func (q *queue) resourcesAdded() {
go q.process()
}
func (q *queue) process() {
q.Lock()
defer q.Unlock()
ids := []string{}
for _, t := range q.tasks {
if q.scheduleTask(t) {
ids = append(ids, t.TaskId.GetValue())
}
}
q.remove(false, ids...)
}

View File

@ -1,177 +1,74 @@
package mesos
import (
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
mesosscheduler "github.com/mesos/mesos-go/scheduler"
"github.com/samalba/dockerclient"
)
type slave struct {
cluster.Engine
sync.RWMutex
slaveID *mesosproto.SlaveID
offers []*mesosproto.Offer
statuses map[string]chan *mesosproto.TaskStatus
id string
offers map[string]*mesosproto.Offer
tasks map[string]*task
engine *cluster.Engine
}
// newSlave creates a mesos slave agent
func newSlave(addr string, overcommitRatio float64, offer *mesosproto.Offer) *slave {
slave := &slave{Engine: *cluster.NewEngine(addr, overcommitRatio)}
slave.offers = []*mesosproto.Offer{offer}
slave.statuses = make(map[string]chan *mesosproto.TaskStatus)
slave.slaveID = offer.SlaveId
return slave
}
func (s *slave) toNode() *node.Node {
return &node.Node{
ID: s.slaveID.GetValue(),
IP: s.IP,
Addr: s.Addr,
Name: s.Name,
Cpus: s.Cpus,
Labels: s.Labels,
Containers: s.Containers(),
Images: s.Images(),
UsedMemory: s.UsedMemory(),
UsedCpus: s.UsedCpus(),
TotalMemory: s.TotalMemory(),
TotalCpus: s.TotalCpus(),
IsHealthy: s.IsHealthy(),
func newSlave(sid string, e *cluster.Engine) *slave {
return &slave{
id: sid,
offers: make(map[string]*mesosproto.Offer),
tasks: make(map[string]*task),
engine: e,
}
}
func (s *slave) addOffer(offer *mesosproto.Offer) {
s.offers = append(s.offers, offer)
s.Lock()
s.offers[offer.Id.GetValue()] = offer
s.Unlock()
}
func (s *slave) scalarResourceValue(name string) float64 {
var value float64
for _, offer := range s.offers {
for _, resource := range offer.Resources {
if *resource.Name == name {
value += *resource.Scalar.Value
}
}
}
return value
func (s *slave) addTask(task *task) {
s.tasks[task.TaskInfo.TaskId.GetValue()] = task
}
func (s *slave) UsedMemory() int64 {
return s.TotalMemory() - int64(s.scalarResourceValue("mem"))*1024*1024
func (s *slave) removeOffer(offerID string) bool {
found := false
_, found = s.offers[offerID]
if found {
delete(s.offers, offerID)
}
return found
}
func (s *slave) UsedCpus() int64 {
return s.TotalCpus() - int64(s.scalarResourceValue("cpus"))
func (s *slave) removeTask(taskID string) bool {
s.Lock()
defer s.Unlock()
fmt.Println("removing task")
found := false
_, found = s.tasks[taskID]
if found {
delete(s.tasks, taskID)
}
return found
}
func generateTaskID() (string, error) {
id := make([]byte, 6)
if _, err := rand.Read(id); err != nil {
return "", err
}
return hex.EncodeToString(id), nil
func (s *slave) empty() bool {
return len(s.offers) == 0 && len(s.tasks) == 0
}
func (s *slave) create(driver *mesosscheduler.MesosSchedulerDriver, config *dockerclient.ContainerConfig, name string, pullImage bool) (*cluster.Container, error) {
ID, err := generateTaskID()
if err != nil {
return nil, err
}
s.statuses[ID] = make(chan *mesosproto.TaskStatus)
resources := []*mesosproto.Resource{}
if cpus := config.CpuShares; cpus > 0 {
resources = append(resources, mesosutil.NewScalarResource("cpus", float64(cpus)))
}
if mem := config.Memory; mem > 0 {
resources = append(resources, mesosutil.NewScalarResource("mem", float64(mem/1024/1024)))
}
taskInfo := &mesosproto.TaskInfo{
Name: &name,
TaskId: &mesosproto.TaskID{
Value: &ID,
},
SlaveId: s.slaveID,
Resources: resources,
Command: &mesosproto.CommandInfo{},
}
if len(config.Cmd) > 0 && config.Cmd[0] != "" {
taskInfo.Command.Value = &config.Cmd[0]
}
if len(config.Cmd) > 1 {
taskInfo.Command.Arguments = config.Cmd[1:]
}
taskInfo.Container = &mesosproto.ContainerInfo{
Type: mesosproto.ContainerInfo_DOCKER.Enum(),
Docker: &mesosproto.ContainerInfo_DockerInfo{
Image: &config.Image,
},
}
taskInfo.Command.Shell = proto.Bool(false)
offerIds := []*mesosproto.OfferID{}
for _, offer := range s.offers {
offerIds = append(offerIds, offer.Id)
}
status, err := driver.LaunchTasks(offerIds, []*mesosproto.TaskInfo{taskInfo}, &mesosproto.Filters{})
if err != nil {
return nil, err
}
log.Debugf("create %v: %v", status, err)
s.offers = []*mesosproto.Offer{}
// block until we get the container
taskStatus := <-s.statuses[ID]
delete(s.statuses, ID)
switch taskStatus.GetState() {
case mesosproto.TaskState_TASK_STAGING:
case mesosproto.TaskState_TASK_STARTING:
case mesosproto.TaskState_TASK_RUNNING:
case mesosproto.TaskState_TASK_FINISHED:
case mesosproto.TaskState_TASK_FAILED:
return nil, errors.New(taskStatus.GetMessage())
case mesosproto.TaskState_TASK_KILLED:
case mesosproto.TaskState_TASK_LOST:
return nil, errors.New(taskStatus.GetMessage())
case mesosproto.TaskState_TASK_ERROR:
return nil, errors.New(taskStatus.GetMessage())
}
// Register the container immediately while waiting for a state refresh.
// Force a state refresh to pick up the newly created container.
s.RefreshContainers(true)
s.RLock()
defer s.RUnlock()
// TODO: We have to return the right container that was just created.
// Once we receive the ContainerID from the executor.
for _, container := range s.Containers() {
return container, nil
}
return nil, nil
func (s *slave) getOffers() map[string]*mesosproto.Offer {
s.Lock()
defer s.Unlock()
return s.offers
}
func (s *slave) getTasks() map[string]*task {
s.Lock()
defer s.Unlock()
return s.tasks
}

View File

@ -1,21 +0,0 @@
package mesos
// SlaveSorter implements the Sort interface to sort slaves.
// It is not guaranteed to be a stable sort.
type SlaveSorter []*slave
// Len returns the number of engines to be sorted.
func (s SlaveSorter) Len() int {
return len(s)
}
// Swap exchanges the engine elements with indices i and j.
func (s SlaveSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Less reports whether the engine with index i should sort before the engine with index j.
// Slaves are sorted chronologically by name.
func (s SlaveSorter) Less(i, j int) bool {
return s[i].Name < s[j].Name
}

View File

@ -1,21 +0,0 @@
package mesos
import (
"sort"
"testing"
"github.com/docker/swarm/cluster"
"github.com/stretchr/testify/assert"
)
func TestSlaveSorter(t *testing.T) {
slaves := []*slave{{cluster.Engine{Name: "name1"}, nil, nil, nil},
{cluster.Engine{Name: "name2"}, nil, nil, nil},
{cluster.Engine{Name: "name3"}, nil, nil, nil}}
sort.Sort(SlaveSorter(slaves))
assert.Equal(t, slaves[0].Name, "name1")
assert.Equal(t, slaves[1].Name, "name2")
assert.Equal(t, slaves[2].Name, "name3")
}

85
cluster/mesos/task.go Normal file
View File

@ -0,0 +1,85 @@
package mesos
import (
"fmt"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/swarm/cluster"
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
)
type task struct {
mesosproto.TaskInfo
updates chan *mesosproto.TaskStatus
config *cluster.ContainerConfig
error chan error
container chan *cluster.Container
}
func (t *task) build(slaveID string) {
t.Command = &mesosproto.CommandInfo{Shell: proto.Bool(false)}
t.Container = &mesosproto.ContainerInfo{
Type: mesosproto.ContainerInfo_DOCKER.Enum(),
Docker: &mesosproto.ContainerInfo_DockerInfo{
Image: &t.config.Image,
},
}
if cpus := t.config.CpuShares; cpus > 0 {
t.Resources = append(t.Resources, mesosutil.NewScalarResource("cpus", float64(cpus)))
}
if mem := t.config.Memory; mem > 0 {
t.Resources = append(t.Resources, mesosutil.NewScalarResource("mem", float64(mem/1024/1024)))
}
if len(t.config.Cmd) > 0 && t.config.Cmd[0] != "" {
t.Command.Value = &t.config.Cmd[0]
}
if len(t.config.Cmd) > 1 {
t.Command.Arguments = t.config.Cmd[1:]
}
for key, value := range t.config.Labels {
t.Container.Docker.Parameters = append(t.Container.Docker.Parameters, &mesosproto.Parameter{Key: proto.String("label"), Value: proto.String(fmt.Sprintf("%s=%s", key, value))})
}
t.SlaveId = &mesosproto.SlaveID{Value: &slaveID}
}
func newTask(config *cluster.ContainerConfig, name string) (*task, error) {
// save the name in labels as the mesos containerizer will override it
config.SetNamespacedLabel("mesos.name", name)
task := task{
updates: make(chan *mesosproto.TaskStatus),
config: config,
error: make(chan error),
container: make(chan *cluster.Container),
}
ID := stringid.TruncateID(stringid.GenerateRandomID())
if name != "" {
ID = name + "." + ID
}
task.Name = &name
task.TaskId = &mesosproto.TaskID{Value: &ID}
return &task, nil
}
func (t *task) sendStatus(status *mesosproto.TaskStatus) {
t.updates <- status
}
func (t *task) getStatus() *mesosproto.TaskStatus {
return <-t.updates
}

View File

@ -0,0 +1,57 @@
# Mesos Swarm Integration Tests
Integration tests provide end-to-end testing of Swarm on Mesos.
While unit tests verify the code is working as expected by relying on mocks and
artificially created fixtures, integration tests use real Docker engines and
talk to Swarm, which schedules resources through Mesos.
Note that integration tests do **not** replace unit tests.
As a rule of thumb, code should be tested thoroughly with unit tests.
Integration tests on the other hand are meant to test a specific feature end
to end.
Integration tests are written in *bash* using the
[bats](https://github.com/sstephenson/bats) framework.
## Running integration tests
Start by [installing](https://github.com/sstephenson/bats#installing-bats-from-source)
*bats* on your system.
In order to run all integration tests, pass *bats* the test path:
```
$ bats test/integration_mesos
```
## Writing integration tests
[Helper functions](https://github.com/docker/swarm/blob/master/test/integration/helpers.bash)
are provided in order to facilitate writing tests.
```sh
#!/usr/bin/env bats
# This will load the helpers.
load helpers
@test "this is a simple test" {
# Unlike the plain Swarm integration tests, Docker engines are started on the Mesos nodes.
# swarm_manage will start the swarm manager preconfigured to run against a
# Mesos cluster:
swarm_manage
# You can talk with said manager by using the docker_swarm helper:
run docker_swarm info
[ "$status" -eq 0 ]
}
# teardown is called at the end of every test.
function teardown() {
# This will stop the swarm manager:
swarm_manage_cleanup
}
```

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bats
load helpers
function teardown() {
swarm_manage_cleanup
}
@test "docker info" {
swarm_manage
run docker_swarm info
[ "$status" -eq 0 ]
[[ "${output}" == *'Offers'* ]]
}

View File

@ -0,0 +1,41 @@
#!/bin/bash
# Root directory of the repository.
SWARM_ROOT=${SWARM_ROOT:-${BATS_TEST_DIRNAME}/../..}
# Host on which the manager will listen to (random port between 6000 and 7000).
SWARM_HOST=127.0.0.1:$(( ( RANDOM % 1000 ) + 6000 ))
MESOS_CLUSTER_ENTRYPOINT=${MESOS_CLUSTER_ENTRYPOINT:-0.0.0.0:5050}
# Run the swarm binary.
function swarm() {
godep go run "${SWARM_ROOT}/main.go" "$@"
}
# Waits until the given docker engine API becomes reachable.
function wait_until_reachable() {
local attempts=0
local max_attempts=5
until docker -H $1 info || [ $attempts -ge $max_attempts ]; do
echo "Attempt to connect to $1 failed for the $((++attempts)) time" >&2
sleep 0.5
done
[[ $attempts -lt $max_attempts ]]
}
# Run the docker CLI against swarm through Mesos.
function docker_swarm() {
docker -H $SWARM_HOST "$@"
}
function swarm_manage() {
${SWARM_ROOT}/swarm manage -c mesos -H $SWARM_HOST $MESOS_CLUSTER_ENTRYPOINT &
SWARM_PID=$!
wait_until_reachable $SWARM_HOST
}
function swarm_manage_cleanup() {
kill $SWARM_PID
}