mirror of https://github.com/docker/docs.git
297 lines
6.9 KiB
Go
297 lines
6.9 KiB
Go
package swarm
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
"github.com/docker/docker/pkg/units"
|
|
"github.com/docker/swarm/cluster"
|
|
"github.com/docker/swarm/discovery"
|
|
"github.com/docker/swarm/scheduler"
|
|
"github.com/docker/swarm/state"
|
|
"github.com/samalba/dockerclient"
|
|
)
|
|
|
|
type Cluster struct {
|
|
sync.RWMutex
|
|
|
|
eventHandler cluster.EventHandler
|
|
nodes map[string]*node
|
|
scheduler *scheduler.Scheduler
|
|
options *cluster.Options
|
|
store *state.Store
|
|
}
|
|
|
|
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, eventhandler cluster.EventHandler, options *cluster.Options) cluster.Cluster {
|
|
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
|
|
|
|
cluster := &Cluster{
|
|
eventHandler: eventhandler,
|
|
nodes: make(map[string]*node),
|
|
scheduler: scheduler,
|
|
options: options,
|
|
store: store,
|
|
}
|
|
|
|
// get the list of entries from the discovery service
|
|
go func() {
|
|
d, err := discovery.New(options.Discovery, options.Heartbeat)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
entries, err := d.Fetch()
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
|
|
}
|
|
cluster.newEntries(entries)
|
|
|
|
go d.Watch(cluster.newEntries)
|
|
}()
|
|
|
|
return cluster
|
|
}
|
|
|
|
// callback for the events
|
|
func (c *Cluster) Handle(e *cluster.Event) error {
|
|
if err := c.eventHandler.Handle(e); err != nil {
|
|
log.Error(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Schedule a brand new container into the cluster.
|
|
func (c *Cluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
|
|
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
retry:
|
|
// FIXME: to prevent a race, we check again after the pull if the node can still handle
|
|
// the container. We should store the state in the store before pulling and use this to check
|
|
// all the other container create, but, as we don't have a proper store yet, this temporary solution
|
|
// was chosen.
|
|
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if nn, ok := n.(*node); ok {
|
|
container, err := nn.create(config, name, false)
|
|
if err == dockerclient.ErrNotFound {
|
|
// image not on the node, try to pull
|
|
if err = nn.pull(config.Image); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// check if the container can still fit on this node
|
|
if _, err = c.scheduler.SelectNodeForContainer([]cluster.Node{n}, config); err != nil {
|
|
// if not, try to find another node
|
|
log.Debugf("Node %s not available anymore, selecting another one", n.Name())
|
|
goto retry
|
|
}
|
|
container, err = nn.create(config, name, false)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
st := &state.RequestedState{
|
|
ID: container.Id,
|
|
Name: name,
|
|
Config: config,
|
|
}
|
|
return container, c.store.Add(container.Id, st)
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// Remove a container from the cluster. Containers should always be destroyed
|
|
// through the scheduler to guarantee atomicity.
|
|
func (c *Cluster) RemoveContainer(container *cluster.Container, force bool) error {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
if n, ok := container.Node.(*node); ok {
|
|
if err := n.destroy(container, force); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := c.store.Remove(container.Id); err != nil {
|
|
if err == state.ErrNotFound {
|
|
log.Debugf("Container %s not found in the store", container.Id)
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Entries are Docker Nodes
|
|
func (c *Cluster) newEntries(entries []*discovery.Entry) {
|
|
for _, entry := range entries {
|
|
go func(m *discovery.Entry) {
|
|
if c.getNode(m.String()) == nil {
|
|
n := NewNode(m.String(), c.options.OvercommitRatio)
|
|
if err := n.connect(c.options.TLSConfig); err != nil {
|
|
log.Error(err)
|
|
return
|
|
}
|
|
c.Lock()
|
|
|
|
if old, exists := c.nodes[n.id]; exists {
|
|
c.Unlock()
|
|
if old.ip != n.ip {
|
|
log.Errorf("ID duplicated. %s shared by %s and %s", n.id, old.IP(), n.IP())
|
|
} else {
|
|
log.Errorf("node %q is already registered", n.id)
|
|
}
|
|
return
|
|
}
|
|
c.nodes[n.id] = n
|
|
if err := n.events(c); err != nil {
|
|
log.Error(err)
|
|
c.Unlock()
|
|
return
|
|
}
|
|
c.Unlock()
|
|
|
|
}
|
|
}(entry)
|
|
}
|
|
}
|
|
|
|
func (c *Cluster) getNode(addr string) *node {
|
|
for _, node := range c.nodes {
|
|
if node.addr == addr {
|
|
return node
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Images returns all the images in the cluster.
|
|
func (c *Cluster) Images() []*cluster.Image {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
out := []*cluster.Image{}
|
|
for _, n := range c.nodes {
|
|
out = append(out, n.Images()...)
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
// Image returns an image with IdOrName in the cluster
|
|
func (c *Cluster) Image(IdOrName string) *cluster.Image {
|
|
// Abort immediately if the name is empty.
|
|
if len(IdOrName) == 0 {
|
|
return nil
|
|
}
|
|
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
for _, n := range c.nodes {
|
|
if image := n.Image(IdOrName); image != nil {
|
|
return image
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RemoveImage removes an image from the cluster
|
|
func (c *Cluster) RemoveImage(image *cluster.Image) ([]*dockerclient.ImageDelete, error) {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
if n, ok := image.Node.(*node); ok {
|
|
return n.removeImage(image)
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (c *Cluster) Pull(name string, callback func(what, status string)) {
|
|
size := len(c.nodes)
|
|
done := make(chan bool, size)
|
|
for _, n := range c.nodes {
|
|
go func(nn *node) {
|
|
if callback != nil {
|
|
callback(nn.Name(), "")
|
|
}
|
|
err := nn.pull(name)
|
|
if callback != nil {
|
|
if err != nil {
|
|
callback(nn.Name(), err.Error())
|
|
} else {
|
|
callback(nn.Name(), "downloaded")
|
|
}
|
|
}
|
|
done <- true
|
|
}(n)
|
|
}
|
|
for i := 0; i < size; i++ {
|
|
<-done
|
|
}
|
|
}
|
|
|
|
// Containers returns all the containers in the cluster.
|
|
func (c *Cluster) Containers() []*cluster.Container {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
out := []*cluster.Container{}
|
|
for _, n := range c.nodes {
|
|
out = append(out, n.Containers()...)
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
// Container returns the container with IdOrName in the cluster
|
|
func (c *Cluster) Container(IdOrName string) *cluster.Container {
|
|
// Abort immediately if the name is empty.
|
|
if len(IdOrName) == 0 {
|
|
return nil
|
|
}
|
|
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
for _, n := range c.nodes {
|
|
if container := n.Container(IdOrName); container != nil {
|
|
return container
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// nodes returns all the nodess in the cluster.
|
|
func (c *Cluster) listNodes() []cluster.Node {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
out := []cluster.Node{}
|
|
for _, n := range c.nodes {
|
|
out = append(out, n)
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
func (c *Cluster) Info() [][2]string {
|
|
info := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(c.nodes))}}
|
|
|
|
for _, node := range c.nodes {
|
|
info = append(info, [2]string{node.Name(), node.Addr()})
|
|
info = append(info, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))})
|
|
info = append(info, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.UsedCpus(), node.TotalCpus())})
|
|
info = append(info, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(node.UsedMemory())), units.BytesSize(float64(node.TotalMemory())))})
|
|
}
|
|
|
|
return info
|
|
}
|