Merge pull request #21311 from nalind/journal-race

Fix a couple of races in the journald log reader
This commit is contained in:
David Calavera 2016-03-18 08:53:21 -07:00
commit 8ef56e1f98
1 changed files with 20 additions and 5 deletions

View File

@ -173,6 +173,9 @@ drain:
} }
func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor string) { func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor string) {
s.readers.mu.Lock()
s.readers.readers[logWatcher] = logWatcher
s.readers.mu.Unlock()
go func() { go func() {
// Keep copying journal data out until we're notified to stop. // Keep copying journal data out until we're notified to stop.
for C.wait_for_data_or_close(j, pfd[0]) == 1 { for C.wait_for_data_or_close(j, pfd[0]) == 1 {
@ -183,10 +186,9 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
s.readers.mu.Lock() s.readers.mu.Lock()
delete(s.readers.readers, logWatcher) delete(s.readers.readers, logWatcher)
s.readers.mu.Unlock() s.readers.mu.Unlock()
C.sd_journal_close(j)
close(logWatcher.Msg)
}() }()
s.readers.mu.Lock()
s.readers.readers[logWatcher] = logWatcher
s.readers.mu.Unlock()
// Wait until we're told to stop. // Wait until we're told to stop.
select { select {
case <-logWatcher.WatchClose(): case <-logWatcher.WatchClose():
@ -203,14 +205,24 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
var pipes [2]C.int var pipes [2]C.int
cursor := "" cursor := ""
defer close(logWatcher.Msg)
// Get a handle to the journal. // Get a handle to the journal.
rc := C.sd_journal_open(&j, C.int(0)) rc := C.sd_journal_open(&j, C.int(0))
if rc != 0 { if rc != 0 {
logWatcher.Err <- fmt.Errorf("error opening journal") logWatcher.Err <- fmt.Errorf("error opening journal")
close(logWatcher.Msg)
return return
} }
defer C.sd_journal_close(j) // If we end up following the log, we can set the journal context
// pointer and the channel pointer to nil so that we won't close them
// here, potentially while the goroutine that uses them is still
// running. Otherwise, close them when we return from this function.
following := false
defer func(pfollowing *bool) {
if !*pfollowing {
C.sd_journal_close(j)
close(logWatcher.Msg)
}
}(&following)
// Remove limits on the size of data items that we'll retrieve. // Remove limits on the size of data items that we'll retrieve.
rc = C.sd_journal_set_data_threshold(j, C.size_t(0)) rc = C.sd_journal_set_data_threshold(j, C.size_t(0))
if rc != 0 { if rc != 0 {
@ -286,6 +298,9 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe") logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
} else { } else {
s.followJournal(logWatcher, config, j, pipes, cursor) s.followJournal(logWatcher, config, j, pipes, cursor)
// Let followJournal handle freeing the journal context
// object and closing the channel.
following = true
} }
} }
return return