Fix race condition. cmdWait will do the closing

Signed-off-by: David Gageot <david@gageot.net>
This commit is contained in:
David Gageot 2015-12-28 13:11:28 +01:00
parent b185bb3dbd
commit c180a7962b
1 changed files with 14 additions and 34 deletions

View File

@ -32,12 +32,11 @@ const (
type PluginStreamer interface { type PluginStreamer interface {
// Return a channel for receiving the output of the stream line by // Return a channel for receiving the output of the stream line by
// line, and a channel for stopping the stream when we are finished // line.
// reading from it.
// //
// It happens to be the case that we do this all inside of the main // It happens to be the case that we do this all inside of the main
// plugin struct today, but that may not be the case forever. // plugin struct today, but that may not be the case forever.
AttachStream(*bufio.Scanner) (<-chan string, chan<- bool) AttachStream(*bufio.Scanner) <-chan string
} }
type PluginServer interface { type PluginServer interface {
@ -155,40 +154,23 @@ func (lbe *Executor) Close() error {
return fmt.Errorf("Error waiting for binary close: %s", err) return fmt.Errorf("Error waiting for binary close: %s", err)
} }
if err := lbe.pluginStdout.Close(); err != nil {
return fmt.Errorf("Error closing plugin stdout: %s", err)
}
if err := lbe.pluginStderr.Close(); err != nil {
return fmt.Errorf("Error closing plugin stderr: %s", err)
}
return nil return nil
} }
func stream(scanner *bufio.Scanner, streamOutCh chan<- string, stopCh <-chan bool) { func stream(scanner *bufio.Scanner, streamOutCh chan<- string) {
for { for scanner.Scan() {
select { line := scanner.Text()
case <-stopCh: if err := scanner.Err(); err != nil {
close(streamOutCh) log.Warnf("Scanning stream: %s", err)
return
default:
if scanner.Scan() {
line := scanner.Text()
if err := scanner.Err(); err != nil {
log.Warnf("Scanning stream: %s", err)
}
streamOutCh <- strings.Trim(line, "\n")
}
} }
streamOutCh <- strings.Trim(line, "\n")
} }
} }
func (lbp *Plugin) AttachStream(scanner *bufio.Scanner) (<-chan string, chan<- bool) { func (lbp *Plugin) AttachStream(scanner *bufio.Scanner) <-chan string {
streamOutCh := make(chan string) streamOutCh := make(chan string)
stopCh := make(chan bool) go stream(scanner, streamOutCh)
go stream(scanner, streamOutCh, stopCh) return streamOutCh
return streamOutCh, stopCh
} }
func (lbp *Plugin) execServer() error { func (lbp *Plugin) execServer() error {
@ -207,8 +189,8 @@ func (lbp *Plugin) execServer() error {
lbp.addrCh <- strings.TrimSpace(addr) lbp.addrCh <- strings.TrimSpace(addr)
stdOutCh, stopStdoutCh := lbp.AttachStream(outScanner) stdOutCh := lbp.AttachStream(outScanner)
stdErrCh, stopStderrCh := lbp.AttachStream(errScanner) stdErrCh := lbp.AttachStream(errScanner)
for { for {
select { select {
@ -216,9 +198,7 @@ func (lbp *Plugin) execServer() error {
log.Infof(pluginOut, lbp.MachineName, out) log.Infof(pluginOut, lbp.MachineName, out)
case err := <-stdErrCh: case err := <-stdErrCh:
log.Debugf(pluginErr, lbp.MachineName, err) log.Debugf(pluginErr, lbp.MachineName, err)
case _ = <-lbp.stopCh: case <-lbp.stopCh:
stopStdoutCh <- true
stopStderrCh <- true
if err := lbp.Executor.Close(); err != nil { if err := lbp.Executor.Close(); err != nil {
return fmt.Errorf("Error closing local plugin binary: %s", err) return fmt.Errorf("Error closing local plugin binary: %s", err)
} }