Remove the logger.Message ContainerID field

Log drivers are instantiated on a per-container basis, and passed the
container ID (along with other information) when they're initialized.
Drivers that care about that value are caching the value that they're
passed when they're initialized and using it in favor of the value
contained in Message structures that are passed to them, so the field in
Messages is unused, so we remove it.

Signed-off-by: Nalin Dahyabhai <nalin@redhat.com>
This commit is contained in:
Nalin Dahyabhai 2016-05-31 15:46:55 -04:00
parent 0b5e84cc8d
commit 7772d270c0
6 changed files with 17 additions and 40 deletions

View File

@ -14,8 +14,6 @@ import (
// ContainerID and Timestamp. // ContainerID and Timestamp.
// Writes are concurrent, so you need implement some sync in your logger // Writes are concurrent, so you need implement some sync in your logger
type Copier struct { type Copier struct {
// cid is the container id for which we are copying logs
cid string
// srcs is map of name -> reader pairs, for example "stdout", "stderr" // srcs is map of name -> reader pairs, for example "stdout", "stderr"
srcs map[string]io.Reader srcs map[string]io.Reader
dst Logger dst Logger
@ -24,9 +22,8 @@ type Copier struct {
} }
// NewCopier creates a new Copier // NewCopier creates a new Copier
func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) *Copier { func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier {
return &Copier{ return &Copier{
cid: cid,
srcs: srcs, srcs: srcs,
dst: dst, dst: dst,
closed: make(chan struct{}), closed: make(chan struct{}),
@ -56,7 +53,7 @@ func (c *Copier) copySrc(name string, src io.Reader) {
// ReadBytes can return full or partial output even when it failed. // ReadBytes can return full or partial output even when it failed.
// e.g. it can return a full entry and EOF. // e.g. it can return a full entry and EOF.
if err == nil || len(line) > 0 { if err == nil || len(line) > 0 {
if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil { if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
} }
} }

View File

@ -28,19 +28,6 @@ func (l *TestLoggerJSON) Close() error { return nil }
func (l *TestLoggerJSON) Name() string { return "json" } func (l *TestLoggerJSON) Name() string { return "json" }
type TestLoggerText struct {
*bytes.Buffer
}
func (l *TestLoggerText) Log(m *Message) error {
_, err := l.WriteString(m.ContainerID + " " + m.Source + " " + string(m.Line) + "\n")
return err
}
func (l *TestLoggerText) Close() error { return nil }
func (l *TestLoggerText) Name() string { return "text" }
func TestCopier(t *testing.T) { func TestCopier(t *testing.T) {
stdoutLine := "Line that thinks that it is log line from docker stdout" stdoutLine := "Line that thinks that it is log line from docker stdout"
stderrLine := "Line that thinks that it is log line from docker stderr" stderrLine := "Line that thinks that it is log line from docker stderr"
@ -59,8 +46,7 @@ func TestCopier(t *testing.T) {
jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)} jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
cid := "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657" c := NewCopier(
c := NewCopier(cid,
map[string]io.Reader{ map[string]io.Reader{
"stdout": &stdout, "stdout": &stdout,
"stderr": &stderr, "stderr": &stderr,
@ -89,9 +75,6 @@ func TestCopier(t *testing.T) {
if msg.Source != "stdout" && msg.Source != "stderr" { if msg.Source != "stdout" && msg.Source != "stderr" {
t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr") t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
} }
if msg.ContainerID != cid {
t.Fatalf("Wrong ContainerID: %q, expected %q", msg.ContainerID, cid)
}
if msg.Source == "stdout" { if msg.Source == "stdout" {
if string(msg.Line) != stdoutLine { if string(msg.Line) != stdoutLine {
t.Fatalf("Wrong Line: %q, expected %q", msg.Line, stdoutLine) t.Fatalf("Wrong Line: %q, expected %q", msg.Line, stdoutLine)
@ -118,8 +101,7 @@ func TestCopierSlow(t *testing.T) {
//encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)} //encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)}
jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond} jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond}
cid := "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657" c := NewCopier(map[string]io.Reader{"stdout": &stdout}, jsonLog)
c := NewCopier(cid, map[string]io.Reader{"stdout": &stdout}, jsonLog)
c.Run() c.Run()
wait := make(chan struct{}) wait := make(chan struct{})
go func() { go func() {

View File

@ -157,8 +157,7 @@ drain:
source = "stdout" source = "stdout"
} }
// Send the log message. // Send the log message.
cid := s.vars["CONTAINER_ID_FULL"] logWatcher.Msg <- &logger.Message{Line: line, Source: source, Timestamp: timestamp}
logWatcher.Msg <- &logger.Message{ContainerID: cid, Line: line, Source: source, Timestamp: timestamp}
} }
// If we're at the end of the journal, we're done (for now). // If we're at the end of the journal, we're done (for now).
if C.sd_journal_next(j) <= 0 { if C.sd_journal_next(j) <= 0 {

View File

@ -31,13 +31,13 @@ func TestJSONFileLogger(t *testing.T) {
} }
defer l.Close() defer l.Close()
if err := l.Log(&logger.Message{ContainerID: cid, Line: []byte("line1"), Source: "src1"}); err != nil { if err := l.Log(&logger.Message{Line: []byte("line1"), Source: "src1"}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := l.Log(&logger.Message{ContainerID: cid, Line: []byte("line2"), Source: "src2"}); err != nil { if err := l.Log(&logger.Message{Line: []byte("line2"), Source: "src2"}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := l.Log(&logger.Message{ContainerID: cid, Line: []byte("line3"), Source: "src3"}); err != nil { if err := l.Log(&logger.Message{Line: []byte("line3"), Source: "src3"}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
res, err := ioutil.ReadFile(filename) res, err := ioutil.ReadFile(filename)
@ -72,7 +72,7 @@ func BenchmarkJSONFileLogger(b *testing.B) {
defer l.Close() defer l.Close()
testLine := "Line that thinks that it is log line from docker\n" testLine := "Line that thinks that it is log line from docker\n"
msg := &logger.Message{ContainerID: cid, Line: []byte(testLine), Source: "stderr", Timestamp: time.Now().UTC()} msg := &logger.Message{Line: []byte(testLine), Source: "stderr", Timestamp: time.Now().UTC()}
jsonlog, err := (&jsonlog.JSONLog{Log: string(msg.Line) + "\n", Stream: msg.Source, Created: msg.Timestamp}).MarshalJSON() jsonlog, err := (&jsonlog.JSONLog{Log: string(msg.Line) + "\n", Stream: msg.Source, Created: msg.Timestamp}).MarshalJSON()
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
@ -107,7 +107,7 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
} }
defer l.Close() defer l.Close()
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
if err := l.Log(&logger.Message{ContainerID: cid, Line: []byte("line" + strconv.Itoa(i)), Source: "src1"}); err != nil { if err := l.Log(&logger.Message{Line: []byte("line" + strconv.Itoa(i)), Source: "src1"}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -172,7 +172,7 @@ func TestJSONFileLoggerWithLabelsEnv(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
defer l.Close() defer l.Close()
if err := l.Log(&logger.Message{ContainerID: cid, Line: []byte("line"), Source: "src1"}); err != nil { if err := l.Log(&logger.Message{Line: []byte("line"), Source: "src1"}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
res, err := ioutil.ReadFile(filename) res, err := ioutil.ReadFile(filename)
@ -218,7 +218,7 @@ func BenchmarkJSONFileLoggerWithReader(b *testing.B) {
b.Fatal(err) b.Fatal(err)
} }
defer l.Close() defer l.Close()
msg := &logger.Message{ContainerID: cid, Line: []byte("line"), Source: "src1"} msg := &logger.Message{Line: []byte("line"), Source: "src1"}
jsonlog, err := (&jsonlog.JSONLog{Log: string(msg.Line) + "\n", Stream: msg.Source, Created: msg.Timestamp}).MarshalJSON() jsonlog, err := (&jsonlog.JSONLog{Log: string(msg.Line) + "\n", Stream: msg.Source, Created: msg.Timestamp}).MarshalJSON()
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)

View File

@ -27,11 +27,10 @@ const (
// Message is datastructure that represents record from some container. // Message is datastructure that represents record from some container.
type Message struct { type Message struct {
ContainerID string Line []byte
Line []byte Source string
Source string Timestamp time.Time
Timestamp time.Time Attrs LogAttributes
Attrs LogAttributes
} }
// LogAttributes is used to hold the extra attributes available in the log message // LogAttributes is used to hold the extra attributes available in the log message

View File

@ -124,7 +124,7 @@ func (daemon *Daemon) StartLogging(container *container.Container) error {
return fmt.Errorf("Failed to initialize logging driver: %v", err) return fmt.Errorf("Failed to initialize logging driver: %v", err)
} }
copier := logger.NewCopier(container.ID, map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
container.LogCopier = copier container.LogCopier = copier
copier.Run() copier.Run()
container.LogDriver = l container.LogDriver = l