docs/cluster/mesos/cluster.go


package mesos

import (
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"sort"
	"sync"
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/docker/swarm/cluster"
	"github.com/docker/swarm/cluster/mesos/queue"
	"github.com/docker/swarm/scheduler"
	"github.com/docker/swarm/scheduler/node"
	"github.com/docker/swarm/scheduler/strategy"
	"github.com/docker/swarm/state"
	"github.com/gogo/protobuf/proto"
	"github.com/mesos/mesos-go/mesosproto"
	mesosscheduler "github.com/mesos/mesos-go/scheduler"
	"github.com/samalba/dockerclient"
)
// Cluster struct for mesos
type Cluster struct {
	sync.RWMutex

	driver           *mesosscheduler.MesosSchedulerDriver
	dockerEnginePort string
	eventHandler     cluster.EventHandler
	master           string
	slaves           map[string]*slave
	scheduler        *scheduler.Scheduler
	store            *state.Store
	TLSConfig        *tls.Config
	options          *cluster.DriverOpts
	offerTimeout     time.Duration
	pendingTasks     *queue.Queue
}
const (
	frameworkName              = "swarm"
	defaultDockerEnginePort    = "2375"
	defaultDockerEngineTLSPort = "2376"
	defaultOfferTimeout        = 10 * time.Minute
	taskCreationTimeout        = 5 * time.Second
)

var (
	errNotSupported = errors.New("not supported with mesos")
)
// NewCluster for mesos Cluster creation
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *tls.Config, master string, options cluster.DriverOpts) (cluster.Cluster, error) {
	log.WithFields(log.Fields{"name": "mesos"}).Debug("Initializing cluster")

	cluster := &Cluster{
		dockerEnginePort: defaultDockerEnginePort,
		master:           master,
		slaves:           make(map[string]*slave),
		scheduler:        scheduler,
		store:            store,
		TLSConfig:        TLSConfig,
		options:          &options,
		offerTimeout:     defaultOfferTimeout,
	}

	cluster.pendingTasks = queue.NewQueue()
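	// Tasks wait in this queue until an offer can satisfy them. scheduleTask
	// (below) performs the actual placement; presumably the Mesos offer
	// callbacks elsewhere in this package drain the queue as offers arrive.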
	// Empty string is accepted by the scheduler.
	user, _ := options.String("mesos.user", "SWARM_MESOS_USER")

	driverConfig := mesosscheduler.DriverConfig{
		Scheduler: cluster,
		Framework: &mesosproto.FrameworkInfo{Name: proto.String(frameworkName), User: &user},
		Master:    cluster.master,
	}

	// Changing port for https
	if cluster.TLSConfig != nil {
		cluster.dockerEnginePort = defaultDockerEngineTLSPort
	}

	if bindingPort, ok := options.Uint("mesos.port", "SWARM_MESOS_PORT"); ok {
		driverConfig.BindingPort = uint16(bindingPort)
	}

	if bindingAddress, ok := options.IP("mesos.address", "SWARM_MESOS_ADDRESS"); ok {
		if bindingAddress == nil {
			// The address failed to parse, so there is no usable value to print.
			return nil, errors.New("invalid IP address for the mesos.address option")
		}
		driverConfig.BindingAddress = bindingAddress
	}

	if offerTimeout, ok := options.String("mesos.offertimeout", "SWARM_MESOS_OFFER_TIMEOUT"); ok {
		d, err := time.ParseDuration(offerTimeout)
		if err != nil {
			return nil, err
		}
		cluster.offerTimeout = d
	}

	driver, err := mesosscheduler.NewMesosSchedulerDriver(driverConfig)
	if err != nil {
		return nil, err
	}

	cluster.driver = driver

	status, err := driver.Start()
	if err != nil {
		log.Debugf("Mesos driver failed to start, status/err %v: %v", status, err)
		return nil, err
	}
	log.Debugf("Mesos driver started, status %v", status)

	return cluster, nil
}
// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
	if c.eventHandler != nil {
		return errors.New("event handler already set")
	}
	c.eventHandler = h
	return nil
}
// CreateContainer schedules a container as a Mesos task and waits for it to be created.
func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string) (*cluster.Container, error) {
	task, err := newTask(c, config, name)
	if err != nil {
		return nil, err
	}

	go c.pendingTasks.Add(task)
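	// Block until the task produces a container, fails, or times out. If no
	// offer satisfies the request within taskCreationTimeout, the task is
	// pulled back out of the queue and we report resource exhaustion.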
	select {
	case container := <-task.container:
		return formatContainer(container), nil
	case err := <-task.error:
		return nil, err
	case <-time.After(taskCreationTimeout):
		c.pendingTasks.Remove(task)
		return nil, strategy.ErrNoResourcesAvailable
	}
}
// RemoveContainer removes a container from the mesos cluster.
func (c *Cluster) RemoveContainer(container *cluster.Container, force bool) error {
	c.scheduler.Lock()
	defer c.scheduler.Unlock()

	return container.Engine.RemoveContainer(container, force)
}
// Images returns all the images in the cluster.
func (c *Cluster) Images() []*cluster.Image {
	c.RLock()
	defer c.RUnlock()

	out := []*cluster.Image{}
	for _, s := range c.slaves {
		out = append(out, s.engine.Images()...)
	}
	return out
}
// Image returns an image with IDOrName in the cluster.
func (c *Cluster) Image(IDOrName string) *cluster.Image {
	// Abort immediately if the name is empty.
	if len(IDOrName) == 0 {
		return nil
	}

	c.RLock()
	defer c.RUnlock()
	for _, s := range c.slaves {
		if image := s.engine.Image(IDOrName); image != nil {
			return image
		}
	}
	return nil
}
// RemoveImages removes images from the cluster.
func (c *Cluster) RemoveImages(name string) ([]*dockerclient.ImageDelete, error) {
	return nil, errNotSupported
}
func formatContainer(container *cluster.Container) *cluster.Container {
	if container == nil {
		return nil
	}
	if name := container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.name"]; name != "" && container.Names[0] != "/"+name {
		container.Names = append([]string{"/" + name}, container.Names...)
	}
	return container
}
// Containers returns all the containers in the cluster.
func (c *Cluster) Containers() cluster.Containers {
	c.RLock()
	defer c.RUnlock()

	out := cluster.Containers{}
	for _, s := range c.slaves {
		for _, container := range s.engine.Containers() {
			out = append(out, formatContainer(container))
		}
	}
	return out
}
// Container returns the container with IDOrName in the cluster.
func (c *Cluster) Container(IDOrName string) *cluster.Container {
	// Abort immediately if the name is empty.
	if len(IDOrName) == 0 {
		return nil
	}

	// Containers() takes the read lock itself; acquiring it here as well could
	// deadlock if a writer were queued between the two acquisitions.
	return formatContainer(c.Containers().Get(IDOrName))
}
// RemoveImage removes an image from the cluster.
func (c *Cluster) RemoveImage(image *cluster.Image) ([]*dockerclient.ImageDelete, error) {
	return nil, errNotSupported
}
// Pull pulls images on the cluster nodes. It is currently a no-op on mesos.
func (c *Cluster) Pull(name string, authConfig *dockerclient.AuthConfig, callback func(what, status string)) {
}

// Load loads images on the cluster nodes. It is currently a no-op on mesos.
func (c *Cluster) Load(imageReader io.Reader, callback func(what, status string)) {
}

// Import imports an image on the cluster nodes. It is currently a no-op on mesos.
func (c *Cluster) Import(source string, repository string, tag string, imageReader io.Reader, callback func(what, status string)) {
}
// RenameContainer renames a container.
func (c *Cluster) RenameContainer(container *cluster.Container, newName string) error {
	// FIXME: this doesn't work, as the next container refresh will erase this
	// change (the change is in-memory only).
	container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.name"] = newName

	return nil
}
// scalarResourceValue sums the named scalar resource (e.g. "cpus" or "mem")
// across all the given offers.
func scalarResourceValue(offers map[string]*mesosproto.Offer, name string) float64 {
	var value float64
	for _, offer := range offers {
		for _, resource := range offer.Resources {
			if *resource.Name == name {
				value += *resource.Scalar.Value
			}
		}
	}
	return value
}
// listNodes returns all the nodes in the cluster.
func (c *Cluster) listNodes() []*node.Node {
	c.RLock()
	defer c.RUnlock()

	out := []*node.Node{}
	for _, s := range c.slaves {
		n := node.NewNode(s.engine)
		n.ID = s.id
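		// A node's capacity is whatever its slave currently has on offer, so
		// these figures fluctuate as offers arrive, get consumed, and expire.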
		n.TotalCpus = int64(scalarResourceValue(s.offers, "cpus"))
		n.UsedCpus = 0
		n.TotalMemory = int64(scalarResourceValue(s.offers, "mem")) * 1024 * 1024
		n.UsedMemory = 0
		out = append(out, n)
	}
	return out
}
func (c *Cluster) listOffers() []*mesosproto.Offer {
	c.RLock()
	defer c.RUnlock()

	list := []*mesosproto.Offer{}
	for _, s := range c.slaves {
		for _, offer := range s.offers {
			list = append(list, offer)
		}
	}
	return list
}
// TotalMemory returns the total memory of the cluster.
func (c *Cluster) TotalMemory() int64 {
	// TODO: use current offers
	return 0
}

// TotalCpus returns the total number of CPUs in the cluster.
func (c *Cluster) TotalCpus() int64 {
	// TODO: use current offers
	return 0
}
// Info gives minimal information about containers and resources on the mesos cluster.
func (c *Cluster) Info() [][]string {
	offers := c.listOffers()
	info := [][]string{
		{"\bStrategy", c.scheduler.Strategy()},
		{"\bFilters", c.scheduler.Filters()},
		{"\bOffers", fmt.Sprintf("%d", len(offers))},
	}
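	// The leading \b (backspace) appears to be a formatting trick: the docker
	// CLI indents each status entry with a space, and the backspace cancels it
	// so these top-level entries stay left-aligned.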
	sort.Sort(offerSorter(offers))

	for _, offer := range offers {
		info = append(info, []string{" Offer", offer.Id.GetValue()})
		for _, resource := range offer.Resources {
			info = append(info, []string{" └ " + *resource.Name, fmt.Sprintf("%v", resource)})
		}
	}

	return info
}
func (c *Cluster) addOffer(offer *mesosproto.Offer) {
	s, ok := c.slaves[offer.SlaveId.GetValue()]
	if !ok {
		return
	}
	s.addOffer(offer)
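	// Offers are held for at most offerTimeout. If the offer is still cached
	// when the timer fires, decline it so the resources go back to Mesos;
	// removeOffer returns false when the offer was already used or removed,
	// in which case there is nothing to decline.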
	go func(offer *mesosproto.Offer) {
		time.Sleep(c.offerTimeout)
		// Decline Mesos offers to make them available to other Mesos services.
		if c.removeOffer(offer) {
			if _, err := c.driver.DeclineOffer(offer.Id, &mesosproto.Filters{}); err != nil {
				log.WithFields(log.Fields{"name": "mesos"}).Errorf("Error while declining offer %q: %v", offer.Id.GetValue(), err)
			} else {
				log.WithFields(log.Fields{"name": "mesos"}).Debugf("Offer %q declined successfully", offer.Id.GetValue())
			}
		}
	}(offer)
}
func (c *Cluster) removeOffer(offer *mesosproto.Offer) bool {
	log.WithFields(log.Fields{"name": "mesos", "offerID": offer.Id.String()}).Debug("Removing offer")
	s, ok := c.slaves[offer.SlaveId.GetValue()]
	if !ok {
		return false
	}
	found := s.removeOffer(offer.Id.GetValue())
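	// If the slave is now empty, forget about it entirely; it will be
	// re-created when its next offer arrives.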
	if s.empty() {
		// Disconnect from engine.
		delete(c.slaves, offer.SlaveId.GetValue())
	}
	return found
}
func (c *Cluster) scheduleTask(t *task) bool {
	c.scheduler.Lock()
	defer c.scheduler.Unlock()

	n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), t.config)
	if err != nil {
		return false
	}
	s, ok := c.slaves[n.ID]
	if !ok {
		t.error <- fmt.Errorf("Unable to create on slave %q", n.ID)
		return true
	}

	// Build the task from its internal config and set the slave ID.
	t.build(n.ID)
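	// Hold the cluster lock while collecting this slave's offers and launching
	// the task, so a concurrent offer removal cannot race with LaunchTasks.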
	c.Lock()
	// TODO: Only use the offer we need.
	offerIDs := []*mesosproto.OfferID{}
	for _, offer := range c.slaves[n.ID].offers {
		offerIDs = append(offerIDs, offer.Id)
	}

	if _, err := c.driver.LaunchTasks(offerIDs, []*mesosproto.TaskInfo{&t.TaskInfo}, &mesosproto.Filters{}); err != nil {
		// TODO: Do not erase all the offers, only the one used.
		for _, offer := range s.offers {
			c.removeOffer(offer)
		}
		c.Unlock()
		t.error <- err
		return true
	}
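	// Every offer passed to LaunchTasks is consumed by Mesos, even if the task
	// only needed part of it, so drop all of this slave's cached offers.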
	s.addTask(t)

	// TODO: Do not erase all the offers, only the one used.
	for _, offer := range s.offers {
		c.removeOffer(offer)
	}
	c.Unlock()
	// Block until we get the container.
	finished, data, err := t.monitor()
	taskID := t.TaskInfo.TaskId.GetValue()
	if err != nil {
		// Remove the task on error.
		s.removeTask(taskID)
		t.error <- err
		return true
	}
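	// The task was accepted but has not reached a terminal state yet: keep
	// polling its status in the background so the transition is not lost.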
	if !finished {
		go func() {
			for {
				finished, _, err := t.monitor()
				if err != nil {
					// TODO: do a better log by sending a proper error message.
					log.Error(err)
					break
				}
				if finished {
					break
				}
			}
			// TODO: remove the task from the slave once it has finished.
		}()
	}
	// Register the container immediately while waiting for a state refresh.

	// In mesos 0.23+ the docker inspect will be sent back in taskStatus.Data;
	// we can use this to find the right container.
	inspect := []dockerclient.ContainerInfo{}
	if data != nil && json.Unmarshal(data, &inspect) == nil && len(inspect) == 1 {
		container := &cluster.Container{Container: dockerclient.Container{Id: inspect[0].Id}, Engine: s.engine}
		if container.Refresh() == nil {
			t.container <- container
			return true
		}
	}

	log.Debug("Cannot parse docker info from task status, please upgrade Mesos to the latest version")

	// For mesos <= 0.22 we fall back to a full refresh + matching by label.
	// TODO: once 0.23 or 0.24 is released, remove this whole block of code as
	// it doesn't scale very well.
	s.engine.RefreshContainers(true)

	for _, container := range s.engine.Containers() {
		if container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] == taskID {
			t.container <- container
			return true
		}
	}

	t.error <- fmt.Errorf("container failed to be created")
	return true
}

// RANDOMENGINE returns a random engine.
func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) {
	c.RLock()
	defer c.RUnlock()

	n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), &cluster.ContainerConfig{})
	if err != nil {
		return nil, err
	}
	if n != nil {
		return c.slaves[n.ID].engine, nil
	}
	return nil, nil
}