mirror of https://github.com/containers/podman.git
				
				
				
			
		
			
				
	
	
		
			229 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			229 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Go
		
	
	
	
| //+build linux
 | |
| //+build systemd
 | |
| 
 | |
| package libpod
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"math"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/containers/podman/v3/libpod/define"
 | |
| 	"github.com/containers/podman/v3/libpod/logs"
 | |
| 	journal "github.com/coreos/go-systemd/v22/sdjournal"
 | |
| 	"github.com/pkg/errors"
 | |
| 	"github.com/sirupsen/logrus"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	// journaldLogOut is the journald priority signifying stdout
 | |
| 	journaldLogOut = "6"
 | |
| 
 | |
| 	// journaldLogErr is the journald priority signifying stderr
 | |
| 	journaldLogErr = "3"
 | |
| 
 | |
| 	// bufLen is the length of the buffer to read from a k8s-file
 | |
| 	// formatted log line
 | |
| 	// let's set it as 2k just to be safe if k8s-file format ever changes
 | |
| 	bufLen = 16384
 | |
| )
 | |
| 
 | |
| func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
 | |
| 	var config journal.JournalReaderConfig
 | |
| 	if options.Tail < 0 {
 | |
| 		config.NumFromTail = 0
 | |
| 	} else if options.Tail == 0 {
 | |
| 		config.NumFromTail = math.MaxUint64
 | |
| 	} else {
 | |
| 		config.NumFromTail = uint64(options.Tail)
 | |
| 	}
 | |
| 	if options.Multi {
 | |
| 		config.Formatter = journalFormatterWithID
 | |
| 	} else {
 | |
| 		config.Formatter = journalFormatter
 | |
| 	}
 | |
| 	defaultTime := time.Time{}
 | |
| 	if options.Since != defaultTime {
 | |
| 		// coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future
 | |
| 		// return nothing instead of falsely printing
 | |
| 		if time.Now().Before(options.Since) {
 | |
| 			return nil
 | |
| 		}
 | |
| 		// coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past
 | |
| 		config.Since = -time.Since(options.Since)
 | |
| 	}
 | |
| 	config.Matches = append(config.Matches, journal.Match{
 | |
| 		Field: "CONTAINER_ID_FULL",
 | |
| 		Value: c.ID(),
 | |
| 	})
 | |
| 	options.WaitGroup.Add(1)
 | |
| 
 | |
| 	r, err := journal.NewJournalReader(config)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if r == nil {
 | |
| 		return errors.Errorf("journal reader creation failed")
 | |
| 	}
 | |
| 	if options.Tail == math.MaxInt64 {
 | |
| 		r.Rewind()
 | |
| 	}
 | |
| 	state, err := c.State()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if options.Follow && state == define.ContainerStateRunning {
 | |
| 		go func() {
 | |
| 			done := make(chan bool)
 | |
| 			until := make(chan time.Time)
 | |
| 			go func() {
 | |
| 				select {
 | |
| 				case <-ctx.Done():
 | |
| 					until <- time.Time{}
 | |
| 				case <-done:
 | |
| 					// nothing to do anymore
 | |
| 				}
 | |
| 			}()
 | |
| 			go func() {
 | |
| 				// FIXME (#10323): we are facing a terrible
 | |
| 				// race condition here. At the time the
 | |
| 				// container dies and `c.Wait()` has returned,
 | |
| 				// we may not have received all journald logs.
 | |
| 				// So far there is no other way than waiting
 | |
| 				// for a second.  Ultimately, `r.Follow` is
 | |
| 				// racy and we may have to implement our custom
 | |
| 				// logic here.
 | |
| 				c.Wait(ctx)
 | |
| 				time.Sleep(time.Second)
 | |
| 				until <- time.Time{}
 | |
| 			}()
 | |
| 			follower := journaldFollowBuffer{logChannel, options.Multi}
 | |
| 			err := r.Follow(until, follower)
 | |
| 			if err != nil {
 | |
| 				logrus.Debugf(err.Error())
 | |
| 			}
 | |
| 			r.Close()
 | |
| 			options.WaitGroup.Done()
 | |
| 			done <- true
 | |
| 			return
 | |
| 		}()
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		bytes := make([]byte, bufLen)
 | |
| 		// /me complains about no do-while in go
 | |
| 		ec, err := r.Read(bytes)
 | |
| 		for ec != 0 && err == nil {
 | |
| 			// because we are reusing bytes, we need to make
 | |
| 			// sure the old data doesn't get into the new line
 | |
| 			bytestr := string(bytes[:ec])
 | |
| 			logLine, err2 := logs.NewJournaldLogLine(bytestr, options.Multi)
 | |
| 			if err2 != nil {
 | |
| 				logrus.Error(err2)
 | |
| 				continue
 | |
| 			}
 | |
| 			logChannel <- logLine
 | |
| 			ec, err = r.Read(bytes)
 | |
| 		}
 | |
| 		if err != nil && err != io.EOF {
 | |
| 			logrus.Error(err)
 | |
| 		}
 | |
| 		r.Close()
 | |
| 		options.WaitGroup.Done()
 | |
| 	}()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
 | |
| 	output, err := formatterPrefix(entry)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	id, ok := entry.Fields["CONTAINER_ID_FULL"]
 | |
| 	if !ok {
 | |
| 		return "", fmt.Errorf("no CONTAINER_ID_FULL field present in journal entry")
 | |
| 	}
 | |
| 	if len(id) > 12 {
 | |
| 		id = id[:12]
 | |
| 	}
 | |
| 	output += fmt.Sprintf("%s ", id)
 | |
| 	// Append message
 | |
| 	msg, err := formatterMessage(entry)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	output += msg
 | |
| 	return output, nil
 | |
| }
 | |
| 
 | |
| func journalFormatter(entry *journal.JournalEntry) (string, error) {
 | |
| 	output, err := formatterPrefix(entry)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	// Append message
 | |
| 	msg, err := formatterMessage(entry)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	output += msg
 | |
| 	return output, nil
 | |
| }
 | |
| 
 | |
| func formatterPrefix(entry *journal.JournalEntry) (string, error) {
 | |
| 	usec := entry.RealtimeTimestamp
 | |
| 	tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat)
 | |
| 	output := fmt.Sprintf("%s ", tsString)
 | |
| 	priority, ok := entry.Fields["PRIORITY"]
 | |
| 	if !ok {
 | |
| 		return "", errors.Errorf("no PRIORITY field present in journal entry")
 | |
| 	}
 | |
| 	if priority == journaldLogOut {
 | |
| 		output += "stdout "
 | |
| 	} else if priority == journaldLogErr {
 | |
| 		output += "stderr "
 | |
| 	} else {
 | |
| 		return "", errors.Errorf("unexpected PRIORITY field in journal entry")
 | |
| 	}
 | |
| 
 | |
| 	// if CONTAINER_PARTIAL_MESSAGE is defined, the log type is "P"
 | |
| 	if _, ok := entry.Fields["CONTAINER_PARTIAL_MESSAGE"]; ok {
 | |
| 		output += fmt.Sprintf("%s ", logs.PartialLogType)
 | |
| 	} else {
 | |
| 		output += fmt.Sprintf("%s ", logs.FullLogType)
 | |
| 	}
 | |
| 
 | |
| 	return output, nil
 | |
| }
 | |
| 
 | |
| func formatterMessage(entry *journal.JournalEntry) (string, error) {
 | |
| 	// Finally, append the message
 | |
| 	msg, ok := entry.Fields["MESSAGE"]
 | |
| 	if !ok {
 | |
| 		return "", fmt.Errorf("no MESSAGE field present in journal entry")
 | |
| 	}
 | |
| 	msg = strings.TrimSuffix(msg, "\n")
 | |
| 	return msg, nil
 | |
| }
 | |
| 
 | |
| type journaldFollowBuffer struct {
 | |
| 	logChannel chan *logs.LogLine
 | |
| 	withID     bool
 | |
| }
 | |
| 
 | |
| func (f journaldFollowBuffer) Write(p []byte) (int, error) {
 | |
| 	bytestr := string(p)
 | |
| 	logLine, err := logs.NewJournaldLogLine(bytestr, f.withID)
 | |
| 	if err != nil {
 | |
| 		return -1, err
 | |
| 	}
 | |
| 	f.logChannel <- logLine
 | |
| 	return len(p), nil
 | |
| }
 |