247 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			247 lines
		
	
	
		
			6.9 KiB
		
	
	
	
		
			Go
		
	
	
	
//go:build systemd
 | 
						|
// +build systemd
 | 
						|
 | 
						|
package events
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"encoding/json"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"strconv"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/containers/podman/v4/pkg/util"
 | 
						|
	"github.com/coreos/go-systemd/v22/journal"
 | 
						|
	"github.com/coreos/go-systemd/v22/sdjournal"
 | 
						|
	"github.com/sirupsen/logrus"
 | 
						|
)
 | 
						|
 | 
						|
// DefaultEventerType is the eventer type used by default. This file is only
// compiled with the systemd build tag, in which case the default is Journald.
const DefaultEventerType = Journald
 | 
						|
 | 
						|
// EventJournalD is the journald implementation of an eventer.
type EventJournalD struct {
	// options holds the EventerOptions handed to newEventJournalD.
	options EventerOptions
}
 | 
						|
 | 
						|
// newEventJournalD creates a new journald Eventer
 | 
						|
func newEventJournalD(options EventerOptions) (Eventer, error) {
 | 
						|
	return EventJournalD{options}, nil
 | 
						|
}
 | 
						|
 | 
						|
// Write to journald
 | 
						|
func (e EventJournalD) Write(ee Event) error {
 | 
						|
	m := make(map[string]string)
 | 
						|
	m["SYSLOG_IDENTIFIER"] = "podman"
 | 
						|
	m["PODMAN_EVENT"] = ee.Status.String()
 | 
						|
	m["PODMAN_TYPE"] = ee.Type.String()
 | 
						|
	m["PODMAN_TIME"] = ee.Time.Format(time.RFC3339Nano)
 | 
						|
 | 
						|
	// Add specialized information based on the podman type
 | 
						|
	switch ee.Type {
 | 
						|
	case Image:
 | 
						|
		m["PODMAN_NAME"] = ee.Name
 | 
						|
		m["PODMAN_ID"] = ee.ID
 | 
						|
	case Container, Pod:
 | 
						|
		m["PODMAN_IMAGE"] = ee.Image
 | 
						|
		m["PODMAN_NAME"] = ee.Name
 | 
						|
		m["PODMAN_ID"] = ee.ID
 | 
						|
		if ee.ContainerExitCode != 0 {
 | 
						|
			m["PODMAN_EXIT_CODE"] = strconv.Itoa(ee.ContainerExitCode)
 | 
						|
		}
 | 
						|
		// If we have container labels, we need to convert them to a string so they
 | 
						|
		// can be recorded with the event
 | 
						|
		if len(ee.Details.Attributes) > 0 {
 | 
						|
			b, err := json.Marshal(ee.Details.Attributes)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			m["PODMAN_LABELS"] = string(b)
 | 
						|
		}
 | 
						|
		m["PODMAN_HEALTH_STATUS"] = ee.HealthStatus
 | 
						|
	case Network:
 | 
						|
		m["PODMAN_ID"] = ee.ID
 | 
						|
		m["PODMAN_NETWORK_NAME"] = ee.Network
 | 
						|
	case Volume:
 | 
						|
		m["PODMAN_NAME"] = ee.Name
 | 
						|
	}
 | 
						|
	return journal.Send(ee.ToHumanReadable(false), journal.PriInfo, m)
 | 
						|
}
 | 
						|
 | 
						|
// Read reads events from the journal and sends qualified events to the event channel
 | 
						|
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
 | 
						|
	defer close(options.EventChannel)
 | 
						|
	filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("failed to parse event filters: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	var untilTime time.Time
 | 
						|
	if len(options.Until) > 0 {
 | 
						|
		untilTime, err = util.ParseInputTime(options.Until, false)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	j, err := sdjournal.NewJournal()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	defer func() {
 | 
						|
		if err := j.Close(); err != nil {
 | 
						|
			logrus.Errorf("Unable to close journal :%v", err)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	// match only podman journal entries
 | 
						|
	podmanJournal := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
 | 
						|
	if err := j.AddMatch(podmanJournal.String()); err != nil {
 | 
						|
		return fmt.Errorf("failed to add journal filter for event log: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	if len(options.Since) == 0 && len(options.Until) == 0 && options.Stream {
 | 
						|
		if err := j.SeekTail(); err != nil {
 | 
						|
			return fmt.Errorf("failed to seek end of journal: %w", err)
 | 
						|
		}
 | 
						|
		// After SeekTail calling Next moves to a random entry.
 | 
						|
		// To prevent this we have to call Previous first.
 | 
						|
		// see: https://bugs.freedesktop.org/show_bug.cgi?id=64614
 | 
						|
		if _, err := j.Previous(); err != nil {
 | 
						|
			return fmt.Errorf("failed to move journal cursor to previous entry: %w", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	for {
 | 
						|
		entry, err := getNextEntry(ctx, j, options.Stream, untilTime)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		// no entry == we hit the end
 | 
						|
		if entry == nil {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
 | 
						|
		newEvent, err := newEventFromJournalEntry(entry)
 | 
						|
		if err != nil {
 | 
						|
			// We can't decode this event.
 | 
						|
			// Don't fail hard - that would make events unusable.
 | 
						|
			// Instead, log and continue.
 | 
						|
			if !errors.Is(err, ErrEventTypeBlank) {
 | 
						|
				logrus.Errorf("Unable to decode event: %v", err)
 | 
						|
			}
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if applyFilters(newEvent, filterMap) {
 | 
						|
			options.EventChannel <- newEvent
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {
 | 
						|
	newEvent := Event{}
 | 
						|
	eventType, err := StringToType(entry.Fields["PODMAN_TYPE"])
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	eventTime, err := time.Parse(time.RFC3339Nano, entry.Fields["PODMAN_TIME"])
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	eventStatus, err := StringToStatus(entry.Fields["PODMAN_EVENT"])
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	newEvent.Type = eventType
 | 
						|
	newEvent.Time = eventTime
 | 
						|
	newEvent.Status = eventStatus
 | 
						|
	newEvent.Name = entry.Fields["PODMAN_NAME"]
 | 
						|
 | 
						|
	switch eventType {
 | 
						|
	case Container, Pod:
 | 
						|
		newEvent.ID = entry.Fields["PODMAN_ID"]
 | 
						|
		newEvent.Image = entry.Fields["PODMAN_IMAGE"]
 | 
						|
		if code, ok := entry.Fields["PODMAN_EXIT_CODE"]; ok {
 | 
						|
			intCode, err := strconv.Atoi(code)
 | 
						|
			if err != nil {
 | 
						|
				logrus.Errorf("Parsing event exit code %s", code)
 | 
						|
			} else {
 | 
						|
				newEvent.ContainerExitCode = intCode
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// we need to check for the presence of labels recorded to a container event
 | 
						|
		if stringLabels, ok := entry.Fields["PODMAN_LABELS"]; ok && len(stringLabels) > 0 {
 | 
						|
			labels := make(map[string]string, 0)
 | 
						|
			if err := json.Unmarshal([]byte(stringLabels), &labels); err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
 | 
						|
			// if we have labels, add them to the event
 | 
						|
			if len(labels) > 0 {
 | 
						|
				newEvent.Details = Details{Attributes: labels}
 | 
						|
			}
 | 
						|
		}
 | 
						|
		newEvent.HealthStatus = entry.Fields["PODMAN_HEALTH_STATUS"]
 | 
						|
	case Network:
 | 
						|
		newEvent.ID = entry.Fields["PODMAN_ID"]
 | 
						|
		newEvent.Network = entry.Fields["PODMAN_NETWORK_NAME"]
 | 
						|
	case Image:
 | 
						|
		newEvent.ID = entry.Fields["PODMAN_ID"]
 | 
						|
	}
 | 
						|
	return &newEvent, nil
 | 
						|
}
 | 
						|
 | 
						|
// String returns a string representation of the logger
 | 
						|
func (e EventJournalD) String() string {
 | 
						|
	return Journald.String()
 | 
						|
}
 | 
						|
 | 
						|
// getNextEntry returns the next entry in the journal. If the end  of the
 | 
						|
// journal is reached and stream is not set or the current time is after
 | 
						|
// the until time this function return nil,nil.
 | 
						|
func getNextEntry(ctx context.Context, j *sdjournal.Journal, stream bool, untilTime time.Time) (*sdjournal.JournalEntry, error) {
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
			// the consumer has cancelled
 | 
						|
			return nil, nil
 | 
						|
		default:
 | 
						|
			// fallthrough
 | 
						|
		}
 | 
						|
		// the api requires a next|prev before reading the event
 | 
						|
		ret, err := j.Next()
 | 
						|
		if err != nil {
 | 
						|
			return nil, fmt.Errorf("failed to move journal cursor to next entry: %w", err)
 | 
						|
		}
 | 
						|
		// ret == 0 equals EOF, see sd_journal_next(3)
 | 
						|
		if ret == 0 {
 | 
						|
			if !stream || (!untilTime.IsZero() && time.Now().After(untilTime)) {
 | 
						|
				// we hit the end and should not keep streaming
 | 
						|
				return nil, nil
 | 
						|
			}
 | 
						|
			// keep waiting for the next entry
 | 
						|
			// j.Wait() is blocking, this would cause the goroutine to hang forever
 | 
						|
			// if no more journal entries are generated and thus if the client
 | 
						|
			// has closed the connection in the meantime to leak memory.
 | 
						|
			// Waiting only 5 seconds makes sure we can check if the client closed in the
 | 
						|
			// meantime at least every 5 seconds.
 | 
						|
			t := 5 * time.Second
 | 
						|
			if !untilTime.IsZero() {
 | 
						|
				until := time.Until(untilTime)
 | 
						|
				if until < t {
 | 
						|
					t = until
 | 
						|
				}
 | 
						|
			}
 | 
						|
			_ = j.Wait(t)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		entry, err := j.GetEntry()
 | 
						|
		if err != nil {
 | 
						|
			return nil, fmt.Errorf("failed to read journal entry: %w", err)
 | 
						|
		}
 | 
						|
		return entry, nil
 | 
						|
	}
 | 
						|
}
 |