Merge pull request #941 from aluzzardi/events-panic

Engine: fix panic in disconnect with events.
This commit is contained in:
Victor Vieux 2015-06-12 19:01:28 -07:00
commit 174ea0c4d7
5 changed files with 42 additions and 29 deletions

2
Godeps/Godeps.json generated
View File

@ -111,7 +111,7 @@
}, },
{ {
"ImportPath": "github.com/samalba/dockerclient", "ImportPath": "github.com/samalba/dockerclient",
"Rev": "48d211b4bab6028425b0cd34606c974634836986" "Rev": "12570e600d71374233e5056ba315f657ced496c7"
}, },
{ {
"ImportPath": "github.com/samuel/go-zookeeper/zk", "ImportPath": "github.com/samuel/go-zookeeper/zk",

View File

@ -30,8 +30,8 @@ type DockerClient struct {
URL *url.URL URL *url.URL
HTTPClient *http.Client HTTPClient *http.Client
TLSConfig *tls.Config TLSConfig *tls.Config
monitorEvents int32
monitorStats int32 monitorStats int32
eventStopChan chan (struct{})
} }
type Error struct { type Error struct {
@ -61,7 +61,7 @@ func NewDockerClientTimeout(daemonUrl string, tlsConfig *tls.Config, timeout tim
} }
} }
httpClient := newHTTPClient(u, tlsConfig, timeout) httpClient := newHTTPClient(u, tlsConfig, timeout)
return &DockerClient{u, httpClient, tlsConfig, 0, 0}, nil return &DockerClient{u, httpClient, tlsConfig, 0, nil}, nil
} }
func (client *DockerClient) doRequest(method string, path string, body []byte, headers map[string]string) ([]byte, error) { func (client *DockerClient) doRequest(method string, path string, body []byte, headers map[string]string) ([]byte, error) {
@ -236,16 +236,23 @@ func (client *DockerClient) readJSONStream(stream io.ReadCloser, decode func(*js
go func() { go func() {
decoder := json.NewDecoder(stream) decoder := json.NewDecoder(stream)
defer stream.Close() stopped := make(chan struct{})
go func() {
<-stopChan
stream.Close()
stopped <- struct{}{}
}()
defer close(resultChan) defer close(resultChan)
for { for {
decodeResult := decode(decoder) decodeResult := decode(decoder)
select { select {
case <-stopChan: case <-stopped:
return return
default: default:
resultChan <- decodeResult resultChan <- decodeResult
if decodeResult.err != nil { if decodeResult.err != nil {
stream.Close()
return return
} }
} }
@ -354,32 +361,27 @@ func (client *DockerClient) MonitorEvents(options *MonitorEventsOptions, stopCha
} }
func (client *DockerClient) StartMonitorEvents(cb Callback, ec chan error, args ...interface{}) { func (client *DockerClient) StartMonitorEvents(cb Callback, ec chan error, args ...interface{}) {
atomic.StoreInt32(&client.monitorEvents, 1) client.eventStopChan = make(chan struct{})
go client.getEvents(cb, ec, args...)
}
func (client *DockerClient) getEvents(cb Callback, ec chan error, args ...interface{}) { go func() {
uri := fmt.Sprintf("%s/%s/events", client.URL.String(), APIVersion) eventErrChan, err := client.MonitorEvents(nil, client.eventStopChan)
resp, err := client.HTTPClient.Get(uri) if err != nil {
if err != nil {
ec <- err
return
}
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
for atomic.LoadInt32(&client.monitorEvents) > 0 {
var event *Event
if err := dec.Decode(&event); err != nil {
ec <- err ec <- err
return return
} }
cb(event, ec, args...)
} for e := range eventErrChan {
if e.Error != nil {
ec <- err
return
}
go cb(&e.Event, ec, args...)
}
}()
} }
func (client *DockerClient) StopAllMonitorEvents() { func (client *DockerClient) StopAllMonitorEvents() {
atomic.StoreInt32(&client.monitorEvents, 0) close(client.eventStopChan)
} }
func (client *DockerClient) StartMonitorStats(id string, cb StatCallback, ec chan error, args ...interface{}) { func (client *DockerClient) StartMonitorStats(id string, cb StatCallback, ec chan error, args ...interface{}) {

View File

@ -12,10 +12,15 @@ func eventCallback(e *dockerclient.Event, ec chan error, args ...interface{}) {
log.Println(e) log.Println(e)
} }
var (
client *dockerclient.DockerClient
)
func waitForInterrupt() { func waitForInterrupt() {
sigChan := make(chan os.Signal, 1) sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
for _ = range sigChan { for _ = range sigChan {
client.StopAllMonitorEvents()
os.Exit(0) os.Exit(0)
} }
} }
@ -26,7 +31,9 @@ func main() {
log.Fatal(err) log.Fatal(err)
} }
docker.StartMonitorEvents(eventCallback, nil) client = docker
client.StartMonitorEvents(eventCallback, nil)
waitForInterrupt() waitForInterrupt()
} }

View File

@ -30,7 +30,7 @@ type ContainerConfig struct {
OnBuild []string OnBuild []string
Labels map[string]string Labels map[string]string
// FIXME: Compatibility // FIXME: The following fields have been removed since API v1.18
Memory int64 Memory int64
MemorySwap int64 MemorySwap int64
CpuShares int64 CpuShares int64

View File

@ -117,6 +117,9 @@ func (e *Engine) ConnectWithClient(client dockerclient.Client) error {
// Disconnect will stop all monitoring of the engine. // Disconnect will stop all monitoring of the engine.
// The Engine object cannot be further used without reconnecting it first. // The Engine object cannot be further used without reconnecting it first.
func (e *Engine) Disconnect() { func (e *Engine) Disconnect() {
e.Lock()
defer e.Unlock()
// do not close the chan, so it wait until the refreshLoop goroutine stops // do not close the chan, so it wait until the refreshLoop goroutine stops
e.stopCh <- struct{}{} e.stopCh <- struct{}{}
e.client.StopAllMonitorEvents() e.client.StopAllMonitorEvents()
@ -318,12 +321,13 @@ func (e *Engine) refreshLoop() {
} else { } else {
if !e.healthy { if !e.healthy {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Info("Engine came back to life. Hooray!") log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Info("Engine came back to life. Hooray!")
if err := e.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
continue
}
e.client.StopAllMonitorEvents() e.client.StopAllMonitorEvents()
e.client.StartMonitorEvents(e.handler, nil) e.client.StartMonitorEvents(e.handler, nil)
e.emitEvent("engine_reconnect") e.emitEvent("engine_reconnect")
if err := e.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
}
} }
e.healthy = true e.healthy = true
} }