use engine-api for events

Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
Victor Vieux 2016-03-25 21:46:53 -07:00
parent ae7174fe4c
commit 622b509274
6 changed files with 133 additions and 38 deletions

View File

@ -36,10 +36,10 @@ func TestHandle(t *testing.T) {
},
}
event.Event.Status = "status"
event.Event.ID = "id"
event.Event.From = "from"
event.Event.Time = 0
event.Message.Status = "status"
event.Message.ID = "id"
event.Message.From = "from"
event.Message.Time = 0
event.Actor.Attributes = make(map[string]string)
event.Actor.Attributes["nodevent.name"] = event.Engine.Name
event.Actor.Attributes["nodevent.id"] = event.Engine.ID
@ -48,7 +48,7 @@ func TestHandle(t *testing.T) {
assert.NoError(t, eh.Handle(event))
event.Event.From = "from node:node_name"
event.Message.From = "from node:node_name"
data, err := json.Marshal(event)
assert.NoError(t, err)

View File

@ -21,6 +21,7 @@ import (
"github.com/docker/docker/pkg/version"
engineapi "github.com/docker/engine-api/client"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
networktypes "github.com/docker/engine-api/types/network"
engineapinop "github.com/docker/swarm/api/nopclient"
@ -127,6 +128,7 @@ type Engine struct {
failureCount int
overcommitRatio int64
opts *EngineOpts
eventsMonitor *EventsMonitor
}
// NewEngine is exported
@ -188,16 +190,14 @@ func (e *Engine) Connect(config *tls.Config) error {
func (e *Engine) StartMonitorEvents() {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Debug("Start monitoring events")
ec := make(chan error)
e.client.StartMonitorEvents(e.handler, ec)
e.eventsMonitor.Start(ec)
go func() {
if err := <-ec; err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Error monitoring events: %s.", err)
if !strings.Contains(err.Error(), "EOF") {
// failing node reconnect should use back-off strategy
<-e.refreshDelayer.Wait(e.getFailureCount())
}
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Restart event monitoring.")
e.StartMonitorEvents()
}
close(ec)
@ -208,6 +208,7 @@ func (e *Engine) StartMonitorEvents() {
func (e *Engine) ConnectWithClient(client dockerclient.Client, apiClient engineapi.APIClient) error {
e.client = client
e.apiClient = apiClient
e.eventsMonitor = NewEventsMonitor(e.apiClient, e.handler)
// Fetch the engine labels.
if err := e.updateSpecs(); err != nil {
@ -246,7 +247,8 @@ func (e *Engine) Disconnect() {
// close the chan
close(e.stopCh)
e.client.StopAllMonitorEvents()
e.eventsMonitor.Stop()
// close idle connections
if dc, ok := e.client.(*dockerclient.DockerClient); ok {
closeIdleConnections(dc.HTTPClient)
@ -788,7 +790,7 @@ func (e *Engine) refreshLoop() {
}
if !healthy {
e.client.StopAllMonitorEvents()
e.eventsMonitor.Stop()
e.StartMonitorEvents()
}
@ -811,12 +813,12 @@ func (e *Engine) emitEvent(event string) {
return
}
ev := &Event{
Event: dockerclient.Event{
Message: events.Message{
Status: event,
From: "swarm",
Type: "swarm",
Action: event,
Actor: dockerclient.Actor{
Actor: events.Actor{
Attributes: make(map[string]string),
},
Time: time.Now().Unix(),
@ -1111,10 +1113,10 @@ func (e *Engine) String() string {
return fmt.Sprintf("engine %s addr %s", e.ID, e.Addr)
}
func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface{}) {
func (e *Engine) handler(msg events.Message) error {
// Something changed - refresh our internal state.
switch ev.Type {
switch msg.Type {
case "network":
e.RefreshNetworks()
case "volume":
@ -1122,15 +1124,15 @@ func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface
case "image":
e.RefreshImages()
case "container":
switch ev.Action {
switch msg.Action {
case "die", "kill", "oom", "pause", "start", "restart", "stop", "unpause", "rename":
e.refreshContainer(ev.ID, true)
e.refreshContainer(msg.ID, true)
default:
e.refreshContainer(ev.ID, false)
e.refreshContainer(msg.ID, false)
}
case "":
// docker < 1.10
switch ev.Status {
switch msg.Status {
case "pull", "untag", "delete", "commit":
// These events refer to images so there's no need to update
// containers.
@ -1138,12 +1140,12 @@ func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface
case "die", "kill", "oom", "pause", "start", "stop", "unpause", "rename":
// If the container state changes, we have to do an inspect in
// order to update container.Info and get the new NetworkSettings.
e.refreshContainer(ev.ID, true)
e.refreshContainer(msg.ID, true)
e.RefreshVolumes()
e.RefreshNetworks()
default:
// Otherwise, do a "soft" refresh of the container.
e.refreshContainer(ev.ID, false)
e.refreshContainer(msg.ID, false)
e.RefreshVolumes()
e.RefreshNetworks()
}
@ -1152,15 +1154,15 @@ func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface
// If there is no event handler registered, abort right now.
if e.eventHandler == nil {
return
return nil
}
event := &Event{
Engine: e,
Event: *ev,
Engine: e,
Message: msg,
}
e.eventHandler.Handle(event)
return e.eventHandler.Handle(event)
}
// AddContainer injects a container into the internal state.

View File

@ -14,6 +14,7 @@ import (
engineapi "github.com/docker/engine-api/client"
"github.com/docker/engine-api/types"
containertypes "github.com/docker/engine-api/types/container"
"github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
networktypes "github.com/docker/engine-api/types/network"
engineapimock "github.com/docker/swarm/api/mockclient"
@ -25,6 +26,13 @@ import (
"github.com/stretchr/testify/mock"
)
type infinitRead struct{}
func (infinitRead) Read(p []byte) (n int, err error) {
p = append(p, 1)
return 1, nil
}
var (
mockInfo = types.Info{
ID: "id",
@ -180,7 +188,7 @@ func TestEngineCpusMemory(t *testing.T) {
).Return(types.VolumesListResponse{}, nil)
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil)
apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil)
assert.NoError(t, engine.ConnectWithClient(client, apiClient))
assert.True(t, engine.isConnected())
@ -210,7 +218,7 @@ func TestEngineSpecs(t *testing.T) {
).Return(types.VolumesListResponse{}, nil)
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil)
apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil)
assert.NoError(t, engine.ConnectWithClient(client, apiClient))
assert.True(t, engine.isConnected())
@ -243,7 +251,7 @@ func TestEngineState(t *testing.T) {
apiClient.On("VolumeList", mock.Anything,
mock.AnythingOfType("Args"),
).Return(types.VolumesListResponse{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil)
// The client will return one container at first, then a second one will appear.
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil).Once()
@ -265,7 +273,7 @@ func TestEngineState(t *testing.T) {
}
// Fake an event which will trigger a refresh. The second container will appear.
engine.handler(&dockerclient.Event{ID: "two", Status: "created"}, nil)
engine.handler(events.Message{ID: "two", Status: "created"})
containers = engine.Containers()
assert.Len(t, containers, 2)
if containers[0].ID != "one" && containers[1].ID != "one" {
@ -305,7 +313,8 @@ func TestCreateContainer(t *testing.T) {
apiClient.On("VolumeList", mock.Anything,
mock.AnythingOfType("Args"),
).Return(types.VolumesListResponse{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil)
client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil).Once()
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil).Once()
// filterArgs1 := filters.NewArgs()
// filterArgs1.Add("id", id)
@ -419,7 +428,7 @@ func TestUsedCpus(t *testing.T) {
apiClient.On("VolumeList", mock.Anything,
mock.AnythingOfType("Args"),
).Return(types.VolumesListResponse{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil)
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil).Once()
apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{{ID: "test"}}, nil).Once()
apiClient.On("ContainerInspect", mock.Anything, "test").Return(types.ContainerJSON{ContainerJSONBase: &types.ContainerJSONBase{HostConfig: &containertypes.HostConfig{Resources: containertypes.Resources{CPUShares: cpuShares}}}, Config: &containertypes.Config{}, NetworkSettings: &types.NetworkSettings{Networks: nil}}, nil).Once()
@ -458,7 +467,7 @@ func TestContainerRemovedDuringRefresh(t *testing.T) {
mock.AnythingOfType("Args"),
).Return(types.VolumesListResponse{}, nil)
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil)
apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{container1, container2}, nil)
apiClient.On("ContainerInspect", mock.Anything, "c1").Return(info1, errors.New("Not found"))
apiClient.On("ContainerInspect", mock.Anything, "c2").Return(info2, nil)
@ -489,8 +498,7 @@ func TestDisconnect(t *testing.T) {
apiClient.On("VolumeList", mock.Anything,
mock.AnythingOfType("Args"),
).Return(types.VolumesListResponse{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
client.On("StopAllMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil)
// The client will return one container at first, then a second one will appear.
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil)

View File

@ -5,12 +5,12 @@ import (
"sync"
log "github.com/Sirupsen/logrus"
"github.com/samalba/dockerclient"
"github.com/docker/engine-api/types/events"
)
// Event is exported
type Event struct {
dockerclient.Event
events.Message
Engine *Engine `json:"-"`
}

85
cluster/event_monitor.go Normal file
View File

@ -0,0 +1,85 @@
package cluster
import (
"encoding/json"
"io"
"github.com/docker/engine-api/client"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/events"
"golang.org/x/net/context"
)
//EventsMonitor monitors events
type EventsMonitor struct {
stopChan chan struct{}
cli client.APIClient
handler func(msg events.Message) error
}
type decodingResult struct {
msg events.Message
err error
}
// NewEventsMonitor returns an EventsMonitor
func NewEventsMonitor(cli client.APIClient, handler func(msg events.Message) error) *EventsMonitor {
return &EventsMonitor{
cli: cli,
handler: handler,
}
}
// Start starts the EventsMonitor
func (em *EventsMonitor) Start(ec chan error) {
em.stopChan = make(chan struct{})
responseBody, err := em.cli.Events(context.TODO(), types.EventsOptions{})
if err != nil {
ec <- err
return
}
resultChan := make(chan decodingResult)
go func() {
dec := json.NewDecoder(responseBody)
for {
var result decodingResult
result.err = dec.Decode(&result.msg)
resultChan <- result
if result.err == io.EOF {
break
}
}
close(resultChan)
}()
go func() {
defer responseBody.Close()
for {
select {
case <-em.stopChan:
ec <- nil
return
case result := <-resultChan:
if result.err != nil {
ec <- result.err
return
}
if err := em.handler(result.msg); err != nil {
ec <- err
return
}
}
}
}()
}
// Stop stops the EventsMonitor
func (em *EventsMonitor) Stop() {
if em.stopChan == nil {
return
}
close(em.stopChan)
}

View File

@ -143,7 +143,7 @@ func TestImportImage(t *testing.T) {
mock.AnythingOfType("NetworkListOptions"),
).Return([]types.NetworkResource{}, nil)
apiClient.On("VolumeList", mock.Anything, mock.Anything).Return(types.VolumesListResponse{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{bytes.NewBufferString("")}, nil)
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil)
apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil).Once()
@ -196,7 +196,7 @@ func TestLoadImage(t *testing.T) {
mock.AnythingOfType("NetworkListOptions"),
).Return([]types.NetworkResource{}, nil)
apiClient.On("VolumeList", mock.Anything, mock.Anything).Return(types.VolumesListResponse{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{bytes.NewBufferString("")}, nil)
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil)
apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil).Once()
@ -252,7 +252,7 @@ func TestTagImage(t *testing.T) {
mock.AnythingOfType("NetworkListOptions"),
).Return([]types.NetworkResource{}, nil)
apiClient.On("VolumeList", mock.Anything, mock.Anything).Return(types.VolumesListResponse{}, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{bytes.NewBufferString("")}, nil)
apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return(images, nil)
apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil).Once()