From 04fb48d27af475e17a52c75a5e1f92933536079a Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Fri, 15 Jan 2016 18:30:46 -0800 Subject: [PATCH] support 1.10 events Signed-off-by: Victor Vieux --- api/events.go | 38 ++++++++++++++++++++------ api/events_test.go | 36 ++++++++++++++++-------- cli/manage.go | 2 +- cluster/engine.go | 14 +++++++--- cluster/engine_test.go | 2 +- cluster/event.go | 2 +- test/integration/api/events.bats | 4 +-- test/integration/mesos/api/events.bats | 2 +- 8 files changed, 69 insertions(+), 31 deletions(-) diff --git a/api/events.go b/api/events.go index 3d792f5e86..8a289608c9 100644 --- a/api/events.go +++ b/api/events.go @@ -1,6 +1,7 @@ package api import ( + "encoding/json" "fmt" "io" "net/http" @@ -10,8 +11,6 @@ import ( "github.com/docker/swarm/cluster" ) -const eventFmt string = "{%q:%q,%q:%q,%q:%q,%q:%d,%q:{%q:%q,%q:%q,%q:%q,%q:%q}}\n" - // EventsHandler broadcasts events to multiple client listeners. type eventsHandler struct { sync.RWMutex @@ -76,21 +75,42 @@ func (eh *eventsHandler) cleanupHandler(remoteAddr string) { func (eh *eventsHandler) Handle(e *cluster.Event) error { eh.RLock() - str := fmt.Sprintf(eventFmt, - "status", e.Status, - "id", e.Id, - "from", e.From+" node:"+e.Engine.Name, - "time", e.Time, + // remove this hack once 1.10 is broadly adopted + from := e.From + e.From = e.From + " node:" + e.Engine.Name + + // Attributes will be nil if the event was sent by engine < 1.10 + if e.Actor.Attributes == nil { + e.Actor.Attributes = make(map[string]string) + } + e.Actor.Attributes["node.name"] = e.Engine.Name + e.Actor.Attributes["node.id"] = e.Engine.ID + e.Actor.Attributes["node.addr"] = e.Engine.Addr + e.Actor.Attributes["node.ip"] = e.Engine.IP + + data, err := json.Marshal(e) + e.From = from + if err != nil { + return err + } + + // remove the node field once 1.10 is broadly adopted & interlock stop relying on it + node := fmt.Sprintf(",%q:{%q:%q,%q:%q,%q:%q,%q:%q}}", "node", "Name", e.Engine.Name, "Id", e.Engine.ID, "Addr", e.Engine.Addr, - "Ip", e.Engine.IP) + "Ip", e.Engine.IP, + ) + + // insert Node field + data = data[:len(data)-1] + data = append(data, []byte(node)...) var failed []string for key, w := range eh.ws { - if _, err := fmt.Fprintf(w, str); err != nil { + if _, err := fmt.Fprintf(w, string(data)); err != nil { // collect them to handle later under Lock failed = append(failed, key) continue diff --git a/api/events_test.go b/api/events_test.go index c7c5220298..2da2c924d6 100644 --- a/api/events_test.go +++ b/api/events_test.go @@ -1,6 +1,7 @@ package api import ( + "encoding/json" "fmt" "testing" @@ -36,22 +37,33 @@ func TestHandle(t *testing.T) { } event.Event.Status = "status" - event.Event.Id = "id" + event.Event.ID = "id" event.Event.From = "from" event.Event.Time = 0 + event.Actor.Attributes = make(map[string]string) + event.Actor.Attributes["nodevent.name"] = event.Engine.Name + event.Actor.Attributes["nodevent.id"] = event.Engine.ID + event.Actor.Attributes["nodevent.addr"] = event.Engine.Addr + event.Actor.Attributes["nodevent.ip"] = event.Engine.IP assert.NoError(t, eh.Handle(event)) - str := fmt.Sprintf(eventFmt, - "status", "status", - "id", "id", - "from", "from node:node_name", - "time", 0, - "node", - "Name", "node_name", - "Id", "node_id", - "Addr", "node_addr", - "Ip", "node_ip") + event.Event.From = "from node:node_name" - assert.Equal(t, str, string(fw.Tmp)) + data, err := json.Marshal(event) + assert.NoError(t, err) + + node := fmt.Sprintf(",%q:{%q:%q,%q:%q,%q:%q,%q:%q}}", + "node", + "Name", event.Engine.Name, + "Id", event.Engine.ID, + "Addr", event.Engine.Addr, + "Ip", event.Engine.IP, + ) + + // insert Node field + data = data[:len(data)-1] + data = append(data, []byte(node)...) + + assert.Equal(t, string(data), string(fw.Tmp)) } diff --git a/cli/manage.go b/cli/manage.go index b8a3bd83de..d530bb7fe2 100644 --- a/cli/manage.go +++ b/cli/manage.go @@ -34,7 +34,7 @@ type logHandler struct { } func (h *logHandler) Handle(e *cluster.Event) error { - id := e.Id + id := e.ID // Trim IDs to 12 chars. if len(id) > 12 { id = id[:12] diff --git a/cluster/engine.go b/cluster/engine.go index bd283314fb..df307011f1 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -145,7 +145,7 @@ func (e *Engine) Connect(config *tls.Config) error { } e.IP = addr.IP.String() - c, err := dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout)) + c, err := dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout), nil) if err != nil { return err } @@ -627,7 +627,13 @@ func (e *Engine) emitEvent(event string) { Event: dockerclient.Event{ Status: event, From: "swarm", - Time: time.Now().Unix(), + Type: "swarm", + Action: event, + Actor: dockerclient.Actor{ + Attributes: make(map[string]string), + }, + Time: time.Now().Unix(), + TimeNano: time.Now().UnixNano(), }, Engine: e, } @@ -886,12 +892,12 @@ func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface case "die", "kill", "oom", "pause", "start", "stop", "unpause", "rename": // If the container state changes, we have to do an inspect in // order to update container.Info and get the new NetworkSettings. - e.refreshContainer(ev.Id, true) + e.refreshContainer(ev.ID, true) e.RefreshVolumes() e.RefreshNetworks() default: // Otherwise, do a "soft" refresh of the container. - e.refreshContainer(ev.Id, false) + e.refreshContainer(ev.ID, false) e.RefreshVolumes() e.RefreshNetworks() } diff --git a/cluster/engine_test.go b/cluster/engine_test.go index 6b747d09ff..6fa17a72e3 100644 --- a/cluster/engine_test.go +++ b/cluster/engine_test.go @@ -212,7 +212,7 @@ func TestEngineState(t *testing.T) { } // Fake an event which will trigger a refresh. The second container will appear. - engine.handler(&dockerclient.Event{Id: "two", Status: "created"}, nil) + engine.handler(&dockerclient.Event{ID: "two", Status: "created"}, nil) containers = engine.Containers() assert.Len(t, containers, 2) if containers[0].Id != "one" && containers[1].Id != "one" { diff --git a/cluster/event.go b/cluster/event.go index 034afe356b..797998a62a 100644 --- a/cluster/event.go +++ b/cluster/event.go @@ -11,7 +11,7 @@ import ( // Event is exported type Event struct { dockerclient.Event - Engine *Engine + Engine *Engine `json:"-"` } // EventHandler is exported diff --git a/test/integration/api/events.bats b/test/integration/api/events.bats index 4e6c1bcd48..136c0abf5d 100644 --- a/test/integration/api/events.bats +++ b/test/integration/api/events.bats @@ -26,12 +26,12 @@ function teardown() { kill "$events_pid" # verify size - [[ $(wc -l < ${log_file}) == 3 ]] + [[ $(wc -l < ${log_file}) -ge 3 ]] # verify content run cat "$log_file" [ "$status" -eq 0 ] - [[ "${output}" == *"node:node-0"* ]] + [[ "${output}" == *"node-0"* ]] [[ "${output}" == *"create"* ]] [[ "${output}" == *"start"* ]] [[ "${output}" == *"die"* ]] diff --git a/test/integration/mesos/api/events.bats b/test/integration/mesos/api/events.bats index da7bdbe6a6..0b20f46894 100644 --- a/test/integration/mesos/api/events.bats +++ b/test/integration/mesos/api/events.bats @@ -31,7 +31,7 @@ function teardown() { # verify run cat "$log_file" [ "$status" -eq 0 ] - [[ "${output}" == *"node:node-0"* ]] + [[ "${output}" == *"node-0"* ]] [[ "${output}" == *"create"* ]] [[ "${output}" == *"start"* ]] [[ "${output}" == *"die"* ]]