diff --git a/api/client/logs.go b/api/client/logs.go index a1375226b1..0f4be3dac6 100644 --- a/api/client/logs.go +++ b/api/client/logs.go @@ -19,7 +19,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error { follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output") since := cmd.String([]string{"-since"}, "", "Show logs since timestamp") times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps") - tail := cmd.String([]string{"-tail"}, "latest", "Number of lines to show from the end of the logs") + tail := cmd.String([]string{"-tail"}, "all", "Number of lines to show from the end of the logs") cmd.Require(flag.Exact, 1) cmd.ParseFlags(args, true) diff --git a/api/server/server.go b/api/server/server.go index 0c165d024e..23af4d7e2b 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -629,6 +629,17 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite closeNotifier = notifier.CloseNotify() } + c, err := s.daemon.Get(vars["name"]) + if err != nil { + return err + } + + outStream := ioutils.NewWriteFlusher(w) + // write an empty chunk of data (this is to ensure that the + // HTTP Response is sent immediatly, even if the container has + // not yet produced any data) + outStream.Write(nil) + logsConfig := &daemon.ContainerLogsConfig{ Follow: boolValue(r, "follow"), Timestamps: boolValue(r, "timestamps"), @@ -636,11 +647,11 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite Tail: r.Form.Get("tail"), UseStdout: stdout, UseStderr: stderr, - OutStream: ioutils.NewWriteFlusher(w), + OutStream: outStream, Stop: closeNotifier, } - if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil { + if err := s.daemon.ContainerLogs(c, logsConfig); err != nil { fmt.Fprintf(w, "Error running logs job: %s\n", err) } diff --git a/daemon/container.go b/daemon/container.go index bc9d5ab043..4f046b75db 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -25,7 +25,6 @@ import ( "github.com/docker/docker/pkg/broadcastwriter" "github.com/docker/docker/pkg/fileutils" "github.com/docker/docker/pkg/ioutils" - "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/mount" "github.com/docker/docker/pkg/nat" "github.com/docker/docker/pkg/promise" @@ -721,6 +720,9 @@ func (container *Container) getLogConfig() runconfig.LogConfig { } func (container *Container) getLogger() (logger.Logger, error) { + if container.logDriver != nil && container.IsRunning() { + return container.logDriver, nil + } cfg := container.getLogConfig() if err := logger.ValidateLogOpts(cfg.Type, cfg.Config); err != nil { return nil, err @@ -894,36 +896,33 @@ func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writ } func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error { - if logs { logDriver, err := c.getLogger() if err != nil { - logrus.Errorf("Error obtaining the logger %v", err) return err } - if _, ok := logDriver.(logger.Reader); !ok { - logrus.Errorf("cannot read logs for [%s] driver", logDriver.Name()) - } else { - if cLog, err := logDriver.(logger.Reader).ReadLog(); err != nil { - logrus.Errorf("Error reading logs %v", err) - } else { - dec := json.NewDecoder(cLog) - for { - l := &jsonlog.JSONLog{} + cLog, ok := logDriver.(logger.LogReader) + if !ok { + return logger.ErrReadLogsNotSupported + } + logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1}) - if err := dec.Decode(l); err == io.EOF { - break - } else if err != nil { - logrus.Errorf("Error streaming logs: %s", err) - break - } - if l.Stream == "stdout" && stdout != nil { - io.WriteString(stdout, l.Log) - } - if l.Stream == "stderr" && stderr != nil { - io.WriteString(stderr, l.Log) - } + LogLoop: + for { + select { + case msg, ok := <-logs.Msg: + if !ok { + break LogLoop } + if msg.Source == "stdout" && stdout != nil { + stdout.Write(msg.Line) + } + if msg.Source == "stderr" && stderr != nil { + stderr.Write(msg.Line) + } + case err := <-logs.Err: + logrus.Errorf("Error streaming logs: %v", err) + break LogLoop } } } diff --git a/daemon/logger/factory.go b/daemon/logger/factory.go index a086c51ebc..14b09596a2 100644 --- a/daemon/logger/factory.go +++ b/daemon/logger/factory.go @@ -27,6 +27,7 @@ type Context struct { LogPath string } +// Hostname returns the hostname from the underlying OS func (ctx *Context) Hostname() (string, error) { hostname, err := os.Hostname() if err != nil { @@ -35,6 +36,7 @@ func (ctx *Context) Hostname() (string, error) { return hostname, nil } +// Command returns the command that the container being logged was started with func (ctx *Context) Command() string { terms := []string{ctx.ContainerEntrypoint} for _, arg := range ctx.ContainerArgs { diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index be1572bc31..383aada822 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -2,32 +2,42 @@ package jsonfilelog import ( "bytes" + "encoding/json" "fmt" "io" "os" "strconv" "sync" + "time" + + "gopkg.in/fsnotify.v1" "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonlog" + "github.com/docker/docker/pkg/pubsub" + "github.com/docker/docker/pkg/tailfile" "github.com/docker/docker/pkg/timeutils" "github.com/docker/docker/pkg/units" ) const ( - Name = "json-file" + Name = "json-file" + maxJSONDecodeRetry = 10 ) // JSONFileLogger is Logger implementation for default docker logging: // JSON objects to file type JSONFileLogger struct { - buf *bytes.Buffer - f *os.File // store for closing - mu sync.Mutex // protects buffer - capacity int64 //maximum size of each file - n int //maximum number of files - ctx logger.Context + buf *bytes.Buffer + f *os.File // store for closing + mu sync.Mutex // protects buffer + capacity int64 //maximum size of each file + n int //maximum number of files + ctx logger.Context + readers map[*logger.LogWatcher]struct{} // stores the active log followers + notifyRotate *pubsub.Publisher } func init() { @@ -64,11 +74,13 @@ func New(ctx logger.Context) (logger.Logger, error) { } } return &JSONFileLogger{ - f: log, - buf: bytes.NewBuffer(nil), - ctx: ctx, - capacity: capval, - n: maxFiles, + f: log, + buf: bytes.NewBuffer(nil), + ctx: ctx, + capacity: capval, + n: maxFiles, + readers: make(map[*logger.LogWatcher]struct{}), + notifyRotate: pubsub.NewPublisher(0, 1), }, nil } @@ -111,6 +123,7 @@ func writeLog(l *JSONFileLogger) (int64, error) { return -1, err } l.f = file + l.notifyRotate.Publish(struct{}{}) } return writeToBuf(l) } @@ -148,11 +161,11 @@ func backup(old, curr string) error { } } if _, err := os.Stat(curr); os.IsNotExist(err) { - if f, err := os.Create(curr); err != nil { + f, err := os.Create(curr) + if err != nil { return err - } else { - f.Close() } + f.Close() } return os.Rename(curr, old) } @@ -169,31 +182,200 @@ func ValidateLogOpt(cfg map[string]string) error { return nil } -func (l *JSONFileLogger) ReadLog(args ...string) (io.Reader, error) { - pth := l.ctx.LogPath - if len(args) > 0 { - //check if args[0] is an integer index - index, err := strconv.ParseInt(args[0], 0, 0) - if err != nil { - return nil, err - } - if index > 0 { - pth = pth + "." + args[0] - } - } - return os.Open(pth) -} - func (l *JSONFileLogger) LogPath() string { return l.ctx.LogPath } -// Close closes underlying file +// Close closes underlying file and signals all readers to stop func (l *JSONFileLogger) Close() error { - return l.f.Close() + l.mu.Lock() + err := l.f.Close() + for r := range l.readers { + r.Close() + delete(l.readers, r) + } + l.mu.Unlock() + return err } // Name returns name of this logger func (l *JSONFileLogger) Name() string { return Name } + +func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { + l.Reset() + if err := dec.Decode(l); err != nil { + return nil, err + } + msg := &logger.Message{ + Source: l.Stream, + Timestamp: l.Created, + Line: []byte(l.Log), + } + return msg, nil +} + +// Reads from the log file +func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { + logWatcher := logger.NewLogWatcher() + + go l.readLogs(logWatcher, config) + return logWatcher +} + +func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { + defer close(logWatcher.Msg) + + pth := l.ctx.LogPath + var files []io.ReadSeeker + for i := l.n; i > 1; i-- { + f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1)) + if err != nil { + if !os.IsNotExist(err) { + logWatcher.Err <- err + break + } + continue + } + defer f.Close() + files = append(files, f) + } + + latestFile, err := os.Open(pth) + if err != nil { + logWatcher.Err <- err + return + } + defer latestFile.Close() + + files = append(files, latestFile) + tailer := ioutils.MultiReadSeeker(files...) + + if config.Tail != 0 { + tailFile(tailer, logWatcher, config.Tail, config.Since) + } + + if !config.Follow { + return + } + if config.Tail == 0 { + latestFile.Seek(0, os.SEEK_END) + } + + l.mu.Lock() + l.readers[logWatcher] = struct{}{} + l.mu.Unlock() + + notifyRotate := l.notifyRotate.Subscribe() + followLogs(latestFile, logWatcher, notifyRotate, config.Since) + + l.mu.Lock() + delete(l.readers, logWatcher) + l.mu.Unlock() + + l.notifyRotate.Evict(notifyRotate) +} + +func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) { + var rdr io.Reader = f + if tail > 0 { + ls, err := tailfile.TailFile(f, tail) + if err != nil { + logWatcher.Err <- err + return + } + rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n"))) + } + dec := json.NewDecoder(rdr) + l := &jsonlog.JSONLog{} + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + if err != io.EOF { + logWatcher.Err <- err + } + return + } + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + logWatcher.Msg <- msg + } +} + +func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { + dec := json.NewDecoder(f) + l := &jsonlog.JSONLog{} + fileWatcher, err := fsnotify.NewWatcher() + if err != nil { + logWatcher.Err <- err + return + } + defer fileWatcher.Close() + if err := fileWatcher.Add(f.Name()); err != nil { + logWatcher.Err <- err + return + } + + var retries int + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + if err != io.EOF { + // try again because this shouldn't happen + if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { + dec = json.NewDecoder(f) + retries += 1 + continue + } + logWatcher.Err <- err + return + } + + select { + case <-fileWatcher.Events: + dec = json.NewDecoder(f) + continue + case <-fileWatcher.Errors: + logWatcher.Err <- err + return + case <-logWatcher.WatchClose(): + return + case <-notifyRotate: + fileWatcher.Remove(f.Name()) + + f, err = os.Open(f.Name()) + if err != nil { + logWatcher.Err <- err + return + } + if err := fileWatcher.Add(f.Name()); err != nil { + logWatcher.Err <- err + } + dec = json.NewDecoder(f) + continue + } + } + + retries = 0 // reset retries since we've succeeded + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + select { + case logWatcher.Msg <- msg: + case <-logWatcher.WatchClose(): + logWatcher.Msg <- msg + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + return + } + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + logWatcher.Msg <- msg + } + } + } +} diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index e0d1b6a38a..96421f4b96 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -2,11 +2,19 @@ package logger import ( "errors" - "io" "time" + + "github.com/docker/docker/pkg/timeutils" ) -var ReadLogsNotSupported = errors.New("configured logging reader does not support reading") +// ErrReadLogsNotSupported is returned when the logger does not support reading logs +var ErrReadLogsNotSupported = errors.New("configured logging reader does not support reading") + +const ( + // TimeFormat is the time format used for timestamps sent to log readers + TimeFormat = timeutils.RFC3339NanoFixed + logWatcherBufferSize = 4096 +) // Message is datastructure that represents record from some container type Message struct { @@ -16,14 +24,51 @@ type Message struct { Timestamp time.Time } -// Logger is interface for docker logging drivers +// Logger is the interface for docker logging drivers type Logger interface { Log(*Message) error Name() string Close() error } -//Reader is an interface for docker logging drivers that support reading -type Reader interface { - ReadLog(args ...string) (io.Reader, error) +// ReadConfig is the configuration passed into ReadLogs +type ReadConfig struct { + Since time.Time + Tail int + Follow bool +} + +// LogReader is the interface for reading log messages for loggers that support reading +type LogReader interface { + // Read logs from underlying logging backend + ReadLogs(ReadConfig) *LogWatcher +} + +// LogWatcher is used when consuming logs read from the LogReader interface +type LogWatcher struct { + // For sending log messages to a reader + Msg chan *Message + // For sending error messages that occur while while reading logs + Err chan error + closeNotifier chan struct{} +} + +// NewLogWatcher returns a new LogWatcher. +func NewLogWatcher() *LogWatcher { + return &LogWatcher{ + Msg: make(chan *Message, logWatcherBufferSize), + Err: make(chan error, 1), + closeNotifier: make(chan struct{}), + } +} + +// Close notifies the underlying log reader to stop +func (w *LogWatcher) Close() { + close(w.closeNotifier) +} + +// WatchClose returns a channel receiver that receives notification when the watcher has been closed +// This should only be called from one goroutine +func (w *LogWatcher) WatchClose() <-chan struct{} { + return w.closeNotifier } diff --git a/daemon/logs.go b/daemon/logs.go index b9e0d964b7..e032c5716e 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -1,23 +1,14 @@ package daemon import ( - "bytes" - "encoding/json" "fmt" "io" - "net" - "os" "strconv" - "syscall" "time" "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/daemon/logger/jsonfilelog" - "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/stdcopy" - "github.com/docker/docker/pkg/tailfile" - "github.com/docker/docker/pkg/timeutils" ) type ContainerLogsConfig struct { @@ -29,209 +20,64 @@ type ContainerLogsConfig struct { Stop <-chan bool } -func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) error { - var ( - lines = -1 - format string - ) +func (daemon *Daemon) ContainerLogs(container *Container, config *ContainerLogsConfig) error { if !(config.UseStdout || config.UseStderr) { return fmt.Errorf("You must choose at least one stream") } - if config.Timestamps { - format = timeutils.RFC3339NanoFixed - } - if config.Tail == "" { - config.Tail = "latest" - } - container, err := daemon.Get(name) - if err != nil { - return err - } - - var ( - outStream = config.OutStream - errStream io.Writer - ) + outStream := config.OutStream + errStream := outStream if !container.Config.Tty { errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr) outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) - } else { - errStream = outStream } - if container.LogDriverType() != jsonfilelog.Name { - return fmt.Errorf("\"logs\" endpoint is supported only for \"json-file\" logging driver") - } - - maxFile := 1 - container.readHostConfig() - cfg := container.getLogConfig() - conf := cfg.Config - if val, ok := conf["max-file"]; ok { - var err error - maxFile, err = strconv.Atoi(val) - if err != nil { - return fmt.Errorf("Error reading max-file value: %s", err) - } - } - - logDriver, err := container.getLogger() + cLog, err := container.getLogger() if err != nil { return err } - _, ok := logDriver.(logger.Reader) + logReader, ok := cLog.(logger.LogReader) if !ok { - logrus.Errorf("Cannot read logs of the [%s] driver", logDriver.Name()) - } else { - // json-file driver - if config.Tail != "all" && config.Tail != "latest" { - var err error - lines, err = strconv.Atoi(config.Tail) - if err != nil { - logrus.Errorf("Failed to parse tail %s, error: %v, show all logs", config.Tail, err) - lines = -1 - } - } - - if lines != 0 { - n := maxFile - if config.Tail == "latest" && config.Since.IsZero() { - n = 1 - } - before := false - for i := n; i > 0; i-- { - if before { - break - } - cLog, err := getReader(logDriver, i, n, lines) - if err != nil { - logrus.Debugf("Error reading %d log file: %v", i-1, err) - continue - } - //if lines are specified, then iterate only once - if lines > 0 { - i = 1 - } else { // if lines are not specified, cLog is a file, It needs to be closed - defer cLog.(*os.File).Close() - } - dec := json.NewDecoder(cLog) - l := &jsonlog.JSONLog{} - for { - l.Reset() - if err := dec.Decode(l); err == io.EOF { - break - } else if err != nil { - logrus.Errorf("Error streaming logs: %s", err) - break - } - logLine := l.Log - if !config.Since.IsZero() && l.Created.Before(config.Since) { - continue - } - if config.Timestamps { - // format can be "" or time format, so here can't be error - logLine, _ = l.Format(format) - } - if l.Stream == "stdout" && config.UseStdout { - io.WriteString(outStream, logLine) - } - if l.Stream == "stderr" && config.UseStderr { - io.WriteString(errStream, logLine) - } - } - } - } + return logger.ErrReadLogsNotSupported } - if config.Follow && container.IsRunning() { - chErrStderr := make(chan error) - chErrStdout := make(chan error) - var stdoutPipe, stderrPipe io.ReadCloser + follow := config.Follow && container.IsRunning() + tailLines, err := strconv.Atoi(config.Tail) + if err != nil { + tailLines = -1 + } - // write an empty chunk of data (this is to ensure that the - // HTTP Response is sent immediatly, even if the container has - // not yet produced any data) - outStream.Write(nil) - - if config.UseStdout { - stdoutPipe = container.StdoutLogPipe() - go func() { - logrus.Debug("logs: stdout stream begin") - chErrStdout <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since) - logrus.Debug("logs: stdout stream end") - }() - } - if config.UseStderr { - stderrPipe = container.StderrLogPipe() - go func() { - logrus.Debug("logs: stderr stream begin") - chErrStderr <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since) - logrus.Debug("logs: stderr stream end") - }() - } + logrus.Debug("logs: begin stream") + readConfig := logger.ReadConfig{ + Since: config.Since, + Tail: tailLines, + Follow: follow, + } + logs := logReader.ReadLogs(readConfig) + for { select { - case err = <-chErrStderr: - if stdoutPipe != nil { - stdoutPipe.Close() - <-chErrStdout - } - case err = <-chErrStdout: - if stderrPipe != nil { - stderrPipe.Close() - <-chErrStderr - } - case <-config.Stop: - if stdoutPipe != nil { - stdoutPipe.Close() - <-chErrStdout - } - if stderrPipe != nil { - stderrPipe.Close() - <-chErrStderr - } + case err := <-logs.Err: + logrus.Errorf("Error streaming logs: %v", err) return nil - } - - if err != nil && err != io.EOF && err != io.ErrClosedPipe { - if e, ok := err.(*net.OpError); ok && e.Err != syscall.EPIPE { - logrus.Errorf("error streaming logs: %v", err) + case <-config.Stop: + logs.Close() + return nil + case msg, ok := <-logs.Msg: + if !ok { + logrus.Debugf("logs: end stream") + return nil + } + logLine := msg.Line + if config.Timestamps { + logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...) + } + if msg.Source == "stdout" && config.UseStdout { + outStream.Write(logLine) + } + if msg.Source == "stderr" && config.UseStderr { + errStream.Write(logLine) } } } - return nil -} - -func getReader(logDriver logger.Logger, fileIndex, maxFiles, lines int) (io.Reader, error) { - if lines <= 0 { - index := strconv.Itoa(fileIndex - 1) - cLog, err := logDriver.(logger.Reader).ReadLog(index) - return cLog, err - } - buf := bytes.NewBuffer([]byte{}) - remaining := lines - for i := 0; i < maxFiles; i++ { - index := strconv.Itoa(i) - cLog, err := logDriver.(logger.Reader).ReadLog(index) - if err != nil { - return buf, err - } - f := cLog.(*os.File) - ls, err := tailfile.TailFile(f, remaining) - if err != nil { - return buf, err - } - tmp := bytes.NewBuffer([]byte{}) - for _, l := range ls { - fmt.Fprintf(tmp, "%s\n", l) - } - tmp.ReadFrom(buf) - buf = tmp - if len(ls) == remaining { - return buf, nil - } - remaining = remaining - len(ls) - } - return buf, nil } diff --git a/daemon/monitor.go b/daemon/monitor.go index ff173c8f03..1f020574b0 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -12,7 +12,10 @@ import ( "github.com/docker/docker/runconfig" ) -const defaultTimeIncrement = 100 +const ( + defaultTimeIncrement = 100 + loggerCloseTimeout = 10 * time.Second +) // containerMonitor monitors the execution of a container's main process. // If a restart policy is specified for the container the monitor will ensure that the @@ -310,7 +313,7 @@ func (m *containerMonitor) resetContainer(lock bool) { close(exit) }() select { - case <-time.After(1 * time.Second): + case <-time.After(loggerCloseTimeout): logrus.Warnf("Logger didn't exit in time: logs may be truncated") case <-exit: } diff --git a/docs/reference/commandline/logs.md b/docs/reference/commandline/logs.md index db97143dec..a2e69e4d21 100644 --- a/docs/reference/commandline/logs.md +++ b/docs/reference/commandline/logs.md @@ -29,7 +29,7 @@ The `docker logs --follow` command will continue streaming the new output from the container's `STDOUT` and `STDERR`. Passing a negative number or a non-integer to `--tail` is invalid and the -value is set to `latest` in that case. +value is set to `all` in that case. The `docker logs --timestamp` commands will add an RFC3339Nano timestamp, for example `2014-09-16T06:17:46.000000000Z`, to each diff --git a/integration-cli/docker_cli_logs_test.go b/integration-cli/docker_cli_logs_test.go index 40a5c153f4..ebcd4ecafd 100644 --- a/integration-cli/docker_cli_logs_test.go +++ b/integration-cli/docker_cli_logs_test.go @@ -250,13 +250,9 @@ func (s *DockerSuite) TestLogsFollowSlowStdoutConsumer(c *check.C) { }() logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID) - stdout, err := logCmd.StdoutPipe() c.Assert(err, check.IsNil) - - if err := logCmd.Start(); err != nil { - c.Fatal(err) - } + c.Assert(logCmd.Start(), check.IsNil) // First read slowly bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead) diff --git a/pkg/ioutils/multireader.go b/pkg/ioutils/multireader.go new file mode 100644 index 0000000000..f231aa9daf --- /dev/null +++ b/pkg/ioutils/multireader.go @@ -0,0 +1,226 @@ +package ioutils + +import ( + "bytes" + "fmt" + "io" + "os" +) + +type pos struct { + idx int + offset int64 +} + +type multiReadSeeker struct { + readers []io.ReadSeeker + pos *pos + posIdx map[io.ReadSeeker]int +} + +func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) { + var tmpOffset int64 + switch whence { + case os.SEEK_SET: + for i, rdr := range r.readers { + // get size of the current reader + s, err := rdr.Seek(0, os.SEEK_END) + if err != nil { + return -1, err + } + + if offset > tmpOffset+s { + if i == len(r.readers)-1 { + rdrOffset := s + (offset - tmpOffset) + if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil { + return -1, err + } + r.pos = &pos{i, rdrOffset} + return offset, nil + } + + tmpOffset += s + continue + } + + rdrOffset := offset - tmpOffset + idx := i + + rdr.Seek(rdrOffset, os.SEEK_SET) + // make sure all following readers are at 0 + for _, rdr := range r.readers[i+1:] { + rdr.Seek(0, os.SEEK_SET) + } + + if rdrOffset == s && i != len(r.readers)-1 { + idx += 1 + rdrOffset = 0 + } + r.pos = &pos{idx, rdrOffset} + return offset, nil + } + case os.SEEK_END: + for _, rdr := range r.readers { + s, err := rdr.Seek(0, os.SEEK_END) + if err != nil { + return -1, err + } + tmpOffset += s + } + r.Seek(tmpOffset+offset, os.SEEK_SET) + return tmpOffset + offset, nil + case os.SEEK_CUR: + if r.pos == nil { + return r.Seek(offset, os.SEEK_SET) + } + // Just return the current offset + if offset == 0 { + return r.getCurOffset() + } + + curOffset, err := r.getCurOffset() + if err != nil { + return -1, err + } + rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset) + if err != nil { + return -1, err + } + + r.pos = &pos{r.posIdx[rdr], rdrOffset} + return curOffset + offset, nil + default: + return -1, fmt.Errorf("Invalid whence: %d", whence) + } + + return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset) +} + +func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) { + var rdr io.ReadSeeker + var rdrOffset int64 + + for i, rdr := range r.readers { + offsetTo, err := r.getOffsetToReader(rdr) + if err != nil { + return nil, -1, err + } + if offsetTo > offset { + rdr = r.readers[i-1] + rdrOffset = offsetTo - offset + break + } + + if rdr == r.readers[len(r.readers)-1] { + rdrOffset = offsetTo + offset + break + } + } + + return rdr, rdrOffset, nil +} + +func (r *multiReadSeeker) getCurOffset() (int64, error) { + var totalSize int64 + for _, rdr := range r.readers[:r.pos.idx+1] { + if r.posIdx[rdr] == r.pos.idx { + totalSize += r.pos.offset + break + } + + size, err := getReadSeekerSize(rdr) + if err != nil { + return -1, fmt.Errorf("error getting seeker size: %v", err) + } + totalSize += size + } + return totalSize, nil +} + +func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) { + var offset int64 + for _, r := range r.readers { + if r == rdr { + break + } + + size, err := getReadSeekerSize(rdr) + if err != nil { + return -1, err + } + offset += size + } + return offset, nil +} + +func (r *multiReadSeeker) Read(b []byte) (int, error) { + if r.pos == nil { + r.pos = &pos{0, 0} + } + + bCap := int64(cap(b)) + buf := bytes.NewBuffer(nil) + var rdr io.ReadSeeker + + for _, rdr = range r.readers[r.pos.idx:] { + readBytes, err := io.CopyN(buf, rdr, bCap) + if err != nil && err != io.EOF { + return -1, err + } + bCap -= readBytes + + if bCap == 0 { + break + } + } + + rdrPos, err := rdr.Seek(0, os.SEEK_CUR) + if err != nil { + return -1, err + } + r.pos = &pos{r.posIdx[rdr], rdrPos} + return buf.Read(b) +} + +func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) { + // save the current position + pos, err := rdr.Seek(0, os.SEEK_CUR) + if err != nil { + return -1, err + } + + // get the size + size, err := rdr.Seek(0, os.SEEK_END) + if err != nil { + return -1, err + } + + // reset the position + if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil { + return -1, err + } + return size, nil +} + +// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided +// input readseekers. After calling this method the initial position is set to the +// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances +// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker. +// Seek can be used over the sum of lengths of all readseekers. +// +// When a MultiReadSeeker is used, no Read and Seek operations should be made on +// its ReadSeeker components. Also, users should make no assumption on the state +// of individual readseekers while the MultiReadSeeker is used. +func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker { + if len(readers) == 1 { + return readers[0] + } + idx := make(map[io.ReadSeeker]int) + for i, rdr := range readers { + idx[rdr] = i + } + return &multiReadSeeker{ + readers: readers, + posIdx: idx, + } +} diff --git a/pkg/ioutils/multireader_test.go b/pkg/ioutils/multireader_test.go new file mode 100644 index 0000000000..de495b56da --- /dev/null +++ b/pkg/ioutils/multireader_test.go @@ -0,0 +1,149 @@ +package ioutils + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "strings" + "testing" +) + +func TestMultiReadSeekerReadAll(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + expectedSize := int64(s1.Len() + s2.Len() + s3.Len()) + + b, err := ioutil.ReadAll(mr) + if err != nil { + t.Fatal(err) + } + + expected := "hello world 1hello world 2hello world 3" + if string(b) != expected { + t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) + } + + size, err := mr.Seek(0, os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if size != expectedSize { + t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize) + } + + // Reset the position and read again + pos, err := mr.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if pos != 0 { + t.Fatalf("expected position to be set to 0, got %d", pos) + } + + b, err = ioutil.ReadAll(mr) + if err != nil { + t.Fatal(err) + } + + if string(b) != expected { + t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) + } +} + +func TestMultiReadSeekerReadEach(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + var totalBytes int64 + for i, s := range []*strings.Reader{s1, s2, s3} { + sLen := int64(s.Len()) + buf := make([]byte, s.Len()) + expected := []byte(fmt.Sprintf("%s %d", str, i+1)) + + if _, err := mr.Read(buf); err != nil && err != io.EOF { + t.Fatal(err) + } + + if !bytes.Equal(buf, expected) { + t.Fatalf("expected %q to be %q", string(buf), string(expected)) + } + + pos, err := mr.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("iteration: %d, error: %v", i+1, err) + } + + // check that the total bytes read is the current position of the seeker + totalBytes += sLen + if pos != totalBytes { + t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1) + } + + // This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well + newPos, err := mr.Seek(pos, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if newPos != pos { + t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos) + } + } +} + +func TestMultiReadSeekerReadSpanningChunks(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + buf := make([]byte, s1.Len()+3) + _, err := mr.Read(buf) + if err != nil { + t.Fatal(err) + } + + // expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string + expected := "hello world 1hel" + if string(buf) != expected { + t.Fatalf("expected %s to be %s", string(buf), expected) + } +} + +func TestMultiReadSeekerNegativeSeek(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + s1Len := s1.Len() + s2Len := s2.Len() + s3Len := s3.Len() + + s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if s != int64(s1Len+s2Len) { + t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len()) + } + + buf := make([]byte, s3Len) + if _, err := mr.Read(buf); err != nil && err != io.EOF { + t.Fatal(err) + } + expected := fmt.Sprintf("%s %d", str, 3) + if string(buf) != fmt.Sprintf("%s %d", str, 3) { + t.Fatalf("expected %q to be %q", string(buf), expected) + } +} diff --git a/pkg/tailfile/tailfile.go b/pkg/tailfile/tailfile.go index 2ffd36d258..92aea4608e 100644 --- a/pkg/tailfile/tailfile.go +++ b/pkg/tailfile/tailfile.go @@ -3,6 +3,7 @@ package tailfile import ( "bytes" "errors" + "io" "os" ) @@ -12,7 +13,7 @@ var eol = []byte("\n") var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive") //TailFile returns last n lines of file f -func TailFile(f *os.File, n int) ([][]byte, error) { +func TailFile(f io.ReadSeeker, n int) ([][]byte, error) { if n <= 0 { return nil, ErrNonPositiveLinesNumber }