rework event code to improve API errors

One of the problems with the Events() API was that you had to call it in
a new goroutine. This meant the error returned by it had to be read
back via a second channel. This caused other bugs in the past but here
the biggest problem is that basic errors such as invalid since/until
options were not directly returned to the caller.
It meant in the API we were not able to write http code 200 quickly
because we always waited for the first event or error from the
channels. This in turn made some clients unhappy, as they assume the
server hangs and time out if no such events are generated.

To fix this we restructure the entire event flow. First we spawn the
goroutine inside the eventer Read() function so not all the callers have
to. Then we can return the basic error quickly without the goroutine.
The caller then checks the error like any normal function and the API
can use this one to decide which status code to return.
Second, we now return errors/events in one channel, so the callers can
decide to ignore or log them, which makes it a bit more clear.

Fixes c46884aa93 ("podman events: check for an error after we finish reading events")
Fixes #23712

Signed-off-by: Paul Holzinger <pholzing@redhat.com>
This commit is contained in:
Paul Holzinger 2024-10-29 15:26:45 +01:00
parent e3abf5c9e8
commit 768ad8653a
No known key found for this signature in database
GPG Key ID: EB145DD938A3CAF2
11 changed files with 196 additions and 209 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/containers/podman/v5/cmd/podman/validate" "github.com/containers/podman/v5/cmd/podman/validate"
"github.com/containers/podman/v5/libpod/events" "github.com/containers/podman/v5/libpod/events"
"github.com/containers/podman/v5/pkg/domain/entities" "github.com/containers/podman/v5/pkg/domain/entities"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -139,9 +140,8 @@ func eventsCmd(cmd *cobra.Command, _ []string) error {
if len(eventOptions.Since) > 0 || len(eventOptions.Until) > 0 { if len(eventOptions.Since) > 0 || len(eventOptions.Until) > 0 {
eventOptions.FromStart = true eventOptions.FromStart = true
} }
eventChannel := make(chan *events.Event, 1) eventChannel := make(chan events.ReadResult, 1)
eventOptions.EventChan = eventChannel eventOptions.EventChan = eventChannel
errChannel := make(chan error)
var ( var (
rpt *report.Formatter rpt *report.Formatter
@ -161,40 +161,31 @@ func eventsCmd(cmd *cobra.Command, _ []string) error {
} }
} }
go func() { err := registry.ContainerEngine().Events(context.Background(), eventOptions)
errChannel <- registry.ContainerEngine().Events(context.Background(), eventOptions) if err != nil {
close(errChannel) return err
}() }
for { for evt := range eventChannel {
select { if evt.Error != nil {
case event, ok := <-eventChannel: logrus.Errorf("Failed to read event: %v", evt.Error)
if !ok { continue
// channel was closed we can exit
// read the error channel blocking to make sure we are not missing any errors (#23165)
return <-errChannel
} }
switch { switch {
case doJSON: case doJSON:
e := newEventFromLibpodEvent(event) e := newEventFromLibpodEvent(evt.Event)
jsonStr, err := e.ToJSONString() jsonStr, err := e.ToJSONString()
if err != nil { if err != nil {
return err return err
} }
fmt.Println(jsonStr) fmt.Println(jsonStr)
case cmd.Flags().Changed("format"): case cmd.Flags().Changed("format"):
if err := rpt.Execute(newEventFromLibpodEvent(event)); err != nil { if err := rpt.Execute(newEventFromLibpodEvent(evt.Event)); err != nil {
return err return err
} }
default: default:
fmt.Println(event.ToHumanReadable(!noTrunc)) fmt.Println(evt.Event.ToHumanReadable(!noTrunc))
}
case err := <-errChannel:
// only exit in case of an error,
// otherwise keep reading events until the event channel is closed
if err != nil {
return err
}
} }
} }
return nil
} }

View File

@ -6,7 +6,6 @@ import (
"context" "context"
"fmt" "fmt"
"path/filepath" "path/filepath"
"sync"
"github.com/containers/podman/v5/libpod/define" "github.com/containers/podman/v5/libpod/define"
"github.com/containers/podman/v5/libpod/events" "github.com/containers/podman/v5/libpod/events"
@ -187,7 +186,7 @@ func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error
// GetEvents reads the event log and returns events based on input filters // GetEvents reads the event log and returns events based on input filters
func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) { func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) {
eventChannel := make(chan *events.Event) eventChannel := make(chan events.ReadResult)
options := events.ReadOptions{ options := events.ReadOptions{
EventChannel: eventChannel, EventChannel: eventChannel,
Filters: filters, Filters: filters,
@ -195,45 +194,21 @@ func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Ev
Stream: false, Stream: false,
} }
logEvents := make([]*events.Event, 0, len(eventChannel)) err := r.eventer.Read(ctx, options)
readLock := sync.Mutex{}
readLock.Lock()
go func() {
for e := range eventChannel {
logEvents = append(logEvents, e)
}
readLock.Unlock()
}()
readErr := r.eventer.Read(ctx, options)
readLock.Lock() // Wait for the events to be consumed.
return logEvents, readErr
}
// GetLastContainerEvent takes a container name or ID and an event status and returns
// the last occurrence of the container event
func (r *Runtime) GetLastContainerEvent(ctx context.Context, nameOrID string, containerEvent events.Status) (*events.Event, error) {
// FIXME: events should be read in reverse order!
// https://github.com/containers/podman/issues/14579
// check to make sure the event.Status is valid
if _, err := events.StringToStatus(containerEvent.String()); err != nil {
return nil, err
}
filters := []string{
fmt.Sprintf("container=%s", nameOrID),
fmt.Sprintf("event=%s", containerEvent),
"type=container",
}
containerEvents, err := r.GetEvents(ctx, filters)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(containerEvents) < 1 {
return nil, fmt.Errorf("%s not found: %w", containerEvent.String(), events.ErrEventNotFound) logEvents := make([]*events.Event, 0, len(eventChannel))
for evt := range eventChannel {
// we ignore any error here, this is only used on the backup
// GetExecDiedEvent() died path as best effort anyway
if evt.Error == nil {
logEvents = append(logEvents, evt.Event)
} }
// return the last element in the slice }
return containerEvents[len(containerEvents)-1], nil
return logEvents, nil
} }
// GetExecDiedEvent takes a container name or ID, exec session ID, and returns // GetExecDiedEvent takes a container name or ID, exec session ID, and returns

View File

@ -85,10 +85,15 @@ type Eventer interface {
String() string String() string
} }
type ReadResult struct {
Event *Event
Error error
}
// ReadOptions describe the attributes needed to read event logs // ReadOptions describe the attributes needed to read event logs
type ReadOptions struct { type ReadOptions struct {
// EventChannel is the comm path back to user // EventChannel is the comm path back to user
EventChannel chan *Event EventChannel chan ReadResult
// Filters are key/value pairs that describe to limit output // Filters are key/value pairs that describe to limit output
Filters []string Filters []string
// FromStart means you start reading from the start of the logs // FromStart means you start reading from the start of the logs

View File

@ -7,6 +7,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"runtime"
"strconv" "strconv"
"time" "time"
@ -97,8 +98,9 @@ func (e EventJournalD) Write(ee Event) error {
} }
// Read reads events from the journal and sends qualified events to the event channel // Read reads events from the journal and sends qualified events to the event channel
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error { func (e EventJournalD) Read(ctx context.Context, options ReadOptions) (retErr error) {
defer close(options.EventChannel) runtime.LockOSThread()
defer runtime.UnlockOSThread()
filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until) filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until)
if err != nil { if err != nil {
return fmt.Errorf("failed to parse event filters: %w", err) return fmt.Errorf("failed to parse event filters: %w", err)
@ -117,13 +119,15 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
return err return err
} }
defer func() { defer func() {
if retErr != nil {
if err := j.Close(); err != nil { if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err) logrus.Errorf("Unable to close journal :%v", err)
} }
}
}() }()
err = j.SetDataThreshold(0) err = j.SetDataThreshold(0)
if err != nil { if err != nil {
logrus.Warnf("cannot set data threshold: %v", err) return fmt.Errorf("cannot set data threshold for journal: %v", err)
} }
// match only podman journal entries // match only podman journal entries
podmanJournal := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"} podmanJournal := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
@ -158,14 +162,22 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
} }
} }
go func() {
defer close(options.EventChannel)
defer func() {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
}
}()
for { for {
entry, err := GetNextEntry(ctx, j, options.Stream, untilTime) entry, err := GetNextEntry(ctx, j, options.Stream, untilTime)
if err != nil { if err != nil {
return err options.EventChannel <- ReadResult{Error: err}
break
} }
// no entry == we hit the end // no entry == we hit the end
if entry == nil { if entry == nil {
return nil break
} }
newEvent, err := newEventFromJournalEntry(entry) newEvent, err := newEventFromJournalEntry(entry)
@ -174,14 +186,16 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
// Don't fail hard - that would make events unusable. // Don't fail hard - that would make events unusable.
// Instead, log and continue. // Instead, log and continue.
if !errors.Is(err, ErrEventTypeBlank) { if !errors.Is(err, ErrEventTypeBlank) {
logrus.Errorf("Unable to decode event: %v", err) options.EventChannel <- ReadResult{Error: fmt.Errorf("unable to decode event: %v", err)}
} }
continue continue
} }
if applyFilters(newEvent, filterMap) { if applyFilters(newEvent, filterMap) {
options.EventChannel <- newEvent options.EventChannel <- ReadResult{Event: newEvent}
} }
} }
}()
return nil
} }
func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) { func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {

View File

@ -108,7 +108,6 @@ func (e EventLogFile) readRotateEvent(event *Event) (begin bool, end bool, err e
// Reads from the log file // Reads from the log file
func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error { func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel)
filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until) filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until)
if err != nil { if err != nil {
return fmt.Errorf("failed to parse event filters: %w", err) return fmt.Errorf("failed to parse event filters: %w", err)
@ -148,6 +147,8 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
return err return err
} }
go func() {
defer close(options.EventChannel)
var line *tail.Line var line *tail.Line
var ok bool var ok bool
var skipRotate bool var skipRotate bool
@ -156,18 +157,20 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
case <-ctx.Done(): case <-ctx.Done():
// the consumer has cancelled // the consumer has cancelled
t.Kill(errors.New("hangup by client")) t.Kill(errors.New("hangup by client"))
return nil return
case line, ok = <-t.Lines: case line, ok = <-t.Lines:
if !ok { if !ok {
// channel was closed // channel was closed
return nil return
} }
// fallthrough // fallthrough
} }
event, err := newEventFromJSONString(line.Text) event, err := newEventFromJSONString(line.Text)
if err != nil { if err != nil {
return err err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
options.EventChannel <- ReadResult{Error: err}
continue
} }
switch event.Type { switch event.Type {
case Image, Volume, Pod, Container, Network: case Image, Volume, Pod, Container, Network:
@ -175,7 +178,8 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
case System: case System:
begin, end, err := e.readRotateEvent(event) begin, end, err := e.readRotateEvent(event)
if err != nil { if err != nil {
return err options.EventChannel <- ReadResult{Error: err}
continue
} }
if begin && event.Time.After(readTime) { if begin && event.Time.After(readTime) {
// If the rotation event happened _after_ we // If the rotation event happened _after_ we
@ -189,15 +193,19 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
skipRotate = false skipRotate = false
} }
default: default:
return fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath) err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
options.EventChannel <- ReadResult{Error: err}
continue
} }
if skipRotate { if skipRotate {
continue continue
} }
if applyFilters(event, filterMap) { if applyFilters(event, filterMap) {
options.EventChannel <- event options.EventChannel <- ReadResult{Event: event}
} }
} }
}()
return nil
} }
// String returns a string representation of the logger // String returns a string representation of the logger

View File

@ -48,11 +48,8 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
utils.Error(w, http.StatusBadRequest, fmt.Errorf("failed to parse filters for %s: %w", r.URL.String(), err)) utils.Error(w, http.StatusBadRequest, fmt.Errorf("failed to parse filters for %s: %w", r.URL.String(), err))
return return
} }
eventChannel := make(chan *events.Event) eventChannel := make(chan events.ReadResult)
errorChannel := make(chan error)
// Start reading events.
go func() {
readOpts := events.ReadOptions{ readOpts := events.ReadOptions{
FromStart: fromStart, FromStart: fromStart,
Stream: query.Stream, Stream: query.Stream,
@ -61,8 +58,11 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
Since: query.Since, Since: query.Since,
Until: query.Until, Until: query.Until,
} }
errorChannel <- runtime.Events(r.Context(), readOpts) err = runtime.Events(r.Context(), readOpts)
}() if err != nil {
utils.InternalServerError(w, err)
return
}
flush := func() {} flush := func() {}
if flusher, ok := w.(http.Flusher); ok { if flusher, ok := w.(http.Flusher); ok {
@ -70,31 +70,29 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
wroteContent := false
defer func() {
if !wroteContent {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
flush() flush()
}
}()
coder := json.NewEncoder(w) coder := json.NewEncoder(w)
coder.SetEscapeHTML(true) coder.SetEscapeHTML(true)
for { for {
select { select {
case err := <-errorChannel: case <-r.Context().Done():
if err != nil {
utils.InternalServerError(w, err)
wroteContent = true
}
return return
case evt := <-eventChannel: case evt, ok := <-eventChannel:
if evt == nil { if !ok {
return
}
if evt.Error != nil {
logrus.Errorf("Unable to read event: %q", err)
continue
}
if evt.Event == nil {
continue continue
} }
e := entities.ConvertToEntitiesEvent(*evt) e := entities.ConvertToEntitiesEvent(*evt.Event)
// Some events differ between Libpod and Docker endpoints. // Some events differ between Libpod and Docker endpoints.
// Handle these differences for Docker-compat. // Handle these differences for Docker-compat.
if !utils.IsLibpodRequest(r) && e.Type == "image" && e.Status == "remove" { if !utils.IsLibpodRequest(r) && e.Type == "image" && e.Status == "remove" {
@ -110,10 +108,7 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
if err := coder.Encode(e); err != nil { if err := coder.Encode(e); err != nil {
logrus.Errorf("Unable to write json: %q", err) logrus.Errorf("Unable to write json: %q", err)
} }
wroteContent = true
flush() flush()
case <-r.Context().Done():
return
} }
} }
} }

View File

@ -236,8 +236,7 @@ func waitRemoved(ctrWait containerWaitFn) (int32, error) {
func waitNextExit(ctx context.Context, containerName string) (int32, error) { func waitNextExit(ctx context.Context, containerName string) (int32, error) {
runtime := ctx.Value(api.RuntimeKey).(*libpod.Runtime) runtime := ctx.Value(api.RuntimeKey).(*libpod.Runtime)
containerEngine := &abi.ContainerEngine{Libpod: runtime} containerEngine := &abi.ContainerEngine{Libpod: runtime}
eventChannel := make(chan *events.Event) eventChannel := make(chan events.ReadResult)
errChannel := make(chan error)
opts := entities.EventsOptions{ opts := entities.EventsOptions{
EventChan: eventChannel, EventChan: eventChannel,
Filter: []string{"event=died", fmt.Sprintf("container=%s", containerName)}, Filter: []string{"event=died", fmt.Sprintf("container=%s", containerName)},
@ -247,21 +246,22 @@ func waitNextExit(ctx context.Context, containerName string) (int32, error) {
// ctx is used to cancel event watching goroutine // ctx is used to cancel event watching goroutine
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
go func() { err := containerEngine.Events(ctx, opts)
errChannel <- containerEngine.Events(ctx, opts) if err != nil {
}() return -1, err
}
evt, ok := <-eventChannel for evt := range eventChannel {
if ok { if evt.Error == nil {
if evt.ContainerExitCode != nil { if evt.Event.ContainerExitCode != nil {
return int32(*evt.ContainerExitCode), nil return int32(*evt.Event.ContainerExitCode), nil
} }
return -1, nil
} }
// if ok == false then containerEngine.Events() has exited }
// if we are here then containerEngine.Events() has exited
// it may happen if request was canceled (e.g. client closed connection prematurely) or // it may happen if request was canceled (e.g. client closed connection prematurely) or
// the server is in process of shutting down // the server is in process of shutting down
return -1, <-errChannel return -1, nil
} }
func waitNotRunning(ctrWait containerWaitFn) (int32, error) { func waitNotRunning(ctrWait containerWaitFn) (int32, error) {

View File

@ -3,9 +3,7 @@ package system
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io"
"net/http" "net/http"
"time" "time"
@ -31,7 +29,6 @@ func Events(ctx context.Context, eventChan chan types.Event, cancelChan chan boo
if err != nil { if err != nil {
return err return err
} }
defer response.Body.Close()
if cancelChan != nil { if cancelChan != nil {
go func() { go func() {
@ -43,9 +40,13 @@ func Events(ctx context.Context, eventChan chan types.Event, cancelChan chan boo
} }
if response.StatusCode != http.StatusOK { if response.StatusCode != http.StatusOK {
defer response.Body.Close()
return response.Process(nil) return response.Process(nil)
} }
go func() {
defer response.Body.Close()
defer close(eventChan)
dec := json.NewDecoder(response.Body) dec := json.NewDecoder(response.Body)
for err = (error)(nil); err == nil; { for err = (error)(nil); err == nil; {
var e = types.Event{} var e = types.Event{}
@ -54,15 +55,8 @@ func Events(ctx context.Context, eventChan chan types.Event, cancelChan chan boo
eventChan <- e eventChan <- e
} }
} }
close(eventChan) }()
switch {
case err == nil:
return nil return nil
case errors.Is(err, io.EOF):
return nil
default:
return fmt.Errorf("unable to decode event response: %w", err)
}
} }
// Prune removes all unused system data. // Prune removes all unused system data.

View File

@ -90,7 +90,7 @@ type DiffReport struct {
type EventsOptions struct { type EventsOptions struct {
FromStart bool FromStart bool
EventChan chan *events.Event EventChan chan events.ReadResult
Filter []string Filter []string
Stream bool Stream bool
Since string Since string

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/containers/podman/v5/libpod/events"
"github.com/containers/podman/v5/pkg/bindings/system" "github.com/containers/podman/v5/pkg/bindings/system"
"github.com/containers/podman/v5/pkg/domain/entities" "github.com/containers/podman/v5/pkg/domain/entities"
) )
@ -23,7 +24,7 @@ func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptio
binChan := make(chan entities.Event) binChan := make(chan entities.Event)
go func() { go func() {
for e := range binChan { for e := range binChan {
opts.EventChan <- entities.ConvertToLibpodEvent(e) opts.EventChan <- events.ReadResult{Event: entities.ConvertToLibpodEvent(e)}
} }
close(opts.EventChan) close(opts.EventChan)
}() }()

View File

@ -32,4 +32,8 @@ t GET "events?stream=false&since=$START&type=remove" 200 \
'select(.status | contains("remove")).Action=remove' \ 'select(.status | contains("remove")).Action=remove' \
'select(.status | contains("remove")).Actor.Attributes.containerExitCode=1' 'select(.status | contains("remove")).Actor.Attributes.containerExitCode=1'
APIV2_TEST_EXPECT_TIMEOUT=1 t GET "events?stream=true" 999
like "$(<$WORKDIR/curl.headers.out)" ".*HTTP.* 200 OK.*" \
"Received headers from /events"
# vim: filetype=sh # vim: filetype=sh