Merge pull request #11930 from unclejack/broadcastwriter_refactor

pkg/broadcastwriter: use []byte to lower alloc
This commit is contained in:
Michael Crosby 2015-03-31 16:47:23 -07:00
commit e736f16bbd
3 changed files with 171 additions and 5 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/timeutils"
)
// BroadcastWriter accumulate multiple io.WriteCloser by stream.
@ -33,7 +34,6 @@ func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) {
// Write writes bytes to all writers. Failed writers will be evicted during
// this call.
func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
created := time.Now().UTC()
w.Lock()
if writers, ok := w.streams[""]; ok {
for sw := range writers {
@ -42,23 +42,44 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
delete(writers, sw)
}
}
if len(w.streams) == 1 {
if w.buf.Len() >= 4096 {
w.buf.Reset()
} else {
w.buf.Write(p)
}
w.Unlock()
return len(p), nil
}
}
if w.jsLogBuf == nil {
w.jsLogBuf = new(bytes.Buffer)
w.jsLogBuf.Grow(1024)
}
var timestamp string
created := time.Now().UTC()
w.buf.Write(p)
for {
line, err := w.buf.ReadString('\n')
if err != nil {
w.buf.WriteString(line)
if n := w.buf.Len(); n == 0 {
break
}
i := bytes.IndexByte(w.buf.Bytes(), '\n')
if i < 0 {
break
}
lineBytes := w.buf.Next(i + 1)
if timestamp == "" {
timestamp, err = timeutils.FastMarshalJSON(created)
if err != nil {
continue
}
}
for stream, writers := range w.streams {
if stream == "" {
continue
}
jsonLog := jsonlog.JSONLog{Log: line, Stream: stream, Created: created}
jsonLog := jsonlog.JSONLogBytes{Log: lineBytes, Stream: stream, Created: timestamp}
err = jsonLog.MarshalJSONBuf(w.jsLogBuf)
if err != nil {
logrus.Errorf("Error making JSON log line: %s", err)

View File

@ -142,3 +142,33 @@ func BenchmarkBroadcastWriter(b *testing.B) {
b.StartTimer()
}
}
func BenchmarkBroadcastWriterWithoutStdoutStderr(b *testing.B) {
writer := New()
setUpWriter := func() {
for i := 0; i < 100; i++ {
writer.AddWriter(devNullCloser(0), "")
}
}
testLine := "Line that thinks that it is log line from docker"
var buf bytes.Buffer
for i := 0; i < 100; i++ {
buf.Write([]byte(testLine + "\n"))
}
// line without eol
buf.Write([]byte(testLine))
testText := buf.Bytes()
b.SetBytes(int64(5 * len(testText)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
setUpWriter()
for j := 0; j < 5; j++ {
if _, err := writer.Write(testText); err != nil {
b.Fatal(err)
}
}
writer.Clean()
}
}

115
pkg/jsonlog/jsonlogbytes.go Normal file
View File

@ -0,0 +1,115 @@
package jsonlog
import (
"bytes"
"unicode/utf8"
)
// JSONLogBytes is based on JSONLog.
// It allows marshalling JSONLog from Log as []byte
// and an already marshalled Created timestamp.
type JSONLogBytes struct {
Log []byte `json:"log,omitempty"`
Stream string `json:"stream,omitempty"`
Created string `json:"time"`
}
// MarshalJSONBuf is based on the same method from JSONLog
// It has been modified to take into account the necessary changes.
func (mj *JSONLogBytes) MarshalJSONBuf(buf *bytes.Buffer) error {
var first = true
buf.WriteString(`{`)
if len(mj.Log) != 0 {
if first == true {
first = false
} else {
buf.WriteString(`,`)
}
buf.WriteString(`"log":`)
ffjson_WriteJsonBytesAsString(buf, mj.Log)
}
if len(mj.Stream) != 0 {
if first == true {
first = false
} else {
buf.WriteString(`,`)
}
buf.WriteString(`"stream":`)
ffjson_WriteJsonString(buf, mj.Stream)
}
if first == true {
first = false
} else {
buf.WriteString(`,`)
}
buf.WriteString(`"time":`)
buf.WriteString(mj.Created)
buf.WriteString(`}`)
return nil
}
// This is based on ffjson_WriteJsonString. It has been changed
// to accept a string passed as a slice of bytes.
func ffjson_WriteJsonBytesAsString(buf *bytes.Buffer, s []byte) {
const hex = "0123456789abcdef"
buf.WriteByte('"')
start := 0
for i := 0; i < len(s); {
if b := s[i]; b < utf8.RuneSelf {
if 0x20 <= b && b != '\\' && b != '"' && b != '<' && b != '>' && b != '&' {
i++
continue
}
if start < i {
buf.Write(s[start:i])
}
switch b {
case '\\', '"':
buf.WriteByte('\\')
buf.WriteByte(b)
case '\n':
buf.WriteByte('\\')
buf.WriteByte('n')
case '\r':
buf.WriteByte('\\')
buf.WriteByte('r')
default:
buf.WriteString(`\u00`)
buf.WriteByte(hex[b>>4])
buf.WriteByte(hex[b&0xF])
}
i++
start = i
continue
}
c, size := utf8.DecodeRune(s[i:])
if c == utf8.RuneError && size == 1 {
if start < i {
buf.Write(s[start:i])
}
buf.WriteString(`\ufffd`)
i += size
start = i
continue
}
if c == '\u2028' || c == '\u2029' {
if start < i {
buf.Write(s[start:i])
}
buf.WriteString(`\u202`)
buf.WriteByte(hex[c&0xF])
i += size
start = i
continue
}
i += size
}
if start < len(s) {
buf.Write(s[start:])
}
buf.WriteByte('"')
}