diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 9587c42c11..a1c810cc43 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -111,7 +111,7 @@ }, { "ImportPath": "github.com/samalba/dockerclient", - "Rev": "48d211b4bab6028425b0cd34606c974634836986" + "Rev": "12570e600d71374233e5056ba315f657ced496c7" }, { "ImportPath": "github.com/samuel/go-zookeeper/zk", diff --git a/Godeps/_workspace/src/github.com/samalba/dockerclient/dockerclient.go b/Godeps/_workspace/src/github.com/samalba/dockerclient/dockerclient.go index fe2a648b41..cdfd56d8ca 100644 --- a/Godeps/_workspace/src/github.com/samalba/dockerclient/dockerclient.go +++ b/Godeps/_workspace/src/github.com/samalba/dockerclient/dockerclient.go @@ -30,8 +30,8 @@ type DockerClient struct { URL *url.URL HTTPClient *http.Client TLSConfig *tls.Config - monitorEvents int32 monitorStats int32 + eventStopChan chan (struct{}) } type Error struct { @@ -61,7 +61,7 @@ func NewDockerClientTimeout(daemonUrl string, tlsConfig *tls.Config, timeout tim } } httpClient := newHTTPClient(u, tlsConfig, timeout) - return &DockerClient{u, httpClient, tlsConfig, 0, 0}, nil + return &DockerClient{u, httpClient, tlsConfig, 0, nil}, nil } func (client *DockerClient) doRequest(method string, path string, body []byte, headers map[string]string) ([]byte, error) { @@ -236,16 +236,23 @@ func (client *DockerClient) readJSONStream(stream io.ReadCloser, decode func(*js go func() { decoder := json.NewDecoder(stream) - defer stream.Close() + stopped := make(chan struct{}) + go func() { + <-stopChan + stream.Close() + stopped <- struct{}{} + }() + defer close(resultChan) for { decodeResult := decode(decoder) select { - case <-stopChan: + case <-stopped: return default: resultChan <- decodeResult if decodeResult.err != nil { + stream.Close() return } } @@ -354,32 +361,27 @@ func (client *DockerClient) MonitorEvents(options *MonitorEventsOptions, stopCha } func (client *DockerClient) StartMonitorEvents(cb Callback, ec chan error, args ...interface{}) { - atomic.StoreInt32(&client.monitorEvents, 1) - go client.getEvents(cb, ec, args...) -} + client.eventStopChan = make(chan struct{}) -func (client *DockerClient) getEvents(cb Callback, ec chan error, args ...interface{}) { - uri := fmt.Sprintf("%s/%s/events", client.URL.String(), APIVersion) - resp, err := client.HTTPClient.Get(uri) - if err != nil { - ec <- err - return - } - defer resp.Body.Close() - - dec := json.NewDecoder(resp.Body) - for atomic.LoadInt32(&client.monitorEvents) > 0 { - var event *Event - if err := dec.Decode(&event); err != nil { + go func() { + eventErrChan, err := client.MonitorEvents(nil, client.eventStopChan) + if err != nil { ec <- err return } - cb(event, ec, args...) - } + + for e := range eventErrChan { + if e.Error != nil { + ec <- err + return + } + go cb(&e.Event, ec, args...) + } + }() } func (client *DockerClient) StopAllMonitorEvents() { - atomic.StoreInt32(&client.monitorEvents, 0) + close(client.eventStopChan) } func (client *DockerClient) StartMonitorStats(id string, cb StatCallback, ec chan error, args ...interface{}) { diff --git a/Godeps/_workspace/src/github.com/samalba/dockerclient/examples/events.go b/Godeps/_workspace/src/github.com/samalba/dockerclient/examples/events.go index 07d05dfc1c..2d6de40c7a 100644 --- a/Godeps/_workspace/src/github.com/samalba/dockerclient/examples/events.go +++ b/Godeps/_workspace/src/github.com/samalba/dockerclient/examples/events.go @@ -12,10 +12,15 @@ func eventCallback(e *dockerclient.Event, ec chan error, args ...interface{}) { log.Println(e) } +var ( + client *dockerclient.DockerClient +) + func waitForInterrupt() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) for _ = range sigChan { + client.StopAllMonitorEvents() os.Exit(0) } } @@ -26,7 +31,9 @@ func main() { log.Fatal(err) } - docker.StartMonitorEvents(eventCallback, nil) + client = docker + + client.StartMonitorEvents(eventCallback, nil) waitForInterrupt() } diff --git a/Godeps/_workspace/src/github.com/samalba/dockerclient/types.go b/Godeps/_workspace/src/github.com/samalba/dockerclient/types.go index 57150641e8..2b2ca44220 100644 --- a/Godeps/_workspace/src/github.com/samalba/dockerclient/types.go +++ b/Godeps/_workspace/src/github.com/samalba/dockerclient/types.go @@ -30,7 +30,7 @@ type ContainerConfig struct { OnBuild []string Labels map[string]string - // FIXME: Compatibility + // FIXME: The following fields have been removed since API v1.18 Memory int64 MemorySwap int64 CpuShares int64 diff --git a/cluster/engine.go b/cluster/engine.go index aa7dd84a5f..2e715a2e38 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -117,6 +117,9 @@ func (e *Engine) ConnectWithClient(client dockerclient.Client) error { // Disconnect will stop all monitoring of the engine. // The Engine object cannot be further used without reconnecting it first. func (e *Engine) Disconnect() { + e.Lock() + defer e.Unlock() + // do not close the chan, so it wait until the refreshLoop goroutine stops e.stopCh <- struct{}{} e.client.StopAllMonitorEvents() @@ -318,12 +321,13 @@ func (e *Engine) refreshLoop() { } else { if !e.healthy { log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Info("Engine came back to life. Hooray!") + if err := e.updateSpecs(); err != nil { + log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err) + continue + } e.client.StopAllMonitorEvents() e.client.StartMonitorEvents(e.handler, nil) e.emitEvent("engine_reconnect") - if err := e.updateSpecs(); err != nil { - log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err) - } } e.healthy = true }