Merge pull request #1261 from aluzzardi/parallel-scheduling

Parallel scheduling
This commit is contained in:
Victor Vieux 2015-10-09 12:57:42 -07:00
commit 267d7e6701
3 changed files with 137 additions and 79 deletions

View File

@ -20,14 +20,35 @@ import (
"github.com/samalba/dockerclient" "github.com/samalba/dockerclient"
) )
type pendingContainer struct {
Config *cluster.ContainerConfig
Name string
Engine *cluster.Engine
}
func (p *pendingContainer) ToContainer() *cluster.Container {
container := &cluster.Container{
Container: dockerclient.Container{},
Config: p.Config,
Engine: p.Engine,
}
if p.Name != "" {
container.Container.Names = []string{"/" + p.Name}
}
return container
}
// Cluster is exported // Cluster is exported
type Cluster struct { type Cluster struct {
sync.RWMutex sync.RWMutex
eventHandler cluster.EventHandler eventHandler cluster.EventHandler
engines map[string]*cluster.Engine engines map[string]*cluster.Engine
scheduler *scheduler.Scheduler scheduler *scheduler.Scheduler
discovery discovery.Discovery discovery discovery.Discovery
pendingContainers map[string]*pendingContainer
overcommitRatio float64 overcommitRatio float64
TLSConfig *tls.Config TLSConfig *tls.Config
@ -38,11 +59,12 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster") log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
cluster := &Cluster{ cluster := &Cluster{
engines: make(map[string]*cluster.Engine), engines: make(map[string]*cluster.Engine),
scheduler: scheduler, scheduler: scheduler,
TLSConfig: TLSConfig, TLSConfig: TLSConfig,
discovery: discovery, discovery: discovery,
overcommitRatio: 0.05, pendingContainers: make(map[string]*pendingContainer),
overcommitRatio: 0.05,
} }
if val, ok := options.Float("swarm.overcommit", ""); ok { if val, ok := options.Float("swarm.overcommit", ""); ok {
@ -102,15 +124,16 @@ func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string)
func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string, withSoftImageAffinity bool) (*cluster.Container, error) { func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string, withSoftImageAffinity bool) (*cluster.Container, error) {
c.scheduler.Lock() c.scheduler.Lock()
defer c.scheduler.Unlock()
// Ensure the name is available // Ensure the name is available
if cID := c.getIDFromName(name); cID != "" { if !c.checkNameUniqueness(name) {
return nil, fmt.Errorf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", name, cID, name) c.scheduler.Unlock()
return nil, fmt.Errorf("Conflict: The name %s is already assigned. You have to delete (or rename) that container to be able to assign %s to a container again.", name, name)
} }
// Associate a Swarm ID to the container we are creating. // Associate a Swarm ID to the container we are creating.
config.SetSwarmID(c.generateUniqueID()) swarmID := c.generateUniqueID()
config.SetSwarmID(swarmID)
configTemp := config configTemp := config
if withSoftImageAffinity { if withSoftImageAffinity {
@ -119,25 +142,35 @@ func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string,
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), configTemp) n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), configTemp)
if err != nil { if err != nil {
c.scheduler.Unlock()
return nil, err return nil, err
} }
engine, ok := c.engines[n.ID]
if nn, ok := c.engines[n.ID]; ok { if !ok {
container, err := nn.Create(config, name, true) c.scheduler.Unlock()
return container, err return nil, nil
} }
return nil, nil c.pendingContainers[swarmID] = &pendingContainer{
Name: name,
Config: config,
Engine: engine,
}
c.scheduler.Unlock()
container, err := engine.Create(config, name, true)
c.scheduler.Lock()
delete(c.pendingContainers, swarmID)
c.scheduler.Unlock()
return container, err
} }
// RemoveContainer aka Remove a container from the cluster. Containers should // RemoveContainer aka Remove a container from the cluster.
// always be destroyed through the scheduler to guarantee atomicity.
func (c *Cluster) RemoveContainer(container *cluster.Container, force, volumes bool) error { func (c *Cluster) RemoveContainer(container *cluster.Container, force, volumes bool) error {
c.scheduler.Lock() return container.Engine.RemoveContainer(container, force, volumes)
defer c.scheduler.Unlock()
err := container.Engine.RemoveContainer(container, force, volumes)
return err
} }
func (c *Cluster) getEngineByAddr(addr string) *cluster.Engine { func (c *Cluster) getEngineByAddr(addr string) *cluster.Engine {
@ -498,10 +531,10 @@ func (c *Cluster) Containers() cluster.Containers {
return out return out
} }
func (c *Cluster) getIDFromName(name string) string { func (c *Cluster) checkNameUniqueness(name string) bool {
// Abort immediately if the name is empty. // Abort immediately if the name is empty.
if len(name) == 0 { if len(name) == 0 {
return "" return true
} }
c.RLock() c.RLock()
@ -510,12 +543,20 @@ func (c *Cluster) getIDFromName(name string) string {
for _, c := range e.Containers() { for _, c := range e.Containers() {
for _, cname := range c.Names { for _, cname := range c.Names {
if cname == name || cname == "/"+name { if cname == name || cname == "/"+name {
return c.Id return false
} }
} }
} }
} }
return ""
// check pending containers.
for _, c := range c.pendingContainers {
if c.Name == name {
return false
}
}
return true
} }
// Container returns the container with IDOrName in the cluster // Container returns the container with IDOrName in the cluster
@ -571,8 +612,14 @@ func (c *Cluster) listNodes() []*node.Node {
defer c.RUnlock() defer c.RUnlock()
out := make([]*node.Node, 0, len(c.engines)) out := make([]*node.Node, 0, len(c.engines))
for _, n := range c.engines { for _, e := range c.engines {
out = append(out, node.NewNode(n)) node := node.NewNode(e)
for _, c := range c.pendingContainers {
if c.Engine.ID == e.ID && node.Container(c.Config.SwarmID()) == nil {
node.AddContainer(c.ToContainer())
}
}
out = append(out, node)
} }
return out return out
@ -653,8 +700,8 @@ func (c *Cluster) RenameContainer(container *cluster.Container, newName string)
defer c.RUnlock() defer c.RUnlock()
// check new name whether available // check new name whether available
if cID := c.getIDFromName(newName); cID != "" { if !c.checkNameUniqueness(newName) {
return fmt.Errorf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", newName, cID, newName) return fmt.Errorf("Conflict: The name %s is already assigned. You have to delete (or rename) that container to be able to assign %s to a container again.", newName, newName)
} }
// call engine rename // call engine rename

View File

@ -14,24 +14,33 @@ func TestDependencyFilterSimple(t *testing.T) {
f = DependencyFilter{} f = DependencyFilter{}
nodes = []*node.Node{ nodes = []*node.Node{
{ {
ID: "node-0-id", ID: "node-0-id",
Name: "node-0-name", Name: "node-0-name",
Addr: "node-0", Addr: "node-0",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c0"}}}, Containers: []*cluster.Container{{
Container: dockerclient.Container{Id: "c0"},
Config: &cluster.ContainerConfig{},
}},
}, },
{ {
ID: "node-1-id", ID: "node-1-id",
Name: "node-1-name", Name: "node-1-name",
Addr: "node-1", Addr: "node-1",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c1"}}}, Containers: []*cluster.Container{{
Container: dockerclient.Container{Id: "c1"},
Config: &cluster.ContainerConfig{},
}},
}, },
{ {
ID: "node-2-id", ID: "node-2-id",
Name: "node-2-name", Name: "node-2-name",
Addr: "node-2", Addr: "node-2",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}}, Containers: []*cluster.Container{{
Container: dockerclient.Container{Id: "c2"},
Config: &cluster.ContainerConfig{},
}},
}, },
} }
result []*node.Node result []*node.Node
@ -109,17 +118,28 @@ func TestDependencyFilterMulti(t *testing.T) {
Name: "node-0-name", Name: "node-0-name",
Addr: "node-0", Addr: "node-0",
Containers: []*cluster.Container{ Containers: []*cluster.Container{
{Container: dockerclient.Container{Id: "c0"}}, {
{Container: dockerclient.Container{Id: "c1"}}, Container: dockerclient.Container{Id: "c0"},
Config: &cluster.ContainerConfig{},
},
{
Container: dockerclient.Container{Id: "c1"},
Config: &cluster.ContainerConfig{},
},
}, },
}, },
// nodes[1] has c2 // nodes[1] has c2
{ {
ID: "node-1-id", ID: "node-1-id",
Name: "node-1-name", Name: "node-1-name",
Addr: "node-1", Addr: "node-1",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}}, Containers: []*cluster.Container{
{
Container: dockerclient.Container{Id: "c2"},
Config: &cluster.ContainerConfig{},
},
},
}, },
// nodes[2] has nothing // nodes[2] has nothing
@ -179,17 +199,28 @@ func TestDependencyFilterChaining(t *testing.T) {
Name: "node-0-name", Name: "node-0-name",
Addr: "node-0", Addr: "node-0",
Containers: []*cluster.Container{ Containers: []*cluster.Container{
{Container: dockerclient.Container{Id: "c0"}}, {
{Container: dockerclient.Container{Id: "c1"}}, Container: dockerclient.Container{Id: "c0"},
Config: &cluster.ContainerConfig{},
},
{
Container: dockerclient.Container{Id: "c1"},
Config: &cluster.ContainerConfig{},
},
}, },
}, },
// nodes[1] has c2 // nodes[1] has c2
{ {
ID: "node-1-id", ID: "node-1-id",
Name: "node-1-name", Name: "node-1-name",
Addr: "node-1", Addr: "node-1",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}}, Containers: []*cluster.Container{
{
Container: dockerclient.Container{Id: "c2"},
Config: &cluster.ContainerConfig{},
},
},
}, },
// nodes[2] has nothing // nodes[2] has nothing

View File

@ -2,7 +2,6 @@ package node
import ( import (
"errors" "errors"
"strings"
"github.com/docker/swarm/cluster" "github.com/docker/swarm/cluster"
) )
@ -14,7 +13,7 @@ type Node struct {
Addr string Addr string
Name string Name string
Labels map[string]string Labels map[string]string
Containers []*cluster.Container Containers cluster.Containers
Images []*cluster.Image Images []*cluster.Image
UsedMemory int64 UsedMemory int64
@ -45,26 +44,7 @@ func NewNode(e *cluster.Engine) *Node {
// Container returns the container with IDOrName in the engine. // Container returns the container with IDOrName in the engine.
func (n *Node) Container(IDOrName string) *cluster.Container { func (n *Node) Container(IDOrName string) *cluster.Container {
// Abort immediately if the name is empty. return n.Containers.Get(IDOrName)
if len(IDOrName) == 0 {
return nil
}
for _, container := range n.Containers {
// Match ID prefix.
if strings.HasPrefix(container.Id, IDOrName) {
return container
}
// Match name, /name or engine/name.
for _, name := range container.Names {
if name == IDOrName || name == "/"+IDOrName || container.Engine.ID+name == IDOrName || container.Engine.Name+name == IDOrName {
return container
}
}
}
return nil
} }
// AddContainer injects a container into the internal state. // AddContainer injects a container into the internal state.