scheduler now returns the list of ranked nodes rather than the top node.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-10-07 18:21:02 -07:00
parent 267d7e6701
commit b2b32d979d
10 changed files with 99 additions and 128 deletions

View File

@ -427,10 +427,11 @@ func (c *Cluster) scheduleTask(t *task) bool {
c.scheduler.Lock()
defer c.scheduler.Unlock()
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), t.config)
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), t.config)
if err != nil {
return false
}
n := nodes[0]
s, ok := c.slaves[n.ID]
if !ok {
t.error <- fmt.Errorf("Unable to create on slave %q", n.ID)
@ -526,15 +527,13 @@ func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) {
c.RLock()
defer c.RUnlock()
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), &cluster.ContainerConfig{})
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), &cluster.ContainerConfig{})
if err != nil {
return nil, err
}
if n != nil {
n := nodes[0]
return c.slaves[n.ID].engine, nil
}
return nil, nil
}
// BuildImage build an image
func (c *Cluster) BuildImage(buildImage *dockerclient.BuildImage, out io.Writer) error {
@ -545,11 +544,12 @@ func (c *Cluster) BuildImage(buildImage *dockerclient.BuildImage, out io.Writer)
CpuShares: buildImage.CpuShares,
Memory: buildImage.Memory,
}}
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), config)
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), config)
c.scheduler.Unlock()
if err != nil {
return err
}
n := nodes[0]
reader, err := c.slaves[n.ID].engine.BuildImage(buildImage)
if err != nil {

View File

@ -140,11 +140,12 @@ func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string,
configTemp.AddAffinity("image==~" + config.Image)
}
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), configTemp)
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), configTemp)
if err != nil {
c.scheduler.Unlock()
return nil, err
}
n := nodes[0]
engine, ok := c.engines[n.ID]
if !ok {
c.scheduler.Unlock()
@ -684,14 +685,11 @@ func (c *Cluster) Info() [][]string {
// RANDOMENGINE returns a random engine.
func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) {
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), &cluster.ContainerConfig{})
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), &cluster.ContainerConfig{})
if err != nil {
return nil, err
}
if n != nil {
return c.engines[n.ID], nil
}
return nil, nil
return c.engines[nodes[0].ID], nil
}
// RenameContainer rename a container
@ -718,11 +716,12 @@ func (c *Cluster) BuildImage(buildImage *dockerclient.BuildImage, out io.Writer)
CpuShares: buildImage.CpuShares,
Memory: buildImage.Memory,
}}
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), config)
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), config)
c.scheduler.Unlock()
if err != nil {
return err
}
n := nodes[0]
reader, err := c.engines[n.ID].BuildImage(buildImage)
if err != nil {

View File

@ -1,6 +1,7 @@
package scheduler
import (
"errors"
"strings"
"sync"
@ -10,6 +11,10 @@ import (
"github.com/docker/swarm/scheduler/strategy"
)
var (
errNoNodeAvailable = errors.New("No nodes available in the cluster")
)
// Scheduler is exported
type Scheduler struct {
sync.Mutex
@ -26,14 +31,19 @@ func New(strategy strategy.PlacementStrategy, filters []filter.Filter) *Schedule
}
}
// SelectNodeForContainer will find a nice home for our container.
func (s *Scheduler) SelectNodeForContainer(nodes []*node.Node, config *cluster.ContainerConfig) (*node.Node, error) {
// SelectNodesForContainer will return a list of nodes where the container can
// be scheduled, sorted by order of preference.
func (s *Scheduler) SelectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig) ([]*node.Node, error) {
accepted, err := filter.ApplyFilters(s.filters, config, nodes)
if err != nil {
return nil, err
}
return s.strategy.PlaceContainer(config, accepted)
if len(accepted) == 0 {
return nil, errNoNodeAvailable
}
return s.strategy.RankAndSort(config, accepted)
}
// Strategy returns the strategy name

View File

@ -21,25 +21,17 @@ func (p *BinpackPlacementStrategy) Name() string {
return "binpack"
}
// PlaceContainer places a container on the node with the most running containers.
func (p *BinpackPlacementStrategy) PlaceContainer(config *cluster.ContainerConfig, nodes []*node.Node) (*node.Node, error) {
// RankAndSort sorts nodes based on the binpack strategy applied to the container config.
func (p *BinpackPlacementStrategy) RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
weightedNodes, err := weighNodes(config, nodes)
if err != nil {
return nil, err
}
// sort by highest weight
sort.Sort(sort.Reverse(weightedNodes))
topNode := weightedNodes[0]
for _, node := range weightedNodes {
if node.Weight != topNode.Weight {
break
output := make([]*node.Node, len(weightedNodes))
for i, n := range weightedNodes {
output[i] = n.Node
}
if len(node.Node.Containers) > len(topNode.Node.Containers) {
topNode = node
}
}
return topNode.Node, nil
return output, nil
}

View File

@ -34,6 +34,12 @@ func createContainer(ID string, config *cluster.ContainerConfig) *cluster.Contai
}
}
func selectTopNode(t *testing.T, s PlacementStrategy, config *cluster.ContainerConfig, nodes []*node.Node) *node.Node {
n, err := s.RankAndSort(config, nodes)
assert.NoError(t, err)
return n[0]
}
func TestPlaceEqualWeight(t *testing.T) {
s := &BinpackPlacementStrategy{}
@ -55,8 +61,7 @@ func TestPlaceEqualWeight(t *testing.T) {
// add another container 1G
config = createConfig(1, 0)
node, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, config, nodes)
assert.NoError(t, node.AddContainer(createContainer("c4", config)))
assert.Equal(t, node.UsedMemory, int64(3*1024*1024*1024))
@ -76,15 +81,13 @@ func TestPlaceContainerMemory(t *testing.T) {
// add 1 container 1G
config := createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedMemory, int64(1024*1024*1024))
// add another container 1G
config = createConfig(1, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedMemory, int64(2*1024*1024*1024))
@ -103,15 +106,13 @@ func TestPlaceContainerCPU(t *testing.T) {
// add 1 container 1CPU
config := createConfig(0, 1)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedCpus, int64(1))
// add another container 1CPU
config = createConfig(0, 1)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedCpus, int64(2))
@ -130,24 +131,22 @@ func TestPlaceContainerHuge(t *testing.T) {
// add 100 container 1CPU
for i := 0; i < 100; i++ {
node, err := s.PlaceContainer(createConfig(0, 1), nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, createConfig(0, 1), nodes)
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1))))
}
// try to add another container 1CPU
_, err := s.PlaceContainer(createConfig(0, 1), nodes)
_, err := s.RankAndSort(createConfig(0, 1), nodes)
assert.Error(t, err)
// add 100 container 1G
for i := 100; i < 200; i++ {
node, err := s.PlaceContainer(createConfig(1, 0), nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, createConfig(1, 0), nodes)
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
}
// try to add another container 1G
_, err = s.PlaceContainer(createConfig(1, 0), nodes)
_, err = s.RankAndSort(createConfig(1, 0), nodes)
assert.Error(t, err)
}
@ -161,25 +160,22 @@ func TestPlaceContainerOvercommit(t *testing.T) {
// Below limit should still work.
config.Memory = 90 * 1024 * 1024 * 1024
node, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, config, nodes)
assert.Equal(t, node, nodes[0])
// At memory limit should still work.
config.Memory = 100 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node = selectTopNode(t, s, config, nodes)
assert.Equal(t, node, nodes[0])
// Up to 105% it should still work.
config.Memory = 105 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node = selectTopNode(t, s, config, nodes)
assert.Equal(t, node, nodes[0])
// Above it should return an error.
config.Memory = 106 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
_, err = s.RankAndSort(config, nodes)
assert.Error(t, err)
}
@ -194,20 +190,18 @@ func TestPlaceContainerDemo(t *testing.T) {
// try to place a 10G container
config := createConfig(10, 0)
_, err := s.PlaceContainer(config, nodes)
_, err := s.RankAndSort(config, nodes)
// check that it refuses because the cluster is full
assert.Error(t, err)
// add one container 1G
config = createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
// add another container 1G
config = createConfig(1, 0)
node1bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1bis := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1bis.AddContainer(createContainer("c2", config)))
// check that both containers ended on the same node
@ -216,8 +210,7 @@ func TestPlaceContainerDemo(t *testing.T) {
// add another container 2G
config = createConfig(2, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c3", config)))
// check that it ends up on another node
@ -225,8 +218,7 @@ func TestPlaceContainerDemo(t *testing.T) {
// add another container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node3 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node3.AddContainer(createContainer("c4", config)))
// check that it ends up on another node
@ -235,8 +227,7 @@ func TestPlaceContainerDemo(t *testing.T) {
// add another container 1G
config = createConfig(1, 0)
node3bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node3bis := selectTopNode(t, s, config, nodes)
assert.NoError(t, node3bis.AddContainer(createContainer("c5", config)))
// check that it ends up on the same node
@ -244,7 +235,7 @@ func TestPlaceContainerDemo(t *testing.T) {
// try to add another container
config = createConfig(1, 0)
_, err = s.PlaceContainer(config, nodes)
_, err = s.RankAndSort(config, nodes)
// check that it refuses because the cluster is full
assert.Error(t, err)
@ -256,8 +247,7 @@ func TestPlaceContainerDemo(t *testing.T) {
// add another container
config = createConfig(1, 0)
node2bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2bis := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2bis.AddContainer(createContainer("c6", config)))
// check it ends up on `node3`
@ -275,14 +265,12 @@ func TestComplexPlacement(t *testing.T) {
// add one container 2G
config := createConfig(2, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
// add one container 3G
config = createConfig(3, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
// check that they end up on separate nodes
@ -290,8 +278,7 @@ func TestComplexPlacement(t *testing.T) {
// add one container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node3 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node3.AddContainer(createContainer("c3", config)))
// check that it ends up on the same node as the 3G

View File

@ -1,7 +1,6 @@
package strategy
import (
"errors"
"math/rand"
"time"
@ -25,11 +24,11 @@ func (p *RandomPlacementStrategy) Name() string {
return "random"
}
// PlaceContainer places the container on a random node in the cluster.
func (p *RandomPlacementStrategy) PlaceContainer(config *cluster.ContainerConfig, nodes []*node.Node) (*node.Node, error) {
if size := len(nodes); size > 0 {
return nodes[p.r.Intn(size)], nil
// RankAndSort randomly sorts the list of nodes.
func (p *RandomPlacementStrategy) RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
for i := len(nodes) - 1; i > 0; i-- {
j := p.r.Intn(i + 1)
nodes[i], nodes[j] = nodes[j], nodes[i]
}
return nil, errors.New("No nodes running in the cluster")
return nodes, nil
}

View File

@ -21,25 +21,17 @@ func (p *SpreadPlacementStrategy) Name() string {
return "spread"
}
// PlaceContainer places a container on the node with the fewest running containers.
func (p *SpreadPlacementStrategy) PlaceContainer(config *cluster.ContainerConfig, nodes []*node.Node) (*node.Node, error) {
// RankAndSort sorts nodes based on the spread strategy applied to the container config.
func (p *SpreadPlacementStrategy) RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
weightedNodes, err := weighNodes(config, nodes)
if err != nil {
return nil, err
}
// sort by lowest weight
sort.Sort(weightedNodes)
bottomNode := weightedNodes[0]
for _, node := range weightedNodes {
if node.Weight != bottomNode.Weight {
break
output := make([]*node.Node, len(weightedNodes))
for i, n := range weightedNodes {
output[i] = n.Node
}
if len(node.Node.Containers) < len(bottomNode.Node.Containers) {
bottomNode = node
}
}
return bottomNode.Node, nil
return output, nil
}

View File

@ -29,8 +29,7 @@ func TestSpreadPlaceEqualWeight(t *testing.T) {
// add another container 1G
config = createConfig(1, 0)
node, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, config, nodes)
assert.NoError(t, node.AddContainer(createContainer("c4", config)))
assert.Equal(t, node.UsedMemory, int64(3*1024*1024*1024))
@ -50,15 +49,13 @@ func TestSpreadPlaceContainerMemory(t *testing.T) {
// add 1 container 1G
config := createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedMemory, int64(1024*1024*1024))
// add another container 1G
config = createConfig(1, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedMemory, int64(1024*1024*1024))
@ -77,15 +74,13 @@ func TestSpreadPlaceContainerCPU(t *testing.T) {
// add 1 container 1CPU
config := createConfig(0, 1)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedCpus, int64(1))
// add another container 1CPU
config = createConfig(0, 1)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedCpus, int64(1))
@ -104,24 +99,22 @@ func TestSpreadPlaceContainerHuge(t *testing.T) {
// add 100 container 1CPU
for i := 0; i < 100; i++ {
node, err := s.PlaceContainer(createConfig(0, 1), nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, createConfig(0, 1), nodes)
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1))))
}
// try to add another container 1CPU
_, err := s.PlaceContainer(createConfig(0, 1), nodes)
_, err := s.RankAndSort(createConfig(0, 1), nodes)
assert.Error(t, err)
// add 100 container 1G
for i := 100; i < 200; i++ {
node, err := s.PlaceContainer(createConfig(1, 0), nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, createConfig(1, 0), nodes)
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
}
// try to add another container 1G
_, err = s.PlaceContainer(createConfig(1, 0), nodes)
_, err = s.RankAndSort(createConfig(1, 0), nodes)
assert.Error(t, err)
}
@ -134,25 +127,22 @@ func TestSpreadPlaceContainerOvercommit(t *testing.T) {
// Below limit should still work.
config.Memory = 90 * 1024 * 1024 * 1024
node, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, config, nodes)
assert.Equal(t, node, nodes[0])
// At memory limit should still work.
config.Memory = 100 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node = selectTopNode(t, s, config, nodes)
assert.Equal(t, node, nodes[0])
// Up to 105% it should still work.
config.Memory = 105 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node = selectTopNode(t, s, config, nodes)
assert.Equal(t, node, nodes[0])
// Above it should return an error.
config.Memory = 106 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
_, err := s.RankAndSort(config, nodes)
assert.Error(t, err)
}
@ -166,14 +156,12 @@ func TestSpreadComplexPlacement(t *testing.T) {
// add one container 2G
config := createConfig(2, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
// add one container 3G
config = createConfig(3, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
// check that they end up on separate nodes
@ -181,8 +169,7 @@ func TestSpreadComplexPlacement(t *testing.T) {
// add one container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node3 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node3.AddContainer(createContainer("c3", config)))
// check that it ends up on the same node as the 2G

View File

@ -16,10 +16,11 @@ type PlacementStrategy interface {
// an error if one is encountered.
// If no initial configuration is needed, this may be a no-op and return a nil error.
Initialize() error
// Given a container configuration and a set of nodes, select the target
// node where the container should be scheduled. PlaceContainer returns
// an error if there is no available node on which to schedule the container.
PlaceContainer(config *cluster.ContainerConfig, nodes []*node.Node) (*node.Node, error)
// RankAndSort applies the strategy to a list of nodes and ranks them based
// on the best fit given the container configuration. It returns a sorted
// list of nodes (based on their ranks) or an error if there is no
// available node on which to schedule the container.
RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error)
}
var (

View File

@ -29,6 +29,10 @@ func (n weightedNodeList) Less(i, j int) bool {
jp = n[j]
)
// If the nodes have the same weight sort them out by number of containers.
if ip.Weight == jp.Weight {
return len(ip.Node.Containers) < len(jp.Node.Containers)
}
return ip.Weight < jp.Weight
}