removed nodes.go

Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
Victor Vieux 2015-02-26 17:55:10 -08:00
parent d8042f9677
commit a8885ab997
6 changed files with 161 additions and 236 deletions

View File

@ -7,5 +7,4 @@ type Options struct {
OvercommitRatio float64
Discovery string
Heartbeat int
EventsHandler EventHandler
}

View File

@ -1,144 +0,0 @@
package swarm
import (
"errors"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
)
var (
ErrNodeNotConnected = errors.New("node is not connected to docker's REST API")
ErrNodeAlreadyRegistered = errors.New("node was already")
)
type Nodes struct {
sync.RWMutex
eventHandlers []cluster.EventHandler
nodes map[string]*Node
}
func NewNodes() *Nodes {
return &Nodes{
nodes: make(map[string]*Node),
}
}
func (c *Nodes) Handle(e *cluster.Event) error {
for _, eventHandler := range c.eventHandlers {
if err := eventHandler.Handle(e); err != nil {
log.Error(err)
}
}
return nil
}
// Register a node within the cluster. The node must have been already
// initialized.
func (c *Nodes) Add(n *Node) error {
if !n.IsConnected() {
return ErrNodeNotConnected
}
c.Lock()
defer c.Unlock()
if old, exists := c.nodes[n.id]; exists {
if old.ip != n.ip {
log.Errorf("ID duplicated. %s shared by %s and %s", n.id, old.IP(), n.IP())
}
return ErrNodeAlreadyRegistered
}
c.nodes[n.id] = n
return n.Events(c)
}
// Containers returns all the images in the cluster.
func (c *Nodes) Images() []*cluster.Image {
c.Lock()
defer c.Unlock()
out := []*cluster.Image{}
for _, n := range c.nodes {
out = append(out, n.Images()...)
}
return out
}
// Image returns an image with IdOrName in the cluster
func (c *Nodes) Image(IdOrName string) *cluster.Image {
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
c.RLock()
defer c.RUnlock()
for _, n := range c.nodes {
if image := n.Image(IdOrName); image != nil {
return image
}
}
return nil
}
// Containers returns all the containers in the cluster.
func (c *Nodes) Containers() []*cluster.Container {
c.Lock()
defer c.Unlock()
out := []*cluster.Container{}
for _, n := range c.nodes {
out = append(out, n.Containers()...)
}
return out
}
// Container returns the container with IdOrName in the cluster
func (c *Nodes) Container(IdOrName string) *cluster.Container {
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
c.RLock()
defer c.RUnlock()
for _, n := range c.nodes {
if container := n.Container(IdOrName); container != nil {
return container
}
}
return nil
}
// Nodes returns the list of nodes in the cluster
func (c *Nodes) List() []cluster.Node {
nodes := []cluster.Node{}
c.RLock()
for _, node := range c.nodes {
nodes = append(nodes, node)
}
c.RUnlock()
return nodes
}
func (c *Nodes) Get(addr string) *Node {
for _, node := range c.nodes {
if node.addr == addr {
return node
}
}
return nil
}
func (c *Nodes) Events(h cluster.EventHandler) error {
c.eventHandlers = append(c.eventHandlers, h)
return nil
}

View File

@ -1,68 +0,0 @@
package swarm
import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *Node {
node := NewNode(ID, 0)
node.name = ID
node.id = ID
for _, container := range containers {
node.AddContainer(&cluster.Container{Container: container, Node: node})
}
return node
}
func TestAdd(t *testing.T) {
c := NewNodes()
assert.Equal(t, len(c.List()), 0)
assert.Nil(t, c.Get("test"))
assert.Nil(t, c.Get("test2"))
n := createNode(t, "test")
c.nodes[n.ID()] = n
assert.Equal(t, len(c.List()), 1)
assert.NotNil(t, c.Get("test"))
n = createNode(t, "test")
c.nodes[n.ID()] = n
assert.Equal(t, len(c.List()), 1)
assert.NotNil(t, c.Get("test"))
n = createNode(t, "test2")
c.nodes[n.ID()] = n
assert.Equal(t, len(c.List()), 2)
assert.NotNil(t, c.Get("test2"))
}
func TestContainerLookup(t *testing.T) {
c := NewNodes()
container := dockerclient.Container{
Id: "container-id",
Names: []string{"/container-name1", "/container-name2"},
}
n := createNode(t, "test-node", container)
c.nodes[n.ID()] = n
// Invalid lookup
assert.Nil(t, c.Container("invalid-id"))
assert.Nil(t, c.Container(""))
// Container ID lookup.
assert.NotNil(t, c.Container("container-id"))
// Container ID prefix lookup.
assert.NotNil(t, c.Container("container-"))
// Container name lookup.
assert.NotNil(t, c.Container("container-name1"))
assert.NotNil(t, c.Container("container-name2"))
// Container node/name matching.
assert.NotNil(t, c.Container("test-node/container-name1"))
assert.NotNil(t, c.Container("test-node/container-name2"))
}

View File

@ -16,24 +16,24 @@ import (
type SwarmCluster struct {
sync.RWMutex
nodes *Nodes
scheduler *scheduler.Scheduler
options *cluster.Options
store *state.Store
eventHandler cluster.EventHandler
nodes map[string]*Node
scheduler *scheduler.Scheduler
options *cluster.Options
store *state.Store
}
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *cluster.Options) cluster.Cluster {
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, eventhandler cluster.EventHandler, options *cluster.Options) cluster.Cluster {
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
cluster := &SwarmCluster{
nodes: NewNodes(),
scheduler: scheduler,
options: options,
store: store,
eventHandler: eventhandler,
nodes: make(map[string]*Node),
scheduler: scheduler,
options: options,
store: store,
}
cluster.nodes.Events(options.EventsHandler)
// get the list of entries from the discovery service
go func() {
d, err := discovery.New(options.Discovery, options.Heartbeat)
@ -54,13 +54,21 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *clu
return cluster
}
// callback for the events
func (s *SwarmCluster) Handle(e *cluster.Event) error {
if err := s.eventHandler.Handle(e); err != nil {
log.Error(err)
}
return nil
}
// Schedule a brand new container into the cluster.
func (s *SwarmCluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
s.RLock()
defer s.RUnlock()
node, err := s.scheduler.SelectNodeForContainer(s.nodes.List(), config)
node, err := s.scheduler.SelectNodeForContainer(s.listNodes(), config)
if err != nil {
return nil, err
}
@ -108,41 +116,124 @@ func (s *SwarmCluster) RemoveContainer(container *cluster.Container, force bool)
func (s *SwarmCluster) newEntries(entries []*discovery.Entry) {
for _, entry := range entries {
go func(m *discovery.Entry) {
if s.nodes.Get(m.String()) == nil {
if s.getNode(m.String()) == nil {
n := NewNode(m.String(), s.options.OvercommitRatio)
if err := n.Connect(s.options.TLSConfig); err != nil {
log.Error(err)
return
}
if err := s.nodes.Add(n); err != nil {
log.Error(err)
s.Lock()
if old, exists := s.nodes[n.id]; exists {
s.Unlock()
if old.ip != n.ip {
log.Errorf("ID duplicated. %s shared by %s and %s", n.id, old.IP(), n.IP())
} else {
log.Errorf("node %q is already registered", n.id)
}
return
}
s.nodes[n.id] = n
if err := n.Events(s); err != nil {
log.Error(err)
s.Unlock()
return
}
s.Unlock()
}
}(entry)
}
}
func (s *SwarmCluster) getNode(addr string) *Node {
for _, node := range s.nodes {
if node.addr == addr {
return node
}
}
return nil
}
// Containers returns all the images in the cluster.
func (s *SwarmCluster) Images() []*cluster.Image {
return s.nodes.Images()
s.RLock()
defer s.RUnlock()
out := []*cluster.Image{}
for _, n := range s.nodes {
out = append(out, n.Images()...)
}
return out
}
// Image returns an image with IdOrName in the cluster
func (s *SwarmCluster) Image(IdOrName string) *cluster.Image {
return s.nodes.Image(IdOrName)
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
s.RLock()
defer s.RUnlock()
for _, n := range s.nodes {
if image := n.Image(IdOrName); image != nil {
return image
}
}
return nil
}
// Containers returns all the containers in the cluster.
func (s *SwarmCluster) Containers() []*cluster.Container {
return s.nodes.Containers()
s.RLock()
defer s.RUnlock()
out := []*cluster.Container{}
for _, n := range s.nodes {
out = append(out, n.Containers()...)
}
return out
}
// Container returns the container with IdOrName in the cluster
func (s *SwarmCluster) Container(IdOrName string) *cluster.Container {
return s.nodes.Container(IdOrName)
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
s.RLock()
defer s.RUnlock()
for _, n := range s.nodes {
if container := n.Container(IdOrName); container != nil {
return container
}
}
return nil
}
// nodes returns all the nodess in the cluster.
func (s *SwarmCluster) listNodes() []cluster.Node {
s.RLock()
defer s.RUnlock()
out := []cluster.Node{}
for _, n := range s.nodes {
out = append(out, n)
}
return out
}
func (s *SwarmCluster) Info() [][2]string {
info := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(s.nodes.List()))}}
info := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(s.nodes))}}
for _, node := range s.nodes.List() {
for _, node := range s.nodes {
info = append(info, [2]string{node.Name(), node.Addr()})
info = append(info, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))})
info = append(info, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.UsedCpus(), node.TotalCpus())})

View File

@ -0,0 +1,48 @@
package swarm
import (
"testing"
"github.com/docker/swarm/cluster"
"github.com/samalba/dockerclient"
"github.com/stretchr/testify/assert"
)
func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *Node {
node := NewNode(ID, 0)
node.name = ID
node.id = ID
for _, container := range containers {
node.AddContainer(&cluster.Container{Container: container, Node: node})
}
return node
}
func TestContainerLookup(t *testing.T) {
s := &SwarmCluster{
nodes: make(map[string]*Node),
}
container := dockerclient.Container{
Id: "container-id",
Names: []string{"/container-name1", "/container-name2"},
}
n := createNode(t, "test-node", container)
s.nodes[n.ID()] = n
// Invalid lookup
assert.Nil(t, s.Container("invalid-id"))
assert.Nil(t, s.Container(""))
// Container ID lookup.
assert.NotNil(t, s.Container("container-id"))
// Container ID prefix lookup.
assert.NotNil(t, s.Container("container-"))
// Container name lookup.
assert.NotNil(t, s.Container("container-name1"))
assert.NotNil(t, s.Container("container-name2"))
// Container node/name matching.
assert.NotNil(t, s.Container("test-node/container-name1"))
assert.NotNil(t, s.Container("test-node/container-name2"))
}

View File

@ -126,14 +126,13 @@ func manage(c *cli.Context) {
OvercommitRatio: c.Float64("overcommit"),
Discovery: dflag,
Heartbeat: c.Int("heartbeat"),
EventsHandler: eventsHandler,
}
var cluster cluster.Cluster
switch c.String("cluster") {
case "swarm":
cluster = swarm.NewCluster(sched, store, options)
cluster = swarm.NewCluster(sched, store, eventsHandler, options)
case "mesos":
cluster = mesos.NewCluster(sched, options)
default: