mirror of https://github.com/docker/docs.git
Merge pull request #8041 from unclejack/lower_allocations_broadcastwriter
lower the number of allocations in broadcastwriter
This commit is contained in:
commit
77d30e7112
|
@ -34,6 +34,7 @@ import (
|
||||||
"github.com/docker/docker/pkg/parsers/filters"
|
"github.com/docker/docker/pkg/parsers/filters"
|
||||||
"github.com/docker/docker/pkg/signal"
|
"github.com/docker/docker/pkg/signal"
|
||||||
"github.com/docker/docker/pkg/term"
|
"github.com/docker/docker/pkg/term"
|
||||||
|
"github.com/docker/docker/pkg/timeutils"
|
||||||
"github.com/docker/docker/pkg/units"
|
"github.com/docker/docker/pkg/units"
|
||||||
"github.com/docker/docker/registry"
|
"github.com/docker/docker/registry"
|
||||||
"github.com/docker/docker/runconfig"
|
"github.com/docker/docker/runconfig"
|
||||||
|
@ -1650,7 +1651,7 @@ func (cli *DockerCli) CmdEvents(args ...string) error {
|
||||||
loc = time.FixedZone(time.Now().Zone())
|
loc = time.FixedZone(time.Now().Zone())
|
||||||
)
|
)
|
||||||
var setTime = func(key, value string) {
|
var setTime = func(key, value string) {
|
||||||
format := utils.RFC3339NanoFixed
|
format := timeutils.RFC3339NanoFixed
|
||||||
if len(value) < len(format) {
|
if len(value) < len(format) {
|
||||||
format = format[:len(value)]
|
format = format[:len(value)]
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,12 +8,11 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/docker/docker/pkg/log"
|
|
||||||
"github.com/docker/docker/pkg/tailfile"
|
|
||||||
|
|
||||||
"github.com/docker/docker/engine"
|
"github.com/docker/docker/engine"
|
||||||
"github.com/docker/docker/pkg/jsonlog"
|
"github.com/docker/docker/pkg/jsonlog"
|
||||||
"github.com/docker/docker/utils"
|
"github.com/docker/docker/pkg/log"
|
||||||
|
"github.com/docker/docker/pkg/tailfile"
|
||||||
|
"github.com/docker/docker/pkg/timeutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (daemon *Daemon) ContainerLogs(job *engine.Job) engine.Status {
|
func (daemon *Daemon) ContainerLogs(job *engine.Job) engine.Status {
|
||||||
|
@ -35,7 +34,7 @@ func (daemon *Daemon) ContainerLogs(job *engine.Job) engine.Status {
|
||||||
return job.Errorf("You must choose at least one stream")
|
return job.Errorf("You must choose at least one stream")
|
||||||
}
|
}
|
||||||
if times {
|
if times {
|
||||||
format = utils.RFC3339NanoFixed
|
format = timeutils.RFC3339NanoFixed
|
||||||
}
|
}
|
||||||
if tail == "" {
|
if tail == "" {
|
||||||
tail = "all"
|
tail = "all"
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/utils"
|
"github.com/docker/docker/pkg/timeutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// This used to work, it test a log of PageSize-1 (gh#4851)
|
// This used to work, it test a log of PageSize-1 (gh#4851)
|
||||||
|
@ -104,7 +104,7 @@ func TestLogsTimestamps(t *testing.T) {
|
||||||
|
|
||||||
for _, l := range lines {
|
for _, l := range lines {
|
||||||
if l != "" {
|
if l != "" {
|
||||||
_, err := time.Parse(utils.RFC3339NanoFixed+" ", ts.FindString(l))
|
_, err := time.Parse(timeutils.RFC3339NanoFixed+" ", ts.FindString(l))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to parse timestamp from %v: %v", l, err)
|
t.Fatalf("Failed to parse timestamp from %v: %v", l, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package broadcastwriter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -14,8 +13,9 @@ import (
|
||||||
// BroadcastWriter accumulate multiple io.WriteCloser by stream.
|
// BroadcastWriter accumulate multiple io.WriteCloser by stream.
|
||||||
type BroadcastWriter struct {
|
type BroadcastWriter struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
buf *bytes.Buffer
|
buf *bytes.Buffer
|
||||||
streams map[string](map[io.WriteCloser]struct{})
|
jsLogBuf *bytes.Buffer
|
||||||
|
streams map[string](map[io.WriteCloser]struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddWriter adds new io.WriteCloser for stream.
|
// AddWriter adds new io.WriteCloser for stream.
|
||||||
|
@ -43,6 +43,10 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if w.jsLogBuf == nil {
|
||||||
|
w.jsLogBuf = new(bytes.Buffer)
|
||||||
|
w.jsLogBuf.Grow(1024)
|
||||||
|
}
|
||||||
w.buf.Write(p)
|
w.buf.Write(p)
|
||||||
for {
|
for {
|
||||||
line, err := w.buf.ReadString('\n')
|
line, err := w.buf.ReadString('\n')
|
||||||
|
@ -54,19 +58,23 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
|
||||||
if stream == "" {
|
if stream == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
b, err := json.Marshal(jsonlog.JSONLog{Log: line, Stream: stream, Created: created})
|
jsonLog := jsonlog.JSONLog{Log: line, Stream: stream, Created: created}
|
||||||
|
err = jsonLog.MarshalJSONBuf(w.jsLogBuf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error making JSON log line: %s", err)
|
log.Errorf("Error making JSON log line: %s", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
b = append(b, '\n')
|
w.jsLogBuf.WriteByte('\n')
|
||||||
|
b := w.jsLogBuf.Bytes()
|
||||||
for sw := range writers {
|
for sw := range writers {
|
||||||
if _, err := sw.Write(b); err != nil {
|
if _, err := sw.Write(b); err != nil {
|
||||||
delete(writers, sw)
|
delete(writers, sw)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
w.jsLogBuf.Reset()
|
||||||
}
|
}
|
||||||
|
w.jsLogBuf.Reset()
|
||||||
w.Unlock()
|
w.Unlock()
|
||||||
return len(p), nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,176 @@
|
||||||
|
// This code was initially generated by ffjson <https://github.com/pquerna/ffjson>
|
||||||
|
// This code was generated via the following steps:
|
||||||
|
// $ go get -u github.com/pquerna/ffjson
|
||||||
|
// $ make shell BINDDIR=.
|
||||||
|
// $ ffjson pkg/jsonlog/jsonlog.go
|
||||||
|
// $ mv pkg/jsonglog/jsonlog_ffjson.go pkg/jsonlog/jsonlog_marshalling.go
|
||||||
|
//
|
||||||
|
// It has been modified to improve the performance of time marshalling to JSON
|
||||||
|
// and to clean it up.
|
||||||
|
// Should this code need to be regenerated when the JSONLog struct is changed,
|
||||||
|
// the relevant changes which have been made are:
|
||||||
|
// import (
|
||||||
|
// "bytes"
|
||||||
|
//-
|
||||||
|
// "unicode/utf8"
|
||||||
|
//+
|
||||||
|
//+ "github.com/docker/docker/pkg/timeutils"
|
||||||
|
// )
|
||||||
|
//
|
||||||
|
// func (mj *JSONLog) MarshalJSON() ([]byte, error) {
|
||||||
|
//@@ -20,13 +16,13 @@ func (mj *JSONLog) MarshalJSON() ([]byte, error) {
|
||||||
|
// }
|
||||||
|
// return buf.Bytes(), nil
|
||||||
|
// }
|
||||||
|
//+
|
||||||
|
// func (mj *JSONLog) MarshalJSONBuf(buf *bytes.Buffer) error {
|
||||||
|
//- var err error
|
||||||
|
//- var obj []byte
|
||||||
|
//- var first bool = true
|
||||||
|
//- _ = obj
|
||||||
|
//- _ = err
|
||||||
|
//- _ = first
|
||||||
|
//+ var (
|
||||||
|
//+ err error
|
||||||
|
//+ timestamp string
|
||||||
|
//+ first bool = true
|
||||||
|
//+ )
|
||||||
|
// buf.WriteString(`{`)
|
||||||
|
// if len(mj.Log) != 0 {
|
||||||
|
// if first == true {
|
||||||
|
//@@ -52,11 +48,11 @@ func (mj *JSONLog) MarshalJSONBuf(buf *bytes.Buffer) error {
|
||||||
|
// buf.WriteString(`,`)
|
||||||
|
// }
|
||||||
|
// buf.WriteString(`"time":`)
|
||||||
|
//- obj, err = mj.Created.MarshalJSON()
|
||||||
|
//+ timestamp, err = timeutils.FastMarshalJSON(mj.Created)
|
||||||
|
// if err != nil {
|
||||||
|
// return err
|
||||||
|
// }
|
||||||
|
//- buf.Write(obj)
|
||||||
|
//+ buf.WriteString(timestamp)
|
||||||
|
// buf.WriteString(`}`)
|
||||||
|
// return nil
|
||||||
|
// }
|
||||||
|
|
||||||
|
package jsonlog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"unicode/utf8"
|
||||||
|
|
||||||
|
"github.com/docker/docker/pkg/timeutils"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (mj *JSONLog) MarshalJSON() ([]byte, error) {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
buf.Grow(1024)
|
||||||
|
err := mj.MarshalJSONBuf(&buf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return buf.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mj *JSONLog) MarshalJSONBuf(buf *bytes.Buffer) error {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
timestamp string
|
||||||
|
first bool = true
|
||||||
|
)
|
||||||
|
buf.WriteString(`{`)
|
||||||
|
if len(mj.Log) != 0 {
|
||||||
|
if first == true {
|
||||||
|
first = false
|
||||||
|
} else {
|
||||||
|
buf.WriteString(`,`)
|
||||||
|
}
|
||||||
|
buf.WriteString(`"log":`)
|
||||||
|
ffjson_WriteJsonString(buf, mj.Log)
|
||||||
|
}
|
||||||
|
if len(mj.Stream) != 0 {
|
||||||
|
if first == true {
|
||||||
|
first = false
|
||||||
|
} else {
|
||||||
|
buf.WriteString(`,`)
|
||||||
|
}
|
||||||
|
buf.WriteString(`"stream":`)
|
||||||
|
ffjson_WriteJsonString(buf, mj.Stream)
|
||||||
|
}
|
||||||
|
if first == true {
|
||||||
|
first = false
|
||||||
|
} else {
|
||||||
|
buf.WriteString(`,`)
|
||||||
|
}
|
||||||
|
buf.WriteString(`"time":`)
|
||||||
|
timestamp, err = timeutils.FastMarshalJSON(mj.Created)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
buf.WriteString(timestamp)
|
||||||
|
buf.WriteString(`}`)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ffjson_WriteJsonString(buf *bytes.Buffer, s string) {
|
||||||
|
const hex = "0123456789abcdef"
|
||||||
|
|
||||||
|
buf.WriteByte('"')
|
||||||
|
start := 0
|
||||||
|
for i := 0; i < len(s); {
|
||||||
|
if b := s[i]; b < utf8.RuneSelf {
|
||||||
|
if 0x20 <= b && b != '\\' && b != '"' && b != '<' && b != '>' && b != '&' {
|
||||||
|
i++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if start < i {
|
||||||
|
buf.WriteString(s[start:i])
|
||||||
|
}
|
||||||
|
switch b {
|
||||||
|
case '\\', '"':
|
||||||
|
buf.WriteByte('\\')
|
||||||
|
buf.WriteByte(b)
|
||||||
|
case '\n':
|
||||||
|
buf.WriteByte('\\')
|
||||||
|
buf.WriteByte('n')
|
||||||
|
case '\r':
|
||||||
|
buf.WriteByte('\\')
|
||||||
|
buf.WriteByte('r')
|
||||||
|
default:
|
||||||
|
|
||||||
|
buf.WriteString(`\u00`)
|
||||||
|
buf.WriteByte(hex[b>>4])
|
||||||
|
buf.WriteByte(hex[b&0xF])
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
start = i
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
c, size := utf8.DecodeRuneInString(s[i:])
|
||||||
|
if c == utf8.RuneError && size == 1 {
|
||||||
|
if start < i {
|
||||||
|
buf.WriteString(s[start:i])
|
||||||
|
}
|
||||||
|
buf.WriteString(`\ufffd`)
|
||||||
|
i += size
|
||||||
|
start = i
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if c == '\u2028' || c == '\u2029' {
|
||||||
|
if start < i {
|
||||||
|
buf.WriteString(s[start:i])
|
||||||
|
}
|
||||||
|
buf.WriteString(`\u202`)
|
||||||
|
buf.WriteByte(hex[c&0xF])
|
||||||
|
i += size
|
||||||
|
start = i
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
i += size
|
||||||
|
}
|
||||||
|
if start < len(s) {
|
||||||
|
buf.WriteString(s[start:])
|
||||||
|
}
|
||||||
|
buf.WriteByte('"')
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
Cristian Staretu <cristian.staretu@gmail.com> (@unclejack)
|
|
@ -0,0 +1,23 @@
|
||||||
|
package timeutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Define our own version of RFC339Nano because we want one
|
||||||
|
// that pads the nano seconds part with zeros to ensure
|
||||||
|
// the timestamps are aligned in the logs.
|
||||||
|
RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
|
||||||
|
JSONFormat = `"` + time.RFC3339Nano + `"`
|
||||||
|
)
|
||||||
|
|
||||||
|
func FastMarshalJSON(t time.Time) (string, error) {
|
||||||
|
if y := t.Year(); y < 0 || y >= 10000 {
|
||||||
|
// RFC 3339 is clear that years are 4 digits exactly.
|
||||||
|
// See golang.org/issue/4556#c15 for more discussion.
|
||||||
|
return "", errors.New("Time.MarshalJSON: year outside of range [0,9999]")
|
||||||
|
}
|
||||||
|
return t.Format(JSONFormat), nil
|
||||||
|
}
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/pkg/term"
|
"github.com/docker/docker/pkg/term"
|
||||||
|
"github.com/docker/docker/pkg/timeutils"
|
||||||
"github.com/docker/docker/pkg/units"
|
"github.com/docker/docker/pkg/units"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -100,7 +101,7 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if jm.Time != 0 {
|
if jm.Time != 0 {
|
||||||
fmt.Fprintf(out, "%s ", time.Unix(jm.Time, 0).Format(RFC3339NanoFixed))
|
fmt.Fprintf(out, "%s ", time.Unix(jm.Time, 0).Format(timeutils.RFC3339NanoFixed))
|
||||||
}
|
}
|
||||||
if jm.ID != "" {
|
if jm.ID != "" {
|
||||||
fmt.Fprintf(out, "%s: ", jm.ID)
|
fmt.Fprintf(out, "%s: ", jm.ID)
|
||||||
|
|
|
@ -24,13 +24,6 @@ import (
|
||||||
"github.com/docker/docker/pkg/log"
|
"github.com/docker/docker/pkg/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
// Define our own version of RFC339Nano because we want one
|
|
||||||
// that pads the nano seconds part with zeros to ensure
|
|
||||||
// the timestamps are aligned in the logs.
|
|
||||||
RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
|
|
||||||
)
|
|
||||||
|
|
||||||
type KeyValuePair struct {
|
type KeyValuePair struct {
|
||||||
Key string
|
Key string
|
||||||
Value string
|
Value string
|
||||||
|
|
Loading…
Reference in New Issue