Merge pull request #393 from vieux/mesos_poc

Proposal: Scheduler/Cluster Driver
This commit is contained in:
Andrea Luzzardi 2015-02-27 15:17:54 -08:00
commit db97473b40
36 changed files with 1513 additions and 1418 deletions

View File

@ -14,9 +14,8 @@ import (
log "github.com/Sirupsen/logrus"
dockerfilters "github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/docker/pkg/units"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/cluster/swarm"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/version"
"github.com/gorilla/mux"
@ -26,8 +25,7 @@ import (
const APIVERSION = "1.16"
type context struct {
cluster *cluster.Cluster
scheduler *scheduler.Scheduler
cluster cluster.Cluster
eventsHandler *eventsHandler
debug bool
tlsConfig *tls.Config
@ -37,15 +35,6 @@ type handler func(c *context, w http.ResponseWriter, r *http.Request)
// GET /info
func getInfo(c *context, w http.ResponseWriter, r *http.Request) {
nodes := c.cluster.Nodes()
driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}}
for _, node := range nodes {
driverStatus = append(driverStatus, [2]string{node.Name, node.Addr})
driverStatus = append(driverStatus, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))})
driverStatus = append(driverStatus, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.ReservedCpus(), node.Cpus)})
driverStatus = append(driverStatus, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(node.ReservedMemory())), units.BytesSize(float64(node.Memory)))})
}
info := struct {
Containers int
DriverStatus [][2]string
@ -53,7 +42,7 @@ func getInfo(c *context, w http.ResponseWriter, r *http.Request) {
Debug bool
}{
len(c.cluster.Containers()),
driverStatus,
c.cluster.Info(),
c.eventsHandler.Size(),
c.debug,
}
@ -98,13 +87,13 @@ func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) {
}
accepteds, _ := filters["node"]
images := []*dockerclient.Image{}
images := []*cluster.Image{}
for _, node := range c.cluster.Nodes() {
for _, image := range c.cluster.Images() {
if len(accepteds) != 0 {
found := false
for _, accepted := range accepteds {
if accepted == node.Name || accepted == node.ID {
if accepted == image.Node.Name() || accepted == image.Node.ID() {
found = true
break
}
@ -113,10 +102,7 @@ func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) {
continue
}
}
for _, image := range node.Images() {
images = append(images, image)
}
images = append(images, image)
}
w.Header().Set("Content-Type", "application/json")
@ -150,14 +136,14 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) {
// TODO remove the Node Name in the name when we have a good solution
tmp.Names = make([]string, len(container.Names))
for i, name := range container.Names {
tmp.Names[i] = "/" + container.Node.Name + name
tmp.Names[i] = "/" + container.Node.Name() + name
}
// insert node IP
tmp.Ports = make([]dockerclient.Port, len(container.Ports))
for i, port := range container.Ports {
tmp.Ports[i] = port
if port.IP == "0.0.0.0" {
tmp.Ports[i].IP = container.Node.IP
tmp.Ports[i].IP = container.Node.IP()
}
}
out = append(out, &tmp)
@ -179,7 +165,7 @@ func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) {
}
client, scheme := newClientAndScheme(c.tlsConfig)
resp, err := client.Get(scheme + "://" + container.Node.Addr + "/containers/" + container.Id + "/json")
resp, err := client.Get(scheme + "://" + container.Node.Addr() + "/containers/" + container.Id + "/json")
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
@ -205,7 +191,7 @@ func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) {
data = bytes.Replace(data, []byte("\"Name\":\"/"), []byte(fmt.Sprintf("\"Node\":%s,\"Name\":\"/", n)), -1)
// insert node IP
data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Node.IP)), -1)
data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Node.IP())), -1)
w.Header().Set("Content-Type", "application/json")
w.Write(data)
@ -229,7 +215,7 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) {
return
}
container, err := c.scheduler.CreateContainer(&config, name)
container, err := c.cluster.CreateContainer(&config, name)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
@ -255,7 +241,7 @@ func deleteContainer(c *context, w http.ResponseWriter, r *http.Request) {
httpError(w, fmt.Sprintf("Container %s not found", name), http.StatusNotFound)
return
}
if err := c.scheduler.RemoveContainer(container, force); err != nil {
if err := c.cluster.RemoveContainer(container, force); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
@ -291,11 +277,13 @@ func proxyContainerAndForceRefresh(c *context, w http.ResponseWriter, r *http.Re
cb := func(resp *http.Response) {
if resp.StatusCode == http.StatusCreated {
log.Debugf("[REFRESH CONTAINER] --> %s", container.Id)
container.Node.RefreshContainer(container.Id, true)
if n, ok := container.Node.(*swarm.Node); ok {
n.RefreshContainer(container.Id, true)
}
}
}
if err := proxyAsync(c.tlsConfig, container.Node.Addr, w, r, cb); err != nil {
if err := proxyAsync(c.tlsConfig, container.Node.Addr(), w, r, cb); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
@ -309,7 +297,7 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) {
return
}
if err := proxy(c.tlsConfig, container.Node.Addr, w, r); err != nil {
if err := proxy(c.tlsConfig, container.Node.Addr(), w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
@ -318,18 +306,22 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) {
func proxyImage(c *context, w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
for _, node := range c.cluster.Nodes() {
if node.Image(name) != nil {
proxy(c.tlsConfig, node.Addr, w, r)
return
}
if image := c.cluster.Image(name); image != nil {
proxy(c.tlsConfig, image.Node.Addr(), w, r)
return
}
httpError(w, fmt.Sprintf("No such image: %s", name), http.StatusNotFound)
}
// Proxy a request to a random node
func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) {
candidates := c.cluster.Nodes()
candidates := []cluster.Node{}
// FIXME: doesn't work if there are no container in the cluster
// remove proxyRandom and implemente the features locally
for _, container := range c.cluster.Containers() {
candidates = append(candidates, container.Node)
}
healthFilter := &filter.HealthFilter{}
accepted, err := healthFilter.Filter(nil, candidates)
@ -339,7 +331,7 @@ func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) {
return
}
if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr, w, r); err != nil {
if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr(), w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
@ -352,7 +344,7 @@ func proxyHijack(c *context, w http.ResponseWriter, r *http.Request) {
return
}
if err := hijack(c.tlsConfig, container.Node.Addr, w, r); err != nil {
if err := hijack(c.tlsConfig, container.Node.Addr(), w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}

View File

@ -2,20 +2,17 @@ package api
import (
"encoding/json"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/version"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"testing"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/version"
"github.com/stretchr/testify/assert"
)
func serveRequest(c *cluster.Cluster, s *scheduler.Scheduler, w http.ResponseWriter, req *http.Request) error {
func serveRequest(c cluster.Cluster, w http.ResponseWriter, req *http.Request) error {
context := &context{
cluster: c,
scheduler: s,
cluster: c,
}
r := createRouter(context, false)
@ -28,7 +25,7 @@ func TestGetVersion(t *testing.T) {
req, err := http.NewRequest("GET", "/version", nil)
assert.NoError(t, err)
assert.NoError(t, serveRequest(nil, nil, r, req))
assert.NoError(t, serveRequest(nil, r, req))
assert.Equal(t, r.Code, http.StatusOK)
v := struct {

View File

@ -39,12 +39,12 @@ func (eh *eventsHandler) Handle(e *cluster.Event) error {
str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d,%q:%q,%q:%q,%q:%q,%q:%q}",
"status", e.Status,
"id", e.Id,
"from", e.From+" node:"+e.Node.Name,
"from", e.From+" node:"+e.Node.Name(),
"time", e.Time,
"node_name", e.Node.Name,
"node_id", e.Node.ID,
"node_addr", e.Node.Addr,
"node_ip", e.Node.IP)
"node_name", e.Node.Name(),
"node_id", e.Node.ID(),
"node_addr", e.Node.Addr(),
"node_ip", e.Node.IP())
for key, w := range eh.ws {
if _, err := fmt.Fprintf(w, str); err != nil {

View File

@ -17,6 +17,23 @@ func (fw *FakeWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}
type FakeNode struct{}
func (fn *FakeNode) ID() string { return "node_id" }
func (fn *FakeNode) Name() string { return "node_name" }
func (fn *FakeNode) IP() string { return "node_ip" }
func (fn *FakeNode) Addr() string { return "node_addr" }
func (fn *FakeNode) Images() []*cluster.Image { return nil }
func (fn *FakeNode) Image(_ string) *cluster.Image { return nil }
func (fn *FakeNode) Containers() []*cluster.Container { return nil }
func (fn *FakeNode) Container(_ string) *cluster.Container { return nil }
func (fn *FakeNode) TotalCpus() int64 { return 0 }
func (fn *FakeNode) UsedCpus() int64 { return 0 }
func (fn *FakeNode) TotalMemory() int64 { return 0 }
func (fn *FakeNode) UsedMemory() int64 { return 0 }
func (fn *FakeNode) Labels() map[string]string { return nil }
func (fn *FakeNode) IsHealthy() bool { return true }
func TestHandle(t *testing.T) {
eh := NewEventsHandler()
assert.Equal(t, eh.Size(), 0)
@ -27,13 +44,9 @@ func TestHandle(t *testing.T) {
assert.Equal(t, eh.Size(), 1)
event := &cluster.Event{
Node: &cluster.Node{
Name: "node_name",
ID: "node_id",
Addr: "node_addr",
IP: "node_ip",
},
Node: &FakeNode{},
}
event.Event.Status = "status"
event.Event.Id = "id"
event.Event.From = "from"

View File

@ -9,7 +9,6 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler"
)
const DefaultDockerPort = ":2375"
@ -29,14 +28,12 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error
return l, nil
}
func ListenAndServe(c *cluster.Cluster, s *scheduler.Scheduler, hosts []string, enableCors bool, tlsConfig *tls.Config) error {
func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config, eventsHandler *eventsHandler) error {
context := &context{
cluster: c,
scheduler: s,
eventsHandler: NewEventsHandler(),
eventsHandler: eventsHandler,
tlsConfig: tlsConfig,
}
c.Events(context.eventsHandler)
r := createRouter(context, enableCors)
chErrors := make(chan error, len(hosts))

View File

@ -1,177 +1,15 @@
package cluster
import (
"crypto/tls"
"errors"
"sync"
import "github.com/samalba/dockerclient"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/state"
"github.com/samalba/dockerclient"
)
type Cluster interface {
CreateContainer(config *dockerclient.ContainerConfig, name string) (*Container, error)
RemoveContainer(container *Container, force bool) error
var (
ErrNodeNotConnected = errors.New("node is not connected to docker's REST API")
ErrNodeAlreadyRegistered = errors.New("node was already added to the cluster")
)
Images() []*Image
Image(IdOrName string) *Image
Containers() []*Container
Container(IdOrName string) *Container
type Cluster struct {
sync.RWMutex
store *state.Store
tlsConfig *tls.Config
eventHandlers []EventHandler
nodes map[string]*Node
overcommitRatio float64
}
func NewCluster(store *state.Store, tlsConfig *tls.Config, overcommitRatio float64) *Cluster {
return &Cluster{
tlsConfig: tlsConfig,
nodes: make(map[string]*Node),
store: store,
overcommitRatio: overcommitRatio,
}
}
// Deploy a container into a `specific` node on the cluster.
func (c *Cluster) DeployContainer(node *Node, config *dockerclient.ContainerConfig, name string) (*Container, error) {
container, err := node.Create(config, name, true)
if err != nil {
return nil, err
}
// Commit the requested state.
st := &state.RequestedState{
ID: container.Id,
Name: name,
Config: config,
}
if err := c.store.Add(container.Id, st); err != nil {
return nil, err
}
return container, nil
}
// Destroys a given `container` from the cluster.
func (c *Cluster) DestroyContainer(container *Container, force bool) error {
if err := container.Node.Destroy(container, force); err != nil {
return err
}
if err := c.store.Remove(container.Id); err != nil {
if err == state.ErrNotFound {
log.Debugf("Container %s not found in the store", container.Id)
return nil
}
return err
}
return nil
}
func (c *Cluster) Handle(e *Event) error {
for _, eventHandler := range c.eventHandlers {
if err := eventHandler.Handle(e); err != nil {
log.Error(err)
}
}
return nil
}
// Register a node within the cluster. The node must have been already
// initialized.
func (c *Cluster) AddNode(n *Node) error {
if !n.IsConnected() {
return ErrNodeNotConnected
}
c.Lock()
defer c.Unlock()
if old, exists := c.nodes[n.ID]; exists {
if old.IP != n.IP {
log.Errorf("ID duplicated. %s shared by %s and %s", n.ID, old.IP, n.IP)
}
return ErrNodeAlreadyRegistered
}
c.nodes[n.ID] = n
return n.Events(c)
}
func (c *Cluster) UpdateNodes(entries []*discovery.Entry) {
for _, entry := range entries {
go func(m *discovery.Entry) {
if c.Node(m.String()) == nil {
n := NewNode(m.String(), c.overcommitRatio)
if err := n.Connect(c.tlsConfig); err != nil {
log.Error(err)
return
}
if err := c.AddNode(n); err != nil {
log.Error(err)
return
}
}
}(entry)
}
}
// Containers returns all the containers in the cluster.
func (c *Cluster) Containers() []*Container {
c.RLock()
defer c.RUnlock()
out := []*Container{}
for _, n := range c.nodes {
containers := n.Containers()
for _, container := range containers {
out = append(out, container)
}
}
return out
}
// Container returns the container with IdOrName in the cluster
func (c *Cluster) Container(IdOrName string) *Container {
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
c.RLock()
defer c.RUnlock()
for _, n := range c.nodes {
if container := n.Container(IdOrName); container != nil {
return container
}
}
return nil
}
// Nodes returns the list of nodes in the cluster
func (c *Cluster) Nodes() []*Node {
nodes := []*Node{}
c.RLock()
for _, node := range c.nodes {
nodes = append(nodes, node)
}
c.RUnlock()
return nodes
}
func (c *Cluster) Node(addr string) *Node {
for _, node := range c.nodes {
if node.Addr == addr {
return node
}
}
return nil
}
func (c *Cluster) Events(h EventHandler) error {
c.eventHandlers = append(c.eventHandlers, h)
return nil
Info() [][2]string
}

View File

@ -1,104 +0,0 @@
package cluster
import (
"io/ioutil"
"testing"
"github.com/docker/swarm/state"
"github.com/samalba/dockerclient"
"github.com/samalba/dockerclient/mockclient"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *Node {
node := NewNode(ID, 0)
node.Name = ID
assert.False(t, node.IsConnected())
client := mockclient.NewMockClient()
client.On("Info").Return(mockInfo, nil)
client.On("ListContainers", true, false, "").Return(containers, nil)
client.On("ListImages").Return([]*dockerclient.Image{}, nil)
client.On("InspectContainer", mock.Anything).Return(
&dockerclient.ContainerInfo{
Config: &dockerclient.ContainerConfig{CpuShares: 100},
}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
assert.NoError(t, node.connectClient(client))
assert.True(t, node.IsConnected())
node.ID = ID
return node
}
func newCluster(t *testing.T) *Cluster {
dir, err := ioutil.TempDir("", "store-test")
assert.NoError(t, err)
return NewCluster(state.NewStore(dir), nil, 0)
}
func TestAddNode(t *testing.T) {
c := newCluster(t)
assert.Equal(t, len(c.Nodes()), 0)
assert.Nil(t, c.Node("test"))
assert.Nil(t, c.Node("test2"))
assert.NoError(t, c.AddNode(createNode(t, "test")))
assert.Equal(t, len(c.Nodes()), 1)
assert.NotNil(t, c.Node("test"))
assert.Error(t, c.AddNode(createNode(t, "test")))
assert.Equal(t, len(c.Nodes()), 1)
assert.NotNil(t, c.Node("test"))
assert.NoError(t, c.AddNode(createNode(t, "test2")))
assert.Equal(t, len(c.Nodes()), 2)
assert.NotNil(t, c.Node("test2"))
}
func TestContainerLookup(t *testing.T) {
c := newCluster(t)
container := dockerclient.Container{
Id: "container-id",
Names: []string{"/container-name1", "/container-name2"},
}
node := createNode(t, "test-node", container)
assert.NoError(t, c.AddNode(node))
// Invalid lookup
assert.Nil(t, c.Container("invalid-id"))
assert.Nil(t, c.Container(""))
// Container ID lookup.
assert.NotNil(t, c.Container("container-id"))
// Container ID prefix lookup.
assert.NotNil(t, c.Container("container-"))
// Container name lookup.
assert.NotNil(t, c.Container("container-name1"))
assert.NotNil(t, c.Container("container-name2"))
// Container node/name matching.
assert.NotNil(t, c.Container("test-node/container-name1"))
assert.NotNil(t, c.Container("test-node/container-name2"))
}
func TestDeployContainer(t *testing.T) {
// Create a test node.
node := createNode(t, "test")
// Create a test cluster.
c := newCluster(t)
assert.NoError(t, c.AddNode(node))
// Fake dockerclient calls to deploy a container.
client := node.client.(*mockclient.MockClient)
client.On("CreateContainer", mock.Anything, mock.Anything).Return("id", nil).Once()
client.On("ListContainers", true, false, mock.Anything).Return([]dockerclient.Container{{Id: "id"}}, nil).Once()
client.On("InspectContainer", "id").Return(&dockerclient.ContainerInfo{Config: &dockerclient.ContainerConfig{CpuShares: 100}}, nil).Once()
// Ensure the container gets deployed.
container, err := c.DeployContainer(node, &dockerclient.ContainerConfig{}, "name")
assert.NoError(t, err)
assert.Equal(t, container.Id, "id")
}

View File

@ -6,5 +6,5 @@ type Container struct {
dockerclient.Container
Info dockerclient.ContainerInfo
Node *Node
Node Node
}

View File

@ -4,7 +4,7 @@ import "github.com/samalba/dockerclient"
type Event struct {
dockerclient.Event
Node *Node
Node Node
}
type EventHandler interface {

9
cluster/image.go Normal file
View File

@ -0,0 +1,9 @@
package cluster
import "github.com/samalba/dockerclient"
type Image struct {
dockerclient.Image
Node Node
}

View File

@ -1,534 +1,23 @@
package cluster
import (
"crypto/tls"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
type Node interface {
ID() string
Name() string
log "github.com/Sirupsen/logrus"
"github.com/samalba/dockerclient"
)
IP() string //to inject the actual IP of the machine in docker ps (hostname:port or ip:port)
Addr() string //to know where to connect with the proxy
const (
// Force-refresh the state of the node this often.
stateRefreshPeriod = 30 * time.Second
Images() []*Image //used by the API
Image(IdOrName string) *Image //used by the filters
Containers() []*Container //used by the filters
Container(IdOrName string) *Container //used by the filters
// Timeout for requests sent out to the node.
requestTimeout = 10 * time.Second
)
TotalCpus() int64 //used by the strategy
UsedCpus() int64 //used by the strategy
TotalMemory() int64 //used by the strategy
UsedMemory() int64 //used by the strategy
func NewNode(addr string, overcommitRatio float64) *Node {
e := &Node{
Addr: addr,
Labels: make(map[string]string),
ch: make(chan bool),
containers: make(map[string]*Container),
healthy: true,
overcommitRatio: int64(overcommitRatio * 100),
}
return e
}
type Node struct {
sync.RWMutex
ID string
IP string
Addr string
Name string
Cpus int64
Memory int64
Labels map[string]string
ch chan bool
containers map[string]*Container
images []*dockerclient.Image
client dockerclient.Client
eventHandler EventHandler
healthy bool
overcommitRatio int64
}
// Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes.
func (n *Node) Connect(config *tls.Config) error {
host, _, err := net.SplitHostPort(n.Addr)
if err != nil {
return err
}
addr, err := net.ResolveIPAddr("ip4", host)
if err != nil {
return err
}
n.IP = addr.IP.String()
c, err := dockerclient.NewDockerClientTimeout("tcp://"+n.Addr, config, time.Duration(requestTimeout))
if err != nil {
return err
}
return n.connectClient(c)
}
func (n *Node) connectClient(client dockerclient.Client) error {
n.client = client
// Fetch the engine labels.
if err := n.updateSpecs(); err != nil {
n.client = nil
return err
}
// Force a state update before returning.
if err := n.RefreshContainers(true); err != nil {
n.client = nil
return err
}
if err := n.refreshImages(); err != nil {
n.client = nil
return err
}
// Start the update loop.
go n.refreshLoop()
// Start monitoring events from the Node.
n.client.StartMonitorEvents(n.handler, nil)
n.emitEvent("node_connect")
return nil
}
// IsConnected returns true if the engine is connected to a remote docker API
func (n *Node) IsConnected() bool {
return n.client != nil
}
func (n *Node) IsHealthy() bool {
return n.healthy
}
// Gather node specs (CPU, memory, constraints, ...).
func (n *Node) updateSpecs() error {
info, err := n.client.Info()
if err != nil {
return err
}
// Older versions of Docker don't expose the ID field and are not supported
// by Swarm. Catch the error ASAP and refuse to connect.
if len(info.ID) == 0 {
return fmt.Errorf("Node %s is running an unsupported version of Docker Engine. Please upgrade.", n.Addr)
}
n.ID = info.ID
n.Name = info.Name
n.Cpus = info.NCPU
n.Memory = info.MemTotal
n.Labels = map[string]string{
"storagedriver": info.Driver,
"executiondriver": info.ExecutionDriver,
"kernelversion": info.KernelVersion,
"operatingsystem": info.OperatingSystem,
}
for _, label := range info.Labels {
kv := strings.SplitN(label, "=", 2)
n.Labels[kv[0]] = kv[1]
}
return nil
}
// Refresh the list of images on the node.
func (n *Node) refreshImages() error {
images, err := n.client.ListImages()
if err != nil {
return err
}
n.Lock()
n.images = images
n.Unlock()
return nil
}
// Refresh the list and status of containers running on the node. If `full` is
// true, each container will be inspected.
func (n *Node) RefreshContainers(full bool) error {
containers, err := n.client.ListContainers(true, false, "")
if err != nil {
return err
}
merged := make(map[string]*Container)
for _, c := range containers {
merged, err = n.updateContainer(c, merged, full)
if err != nil {
log.WithFields(log.Fields{"name": n.Name, "id": n.ID}).Errorf("Unable to update state of container %q", c.Id)
}
}
n.Lock()
defer n.Unlock()
n.containers = merged
log.WithFields(log.Fields{"id": n.ID, "name": n.Name}).Debugf("Updated node state")
return nil
}
// Refresh the status of a container running on the node. If `full` is true,
// the container will be inspected.
func (n *Node) RefreshContainer(ID string, full bool) error {
containers, err := n.client.ListContainers(true, false, fmt.Sprintf("{%q:[%q]}", "id", ID))
if err != nil {
return err
}
if len(containers) > 1 {
// We expect one container, if we get more than one, trigger a full refresh.
return n.RefreshContainers(full)
}
if len(containers) == 0 {
// The container doesn't exist on the node, remove it.
n.Lock()
delete(n.containers, ID)
n.Unlock()
return nil
}
_, err = n.updateContainer(containers[0], n.containers, full)
return err
}
func (n *Node) updateContainer(c dockerclient.Container, containers map[string]*Container, full bool) (map[string]*Container, error) {
var container *Container
n.Lock()
if current, exists := n.containers[c.Id]; exists {
// The container is already known.
container = current
} else {
// This is a brand new container. We need to do a full refresh.
container = &Container{
Node: n,
}
full = true
}
// Update its internal state.
container.Container = c
containers[container.Id] = container
// Release the lock here as the next step is slow.
n.Unlock()
// Update ContainerInfo.
if full {
info, err := n.client.InspectContainer(c.Id)
if err != nil {
return nil, err
}
container.Info = *info
// real CpuShares -> nb of CPUs
container.Info.Config.CpuShares = container.Info.Config.CpuShares / 100.0 * n.Cpus
}
return containers, nil
}
func (n *Node) RefreshContainersAsync() {
n.ch <- true
}
func (n *Node) refreshLoop() {
for {
var err error
select {
case <-n.ch:
err = n.RefreshContainers(false)
case <-time.After(stateRefreshPeriod):
err = n.RefreshContainers(false)
}
if err == nil {
err = n.refreshImages()
}
if err != nil {
if n.healthy {
n.emitEvent("node_disconnect")
}
n.healthy = false
log.WithFields(log.Fields{"name": n.Name, "id": n.ID}).Errorf("Flagging node as dead. Updated state failed: %v", err)
} else {
if !n.healthy {
log.WithFields(log.Fields{"name": n.Name, "id": n.ID}).Info("Node came back to life. Hooray!")
n.client.StopAllMonitorEvents()
n.client.StartMonitorEvents(n.handler, nil)
n.emitEvent("node_reconnect")
if err := n.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": n.Name, "id": n.ID}).Errorf("Update node specs failed: %v", err)
}
}
n.healthy = true
}
}
}
func (n *Node) emitEvent(event string) {
// If there is no event handler registered, abort right now.
if n.eventHandler == nil {
return
}
ev := &Event{
Event: dockerclient.Event{
Status: event,
From: "swarm",
Time: time.Now().Unix(),
},
Node: n,
}
n.eventHandler.Handle(ev)
}
// Return the sum of memory reserved by containers.
func (n *Node) ReservedMemory() int64 {
var r int64 = 0
n.RLock()
for _, c := range n.containers {
r += c.Info.Config.Memory
}
n.RUnlock()
return r
}
// Return the sum of CPUs reserved by containers.
func (n *Node) ReservedCpus() int64 {
var r int64 = 0
n.RLock()
for _, c := range n.containers {
r += c.Info.Config.CpuShares
}
n.RUnlock()
return r
}
func (n *Node) UsableMemory() int64 {
return n.Memory + (n.Memory * n.overcommitRatio / 100)
}
func (n *Node) UsableCpus() int64 {
return n.Cpus + (n.Cpus * n.overcommitRatio / 100)
}
func (n *Node) Create(config *dockerclient.ContainerConfig, name string, pullImage bool) (*Container, error) {
var (
err error
id string
client = n.client
)
newConfig := *config
// nb of CPUs -> real CpuShares
newConfig.CpuShares = config.CpuShares * 100 / n.Cpus
if id, err = client.CreateContainer(&newConfig, name); err != nil {
// If the error is other than not found, abort immediately.
if err != dockerclient.ErrNotFound || !pullImage {
return nil, err
}
// Otherwise, try to pull the image...
if err = n.Pull(config.Image); err != nil {
return nil, err
}
// ...And try again.
if id, err = client.CreateContainer(&newConfig, name); err != nil {
return nil, err
}
}
// Register the container immediately while waiting for a state refresh.
// Force a state refresh to pick up the newly created container.
n.RefreshContainer(id, true)
n.RLock()
defer n.RUnlock()
return n.containers[id], nil
}
// Destroy and remove a container from the node.
func (n *Node) Destroy(container *Container, force bool) error {
if err := n.client.RemoveContainer(container.Id, force, true); err != nil {
return err
}
// Remove the container from the state. Eventually, the state refresh loop
// will rewrite this.
n.Lock()
defer n.Unlock()
delete(n.containers, container.Id)
return nil
}
func (n *Node) Pull(image string) error {
if err := n.client.PullImage(image, nil); err != nil {
return err
}
return nil
}
// Register an event handler.
func (n *Node) Events(h EventHandler) error {
if n.eventHandler != nil {
return errors.New("event handler already set")
}
n.eventHandler = h
return nil
}
// Containers returns all the containers in the node.
func (n *Node) Containers() []*Container {
containers := []*Container{}
n.RLock()
for _, container := range n.containers {
containers = append(containers, container)
}
n.RUnlock()
return containers
}
// Container returns the container with IdOrName in the node.
func (n *Node) Container(IdOrName string) *Container {
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
n.RLock()
defer n.RUnlock()
for _, container := range n.Containers() {
// Match ID prefix.
if strings.HasPrefix(container.Id, IdOrName) {
return container
}
// Match name, /name or engine/name.
for _, name := range container.Names {
if name == IdOrName || name == "/"+IdOrName || container.Node.ID+name == IdOrName || container.Node.Name+name == IdOrName {
return container
}
}
}
return nil
}
func (n *Node) Images() []*dockerclient.Image {
images := []*dockerclient.Image{}
n.RLock()
for _, image := range n.images {
images = append(images, image)
}
n.RUnlock()
return images
}
// Image returns the image with IdOrName in the node
func (n *Node) Image(IdOrName string) *dockerclient.Image {
n.RLock()
defer n.RUnlock()
size := len(IdOrName)
for _, image := range n.Images() {
if image.Id == IdOrName || (size > 2 && strings.HasPrefix(image.Id, IdOrName)) {
return image
}
for _, t := range image.RepoTags {
if t == IdOrName || (size > 2 && strings.HasPrefix(t, IdOrName)) {
return image
}
}
}
return nil
}
func (n *Node) String() string {
return fmt.Sprintf("node %s addr %s", n.ID, n.Addr)
}
func (n *Node) handler(ev *dockerclient.Event, _ chan error, args ...interface{}) {
// Something changed - refresh our internal state.
switch ev.Status {
case "pull", "untag", "delete":
// These events refer to images so there's no need to update
// containers.
n.refreshImages()
case "start", "die":
// If the container is started or stopped, we have to do an inspect in
// order to get the new NetworkSettings.
n.RefreshContainer(ev.Id, true)
default:
// Otherwise, do a "soft" refresh of the container.
n.RefreshContainer(ev.Id, false)
}
// If there is no event handler registered, abort right now.
if n.eventHandler == nil {
return
}
event := &Event{
Node: n,
Event: *ev,
}
n.eventHandler.Handle(event)
}
// Inject a container into the internal state.
func (n *Node) AddContainer(container *Container) error {
n.Lock()
defer n.Unlock()
if _, ok := n.containers[container.Id]; ok {
return errors.New("container already exists")
}
n.containers[container.Id] = container
return nil
}
// Inject an image into the internal state.
func (n *Node) AddImage(image *dockerclient.Image) {
n.Lock()
defer n.Unlock()
n.images = append(n.images, image)
}
// Remove a container from the internal test.
func (n *Node) RemoveContainer(container *Container) error {
n.Lock()
defer n.Unlock()
if _, ok := n.containers[container.Id]; !ok {
return errors.New("container not found")
}
delete(n.containers, container.Id)
return nil
}
// Wipes the internal container state.
func (n *Node) CleanupContainers() {
n.Lock()
n.containers = make(map[string]*Container)
n.Unlock()
Labels() map[string]string //used by the filters
IsHealthy() bool
}

10
cluster/options.go Normal file
View File

@ -0,0 +1,10 @@
package cluster
import "crypto/tls"
type Options struct {
TLSConfig *tls.Config
OvercommitRatio float64
Discovery string
Heartbeat int
}

558
cluster/swarm/node.go Normal file
View File

@ -0,0 +1,558 @@
package swarm
import (
"crypto/tls"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/samalba/dockerclient"
)
const (
// Force-refresh the state of the node this often.
stateRefreshPeriod = 30 * time.Second
// Timeout for requests sent out to the node.
requestTimeout = 10 * time.Second
)
func NewNode(addr string, overcommitRatio float64) *Node {
e := &Node{
addr: addr,
labels: make(map[string]string),
ch: make(chan bool),
containers: make(map[string]*cluster.Container),
healthy: true,
overcommitRatio: int64(overcommitRatio * 100),
}
return e
}
type Node struct {
sync.RWMutex
id string
ip string
addr string
name string
Cpus int64
Memory int64
labels map[string]string
ch chan bool
containers map[string]*cluster.Container
images []*cluster.Image
client dockerclient.Client
eventHandler cluster.EventHandler
healthy bool
overcommitRatio int64
}
func (n *Node) ID() string {
return n.id
}
func (n *Node) IP() string {
return n.ip
}
func (n *Node) Addr() string {
return n.addr
}
func (n *Node) Name() string {
return n.name
}
func (n *Node) Labels() map[string]string {
return n.labels
}
// Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes.
func (n *Node) Connect(config *tls.Config) error {
host, _, err := net.SplitHostPort(n.addr)
if err != nil {
return err
}
addr, err := net.ResolveIPAddr("ip4", host)
if err != nil {
return err
}
n.ip = addr.IP.String()
c, err := dockerclient.NewDockerClientTimeout("tcp://"+n.addr, config, time.Duration(requestTimeout))
if err != nil {
return err
}
return n.connectClient(c)
}
func (n *Node) connectClient(client dockerclient.Client) error {
n.client = client
// Fetch the engine labels.
if err := n.updateSpecs(); err != nil {
n.client = nil
return err
}
// Force a state update before returning.
if err := n.RefreshContainers(true); err != nil {
n.client = nil
return err
}
if err := n.refreshImages(); err != nil {
n.client = nil
return err
}
// Start the update loop.
go n.refreshLoop()
// Start monitoring events from the Node.
n.client.StartMonitorEvents(n.handler, nil)
n.emitEvent("node_connect")
return nil
}
// IsConnected returns true if the engine is connected to a remote docker API
func (n *Node) IsConnected() bool {
return n.client != nil
}
func (n *Node) IsHealthy() bool {
return n.healthy
}
// Gather node specs (CPU, memory, constraints, ...).
func (n *Node) updateSpecs() error {
info, err := n.client.Info()
if err != nil {
return err
}
// Older versions of Docker don't expose the ID field and are not supported
// by Swarm. Catch the error ASAP and refuse to connect.
if len(info.ID) == 0 {
return fmt.Errorf("Node %s is running an unsupported version of Docker Engine. Please upgrade.", n.addr)
}
n.id = info.ID
n.name = info.Name
n.Cpus = info.NCPU
n.Memory = info.MemTotal
n.labels = map[string]string{
"storagedriver": info.Driver,
"executiondriver": info.ExecutionDriver,
"kernelversion": info.KernelVersion,
"operatingsystem": info.OperatingSystem,
}
for _, label := range info.Labels {
kv := strings.SplitN(label, "=", 2)
n.labels[kv[0]] = kv[1]
}
return nil
}
// Refresh the list of images on the node.
func (n *Node) refreshImages() error {
images, err := n.client.ListImages()
if err != nil {
return err
}
n.Lock()
n.images = nil
for _, image := range images {
n.images = append(n.images, &cluster.Image{Image: *image, Node: n})
}
n.Unlock()
return nil
}
// Refresh the list and status of containers running on the node. If `full` is
// true, each container will be inspected.
func (n *Node) RefreshContainers(full bool) error {
containers, err := n.client.ListContainers(true, false, "")
if err != nil {
return err
}
merged := make(map[string]*cluster.Container)
for _, c := range containers {
merged, err = n.updateContainer(c, merged, full)
if err != nil {
log.WithFields(log.Fields{"name": n.name, "id": n.id}).Errorf("Unable to update state of container %q", c.Id)
}
}
n.Lock()
defer n.Unlock()
n.containers = merged
log.WithFields(log.Fields{"id": n.id, "name": n.name}).Debugf("Updated node state")
return nil
}
// Refresh the status of a container running on the node. If `full` is true,
// the container will be inspected.
func (n *Node) RefreshContainer(ID string, full bool) error {
containers, err := n.client.ListContainers(true, false, fmt.Sprintf("{%q:[%q]}", "id", ID))
if err != nil {
return err
}
if len(containers) > 1 {
// We expect one container, if we get more than one, trigger a full refresh.
return n.RefreshContainers(full)
}
if len(containers) == 0 {
// The container doesn't exist on the node, remove it.
n.Lock()
delete(n.containers, ID)
n.Unlock()
return nil
}
_, err = n.updateContainer(containers[0], n.containers, full)
return err
}
func (n *Node) updateContainer(c dockerclient.Container, containers map[string]*cluster.Container, full bool) (map[string]*cluster.Container, error) {
var container *cluster.Container
n.Lock()
if current, exists := n.containers[c.Id]; exists {
// The container is already known.
container = current
} else {
// This is a brand new container. We need to do a full refresh.
container = &cluster.Container{
Node: n,
}
full = true
}
// Update its internal state.
container.Container = c
containers[container.Id] = container
// Release the lock here as the next step is slow.
n.Unlock()
// Update ContainerInfo.
if full {
info, err := n.client.InspectContainer(c.Id)
if err != nil {
return nil, err
}
container.Info = *info
// real CpuShares -> nb of CPUs
container.Info.Config.CpuShares = container.Info.Config.CpuShares / 100.0 * n.Cpus
}
return containers, nil
}
func (n *Node) RefreshContainersAsync() {
n.ch <- true
}
func (n *Node) refreshLoop() {
for {
var err error
select {
case <-n.ch:
err = n.RefreshContainers(false)
case <-time.After(stateRefreshPeriod):
err = n.RefreshContainers(false)
}
if err == nil {
err = n.refreshImages()
}
if err != nil {
if n.healthy {
n.emitEvent("node_disconnect")
}
n.healthy = false
log.WithFields(log.Fields{"name": n.name, "id": n.id}).Errorf("Flagging node as dead. Updated state failed: %v", err)
} else {
if !n.healthy {
log.WithFields(log.Fields{"name": n.name, "id": n.id}).Info("Node came back to life. Hooray!")
n.client.StopAllMonitorEvents()
n.client.StartMonitorEvents(n.handler, nil)
n.emitEvent("node_reconnect")
if err := n.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": n.name, "id": n.id}).Errorf("Update node specs failed: %v", err)
}
}
n.healthy = true
}
}
}
func (n *Node) emitEvent(event string) {
// If there is no event handler registered, abort right now.
if n.eventHandler == nil {
return
}
ev := &cluster.Event{
Event: dockerclient.Event{
Status: event,
From: "swarm",
Time: time.Now().Unix(),
},
Node: n,
}
n.eventHandler.Handle(ev)
}
// Return the sum of memory reserved by containers.
func (n *Node) UsedMemory() int64 {
var r int64 = 0
n.RLock()
for _, c := range n.containers {
r += c.Info.Config.Memory
}
n.RUnlock()
return r
}
// Return the sum of CPUs reserved by containers.
func (n *Node) UsedCpus() int64 {
var r int64 = 0
n.RLock()
for _, c := range n.containers {
r += c.Info.Config.CpuShares
}
n.RUnlock()
return r
}
func (n *Node) TotalMemory() int64 {
return n.Memory + (n.Memory * n.overcommitRatio / 100)
}
func (n *Node) TotalCpus() int64 {
return n.Cpus + (n.Cpus * n.overcommitRatio / 100)
}
func (n *Node) Create(config *dockerclient.ContainerConfig, name string, pullImage bool) (*cluster.Container, error) {
var (
err error
id string
client = n.client
)
newConfig := *config
// nb of CPUs -> real CpuShares
newConfig.CpuShares = config.CpuShares * 100 / n.Cpus
if id, err = client.CreateContainer(&newConfig, name); err != nil {
// If the error is other than not found, abort immediately.
if err != dockerclient.ErrNotFound || !pullImage {
return nil, err
}
// Otherwise, try to pull the image...
if err = n.Pull(config.Image); err != nil {
return nil, err
}
// ...And try again.
if id, err = client.CreateContainer(&newConfig, name); err != nil {
return nil, err
}
}
// Register the container immediately while waiting for a state refresh.
// Force a state refresh to pick up the newly created container.
n.RefreshContainer(id, true)
n.RLock()
defer n.RUnlock()
return n.containers[id], nil
}
// Destroy and remove a container from the node.
func (n *Node) Destroy(container *cluster.Container, force bool) error {
if err := n.client.RemoveContainer(container.Id, force, true); err != nil {
return err
}
// Remove the container from the state. Eventually, the state refresh loop
// will rewrite this.
n.Lock()
defer n.Unlock()
delete(n.containers, container.Id)
return nil
}
func (n *Node) Pull(image string) error {
if err := n.client.PullImage(image, nil); err != nil {
return err
}
return nil
}
// Register an event handler.
func (n *Node) Events(h cluster.EventHandler) error {
if n.eventHandler != nil {
return errors.New("event handler already set")
}
n.eventHandler = h
return nil
}
// Containers returns all the containers in the node.
func (n *Node) Containers() []*cluster.Container {
containers := []*cluster.Container{}
n.RLock()
for _, container := range n.containers {
containers = append(containers, container)
}
n.RUnlock()
return containers
}
// Container returns the container with IdOrName in the node.
func (n *Node) Container(IdOrName string) *cluster.Container {
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
n.RLock()
defer n.RUnlock()
for _, container := range n.Containers() {
// Match ID prefix.
if strings.HasPrefix(container.Id, IdOrName) {
return container
}
// Match name, /name or engine/name.
for _, name := range container.Names {
if name == IdOrName || name == "/"+IdOrName || container.Node.ID()+name == IdOrName || container.Node.Name()+name == IdOrName {
return container
}
}
}
return nil
}
func (n *Node) Images() []*cluster.Image {
images := []*cluster.Image{}
n.RLock()
for _, image := range n.images {
images = append(images, image)
}
n.RUnlock()
return images
}
// Image returns the image with IdOrName in the node
func (n *Node) Image(IdOrName string) *cluster.Image {
n.RLock()
defer n.RUnlock()
size := len(IdOrName)
for _, image := range n.Images() {
if image.Id == IdOrName || (size > 2 && strings.HasPrefix(image.Id, IdOrName)) {
return image
}
for _, t := range image.RepoTags {
if t == IdOrName || (size > 2 && strings.HasPrefix(t, IdOrName)) {
return image
}
}
}
return nil
}
func (n *Node) String() string {
return fmt.Sprintf("node %s addr %s", n.id, n.addr)
}
func (n *Node) handler(ev *dockerclient.Event, _ chan error, args ...interface{}) {
// Something changed - refresh our internal state.
switch ev.Status {
case "pull", "untag", "delete":
// These events refer to images so there's no need to update
// containers.
n.refreshImages()
case "start", "die":
// If the container is started or stopped, we have to do an inspect in
// order to get the new NetworkSettings.
n.RefreshContainer(ev.Id, true)
default:
// Otherwise, do a "soft" refresh of the container.
n.RefreshContainer(ev.Id, false)
}
// If there is no event handler registered, abort right now.
if n.eventHandler == nil {
return
}
event := &cluster.Event{
Node: n,
Event: *ev,
}
n.eventHandler.Handle(event)
}
// Inject a container into the internal state.
func (n *Node) AddContainer(container *cluster.Container) error {
n.Lock()
defer n.Unlock()
if _, ok := n.containers[container.Id]; ok {
return errors.New("container already exists")
}
n.containers[container.Id] = container
return nil
}
// Inject an image into the internal state.
func (n *Node) AddImage(image *cluster.Image) {
n.Lock()
defer n.Unlock()
n.images = append(n.images, image)
}
// Remove a container from the internal test.
func (n *Node) RemoveContainer(container *cluster.Container) error {
n.Lock()
defer n.Unlock()
if _, ok := n.containers[container.Id]; !ok {
return errors.New("container not found")
}
delete(n.containers, container.Id)
return nil
}
// Wipes the internal container state.
func (n *Node) CleanupContainers() {
n.Lock()
n.containers = make(map[string]*cluster.Container)
n.Unlock()
}

View File

@ -1,4 +1,4 @@
package cluster
package swarm
import (
"errors"
@ -65,8 +65,8 @@ func TestNodeCpusMemory(t *testing.T) {
assert.True(t, node.IsConnected())
assert.True(t, node.IsHealthy())
assert.Equal(t, node.ReservedCpus(), 0)
assert.Equal(t, node.ReservedMemory(), 0)
assert.Equal(t, node.UsedCpus(), 0)
assert.Equal(t, node.UsedMemory(), 0)
client.Mock.AssertExpectations(t)
}
@ -87,11 +87,11 @@ func TestNodeSpecs(t *testing.T) {
assert.Equal(t, node.Cpus, mockInfo.NCPU)
assert.Equal(t, node.Memory, mockInfo.MemTotal)
assert.Equal(t, node.Labels["storagedriver"], mockInfo.Driver)
assert.Equal(t, node.Labels["executiondriver"], mockInfo.ExecutionDriver)
assert.Equal(t, node.Labels["kernelversion"], mockInfo.KernelVersion)
assert.Equal(t, node.Labels["operatingsystem"], mockInfo.OperatingSystem)
assert.Equal(t, node.Labels["foo"], "bar")
assert.Equal(t, node.Labels()["storagedriver"], mockInfo.Driver)
assert.Equal(t, node.Labels()["executiondriver"], mockInfo.ExecutionDriver)
assert.Equal(t, node.Labels()["kernelversion"], mockInfo.KernelVersion)
assert.Equal(t, node.Labels()["operatingsystem"], mockInfo.OperatingSystem)
assert.Equal(t, node.Labels()["foo"], "bar")
client.Mock.AssertExpectations(t)
}
@ -225,22 +225,22 @@ func TestCreateContainer(t *testing.T) {
assert.Len(t, node.Containers(), 2)
}
func TestUsableMemory(t *testing.T) {
func TestTotalMemory(t *testing.T) {
node := NewNode("test", 0.05)
node.Memory = 1024
assert.Equal(t, node.UsableMemory(), 1024+1024*5/100)
assert.Equal(t, node.TotalMemory(), 1024+1024*5/100)
node = NewNode("test", 0)
node.Memory = 1024
assert.Equal(t, node.UsableMemory(), 1024)
assert.Equal(t, node.TotalMemory(), 1024)
}
func TestUsableCpus(t *testing.T) {
func TestTotalCpus(t *testing.T) {
node := NewNode("test", 0.05)
node.Cpus = 2
assert.Equal(t, node.UsableCpus(), 2+2*5/100)
assert.Equal(t, node.TotalCpus(), 2+2*5/100)
node = NewNode("test", 0)
node.Cpus = 2
assert.Equal(t, node.UsableCpus(), 2)
assert.Equal(t, node.TotalCpus(), 2)
}

244
cluster/swarm/swarm.go Normal file
View File

@ -0,0 +1,244 @@
package swarm
import (
"fmt"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/units"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/state"
"github.com/samalba/dockerclient"
)
type SwarmCluster struct {
sync.RWMutex
eventHandler cluster.EventHandler
nodes map[string]*Node
scheduler *scheduler.Scheduler
options *cluster.Options
store *state.Store
}
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, eventhandler cluster.EventHandler, options *cluster.Options) cluster.Cluster {
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
cluster := &SwarmCluster{
eventHandler: eventhandler,
nodes: make(map[string]*Node),
scheduler: scheduler,
options: options,
store: store,
}
// get the list of entries from the discovery service
go func() {
d, err := discovery.New(options.Discovery, options.Heartbeat)
if err != nil {
log.Fatal(err)
}
entries, err := d.Fetch()
if err != nil {
log.Fatal(err)
}
cluster.newEntries(entries)
go d.Watch(cluster.newEntries)
}()
return cluster
}
// callback for the events
func (s *SwarmCluster) Handle(e *cluster.Event) error {
if err := s.eventHandler.Handle(e); err != nil {
log.Error(err)
}
return nil
}
// Schedule a brand new container into the cluster.
func (s *SwarmCluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
s.RLock()
defer s.RUnlock()
node, err := s.scheduler.SelectNodeForContainer(s.listNodes(), config)
if err != nil {
return nil, err
}
if n, ok := node.(*Node); ok {
container, err := n.Create(config, name, true)
if err != nil {
return nil, err
}
st := &state.RequestedState{
ID: container.Id,
Name: name,
Config: config,
}
return container, s.store.Add(container.Id, st)
}
return nil, nil
}
// Remove a container from the cluster. Containers should always be destroyed
// through the scheduler to guarantee atomicity.
func (s *SwarmCluster) RemoveContainer(container *cluster.Container, force bool) error {
s.Lock()
defer s.Unlock()
if n, ok := container.Node.(*Node); ok {
if err := n.Destroy(container, force); err != nil {
return err
}
}
if err := s.store.Remove(container.Id); err != nil {
if err == state.ErrNotFound {
log.Debugf("Container %s not found in the store", container.Id)
return nil
}
return err
}
return nil
}
// Entries are Docker Nodes
func (s *SwarmCluster) newEntries(entries []*discovery.Entry) {
for _, entry := range entries {
go func(m *discovery.Entry) {
if s.getNode(m.String()) == nil {
n := NewNode(m.String(), s.options.OvercommitRatio)
if err := n.Connect(s.options.TLSConfig); err != nil {
log.Error(err)
return
}
s.Lock()
if old, exists := s.nodes[n.id]; exists {
s.Unlock()
if old.ip != n.ip {
log.Errorf("ID duplicated. %s shared by %s and %s", n.id, old.IP(), n.IP())
} else {
log.Errorf("node %q is already registered", n.id)
}
return
}
s.nodes[n.id] = n
if err := n.Events(s); err != nil {
log.Error(err)
s.Unlock()
return
}
s.Unlock()
}
}(entry)
}
}
func (s *SwarmCluster) getNode(addr string) *Node {
for _, node := range s.nodes {
if node.addr == addr {
return node
}
}
return nil
}
// Containers returns all the images in the cluster.
func (s *SwarmCluster) Images() []*cluster.Image {
s.RLock()
defer s.RUnlock()
out := []*cluster.Image{}
for _, n := range s.nodes {
out = append(out, n.Images()...)
}
return out
}
// Image returns an image with IdOrName in the cluster
func (s *SwarmCluster) Image(IdOrName string) *cluster.Image {
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
s.RLock()
defer s.RUnlock()
for _, n := range s.nodes {
if image := n.Image(IdOrName); image != nil {
return image
}
}
return nil
}
// Containers returns all the containers in the cluster.
func (s *SwarmCluster) Containers() []*cluster.Container {
s.RLock()
defer s.RUnlock()
out := []*cluster.Container{}
for _, n := range s.nodes {
out = append(out, n.Containers()...)
}
return out
}
// Container returns the container with IdOrName in the cluster
func (s *SwarmCluster) Container(IdOrName string) *cluster.Container {
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
s.RLock()
defer s.RUnlock()
for _, n := range s.nodes {
if container := n.Container(IdOrName); container != nil {
return container
}
}
return nil
}
// nodes returns all the nodess in the cluster.
func (s *SwarmCluster) listNodes() []cluster.Node {
s.RLock()
defer s.RUnlock()
out := []cluster.Node{}
for _, n := range s.nodes {
out = append(out, n)
}
return out
}
func (s *SwarmCluster) Info() [][2]string {
info := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(s.nodes))}}
for _, node := range s.nodes {
info = append(info, [2]string{node.Name(), node.Addr()})
info = append(info, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))})
info = append(info, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.UsedCpus(), node.TotalCpus())})
info = append(info, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(node.UsedMemory())), units.BytesSize(float64(node.TotalMemory())))})
}
return info
}

View File

@ -0,0 +1,48 @@
package swarm
import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *Node {
node := NewNode(ID, 0)
node.name = ID
node.id = ID
for _, container := range containers {
node.AddContainer(&cluster.Container{Container: container, Node: node})
}
return node
}
func TestContainerLookup(t *testing.T) {
s := &SwarmCluster{
nodes: make(map[string]*Node),
}
container := dockerclient.Container{
Id: "container-id",
Names: []string{"/container-name1", "/container-name2"},
}
n := createNode(t, "test-node", container)
s.nodes[n.ID()] = n
// Invalid lookup
assert.Nil(t, s.Container("invalid-id"))
assert.Nil(t, s.Container(""))
// Container ID lookup.
assert.NotNil(t, s.Container("container-id"))
// Container ID prefix lookup.
assert.NotNil(t, s.Container("container-"))
// Container name lookup.
assert.NotNil(t, s.Container("container-name1"))
assert.NotNil(t, s.Container("container-name2"))
// Container node/name matching.
assert.NotNil(t, s.Container("test-node/container-name1"))
assert.NotNil(t, s.Container("test-node/container-name2"))
}

View File

@ -94,4 +94,9 @@ var (
Usage: "filter to use [constraint, affinity, health, port, dependency]",
Value: &flFilterValue,
}
flCluster = cli.StringFlag{
Name: "cluster, c",
Usage: "cluster to use [swarm, mesos]",
Value: "swarm",
}
)

View File

@ -102,7 +102,7 @@ func main() {
ShortName: "m",
Usage: "manage a docker cluster",
Flags: []cli.Flag{
flStore,
flStore, flCluster,
flStrategy, flFilter,
flHosts, flHeartBeat, flOverCommit,
flTls, flTlsCaCert, flTlsCert, flTlsKey, flTlsVerify,

View File

@ -11,7 +11,7 @@ import (
"github.com/codegangsta/cli"
"github.com/docker/swarm/api"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/cluster/swarm"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/strategy"
@ -97,9 +97,6 @@ func manage(c *cli.Context) {
log.Fatal(err)
}
cluster := cluster.NewCluster(store, tlsConfig, c.Float64("overcommit"))
cluster.Events(&logHandler{})
dflag := getDiscovery(c)
if dflag == "" {
log.Fatalf("discovery required to manage a cluster. See '%s manage --help'.", c.App.Name)
@ -120,33 +117,22 @@ func manage(c *cli.Context) {
log.Fatal(err)
}
// get the list of nodes from the discovery service
go func() {
d, err := discovery.New(dflag, c.Int("heartbeat"))
if err != nil {
log.Fatal(err)
}
sched := scheduler.New(s, fs)
nodes, err := d.Fetch()
if err != nil {
log.Fatal(err)
eventsHandler := api.NewEventsHandler()
options := &cluster.Options{
TLSConfig: tlsConfig,
OvercommitRatio: c.Float64("overcommit"),
Discovery: dflag,
Heartbeat: c.Int("heartbeat"),
}
}
cluster.UpdateNodes(nodes)
go d.Watch(cluster.UpdateNodes)
}()
sched := scheduler.NewScheduler(
cluster,
s,
fs,
)
cluster := swarm.NewCluster(sched, store, eventsHandler, options)
// see https://github.com/codegangsta/cli/issues/160
hosts := c.StringSlice("host")
if c.IsSet("host") || c.IsSet("H") {
hosts = hosts[1:]
}
log.Fatal(api.ListenAndServe(cluster, sched, hosts, c.Bool("cors"), tlsConfig))
log.Fatal(api.ListenAndServe(cluster, hosts, c.Bool("cors"), tlsConfig, eventsHandler))
}

View File

@ -13,7 +13,7 @@ import (
type AffinityFilter struct {
}
func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []*cluster.Node) ([]*cluster.Node, error) {
func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
affinities, err := parseExprs("affinity", config.Env)
if err != nil {
return nil, err
@ -22,7 +22,7 @@ func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []*c
for _, affinity := range affinities {
log.Debugf("matching affinity: %s%s%s", affinity.key, OPERATORS[affinity.operator], affinity.value)
candidates := []*cluster.Node{}
candidates := []cluster.Node{}
for _, node := range nodes {
switch affinity.key {
case "container":

View File

@ -11,56 +11,55 @@ import (
func TestAffinityFilter(t *testing.T) {
var (
f = AffinityFilter{}
nodes = []*cluster.Node{
cluster.NewNode("node-0", 0),
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
containers: []*cluster.Container{
{Container: dockerclient.Container{
Id: "container-n0-0-id",
Names: []string{"/container-n0-0-name"},
}},
{Container: dockerclient.Container{
Id: "container-n0-1-id",
Names: []string{"/container-n0-1-name"},
}},
},
images: []*cluster.Image{{Image: dockerclient.Image{
Id: "image-0-id",
RepoTags: []string{"image-0:tag1", "image-0:tag2"},
}}},
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
containers: []*cluster.Container{
{Container: dockerclient.Container{
Id: "container-n1-0-id",
Names: []string{"/container-n1-0-name"},
}},
{Container: dockerclient.Container{
Id: "container-n1-1-id",
Names: []string{"/container-n1-1-name"},
}},
},
images: []*cluster.Image{{Image: dockerclient.Image{
Id: "image-1-id",
RepoTags: []string{"image-1:tag1", "image-0:tag3", "image-1:tag2"},
}}},
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
},
}
result []*cluster.Node
result []cluster.Node
err error
)
nodes[0].ID = "node-0-id"
nodes[0].Name = "node-0-name"
nodes[0].AddContainer(&cluster.Container{
Container: dockerclient.Container{
Id: "container-n0-0-id",
Names: []string{"/container-n0-0-name"},
},
})
nodes[0].AddContainer(&cluster.Container{
Container: dockerclient.Container{
Id: "container-n0-1-id",
Names: []string{"/container-n0-1-name"},
},
})
nodes[0].AddImage(&dockerclient.Image{
Id: "image-0-id",
RepoTags: []string{"image-0:tag1", "image-0:tag2"},
})
nodes[1].ID = "node-1-id"
nodes[1].Name = "node-1-name"
nodes[1].AddContainer(&cluster.Container{
Container: dockerclient.Container{
Id: "container-n1-0-id",
Names: []string{"/container-n1-0-name"},
},
})
nodes[1].AddContainer(&cluster.Container{
Container: dockerclient.Container{
Id: "container-n1-1-id",
Names: []string{"/container-n1-1-name"},
},
})
nodes[1].AddImage(&dockerclient.Image{
Id: "image-1-id",
RepoTags: []string{"image-1:tag1", "image-0:tag3", "image-1:tag2"},
})
nodes[2].ID = "node-2-id"
nodes[2].Name = "node-2-name"
// Without constraints we should get the unfiltered list of nodes back.
result, err = f.Filter(&dockerclient.ContainerConfig{}, nodes)
assert.NoError(t, err)

View File

@ -12,7 +12,7 @@ import (
type ConstraintFilter struct {
}
func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []*cluster.Node) ([]*cluster.Node, error) {
func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
constraints, err := parseExprs("constraint", config.Env)
if err != nil {
return nil, err
@ -21,16 +21,16 @@ func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []
for _, constraint := range constraints {
log.Debugf("matching constraint: %s %s %s", constraint.key, OPERATORS[constraint.operator], constraint.value)
candidates := []*cluster.Node{}
candidates := []cluster.Node{}
for _, node := range nodes {
switch constraint.key {
case "node":
// "node" label is a special case pinning a container to a specific node.
if constraint.Match(node.ID, node.Name) {
if constraint.Match(node.ID(), node.Name()) {
candidates = append(candidates, node)
}
default:
if constraint.Match(node.Labels[constraint.key]) {
if constraint.Match(node.Labels()[constraint.key]) {
candidates = append(candidates, node)
}
}

View File

@ -8,46 +8,54 @@ import (
"github.com/stretchr/testify/assert"
)
func testFixtures() (nodes []*cluster.Node) {
nodes = []*cluster.Node{
cluster.NewNode("node-0", 0),
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
cluster.NewNode("node-3", 0),
}
nodes[0].ID = "node-0-id"
nodes[0].Name = "node-0-name"
nodes[0].Labels = map[string]string{
"name": "node0",
"group": "1",
"region": "us-west",
}
func testFixtures() []cluster.Node {
return []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
labels: map[string]string{
"name": "node0",
"group": "1",
"region": "us-west",
},
},
nodes[1].ID = "node-1-id"
nodes[1].Name = "node-1-name"
nodes[1].Labels = map[string]string{
"name": "node1",
"group": "1",
"region": "us-east",
}
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
labels: map[string]string{
"name": "node1",
"group": "1",
"region": "us-east",
},
},
nodes[2].ID = "node-2-id"
nodes[2].Name = "node-2-name"
nodes[2].Labels = map[string]string{
"name": "node2",
"group": "2",
"region": "eu",
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
labels: map[string]string{
"name": "node2",
"group": "2",
"region": "eu",
},
},
&FakeNode{
id: "node-3-id",
name: "node-3-name",
addr: "node-3",
},
}
nodes[3].ID = "node-3-id"
nodes[3].Name = "node-3-name"
return
}
func TestConstrainteFilter(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []*cluster.Node
result []cluster.Node
err error
)
@ -57,67 +65,49 @@ func TestConstrainteFilter(t *testing.T) {
assert.Equal(t, result, nodes)
// Set a constraint that cannot be fullfilled and expect an error back.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:does_not_exist==true"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:does_not_exist==true"}}, nodes)
assert.Error(t, err)
// Set a contraint that can only be filled by a single node.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:name==node1"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:name==node1"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[1])
// This constraint can only be fullfilled by a subset of nodes.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:group==1"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:group==1"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 2)
assert.NotContains(t, result, nodes[2])
// Validate node pinning by id.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:node==node-2-id"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:node==node-2-id"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[2])
// Validate node pinning by name.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:node==node-1-name"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:node==node-1-name"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[1])
// Make sure constraints are evaluated as logical ANDs.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:name==node0", "constraint:group==1"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:name==node0", "constraint:group==1"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[0])
// Check matching
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:region==us"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:region==us"}}, nodes)
assert.Error(t, err)
assert.Len(t, result, 0)
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:region==us*"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:region==us*"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 2)
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:region==*us*"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:region==*us*"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 2)
}
@ -126,35 +116,27 @@ func TestConstraintNotExpr(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []*cluster.Node
result []cluster.Node
err error
)
// Check not (!) expression
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:name!=node0"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:name!=node0"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 3)
// Check not does_not_exist. All should be found
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:name!=does_not_exist"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:name!=does_not_exist"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 4)
// Check name must not start with n
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:name!=n*"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:name!=n*"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
// Check not with globber pattern
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:region!=us*"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:region!=us*"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 2)
}
@ -163,35 +145,27 @@ func TestConstraintRegExp(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []*cluster.Node
result []cluster.Node
err error
)
// Check with regular expression /node\d/ matches node{0..2}
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:name==/node\d/`},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{`constraint:name==/node\d/`}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 3)
// Check with regular expression /node\d/ matches node{0..2}
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:name==/node[12]/`},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{`constraint:name==/node[12]/`}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 2)
// Check with regular expression ! and regexp /node[12]/ matches node[0] and node[3]
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:name!=/node[12]/`},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{`constraint:name!=/node[12]/`}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 2)
// Validate node pinning by ! and regexp.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:node!=/node-[01]-id/"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:node!=/node-[01]-id/"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 2)
}
@ -200,123 +174,57 @@ func TestFilterRegExpCaseInsensitive(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []*cluster.Node
result []cluster.Node
err error
)
// Prepare node with a strange name
node3 := cluster.NewNode("node-3", 0)
node3.ID = "node-3-id"
node3.Name = "node-3-name"
node3.Labels = map[string]string{
"name": "aBcDeF",
"group": "2",
"region": "eu",
if n, ok := nodes[3].(*FakeNode); ok {
n.labels = map[string]string{
"name": "aBcDeF",
"group": "2",
"region": "eu",
}
}
nodes[3] = node3
// Case-sensitive, so not match
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:name==/abcdef/`},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{`constraint:name==/abcdef/`}}, nodes)
assert.Error(t, err)
assert.Len(t, result, 0)
// Match with case-insensitive
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:name==/(?i)abcdef/`},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{`constraint:name==/(?i)abcdef/`}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[3])
assert.Equal(t, result[0].Labels["name"], "aBcDeF")
assert.Equal(t, result[0].Labels()["name"], "aBcDeF")
// Test ! filter combined with case insensitive
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:name!=/(?i)abc*/`},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{`constraint:name!=/(?i)abc*/`}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 3)
}
func TestFilterWithRelativeComparisons(t *testing.T) {
t.Skip()
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []*cluster.Node
err error
)
// Prepare node with a strange name
node3 := cluster.NewNode("node-3", 0)
node3.ID = "node-3-id"
node3.Name = "node-3-name"
node3.Labels = map[string]string{
"name": "aBcDeF",
"group": "4",
"kernel": "3.1",
"region": "eu",
}
nodes = append(nodes, node3)
// Check with less than or equal
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:group<=3`},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 3)
// Check with greater than or equal
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:group>=4`},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
// Another gte check with a complex string
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:kernel>=3.0`},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[3])
assert.Equal(t, result[0].Labels["kernel"], "3.1")
// Check with greater than or equal. This should match node-3-id.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{`constraint:node>=node-3`},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
}
func TestFilterEquals(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []*cluster.Node
result []cluster.Node
err error
)
// Check == comparison
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:name==node0"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:name==node0"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
// Test == with glob
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:region==us*"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:region==us*"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 2)
// Validate node name with ==
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:node==node-1-name"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:node==node-1-name"}}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[1])
@ -326,19 +234,15 @@ func TestUnsupportedOperators(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = testFixtures()
result []*cluster.Node
result []cluster.Node
err error
)
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:name=node0"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:name=node0"}}, nodes)
assert.Error(t, err)
assert.Len(t, result, 0)
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"constraint:name=!node0"},
}, nodes)
result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{"constraint:name=!node0"}}, nodes)
assert.Error(t, err)
assert.Len(t, result, 0)
}

View File

@ -12,7 +12,7 @@ import (
type DependencyFilter struct {
}
func (f *DependencyFilter) Filter(config *dockerclient.ContainerConfig, nodes []*cluster.Node) ([]*cluster.Node, error) {
func (f *DependencyFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
if len(nodes) == 0 {
return nodes, nil
}
@ -29,7 +29,7 @@ func (f *DependencyFilter) Filter(config *dockerclient.ContainerConfig, nodes []
net = append(net, strings.TrimPrefix(config.HostConfig.NetworkMode, "container:"))
}
candidates := []*cluster.Node{}
candidates := []cluster.Node{}
for _, node := range nodes {
if f.check(config.HostConfig.VolumesFrom, node) &&
f.check(links, node) &&
@ -61,7 +61,7 @@ func (f *DependencyFilter) String(config *dockerclient.ContainerConfig) string {
}
// Ensure that the node contains all dependent containers.
func (f *DependencyFilter) check(dependencies []string, node *cluster.Node) bool {
func (f *DependencyFilter) check(dependencies []string, node cluster.Node) bool {
for _, dependency := range dependencies {
if node.Container(dependency) == nil {
return false

View File

@ -11,26 +11,33 @@ import (
func TestDependencyFilterSimple(t *testing.T) {
var (
f = DependencyFilter{}
nodes = []*cluster.Node{
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
cluster.NewNode("node-3", 0),
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c0"}}},
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c1"}}},
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
},
}
result []*cluster.Node
err error
container *cluster.Container
config *dockerclient.ContainerConfig
result []cluster.Node
err error
config *dockerclient.ContainerConfig
)
container = &cluster.Container{Container: dockerclient.Container{Id: "c0"}}
assert.NoError(t, nodes[0].AddContainer(container))
container = &cluster.Container{Container: dockerclient.Container{Id: "c1"}}
assert.NoError(t, nodes[1].AddContainer(container))
container = &cluster.Container{Container: dockerclient.Container{Id: "c2"}}
assert.NoError(t, nodes[2].AddContainer(container))
// No dependencies - make sure we don't filter anything out.
config = &dockerclient.ContainerConfig{}
result, err = f.Filter(config, nodes)
@ -38,44 +45,36 @@ func TestDependencyFilterSimple(t *testing.T) {
assert.Equal(t, result, nodes)
// volumes-from.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c0"},
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c0"},
}}
result, err = f.Filter(config, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[0])
// link.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
Links: []string{"c1:foobar"},
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
Links: []string{"c1:foobar"},
}}
result, err = f.Filter(config, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[1])
// net.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
NetworkMode: "container:c2",
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
NetworkMode: "container:c2",
}}
result, err = f.Filter(config, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[2])
// net not prefixed by "container:" should be ignored.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
NetworkMode: "bridge",
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
NetworkMode: "bridge",
}}
result, err = f.Filter(config, nodes)
assert.NoError(t, err)
assert.Equal(t, result, nodes)
@ -84,68 +83,69 @@ func TestDependencyFilterSimple(t *testing.T) {
func TestDependencyFilterMulti(t *testing.T) {
var (
f = DependencyFilter{}
nodes = []*cluster.Node{
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
cluster.NewNode("node-3", 0),
nodes = []cluster.Node{
// nodes[0] has c0 and c1
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
containers: []*cluster.Container{
{Container: dockerclient.Container{Id: "c0"}},
{Container: dockerclient.Container{Id: "c1"}},
},
},
// nodes[1] has c2
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
},
// nodes[2] has nothing
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
},
}
result []*cluster.Node
err error
container *cluster.Container
config *dockerclient.ContainerConfig
result []cluster.Node
err error
config *dockerclient.ContainerConfig
)
// nodes[0] has c0 and c1
container = &cluster.Container{Container: dockerclient.Container{Id: "c0"}}
assert.NoError(t, nodes[0].AddContainer(container))
container = &cluster.Container{Container: dockerclient.Container{Id: "c1"}}
assert.NoError(t, nodes[0].AddContainer(container))
// nodes[1] has c2
container = &cluster.Container{Container: dockerclient.Container{Id: "c2"}}
assert.NoError(t, nodes[1].AddContainer(container))
// nodes[2] has nothing
// Depend on c0 which is on nodes[0]
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c0"},
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c0"},
}}
result, err = f.Filter(config, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[0])
// Depend on c1 which is on nodes[0]
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c1"},
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c1"},
}}
result, err = f.Filter(config, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[0])
// Depend on c0 AND c1 which are both on nodes[0]
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c0", "c1"},
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c0", "c1"},
}}
result, err = f.Filter(config, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[0])
// Depend on c0 AND c2 which are on different nodes.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c0", "c2"},
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
VolumesFrom: []string{"c0", "c2"},
}}
result, err = f.Filter(config, nodes)
assert.Error(t, err)
}
@ -153,29 +153,38 @@ func TestDependencyFilterMulti(t *testing.T) {
func TestDependencyFilterChaining(t *testing.T) {
var (
f = DependencyFilter{}
nodes = []*cluster.Node{
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
cluster.NewNode("node-3", 0),
nodes = []cluster.Node{
// nodes[0] has c0 and c1
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
containers: []*cluster.Container{
{Container: dockerclient.Container{Id: "c0"}},
{Container: dockerclient.Container{Id: "c1"}},
},
},
// nodes[1] has c2
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
},
// nodes[2] has nothing
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
},
}
result []*cluster.Node
err error
container *cluster.Container
config *dockerclient.ContainerConfig
result []cluster.Node
err error
config *dockerclient.ContainerConfig
)
// nodes[0] has c0 and c1
container = &cluster.Container{Container: dockerclient.Container{Id: "c0"}}
assert.NoError(t, nodes[0].AddContainer(container))
container = &cluster.Container{Container: dockerclient.Container{Id: "c1"}}
assert.NoError(t, nodes[0].AddContainer(container))
// nodes[1] has c2
container = &cluster.Container{Container: dockerclient.Container{Id: "c2"}}
assert.NoError(t, nodes[1].AddContainer(container))
// nodes[2] has nothing
// Different dependencies on c0 and c1
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{

View File

@ -0,0 +1,46 @@
package filter
import "github.com/docker/swarm/cluster"
type FakeNode struct {
id string
name string
addr string
containers []*cluster.Container
images []*cluster.Image
labels map[string]string
}
func (fn *FakeNode) ID() string { return fn.id }
func (fn *FakeNode) Name() string { return fn.name }
func (fn *FakeNode) IP() string { return "" }
func (fn *FakeNode) Addr() string { return fn.addr }
func (fn *FakeNode) Images() []*cluster.Image { return fn.images }
func (fn *FakeNode) Image(id string) *cluster.Image {
for _, image := range fn.images {
if image.Id == id {
return image
}
}
return nil
}
func (fn *FakeNode) Containers() []*cluster.Container { return fn.containers }
func (fn *FakeNode) Container(id string) *cluster.Container {
for _, container := range fn.containers {
if container.Id == id {
return container
}
}
return nil
}
func (fn *FakeNode) TotalCpus() int64 { return 0 }
func (fn *FakeNode) UsedCpus() int64 { return 0 }
func (fn *FakeNode) TotalMemory() int64 { return 0 }
func (fn *FakeNode) UsedMemory() int64 { return 0 }
func (fn *FakeNode) Labels() map[string]string { return fn.labels }
func (fn *FakeNode) IsHealthy() bool { return true }
func (fn *FakeNode) AddContainer(container *cluster.Container) error {
fn.containers = append(fn.containers, container)
return nil
}

View File

@ -10,7 +10,7 @@ import (
type Filter interface {
// Return a subset of nodes that were accepted by the filtering policy.
Filter(*dockerclient.ContainerConfig, []*cluster.Node) ([]*cluster.Node, error)
Filter(*dockerclient.ContainerConfig, []cluster.Node) ([]cluster.Node, error)
}
var (
@ -43,7 +43,7 @@ func New(names []string) ([]Filter, error) {
}
// Apply a set of filters in batch.
func ApplyFilters(filters []Filter, config *dockerclient.ContainerConfig, nodes []*cluster.Node) ([]*cluster.Node, error) {
func ApplyFilters(filters []Filter, config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
var err error
for _, filter := range filters {

View File

@ -15,8 +15,8 @@ var (
type HealthFilter struct {
}
func (f *HealthFilter) Filter(_ *dockerclient.ContainerConfig, nodes []*cluster.Node) ([]*cluster.Node, error) {
result := []*cluster.Node{}
func (f *HealthFilter) Filter(_ *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
result := []cluster.Node{}
for _, node := range nodes {
if node.IsHealthy() {
result = append(result, node)

View File

@ -13,10 +13,10 @@ import (
type PortFilter struct {
}
func (p *PortFilter) Filter(config *dockerclient.ContainerConfig, nodes []*cluster.Node) ([]*cluster.Node, error) {
func (p *PortFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) {
for _, port := range config.HostConfig.PortBindings {
for _, binding := range port {
candidates := []*cluster.Node{}
candidates := []cluster.Node{}
for _, node := range nodes {
if !p.portAlreadyInUse(node, binding) {
candidates = append(candidates, node)
@ -31,7 +31,7 @@ func (p *PortFilter) Filter(config *dockerclient.ContainerConfig, nodes []*clust
return nodes, nil
}
func (p *PortFilter) portAlreadyInUse(node *cluster.Node, requested dockerclient.PortBinding) bool {
func (p *PortFilter) portAlreadyInUse(node cluster.Node, requested dockerclient.PortBinding) bool {
for _, c := range node.Containers() {
// HostConfig.PortBindings contains the requested ports.
// NetworkSettings.Ports contains the actual ports.

View File

@ -23,32 +23,40 @@ func makeBinding(ip, port string) map[string][]dockerclient.PortBinding {
func TestPortFilterNoConflicts(t *testing.T) {
var (
p = PortFilter{}
nodes = []*cluster.Node{
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
cluster.NewNode("node-3", 0),
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
},
}
result []*cluster.Node
result []cluster.Node
err error
)
// Request no ports.
config := &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: map[string][]dockerclient.PortBinding{},
},
}
config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: map[string][]dockerclient.PortBinding{},
}}
// Make sure we don't filter anything out.
result, err = p.Filter(config, nodes)
assert.NoError(t, err)
assert.Equal(t, result, nodes)
// Request port 80.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "80"),
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "80"),
}}
// Since there are no other containers in the cluster, this shouldn't
// filter anything either.
@ -58,7 +66,9 @@ func TestPortFilterNoConflicts(t *testing.T) {
// Add a container taking a different (4242) port.
container := &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("", "4242")}}}
assert.NoError(t, nodes[0].AddContainer(container))
if n, ok := nodes[0].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
// Since no node is using port 80, there should be no filter
result, err = p.Filter(config, nodes)
@ -69,25 +79,37 @@ func TestPortFilterNoConflicts(t *testing.T) {
func TestPortFilterSimple(t *testing.T) {
var (
p = PortFilter{}
nodes = []*cluster.Node{
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
cluster.NewNode("node-3", 0),
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
},
}
result []*cluster.Node
result []cluster.Node
err error
)
// Add a container taking away port 80 to nodes[0].
container := &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("", "80")}}}
assert.NoError(t, nodes[0].AddContainer(container))
if n, ok := nodes[0].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
// Request port 80.
config := &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "80"),
},
}
config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "80"),
}}
// nodes[0] should be excluded since port 80 is taken away.
result, err = p.Filter(config, nodes)
@ -98,25 +120,37 @@ func TestPortFilterSimple(t *testing.T) {
func TestPortFilterDifferentInterfaces(t *testing.T) {
var (
p = PortFilter{}
nodes = []*cluster.Node{
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
cluster.NewNode("node-3", 0),
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
},
}
result []*cluster.Node
result []cluster.Node
err error
)
// Add a container taking away port 80 on every interface to nodes[0].
container := &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("", "80")}}}
assert.NoError(t, nodes[0].AddContainer(container))
if n, ok := nodes[0].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
// Request port 80 for the local interface.
config := &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("127.0.0.1", "80"),
},
}
config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("127.0.0.1", "80"),
}}
// nodes[0] should be excluded since port 80 is taken away for every
// interface.
@ -127,14 +161,13 @@ func TestPortFilterDifferentInterfaces(t *testing.T) {
// Add a container taking away port 4242 on the local interface of
// nodes[1].
container = &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("127.0.0.1", "4242")}}}
assert.NoError(t, nodes[1].AddContainer(container))
// Request port 4242 on the same interface.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("127.0.0.1", "4242"),
},
if n, ok := nodes[1].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
// Request port 4242 on the same interface.
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("127.0.0.1", "4242"),
}}
// nodes[1] should be excluded since port 4242 is already taken on that
// interface.
result, err = p.Filter(config, nodes)
@ -142,33 +175,27 @@ func TestPortFilterDifferentInterfaces(t *testing.T) {
assert.NotContains(t, result, nodes[1])
// Request port 4242 on every interface.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("0.0.0.0", "4242"),
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("0.0.0.0", "4242"),
}}
// nodes[1] should still be excluded since the port is not available on the same interface.
result, err = p.Filter(config, nodes)
assert.NoError(t, err)
assert.NotContains(t, result, nodes[1])
// Request port 4242 on every interface using an alternative syntax.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "4242"),
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "4242"),
}}
// nodes[1] should still be excluded since the port is not available on the same interface.
result, err = p.Filter(config, nodes)
assert.NoError(t, err)
assert.NotContains(t, result, nodes[1])
// Finally, request port 4242 on a different interface.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("192.168.1.1", "4242"),
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("192.168.1.1", "4242"),
}}
// nodes[1] should be included this time since the port is available on the
// other interface.
result, err = p.Filter(config, nodes)
@ -179,12 +206,24 @@ func TestPortFilterDifferentInterfaces(t *testing.T) {
func TestPortFilterRandomAssignment(t *testing.T) {
var (
p = PortFilter{}
nodes = []*cluster.Node{
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
cluster.NewNode("node-3", 0),
nodes = []cluster.Node{
&FakeNode{
id: "node-0-id",
name: "node-0-name",
addr: "node-0",
},
&FakeNode{
id: "node-1-id",
name: "node-1-name",
addr: "node-1",
},
&FakeNode{
id: "node-2-id",
name: "node-2-name",
addr: "node-2",
},
}
result []*cluster.Node
result []cluster.Node
err error
)
@ -194,17 +233,15 @@ func TestPortFilterRandomAssignment(t *testing.T) {
// mapped port.
container := &cluster.Container{
Container: dockerclient.Container{Id: "c1"},
Info: dockerclient.ContainerInfo{
HostConfig: &dockerclient.HostConfig{
PortBindings: map[string][]dockerclient.PortBinding{
"80/tcp": {
{
HostIp: "",
HostPort: "",
},
Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{
PortBindings: map[string][]dockerclient.PortBinding{
"80/tcp": {
{
HostIp: "",
HostPort: "",
},
},
},
}},
NetworkSettings: struct {
IpAddress string
IpPrefixLen int
@ -224,14 +261,14 @@ func TestPortFilterRandomAssignment(t *testing.T) {
},
}
assert.NoError(t, nodes[0].AddContainer(container))
if n, ok := nodes[0].(*FakeNode); ok {
assert.NoError(t, n.AddContainer(container))
}
// Request port 80.
config := &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "80"),
},
}
config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "80"),
}}
// Since port "80" has been mapped to "1234", we should be able to request "80".
result, err = p.Filter(config, nodes)
@ -239,11 +276,9 @@ func TestPortFilterRandomAssignment(t *testing.T) {
assert.Equal(t, result, nodes)
// However, we should not be able to request "1234" since it has been used for a random assignment.
config = &dockerclient.ContainerConfig{
HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "1234"),
},
}
config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
PortBindings: makeBinding("", "1234"),
}}
result, err = p.Filter(config, nodes)
assert.NoError(t, err)
assert.NotContains(t, result, nodes[0])

View File

@ -1,8 +1,6 @@
package scheduler
import (
"sync"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/strategy"
@ -10,56 +8,23 @@ import (
)
type Scheduler struct {
sync.Mutex
cluster *cluster.Cluster
strategy strategy.PlacementStrategy
filters []filter.Filter
}
func NewScheduler(cluster *cluster.Cluster, strategy strategy.PlacementStrategy, filters []filter.Filter) *Scheduler {
func New(strategy strategy.PlacementStrategy, filters []filter.Filter) *Scheduler {
return &Scheduler{
cluster: cluster,
strategy: strategy,
filters: filters,
}
}
// Find a nice home for our container.
func (s *Scheduler) selectNodeForContainer(config *dockerclient.ContainerConfig) (*cluster.Node, error) {
candidates := s.cluster.Nodes()
accepted, err := filter.ApplyFilters(s.filters, config, candidates)
func (s *Scheduler) SelectNodeForContainer(nodes []cluster.Node, config *dockerclient.ContainerConfig) (cluster.Node, error) {
accepted, err := filter.ApplyFilters(s.filters, config, nodes)
if err != nil {
return nil, err
}
return s.strategy.PlaceContainer(config, accepted)
}
// Schedule a brand new container into the cluster.
func (s *Scheduler) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
/*Disable for now
if config.Memory == 0 || config.CpuShares == 0 {
return nil, fmt.Errorf("Creating containers in clustering mode requires resource constraints (-c and -m) to be set")
}
*/
s.Lock()
defer s.Unlock()
node, err := s.selectNodeForContainer(config)
if err != nil {
return nil, err
}
return s.cluster.DeployContainer(node, config, name)
}
// Remove a container from the cluster. Containers should always be destroyed
// through the scheduler to guarantee atomicity.
func (s *Scheduler) RemoveContainer(container *cluster.Container, force bool) error {
s.Lock()
defer s.Unlock()
return s.cluster.DestroyContainer(container, force)
}

View File

@ -18,12 +18,12 @@ func (p *BinPackingPlacementStrategy) Initialize() error {
return nil
}
func (p *BinPackingPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []*cluster.Node) (*cluster.Node, error) {
func (p *BinPackingPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error) {
scores := scores{}
for _, node := range nodes {
nodeMemory := node.UsableMemory()
nodeCpus := node.UsableCpus()
nodeMemory := node.TotalMemory()
nodeCpus := node.TotalCpus()
// Skip nodes that are smaller than the requested resources.
if nodeMemory < int64(config.Memory) || nodeCpus < config.CpuShares {
@ -36,10 +36,10 @@ func (p *BinPackingPlacementStrategy) PlaceContainer(config *dockerclient.Contai
)
if config.CpuShares > 0 {
cpuScore = (node.ReservedCpus() + config.CpuShares) * 100 / nodeCpus
cpuScore = (node.UsedCpus() + config.CpuShares) * 100 / nodeCpus
}
if config.Memory > 0 {
memoryScore = (node.ReservedMemory() + config.Memory) * 100 / nodeMemory
memoryScore = (node.UsedMemory() + config.Memory) * 100 / nodeMemory
}
if cpuScore <= 100 && memoryScore <= 100 {
@ -57,7 +57,7 @@ func (p *BinPackingPlacementStrategy) PlaceContainer(config *dockerclient.Contai
}
type score struct {
node *cluster.Node
node cluster.Node
score int64
}

View File

@ -9,12 +9,15 @@ import (
"github.com/stretchr/testify/assert"
)
func createNode(ID string, memory int64, cpus int64) *cluster.Node {
node := cluster.NewNode(ID, 0.05)
node.ID = ID
node.Memory = memory * 1024 * 1024 * 1024
node.Cpus = cpus
return node
func createNode(ID string, memory int64, cpus int64) cluster.Node {
oc := 0.05
memory = int64(float64(memory) + float64(memory)*oc)
return &FakeNode{
id: ID,
addr: ID,
memory: memory * 1024 * 1024 * 1024,
cpus: cpus,
}
}
func createConfig(memory int64, cpus int64) *dockerclient.ContainerConfig {
@ -28,7 +31,7 @@ func createContainer(ID string, config *dockerclient.ContainerConfig) *cluster.C
func TestPlaceContainerMemory(t *testing.T) {
s := &BinPackingPlacementStrategy{}
nodes := []*cluster.Node{}
nodes := []cluster.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 2, 1))
}
@ -37,27 +40,25 @@ func TestPlaceContainerMemory(t *testing.T) {
config := createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.ReservedMemory(), 1024*1024*1024)
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
assert.Equal(t, node1.UsedMemory(), 1024*1024*1024)
// add another container 1G
config = createConfig(1, 1)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.ReservedMemory(), int64(2*1024*1024*1024))
assert.NoError(t, AddContainer(node2, createContainer("c2", config)))
assert.Equal(t, node2.UsedMemory(), int64(2*1024*1024*1024))
// check that both containers ended on the same node
assert.Equal(t, node1.ID, node2.ID, "")
assert.Equal(t, node1.ID(), node2.ID(), "")
assert.Equal(t, len(node1.Containers()), len(node2.Containers()), "")
}
func TestPlaceContainerCPU(t *testing.T) {
s := &BinPackingPlacementStrategy{}
nodes := []*cluster.Node{}
nodes := []cluster.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 1, 2))
}
@ -66,16 +67,15 @@ func TestPlaceContainerCPU(t *testing.T) {
config := createConfig(0, 1)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.ReservedCpus(), 1)
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
assert.Equal(t, node1.UsedCpus(), 1)
// add another container 1CPU
config = createConfig(0, 1)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.ReservedCpus(), 2)
assert.NoError(t, AddContainer(node2, createContainer("c2", config)))
assert.Equal(t, node2.UsedCpus(), 2)
// check that both containers ended on the same node
assert.Equal(t, node1.ID, node2.ID, "")
@ -85,7 +85,7 @@ func TestPlaceContainerCPU(t *testing.T) {
func TestPlaceContainerHuge(t *testing.T) {
s := &BinPackingPlacementStrategy{}
nodes := []*cluster.Node{}
nodes := []cluster.Node{}
for i := 0; i < 100; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 1, 1))
}
@ -94,7 +94,7 @@ func TestPlaceContainerHuge(t *testing.T) {
for i := 0; i < 100; i++ {
node, err := s.PlaceContainer(createConfig(0, 1), nodes)
assert.NoError(t, err)
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(0, 100))))
assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1))))
}
// try to add another container 1CPU
@ -105,7 +105,7 @@ func TestPlaceContainerHuge(t *testing.T) {
for i := 100; i < 200; i++ {
node, err := s.PlaceContainer(createConfig(1, 0), nodes)
assert.NoError(t, err)
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
}
// try to add another container 1G
@ -117,31 +117,30 @@ func TestPlaceContainerOvercommit(t *testing.T) {
s, err := New("binpacking")
assert.NoError(t, err)
nodes := []*cluster.Node{createNode("node-1", 0, 1)}
nodes[0].Memory = 100
nodes := []cluster.Node{createNode("node-1", 100, 1)}
config := createConfig(0, 0)
// Below limit should still work.
config.Memory = 90
config.Memory = 90 * 1024 * 1024 * 1024
node, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.Equal(t, node, nodes[0])
// At memory limit should still work.
config.Memory = 100
config.Memory = 100 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.Equal(t, node, nodes[0])
// Up to 105% it should still work.
config.Memory = 105
config.Memory = 105 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.Equal(t, node, nodes[0])
// Above it should return an error.
config.Memory = 106
config.Memory = 106 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
assert.Error(t, err)
}
@ -150,7 +149,7 @@ func TestPlaceContainerOvercommit(t *testing.T) {
func TestPlaceContainerDemo(t *testing.T) {
s := &BinPackingPlacementStrategy{}
nodes := []*cluster.Node{}
nodes := []cluster.Node{}
for i := 0; i < 3; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 2, 4))
}
@ -166,13 +165,12 @@ func TestPlaceContainerDemo(t *testing.T) {
config = createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
// add another container 1G
config = createConfig(1, 0)
node1bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node1bis.AddContainer(createContainer("c2", config)))
assert.NoError(t, AddContainer(node1bis, createContainer("c2", config)))
// check that both containers ended on the same node
assert.Equal(t, node1.ID, node1bis.ID, "")
@ -182,29 +180,29 @@ func TestPlaceContainerDemo(t *testing.T) {
config = createConfig(2, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node2.AddContainer(createContainer("c3", config)))
assert.NoError(t, AddContainer(node2, createContainer("c3", config)))
// check that it ends up on another node
assert.NotEqual(t, node1.ID, node2.ID, "")
assert.NotEqual(t, node1.ID(), node2.ID(), "")
// add another container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node3.AddContainer(createContainer("c4", config)))
assert.NoError(t, AddContainer(node3, createContainer("c4", config)))
// check that it ends up on another node
assert.NotEqual(t, node1.ID, node3.ID, "")
assert.NotEqual(t, node2.ID, node3.ID, "")
assert.NotEqual(t, node1.ID(), node3.ID(), "")
assert.NotEqual(t, node2.ID(), node3.ID(), "")
// add another container 1G
config = createConfig(1, 0)
node3bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node3bis.AddContainer(createContainer("c5", config)))
assert.NoError(t, AddContainer(node3bis, createContainer("c5", config)))
// check that it ends up on the same node
assert.Equal(t, node3.ID, node3bis.ID, "")
assert.Equal(t, node3.ID(), node3bis.ID(), "")
// try to add another container
config = createConfig(1, 0)
@ -214,23 +212,27 @@ func TestPlaceContainerDemo(t *testing.T) {
assert.Error(t, err)
// remove container in the middle
node2.CleanupContainers()
if n, ok := node2.(*FakeNode); ok {
n.containers = nil
n.usedmemory = 0
n.usedcpus = 0
}
// add another container
config = createConfig(1, 0)
node2bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node2bis.AddContainer(createContainer("c6", config)))
assert.NoError(t, AddContainer(node2bis, createContainer("c6", config)))
// check it ends up on `node3`
assert.Equal(t, node2.ID, node2bis.ID, "")
assert.Equal(t, node2.ID(), node2bis.ID(), "")
assert.Equal(t, len(node2.Containers()), len(node2bis.Containers()), "")
}
func TestComplexPlacement(t *testing.T) {
s := &BinPackingPlacementStrategy{}
nodes := []*cluster.Node{}
nodes := []cluster.Node{}
for i := 0; i < 2; i++ {
nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 4, 4))
}
@ -239,23 +241,23 @@ func TestComplexPlacement(t *testing.T) {
config := createConfig(2, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.NoError(t, AddContainer(node1, createContainer("c1", config)))
// add one container 3G
config = createConfig(3, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.NoError(t, AddContainer(node2, createContainer("c2", config)))
// check that they end up on separate nodes
assert.NotEqual(t, node1.ID, node2.ID)
assert.NotEqual(t, node1.ID(), node2.ID())
// add one container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
assert.NoError(t, node3.AddContainer(createContainer("c3", config)))
assert.NoError(t, AddContainer(node3, createContainer("c3", config)))
// check that it ends up on the same node as the 3G
assert.Equal(t, node2.ID, node3.ID)
assert.Equal(t, node2.ID(), node3.ID())
}

View File

@ -0,0 +1,53 @@
package strategy
import (
"errors"
"github.com/docker/swarm/cluster"
)
type FakeNode struct {
id string
name string
addr string
memory int64
usedmemory int64
cpus int64
usedcpus int64
containers []*cluster.Container
}
func (fn *FakeNode) ID() string { return fn.id }
func (fn *FakeNode) Name() string { return fn.name }
func (fn *FakeNode) IP() string { return "" }
func (fn *FakeNode) Addr() string { return fn.addr }
func (fn *FakeNode) Images() []*cluster.Image { return nil }
func (fn *FakeNode) Image(_ string) *cluster.Image { return nil }
func (fn *FakeNode) Containers() []*cluster.Container { return fn.containers }
func (fn *FakeNode) Container(_ string) *cluster.Container { return nil }
func (fn *FakeNode) TotalCpus() int64 { return fn.cpus }
func (fn *FakeNode) UsedCpus() int64 { return fn.usedcpus }
func (fn *FakeNode) TotalMemory() int64 { return fn.memory }
func (fn *FakeNode) UsedMemory() int64 { return fn.usedmemory }
func (fn *FakeNode) Labels() map[string]string { return nil }
func (fn *FakeNode) IsHealthy() bool { return true }
func (fn *FakeNode) AddContainer(container *cluster.Container) error {
memory := container.Info.Config.Memory
cpus := container.Info.Config.CpuShares
if fn.memory-memory < 0 || fn.cpus-cpus < 0 {
return errors.New("not enough resources")
}
fn.usedmemory = fn.usedmemory + memory
fn.usedcpus = fn.usedcpus + cpus
fn.containers = append(fn.containers, container)
return nil
}
func AddContainer(node cluster.Node, container *cluster.Container) error {
if n, ok := node.(*FakeNode); ok {
return n.AddContainer(container)
}
return errors.New("Not a FakeNode")
}

View File

@ -17,7 +17,7 @@ func (p *RandomPlacementStrategy) Initialize() error {
return nil
}
func (p *RandomPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []*cluster.Node) (*cluster.Node, error) {
func (p *RandomPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error) {
if size := len(nodes); size > 0 {
n := rand.Intn(len(nodes))
for i, node := range nodes {

View File

@ -12,7 +12,7 @@ type PlacementStrategy interface {
Initialize() error
// Given a container configuration and a set of nodes, select the target
// node where the container should be scheduled.
PlaceContainer(config *dockerclient.ContainerConfig, nodes []*cluster.Node) (*cluster.Node, error)
PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error)
}
var (