diff --git a/api/api.go b/api/api.go index 4d0ccfc790..b1c22f33d6 100644 --- a/api/api.go +++ b/api/api.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "io" "io/ioutil" "net/http" "net/url" @@ -221,17 +220,15 @@ func deleteContainer(c *context, w http.ResponseWriter, r *http.Request) { // GET /events func getEvents(c *context, w http.ResponseWriter, r *http.Request) { - c.eventsHandler.Lock() - c.eventsHandler.ws[r.RemoteAddr] = w - c.eventsHandler.cs[r.RemoteAddr] = make(chan struct{}) - c.eventsHandler.Unlock() + c.eventsHandler.Add(r.RemoteAddr, w) + w.Header().Set("Content-Type", "application/json") if f, ok := w.(http.Flusher); ok { f.Flush() } - <-c.eventsHandler.cs[r.RemoteAddr] + c.eventsHandler.Wait(r.RemoteAddr) } // GET /_ping @@ -351,13 +348,10 @@ func createRouter(c *context, enableCors bool) (*mux.Router, error) { func ListenAndServe(c *cluster.Cluster, s *scheduler.Scheduler, addr, version string) error { context := &context{ - cluster: c, - scheduler: s, - version: version, - eventsHandler: &eventsHandler{ - ws: make(map[string]io.Writer), - cs: make(map[string]chan struct{}), - }, + cluster: c, + scheduler: s, + version: version, + eventsHandler: NewEventsHandler(), } c.Events(context.eventsHandler) r, err := createRouter(context, false) diff --git a/api/events.go b/api/events.go index f1a390f86f..b0b382834a 100644 --- a/api/events.go +++ b/api/events.go @@ -15,6 +15,24 @@ type eventsHandler struct { cs map[string]chan struct{} } +func NewEventsHandler() *eventsHandler { + return &eventsHandler{ + ws: make(map[string]io.Writer), + cs: make(map[string]chan struct{}), + } +} + +func (eh *eventsHandler) Add(remoteAddr string, w io.Writer) { + eh.Lock() + eh.ws[remoteAddr] = w + eh.cs[remoteAddr] = make(chan struct{}) + eh.Unlock() +} + +func (eh *eventsHandler) Wait(remoteAddr string) { + <-eh.cs[remoteAddr] +} + func (eh *eventsHandler) Handle(e *cluster.Event) error { eh.RLock() diff --git a/api/events_test.go b/api/events_test.go new file mode 100644 index 0000000000..15e0749648 --- /dev/null +++ b/api/events_test.go @@ -0,0 +1,53 @@ +package api + +import ( + "fmt" + "testing" + + "github.com/docker/swarm/cluster" + "github.com/stretchr/testify/assert" +) + +type FakeWriter struct { + Tmp []byte +} + +func (fw *FakeWriter) Write(p []byte) (n int, err error) { + fw.Tmp = append(fw.Tmp, p...) + return len(p), nil +} + +func TestHandle(t *testing.T) { + eh := NewEventsHandler() + assert.Equal(t, eh.Size(), 0) + + fw := &FakeWriter{Tmp: []byte{}} + eh.Add("test", fw) + + assert.Equal(t, eh.Size(), 1) + + event := &cluster.Event{ + NodeName: "node_name", + NodeID: "node_id", + NodeAddr: "node_addr", + NodeIP: "node_ip", + } + event.Event.Status = "status" + event.Event.Id = "id" + event.Event.From = "from" + event.Event.Time = 0 + + assert.NoError(t, eh.Handle(event)) + + str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d,%q:%q,%q:%q,%q:%q,%q:%q}", + "status", "status", + "id", "id", + "from", "from node:node_name", + "time", 0, + "node_name", "node_name", + "node_id", "node_id", + "node_addr", "node_addr", + "node_ip", "node_ip") + + assert.Equal(t, str, string(fw.Tmp)) +}