Merge pull request #243 from vieux/affinity

add affinity (start next to a container/image)
This commit is contained in:
Andrea Luzzardi 2015-01-19 14:02:03 -08:00
commit 1e92f854d5
11 changed files with 321 additions and 49 deletions

View File

@ -20,6 +20,7 @@ func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *
client := mockclient.NewMockClient()
client.On("Info").Return(mockInfo, nil)
client.On("ListContainers", true, false, "").Return(containers, nil)
client.On("ListImages").Return([]*dockerclient.Image{}, nil)
client.On("InspectContainer", mock.Anything).Return(
&dockerclient.ContainerInfo{
Config: &dockerclient.ContainerConfig{CpuShares: 100},

View File

@ -46,6 +46,7 @@ type Node struct {
ch chan bool
containers map[string]*Container
images []*dockerclient.Image
client dockerclient.Client
eventHandler EventHandler
healthy bool
@ -84,6 +85,10 @@ func (n *Node) connectClient(client dockerclient.Client) error {
n.client = nil
return err
}
if err := n.refreshImages(); err != nil {
n.client = nil
return err
}
// Start the update loop.
go n.refreshLoop()
@ -131,6 +136,18 @@ func (n *Node) updateSpecs() error {
return nil
}
// Refresh the list of images on the node.
func (n *Node) refreshImages() error {
images, err := n.client.ListImages()
if err != nil {
return err
}
n.Lock()
n.images = images
n.Unlock()
return nil
}
// Refresh the list and status of containers running on the node.
func (n *Node) refreshContainers() error {
containers, err := n.client.ListContainers(true, false, "")
@ -237,6 +254,10 @@ func (n *Node) refreshLoop() {
err = n.refreshContainers()
}
if err == nil {
err = n.refreshImages()
}
if err != nil {
n.healthy = false
log.Errorf("[%s/%s] Flagging node as dead. Updated state failed: %v", n.ID, n.Name, err)
@ -316,23 +337,6 @@ func (n *Node) Create(config *dockerclient.ContainerConfig, name string, pullIma
return n.containers[id], nil
}
func (n *Node) ListImages() ([]string, error) {
images, err := n.client.ListImages()
if err != nil {
return nil, err
}
out := []string{}
for _, i := range images {
for _, t := range i.RepoTags {
out = append(out, t)
}
}
return out, nil
}
// Destroy and remove a container from the node.
func (n *Node) Destroy(container *Container, force bool) error {
if err := n.client.RemoveContainer(container.Id, force); err != nil {
@ -374,13 +378,27 @@ func (n *Node) Containers() []*Container {
return containers
}
func (n *Node) Images() []*dockerclient.Image {
images := []*dockerclient.Image{}
n.RLock()
for _, image := range n.images {
images = append(images, image)
}
n.RUnlock()
return images
}
func (n *Node) String() string {
return fmt.Sprintf("node %s addr %s", n.ID, n.Addr)
}
func (n *Node) handler(ev *dockerclient.Event, args ...interface{}) {
// Something changed - refresh our internal state.
n.refreshContainer(ev.Id)
if ev.Status == "pull" || ev.Status == "untag" || ev.Status == "delete" {
n.refreshImages()
} else {
n.refreshContainer(ev.Id)
}
// If there is no event handler registered, abort right now.
if n.eventHandler == nil {
@ -407,6 +425,14 @@ func (n *Node) AddContainer(container *Container) error {
return nil
}
// Inject an image into the internal state.
func (n *Node) AddImage(image *dockerclient.Image) {
n.Lock()
defer n.Unlock()
n.images = append(n.images, image)
}
// Remove a container from the internal test.
func (n *Node) RemoveContainer(container *Container) error {
n.Lock()

View File

@ -58,6 +58,7 @@ func TestNodeCpusMemory(t *testing.T) {
client := mockclient.NewMockClient()
client.On("Info").Return(mockInfo, nil)
client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil)
client.On("ListImages").Return([]*dockerclient.Image{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything).Return()
assert.NoError(t, node.connectClient(client))
@ -77,6 +78,7 @@ func TestNodeSpecs(t *testing.T) {
client := mockclient.NewMockClient()
client.On("Info").Return(mockInfo, nil)
client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil)
client.On("ListImages").Return([]*dockerclient.Image{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything).Return()
assert.NoError(t, node.connectClient(client))
@ -104,6 +106,7 @@ func TestNodeState(t *testing.T) {
// The client will return one container at first, then a second one will appear.
client.On("ListContainers", true, false, "").Return([]dockerclient.Container{{Id: "one"}}, nil).Once()
client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once()
client.On("InspectContainer", "one").Return(&dockerclient.ContainerInfo{Config: &dockerclient.ContainerConfig{CpuShares: 100}}, nil).Once()
client.On("ListContainers", true, false, fmt.Sprintf("{%q:[%q]}", "id", "two")).Return([]dockerclient.Container{{Id: "two"}}, nil).Once()
client.On("InspectContainer", "two").Return(&dockerclient.ContainerInfo{Config: &dockerclient.ContainerConfig{CpuShares: 100}}, nil).Once()
@ -147,6 +150,7 @@ func TestCreateContainer(t *testing.T) {
client.On("Info").Return(mockInfo, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything).Return()
client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil).Once()
client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once()
assert.NoError(t, node.connectClient(client))
assert.True(t, node.IsConnected())
@ -158,6 +162,7 @@ func TestCreateContainer(t *testing.T) {
id := "id1"
client.On("CreateContainer", &mockConfig, name).Return(id, nil).Once()
client.On("ListContainers", true, false, fmt.Sprintf(`{"id":[%q]}`, id)).Return([]dockerclient.Container{{Id: id}}, nil).Once()
client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once()
client.On("InspectContainer", id).Return(&dockerclient.ContainerInfo{Config: config}, nil).Once()
container, err := node.Create(config, name, false)
assert.Nil(t, err)
@ -180,6 +185,7 @@ func TestCreateContainer(t *testing.T) {
client.On("CreateContainer", &mockConfig, name).Return("", dockerclient.ErrNotFound).Once()
client.On("CreateContainer", &mockConfig, name).Return(id, nil).Once()
client.On("ListContainers", true, false, fmt.Sprintf(`{"id":[%q]}`, id)).Return([]dockerclient.Container{{Id: id}}, nil).Once()
client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once()
client.On("InspectContainer", id).Return(&dockerclient.ContainerInfo{Config: config}, nil).Once()
container, err = node.Create(config, name, true)
assert.Nil(t, err)

View File

@ -81,7 +81,7 @@ var (
}
flFilter = cli.StringSliceFlag{
Name: "filter, f",
Usage: "filter to use [constraint, health, port]",
Value: &cli.StringSlice{"constraint", "health", "port"},
Usage: "filter to use [constraint, affinity, health, port]",
Value: &cli.StringSlice{"constraint", "affinity", "health", "port"},
}
)

View File

@ -7,6 +7,7 @@ These filters are used to schedule containers on a subset of nodes.
`Docker Swarm` currently supports 3 filters:
* [Constraint](README.md#constraint-filter)
* [Affinity](README.md#affinity-filter)
* [Port](README.md#port-filter)
* [Healthy](README.md#healthy-filter)
@ -79,6 +80,73 @@ Those tags are sourced from `docker info` and currently include:
* kernelversion
* operatingsystem
## Affinity Filter
#### Containers
You can schedule 2 containers and make the container #2 next to the container #1.
```
$ docker run -d -p 80:80 --name front nginx
87c4376856a8
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NODE NAMES
87c4376856a8 nginx:latest "nginx" Less than a second ago running 192.168.0.42:80->80/tcp node-1 front
```
Using `-e affinity:container=front` will schedule a container next to the container `front`.
You can also use IDs instead of name: `-e affinity:container=87c4376856a8`
```
$ docker run -d --name logger -e affinity:container=front logger
87c4376856a8
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NODE NAMES
87c4376856a8 nginx:latest "nginx" Less than a second ago running 192.168.0.42:80->80/tcp node-1 front
963841b138d8 logger:latest "logger" Less than a second ago running node-1 logger
```
The `logger` container ends up on `node-1` because his affinity with the container `front`.
#### Images
You can schedule a container only on nodes where the images is already pulled.
```
$ docker -H node-1:2375 pull redis
$ docker -H node-2:2375 pull mysql
$ docker -H node-2:2375 pull redis
```
Here only `node-1` and `node-3` have the `redis` image. Using `-e affinity:image=redis` we can
schedule container only on these 2 nodes. You can also use the image ID instead of it's name.
```
$ docker run -d --name redis1 -e affinity:image=redis redis
$ docker run -d --name redis2 -e affinity:image=redis redis
$ docker run -d --name redis3 -e affinity:image=redis redis
$ docker run -d --name redis4 -e affinity:image=redis redis
$ docker run -d --name redis5 -e affinity:image=redis redis
$ docker run -d --name redis6 -e affinity:image=redis redis
$ docker run -d --name redis7 -e affinity:image=redis redis
$ docker run -d --name redis8 -e affinity:image=redis redis
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NODE NAMES
87c4376856a8 redis:latest "redis" Less than a second ago running node-1 redis1
1212386856a8 redis:latest "redis" Less than a second ago running node-1 redis2
87c4376639a8 redis:latest "redis" Less than a second ago running node-3 redis3
1234376856a8 redis:latest "redis" Less than a second ago running node-1 redis4
86c2136253a8 redis:latest "redis" Less than a second ago running node-3 redis5
87c3236856a8 redis:latest "redis" Less than a second ago running node-3 redis6
87c4376856a8 redis:latest "redis" Less than a second ago running node-3 redis7
963841b138d8 redis:latest "redis" Less than a second ago running node-1 redis8
```
As you can see here, the containers were only scheduled on nodes with the redis imagealreayd pulled.
## Port Filter
With this filter, `ports` are considered as a unique resource.

View File

@ -0,0 +1,51 @@
package filter
import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/samalba/dockerclient"
)
// AffinityFilter selects only nodes based on other containers on the node.
type AffinityFilter struct {
}
func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []*cluster.Node) ([]*cluster.Node, error) {
affinities := extractEnv("affinity", config.Env)
for k, v := range affinities {
log.Debugf("matching affinity: %s=%s", k, v)
candidates := []*cluster.Node{}
for _, node := range nodes {
switch k {
case "container":
for _, container := range node.Containers() {
if match(v, container.Id) || match(v, container.Names[0]) {
candidates = append(candidates, node)
break
}
}
case "image":
done:
for _, image := range node.Images() {
if match(v, image.Id) {
candidates = append(candidates, node)
break
}
for _, t := range image.RepoTags {
if match(v, t) {
candidates = append(candidates, node)
break done
}
}
}
}
}
if len(candidates) == 0 {
return nil, fmt.Errorf("unable to find a node that satisfies %s == %s", k, v)
}
nodes = candidates
}
return nodes, nil
}

View File

@ -0,0 +1,110 @@
package filter
import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
func TestAffinityFilter(t *testing.T) {
var (
f = AffinityFilter{}
nodes = []*cluster.Node{
cluster.NewNode("node-0", 0),
cluster.NewNode("node-1", 0),
cluster.NewNode("node-2", 0),
}
result []*cluster.Node
err error
)
nodes[0].ID = "node-0-id"
nodes[0].Name = "node-0-name"
nodes[0].AddContainer(&cluster.Container{
Container: dockerclient.Container{
Id: "container-0-id",
Names: []string{"container-0-name"},
},
})
nodes[0].AddImage(&dockerclient.Image{
Id: "image-0-id",
RepoTags: []string{"image-0:tag1", "image-0:tag2"},
})
nodes[1].ID = "node-1-id"
nodes[1].Name = "node-1-name"
nodes[1].AddContainer(&cluster.Container{
Container: dockerclient.Container{
Id: "container-1-id",
Names: []string{"container-1-name"},
},
})
nodes[1].AddImage(&dockerclient.Image{
Id: "image-1-id",
RepoTags: []string{"image-1:tag1", "image-0:tag3", "image-1:tag2"},
})
nodes[2].ID = "node-2-id"
nodes[2].Name = "node-2-name"
// Without constraints we should get the unfiltered list of nodes back.
result, err = f.Filter(&dockerclient.ContainerConfig{}, nodes)
assert.NoError(t, err)
assert.Equal(t, result, nodes)
// Set a constraint that cannot be fullfilled and expect an error back.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"affinity:container=does_not_exsits"},
}, nodes)
assert.Error(t, err)
// Set a contraint that can only be filled by a single node.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"affinity:container=container-0*"},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[0])
// This constraint can only be fullfilled by a subset of nodes.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"affinity:container=container-*"},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 2)
assert.NotContains(t, result, nodes[2])
// Validate by id.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"affinity:container=container-0-id"},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[0])
// Validate by name.
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"affinity:container=container-1-name"},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[1])
// Validate images by id
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"affinity:image=image-0-id"},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[0])
// Validate images by name
result, err = f.Filter(&dockerclient.ContainerConfig{
Env: []string{"affinity:image=image-0:tag3"},
}, nodes)
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, result[0], nodes[1])
}

View File

@ -2,8 +2,6 @@ package filter
import (
"fmt"
"regexp"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
@ -14,31 +12,8 @@ import (
type ConstraintFilter struct {
}
func (f *ConstraintFilter) extractConstraints(env []string) map[string]string {
constraints := make(map[string]string)
for _, e := range env {
if strings.HasPrefix(e, "constraint:") {
constraint := strings.TrimPrefix(e, "constraint:")
parts := strings.SplitN(constraint, "=", 2)
constraints[strings.ToLower(parts[0])] = strings.ToLower(parts[1])
}
}
return constraints
}
// Create the regex for globbing (ex: ub*t* -> ^ub.*t.*$)
// and match.
func (f *ConstraintFilter) match(pattern, s string) bool {
regex := "^" + strings.Replace(pattern, "*", ".*", -1) + "$"
matched, err := regexp.MatchString(regex, strings.ToLower(s))
if err != nil {
log.Error(err)
}
return matched
}
func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []*cluster.Node) ([]*cluster.Node, error) {
constraints := f.extractConstraints(config.Env)
constraints := extractEnv("constraint", config.Env)
for k, v := range constraints {
log.Debugf("matching constraint: %s=%s", k, v)
candidates := []*cluster.Node{}
@ -46,13 +21,13 @@ func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []
switch k {
case "node":
// "node" label is a special case pinning a container to a specific node.
if f.match(v, node.ID) || f.match(v, node.Name) {
if match(v, node.ID) || match(v, node.Name) {
candidates = append(candidates, node)
}
default:
// By default match the node labels.
if label, ok := node.Labels[k]; ok {
if f.match(v, label) {
if match(v, label) {
candidates = append(candidates, node)
}
}

View File

@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestConstrainteFilter(t *testing.T) {
func TestConstraintFilter(t *testing.T) {
var (
f = ConstraintFilter{}
nodes = []*cluster.Node{

View File

@ -20,6 +20,7 @@ var (
func init() {
filters = map[string]Filter{
"affinity": &AffinityFilter{},
"health": &HealthFilter{},
"constraint": &ConstraintFilter{},
"port": &PortFilter{},

34
scheduler/filter/utils.go Normal file
View File

@ -0,0 +1,34 @@
package filter
import (
"regexp"
"strings"
log "github.com/Sirupsen/logrus"
)
func extractEnv(key string, env []string) map[string]string {
values := make(map[string]string)
for _, e := range env {
if strings.HasPrefix(e, key+":") {
value := strings.TrimPrefix(e, key+":")
parts := strings.SplitN(value, "=", 2)
if len(parts) == 2 {
values[strings.ToLower(parts[0])] = strings.ToLower(parts[1])
} else {
values[strings.ToLower(parts[0])] = ""
}
}
}
return values
}
// Create the regex for globbing (ex: ub*t* -> ^ub.*t.*$) and match.
func match(pattern, s string) bool {
regex := "^" + strings.Replace(pattern, "*", ".*", -1) + "$"
matched, err := regexp.MatchString(regex, strings.ToLower(s))
if err != nil {
log.Error(err)
}
return matched
}