diff --git a/api/api.go b/api/api.go index b441d8cadb..4d0ccfc790 100644 --- a/api/api.go +++ b/api/api.go @@ -32,9 +32,10 @@ type handler func(c *context, w http.ResponseWriter, r *http.Request) // GET /info func getInfo(c *context, w http.ResponseWriter, r *http.Request) { - driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(c.cluster.Nodes()))}} + nodes := c.cluster.Nodes() + driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}} - for _, node := range c.cluster.Nodes() { + for _, node := range nodes { driverStatus = append(driverStatus, [2]string{node.Name, node.Addr}) } info := struct { diff --git a/api/events.go b/api/events.go index 7fe583c17f..dcd81515dd 100644 --- a/api/events.go +++ b/api/events.go @@ -17,7 +17,7 @@ type eventsHandler struct { func (eh *eventsHandler) Handle(e *cluster.Event) error { eh.RLock() - str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d}", "status", e.Type, "id", e.Container.Id, "from", e.Container.Image+" node:"+e.Node.ID, "time", e.Time.Unix()) + str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d}", "status", e.Status, "id", e.Id, "from", e.From+" node:"+e.NodeName, "time", e.Time) for key, w := range eh.ws { if _, err := fmt.Fprintf(w, str); err != nil { diff --git a/cluster/cluster.go b/cluster/cluster.go index ea01bca816..5d4b4cef32 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -14,7 +14,7 @@ var ( ) type Cluster struct { - sync.Mutex + sync.RWMutex eventHandlers []EventHandler nodes map[string]*Node } @@ -88,8 +88,14 @@ func (c *Cluster) Container(IdOrName string) *Container { } // Nodes returns the list of nodes in the cluster -func (c *Cluster) Nodes() map[string]*Node { - return c.nodes +func (c *Cluster) Nodes() []*Node { + nodes := []*Node{} + c.RLock() + for _, node := range c.nodes { + nodes = append(nodes, node) + } + c.RUnlock() + return nodes } func (c *Cluster) Node(addr string) *Node { diff --git a/cluster/event.go b/cluster/event.go index 7414604031..80cfa002d4 100644 --- a/cluster/event.go +++ b/cluster/event.go @@ -1,12 +1,11 @@ package cluster -import "time" +import "github.com/samalba/dockerclient" type Event struct { - Type string - Container *Container - Node *Node - Time time.Time + dockerclient.Event + + NodeName string } type EventHandler interface { diff --git a/cluster/node.go b/cluster/node.go index fe2ee6f320..b70f394922 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -28,7 +28,7 @@ func NewNode(addr string) *Node { } type Node struct { - sync.Mutex + sync.RWMutex ID string IP string @@ -205,9 +205,11 @@ func (n *Node) refreshLoop() { // Return the sum of memory reserved by containers. func (n *Node) ReservedMemory() int64 { var r int64 = 0 + n.RLock() for _, c := range n.containers { r += c.Info.Config.Memory } + n.RUnlock() return r } @@ -219,9 +221,11 @@ func (n *Node) AvailableMemory() int64 { // Return the sum of CPUs reserved by containers. func (n *Node) ReservedCpus() int64 { var r int64 = 0 + n.RLock() for _, c := range n.containers { r += c.Info.Config.CpuShares } + n.RUnlock() return r } @@ -260,6 +264,9 @@ func (n *Node) Create(config *dockerclient.ContainerConfig, name string, pullIma // Force a state refresh to pick up the newly created container. n.refreshContainer(id) + n.RLock() + defer n.RUnlock() + return n.containers[id], nil } @@ -310,8 +317,14 @@ func (n *Node) Events(h EventHandler) error { return nil } -func (n *Node) Containers() map[string]*Container { - return n.containers +func (n *Node) Containers() []*Container { + containers := []*Container{} + n.RLock() + for _, container := range n.containers { + containers = append(containers, container) + } + n.RUnlock() + return containers } func (n *Node) String() string { @@ -327,23 +340,8 @@ func (n *Node) handler(ev *dockerclient.Event, args ...interface{}) { return } - event := &Event{ - Node: n, - Type: ev.Status, - Time: time.Unix(int64(ev.Time), 0), - } - - if container, ok := n.containers[ev.Id]; ok { - event.Container = container - } else { - event.Container = &Container{ - node: n, - Container: dockerclient.Container{ - Id: ev.Id, - Image: ev.From, - }, - } - } + event := &Event{NodeName: n.Name} + event.Event = *ev n.eventHandler.Handle(event) } diff --git a/cluster/node_test.go b/cluster/node_test.go index 8d04a1a12c..0a19fe6dd5 100644 --- a/cluster/node_test.go +++ b/cluster/node_test.go @@ -76,18 +76,20 @@ func TestNodeState(t *testing.T) { assert.True(t, node.IsConnected()) // The node should only have a single container at this point. - assert.Len(t, node.Containers(), 1) - if _, ok := node.Containers()["one"]; !ok { + containers := node.Containers() + assert.Len(t, containers, 1) + if containers[0].Id != "one" { t.Fatalf("Missing container: one") } // Fake an event which will trigger a refresh. The second container will appear. node.handler(&dockerclient.Event{Id: "two", Status: "created"}) - assert.Len(t, node.Containers(), 2) - if _, ok := node.Containers()["one"]; !ok { + containers = node.Containers() + assert.Len(t, containers, 2) + if containers[0].Id != "one" && containers[1].Id != "one" { t.Fatalf("Missing container: one") } - if _, ok := node.Containers()["two"]; !ok { + if containers[0].Id != "two" && containers[1].Id != "two" { t.Fatalf("Missing container: two") } diff --git a/manage.go b/manage.go index 0639e18b0e..1aa5d05d1b 100644 --- a/manage.go +++ b/manage.go @@ -18,7 +18,7 @@ type logHandler struct { } func (h *logHandler) Handle(e *cluster.Event) error { - log.Printf("event -> type: %q time: %q image: %q container: %q", e.Type, e.Time.Format(time.RubyDate), e.Container.Image, e.Container.Id) + log.Printf("event -> status: %q from: %q id: %q node: %q", e.Status, e.From, e.Id, e.NodeName) return nil } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index e946e62940..510fdbc0a3 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -27,10 +27,7 @@ func NewScheduler(cluster *cluster.Cluster, strategy strategy.PlacementStrategy, // Find a nice home for our container. func (s *Scheduler) selectNodeForContainer(config *dockerclient.ContainerConfig) (*cluster.Node, error) { - candidates := []*cluster.Node{} - for _, node := range s.cluster.Nodes() { - candidates = append(candidates, node) - } + candidates := s.cluster.Nodes() accepted, err := filter.ApplyFilters(s.filters, config, candidates) if err != nil {