support 1.10 events

Signed-off-by: Victor Vieux <vieux@docker.com>
Victor Vieux 2016-01-15 18:30:46 -08:00
parent d884fe563b
commit 04fb48d27a
8 changed files with 69 additions and 31 deletions
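
Docker Engine 1.10 moved the events API to a structured payload: Type, Action, and an Actor carrying an Attributes map, with the legacy status/id/from fields kept for compatibility. Instead of hand-building JSON with a format string, Swarm now marshals the event itself, tags it with node.* attributes, and keeps both the legacy "node" object and the "node:&lt;name&gt;" suffix on "from" until clients stop relying on them. A forwarded event ends up looking roughly like this (illustrative values; exact key casing follows dockerclient's JSON tags):

	{"status":"start","id":"4c0f...","from":"busybox node:node-0",
	 "Type":"container","Action":"start",
	 "Actor":{"ID":"4c0f...","Attributes":{"image":"busybox","node.name":"node-0",
	   "node.id":"EQ2A","node.addr":"192.168.56.10:2375","node.ip":"192.168.56.10"}},
	 "time":1452911446,"timeNano":1452911446000000000,
	 "node":{"Name":"node-0","Id":"EQ2A","Addr":"192.168.56.10:2375","Ip":"192.168.56.10"}}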

View File

@@ -1,6 +1,7 @@
 package api
 
 import (
+	"encoding/json"
 	"fmt"
 	"io"
 	"net/http"
@@ -10,8 +11,6 @@ import (
 	"github.com/docker/swarm/cluster"
 )
 
-const eventFmt string = "{%q:%q,%q:%q,%q:%q,%q:%d,%q:{%q:%q,%q:%q,%q:%q,%q:%q}}\n"
-
 // EventsHandler broadcasts events to multiple client listeners.
 type eventsHandler struct {
 	sync.RWMutex
@@ -76,21 +75,42 @@ func (eh *eventsHandler) cleanupHandler(remoteAddr string) {
 func (eh *eventsHandler) Handle(e *cluster.Event) error {
 	eh.RLock()
 
-	str := fmt.Sprintf(eventFmt,
-		"status", e.Status,
-		"id", e.Id,
-		"from", e.From+" node:"+e.Engine.Name,
-		"time", e.Time,
+	// remove this hack once 1.10 is broadly adopted
+	from := e.From
+	e.From = e.From + " node:" + e.Engine.Name
+
+	// Attributes will be nil if the event was sent by an engine < 1.10
+	if e.Actor.Attributes == nil {
+		e.Actor.Attributes = make(map[string]string)
+	}
+	e.Actor.Attributes["node.name"] = e.Engine.Name
+	e.Actor.Attributes["node.id"] = e.Engine.ID
+	e.Actor.Attributes["node.addr"] = e.Engine.Addr
+	e.Actor.Attributes["node.ip"] = e.Engine.IP
+
+	data, err := json.Marshal(e)
+	e.From = from
+	if err != nil {
+		return err
+	}
+
+	// remove the node field once 1.10 is broadly adopted & interlock stops relying on it
+	node := fmt.Sprintf(",%q:{%q:%q,%q:%q,%q:%q,%q:%q}}",
 		"node",
 		"Name", e.Engine.Name,
 		"Id", e.Engine.ID,
 		"Addr", e.Engine.Addr,
-		"Ip", e.Engine.IP)
+		"Ip", e.Engine.IP,
+	)
+
+	// insert Node field
+	data = data[:len(data)-1]
+	data = append(data, []byte(node)...)
 
 	var failed []string
 
 	for key, w := range eh.ws {
-		if _, err := fmt.Fprintf(w, str); err != nil {
+		if _, err := fmt.Fprintf(w, string(data)); err != nil {
 			// collect them to handle later under Lock
 			failed = append(failed, key)
 			continue
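
The node object above is spliced into the marshaled payload by hand: json.Marshal always returns an object ending in '}', so dropping that final byte and appending a fragment that re-closes the object yields valid JSON with one extra field. A minimal standalone sketch of the same trick (example values, not part of the diff):

	package main

	import (
		"encoding/json"
		"fmt"
	)

	func main() {
		data, _ := json.Marshal(map[string]string{"status": "start"})
		// data is {"status":"start"}; drop the trailing '}' ...
		data = data[:len(data)-1]
		// ... and append a fragment that re-closes the object.
		node := fmt.Sprintf(",%q:{%q:%q}}", "node", "Name", "node-0")
		data = append(data, []byte(node)...)
		fmt.Println(string(data)) // {"status":"start","node":{"Name":"node-0"}}
	}

One caveat carried over from the old code: the payload is passed to fmt.Fprintf as the format string, so a literal % in an event attribute would be parsed as a formatting verb; fmt.Fprint(w, string(data)) would avoid that.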

View File

@@ -1,6 +1,7 @@
 package api
 
 import (
+	"encoding/json"
 	"fmt"
 	"testing"
@@ -36,22 +37,33 @@ func TestHandle(t *testing.T) {
 	}
 
 	event.Event.Status = "status"
-	event.Event.Id = "id"
+	event.Event.ID = "id"
 	event.Event.From = "from"
 	event.Event.Time = 0
+
+	event.Actor.Attributes = make(map[string]string)
+	event.Actor.Attributes["node.name"] = event.Engine.Name
+	event.Actor.Attributes["node.id"] = event.Engine.ID
+	event.Actor.Attributes["node.addr"] = event.Engine.Addr
+	event.Actor.Attributes["node.ip"] = event.Engine.IP
 
 	assert.NoError(t, eh.Handle(event))
 
-	str := fmt.Sprintf(eventFmt,
-		"status", "status",
-		"id", "id",
-		"from", "from node:node_name",
-		"time", 0,
-		"node",
-		"Name", "node_name",
-		"Id", "node_id",
-		"Addr", "node_addr",
-		"Ip", "node_ip")
+	event.Event.From = "from node:node_name"
+
+	data, err := json.Marshal(event)
+	assert.NoError(t, err)
+
+	node := fmt.Sprintf(",%q:{%q:%q,%q:%q,%q:%q,%q:%q}}",
+		"node",
+		"Name", event.Engine.Name,
+		"Id", event.Engine.ID,
+		"Addr", event.Engine.Addr,
+		"Ip", event.Engine.IP,
+	)
+
+	// insert Node field
+	data = data[:len(data)-1]
+	data = append(data, []byte(node)...)
 
-	assert.Equal(t, str, string(fw.Tmp))
+	assert.Equal(t, string(data), string(fw.Tmp))
 }

View File

@@ -34,7 +34,7 @@ type logHandler struct {
 }
 
 func (h *logHandler) Handle(e *cluster.Event) error {
-	id := e.Id
+	id := e.ID
 	// Trim IDs to 12 chars.
 	if len(id) > 12 {
 		id = id[:12]

View File

@@ -145,7 +145,7 @@ func (e *Engine) Connect(config *tls.Config) error {
 	}
 	e.IP = addr.IP.String()
 
-	c, err := dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout))
+	c, err := dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout), nil)
 	if err != nil {
 		return err
 	}
@@ -627,7 +627,13 @@ func (e *Engine) emitEvent(event string) {
 		Event: dockerclient.Event{
 			Status: event,
 			From:   "swarm",
-			Time:   time.Now().Unix(),
+			Type:   "swarm",
+			Action: event,
+			Actor: dockerclient.Actor{
+				Attributes: make(map[string]string),
+			},
+			Time:     time.Now().Unix(),
+			TimeNano: time.Now().UnixNano(),
 		},
 		Engine: e,
 	}
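
emitEvent is how Swarm reports its own lifecycle changes (for example engine_connect), and it now fills both generations of fields: the legacy Status/From and 1.10's Type, Action, Actor, and TimeNano. A listener can therefore key off whichever form it understands; a sketch with a stand-in struct (field names mirror dockerclient.Event, the helper is hypothetical):

	package main

	import "fmt"

	// event mirrors the two generations of fields on dockerclient.Event.
	type event struct {
		Status string // legacy field, set by engines < 1.10
		Action string // structured field, set by 1.10+ engines and by Swarm itself
	}

	// action prefers the structured field and falls back to the legacy one.
	func action(e event) string {
		if e.Action != "" {
			return e.Action
		}
		return e.Status
	}

	func main() {
		fmt.Println(action(event{Status: "start"}))                  // start (old engine)
		fmt.Println(action(event{Status: "start", Action: "start"})) // start (1.10)
	}
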
@@ -886,12 +892,12 @@ func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface{})
 	case "die", "kill", "oom", "pause", "start", "stop", "unpause", "rename":
 		// If the container state changes, we have to do an inspect in
 		// order to update container.Info and get the new NetworkSettings.
-		e.refreshContainer(ev.Id, true)
+		e.refreshContainer(ev.ID, true)
 		e.RefreshVolumes()
 		e.RefreshNetworks()
 	default:
 		// Otherwise, do a "soft" refresh of the container.
-		e.refreshContainer(ev.Id, false)
+		e.refreshContainer(ev.ID, false)
 		e.RefreshVolumes()
 		e.RefreshNetworks()
 	}

View File

@@ -212,7 +212,7 @@ func TestEngineState(t *testing.T) {
 	}
 
 	// Fake an event which will trigger a refresh. The second container will appear.
-	engine.handler(&dockerclient.Event{Id: "two", Status: "created"}, nil)
+	engine.handler(&dockerclient.Event{ID: "two", Status: "created"}, nil)
 	containers = engine.Containers()
 	assert.Len(t, containers, 2)
 	if containers[0].Id != "one" && containers[1].Id != "one" {

View File

@@ -11,7 +11,7 @@ import (
 // Event is exported
 type Event struct {
 	dockerclient.Event
-	Engine *Engine
+	Engine *Engine `json:"-"`
 }
 
 // EventHandler is exported
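
The new `json:"-"` tag on the embedded Engine pointer is what makes the json.Marshal(e) call in the events handler safe: without it the encoder would serialize the entire Engine state into every event. A minimal illustration of the tag's effect (standalone sketch, types invented for the example):

	package main

	import (
		"encoding/json"
		"fmt"
	)

	type Base struct{ Status string }

	type Node struct{ Name string }

	type Event struct {
		Base          // embedded; promoted exported fields are marshaled
		Engine *Node `json:"-"` // always skipped by encoding/json
	}

	func main() {
		b, _ := json.Marshal(Event{Base{"start"}, &Node{"node-0"}})
		fmt.Println(string(b)) // {"Status":"start"} (Engine omitted)
	}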

View File

@@ -26,12 +26,12 @@ function teardown() {
 	kill "$events_pid"
 
 	# verify size
-	[[ $(wc -l < ${log_file}) == 3 ]]
+	[[ $(wc -l < ${log_file}) -ge 3 ]]
 
 	# verify content
 	run cat "$log_file"
 	[ "$status" -eq 0 ]
-	[[ "${output}" == *"node:node-0"* ]]
+	[[ "${output}" == *"node-0"* ]]
 	[[ "${output}" == *"create"* ]]
 	[[ "${output}" == *"start"* ]]
 	[[ "${output}" == *"die"* ]]

View File

@@ -31,7 +31,7 @@ function teardown() {
 	# verify
 	run cat "$log_file"
 	[ "$status" -eq 0 ]
-	[[ "${output}" == *"node:node-0"* ]]
+	[[ "${output}" == *"node-0"* ]]
 	[[ "${output}" == *"create"* ]]
 	[[ "${output}" == *"start"* ]]
 	[[ "${output}" == *"die"* ]]