remove races

Victor Vieux 2014-11-21 01:36:30 +00:00
parent dd05044b96
commit 1f74cb5809
8 changed files with 44 additions and 41 deletions


@@ -32,9 +32,10 @@ type handler func(c *context, w http.ResponseWriter, r *http.Request)
 // GET /info
 func getInfo(c *context, w http.ResponseWriter, r *http.Request) {
-    driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(c.cluster.Nodes()))}}
-    for _, node := range c.cluster.Nodes() {
+    nodes := c.cluster.Nodes()
+    driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}}
+    for _, node := range nodes {
         driverStatus = append(driverStatus, [2]string{node.Name, node.Addr})
     }
     info := struct {
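The point of this hunk: Nodes() now builds a fresh snapshot on every call, so calling it twice in one request could count one cluster state and list another. Taking a single snapshot keeps the node count consistent with the node list that follows. A minimal, self-contained sketch of the hazard, using stand-in types rather than this repository's:

package main

import "fmt"

// registry stands in for the cluster: Nodes returns a fresh copy each
// call, so two calls may observe different states under concurrent updates.
type registry struct{ names []string }

func (r *registry) Nodes() []string {
	out := make([]string, len(r.names))
	copy(out, r.names)
	return out
}

func main() {
	r := &registry{names: []string{"node-1", "node-2"}}

	// Racy pattern: the count and the list come from two different
	// snapshots; a join between the calls would make them disagree.
	count := len(r.Nodes())
	list := r.Nodes()

	// Fixed pattern, as in the hunk above: one snapshot, everything
	// derived from it.
	nodes := r.Nodes()
	fmt.Println(count, len(list), len(nodes))
}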


@@ -17,7 +17,7 @@ type eventsHandler struct {
 func (eh *eventsHandler) Handle(e *cluster.Event) error {
     eh.RLock()
-    str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d}", "status", e.Type, "id", e.Container.Id, "from", e.Container.Image+" node:"+e.Node.ID, "time", e.Time.Unix())
+    str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d}", "status", e.Status, "id", e.Id, "from", e.From+" node:"+e.NodeName, "time", e.Time)
     for key, w := range eh.ws {
         if _, err := fmt.Fprintf(w, str); err != nil {


@@ -14,7 +14,7 @@ var (
 )
 type Cluster struct {
-    sync.Mutex
+    sync.RWMutex
     eventHandlers []EventHandler
     nodes map[string]*Node
 }
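Why sync.RWMutex instead of sync.Mutex: the hot paths this commit adds are read-only (listing nodes, summing reservations), and an RWMutex lets any number of RLock holders run in parallel while Lock still gives writers exclusive access. A self-contained sketch of those semantics, not code from this commit:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.RWMutex
	data := map[string]int{"a": 1}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.RLock() // many readers may hold this at once
			_ = data["a"]
			mu.RUnlock()
		}()
	}

	mu.Lock() // a writer waits for all readers, then runs alone
	data["b"] = 2
	mu.Unlock()

	wg.Wait()
	fmt.Println(data)
}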
@@ -88,8 +88,14 @@ func (c *Cluster) Container(IdOrName string) *Container {
 }
 // Nodes returns the list of nodes in the cluster
-func (c *Cluster) Nodes() map[string]*Node {
-    return c.nodes
+func (c *Cluster) Nodes() []*Node {
+    nodes := []*Node{}
+    c.RLock()
+    for _, node := range c.nodes {
+        nodes = append(nodes, node)
+    }
+    c.RUnlock()
+    return nodes
 }
 func (c *Cluster) Node(addr string) *Node {
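The old Nodes() returned the internal map itself, so callers iterated a structure the refresh loop could be writing concurrently; no locking inside Cluster can protect a reference once it escapes. Copying into a slice under RLock hands every caller a private snapshot. The pattern in isolation, with stand-in types trimmed down from the real ones:

package main

import (
	"fmt"
	"sync"
)

type Node struct{ Name, Addr string }

type Cluster struct {
	sync.RWMutex
	nodes map[string]*Node
}

// Nodes copies the map's values under the read lock, so callers can
// range over the result while the cluster keeps mutating its own map.
func (c *Cluster) Nodes() []*Node {
	nodes := []*Node{}
	c.RLock()
	for _, node := range c.nodes {
		nodes = append(nodes, node)
	}
	c.RUnlock()
	return nodes
}

func main() {
	c := &Cluster{nodes: map[string]*Node{"a": {Name: "a", Addr: ":2375"}}}
	for _, n := range c.Nodes() {
		fmt.Println(n.Name, n.Addr)
	}
}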


@@ -1,12 +1,11 @@
 package cluster
-import "time"
+import "github.com/samalba/dockerclient"
 type Event struct {
-    Type string
-    Container *Container
-    Node *Node
-    Time time.Time
+    dockerclient.Event
+    NodeName string
 }
 type EventHandler interface {
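Embedding dockerclient.Event promotes its fields onto Event, which is why the handlers in this commit can switch from e.Type and e.Container.Id to e.Status and e.Id, and why the node handler below shrinks to two lines: the whole upstream event is copied in one assignment. A self-contained sketch of field promotion, using a local stand-in rather than the real dockerclient type:

package main

import "fmt"

// DockerEvent stands in for dockerclient.Event.
type DockerEvent struct {
	Status string
	Id     string
	From   string
	Time   int64
}

// Embedding promotes DockerEvent's fields onto Event.
type Event struct {
	DockerEvent
	NodeName string
}

func main() {
	ev := DockerEvent{Status: "created", Id: "abc123", From: "ubuntu"}
	event := &Event{NodeName: "node-1"}
	event.DockerEvent = ev // copy the whole upstream event at once

	// Promoted fields read as if declared on Event itself.
	fmt.Println(event.Status, event.Id, event.From, event.NodeName)
}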


@@ -28,7 +28,7 @@ func NewNode(addr string) *Node {
 }
 type Node struct {
-    sync.Mutex
+    sync.RWMutex
     ID string
     IP string
@@ -205,9 +205,11 @@ func (n *Node) refreshLoop() {
 // Return the sum of memory reserved by containers.
 func (n *Node) ReservedMemory() int64 {
     var r int64 = 0
+    n.RLock()
     for _, c := range n.containers {
         r += c.Info.Config.Memory
     }
+    n.RUnlock()
     return r
 }
@@ -219,9 +221,11 @@ func (n *Node) AvailableMemory() int64 {
 // Return the sum of CPUs reserved by containers.
 func (n *Node) ReservedCpus() int64 {
     var r int64 = 0
+    n.RLock()
     for _, c := range n.containers {
         r += c.Info.Config.CpuShares
     }
+    n.RUnlock()
     return r
 }
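Both sums iterate n.containers, which the refresh loop rewrites concurrently; in Go, an unsynchronized map read racing a map write is a data race (the race detector flags it, and the runtime may kill the process outright). A minimal reproduction, hypothetical and not from this repository; run it with `go run -race`, then delete the RLock/RUnlock pair to see the report:

package main

import "sync"

type node struct {
	sync.RWMutex
	containers map[string]int64
}

func main() {
	n := &node{containers: map[string]int64{}}

	var wg sync.WaitGroup
	wg.Add(2)

	go func() { // writer, like refreshLoop updating the container map
		defer wg.Done()
		for i := int64(0); i < 1000; i++ {
			n.Lock()
			n.containers["c"] = i
			n.Unlock()
		}
	}()

	go func() { // reader, like ReservedMemory summing under RLock
		defer wg.Done()
		var r int64
		for i := 0; i < 1000; i++ {
			n.RLock()
			for _, v := range n.containers {
				r += v
			}
			n.RUnlock()
		}
	}()

	wg.Wait()
}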
@@ -260,6 +264,9 @@ func (n *Node) Create(config *dockerclient.ContainerConfig, name string, pullIma
     // Force a state refresh to pick up the newly created container.
     n.refreshContainer(id)
+    n.RLock()
+    defer n.RUnlock()
     return n.containers[id], nil
 }
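The defer here matters: the guarded read n.containers[id] happens while the return expression is evaluated, so the lock must outlive that evaluation; releasing it on the line before the return would reintroduce the race. The same shape in isolation (hypothetical helper, not this repository's code):

package main

import (
	"fmt"
	"sync"
)

// lookup guards a map read that happens inside the return expression.
// defer releases the lock only after the expression is evaluated; an
// explicit RUnlock placed before "return" would unlock too early.
func lookup(mu *sync.RWMutex, m map[string]string, id string) string {
	mu.RLock()
	defer mu.RUnlock()
	return m[id]
}

func main() {
	var mu sync.RWMutex
	m := map[string]string{"abc": "running"}
	fmt.Println(lookup(&mu, m, "abc"))
}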
@@ -310,8 +317,14 @@ func (n *Node) Events(h EventHandler) error {
     return nil
 }
-func (n *Node) Containers() map[string]*Container {
-    return n.containers
+func (n *Node) Containers() []*Container {
+    containers := []*Container{}
+    n.RLock()
+    for _, container := range n.containers {
+        containers = append(containers, container)
+    }
+    n.RUnlock()
+    return containers
 }
 func (n *Node) String() string {
@@ -327,23 +340,8 @@ func (n *Node) handler(ev *dockerclient.Event, args ...interface{}) {
         return
     }
-    event := &Event{
-        Node: n,
-        Type: ev.Status,
-        Time: time.Unix(int64(ev.Time), 0),
-    }
-    if container, ok := n.containers[ev.Id]; ok {
-        event.Container = container
-    } else {
-        event.Container = &Container{
-            node: n,
-            Container: dockerclient.Container{
-                Id: ev.Id,
-                Image: ev.From,
-            },
-        }
-    }
+    event := &Event{NodeName: n.Name}
+    event.Event = *ev
     n.eventHandler.Handle(event)
 }


@@ -76,18 +76,20 @@ func TestNodeState(t *testing.T) {
     assert.True(t, node.IsConnected())
     // The node should only have a single container at this point.
-    assert.Len(t, node.Containers(), 1)
-    if _, ok := node.Containers()["one"]; !ok {
+    containers := node.Containers()
+    assert.Len(t, containers, 1)
+    if containers[0].Id != "one" {
         t.Fatalf("Missing container: one")
     }
     // Fake an event which will trigger a refresh. The second container will appear.
     node.handler(&dockerclient.Event{Id: "two", Status: "created"})
-    assert.Len(t, node.Containers(), 2)
-    if _, ok := node.Containers()["one"]; !ok {
+    containers = node.Containers()
+    assert.Len(t, containers, 2)
+    if containers[0].Id != "one" && containers[1].Id != "one" {
         t.Fatalf("Missing container: one")
     }
-    if _, ok := node.Containers()["two"]; !ok {
+    if containers[0].Id != "two" && containers[1].Id != "two" {
         t.Fatalf("Missing container: two")
     }
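The assertions get clumsier here because Containers() now flattens a map, and Go deliberately randomizes map iteration order, so "one" may land at either index. If more containers show up, a lookup helper keeps the checks readable; a sketch, assuming the test lives in the cluster package so the element type is *Container:

// hasContainer reports whether a container with the given id is present,
// independent of the order Containers() happened to return them in.
func hasContainer(containers []*Container, id string) bool {
	for _, c := range containers {
		if c.Id == id {
			return true
		}
	}
	return false
}

// The checks above would then read:
//	if !hasContainer(containers, "one") {
//		t.Fatalf("Missing container: one")
//	}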


@@ -18,7 +18,7 @@ type logHandler struct {
 }
 func (h *logHandler) Handle(e *cluster.Event) error {
-    log.Printf("event -> type: %q time: %q image: %q container: %q", e.Type, e.Time.Format(time.RubyDate), e.Container.Image, e.Container.Id)
+    log.Printf("event -> status: %q from: %q id: %q node: %q", e.Status, e.From, e.Id, e.NodeName)
     return nil
 }


@@ -27,10 +27,7 @@ func NewScheduler(cluster *cluster.Cluster, strategy strategy.PlacementStrategy,
 // Find a nice home for our container.
 func (s *Scheduler) selectNodeForContainer(config *dockerclient.ContainerConfig) (*cluster.Node, error) {
-    candidates := []*cluster.Node{}
-    for _, node := range s.cluster.Nodes() {
-        candidates = append(candidates, node)
-    }
+    candidates := s.cluster.Nodes()
     accepted, err := filter.ApplyFilters(s.filters, config, candidates)
     if err != nil {