mirror of https://github.com/docker/docs.git
				
				
				
			Merge pull request #2697 from dgageot/truncate-logs
Fix truncated plugin binary logs
This commit is contained in:
		
						commit
						98a490517a
					
				|  | @ -32,12 +32,11 @@ const ( | |||
| 
 | ||||
| type PluginStreamer interface { | ||||
| 	// Return a channel for receiving the output of the stream line by
 | ||||
| 	// line, and a channel for stopping the stream when we are finished
 | ||||
| 	// reading from it.
 | ||||
| 	// line.
 | ||||
| 	//
 | ||||
| 	// It happens to be the case that we do this all inside of the main
 | ||||
| 	// plugin struct today, but that may not be the case forever.
 | ||||
| 	AttachStream(*bufio.Scanner) (<-chan string, chan<- bool) | ||||
| 	AttachStream(*bufio.Scanner) <-chan string | ||||
| } | ||||
| 
 | ||||
| type PluginServer interface { | ||||
|  | @ -73,11 +72,13 @@ type Plugin struct { | |||
| 	MachineName string | ||||
| 	addrCh      chan string | ||||
| 	stopCh      chan bool | ||||
| 	timeout     time.Duration | ||||
| } | ||||
| 
 | ||||
| type Executor struct { | ||||
| 	pluginStdout, pluginStderr io.ReadCloser | ||||
| 	DriverName                 string | ||||
| 	cmd                        *exec.Cmd | ||||
| 	binaryPath                 string | ||||
| } | ||||
| 
 | ||||
|  | @ -123,14 +124,14 @@ func (lbe *Executor) Start() (*bufio.Scanner, *bufio.Scanner, error) { | |||
| 
 | ||||
| 	log.Debugf("Launching plugin server for driver %s", lbe.DriverName) | ||||
| 
 | ||||
| 	cmd := exec.Command(lbe.binaryPath) | ||||
| 	lbe.cmd = exec.Command(lbe.binaryPath) | ||||
| 
 | ||||
| 	lbe.pluginStdout, err = cmd.StdoutPipe() | ||||
| 	lbe.pluginStdout, err = lbe.cmd.StdoutPipe() | ||||
| 	if err != nil { | ||||
| 		return nil, nil, fmt.Errorf("Error getting cmd stdout pipe: %s", err) | ||||
| 	} | ||||
| 
 | ||||
| 	lbe.pluginStderr, err = cmd.StderrPipe() | ||||
| 	lbe.pluginStderr, err = lbe.cmd.StderrPipe() | ||||
| 	if err != nil { | ||||
| 		return nil, nil, fmt.Errorf("Error getting cmd stderr pipe: %s", err) | ||||
| 	} | ||||
|  | @ -141,7 +142,7 @@ func (lbe *Executor) Start() (*bufio.Scanner, *bufio.Scanner, error) { | |||
| 	os.Setenv(PluginEnvKey, PluginEnvVal) | ||||
| 	os.Setenv(PluginEnvDriverName, lbe.DriverName) | ||||
| 
 | ||||
| 	if err := cmd.Start(); err != nil { | ||||
| 	if err := lbe.cmd.Start(); err != nil { | ||||
| 		return nil, nil, fmt.Errorf("Error starting plugin binary: %s", err) | ||||
| 	} | ||||
| 
 | ||||
|  | @ -149,43 +150,27 @@ func (lbe *Executor) Start() (*bufio.Scanner, *bufio.Scanner, error) { | |||
| } | ||||
| 
 | ||||
| func (lbe *Executor) Close() error { | ||||
| 	if err := lbe.pluginStdout.Close(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	if err := lbe.pluginStderr.Close(); err != nil { | ||||
| 		return err | ||||
| 	if err := lbe.cmd.Wait(); err != nil { | ||||
| 		return fmt.Errorf("Error waiting for binary close: %s", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func stream(scanner *bufio.Scanner, streamOutCh chan<- string, stopCh <-chan bool) { | ||||
| 	lines := make(chan string) | ||||
| 	go func() { | ||||
| func stream(scanner *bufio.Scanner, streamOutCh chan<- string) { | ||||
| 	for scanner.Scan() { | ||||
| 			lines <- scanner.Text() | ||||
| 		} | ||||
| 	}() | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-stopCh: | ||||
| 			close(streamOutCh) | ||||
| 			return | ||||
| 		case line := <-lines: | ||||
| 			streamOutCh <- strings.Trim(line, "\n") | ||||
| 		line := scanner.Text() | ||||
| 		if err := scanner.Err(); err != nil { | ||||
| 			log.Warnf("Scanning stream: %s", err) | ||||
| 		} | ||||
| 		} | ||||
| 		streamOutCh <- strings.Trim(line, "\n") | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (lbp *Plugin) AttachStream(scanner *bufio.Scanner) (<-chan string, chan<- bool) { | ||||
| func (lbp *Plugin) AttachStream(scanner *bufio.Scanner) <-chan string { | ||||
| 	streamOutCh := make(chan string) | ||||
| 	stopCh := make(chan bool) | ||||
| 	go stream(scanner, streamOutCh, stopCh) | ||||
| 	return streamOutCh, stopCh | ||||
| 	go stream(scanner, streamOutCh) | ||||
| 	return streamOutCh | ||||
| } | ||||
| 
 | ||||
| func (lbp *Plugin) execServer() error { | ||||
|  | @ -204,8 +189,8 @@ func (lbp *Plugin) execServer() error { | |||
| 
 | ||||
| 	lbp.addrCh <- strings.TrimSpace(addr) | ||||
| 
 | ||||
| 	stdOutCh, stopStdoutCh := lbp.AttachStream(outScanner) | ||||
| 	stdErrCh, stopStderrCh := lbp.AttachStream(errScanner) | ||||
| 	stdOutCh := lbp.AttachStream(outScanner) | ||||
| 	stdErrCh := lbp.AttachStream(errScanner) | ||||
| 
 | ||||
| 	for { | ||||
| 		select { | ||||
|  | @ -213,9 +198,7 @@ func (lbp *Plugin) execServer() error { | |||
| 			log.Infof(pluginOut, lbp.MachineName, out) | ||||
| 		case err := <-stdErrCh: | ||||
| 			log.Debugf(pluginErr, lbp.MachineName, err) | ||||
| 		case _ = <-lbp.stopCh: | ||||
| 			stopStdoutCh <- true | ||||
| 			stopStderrCh <- true | ||||
| 		case <-lbp.stopCh: | ||||
| 			if err := lbp.Executor.Close(); err != nil { | ||||
| 				return fmt.Errorf("Error closing local plugin binary: %s", err) | ||||
| 			} | ||||
|  | @ -230,13 +213,17 @@ func (lbp *Plugin) Serve() error { | |||
| 
 | ||||
| func (lbp *Plugin) Address() (string, error) { | ||||
| 	if lbp.Addr == "" { | ||||
| 		if lbp.timeout == 0 { | ||||
| 			lbp.timeout = defaultTimeout | ||||
| 		} | ||||
| 
 | ||||
| 		select { | ||||
| 		case lbp.Addr = <-lbp.addrCh: | ||||
| 			log.Debugf("Plugin server listening at address %s", lbp.Addr) | ||||
| 			close(lbp.addrCh) | ||||
| 			return lbp.Addr, nil | ||||
| 		case <-time.After(defaultTimeout): | ||||
| 			return "", fmt.Errorf("Failed to dial the plugin server in %s", defaultTimeout) | ||||
| 		case <-time.After(lbp.timeout): | ||||
| 			return "", fmt.Errorf("Failed to dial the plugin server in %s", lbp.timeout) | ||||
| 		} | ||||
| 	} | ||||
| 	return lbp.Addr, nil | ||||
|  |  | |||
|  | @ -10,6 +10,7 @@ import ( | |||
| 	"os" | ||||
| 
 | ||||
| 	"github.com/docker/machine/libmachine/log" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
| 
 | ||||
| type FakeExecutor struct { | ||||
|  | @ -56,15 +57,16 @@ func TestLocalBinaryPluginAddressTimeout(t *testing.T) { | |||
| 	if testing.Short() { | ||||
| 		t.Skip("Skipping timeout test") | ||||
| 	} | ||||
| 	lbp := &Plugin{} | ||||
| 	lbp.addrCh = make(chan string, 1) | ||||
| 	go func() { | ||||
| 		_, err := lbp.Address() | ||||
| 		if err == nil { | ||||
| 			t.Fatalf("Expected to get a timeout error, instead got %s", err) | ||||
| 
 | ||||
| 	lbp := &Plugin{ | ||||
| 		addrCh:  make(chan string, 1), | ||||
| 		timeout: 1 * time.Second, | ||||
| 	} | ||||
| 	}() | ||||
| 	time.Sleep(defaultTimeout + 1) | ||||
| 
 | ||||
| 	addr, err := lbp.Address() | ||||
| 
 | ||||
| 	assert.Empty(t, addr) | ||||
| 	assert.EqualError(t, err, "Failed to dial the plugin server in 1s") | ||||
| } | ||||
| 
 | ||||
| func TestLocalBinaryPluginClose(t *testing.T) { | ||||
|  |  | |||
|  | @ -30,6 +30,7 @@ Please use this plugin through the main 'docker-machine' binary. | |||
| 	} | ||||
| 
 | ||||
| 	log.SetDebug(true) | ||||
| 	os.Setenv("MACHINE_DEBUG", "1") | ||||
| 
 | ||||
| 	rpcd := rpcdriver.NewRPCServerDriver(d) | ||||
| 	rpc.RegisterName(rpcdriver.RPCServiceNameV0, rpcd) | ||||
|  | @ -50,10 +51,12 @@ Please use this plugin through the main 'docker-machine' binary. | |||
| 	for { | ||||
| 		select { | ||||
| 		case <-rpcd.CloseCh: | ||||
| 			log.Debug("Closing plugin on server side") | ||||
| 			os.Exit(0) | ||||
| 		case <-rpcd.HeartbeatCh: | ||||
| 			continue | ||||
| 		case <-time.After(heartbeatTimeout): | ||||
| 			// TODO: Add heartbeat retry logic
 | ||||
| 			os.Exit(1) | ||||
| 		} | ||||
| 	} | ||||
|  |  | |||
|  | @ -66,8 +66,6 @@ const ( | |||
| 	RestartMethod            = `.Restart` | ||||
| 	KillMethod               = `.Kill` | ||||
| 	UpgradeMethod            = `.Upgrade` | ||||
| 	LocalArtifactPathMethod  = `.LocalArtifactPath` | ||||
| 	GlobalArtifactPathMethod = `.GlobalArtifactPath` | ||||
| ) | ||||
| 
 | ||||
| func (ic *InternalClient) Call(serviceMethod string, args interface{}, reply interface{}) error { | ||||
|  | @ -90,13 +88,14 @@ func NewInternalClient(rpcclient *rpc.Client) *InternalClient { | |||
| 
 | ||||
| func CloseDrivers() { | ||||
| 	openedDriversLock.Lock() | ||||
| 	defer openedDriversLock.Unlock() | ||||
| 
 | ||||
| 	for _, openedDriver := range openedDrivers { | ||||
| 		openedDriver.close() | ||||
| 		if err := openedDriver.close(); err != nil { | ||||
| 			log.Warnf("Error closing a plugin driver: %s", err) | ||||
| 		} | ||||
| 	} | ||||
| 	openedDrivers = []*RPCClientDriver{} | ||||
| 
 | ||||
| 	openedDriversLock.Unlock() | ||||
| } | ||||
| 
 | ||||
| func NewRPCClientDriver(driverName string, rawDriver []byte) (*RPCClientDriver, error) { | ||||
|  | @ -159,8 +158,6 @@ func NewRPCClientDriver(driverName string, rawDriver []byte) (*RPCClientDriver, | |||
| 			case <-time.After(heartbeatInterval): | ||||
| 				if err := c.Client.Call(HeartbeatMethod, struct{}{}, nil); err != nil { | ||||
| 					log.Warnf("Error attempting heartbeat call to plugin server: %s", err) | ||||
| 					c.close() | ||||
| 					return | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | @ -190,12 +187,6 @@ func (c *RPCClientDriver) close() error { | |||
| 	c.heartbeatDoneCh <- true | ||||
| 	close(c.heartbeatDoneCh) | ||||
| 
 | ||||
| 	log.Debug("Making call to close connection to plugin binary") | ||||
| 
 | ||||
| 	if err := c.plugin.Close(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	log.Debug("Making call to close driver server") | ||||
| 
 | ||||
| 	if err := c.Client.Call(CloseMethod, struct{}{}, nil); err != nil { | ||||
|  | @ -204,6 +195,12 @@ func (c *RPCClientDriver) close() error { | |||
| 
 | ||||
| 	log.Debug("Successfully made call to close driver server") | ||||
| 
 | ||||
| 	log.Debug("Making call to close connection to plugin binary") | ||||
| 
 | ||||
| 	if err := c.plugin.Close(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
|  | @ -346,25 +343,6 @@ func (c *RPCClientDriver) Kill() error { | |||
| 	return c.Client.Call(KillMethod, struct{}{}, nil) | ||||
| } | ||||
| 
 | ||||
| func (c *RPCClientDriver) LocalArtifactPath(file string) string { | ||||
| 	var path string | ||||
| 
 | ||||
| 	if err := c.Client.Call(LocalArtifactPathMethod, file, &path); err != nil { | ||||
| 		log.Warnf("Error attempting call to get LocalArtifactPath: %s", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return path | ||||
| } | ||||
| 
 | ||||
| func (c *RPCClientDriver) GlobalArtifactPath() string { | ||||
| 	globalArtifactPath, err := c.rpcStringCall(GlobalArtifactPathMethod) | ||||
| 	if err != nil { | ||||
| 		log.Warnf("Error attempting call to get GlobalArtifactPath: %s", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return globalArtifactPath | ||||
| } | ||||
| 
 | ||||
| func (c *RPCClientDriver) Upgrade() error { | ||||
| 	return c.Client.Call(UpgradeMethod, struct{}{}, nil) | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue