diff --git a/pkg/broadcastwriter/broadcastwriter.go b/pkg/broadcastwriter/broadcastwriter.go index 2ee68944d1..bd9b675553 100644 --- a/pkg/broadcastwriter/broadcastwriter.go +++ b/pkg/broadcastwriter/broadcastwriter.go @@ -8,6 +8,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/jsonlog" + "github.com/docker/docker/pkg/timeutils" ) // BroadcastWriter accumulate multiple io.WriteCloser by stream. @@ -33,7 +34,6 @@ func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) { // Write writes bytes to all writers. Failed writers will be evicted during // this call. func (w *BroadcastWriter) Write(p []byte) (n int, err error) { - created := time.Now().UTC() w.Lock() if writers, ok := w.streams[""]; ok { for sw := range writers { @@ -42,23 +42,44 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) { delete(writers, sw) } } + if len(w.streams) == 1 { + if w.buf.Len() >= 4096 { + w.buf.Reset() + } else { + w.buf.Write(p) + } + w.Unlock() + return len(p), nil + } } if w.jsLogBuf == nil { w.jsLogBuf = new(bytes.Buffer) w.jsLogBuf.Grow(1024) } + var timestamp string + created := time.Now().UTC() w.buf.Write(p) for { - line, err := w.buf.ReadString('\n') - if err != nil { - w.buf.WriteString(line) + if n := w.buf.Len(); n == 0 { break } + i := bytes.IndexByte(w.buf.Bytes(), '\n') + if i < 0 { + break + } + lineBytes := w.buf.Next(i + 1) + if timestamp == "" { + timestamp, err = timeutils.FastMarshalJSON(created) + if err != nil { + continue + } + } + for stream, writers := range w.streams { if stream == "" { continue } - jsonLog := jsonlog.JSONLog{Log: line, Stream: stream, Created: created} + jsonLog := jsonlog.JSONLogBytes{Log: lineBytes, Stream: stream, Created: timestamp} err = jsonLog.MarshalJSONBuf(w.jsLogBuf) if err != nil { logrus.Errorf("Error making JSON log line: %s", err) diff --git a/pkg/broadcastwriter/broadcastwriter_test.go b/pkg/broadcastwriter/broadcastwriter_test.go index 62ca12659a..71227821b2 100644 --- a/pkg/broadcastwriter/broadcastwriter_test.go +++ b/pkg/broadcastwriter/broadcastwriter_test.go @@ -142,3 +142,33 @@ func BenchmarkBroadcastWriter(b *testing.B) { b.StartTimer() } } + +func BenchmarkBroadcastWriterWithoutStdoutStderr(b *testing.B) { + writer := New() + setUpWriter := func() { + for i := 0; i < 100; i++ { + writer.AddWriter(devNullCloser(0), "") + } + } + testLine := "Line that thinks that it is log line from docker" + var buf bytes.Buffer + for i := 0; i < 100; i++ { + buf.Write([]byte(testLine + "\n")) + } + // line without eol + buf.Write([]byte(testLine)) + testText := buf.Bytes() + b.SetBytes(int64(5 * len(testText))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + setUpWriter() + + for j := 0; j < 5; j++ { + if _, err := writer.Write(testText); err != nil { + b.Fatal(err) + } + } + + writer.Clean() + } +} diff --git a/pkg/jsonlog/jsonlogbytes.go b/pkg/jsonlog/jsonlogbytes.go new file mode 100644 index 0000000000..0d8fd9c824 --- /dev/null +++ b/pkg/jsonlog/jsonlogbytes.go @@ -0,0 +1,115 @@ +package jsonlog + +import ( + "bytes" + "unicode/utf8" +) + +// JSONLogBytes is based on JSONLog. +// It allows marshalling JSONLog from Log as []byte +// and an already marshalled Created timestamp. +type JSONLogBytes struct { + Log []byte `json:"log,omitempty"` + Stream string `json:"stream,omitempty"` + Created string `json:"time"` +} + +// MarshalJSONBuf is based on the same method from JSONLog +// It has been modified to take into account the necessary changes. +func (mj *JSONLogBytes) MarshalJSONBuf(buf *bytes.Buffer) error { + var first = true + + buf.WriteString(`{`) + if len(mj.Log) != 0 { + if first == true { + first = false + } else { + buf.WriteString(`,`) + } + buf.WriteString(`"log":`) + ffjson_WriteJsonBytesAsString(buf, mj.Log) + } + if len(mj.Stream) != 0 { + if first == true { + first = false + } else { + buf.WriteString(`,`) + } + buf.WriteString(`"stream":`) + ffjson_WriteJsonString(buf, mj.Stream) + } + if first == true { + first = false + } else { + buf.WriteString(`,`) + } + buf.WriteString(`"time":`) + buf.WriteString(mj.Created) + buf.WriteString(`}`) + return nil +} + +// This is based on ffjson_WriteJsonString. It has been changed +// to accept a string passed as a slice of bytes. +func ffjson_WriteJsonBytesAsString(buf *bytes.Buffer, s []byte) { + const hex = "0123456789abcdef" + + buf.WriteByte('"') + start := 0 + for i := 0; i < len(s); { + if b := s[i]; b < utf8.RuneSelf { + if 0x20 <= b && b != '\\' && b != '"' && b != '<' && b != '>' && b != '&' { + i++ + continue + } + if start < i { + buf.Write(s[start:i]) + } + switch b { + case '\\', '"': + buf.WriteByte('\\') + buf.WriteByte(b) + case '\n': + buf.WriteByte('\\') + buf.WriteByte('n') + case '\r': + buf.WriteByte('\\') + buf.WriteByte('r') + default: + + buf.WriteString(`\u00`) + buf.WriteByte(hex[b>>4]) + buf.WriteByte(hex[b&0xF]) + } + i++ + start = i + continue + } + c, size := utf8.DecodeRune(s[i:]) + if c == utf8.RuneError && size == 1 { + if start < i { + buf.Write(s[start:i]) + } + buf.WriteString(`\ufffd`) + i += size + start = i + continue + } + + if c == '\u2028' || c == '\u2029' { + if start < i { + buf.Write(s[start:i]) + } + buf.WriteString(`\u202`) + buf.WriteByte(hex[c&0xF]) + i += size + start = i + continue + } + i += size + } + if start < len(s) { + buf.Write(s[start:]) + } + buf.WriteByte('"') +}