From 622b50927456b70e55fb5f11fbca4f7a35b069bd Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Fri, 25 Mar 2016 21:46:53 -0700 Subject: [PATCH] use engine-api for events Signed-off-by: Victor Vieux --- api/events_test.go | 10 ++--- cluster/engine.go | 40 +++++++++-------- cluster/engine_test.go | 26 +++++++---- cluster/event.go | 4 +- cluster/event_monitor.go | 85 +++++++++++++++++++++++++++++++++++ cluster/swarm/cluster_test.go | 6 +-- 6 files changed, 133 insertions(+), 38 deletions(-) create mode 100644 cluster/event_monitor.go diff --git a/api/events_test.go b/api/events_test.go index 2da2c924d6..bc28ae3bab 100644 --- a/api/events_test.go +++ b/api/events_test.go @@ -36,10 +36,10 @@ func TestHandle(t *testing.T) { }, } - event.Event.Status = "status" - event.Event.ID = "id" - event.Event.From = "from" - event.Event.Time = 0 + event.Message.Status = "status" + event.Message.ID = "id" + event.Message.From = "from" + event.Message.Time = 0 event.Actor.Attributes = make(map[string]string) event.Actor.Attributes["nodevent.name"] = event.Engine.Name event.Actor.Attributes["nodevent.id"] = event.Engine.ID @@ -48,7 +48,7 @@ func TestHandle(t *testing.T) { assert.NoError(t, eh.Handle(event)) - event.Event.From = "from node:node_name" + event.Message.From = "from node:node_name" data, err := json.Marshal(event) assert.NoError(t, err) diff --git a/cluster/engine.go b/cluster/engine.go index 2919215183..1bfcc701fd 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -21,6 +21,7 @@ import ( "github.com/docker/docker/pkg/version" engineapi "github.com/docker/engine-api/client" "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/filters" networktypes "github.com/docker/engine-api/types/network" engineapinop "github.com/docker/swarm/api/nopclient" @@ -127,6 +128,7 @@ type Engine struct { failureCount int overcommitRatio int64 opts *EngineOpts + eventsMonitor *EventsMonitor } // NewEngine is exported @@ -188,16 +190,14 @@ func (e *Engine) Connect(config *tls.Config) error { func (e *Engine) StartMonitorEvents() { log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Debug("Start monitoring events") ec := make(chan error) - e.client.StartMonitorEvents(e.handler, ec) + e.eventsMonitor.Start(ec) go func() { if err := <-ec; err != nil { - log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Error monitoring events: %s.", err) if !strings.Contains(err.Error(), "EOF") { // failing node reconnect should use back-off strategy <-e.refreshDelayer.Wait(e.getFailureCount()) } - log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Restart event monitoring.") e.StartMonitorEvents() } close(ec) @@ -208,6 +208,7 @@ func (e *Engine) StartMonitorEvents() { func (e *Engine) ConnectWithClient(client dockerclient.Client, apiClient engineapi.APIClient) error { e.client = client e.apiClient = apiClient + e.eventsMonitor = NewEventsMonitor(e.apiClient, e.handler) // Fetch the engine labels. if err := e.updateSpecs(); err != nil { @@ -246,7 +247,8 @@ func (e *Engine) Disconnect() { // close the chan close(e.stopCh) - e.client.StopAllMonitorEvents() + e.eventsMonitor.Stop() + // close idle connections if dc, ok := e.client.(*dockerclient.DockerClient); ok { closeIdleConnections(dc.HTTPClient) @@ -788,7 +790,7 @@ func (e *Engine) refreshLoop() { } if !healthy { - e.client.StopAllMonitorEvents() + e.eventsMonitor.Stop() e.StartMonitorEvents() } @@ -811,12 +813,12 @@ func (e *Engine) emitEvent(event string) { return } ev := &Event{ - Event: dockerclient.Event{ + Message: events.Message{ Status: event, From: "swarm", Type: "swarm", Action: event, - Actor: dockerclient.Actor{ + Actor: events.Actor{ Attributes: make(map[string]string), }, Time: time.Now().Unix(), @@ -1111,10 +1113,10 @@ func (e *Engine) String() string { return fmt.Sprintf("engine %s addr %s", e.ID, e.Addr) } -func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface{}) { +func (e *Engine) handler(msg events.Message) error { // Something changed - refresh our internal state. - switch ev.Type { + switch msg.Type { case "network": e.RefreshNetworks() case "volume": @@ -1122,15 +1124,15 @@ func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface case "image": e.RefreshImages() case "container": - switch ev.Action { + switch msg.Action { case "die", "kill", "oom", "pause", "start", "restart", "stop", "unpause", "rename": - e.refreshContainer(ev.ID, true) + e.refreshContainer(msg.ID, true) default: - e.refreshContainer(ev.ID, false) + e.refreshContainer(msg.ID, false) } case "": // docker < 1.10 - switch ev.Status { + switch msg.Status { case "pull", "untag", "delete", "commit": // These events refer to images so there's no need to update // containers. @@ -1138,12 +1140,12 @@ func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface case "die", "kill", "oom", "pause", "start", "stop", "unpause", "rename": // If the container state changes, we have to do an inspect in // order to update container.Info and get the new NetworkSettings. - e.refreshContainer(ev.ID, true) + e.refreshContainer(msg.ID, true) e.RefreshVolumes() e.RefreshNetworks() default: // Otherwise, do a "soft" refresh of the container. - e.refreshContainer(ev.ID, false) + e.refreshContainer(msg.ID, false) e.RefreshVolumes() e.RefreshNetworks() } @@ -1152,15 +1154,15 @@ func (e *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface // If there is no event handler registered, abort right now. if e.eventHandler == nil { - return + return nil } event := &Event{ - Engine: e, - Event: *ev, + Engine: e, + Message: msg, } - e.eventHandler.Handle(event) + return e.eventHandler.Handle(event) } // AddContainer injects a container into the internal state. diff --git a/cluster/engine_test.go b/cluster/engine_test.go index 9de8ea4076..1db60951ec 100644 --- a/cluster/engine_test.go +++ b/cluster/engine_test.go @@ -14,6 +14,7 @@ import ( engineapi "github.com/docker/engine-api/client" "github.com/docker/engine-api/types" containertypes "github.com/docker/engine-api/types/container" + "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/filters" networktypes "github.com/docker/engine-api/types/network" engineapimock "github.com/docker/swarm/api/mockclient" @@ -25,6 +26,13 @@ import ( "github.com/stretchr/testify/mock" ) +type infinitRead struct{} + +func (infinitRead) Read(p []byte) (n int, err error) { + p = append(p, 1) + return 1, nil +} + var ( mockInfo = types.Info{ ID: "id", @@ -180,7 +188,7 @@ func TestEngineCpusMemory(t *testing.T) { ).Return(types.VolumesListResponse{}, nil) apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil) apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil) assert.NoError(t, engine.ConnectWithClient(client, apiClient)) assert.True(t, engine.isConnected()) @@ -210,7 +218,7 @@ func TestEngineSpecs(t *testing.T) { ).Return(types.VolumesListResponse{}, nil) apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil) apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil) assert.NoError(t, engine.ConnectWithClient(client, apiClient)) assert.True(t, engine.isConnected()) @@ -243,7 +251,7 @@ func TestEngineState(t *testing.T) { apiClient.On("VolumeList", mock.Anything, mock.AnythingOfType("Args"), ).Return(types.VolumesListResponse{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil) // The client will return one container at first, then a second one will appear. apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil).Once() @@ -265,7 +273,7 @@ func TestEngineState(t *testing.T) { } // Fake an event which will trigger a refresh. The second container will appear. - engine.handler(&dockerclient.Event{ID: "two", Status: "created"}, nil) + engine.handler(events.Message{ID: "two", Status: "created"}) containers = engine.Containers() assert.Len(t, containers, 2) if containers[0].ID != "one" && containers[1].ID != "one" { @@ -305,7 +313,8 @@ func TestCreateContainer(t *testing.T) { apiClient.On("VolumeList", mock.Anything, mock.AnythingOfType("Args"), ).Return(types.VolumesListResponse{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil) + client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil).Once() apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil).Once() // filterArgs1 := filters.NewArgs() // filterArgs1.Add("id", id) @@ -419,7 +428,7 @@ func TestUsedCpus(t *testing.T) { apiClient.On("VolumeList", mock.Anything, mock.AnythingOfType("Args"), ).Return(types.VolumesListResponse{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil) apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil).Once() apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{{ID: "test"}}, nil).Once() apiClient.On("ContainerInspect", mock.Anything, "test").Return(types.ContainerJSON{ContainerJSONBase: &types.ContainerJSONBase{HostConfig: &containertypes.HostConfig{Resources: containertypes.Resources{CPUShares: cpuShares}}}, Config: &containertypes.Config{}, NetworkSettings: &types.NetworkSettings{Networks: nil}}, nil).Once() @@ -458,7 +467,7 @@ func TestContainerRemovedDuringRefresh(t *testing.T) { mock.AnythingOfType("Args"), ).Return(types.VolumesListResponse{}, nil) apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil) apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{container1, container2}, nil) apiClient.On("ContainerInspect", mock.Anything, "c1").Return(info1, errors.New("Not found")) apiClient.On("ContainerInspect", mock.Anything, "c2").Return(info2, nil) @@ -489,8 +498,7 @@ func TestDisconnect(t *testing.T) { apiClient.On("VolumeList", mock.Anything, mock.AnythingOfType("Args"), ).Return(types.VolumesListResponse{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() - client.On("StopAllMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{infinitRead{}}, nil) // The client will return one container at first, then a second one will appear. apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil) diff --git a/cluster/event.go b/cluster/event.go index 797998a62a..553d495dfb 100644 --- a/cluster/event.go +++ b/cluster/event.go @@ -5,12 +5,12 @@ import ( "sync" log "github.com/Sirupsen/logrus" - "github.com/samalba/dockerclient" + "github.com/docker/engine-api/types/events" ) // Event is exported type Event struct { - dockerclient.Event + events.Message Engine *Engine `json:"-"` } diff --git a/cluster/event_monitor.go b/cluster/event_monitor.go new file mode 100644 index 0000000000..ab80dcb75f --- /dev/null +++ b/cluster/event_monitor.go @@ -0,0 +1,85 @@ +package cluster + +import ( + "encoding/json" + "io" + + "github.com/docker/engine-api/client" + "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/events" + "golang.org/x/net/context" +) + +//EventsMonitor monitors events +type EventsMonitor struct { + stopChan chan struct{} + cli client.APIClient + handler func(msg events.Message) error +} + +type decodingResult struct { + msg events.Message + err error +} + +// NewEventsMonitor returns an EventsMonitor +func NewEventsMonitor(cli client.APIClient, handler func(msg events.Message) error) *EventsMonitor { + return &EventsMonitor{ + cli: cli, + handler: handler, + } +} + +// Start starts the EventsMonitor +func (em *EventsMonitor) Start(ec chan error) { + em.stopChan = make(chan struct{}) + + responseBody, err := em.cli.Events(context.TODO(), types.EventsOptions{}) + if err != nil { + ec <- err + return + } + + resultChan := make(chan decodingResult) + + go func() { + dec := json.NewDecoder(responseBody) + for { + var result decodingResult + result.err = dec.Decode(&result.msg) + resultChan <- result + if result.err == io.EOF { + break + } + } + close(resultChan) + }() + + go func() { + defer responseBody.Close() + for { + select { + case <-em.stopChan: + ec <- nil + return + case result := <-resultChan: + if result.err != nil { + ec <- result.err + return + } + if err := em.handler(result.msg); err != nil { + ec <- err + return + } + } + } + }() +} + +// Stop stops the EventsMonitor +func (em *EventsMonitor) Stop() { + if em.stopChan == nil { + return + } + close(em.stopChan) +} diff --git a/cluster/swarm/cluster_test.go b/cluster/swarm/cluster_test.go index 52d0da0f96..a82c8f55be 100644 --- a/cluster/swarm/cluster_test.go +++ b/cluster/swarm/cluster_test.go @@ -143,7 +143,7 @@ func TestImportImage(t *testing.T) { mock.AnythingOfType("NetworkListOptions"), ).Return([]types.NetworkResource{}, nil) apiClient.On("VolumeList", mock.Anything, mock.Anything).Return(types.VolumesListResponse{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{bytes.NewBufferString("")}, nil) apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil) apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil).Once() @@ -196,7 +196,7 @@ func TestLoadImage(t *testing.T) { mock.AnythingOfType("NetworkListOptions"), ).Return([]types.NetworkResource{}, nil) apiClient.On("VolumeList", mock.Anything, mock.Anything).Return(types.VolumesListResponse{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{bytes.NewBufferString("")}, nil) apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.Image{}, nil) apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil).Once() @@ -252,7 +252,7 @@ func TestTagImage(t *testing.T) { mock.AnythingOfType("NetworkListOptions"), ).Return([]types.NetworkResource{}, nil) apiClient.On("VolumeList", mock.Anything, mock.Anything).Return(types.VolumesListResponse{}, nil) - client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(&nopCloser{bytes.NewBufferString("")}, nil) apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return(images, nil) apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil).Once()