mirror of https://github.com/containers/podman.git
				
				
				
			Merge pull request #10431 from vrothberg/journald-logs
journald logger: fix race condition
This commit is contained in:
		
						commit
						5b4ffc7ba7
					
				|  | @ -6,14 +6,12 @@ package libpod | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io" |  | ||||||
| 	"math" |  | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/containers/podman/v3/libpod/define" | 	"github.com/containers/podman/v3/libpod/events" | ||||||
| 	"github.com/containers/podman/v3/libpod/logs" | 	"github.com/containers/podman/v3/libpod/logs" | ||||||
| 	journal "github.com/coreos/go-systemd/v22/sdjournal" | 	"github.com/coreos/go-systemd/v22/sdjournal" | ||||||
| 	"github.com/pkg/errors" | 	"github.com/pkg/errors" | ||||||
| 	"github.com/sirupsen/logrus" | 	"github.com/sirupsen/logrus" | ||||||
| ) | ) | ||||||
|  | @ -24,122 +22,187 @@ const ( | ||||||
| 
 | 
 | ||||||
| 	// journaldLogErr is the journald priority signifying stderr
 | 	// journaldLogErr is the journald priority signifying stderr
 | ||||||
| 	journaldLogErr = "3" | 	journaldLogErr = "3" | ||||||
| 
 |  | ||||||
| 	// bufLen is the length of the buffer to read from a k8s-file
 |  | ||||||
| 	// formatted log line
 |  | ||||||
| 	// let's set it as 2k just to be safe if k8s-file format ever changes
 |  | ||||||
| 	bufLen = 16384 |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error { | func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error { | ||||||
| 	var config journal.JournalReaderConfig | 	journal, err := sdjournal.NewJournal() | ||||||
| 	if options.Tail < 0 { | 	if err != nil { | ||||||
| 		config.NumFromTail = 0 | 		return err | ||||||
| 	} else if options.Tail == 0 { |  | ||||||
| 		config.NumFromTail = math.MaxUint64 |  | ||||||
| 	} else { |  | ||||||
| 		config.NumFromTail = uint64(options.Tail) |  | ||||||
| 	} | 	} | ||||||
| 	if options.Multi { | 	// While logs are written to the `logChannel`, we inspect each event
 | ||||||
| 		config.Formatter = journalFormatterWithID | 	// and stop once the container has died.  Having logs and events in one
 | ||||||
| 	} else { | 	// stream prevents a race condition that we faced in #10323.
 | ||||||
| 		config.Formatter = journalFormatter | 
 | ||||||
|  | 	// Add the filters for events.
 | ||||||
|  | 	match := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"} | ||||||
|  | 	if err := journal.AddMatch(match.String()); err != nil { | ||||||
|  | 		return errors.Wrapf(err, "adding filter to journald logger: %v", match) | ||||||
| 	} | 	} | ||||||
| 	defaultTime := time.Time{} | 	match = sdjournal.Match{Field: "PODMAN_ID", Value: c.ID()} | ||||||
| 	if options.Since != defaultTime { | 	if err := journal.AddMatch(match.String()); err != nil { | ||||||
| 		// coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future
 | 		return errors.Wrapf(err, "adding filter to journald logger: %v", match) | ||||||
| 		// return nothing instead of falsely printing
 | 	} | ||||||
| 		if time.Now().Before(options.Since) { | 
 | ||||||
| 			return nil | 	// Add the filter for logs.  Note the disjunction so that we match
 | ||||||
|  | 	// either the events or the logs.
 | ||||||
|  | 	if err := journal.AddDisjunction(); err != nil { | ||||||
|  | 		return errors.Wrap(err, "adding filter disjunction to journald logger") | ||||||
|  | 	} | ||||||
|  | 	match = sdjournal.Match{Field: "CONTAINER_ID_FULL", Value: c.ID()} | ||||||
|  | 	if err := journal.AddMatch(match.String()); err != nil { | ||||||
|  | 		return errors.Wrapf(err, "adding filter to journald logger: %v", match) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := journal.SeekHead(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	// API requires Next() immediately after SeekHead().
 | ||||||
|  | 	if _, err := journal.Next(); err != nil { | ||||||
|  | 		return errors.Wrap(err, "initial journal cursor") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// API requires a next|prev before getting a cursor.
 | ||||||
|  | 	if _, err := journal.Previous(); err != nil { | ||||||
|  | 		return errors.Wrap(err, "initial journal cursor") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Note that the initial cursor may not yet be ready, so we'll do an
 | ||||||
|  | 	// exponential backoff.
 | ||||||
|  | 	var cursor string | ||||||
|  | 	var cursorError error | ||||||
|  | 	for i := 1; i <= 3; i++ { | ||||||
|  | 		cursor, cursorError = journal.GetCursor() | ||||||
|  | 		if err != nil { | ||||||
|  | 			continue | ||||||
| 		} | 		} | ||||||
| 		// coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past
 | 		time.Sleep(time.Duration(i*100) * time.Millisecond) | ||||||
| 		config.Since = -time.Since(options.Since) | 		break | ||||||
| 	} | 	} | ||||||
| 	config.Matches = append(config.Matches, journal.Match{ | 	if cursorError != nil { | ||||||
| 		Field: "CONTAINER_ID_FULL", | 		return errors.Wrap(cursorError, "inital journal cursor") | ||||||
| 		Value: c.ID(), | 	} | ||||||
| 	}) | 
 | ||||||
|  | 	// We need the container's events in the same journal to guarantee
 | ||||||
|  | 	// consistency, see #10323.
 | ||||||
|  | 	if options.Follow && c.runtime.config.Engine.EventsLogger != "journald" { | ||||||
|  | 		return errors.Errorf("using --follow with the journald --log-driver but without the journald --events-backend (%s) is not supported", c.runtime.config.Engine.EventsLogger) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	options.WaitGroup.Add(1) | 	options.WaitGroup.Add(1) | ||||||
| 
 |  | ||||||
| 	r, err := journal.NewJournalReader(config) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	if r == nil { |  | ||||||
| 		return errors.Errorf("journal reader creation failed") |  | ||||||
| 	} |  | ||||||
| 	if options.Tail == math.MaxInt64 { |  | ||||||
| 		r.Rewind() |  | ||||||
| 	} |  | ||||||
| 	state, err := c.State() |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if options.Follow && state == define.ContainerStateRunning { |  | ||||||
| 		go func() { |  | ||||||
| 			done := make(chan bool) |  | ||||||
| 			until := make(chan time.Time) |  | ||||||
| 			go func() { |  | ||||||
| 				select { |  | ||||||
| 				case <-ctx.Done(): |  | ||||||
| 					until <- time.Time{} |  | ||||||
| 				case <-done: |  | ||||||
| 					// nothing to do anymore
 |  | ||||||
| 				} |  | ||||||
| 			}() |  | ||||||
| 			go func() { |  | ||||||
| 				// FIXME (#10323): we are facing a terrible
 |  | ||||||
| 				// race condition here. At the time the
 |  | ||||||
| 				// container dies and `c.Wait()` has returned,
 |  | ||||||
| 				// we may not have received all journald logs.
 |  | ||||||
| 				// So far there is no other way than waiting
 |  | ||||||
| 				// for a second.  Ultimately, `r.Follow` is
 |  | ||||||
| 				// racy and we may have to implement our custom
 |  | ||||||
| 				// logic here.
 |  | ||||||
| 				c.Wait(ctx) |  | ||||||
| 				time.Sleep(time.Second) |  | ||||||
| 				until <- time.Time{} |  | ||||||
| 			}() |  | ||||||
| 			follower := journaldFollowBuffer{logChannel, options.Multi} |  | ||||||
| 			err := r.Follow(until, follower) |  | ||||||
| 			if err != nil { |  | ||||||
| 				logrus.Debugf(err.Error()) |  | ||||||
| 			} |  | ||||||
| 			r.Close() |  | ||||||
| 			options.WaitGroup.Done() |  | ||||||
| 			done <- true |  | ||||||
| 			return |  | ||||||
| 		}() |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	go func() { | 	go func() { | ||||||
| 		bytes := make([]byte, bufLen) | 		defer func() { | ||||||
| 		// /me complains about no do-while in go
 | 			options.WaitGroup.Done() | ||||||
| 		ec, err := r.Read(bytes) | 			if err := journal.Close(); err != nil { | ||||||
| 		for ec != 0 && err == nil { | 				logrus.Errorf("Unable to close journal: %v", err) | ||||||
| 			// because we are reusing bytes, we need to make
 | 			} | ||||||
| 			// sure the old data doesn't get into the new line
 | 		}() | ||||||
| 			bytestr := string(bytes[:ec]) | 
 | ||||||
| 			logLine, err2 := logs.NewJournaldLogLine(bytestr, options.Multi) | 		afterTimeStamp := false        // needed for options.Since
 | ||||||
| 			if err2 != nil { | 		tailQueue := []*logs.LogLine{} // needed for options.Tail
 | ||||||
| 				logrus.Error(err2) | 		doTail := options.Tail > 0 | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-ctx.Done(): | ||||||
|  | 				// Remote client may have closed/lost the connection.
 | ||||||
|  | 				return | ||||||
|  | 			default: | ||||||
|  | 				// Fallthrough
 | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if _, err := journal.Next(); err != nil { | ||||||
|  | 				logrus.Errorf("Failed to move journal cursor to next entry: %v", err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			latestCursor, err := journal.GetCursor() | ||||||
|  | 			if err != nil { | ||||||
|  | 				logrus.Errorf("Failed to get journal cursor: %v", err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			// Hit the end of the journal.
 | ||||||
|  | 			if cursor == latestCursor { | ||||||
|  | 				if doTail { | ||||||
|  | 					// Flush *once* we hit the end of the journal.
 | ||||||
|  | 					startIndex := int64(len(tailQueue)-1) - options.Tail | ||||||
|  | 					if startIndex < 0 { | ||||||
|  | 						startIndex = 0 | ||||||
|  | 					} | ||||||
|  | 					for i := startIndex; i < int64(len(tailQueue)); i++ { | ||||||
|  | 						logChannel <- tailQueue[i] | ||||||
|  | 					} | ||||||
|  | 					tailQueue = nil | ||||||
|  | 					doTail = false | ||||||
|  | 				} | ||||||
|  | 				// Unless we follow, quit.
 | ||||||
|  | 				if !options.Follow { | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 				// Sleep until something's happening on the journal.
 | ||||||
|  | 				journal.Wait(sdjournal.IndefiniteWait) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			cursor = latestCursor | ||||||
|  | 
 | ||||||
|  | 			entry, err := journal.GetEntry() | ||||||
|  | 			if err != nil { | ||||||
|  | 				logrus.Errorf("Failed to get journal entry: %v", err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if !afterTimeStamp { | ||||||
|  | 				entryTime := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond)) | ||||||
|  | 				if entryTime.Before(options.Since) { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				afterTimeStamp = true | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			// If we're reading an event and the container exited/died,
 | ||||||
|  | 			// then we're done and can return.
 | ||||||
|  | 			event, ok := entry.Fields["PODMAN_EVENT"] | ||||||
|  | 			if ok { | ||||||
|  | 				status, err := events.StringToStatus(event) | ||||||
|  | 				if err != nil { | ||||||
|  | 					logrus.Errorf("Failed to translate event: %v", err) | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 				if status == events.Exited { | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			var message string | ||||||
|  | 			var formatError error | ||||||
|  | 
 | ||||||
|  | 			if options.Multi { | ||||||
|  | 				message, formatError = journalFormatterWithID(entry) | ||||||
|  | 			} else { | ||||||
|  | 				message, formatError = journalFormatter(entry) | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if formatError != nil { | ||||||
|  | 				logrus.Errorf("Failed to parse journald log entry: %v", err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			logLine, err := logs.NewJournaldLogLine(message, options.Multi) | ||||||
|  | 			if err != nil { | ||||||
|  | 				logrus.Errorf("Failed parse log line: %v", err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			if doTail { | ||||||
|  | 				tailQueue = append(tailQueue, logLine) | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 			logChannel <- logLine | 			logChannel <- logLine | ||||||
| 			ec, err = r.Read(bytes) |  | ||||||
| 		} | 		} | ||||||
| 		if err != nil && err != io.EOF { |  | ||||||
| 			logrus.Error(err) |  | ||||||
| 		} |  | ||||||
| 		r.Close() |  | ||||||
| 		options.WaitGroup.Done() |  | ||||||
| 	}() | 	}() | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func journalFormatterWithID(entry *journal.JournalEntry) (string, error) { | func journalFormatterWithID(entry *sdjournal.JournalEntry) (string, error) { | ||||||
| 	output, err := formatterPrefix(entry) | 	output, err := formatterPrefix(entry) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return "", err | 		return "", err | ||||||
|  | @ -162,7 +225,7 @@ func journalFormatterWithID(entry *journal.JournalEntry) (string, error) { | ||||||
| 	return output, nil | 	return output, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func journalFormatter(entry *journal.JournalEntry) (string, error) { | func journalFormatter(entry *sdjournal.JournalEntry) (string, error) { | ||||||
| 	output, err := formatterPrefix(entry) | 	output, err := formatterPrefix(entry) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return "", err | 		return "", err | ||||||
|  | @ -176,7 +239,7 @@ func journalFormatter(entry *journal.JournalEntry) (string, error) { | ||||||
| 	return output, nil | 	return output, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func formatterPrefix(entry *journal.JournalEntry) (string, error) { | func formatterPrefix(entry *sdjournal.JournalEntry) (string, error) { | ||||||
| 	usec := entry.RealtimeTimestamp | 	usec := entry.RealtimeTimestamp | ||||||
| 	tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat) | 	tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat) | ||||||
| 	output := fmt.Sprintf("%s ", tsString) | 	output := fmt.Sprintf("%s ", tsString) | ||||||
|  | @ -202,7 +265,7 @@ func formatterPrefix(entry *journal.JournalEntry) (string, error) { | ||||||
| 	return output, nil | 	return output, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func formatterMessage(entry *journal.JournalEntry) (string, error) { | func formatterMessage(entry *sdjournal.JournalEntry) (string, error) { | ||||||
| 	// Finally, append the message
 | 	// Finally, append the message
 | ||||||
| 	msg, ok := entry.Fields["MESSAGE"] | 	msg, ok := entry.Fields["MESSAGE"] | ||||||
| 	if !ok { | 	if !ok { | ||||||
|  | @ -211,18 +274,3 @@ func formatterMessage(entry *journal.JournalEntry) (string, error) { | ||||||
| 	msg = strings.TrimSuffix(msg, "\n") | 	msg = strings.TrimSuffix(msg, "\n") | ||||||
| 	return msg, nil | 	return msg, nil | ||||||
| } | } | ||||||
| 
 |  | ||||||
| type journaldFollowBuffer struct { |  | ||||||
| 	logChannel chan *logs.LogLine |  | ||||||
| 	withID     bool |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (f journaldFollowBuffer) Write(p []byte) (int, error) { |  | ||||||
| 	bytestr := string(p) |  | ||||||
| 	logLine, err := logs.NewJournaldLogLine(bytestr, f.withID) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return -1, err |  | ||||||
| 	} |  | ||||||
| 	f.logChannel <- logLine |  | ||||||
| 	return len(p), nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  | @ -163,7 +163,7 @@ var _ = Describe("Podman logs", func() { | ||||||
| 		}) | 		}) | ||||||
| 
 | 
 | ||||||
| 		It("podman logs on a created container should result in 0 exit code: "+log, func() { | 		It("podman logs on a created container should result in 0 exit code: "+log, func() { | ||||||
| 			session := podmanTest.Podman([]string{"create", "-t", "--name", "log", ALPINE}) | 			session := podmanTest.Podman([]string{"create", "--log-driver", log, "-t", "--name", "log", ALPINE}) | ||||||
| 			session.WaitWithDefaultTimeout() | 			session.WaitWithDefaultTimeout() | ||||||
| 			Expect(session).To(Exit(0)) | 			Expect(session).To(Exit(0)) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -73,4 +73,56 @@ ${cid[0]} d"   "Sequential output from logs" | ||||||
|     _log_test_multi journald |     _log_test_multi journald | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @test "podman logs - journald log driver requires journald events backend" { | ||||||
|  |     skip_if_remote "remote does not support --events-backend" | ||||||
|  |     # We can't use journald on RHEL as rootless: rhbz#1895105 | ||||||
|  |     skip_if_journald_unavailable | ||||||
|  | 
 | ||||||
|  |     run_podman --events-backend=file run --log-driver=journald -d --name test --replace $IMAGE ls / | ||||||
|  |     run_podman --events-backend=file logs test | ||||||
|  |     run_podman 125 --events-backend=file logs --follow test | ||||||
|  |     is "$output" "Error: using --follow with the journald --log-driver but without the journald --events-backend (file) is not supported" "journald logger requires journald eventer" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | function _log_test_since() { | ||||||
|  |     local driver=$1 | ||||||
|  | 
 | ||||||
|  |     s_before="before_$(random_string)_${driver}" | ||||||
|  |     s_after="after_$(random_string)_${driver}" | ||||||
|  | 
 | ||||||
|  |     before=$(date --iso-8601=seconds) | ||||||
|  |     run_podman run --log-driver=$driver -d --name test $IMAGE sh -c \ | ||||||
|  |         "echo $s_before; trap 'echo $s_after; exit' SIGTERM; while :; do sleep 1; done" | ||||||
|  | 
 | ||||||
|  |     # sleep a second to make sure the date is after the first echo | ||||||
|  |     sleep 1 | ||||||
|  |     after=$(date --iso-8601=seconds) | ||||||
|  |     run_podman stop test | ||||||
|  | 
 | ||||||
|  |     run_podman logs test | ||||||
|  |     is "$output" \ | ||||||
|  |         "$s_before | ||||||
|  | $s_after" | ||||||
|  | 
 | ||||||
|  |     run_podman logs --since $before test | ||||||
|  |     is "$output" \ | ||||||
|  |         "$s_before | ||||||
|  | $s_after" | ||||||
|  | 
 | ||||||
|  |     run_podman logs --since $after test | ||||||
|  |     is "$output" "$s_after" | ||||||
|  |     run_podman rm -f test | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | @test "podman logs - since k8s-file" { | ||||||
|  |     _log_test_since k8s-file | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | @test "podman logs - since journald" { | ||||||
|  |     # We can't use journald on RHEL as rootless: rhbz#1895105 | ||||||
|  |     skip_if_journald_unavailable | ||||||
|  | 
 | ||||||
|  |     _log_test_since journald | ||||||
|  | } | ||||||
|  | 
 | ||||||
| # vim: filetype=sh | # vim: filetype=sh | ||||||
|  |  | ||||||
|  | @ -8,8 +8,7 @@ load helpers | ||||||
| @test "podman kill - test signal handling in containers" { | @test "podman kill - test signal handling in containers" { | ||||||
|     # Start a container that will handle all signals by emitting 'got: N' |     # Start a container that will handle all signals by emitting 'got: N' | ||||||
|     local -a signals=(1 2 3 4 5 6 8 10 12 13 14 15 16 20 21 22 23 24 25 26 64) |     local -a signals=(1 2 3 4 5 6 8 10 12 13 14 15 16 20 21 22 23 24 25 26 64) | ||||||
|     # Force the k8s-file driver until #10323 is fixed. |     run_podman run -d $IMAGE sh -c \ | ||||||
|     run_podman run --log-driver=k8s-file -d $IMAGE sh -c \ |  | ||||||
|         "for i in ${signals[*]}; do trap \"echo got: \$i\" \$i; done; |         "for i in ${signals[*]}; do trap \"echo got: \$i\" \$i; done; | ||||||
|         echo READY; |         echo READY; | ||||||
|         while ! test -e /stop; do sleep 0.05; done; |         while ! test -e /stop; do sleep 0.05; done; | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue