Merge pull request #574 from vieux/node_engine

Transform `node interface` to `engine struct`
This commit is contained in:
Andrea Luzzardi 2015-04-07 14:11:58 -07:00
commit 530d5670a4
38 changed files with 1166 additions and 1257 deletions

View File

@ -17,6 +17,7 @@ import (
dockerfilters "github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/node"
"github.com/docker/swarm/version"
"github.com/gorilla/mux"
"github.com/samalba/dockerclient"
@ -94,7 +95,7 @@ func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) {
if len(accepteds) != 0 {
found := false
for _, accepted := range accepteds {
if accepted == image.Node.Name() || accepted == image.Node.ID() {
if accepted == image.Engine.Name || accepted == image.Engine.ID {
found = true
break
}
@ -132,20 +133,20 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) {
if strings.Split(tmp.Image, ":")[0] == "swarm" && !all {
continue
}
if !container.Node.IsHealthy() {
if !container.Engine.IsHealthy() {
tmp.Status = "Pending"
}
// TODO remove the Node Name in the name when we have a good solution
tmp.Names = make([]string, len(container.Names))
for i, name := range container.Names {
tmp.Names[i] = "/" + container.Node.Name() + name
tmp.Names[i] = "/" + container.Engine.Name + name
}
// insert node IP
tmp.Ports = make([]dockerclient.Port, len(container.Ports))
for i, port := range container.Ports {
tmp.Ports[i] = port
if port.IP == "0.0.0.0" {
tmp.Ports[i].IP = container.Node.IP()
tmp.Ports[i].IP = container.Engine.IP
}
}
out = append(out, &tmp)
@ -171,7 +172,7 @@ func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) {
}
client, scheme := newClientAndScheme(c.tlsConfig)
resp, err := client.Get(scheme + "://" + container.Node.Addr() + "/containers/" + container.Id + "/json")
resp, err := client.Get(scheme + "://" + container.Engine.Addr + "/containers/" + container.Id + "/json")
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
@ -188,10 +189,10 @@ func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) {
}
// insert Node field
data = bytes.Replace(data, []byte("\"Name\":\"/"), []byte(fmt.Sprintf("\"Node\":%s,\"Name\":\"/", cluster.SerializeNode(container.Node))), -1)
data = bytes.Replace(data, []byte("\"Name\":\"/"), []byte(fmt.Sprintf("\"Node\":%s,\"Name\":\"/", container.Engine)), -1)
// insert node IP
data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Node.IP())), -1)
data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Engine.IP)), -1)
w.Header().Set("Content-Type", "application/json")
w.Write(data)
@ -301,7 +302,7 @@ func postContainersExec(c *context, w http.ResponseWriter, r *http.Request) {
client, scheme := newClientAndScheme(c.tlsConfig)
resp, err := client.Post(scheme+"://"+container.Node.Addr()+"/containers/"+container.Id+"/exec", "application/json", r.Body)
resp, err := client.Post(scheme+"://"+container.Engine.Addr+"/containers/"+container.Id+"/exec", "application/json", r.Body)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
@ -356,7 +357,7 @@ func deleteImages(c *context, w http.ResponseWriter, r *http.Request) {
for _, image := range matchedImages {
content, err := c.cluster.RemoveImage(image)
if err != nil {
errs = append(errs, fmt.Sprintf("%s: %s", image.Node.Name(), err.Error()))
errs = append(errs, fmt.Sprintf("%s: %s", image.Engine.Name, err.Error()))
continue
}
out = append(out, content...)
@ -384,7 +385,7 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) {
return
}
if err := proxy(c.tlsConfig, container.Node.Addr(), w, r); err != nil {
if err := proxy(c.tlsConfig, container.Engine.Addr, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
@ -394,7 +395,7 @@ func proxyImage(c *context, w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
if image := c.cluster.Image(name); image != nil {
proxy(c.tlsConfig, image.Node.Addr(), w, r)
proxy(c.tlsConfig, image.Engine.Addr, w, r)
return
}
httpError(w, fmt.Sprintf("No such image: %s", name), http.StatusNotFound)
@ -402,23 +403,22 @@ func proxyImage(c *context, w http.ResponseWriter, r *http.Request) {
// Proxy a request to a random node
func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) {
candidates := []cluster.Node{}
candidates := []*node.Node{}
// FIXME: doesn't work if there are no container in the cluster
// remove proxyRandom and implemente the features locally
for _, container := range c.cluster.Containers() {
candidates = append(candidates, container.Node)
candidates = append(candidates, node.NewNode(container.Engine))
}
healthFilter := &filter.HealthFilter{}
accepted, err := healthFilter.Filter(nil, candidates)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr(), w, r); err != nil {
if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
@ -441,7 +441,7 @@ func postCommit(c *context, w http.ResponseWriter, r *http.Request) {
}
// proxy commit request to the right node
if err := proxy(c.tlsConfig, container.Node.Addr(), w, r); err != nil {
if err := proxy(c.tlsConfig, container.Engine.Addr, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
@ -454,7 +454,7 @@ func proxyHijack(c *context, w http.ResponseWriter, r *http.Request) {
return
}
if err := hijack(c.tlsConfig, container.Node.Addr(), w, r); err != nil {
if err := hijack(c.tlsConfig, container.Engine.Addr, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}

View File

@ -42,12 +42,16 @@ func (eh *eventsHandler) Wait(remoteAddr string) {
func (eh *eventsHandler) Handle(e *cluster.Event) error {
eh.RLock()
str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d,%q:%s}",
str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d,%q:{%q:%q,%q:%q,%q:%q,%q:%q}}",
"status", e.Status,
"id", e.Id,
"from", e.From+" node:"+e.Node.Name(),
"from", e.From+" node:"+e.Engine.Name,
"time", e.Time,
"node", cluster.SerializeNode(e.Node))
"node",
"Name", e.Engine.Name,
"Id", e.Engine.ID,
"Addr", e.Engine.Addr,
"Ip", e.Engine.IP)
for key, w := range eh.ws {
if _, err := fmt.Fprintf(w, str); err != nil {

View File

@ -17,23 +17,6 @@ func (fw *FakeWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}
type FakeNode struct{}
func (fn *FakeNode) ID() string { return "node_id" }
func (fn *FakeNode) Name() string { return "node_name" }
func (fn *FakeNode) IP() string { return "node_ip" }
func (fn *FakeNode) Addr() string { return "node_addr" }
func (fn *FakeNode) Images() []*cluster.Image { return nil }
func (fn *FakeNode) Image(_ string) *cluster.Image { return nil }
func (fn *FakeNode) Containers() []*cluster.Container { return nil }
func (fn *FakeNode) Container(_ string) *cluster.Container { return nil }
func (fn *FakeNode) TotalCpus() int64 { return 0 }
func (fn *FakeNode) UsedCpus() int64 { return 0 }
func (fn *FakeNode) TotalMemory() int64 { return 0 }
func (fn *FakeNode) UsedMemory() int64 { return 0 }
func (fn *FakeNode) Labels() map[string]string { return nil }
func (fn *FakeNode) IsHealthy() bool { return true }
func TestHandle(t *testing.T) {
eh := NewEventsHandler()
assert.Equal(t, eh.Size(), 0)
@ -44,7 +27,12 @@ func TestHandle(t *testing.T) {
assert.Equal(t, eh.Size(), 1)
event := &cluster.Event{
Node: &FakeNode{},
Engine: &cluster.Engine{
ID: "node_id",
Name: "node_name",
IP: "node_ip",
Addr: "node_addr",
},
}
event.Event.Status = "status"

View File

@ -6,6 +6,6 @@ import "github.com/samalba/dockerclient"
type Container struct {
dockerclient.Container
Info dockerclient.ContainerInfo
Node Node
Info dockerclient.ContainerInfo
Engine *Engine
}

556
cluster/engine.go Normal file
View File

@ -0,0 +1,556 @@
package cluster
import (
"crypto/tls"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/samalba/dockerclient"
)
const (
// Force-refresh the state of the engine this often.
stateRefreshPeriod = 30 * time.Second
// Timeout for requests sent out to the engine.
requestTimeout = 10 * time.Second
)
// NewEngine is exported
func NewEngine(addr string, overcommitRatio float64) *Engine {
e := &Engine{
Addr: addr,
Labels: make(map[string]string),
ch: make(chan bool),
containers: make(map[string]*Container),
healthy: true,
overcommitRatio: int64(overcommitRatio * 100),
}
return e
}
// Engine represents a docker engine
type Engine struct {
sync.RWMutex
ID string
IP string
Addr string
Name string
Cpus int64
Memory int64
Labels map[string]string
ch chan bool
containers map[string]*Container
images []*Image
client dockerclient.Client
eventHandler EventHandler
healthy bool
overcommitRatio int64
}
// Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes.
func (e *Engine) Connect(config *tls.Config) error {
host, _, err := net.SplitHostPort(e.Addr)
if err != nil {
return err
}
addr, err := net.ResolveIPAddr("ip4", host)
if err != nil {
return err
}
e.IP = addr.IP.String()
c, err := dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout))
if err != nil {
return err
}
return e.connectClient(c)
}
func (e *Engine) connectClient(client dockerclient.Client) error {
e.client = client
// Fetch the engine labels.
if err := e.updateSpecs(); err != nil {
e.client = nil
return err
}
// Force a state update before returning.
if err := e.refreshContainers(true); err != nil {
e.client = nil
return err
}
if err := e.refreshImages(); err != nil {
e.client = nil
return err
}
// Start the update loop.
go e.refreshLoop()
// Start monitoring events from the engine.
e.client.StartMonitorEvents(e.handler, nil)
e.emitEvent("engine_connect")
return nil
}
// isConnected returns true if the engine is connected to a remote docker API
func (e *Engine) isConnected() bool {
return e.client != nil
}
// IsHealthy returns true if the engine is healthy
func (e *Engine) IsHealthy() bool {
return e.healthy
}
// Gather engine specs (CPU, memory, constraints, ...).
func (e *Engine) updateSpecs() error {
info, err := e.client.Info()
if err != nil {
return err
}
if info.NCPU == 0 || info.MemTotal == 0 {
return fmt.Errorf("cannot get resources for this engine, make sure %s is a Docker Engine, not a Swarm manager", e.Addr)
}
// Older versions of Docker don't expose the ID field and are not supported
// by Swarm. Catch the error ASAP and refuse to connect.
if len(info.ID) == 0 {
return fmt.Errorf("engine %s is running an unsupported version of Docker Engine. Please upgrade", e.Addr)
}
e.ID = info.ID
e.Name = info.Name
e.Cpus = info.NCPU
e.Memory = info.MemTotal
e.Labels = map[string]string{
"storagedriver": info.Driver,
"executiondriver": info.ExecutionDriver,
"kernelversion": info.KernelVersion,
"operatingsystem": info.OperatingSystem,
}
for _, label := range info.Labels {
kv := strings.SplitN(label, "=", 2)
e.Labels[kv[0]] = kv[1]
}
return nil
}
// RemoveImage deletes an image from the engine.
func (e *Engine) RemoveImage(image *Image) ([]*dockerclient.ImageDelete, error) {
return e.client.RemoveImage(image.Id)
}
// Refresh the list of images on the engine.
func (e *Engine) refreshImages() error {
images, err := e.client.ListImages()
if err != nil {
return err
}
e.Lock()
e.images = nil
for _, image := range images {
e.images = append(e.images, &Image{Image: *image, Engine: e})
}
e.Unlock()
return nil
}
// Refresh the list and status of containers running on the engine. If `full` is
// true, each container will be inspected.
func (e *Engine) refreshContainers(full bool) error {
containers, err := e.client.ListContainers(true, false, "")
if err != nil {
return err
}
merged := make(map[string]*Container)
for _, c := range containers {
merged, err = e.updateContainer(c, merged, full)
if err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Unable to update state of container %q", c.Id)
}
}
e.Lock()
defer e.Unlock()
e.containers = merged
log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Updated engine state")
return nil
}
// Refresh the status of a container running on the engine. If `full` is true,
// the container will be inspected.
func (e *Engine) refreshContainer(ID string, full bool) error {
containers, err := e.client.ListContainers(true, false, fmt.Sprintf("{%q:[%q]}", "id", ID))
if err != nil {
return err
}
if len(containers) > 1 {
// We expect one container, if we get more than one, trigger a full refresh.
return e.refreshContainers(full)
}
if len(containers) == 0 {
// The container doesn't exist on the engine, remove it.
e.Lock()
delete(e.containers, ID)
e.Unlock()
return nil
}
_, err = e.updateContainer(containers[0], e.containers, full)
return err
}
func (e *Engine) updateContainer(c dockerclient.Container, containers map[string]*Container, full bool) (map[string]*Container, error) {
var container *Container
e.RLock()
if current, exists := e.containers[c.Id]; exists {
// The container is already knowe.
container = current
} else {
// This is a brand new container. We need to do a full refresh.
container = &Container{
Engine: e,
}
full = true
}
// Release the lock here as the next step is slow.
// Trade-off: If updateContainer() is called concurrently for the same
// container, we will end up doing a full refresh twice and the original
// container (containers[container.Id]) will get replaced.
e.RUnlock()
// Update ContainerInfo.
if full {
info, err := e.client.InspectContainer(c.Id)
if err != nil {
return nil, err
}
container.Info = *info
// real CpuShares -> nb of CPUs
container.Info.Config.CpuShares = container.Info.Config.CpuShares * 1024.0 / e.Cpus
}
// Update its internal state.
e.Lock()
container.Container = c
containers[container.Id] = container
e.Unlock()
return containers, nil
}
func (e *Engine) refreshContainersAsync() {
e.ch <- true
}
func (e *Engine) refreshLoop() {
for {
var err error
select {
case <-e.ch:
err = e.refreshContainers(false)
case <-time.After(stateRefreshPeriod):
err = e.refreshContainers(false)
}
if err == nil {
err = e.refreshImages()
}
if err != nil {
if e.healthy {
e.emitEvent("engine_disconnect")
}
e.healthy = false
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as dead. Updated state failed: %v", err)
} else {
if !e.healthy {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Info("Engine came back to life. Hooray!")
e.client.StopAllMonitorEvents()
e.client.StartMonitorEvents(e.handler, nil)
e.emitEvent("engine_reconnect")
if err := e.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
}
}
e.healthy = true
}
}
}
func (e *Engine) emitEvent(event string) {
// If there is no event handler registered, abort right now.
if e.eventHandler == nil {
return
}
ev := &Event{
Event: dockerclient.Event{
Status: event,
From: "swarm",
Time: time.Now().Unix(),
},
Engine: e,
}
e.eventHandler.Handle(ev)
}
// UsedMemory returns the sum of memory reserved by containers.
func (e *Engine) UsedMemory() int64 {
var r int64
e.RLock()
for _, c := range e.containers {
r += c.Info.Config.Memory
}
e.RUnlock()
return r
}
// UsedCpus returns the sum of CPUs reserved by containers.
func (e *Engine) UsedCpus() int64 {
var r int64
e.RLock()
for _, c := range e.containers {
r += c.Info.Config.CpuShares
}
e.RUnlock()
return r
}
// TotalMemory returns the total memory + overcommit
func (e *Engine) TotalMemory() int64 {
return e.Memory + (e.Memory * e.overcommitRatio / 100)
}
// TotalCpus returns the total cpus + overcommit
func (e *Engine) TotalCpus() int64 {
return e.Cpus + (e.Cpus * e.overcommitRatio / 100)
}
// Create a new container
func (e *Engine) Create(config *dockerclient.ContainerConfig, name string, pullImage bool) (*Container, error) {
var (
err error
id string
client = e.client
)
newConfig := *config
// nb of CPUs -> real CpuShares
newConfig.CpuShares = config.CpuShares * 1024 / e.Cpus
if id, err = client.CreateContainer(&newConfig, name); err != nil {
// If the error is other than not found, abort immediately.
if err != dockerclient.ErrNotFound || !pullImage {
return nil, err
}
// Otherwise, try to pull the image...
if err = e.Pull(config.Image); err != nil {
return nil, err
}
// ...And try agaie.
if id, err = client.CreateContainer(&newConfig, name); err != nil {
return nil, err
}
}
// Register the container immediately while waiting for a state refresh.
// Force a state refresh to pick up the newly created container.
e.refreshContainer(id, true)
e.RLock()
defer e.RUnlock()
return e.containers[id], nil
}
// Destroy and remove a container from the engine.
func (e *Engine) Destroy(container *Container, force bool) error {
if err := e.client.RemoveContainer(container.Id, force, true); err != nil {
return err
}
// Remove the container from the state. Eventually, the state refresh loop
// will rewrite this.
e.Lock()
defer e.Unlock()
delete(e.containers, container.Id)
return nil
}
// Pull an image on the engine
func (e *Engine) Pull(image string) error {
if !strings.Contains(image, ":") {
image = image + ":latest"
}
if err := e.client.PullImage(image, nil); err != nil {
return err
}
return nil
}
// Events register an event handler.
func (e *Engine) Events(h EventHandler) error {
if e.eventHandler != nil {
return errors.New("event handler already set")
}
e.eventHandler = h
return nil
}
// Containers returns all the containers in the engine.
func (e *Engine) Containers() []*Container {
containers := []*Container{}
e.RLock()
for _, container := range e.containers {
containers = append(containers, container)
}
e.RUnlock()
return containers
}
// Container returns the container with IDOrName in the engine.
func (e *Engine) Container(IDOrName string) *Container {
// Abort immediately if the name is empty.
if len(IDOrName) == 0 {
return nil
}
e.RLock()
defer e.RUnlock()
for _, container := range e.Containers() {
// Match ID prefix.
if strings.HasPrefix(container.Id, IDOrName) {
return container
}
// Match name, /name or engine/name.
for _, name := range container.Names {
if name == IDOrName || name == "/"+IDOrName || container.Engine.ID+name == IDOrName || container.Engine.Name+name == IDOrName {
return container
}
}
}
return nil
}
// Images returns all the images in the engine
func (e *Engine) Images() []*Image {
images := []*Image{}
e.RLock()
for _, image := range e.images {
images = append(images, image)
}
e.RUnlock()
return images
}
// Image returns the image with IDOrName in the engine
func (e *Engine) Image(IDOrName string) *Image {
e.RLock()
defer e.RUnlock()
for _, image := range e.images {
if image.Match(IDOrName) {
return image
}
}
return nil
}
func (e *Engine) String() string {
return fmt.Sprintf("engine %s addr %s", e.ID, e.Addr)
}
func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface{}) {
// Something changed - refresh our internal state.
switch ev.Status {
case "pull", "untag", "delete":
// These events refer to images so there's no need to update
// containers.
e.refreshImages()
case "start", "die":
// If the container is started or stopped, we have to do an inspect in
// order to get the new NetworkSettings.
e.refreshContainer(ev.Id, true)
default:
// Otherwise, do a "soft" refresh of the container.
e.refreshContainer(ev.Id, false)
}
// If there is no event handler registered, abort right now.
if e.eventHandler == nil {
return
}
event := &Event{
Engine: e,
Event: *ev,
}
e.eventHandler.Handle(event)
}
// AddContainer inject a container into the internal state.
func (e *Engine) AddContainer(container *Container) error {
e.Lock()
defer e.Unlock()
if _, ok := e.containers[container.Id]; ok {
return errors.New("container already exists")
}
e.containers[container.Id] = container
return nil
}
// Inject an image into the internal state.
func (e *Engine) addImage(image *Image) {
e.Lock()
defer e.Unlock()
e.images = append(e.images, image)
}
// Remove a container from the internal test.
func (e *Engine) removeContainer(container *Container) error {
e.Lock()
defer e.Unlock()
if _, ok := e.containers[container.Id]; !ok {
return errors.New("container not found")
}
delete(e.containers, container.Id)
return nil
}
// Wipes the internal container state.
func (e *Engine) cleanupContainers() {
e.Lock()
e.containers = make(map[string]*Container)
e.Unlock()
}

21
cluster/engine_sorter.go Normal file
View File

@ -0,0 +1,21 @@
package cluster
// EngineSorter implements the Sort interface to sort Cluster.Engine.
// It is not guaranteed to be a stable sort.
type EngineSorter []*Engine
// Len returns the number of engines to be sorted.
func (s EngineSorter) Len() int {
return len(s)
}
// Swap exchanges the engine elements with indices i and j.
func (s EngineSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Less reports whether the engine with index i should sort before the engine with index j.
// Engines are sorted chronologically by name.
func (s EngineSorter) Less(i, j int) bool {
return s[i].Name < s[j].Name
}

View File

@ -0,0 +1,18 @@
package cluster
import (
"sort"
"testing"
"github.com/stretchr/testify/assert"
)
func TestEngineSorter(t *testing.T) {
engines := []*Engine{{Name: "name1"}, {Name: "name3"}, {Name: "name2"}}
sort.Sort(EngineSorter(engines))
assert.Equal(t, engines[0].Name, "name1")
assert.Equal(t, engines[1].Name, "name2")
assert.Equal(t, engines[2].Name, "name3")
}

View File

@ -1,4 +1,4 @@
package swarm
package cluster
import (
"errors"
@ -25,35 +25,35 @@ var (
}
)
func TestNodeConnectionFailure(t *testing.T) {
node := NewNode("test", 0)
assert.False(t, node.isConnected())
func TestEngineConnectionFailure(t *testing.T) {
engine := NewEngine("test", 0)
assert.False(t, engine.isConnected())
// Always fail.
client := mockclient.NewMockClient()
client.On("Info").Return(&dockerclient.Info{}, errors.New("fail"))
// Connect() should fail and isConnected() return false.
assert.Error(t, node.connectClient(client))
assert.False(t, node.isConnected())
assert.Error(t, engine.connectClient(client))
assert.False(t, engine.isConnected())
client.Mock.AssertExpectations(t)
}
func TestOutdatedNode(t *testing.T) {
node := NewNode("test", 0)
func TestOutdatedEngine(t *testing.T) {
engine := NewEngine("test", 0)
client := mockclient.NewMockClient()
client.On("Info").Return(&dockerclient.Info{}, nil)
assert.Error(t, node.connectClient(client))
assert.False(t, node.isConnected())
assert.Error(t, engine.connectClient(client))
assert.False(t, engine.isConnected())
client.Mock.AssertExpectations(t)
}
func TestNodeCpusMemory(t *testing.T) {
node := NewNode("test", 0)
assert.False(t, node.isConnected())
func TestEngineCpusMemory(t *testing.T) {
engine := NewEngine("test", 0)
assert.False(t, engine.isConnected())
client := mockclient.NewMockClient()
client.On("Info").Return(mockInfo, nil)
@ -61,19 +61,19 @@ func TestNodeCpusMemory(t *testing.T) {
client.On("ListImages").Return([]*dockerclient.Image{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
assert.NoError(t, node.connectClient(client))
assert.True(t, node.isConnected())
assert.True(t, node.IsHealthy())
assert.NoError(t, engine.connectClient(client))
assert.True(t, engine.isConnected())
assert.True(t, engine.IsHealthy())
assert.Equal(t, node.UsedCpus(), 0)
assert.Equal(t, node.UsedMemory(), 0)
assert.Equal(t, engine.UsedCpus(), 0)
assert.Equal(t, engine.UsedMemory(), 0)
client.Mock.AssertExpectations(t)
}
func TestNodeSpecs(t *testing.T) {
node := NewNode("test", 0)
assert.False(t, node.isConnected())
func TestEngineSpecs(t *testing.T) {
engine := NewEngine("test", 0)
assert.False(t, engine.isConnected())
client := mockclient.NewMockClient()
client.On("Info").Return(mockInfo, nil)
@ -81,24 +81,24 @@ func TestNodeSpecs(t *testing.T) {
client.On("ListImages").Return([]*dockerclient.Image{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
assert.NoError(t, node.connectClient(client))
assert.True(t, node.isConnected())
assert.True(t, node.IsHealthy())
assert.NoError(t, engine.connectClient(client))
assert.True(t, engine.isConnected())
assert.True(t, engine.IsHealthy())
assert.Equal(t, node.Cpus, mockInfo.NCPU)
assert.Equal(t, node.Memory, mockInfo.MemTotal)
assert.Equal(t, node.Labels()["storagedriver"], mockInfo.Driver)
assert.Equal(t, node.Labels()["executiondriver"], mockInfo.ExecutionDriver)
assert.Equal(t, node.Labels()["kernelversion"], mockInfo.KernelVersion)
assert.Equal(t, node.Labels()["operatingsystem"], mockInfo.OperatingSystem)
assert.Equal(t, node.Labels()["foo"], "bar")
assert.Equal(t, engine.Cpus, mockInfo.NCPU)
assert.Equal(t, engine.Memory, mockInfo.MemTotal)
assert.Equal(t, engine.Labels["storagedriver"], mockInfo.Driver)
assert.Equal(t, engine.Labels["executiondriver"], mockInfo.ExecutionDriver)
assert.Equal(t, engine.Labels["kernelversion"], mockInfo.KernelVersion)
assert.Equal(t, engine.Labels["operatingsystem"], mockInfo.OperatingSystem)
assert.Equal(t, engine.Labels["foo"], "bar")
client.Mock.AssertExpectations(t)
}
func TestNodeState(t *testing.T) {
node := NewNode("test", 0)
assert.False(t, node.isConnected())
func TestEngineState(t *testing.T) {
engine := NewEngine("test", 0)
assert.False(t, engine.isConnected())
client := mockclient.NewMockClient()
client.On("Info").Return(mockInfo, nil)
@ -111,19 +111,19 @@ func TestNodeState(t *testing.T) {
client.On("ListContainers", true, false, fmt.Sprintf("{%q:[%q]}", "id", "two")).Return([]dockerclient.Container{{Id: "two"}}, nil).Once()
client.On("InspectContainer", "two").Return(&dockerclient.ContainerInfo{Config: &dockerclient.ContainerConfig{CpuShares: 100}}, nil).Once()
assert.NoError(t, node.connectClient(client))
assert.True(t, node.isConnected())
assert.NoError(t, engine.connectClient(client))
assert.True(t, engine.isConnected())
// The node should only have a single container at this point.
containers := node.Containers()
// The engine should only have a single container at this point.
containers := engine.Containers()
assert.Len(t, containers, 1)
if containers[0].Id != "one" {
t.Fatalf("Missing container: one")
}
// Fake an event which will trigger a refresh. The second container will appear.
node.handler(&dockerclient.Event{Id: "two", Status: "created"}, nil)
containers = node.Containers()
engine.handler(&dockerclient.Event{Id: "two", Status: "created"}, nil)
containers = engine.Containers()
assert.Len(t, containers, 2)
if containers[0].Id != "one" && containers[1].Id != "one" {
t.Fatalf("Missing container: one")
@ -135,9 +135,9 @@ func TestNodeState(t *testing.T) {
client.Mock.AssertExpectations(t)
}
func TestNodeContainerLookup(t *testing.T) {
node := NewNode("test-node", 0)
assert.False(t, node.isConnected())
func TestEngineContainerLookup(t *testing.T) {
engine := NewEngine("test-engine", 0)
assert.False(t, engine.isConnected())
client := mockclient.NewMockClient()
client.On("Info").Return(mockInfo, nil)
@ -147,22 +147,22 @@ func TestNodeContainerLookup(t *testing.T) {
client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once()
client.On("InspectContainer", "container-id").Return(&dockerclient.ContainerInfo{Config: &dockerclient.ContainerConfig{CpuShares: 100}}, nil).Once()
assert.NoError(t, node.connectClient(client))
assert.True(t, node.isConnected())
assert.NoError(t, engine.connectClient(client))
assert.True(t, engine.isConnected())
// Invalid lookup
assert.Nil(t, node.Container("invalid-id"))
assert.Nil(t, node.Container(""))
assert.Nil(t, engine.Container("invalid-id"))
assert.Nil(t, engine.Container(""))
// Container ID lookup.
assert.NotNil(t, node.Container("container-id"))
assert.NotNil(t, engine.Container("container-id"))
// Container ID prefix lookup.
assert.NotNil(t, node.Container("container-"))
assert.NotNil(t, engine.Container("container-"))
// Container name lookup.
assert.NotNil(t, node.Container("container-name1"))
assert.NotNil(t, node.Container("container-name2"))
// Container node/name matching.
assert.NotNil(t, node.Container("id/container-name1"))
assert.NotNil(t, node.Container("id/container-name2"))
assert.NotNil(t, engine.Container("container-name1"))
assert.NotNil(t, engine.Container("container-name2"))
// Container engine/name matching.
assert.NotNil(t, engine.Container("id/container-name1"))
assert.NotNil(t, engine.Container("id/container-name2"))
client.Mock.AssertExpectations(t)
}
@ -175,7 +175,7 @@ func TestCreateContainer(t *testing.T) {
Cmd: []string{"date"},
Tty: false,
}
node = NewNode("test", 0)
engine = NewEngine("test", 0)
client = mockclient.NewMockClient()
)
@ -183,8 +183,8 @@ func TestCreateContainer(t *testing.T) {
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil).Once()
client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once()
assert.NoError(t, node.connectClient(client))
assert.True(t, node.isConnected())
assert.NoError(t, engine.connectClient(client))
assert.True(t, engine.isConnected())
mockConfig := *config
mockConfig.CpuShares = config.CpuShares * 1024 / mockInfo.NCPU
@ -196,16 +196,16 @@ func TestCreateContainer(t *testing.T) {
client.On("ListContainers", true, false, fmt.Sprintf(`{"id":[%q]}`, id)).Return([]dockerclient.Container{{Id: id}}, nil).Once()
client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once()
client.On("InspectContainer", id).Return(&dockerclient.ContainerInfo{Config: config}, nil).Once()
container, err := node.create(config, name, false)
container, err := engine.Create(config, name, false)
assert.Nil(t, err)
assert.Equal(t, container.Id, id)
assert.Len(t, node.Containers(), 1)
assert.Len(t, engine.Containers(), 1)
// Image not found, pullImage == false
name = "test2"
mockConfig.CpuShares = config.CpuShares * 1024 / mockInfo.NCPU
client.On("CreateContainer", &mockConfig, name).Return("", dockerclient.ErrNotFound).Once()
container, err = node.create(config, name, false)
container, err = engine.Create(config, name, false)
assert.Equal(t, err, dockerclient.ErrNotFound)
assert.Nil(t, container)
@ -219,28 +219,28 @@ func TestCreateContainer(t *testing.T) {
client.On("ListContainers", true, false, fmt.Sprintf(`{"id":[%q]}`, id)).Return([]dockerclient.Container{{Id: id}}, nil).Once()
client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once()
client.On("InspectContainer", id).Return(&dockerclient.ContainerInfo{Config: config}, nil).Once()
container, err = node.create(config, name, true)
container, err = engine.Create(config, name, true)
assert.Nil(t, err)
assert.Equal(t, container.Id, id)
assert.Len(t, node.Containers(), 2)
assert.Len(t, engine.Containers(), 2)
}
func TestTotalMemory(t *testing.T) {
node := NewNode("test", 0.05)
node.Memory = 1024
assert.Equal(t, node.TotalMemory(), 1024+1024*5/100)
engine := NewEngine("test", 0.05)
engine.Memory = 1024
assert.Equal(t, engine.TotalMemory(), 1024+1024*5/100)
node = NewNode("test", 0)
node.Memory = 1024
assert.Equal(t, node.TotalMemory(), 1024)
engine = NewEngine("test", 0)
engine.Memory = 1024
assert.Equal(t, engine.TotalMemory(), 1024)
}
func TestTotalCpus(t *testing.T) {
node := NewNode("test", 0.05)
node.Cpus = 2
assert.Equal(t, node.TotalCpus(), 2+2*5/100)
engine := NewEngine("test", 0.05)
engine.Cpus = 2
assert.Equal(t, engine.TotalCpus(), 2+2*5/100)
node = NewNode("test", 0)
node.Cpus = 2
assert.Equal(t, node.TotalCpus(), 2)
engine = NewEngine("test", 0)
engine.Cpus = 2
assert.Equal(t, engine.TotalCpus(), 2)
}

View File

@ -5,7 +5,7 @@ import "github.com/samalba/dockerclient"
// Event is exported
type Event struct {
dockerclient.Event
Node Node
Engine *Engine
}
// EventHandler is exported

View File

@ -1,20 +0,0 @@
package cluster
type FakeNode struct {
name string
}
func (fn *FakeNode) ID() string { return "" }
func (fn *FakeNode) Name() string { return fn.name }
func (fn *FakeNode) IP() string { return "" }
func (fn *FakeNode) Addr() string { return "" }
func (fn *FakeNode) Images() []*Image { return nil }
func (fn *FakeNode) Image(_ string) *Image { return nil }
func (fn *FakeNode) Containers() []*Container { return nil }
func (fn *FakeNode) Container(_ string) *Container { return nil }
func (fn *FakeNode) TotalCpus() int64 { return 0 }
func (fn *FakeNode) UsedCpus() int64 { return 0 }
func (fn *FakeNode) TotalMemory() int64 { return 0 }
func (fn *FakeNode) UsedMemory() int64 { return 0 }
func (fn *FakeNode) Labels() map[string]string { return nil }
func (fn *FakeNode) IsHealthy() bool { return true }

View File

@ -10,7 +10,7 @@ import (
type Image struct {
dockerclient.Image
Node Node
Engine *Engine
}
// Match is exported

View File

@ -1,55 +0,0 @@
package cluster
import "fmt"
// Node is exported
type Node interface {
ID() string
Name() string
IP() string //to inject the actual IP of the machine in docker ps (hostname:port or ip:port)
Addr() string //to know where to connect with the proxy
Images() []*Image //used by the API
Image(IDOrName string) *Image //used by the filters
Containers() []*Container //used by the filters
Container(IDOrName string) *Container //used by the filters
TotalCpus() int64 //used by the strategy
UsedCpus() int64 //used by the strategy
TotalMemory() int64 //used by the strategy
UsedMemory() int64 //used by the strategy
Labels() map[string]string //used by the filters
IsHealthy() bool
}
// SerializeNode is exported
func SerializeNode(node Node) string {
return fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%q}",
"Name", node.Name(),
"Id", node.ID(),
"Addr", node.Addr(),
"Ip", node.IP())
}
// NodeSorter implements the Sort interface to sort Cluster.Node.
// It is not guaranteed to be a stable sort.
type NodeSorter []Node
// Len returns the number of nodes to be sorted.
func (s NodeSorter) Len() int {
return len(s)
}
// Swap exchanges the node elements with indices i and j.
func (s NodeSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Less reports whether the node with index i should sort before the node with index j.
// Nodes are sorted chronologically by name.
func (s NodeSorter) Less(i, j int) bool {
return s[i].Name() < s[j].Name()
}

View File

@ -1,18 +0,0 @@
package cluster
import (
"sort"
"testing"
"github.com/stretchr/testify/assert"
)
func TestNodeSorter(t *testing.T) {
nodes := []Node{&FakeNode{"name1"}, &FakeNode{"name3"}, &FakeNode{"name2"}}
sort.Sort(NodeSorter(nodes))
assert.Equal(t, nodes[0].Name(), "name1")
assert.Equal(t, nodes[1].Name(), "name2")
assert.Equal(t, nodes[2].Name(), "name3")
}

View File

@ -10,6 +10,7 @@ import (
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/scheduler/node"
"github.com/docker/swarm/state"
"github.com/samalba/dockerclient"
)
@ -19,7 +20,7 @@ type Cluster struct {
sync.RWMutex
eventHandler cluster.EventHandler
nodes map[string]*node
engines map[string]*cluster.Engine
scheduler *scheduler.Scheduler
options *cluster.Options
store *state.Store
@ -31,7 +32,7 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, eventhandler
cluster := &Cluster{
eventHandler: eventhandler,
nodes: make(map[string]*node),
engines: make(map[string]*cluster.Engine),
scheduler: scheduler,
options: options,
store: store,
@ -75,8 +76,8 @@ func (c *Cluster) CreateContainer(config *dockerclient.ContainerConfig, name str
return nil, err
}
if nn, ok := n.(*node); ok {
container, err := nn.create(config, name, true)
if nn, ok := c.engines[n.ID]; ok {
container, err := nn.Create(config, name, true)
if err != nil {
return nil, err
}
@ -98,10 +99,8 @@ func (c *Cluster) RemoveContainer(container *cluster.Container, force bool) erro
c.scheduler.Lock()
defer c.scheduler.Unlock()
if n, ok := container.Node.(*node); ok {
if err := n.destroy(container, force); err != nil {
return err
}
if err := container.Engine.Destroy(container, force); err != nil {
return err
}
if err := c.store.Remove(container.Id); err != nil {
@ -114,29 +113,29 @@ func (c *Cluster) RemoveContainer(container *cluster.Container, force bool) erro
return nil
}
// Entries are Docker Nodes
// Entries are Docker Engines
func (c *Cluster) newEntries(entries []*discovery.Entry) {
for _, entry := range entries {
go func(m *discovery.Entry) {
if c.getNode(m.String()) == nil {
n := NewNode(m.String(), c.options.OvercommitRatio)
if err := n.connect(c.options.TLSConfig); err != nil {
if !c.hasEngine(m.String()) {
engine := cluster.NewEngine(m.String(), c.options.OvercommitRatio)
if err := engine.Connect(c.options.TLSConfig); err != nil {
log.Error(err)
return
}
c.Lock()
if old, exists := c.nodes[n.id]; exists {
if old, exists := c.engines[engine.ID]; exists {
c.Unlock()
if old.ip != n.ip {
log.Errorf("ID duplicated. %s shared by %s and %s", n.id, old.IP(), n.IP())
if old.IP != engine.IP {
log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.IP, engine.IP)
} else {
log.Errorf("node %q is already registered", n.id)
log.Errorf("node %q is already registered", engine.ID)
}
return
}
c.nodes[n.id] = n
if err := n.events(c); err != nil {
c.engines[engine.ID] = engine
if err := engine.Events(c); err != nil {
log.Error(err)
c.Unlock()
return
@ -148,13 +147,13 @@ func (c *Cluster) newEntries(entries []*discovery.Entry) {
}
}
func (c *Cluster) getNode(addr string) *node {
for _, node := range c.nodes {
if node.addr == addr {
return node
func (c *Cluster) hasEngine(addr string) bool {
for _, engine := range c.engines {
if engine.Addr == addr {
return true
}
}
return nil
return false
}
// Images returns all the images in the cluster.
@ -163,7 +162,7 @@ func (c *Cluster) Images() []*cluster.Image {
defer c.RUnlock()
out := []*cluster.Image{}
for _, n := range c.nodes {
for _, n := range c.engines {
out = append(out, n.Images()...)
}
@ -179,7 +178,7 @@ func (c *Cluster) Image(IDOrName string) *cluster.Image {
c.RLock()
defer c.RUnlock()
for _, n := range c.nodes {
for _, n := range c.engines {
if image := n.Image(IDOrName); image != nil {
return image
}
@ -192,27 +191,24 @@ func (c *Cluster) Image(IDOrName string) *cluster.Image {
func (c *Cluster) RemoveImage(image *cluster.Image) ([]*dockerclient.ImageDelete, error) {
c.Lock()
defer c.Unlock()
if n, ok := image.Node.(*node); ok {
return n.removeImage(image)
}
return nil, nil
return image.Engine.RemoveImage(image)
}
// Pull is exported
func (c *Cluster) Pull(name string, callback func(what, status string)) {
size := len(c.nodes)
size := len(c.engines)
done := make(chan bool, size)
for _, n := range c.nodes {
go func(nn *node) {
for _, n := range c.engines {
go func(nn *cluster.Engine) {
if callback != nil {
callback(nn.Name(), "")
callback(nn.Name, "")
}
err := nn.pull(name)
err := nn.Pull(name)
if callback != nil {
if err != nil {
callback(nn.Name(), err.Error())
callback(nn.Name, err.Error())
} else {
callback(nn.Name(), "downloaded")
callback(nn.Name, "downloaded")
}
}
done <- true
@ -229,7 +225,7 @@ func (c *Cluster) Containers() []*cluster.Container {
defer c.RUnlock()
out := []*cluster.Container{}
for _, n := range c.nodes {
for _, n := range c.engines {
out = append(out, n.Containers()...)
}
@ -245,7 +241,7 @@ func (c *Cluster) Container(IDOrName string) *cluster.Container {
c.RLock()
defer c.RUnlock()
for _, n := range c.nodes {
for _, n := range c.engines {
if container := n.Container(IDOrName); container != nil {
return container
}
@ -254,35 +250,47 @@ func (c *Cluster) Container(IDOrName string) *cluster.Container {
return nil
}
// nodes returns all the nodes in the cluster.
func (c *Cluster) listNodes() []cluster.Node {
// listNodes returns all the engines in the cluster.
func (c *Cluster) listNodes() []*node.Node {
c.RLock()
defer c.RUnlock()
out := []cluster.Node{}
for _, n := range c.nodes {
out = append(out, n)
out := []*node.Node{}
for _, n := range c.engines {
out = append(out, node.NewNode(n))
}
return out
}
// listEngines returns all the engines in the cluster.
func (c *Cluster) listEngines() []*cluster.Engine {
c.RLock()
defer c.RUnlock()
out := []*cluster.Engine{}
for _, n := range c.engines {
out = append(out, n)
}
return out
}
// Info is exported
func (c *Cluster) Info() [][2]string {
info := [][2]string{
{"\bStrategy", c.scheduler.Strategy()},
{"\bFilters", c.scheduler.Filters()},
{"\bNodes", fmt.Sprintf("%d", len(c.nodes))},
{"\bNodes", fmt.Sprintf("%d", len(c.engines))},
}
nodes := c.listNodes()
sort.Sort(cluster.NodeSorter(nodes))
engines := c.listEngines()
sort.Sort(cluster.EngineSorter(engines))
for _, node := range nodes {
info = append(info, [2]string{node.Name(), node.Addr()})
info = append(info, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))})
info = append(info, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.UsedCpus(), node.TotalCpus())})
info = append(info, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(node.UsedMemory())), units.BytesSize(float64(node.TotalMemory())))})
for _, engine := range engines {
info = append(info, [2]string{engine.Name, engine.Addr})
info = append(info, [2]string{" └ Containers", fmt.Sprintf("%d", len(engine.Containers()))})
info = append(info, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", engine.UsedCpus(), engine.TotalCpus())})
info = append(info, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(engine.UsedMemory())), units.BytesSize(float64(engine.TotalMemory())))})
}
return info

View File

@ -8,29 +8,29 @@ import (
"github.com/stretchr/testify/assert"
)
func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *node {
node := NewNode(ID, 0)
node.name = ID
node.id = ID
func createEngine(t *testing.T, ID string, containers ...dockerclient.Container) *cluster.Engine {
engine := cluster.NewEngine(ID, 0)
engine.Name = ID
engine.ID = ID
for _, container := range containers {
node.addContainer(&cluster.Container{Container: container, Node: node})
engine.AddContainer(&cluster.Container{Container: container, Engine: engine})
}
return node
return engine
}
func TestContainerLookup(t *testing.T) {
c := &Cluster{
nodes: make(map[string]*node),
engines: make(map[string]*cluster.Engine),
}
container := dockerclient.Container{
Id: "container-id",
Names: []string{"/container-name1", "/container-name2"},
}
n := createNode(t, "test-node", container)
c.nodes[n.ID()] = n
n := createEngine(t, "test-engine", container)
c.engines[n.ID] = n
// Invalid lookup
assert.Nil(t, c.Container("invalid-id"))
@ -42,7 +42,7 @@ func TestContainerLookup(t *testing.T) {
// Container name lookup.
assert.NotNil(t, c.Container("container-name1"))
assert.NotNil(t, c.Container("container-name2"))
// Container node/name matching.
assert.NotNil(t, c.Container("test-node/container-name1"))
assert.NotNil(t, c.Container("test-node/container-name2"))
// Container engine/name matching.
assert.NotNil(t, c.Container("test-engine/container-name1"))
assert.NotNil(t, c.Container("test-engine/container-name2"))
}

View File

@ -1,571 +0,0 @@
package swarm
import (
"crypto/tls"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/samalba/dockerclient"
)
const (
// Force-refresh the state of the node this often.
stateRefreshPeriod = 30 * time.Second
// Timeout for requests sent out to the node.
requestTimeout = 10 * time.Second
)
// NewNode is exported
func NewNode(addr string, overcommitRatio float64) *node {
e := &node{
addr: addr,
labels: make(map[string]string),
ch: make(chan bool),
containers: make(map[string]*cluster.Container),
healthy: true,
overcommitRatio: int64(overcommitRatio * 100),
}
return e
}
type node struct {
sync.RWMutex
id string
ip string
addr string
name string
Cpus int64
Memory int64
labels map[string]string
ch chan bool
containers map[string]*cluster.Container
images []*cluster.Image
client dockerclient.Client
eventHandler cluster.EventHandler
healthy bool
overcommitRatio int64
}
func (n *node) ID() string {
return n.id
}
func (n *node) IP() string {
return n.ip
}
func (n *node) Addr() string {
return n.addr
}
func (n *node) Name() string {
return n.name
}
func (n *node) Labels() map[string]string {
return n.labels
}
// Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes.
func (n *node) connect(config *tls.Config) error {
host, _, err := net.SplitHostPort(n.addr)
if err != nil {
return err
}
addr, err := net.ResolveIPAddr("ip4", host)
if err != nil {
return err
}
n.ip = addr.IP.String()
c, err := dockerclient.NewDockerClientTimeout("tcp://"+n.addr, config, time.Duration(requestTimeout))
if err != nil {
return err
}
return n.connectClient(c)
}
func (n *node) connectClient(client dockerclient.Client) error {
n.client = client
// Fetch the engine labels.
if err := n.updateSpecs(); err != nil {
n.client = nil
return err
}
// Force a state update before returning.
if err := n.refreshContainers(true); err != nil {
n.client = nil
return err
}
if err := n.refreshImages(); err != nil {
n.client = nil
return err
}
// Start the update loop.
go n.refreshLoop()
// Start monitoring events from the node.
n.client.StartMonitorEvents(n.handler, nil)
n.emitEvent("node_connect")
return nil
}
// isConnected returns true if the engine is connected to a remote docker API
func (n *node) isConnected() bool {
return n.client != nil
}
func (n *node) IsHealthy() bool {
return n.healthy
}
// Gather node specs (CPU, memory, constraints, ...).
func (n *node) updateSpecs() error {
info, err := n.client.Info()
if err != nil {
return err
}
if info.NCPU == 0 || info.MemTotal == 0 {
return fmt.Errorf("cannot get resources for this node, make sure %s is a Docker Engine, not a Swarm manager", n.addr)
}
// Older versions of Docker don't expose the ID field and are not supported
// by Swarm. Catch the error ASAP and refuse to connect.
if len(info.ID) == 0 {
return fmt.Errorf("node %s is running an unsupported version of Docker Engine. Please upgrade", n.addr)
}
n.id = info.ID
n.name = info.Name
n.Cpus = info.NCPU
n.Memory = info.MemTotal
n.labels = map[string]string{
"storagedriver": info.Driver,
"executiondriver": info.ExecutionDriver,
"kernelversion": info.KernelVersion,
"operatingsystem": info.OperatingSystem,
}
for _, label := range info.Labels {
kv := strings.SplitN(label, "=", 2)
n.labels[kv[0]] = kv[1]
}
return nil
}
// Delete an image from the node.
func (n *node) removeImage(image *cluster.Image) ([]*dockerclient.ImageDelete, error) {
return n.client.RemoveImage(image.Id)
}
// Refresh the list of images on the node.
func (n *node) refreshImages() error {
images, err := n.client.ListImages()
if err != nil {
return err
}
n.Lock()
n.images = nil
for _, image := range images {
n.images = append(n.images, &cluster.Image{Image: *image, Node: n})
}
n.Unlock()
return nil
}
// Refresh the list and status of containers running on the node. If `full` is
// true, each container will be inspected.
func (n *node) refreshContainers(full bool) error {
containers, err := n.client.ListContainers(true, false, "")
if err != nil {
return err
}
merged := make(map[string]*cluster.Container)
for _, c := range containers {
merged, err = n.updateContainer(c, merged, full)
if err != nil {
log.WithFields(log.Fields{"name": n.name, "id": n.id}).Errorf("Unable to update state of container %q", c.Id)
}
}
n.Lock()
defer n.Unlock()
n.containers = merged
log.WithFields(log.Fields{"id": n.id, "name": n.name}).Debugf("Updated node state")
return nil
}
// Refresh the status of a container running on the node. If `full` is true,
// the container will be inspected.
func (n *node) refreshContainer(ID string, full bool) error {
containers, err := n.client.ListContainers(true, false, fmt.Sprintf("{%q:[%q]}", "id", ID))
if err != nil {
return err
}
if len(containers) > 1 {
// We expect one container, if we get more than one, trigger a full refresh.
return n.refreshContainers(full)
}
if len(containers) == 0 {
// The container doesn't exist on the node, remove it.
n.Lock()
delete(n.containers, ID)
n.Unlock()
return nil
}
_, err = n.updateContainer(containers[0], n.containers, full)
return err
}
func (n *node) updateContainer(c dockerclient.Container, containers map[string]*cluster.Container, full bool) (map[string]*cluster.Container, error) {
var container *cluster.Container
n.RLock()
if current, exists := n.containers[c.Id]; exists {
// The container is already known.
container = current
} else {
// This is a brand new container. We need to do a full refresh.
container = &cluster.Container{
Node: n,
}
full = true
}
// Release the lock here as the next step is slow.
// Trade-off: If updateContainer() is called concurrently for the same
// container, we will end up doing a full refresh twice and the original
// container (containers[container.Id]) will get replaced.
n.RUnlock()
// Update ContainerInfo.
if full {
info, err := n.client.InspectContainer(c.Id)
if err != nil {
return nil, err
}
container.Info = *info
// real CpuShares -> nb of CPUs
container.Info.Config.CpuShares = container.Info.Config.CpuShares * 1024.0 / n.Cpus
}
// Update its internal state.
n.Lock()
container.Container = c
containers[container.Id] = container
n.Unlock()
return containers, nil
}
func (n *node) refreshContainersAsync() {
n.ch <- true
}
func (n *node) refreshLoop() {
for {
var err error
select {
case <-n.ch:
err = n.refreshContainers(false)
case <-time.After(stateRefreshPeriod):
err = n.refreshContainers(false)
}
if err == nil {
err = n.refreshImages()
}
if err != nil {
if n.healthy {
n.emitEvent("node_disconnect")
}
n.healthy = false
log.WithFields(log.Fields{"name": n.name, "id": n.id}).Errorf("Flagging node as dead. Updated state failed: %v", err)
} else {
if !n.healthy {
log.WithFields(log.Fields{"name": n.name, "id": n.id}).Info("Node came back to life. Hooray!")
n.client.StopAllMonitorEvents()
n.client.StartMonitorEvents(n.handler, nil)
n.emitEvent("node_reconnect")
if err := n.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": n.name, "id": n.id}).Errorf("Update node specs failed: %v", err)
}
}
n.healthy = true
}
}
}
func (n *node) emitEvent(event string) {
// If there is no event handler registered, abort right now.
if n.eventHandler == nil {
return
}
ev := &cluster.Event{
Event: dockerclient.Event{
Status: event,
From: "swarm",
Time: time.Now().Unix(),
},
Node: n,
}
n.eventHandler.Handle(ev)
}
// Return the sum of memory reserved by containers.
func (n *node) UsedMemory() int64 {
var r int64
n.RLock()
for _, c := range n.containers {
r += c.Info.Config.Memory
}
n.RUnlock()
return r
}
// Return the sum of CPUs reserved by containers.
func (n *node) UsedCpus() int64 {
var r int64
n.RLock()
for _, c := range n.containers {
r += c.Info.Config.CpuShares
}
n.RUnlock()
return r
}
func (n *node) TotalMemory() int64 {
return n.Memory + (n.Memory * n.overcommitRatio / 100)
}
func (n *node) TotalCpus() int64 {
return n.Cpus + (n.Cpus * n.overcommitRatio / 100)
}
func (n *node) create(config *dockerclient.ContainerConfig, name string, pullImage bool) (*cluster.Container, error) {
var (
err error
id string
client = n.client
)
newConfig := *config
// nb of CPUs -> real CpuShares
newConfig.CpuShares = config.CpuShares * 1024 / n.Cpus
if id, err = client.CreateContainer(&newConfig, name); err != nil {
// If the error is other than not found, abort immediately.
if err != dockerclient.ErrNotFound || !pullImage {
return nil, err
}
// Otherwise, try to pull the image...
if err = n.pull(config.Image); err != nil {
return nil, err
}
// ...And try again.
if id, err = client.CreateContainer(&newConfig, name); err != nil {
return nil, err
}
}
// Register the container immediately while waiting for a state refresh.
// Force a state refresh to pick up the newly created container.
n.refreshContainer(id, true)
n.RLock()
defer n.RUnlock()
return n.containers[id], nil
}
// Destroy and remove a container from the node.
func (n *node) destroy(container *cluster.Container, force bool) error {
if err := n.client.RemoveContainer(container.Id, force, true); err != nil {
return err
}
// Remove the container from the state. Eventually, the state refresh loop
// will rewrite this.
n.Lock()
defer n.Unlock()
delete(n.containers, container.Id)
return nil
}
func (n *node) pull(image string) error {
if !strings.Contains(image, ":") {
image = image + ":latest"
}
if err := n.client.PullImage(image, nil); err != nil {
return err
}
return nil
}
// Register an event handler.
func (n *node) events(h cluster.EventHandler) error {
if n.eventHandler != nil {
return errors.New("event handler already set")
}
n.eventHandler = h
return nil
}
// Containers returns all the containers in the node.
func (n *node) Containers() []*cluster.Container {
containers := []*cluster.Container{}
n.RLock()
for _, container := range n.containers {
containers = append(containers, container)
}
n.RUnlock()
return containers
}
// Container returns the container with IDOrName in the node.
func (n *node) Container(IDOrName string) *cluster.Container {
// Abort immediately if the name is empty.
if len(IDOrName) == 0 {
return nil
}
n.RLock()
defer n.RUnlock()
for _, container := range n.Containers() {
// Match ID prefix.
if strings.HasPrefix(container.Id, IDOrName) {
return container
}
// Match name, /name or engine/name.
for _, name := range container.Names {
if name == IDOrName || name == "/"+IDOrName || container.Node.ID()+name == IDOrName || container.Node.Name()+name == IDOrName {
return container
}
}
}
return nil
}
// Images returns all the images in the node
func (n *node) Images() []*cluster.Image {
images := []*cluster.Image{}
n.RLock()
for _, image := range n.images {
images = append(images, image)
}
n.RUnlock()
return images
}
// Image returns the image with IDOrName in the node
func (n *node) Image(IDOrName string) *cluster.Image {
n.RLock()
defer n.RUnlock()
for _, image := range n.images {
if image.Match(IDOrName) {
return image
}
}
return nil
}
func (n *node) String() string {
return fmt.Sprintf("node %s addr %s", n.id, n.addr)
}
func (n *node) handler(ev *dockerclient.Event, _ chan error, args ...interface{}) {
// Something changed - refresh our internal state.
switch ev.Status {
case "pull", "untag", "delete":
// These events refer to images so there's no need to update
// containers.
n.refreshImages()
case "start", "die":
// If the container is started or stopped, we have to do an inspect in
// order to get the new NetworkSettings.
n.refreshContainer(ev.Id, true)
default:
// Otherwise, do a "soft" refresh of the container.
n.refreshContainer(ev.Id, false)
}
// If there is no event handler registered, abort right now.
if n.eventHandler == nil {
return
}
event := &cluster.Event{
Node: n,
Event: *ev,
}
n.eventHandler.Handle(event)
}
// Inject a container into the internal state.
func (n *node) addContainer(container *cluster.Container) error {
n.Lock()
defer n.Unlock()
if _, ok := n.containers[container.Id]; ok {
return errors.New("container already exists")
}
n.containers[container.Id] = container
return nil
}
// Inject an image into the internal state.
func (n *node) addImage(image *cluster.Image) {
n.Lock()
defer n.Unlock()
n.images = append(n.images, image)
}
// Remove a container from the internal test.
func (n *node) removeContainer(container *cluster.Container) error {
n.Lock()
defer n.Unlock()
if _, ok := n.containers[container.Id]; !ok {
return errors.New("container not found")
}
delete(n.containers, container.Id)
return nil
}
// Wipes the internal container state.
func (n *node) cleanupContainers() {
n.Lock()
n.containers = make(map[string]*cluster.Container)
n.Unlock()
}

View File

@ -27,7 +27,7 @@ func (h *logHandler) Handle(e *cluster.Event) error {
if len(id) > 12 {
id = id[:12]
}
log.WithFields(log.Fields{"node": e.Node.Name, "id": id, "from": e.From, "status": e.Status}).Debug("Event received")
log.WithFields(log.Fields{"node": e.Engine.Name, "id": id, "from": e.From, "status": e.Status}).Debug("Event received")
return nil
}

View File

@ -5,7 +5,7 @@ import (
"strings"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -19,7 +19,7 @@ func (f *AffinityFilter) Name() string {
}
// Filter is exported
func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
affinities, err := parseExprs("affinity", config.Env)
if err != nil {
return nil, err
@ -28,12 +28,12 @@ func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []cl
for _, affinity := range affinities {
log.Debugf("matching affinity: %s%s%s", affinity.key, OPERATORS[affinity.operator], affinity.value)
candidates := []cluster.Node{}
candidates := []*node.Node{}
for _, node := range nodes {
switch affinity.key {
case "container":
containers := []string{}
for _, container := range node.Containers() {
for _, container := range node.Containers {
containers = append(containers, container.Id, strings.TrimPrefix(container.Names[0], "/"))
}
if affinity.Match(containers...) {
@ -41,7 +41,7 @@ func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []cl
}
case "image":
images := []string{}
for _, image := range node.Images() {
for _, image := range node.Images {
images = append(images, image.Id)
images = append(images, image.RepoTags...)
for _, tag := range image.RepoTags {

View File

@ -4,6 +4,7 @@ import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
@ -11,12 +12,12 @@ import (
func TestAffinityFilter(t *testing.T) {
var (
f = AffinityFilter{}
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
containers: []*cluster.Container{
nodes = []*node.Node{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
Containers: []*cluster.Container{
{Container: dockerclient.Container{
Id: "container-n0-0-id",
Names: []string{"/container-n0-0-name"},
@ -26,16 +27,16 @@ func TestAffinityFilter(t *testing.T) {
Names: []string{"/container-n0-1-name"},
}},
},
images: []*cluster.Image{{Image: dockerclient.Image{
Images: []*cluster.Image{{Image: dockerclient.Image{
Id: "image-0-id",
RepoTags: []string{"image-0:tag1", "image-0:tag2"},
}}},
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
containers: []*cluster.Container{
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{
{Container: dockerclient.Container{
Id: "container-n1-0-id",
Names: []string{"/container-n1-0-name"},
@ -45,18 +46,18 @@ func TestAffinityFilter(t *testing.T) {
Names: []string{"/container-n1-1-name"},
}},
},
images: []*cluster.Image{{Image: dockerclient.Image{
Images: []*cluster.Image{{Image: dockerclient.Image{
Id: "image-1-id",
RepoTags: []string{"image-1:tag1", "image-0:tag3", "image-1:tag2"},
}}},
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
},
}
result []cluster.Node
result []*node.Node
err error
)

View File

@ -4,7 +4,7 @@ import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -18,7 +18,7 @@ func (f *ConstraintFilter) Name() string {
}
// Filter is exported
func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
constraints, err := parseExprs("constraint", config.Env)
if err != nil {
return nil, err
@ -27,16 +27,16 @@ func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []
for _, constraint := range constraints {
log.Debugf("matching constraint: %s %s %s", constraint.key, OPERATORS[constraint.operator], constraint.value)
candidates := []cluster.Node{}
candidates := []*node.Node{}
for _, node := range nodes {
switch constraint.key {
case "node":
// "node" label is a special case pinning a container to a specific node.
if constraint.Match(node.ID(), node.Name()) {
if constraint.Match(node.ID, node.Name) {
candidates = append(candidates, node)
}
default:
if constraint.Match(node.Labels()[constraint.key]) {
if constraint.Match(node.Labels[constraint.key]) {
candidates = append(candidates, node)
}
}

View File

@ -3,50 +3,50 @@ package filter
import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
func testFixtures() []cluster.Node {
return []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
labels: map[string]string{
func testFixtures() []*node.Node {
return []*node.Node{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
Labels: map[string]string{
"name": "node0",
"group": "1",
"region": "us-west",
},
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
labels: map[string]string{
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Labels: map[string]string{
"name": "node1",
"group": "1",
"region": "us-east",
},
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
labels: map[string]string{
{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
Labels: map[string]string{
"name": "node2",
"group": "2",
"region": "eu",
},
},
&FakeNode{
id: "node-3-id",
name: "node-3-name",
addr: "node-3",
{
ID: "node-3-id",
Name: "node-3-name",
Addr: "node-3",
},
}
}
@ -55,7 +55,7 @@ func TestConstrainteFilter(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []cluster.Node
result []*node.Node
err error
)
@ -116,7 +116,7 @@ func TestConstraintNotExpr(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []cluster.Node
result []*node.Node
err error
)
@ -145,7 +145,7 @@ func TestConstraintRegExp(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []cluster.Node
result []*node.Node
err error
)
@ -174,17 +174,15 @@ func TestFilterRegExpCaseInsensitive(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []cluster.Node
result []*node.Node
err error
)
// Prepare node with a strange name
if n, ok := nodes[3].(*FakeNode); ok {
n.labels = map[string]string{
"name": "aBcDeF",
"group": "2",
"region": "eu",
}
nodes[3].Labels = map[string]string{
"name": "aBcDeF",
"group": "2",
"region": "eu",
}
// Case-sensitive, so not match
@ -197,7 +195,7 @@ func TestFilterRegExpCaseInsensitive(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[3])
assert.Equal(t, result[0].Labels()["name"], "aBcDeF")
assert.Equal(t, result[0].Labels["name"], "aBcDeF")
// Test ! filter combined with case insensitive
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{`constraint:name!=/(?i)abc*/`}}, nodes)
@ -209,7 +207,7 @@ func TestFilterEquals(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []cluster.Node
result []*node.Node
err error
)
@ -234,7 +232,7 @@ func TestUnsupportedOperators(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []cluster.Node
result []*node.Node
err error
)
@ -251,7 +249,7 @@ func TestFilterSoftConstraint(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []cluster.Node
result []*node.Node
err error
)

View File

@ -4,7 +4,7 @@ import (
"fmt"
"strings"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -18,7 +18,7 @@ func (f *DependencyFilter) Name() string {
}
// Filter is exported
func (f *DependencyFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
func (f *DependencyFilter) Filter(config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
if len(nodes) == 0 {
return nodes, nil
}
@ -35,7 +35,7 @@ func (f *DependencyFilter) Filter(config *dockerclient.ContainerConfig, nodes []
net = append(net, strings.TrimPrefix(config.HostConfig.NetworkMode, "container:"))
}
candidates := []cluster.Node{}
candidates := []*node.Node{}
for _, node := range nodes {
if f.check(config.HostConfig.VolumesFrom, node) &&
f.check(links, node) &&
@ -67,7 +67,7 @@ func (f *DependencyFilter) String(config *dockerclient.ContainerConfig) string {
}
// Ensure that the node contains all dependent containers.
func (f *DependencyFilter) check(dependencies []string, node cluster.Node) bool {
func (f *DependencyFilter) check(dependencies []string, node *node.Node) bool {
for _, dependency := range dependencies {
if node.Container(dependency) == nil {
return false

View File

@ -4,6 +4,7 @@ import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
@ -11,29 +12,29 @@ import (
func TestDependencyFilterSimple(t *testing.T) {
var (
f = DependencyFilter{}
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c0"}}},
nodes = []*node.Node{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c0"}}},
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c1"}}},
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c1"}}},
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
},
}
result []cluster.Node
result []*node.Node
err error
config *dockerclient.ContainerConfig
)
@ -83,34 +84,34 @@ func TestDependencyFilterSimple(t *testing.T) {
func TestDependencyFilterMulti(t *testing.T) {
var (
f = DependencyFilter{}
nodes = []cluster.Node{
nodes = []*node.Node{
// nodes[0] has c0 and c1
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
containers: []*cluster.Container{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
Containers: []*cluster.Container{
{Container: dockerclient.Container{Id: "c0"}},
{Container: dockerclient.Container{Id: "c1"}},
},
},
// nodes[1] has c2
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
},
// nodes[2] has nothing
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
},
}
result []cluster.Node
result []*node.Node
err error
config *dockerclient.ContainerConfig
)
@ -153,34 +154,34 @@ func TestDependencyFilterMulti(t *testing.T) {
func TestDependencyFilterChaining(t *testing.T) {
var (
f = DependencyFilter{}
nodes = []cluster.Node{
nodes = []*node.Node{
// nodes[0] has c0 and c1
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
containers: []*cluster.Container{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
Containers: []*cluster.Container{
{Container: dockerclient.Container{Id: "c0"}},
{Container: dockerclient.Container{Id: "c1"}},
},
},
// nodes[1] has c2
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
},
// nodes[2] has nothing
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
},
}
result []cluster.Node
result []*node.Node
err error
config *dockerclient.ContainerConfig
)

View File

@ -1,46 +0,0 @@
package filter
import "github.com/docker/swarm/cluster"
type FakeNode struct {
id string
name string
addr string
containers []*cluster.Container
images []*cluster.Image
labels map[string]string
}
func (fn *FakeNode) ID() string { return fn.id }
func (fn *FakeNode) Name() string { return fn.name }
func (fn *FakeNode) IP() string { return "" }
func (fn *FakeNode) Addr() string { return fn.addr }
func (fn *FakeNode) Images() []*cluster.Image { return fn.images }
func (fn *FakeNode) Image(id string) *cluster.Image {
for _, image := range fn.images {
if image.Id == id {
return image
}
}
return nil
}
func (fn *FakeNode) Containers() []*cluster.Container { return fn.containers }
func (fn *FakeNode) Container(id string) *cluster.Container {
for _, container := range fn.containers {
if container.Id == id {
return container
}
}
return nil
}
func (fn *FakeNode) TotalCpus() int64 { return 0 }
func (fn *FakeNode) UsedCpus() int64 { return 0 }
func (fn *FakeNode) TotalMemory() int64 { return 0 }
func (fn *FakeNode) UsedMemory() int64 { return 0 }
func (fn *FakeNode) Labels() map[string]string { return fn.labels }
func (fn *FakeNode) IsHealthy() bool { return true }
func (fn *FakeNode) AddContainer(container *cluster.Container) error {
fn.containers = append(fn.containers, container)
return nil
}

View File

@ -4,7 +4,7 @@ import (
"errors"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -13,7 +13,7 @@ type Filter interface {
Name() string
// Return a subset of nodes that were accepted by the filtering policy.
Filter(*dockerclient.ContainerConfig, []cluster.Node) ([]cluster.Node, error)
Filter(*dockerclient.ContainerConfig, []*node.Node) ([]*node.Node, error)
}
var (
@ -54,7 +54,7 @@ func New(names []string) ([]Filter, error) {
}
// ApplyFilters applies a set of filters in batch.
func ApplyFilters(filters []Filter, config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
func ApplyFilters(filters []Filter, config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
var err error
for _, filter := range filters {

View File

@ -3,7 +3,7 @@ package filter
import (
"errors"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -22,10 +22,10 @@ func (f *HealthFilter) Name() string {
}
// Filter is exported
func (f *HealthFilter) Filter(_ *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
result := []cluster.Node{}
func (f *HealthFilter) Filter(_ *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
result := []*node.Node{}
for _, node := range nodes {
if node.IsHealthy() {
if node.IsHealthy {
result = append(result, node)
}
}

View File

@ -3,7 +3,7 @@ package filter
import (
"fmt"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -19,10 +19,10 @@ func (p *PortFilter) Name() string {
}
// Filter is exported
func (p *PortFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
func (p *PortFilter) Filter(config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
for _, port := range config.HostConfig.PortBindings {
for _, binding := range port {
candidates := []cluster.Node{}
candidates := []*node.Node{}
for _, node := range nodes {
if !p.portAlreadyInUse(node, binding) {
candidates = append(candidates, node)
@ -37,8 +37,8 @@ func (p *PortFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluste
return nodes, nil
}
func (p *PortFilter) portAlreadyInUse(node cluster.Node, requested dockerclient.PortBinding) bool {
for _, c := range node.Containers() {
func (p *PortFilter) portAlreadyInUse(node *node.Node, requested dockerclient.PortBinding) bool {
for _, c := range node.Containers {
// HostConfig.PortBindings contains the requested ports.
// NetworkSettings.Ports contains the actual ports.
//

View File

@ -5,6 +5,7 @@ import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
@ -23,24 +24,24 @@ func makeBinding(ip, port string) map[string][]dockerclient.PortBinding {
func TestPortFilterNoConflicts(t *testing.T) {
var (
p = PortFilter{}
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
nodes = []*node.Node{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
},
}
result []cluster.Node
result []*node.Node
err error
)
@ -66,9 +67,7 @@ func TestPortFilterNoConflicts(t *testing.T) {
// Add a container taking a different (4242) port.
container := &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("", "4242")}}}
if n, ok := nodes[0].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
assert.NoError(t, nodes[0].AddContainer(container))
// Since no node is using port 80, there should be no filter
result, err = p.Filter(config, nodes)
@ -79,32 +78,30 @@ func TestPortFilterNoConflicts(t *testing.T) {
func TestPortFilterSimple(t *testing.T) {
var (
p = PortFilter{}
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
nodes = []*node.Node{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
},
}
result []cluster.Node
result []*node.Node
err error
)
// Add a container taking away port 80 to nodes[0].
container := &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("", "80")}}}
if n, ok := nodes[0].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
assert.NoError(t, nodes[0].AddContainer(container))
// Request port 80.
config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
@ -120,32 +117,30 @@ func TestPortFilterSimple(t *testing.T) {
func TestPortFilterDifferentInterfaces(t *testing.T) {
var (
p = PortFilter{}
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
nodes = []*node.Node{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
},
}
result []cluster.Node
result []*node.Node
err error
)
// Add a container taking away port 80 on every interface to nodes[0].
container := &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("", "80")}}}
if n, ok := nodes[0].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
assert.NoError(t, nodes[0].AddContainer(container))
// Request port 80 for the local interface.
config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
@ -161,9 +156,7 @@ func TestPortFilterDifferentInterfaces(t *testing.T) {
// Add a container taking away port 4242 on the local interface of
// nodes[1].
container = &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("127.0.0.1", "4242")}}}
if n, ok := nodes[1].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
assert.NoError(t, nodes[1].AddContainer(container))
// Request port 4242 on the same interface.
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("127.0.0.1", "4242"),
@ -206,24 +199,24 @@ func TestPortFilterDifferentInterfaces(t *testing.T) {
func TestPortFilterRandomAssignment(t *testing.T) {
var (
p = PortFilter{}
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
nodes = []*node.Node{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
},
}
result []cluster.Node
result []*node.Node
err error
)
@ -261,9 +254,7 @@ func TestPortFilterRandomAssignment(t *testing.T) {
},
}
if n, ok := nodes[0].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
assert.NoError(t, nodes[0].AddContainer(container))
// Request port 80.
config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{

86
scheduler/node/node.go Normal file
View File

@ -0,0 +1,86 @@
package node
import (
"errors"
"strings"
"github.com/docker/swarm/cluster"
)
// Node is an abstract type used by the scheduler
type Node struct {
ID string
IP string
Addr string
Name string
Cpus int64
Memory int64
Labels map[string]string
Containers []*cluster.Container
Images []*cluster.Image
UsedMemory int64
UsedCpus int64
TotalMemory int64
TotalCpus int64
IsHealthy bool
}
// NewNode creates a node from an engine
func NewNode(e *cluster.Engine) *Node {
return &Node{
ID: e.ID,
IP: e.IP,
Addr: e.Addr,
Name: e.Name,
Cpus: e.Cpus,
Labels: e.Labels,
Containers: e.Containers(),
Images: e.Images(),
UsedMemory: e.UsedMemory(),
UsedCpus: e.UsedCpus(),
TotalMemory: e.TotalMemory(),
TotalCpus: e.TotalCpus(),
IsHealthy: e.IsHealthy(),
}
}
// Container returns the container with IDOrName in the engine.
func (n *Node) Container(IDOrName string) *cluster.Container {
// Abort immediately if the name is empty.
if len(IDOrName) == 0 {
return nil
}
for _, container := range n.Containers {
// Match ID prefix.
if strings.HasPrefix(container.Id, IDOrName) {
return container
}
// Match name, /name or engine/name.
for _, name := range container.Names {
if name == IDOrName || name == "/"+IDOrName || container.Engine.ID+name == IDOrName || container.Engine.Name+name == IDOrName {
return container
}
}
}
return nil
}
// AddContainer inject a container into the internal state.
func (n *Node) AddContainer(container *cluster.Container) error {
if container.Info.Config != nil {
memory := container.Info.Config.Memory
cpus := container.Info.Config.CpuShares
if n.TotalMemory-memory < 0 || n.TotalCpus-cpus < 0 {
return errors.New("not enough resources")
}
n.UsedMemory = n.UsedMemory + memory
n.UsedCpus = n.UsedCpus + cpus
}
n.Containers = append(n.Containers, container)
return nil
}

View File

@ -4,8 +4,8 @@ import (
"strings"
"sync"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/node"
"github.com/docker/swarm/scheduler/strategy"
"github.com/samalba/dockerclient"
)
@ -27,7 +27,7 @@ func New(strategy strategy.PlacementStrategy, filters []filter.Filter) *Schedule
}
// SelectNodeForContainer will find a nice home for our container.
func (s *Scheduler) SelectNodeForContainer(nodes []cluster.Node, config *dockerclient.ContainerConfig) (cluster.Node, error) {
func (s *Scheduler) SelectNodeForContainer(nodes []*node.Node, config *dockerclient.ContainerConfig) (*node.Node, error) {
accepted, err := filter.ApplyFilters(s.filters, config, nodes)
if err != nil {
return nil, err

View File

@ -3,7 +3,7 @@ package strategy
import (
"sort"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -22,7 +22,7 @@ func (p *BinpackPlacementStrategy) Name() string {
}
// PlaceContainer is exported
func (p *BinpackPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error) {
func (p *BinpackPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []*node.Node) (*node.Node, error) {
weightedNodes, err := weighNodes(config, nodes)
if err != nil {
return nil, err
@ -36,7 +36,7 @@ func (p *BinpackPlacementStrategy) PlaceContainer(config *dockerclient.Container
if node.Weight != topNode.Weight {
break
}
if len(node.Node.Containers()) > len(topNode.Node.Containers()) {
if len(node.Node.Containers) > len(topNode.Node.Containers) {
topNode = node
}
}

View File

@ -5,18 +5,20 @@ import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
func createNode(ID string, memory int64, cpus int64) cluster.Node {
func createNode(ID string, memory int64, cpus int64) *node.Node {
oc := 0.05
memory = int64(float64(memory) + float64(memory)*oc)
return &FakeNode{
id: ID,
addr: ID,
memory: memory * 1024 * 1024 * 1024,
cpus: cpus,
return &node.Node{
ID: ID,
IP: ID,
Addr: ID,
TotalMemory: memory * 1024 * 1024 * 1024,
TotalCpus: cpus,
}
}
@ -31,39 +33,39 @@ func createContainer(ID string, config *dockerclient.ContainerConfig) *cluster.C
func TestPlaceEqualWeight(t *testing.T) {
s := &BinpackPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 4, 0))
}
// add 1 container 2G on node1
config := createConfig(2, 0)
assert.NoError(t, AddContainer(nodes[0], createContainer("c1", config)))
assert.Equal(t, nodes[0].UsedMemory(), 2*1024*1024*1024)
assert.NoError(t, nodes[0].AddContainer(createContainer("c1", config)))
assert.Equal(t, nodes[0].UsedMemory, 2*1024*1024*1024)
// add 2 containers 1G on node2
config = createConfig(1, 0)
assert.NoError(t, AddContainer(nodes[1], createContainer("c2", config)))
assert.NoError(t, AddContainer(nodes[1], createContainer("c3", config)))
assert.Equal(t, nodes[1].UsedMemory(), int64(2*1024*1024*1024))
assert.NoError(t, nodes[1].AddContainer(createContainer("c2", config)))
assert.NoError(t, nodes[1].AddContainer(createContainer("c3", config)))
assert.Equal(t, nodes[1].UsedMemory, int64(2*1024*1024*1024))
// add another container 1G
config = createConfig(1, 0)
node, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node, createContainer("c4", config)))
assert.Equal(t, node.UsedMemory(), 3*1024*1024*1024)
assert.NoError(t, node.AddContainer(createContainer("c4", config)))
assert.Equal(t, node.UsedMemory, 3*1024*1024*1024)
// check that the last container ended on the node with the highest number of containers
assert.Equal(t, node.ID(), nodes[1].ID())
assert.NotEqual(t, len(nodes[0].Containers()), len(nodes[1].Containers()))
assert.Equal(t, node.ID, nodes[1].ID)
assert.NotEqual(t, len(nodes[0].Containers), len(nodes[1].Containers))
}
func TestPlaceContainerMemory(t *testing.T) {
s := &BinpackPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 2, 0))
}
@ -72,25 +74,25 @@ func TestPlaceContainerMemory(t *testing.T) {
config := createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
assert.Equal(t, node1.UsedMemory(), 1024*1024*1024)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedMemory, 1024*1024*1024)
// add another container 1G
config = createConfig(1, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node2, createContainer("c2", config)))
assert.Equal(t, node2.UsedMemory(), int64(2*1024*1024*1024))
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedMemory, int64(2*1024*1024*1024))
// check that both containers ended on the same node
assert.Equal(t, node1.ID(), node2.ID())
assert.Equal(t, len(node1.Containers()), len(node2.Containers()))
assert.Equal(t, node1.ID, node2.ID)
assert.Equal(t, len(node1.Containers), len(node2.Containers))
}
func TestPlaceContainerCPU(t *testing.T) {
s := &BinpackPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 0, 2))
}
@ -99,25 +101,25 @@ func TestPlaceContainerCPU(t *testing.T) {
config := createConfig(0, 1)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
assert.Equal(t, node1.UsedCpus(), 1)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedCpus, 1)
// add another container 1CPU
config = createConfig(0, 1)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node2, createContainer("c2", config)))
assert.Equal(t, node2.UsedCpus(), 2)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedCpus, 2)
// check that both containers ended on the same node
assert.Equal(t, node1.ID(), node2.ID())
assert.Equal(t, len(node1.Containers()), len(node2.Containers()))
assert.Equal(t, node1.ID, node2.ID)
assert.Equal(t, len(node1.Containers), len(node2.Containers))
}
func TestPlaceContainerHuge(t *testing.T) {
s := &BinpackPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 100; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 1, 1))
}
@ -126,7 +128,7 @@ func TestPlaceContainerHuge(t *testing.T) {
for i := 0; i < 100; i++ {
node, err := s.PlaceContainer(createConfig(0, 1), nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1))))
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1))))
}
// try to add another container 1CPU
@ -137,7 +139,7 @@ func TestPlaceContainerHuge(t *testing.T) {
for i := 100; i < 200; i++ {
node, err := s.PlaceContainer(createConfig(1, 0), nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
}
// try to add another container 1G
@ -149,7 +151,7 @@ func TestPlaceContainerOvercommit(t *testing.T) {
s, err := New("binpacking")
assert.NoError(t, err)
nodes := []cluster.Node{createNode("node-1", 100, 1)}
nodes := []*node.Node{createNode("node-1", 100, 1)}
config := createConfig(0, 0)
@ -181,7 +183,7 @@ func TestPlaceContainerOvercommit(t *testing.T) {
func TestPlaceContainerDemo(t *testing.T) {
s := &BinpackPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 3; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 2, 4))
}
@ -197,44 +199,44 @@ func TestPlaceContainerDemo(t *testing.T) {
config = createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
// add another container 1G
config = createConfig(1, 0)
node1bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node1bis, createContainer("c2", config)))
assert.NoError(t, node1bis.AddContainer(createContainer("c2", config)))
// check that both containers ended on the same node
assert.Equal(t, node1.ID(), node1bis.ID())
assert.Equal(t, len(node1.Containers()), len(node1bis.Containers()))
assert.Equal(t, node1.ID, node1bis.ID)
assert.Equal(t, len(node1.Containers), len(node1bis.Containers))
// add another container 2G
config = createConfig(2, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node2, createContainer("c3", config)))
assert.NoError(t, node2.AddContainer(createContainer("c3", config)))
// check that it ends up on another node
assert.NotEqual(t, node1.ID(), node2.ID())
assert.NotEqual(t, node1.ID, node2.ID)
// add another container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node3, createContainer("c4", config)))
assert.NoError(t, node3.AddContainer(createContainer("c4", config)))
// check that it ends up on another node
assert.NotEqual(t, node1.ID(), node3.ID())
assert.NotEqual(t, node2.ID(), node3.ID())
assert.NotEqual(t, node1.ID, node3.ID)
assert.NotEqual(t, node2.ID, node3.ID)
// add another container 1G
config = createConfig(1, 0)
node3bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node3bis, createContainer("c5", config)))
assert.NoError(t, node3bis.AddContainer(createContainer("c5", config)))
// check that it ends up on the same node
assert.Equal(t, node3.ID(), node3bis.ID())
assert.Equal(t, node3.ID, node3bis.ID)
// try to add another container
config = createConfig(1, 0)
@ -244,27 +246,25 @@ func TestPlaceContainerDemo(t *testing.T) {
assert.Error(t, err)
// remove container in the middle
if n, ok := node2.(*FakeNode); ok {
n.containers = nil
n.usedmemory = 0
n.usedcpus = 0
}
node2.Containers = nil
node2.UsedMemory = 0
node2.UsedCpus = 0
// add another container
config = createConfig(1, 0)
node2bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node2bis, createContainer("c6", config)))
assert.NoError(t, node2bis.AddContainer(createContainer("c6", config)))
// check it ends up on `node3`
assert.Equal(t, node2.ID(), node2bis.ID())
assert.Equal(t, len(node2.Containers()), len(node2bis.Containers()))
assert.Equal(t, node2.ID, node2bis.ID)
assert.Equal(t, len(node2.Containers), len(node2bis.Containers))
}
func TestComplexPlacement(t *testing.T) {
s := &BinpackPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 4, 4))
}
@ -273,23 +273,23 @@ func TestComplexPlacement(t *testing.T) {
config := createConfig(2, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
// add one container 3G
config = createConfig(3, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node2, createContainer("c2", config)))
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
// check that they end up on separate nodes
assert.NotEqual(t, node1.ID(), node2.ID())
assert.NotEqual(t, node1.ID, node2.ID)
// add one container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node3, createContainer("c3", config)))
assert.NoError(t, node3.AddContainer(createContainer("c3", config)))
// check that it ends up on the same node as the 3G
assert.Equal(t, node2.ID(), node3.ID())
assert.Equal(t, node2.ID, node3.ID)
}

View File

@ -1,53 +0,0 @@
package strategy
import (
"errors"
"github.com/docker/swarm/cluster"
)
type FakeNode struct {
id string
name string
addr string
memory int64
usedmemory int64
cpus int64
usedcpus int64
containers []*cluster.Container
}
func (fn *FakeNode) ID() string { return fn.id }
func (fn *FakeNode) Name() string { return fn.name }
func (fn *FakeNode) IP() string { return "" }
func (fn *FakeNode) Addr() string { return fn.addr }
func (fn *FakeNode) Images() []*cluster.Image { return nil }
func (fn *FakeNode) Image(_ string) *cluster.Image { return nil }
func (fn *FakeNode) Containers() []*cluster.Container { return fn.containers }
func (fn *FakeNode) Container(_ string) *cluster.Container { return nil }
func (fn *FakeNode) TotalCpus() int64 { return fn.cpus }
func (fn *FakeNode) UsedCpus() int64 { return fn.usedcpus }
func (fn *FakeNode) TotalMemory() int64 { return fn.memory }
func (fn *FakeNode) UsedMemory() int64 { return fn.usedmemory }
func (fn *FakeNode) Labels() map[string]string { return nil }
func (fn *FakeNode) IsHealthy() bool { return true }
func (fn *FakeNode) AddContainer(container *cluster.Container) error {
memory := container.Info.Config.Memory
cpus := container.Info.Config.CpuShares
if fn.memory-memory < 0 || fn.cpus-cpus < 0 {
return errors.New("not enough resources")
}
fn.usedmemory = fn.usedmemory + memory
fn.usedcpus = fn.usedcpus + cpus
fn.containers = append(fn.containers, container)
return nil
}
func AddContainer(node cluster.Node, container *cluster.Container) error {
if n, ok := node.(*FakeNode); ok {
return n.AddContainer(container)
}
return errors.New("Not a FakeNode")
}

View File

@ -5,7 +5,7 @@ import (
"math/rand"
"time"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -24,7 +24,7 @@ func (p *RandomPlacementStrategy) Name() string {
}
// PlaceContainer is exported
func (p *RandomPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error) {
func (p *RandomPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []*node.Node) (*node.Node, error) {
if size := len(nodes); size > 0 {
return nodes[rand.Intn(size)], nil
}

View File

@ -3,7 +3,7 @@ package strategy
import (
"sort"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -22,7 +22,7 @@ func (p *SpreadPlacementStrategy) Name() string {
}
// PlaceContainer is exported
func (p *SpreadPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error) {
func (p *SpreadPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []*node.Node) (*node.Node, error) {
weightedNodes, err := weighNodes(config, nodes)
if err != nil {
return nil, err
@ -36,7 +36,7 @@ func (p *SpreadPlacementStrategy) PlaceContainer(config *dockerclient.ContainerC
if node.Weight != bottomNode.Weight {
break
}
if len(node.Node.Containers()) < len(bottomNode.Node.Containers()) {
if len(node.Node.Containers) < len(bottomNode.Node.Containers) {
bottomNode = node
}
}

View File

@ -4,46 +4,46 @@ import (
"fmt"
"testing"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/stretchr/testify/assert"
)
func TestSpreadPlaceEqualWeight(t *testing.T) {
s := &SpreadPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 4, 0))
}
// add 1 container 2G on node1
config := createConfig(2, 0)
assert.NoError(t, AddContainer(nodes[0], createContainer("c1", config)))
assert.Equal(t, nodes[0].UsedMemory(), 2*1024*1024*1024)
assert.NoError(t, nodes[0].AddContainer(createContainer("c1", config)))
assert.Equal(t, nodes[0].UsedMemory, 2*1024*1024*1024)
// add 2 containers 1G on node2
config = createConfig(1, 0)
assert.NoError(t, AddContainer(nodes[1], createContainer("c2", config)))
assert.NoError(t, AddContainer(nodes[1], createContainer("c3", config)))
assert.Equal(t, nodes[1].UsedMemory(), int64(2*1024*1024*1024))
assert.NoError(t, nodes[1].AddContainer(createContainer("c2", config)))
assert.NoError(t, nodes[1].AddContainer(createContainer("c3", config)))
assert.Equal(t, nodes[1].UsedMemory, int64(2*1024*1024*1024))
// add another container 1G
config = createConfig(1, 0)
node, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node, createContainer("c4", config)))
assert.Equal(t, node.UsedMemory(), 3*1024*1024*1024)
assert.NoError(t, node.AddContainer(createContainer("c4", config)))
assert.Equal(t, node.UsedMemory, 3*1024*1024*1024)
// check that the last container ended on the node with the lowest number of containers
assert.Equal(t, node.ID(), nodes[0].ID())
assert.Equal(t, len(nodes[0].Containers()), len(nodes[1].Containers()))
assert.Equal(t, node.ID, nodes[0].ID)
assert.Equal(t, len(nodes[0].Containers), len(nodes[1].Containers))
}
func TestSpreadPlaceContainerMemory(t *testing.T) {
s := &SpreadPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 2, 0))
}
@ -52,25 +52,25 @@ func TestSpreadPlaceContainerMemory(t *testing.T) {
config := createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
assert.Equal(t, node1.UsedMemory(), 1024*1024*1024)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedMemory, 1024*1024*1024)
// add another container 1G
config = createConfig(1, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node2, createContainer("c2", config)))
assert.Equal(t, node2.UsedMemory(), int64(1024*1024*1024))
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedMemory, int64(1024*1024*1024))
// check that both containers ended on different node
assert.NotEqual(t, node1.ID(), node2.ID())
assert.Equal(t, len(node1.Containers()), len(node2.Containers()), "")
assert.NotEqual(t, node1.ID, node2.ID)
assert.Equal(t, len(node1.Containers), len(node2.Containers), "")
}
func TestSpreadPlaceContainerCPU(t *testing.T) {
s := &SpreadPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 0, 2))
}
@ -79,25 +79,25 @@ func TestSpreadPlaceContainerCPU(t *testing.T) {
config := createConfig(0, 1)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
assert.Equal(t, node1.UsedCpus(), 1)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedCpus, 1)
// add another container 1CPU
config = createConfig(0, 1)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node2, createContainer("c2", config)))
assert.Equal(t, node2.UsedCpus(), 1)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedCpus, 1)
// check that both containers ended on different node
assert.NotEqual(t, node1.ID(), node2.ID())
assert.Equal(t, len(node1.Containers()), len(node2.Containers()), "")
assert.NotEqual(t, node1.ID, node2.ID)
assert.Equal(t, len(node1.Containers), len(node2.Containers), "")
}
func TestSpreadPlaceContainerHuge(t *testing.T) {
s := &SpreadPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 100; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 1, 1))
}
@ -106,7 +106,7 @@ func TestSpreadPlaceContainerHuge(t *testing.T) {
for i := 0; i < 100; i++ {
node, err := s.PlaceContainer(createConfig(0, 1), nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1))))
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1))))
}
// try to add another container 1CPU
@ -117,7 +117,7 @@ func TestSpreadPlaceContainerHuge(t *testing.T) {
for i := 100; i < 200; i++ {
node, err := s.PlaceContainer(createConfig(1, 0), nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
}
// try to add another container 1G
@ -128,7 +128,7 @@ func TestSpreadPlaceContainerHuge(t *testing.T) {
func TestSpreadPlaceContainerOvercommit(t *testing.T) {
s := &SpreadPlacementStrategy{}
nodes := []cluster.Node{createNode("node-1", 100, 1)}
nodes := []*node.Node{createNode("node-1", 100, 1)}
config := createConfig(0, 0)
@ -159,7 +159,7 @@ func TestSpreadPlaceContainerOvercommit(t *testing.T) {
func TestSpreadComplexPlacement(t *testing.T) {
s := &SpreadPlacementStrategy{}
nodes := []cluster.Node{}
nodes := []*node.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 4, 4))
}
@ -168,23 +168,23 @@ func TestSpreadComplexPlacement(t *testing.T) {
config := createConfig(2, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
// add one container 3G
config = createConfig(3, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node2, createContainer("c2", config)))
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
// check that they end up on separate nodes
assert.NotEqual(t, node1.ID(), node2.ID())
assert.NotEqual(t, node1.ID, node2.ID)
// add one container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, AddContainer(node3, createContainer("c3", config)))
assert.NoError(t, node3.AddContainer(createContainer("c3", config)))
// check that it ends up on the same node as the 2G
assert.Equal(t, node1.ID(), node3.ID())
assert.Equal(t, node1.ID, node3.ID)
}

View File

@ -4,7 +4,7 @@ import (
"errors"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
@ -15,7 +15,7 @@ type PlacementStrategy interface {
Initialize() error
// Given a container configuration and a set of nodes, select the target
// node where the container should be scheduled.
PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error)
PlaceContainer(config *dockerclient.ContainerConfig, nodes []*node.Node) (*node.Node, error)
}
var (

View File

@ -1,14 +1,14 @@
package strategy
import (
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/node"
"github.com/samalba/dockerclient"
)
// WeightedNode represents a node in the cluster with a given weight, typically used for sorting
// purposes.
type weightedNode struct {
Node cluster.Node
Node *node.Node
// Weight is the inherent value of this node.
Weight int64
}
@ -32,12 +32,12 @@ func (n weightedNodeList) Less(i, j int) bool {
return ip.Weight < jp.Weight
}
func weighNodes(config *dockerclient.ContainerConfig, nodes []cluster.Node) (weightedNodeList, error) {
func weighNodes(config *dockerclient.ContainerConfig, nodes []*node.Node) (weightedNodeList, error) {
weightedNodes := weightedNodeList{}
for _, node := range nodes {
nodeMemory := node.TotalMemory()
nodeCpus := node.TotalCpus()
nodeMemory := node.TotalMemory
nodeCpus := node.TotalCpus
// Skip nodes that are smaller than the requested resources.
if nodeMemory < int64(config.Memory) || nodeCpus < config.CpuShares {
@ -50,10 +50,10 @@ func weighNodes(config *dockerclient.ContainerConfig, nodes []cluster.Node) (wei
)
if config.CpuShares > 0 {
cpuScore = (node.UsedCpus() + config.CpuShares) * 100 / nodeCpus
cpuScore = (node.UsedCpus + config.CpuShares) * 100 / nodeCpus
}
if config.Memory > 0 {
memoryScore = (node.UsedMemory() + config.Memory) * 100 / nodeMemory
memoryScore = (node.UsedMemory + config.Memory) * 100 / nodeMemory
}
if cpuScore <= 100 && memoryScore <= 100 {