Merge pull request #24406 from Luap99/event-api-response

fix API issue about missing the status code in the events and logs endpoints
This commit is contained in:
openshift-merge-bot[bot] 2024-11-04 18:54:14 +00:00 committed by GitHub
commit 0f25d9ee15
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 214 additions and 285 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/containers/podman/v5/cmd/podman/validate"
"github.com/containers/podman/v5/libpod/events"
"github.com/containers/podman/v5/pkg/domain/entities"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -139,9 +140,8 @@ func eventsCmd(cmd *cobra.Command, _ []string) error {
if len(eventOptions.Since) > 0 || len(eventOptions.Until) > 0 {
eventOptions.FromStart = true
}
eventChannel := make(chan *events.Event, 1)
eventChannel := make(chan events.ReadResult, 1)
eventOptions.EventChan = eventChannel
errChannel := make(chan error)
var (
rpt *report.Formatter
@ -161,40 +161,31 @@ func eventsCmd(cmd *cobra.Command, _ []string) error {
}
}
go func() {
errChannel <- registry.ContainerEngine().Events(context.Background(), eventOptions)
close(errChannel)
}()
err := registry.ContainerEngine().Events(context.Background(), eventOptions)
if err != nil {
return err
}
for {
select {
case event, ok := <-eventChannel:
if !ok {
// channel was closed we can exit
// read the error channel blocking to make sure we are not missing any errors (#23165)
return <-errChannel
}
switch {
case doJSON:
e := newEventFromLibpodEvent(event)
jsonStr, err := e.ToJSONString()
if err != nil {
return err
}
fmt.Println(jsonStr)
case cmd.Flags().Changed("format"):
if err := rpt.Execute(newEventFromLibpodEvent(event)); err != nil {
return err
}
default:
fmt.Println(event.ToHumanReadable(!noTrunc))
}
case err := <-errChannel:
// only exit in case of an error,
// otherwise keep reading events until the event channel is closed
for evt := range eventChannel {
if evt.Error != nil {
logrus.Errorf("Failed to read event: %v", evt.Error)
continue
}
switch {
case doJSON:
e := newEventFromLibpodEvent(evt.Event)
jsonStr, err := e.ToJSONString()
if err != nil {
return err
}
fmt.Println(jsonStr)
case cmd.Flags().Changed("format"):
if err := rpt.Execute(newEventFromLibpodEvent(evt.Event)); err != nil {
return err
}
default:
fmt.Println(evt.Event.ToHumanReadable(!noTrunc))
}
}
return nil
}

View File

@ -10,7 +10,6 @@ import (
"time"
"github.com/containers/podman/v5/libpod/define"
"github.com/containers/podman/v5/libpod/events"
"github.com/containers/podman/v5/libpod/logs"
systemdDefine "github.com/containers/podman/v5/pkg/systemd/define"
"github.com/nxadm/tail"
@ -139,20 +138,10 @@ func (c *Container) readFromLogFile(ctx context.Context, options *logs.LogOption
// The container is running, so we need to wait until the container exited
go func() {
eventChannel := make(chan *events.Event)
eventOptions := events.ReadOptions{
EventChannel: eventChannel,
Filters: []string{"event=died", "container=" + c.ID()},
Stream: true,
_, err = c.Wait(ctx)
if err != nil && !errors.Is(err, define.ErrNoSuchCtr) {
logrus.Errorf("Waiting for container to exit: %v", err)
}
go func() {
if err := c.runtime.Events(ctx, eventOptions); err != nil {
logrus.Errorf("Waiting for container to exit: %v", err)
}
}()
// Now wait for the died event and signal to finish
// reading the log until EOF.
<-eventChannel
// Make sure to wait at least for the poll duration
// before stopping the file logger (see #10675).
time.Sleep(watch.POLL_DURATION)

View File

@ -6,7 +6,6 @@ import (
"context"
"fmt"
"path/filepath"
"sync"
"github.com/containers/podman/v5/libpod/define"
"github.com/containers/podman/v5/libpod/events"
@ -187,7 +186,7 @@ func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error
// GetEvents reads the event log and returns events based on input filters
func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) {
eventChannel := make(chan *events.Event)
eventChannel := make(chan events.ReadResult)
options := events.ReadOptions{
EventChannel: eventChannel,
Filters: filters,
@ -195,45 +194,21 @@ func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Ev
Stream: false,
}
logEvents := make([]*events.Event, 0, len(eventChannel))
readLock := sync.Mutex{}
readLock.Lock()
go func() {
for e := range eventChannel {
logEvents = append(logEvents, e)
}
readLock.Unlock()
}()
readErr := r.eventer.Read(ctx, options)
readLock.Lock() // Wait for the events to be consumed.
return logEvents, readErr
}
// GetLastContainerEvent takes a container name or ID and an event status and returns
// the last occurrence of the container event
func (r *Runtime) GetLastContainerEvent(ctx context.Context, nameOrID string, containerEvent events.Status) (*events.Event, error) {
// FIXME: events should be read in reverse order!
// https://github.com/containers/podman/issues/14579
// check to make sure the event.Status is valid
if _, err := events.StringToStatus(containerEvent.String()); err != nil {
return nil, err
}
filters := []string{
fmt.Sprintf("container=%s", nameOrID),
fmt.Sprintf("event=%s", containerEvent),
"type=container",
}
containerEvents, err := r.GetEvents(ctx, filters)
err := r.eventer.Read(ctx, options)
if err != nil {
return nil, err
}
if len(containerEvents) < 1 {
return nil, fmt.Errorf("%s not found: %w", containerEvent.String(), events.ErrEventNotFound)
logEvents := make([]*events.Event, 0, len(eventChannel))
for evt := range eventChannel {
// we ignore any error here, this is only used on the backup
// GetExecDiedEvent() died path as best effort anyway
if evt.Error == nil {
logEvents = append(logEvents, evt.Event)
}
}
// return the last element in the slice
return containerEvents[len(containerEvents)-1], nil
return logEvents, nil
}
// GetExecDiedEvent takes a container name or ID, exec session ID, and returns

View File

@ -16,8 +16,6 @@ const (
Journald EventerType = iota
// Null is a no-op events logger. It does not read or write events.
Null EventerType = iota
// Memory indicates the event logger will hold events in memory
Memory EventerType = iota
)
// Event describes the attributes of a libpod event
@ -87,10 +85,15 @@ type Eventer interface {
String() string
}
type ReadResult struct {
Event *Event
Error error
}
// ReadOptions describe the attributes needed to read event logs
type ReadOptions struct {
// EventChannel is the comm path back to user
EventChannel chan *Event
EventChannel chan ReadResult
// Filters are key/value pairs that describe to limit output
Filters []string
// FromStart means you start reading from the start of the logs

View File

@ -20,8 +20,6 @@ func (et EventerType) String() string {
return "file"
case Journald:
return "journald"
case Memory:
return "memory"
case Null:
return "none"
default:
@ -36,8 +34,6 @@ func IsValidEventer(eventer string) bool {
return true
case Journald.String():
return true
case Memory.String():
return true
case Null.String():
return true
default:

View File

@ -15,8 +15,6 @@ func NewEventer(options EventerOptions) (Eventer, error) {
return EventLogFile{options}, nil
case strings.ToUpper(Null.String()):
return newNullEventer(), nil
case strings.ToUpper(Memory.String()):
return NewMemoryEventer(), nil
default:
return nil, fmt.Errorf("unknown event logger type: %s", strings.ToUpper(options.EventerType))
}

View File

@ -21,8 +21,6 @@ func NewEventer(options EventerOptions) (Eventer, error) {
return newLogFileEventer(options)
case strings.ToUpper(Null.String()):
return newNullEventer(), nil
case strings.ToUpper(Memory.String()):
return NewMemoryEventer(), nil
default:
return nil, fmt.Errorf("unknown event logger type: %s", strings.ToUpper(options.EventerType))
}

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"strconv"
"time"
@ -97,8 +98,9 @@ func (e EventJournalD) Write(ee Event) error {
}
// Read reads events from the journal and sends qualified events to the event channel
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel)
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) (retErr error) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until)
if err != nil {
return fmt.Errorf("failed to parse event filters: %w", err)
@ -117,13 +119,15 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
return err
}
defer func() {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
if retErr != nil {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
}
}
}()
err = j.SetDataThreshold(0)
if err != nil {
logrus.Warnf("cannot set data threshold: %v", err)
return fmt.Errorf("cannot set data threshold for journal: %v", err)
}
// match only podman journal entries
podmanJournal := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
@ -158,30 +162,40 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
}
}
for {
entry, err := GetNextEntry(ctx, j, options.Stream, untilTime)
if err != nil {
return err
}
// no entry == we hit the end
if entry == nil {
return nil
}
newEvent, err := newEventFromJournalEntry(entry)
if err != nil {
// We can't decode this event.
// Don't fail hard - that would make events unusable.
// Instead, log and continue.
if !errors.Is(err, ErrEventTypeBlank) {
logrus.Errorf("Unable to decode event: %v", err)
go func() {
defer close(options.EventChannel)
defer func() {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
}
}()
for {
entry, err := GetNextEntry(ctx, j, options.Stream, untilTime)
if err != nil {
options.EventChannel <- ReadResult{Error: err}
break
}
// no entry == we hit the end
if entry == nil {
break
}
newEvent, err := newEventFromJournalEntry(entry)
if err != nil {
// We can't decode this event.
// Don't fail hard - that would make events unusable.
// Instead, log and continue.
if !errors.Is(err, ErrEventTypeBlank) {
options.EventChannel <- ReadResult{Error: fmt.Errorf("unable to decode event: %v", err)}
}
continue
}
if applyFilters(newEvent, filterMap) {
options.EventChannel <- ReadResult{Event: newEvent}
}
continue
}
if applyFilters(newEvent, filterMap) {
options.EventChannel <- newEvent
}
}
}()
return nil
}
func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {

View File

@ -108,7 +108,6 @@ func (e EventLogFile) readRotateEvent(event *Event) (begin bool, end bool, err e
// Reads from the log file
func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel)
filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until)
if err != nil {
return fmt.Errorf("failed to parse event filters: %w", err)
@ -148,56 +147,65 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
return err
}
var line *tail.Line
var ok bool
var skipRotate bool
for {
select {
case <-ctx.Done():
// the consumer has cancelled
t.Kill(errors.New("hangup by client"))
return nil
case line, ok = <-t.Lines:
if !ok {
// channel was closed
return nil
go func() {
defer close(options.EventChannel)
var line *tail.Line
var ok bool
var skipRotate bool
for {
select {
case <-ctx.Done():
// the consumer has cancelled
t.Kill(errors.New("hangup by client"))
return
case line, ok = <-t.Lines:
if !ok {
// channel was closed
return
}
// fallthrough
}
// fallthrough
}
event, err := newEventFromJSONString(line.Text)
if err != nil {
return err
}
switch event.Type {
case Image, Volume, Pod, Container, Network:
// no-op
case System:
begin, end, err := e.readRotateEvent(event)
event, err := newEventFromJSONString(line.Text)
if err != nil {
return err
err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
options.EventChannel <- ReadResult{Error: err}
continue
}
if begin && event.Time.After(readTime) {
// If the rotation event happened _after_ we
// started reading, we need to ignore/skip
// subsequent event until the end of the
// rotation.
skipRotate = true
logrus.Debugf("Skipping already read events after log-file rotation: %v", event)
} else if end {
// This rotate event
skipRotate = false
switch event.Type {
case Image, Volume, Pod, Container, Network:
// no-op
case System:
begin, end, err := e.readRotateEvent(event)
if err != nil {
options.EventChannel <- ReadResult{Error: err}
continue
}
if begin && event.Time.After(readTime) {
// If the rotation event happened _after_ we
// started reading, we need to ignore/skip
// subsequent event until the end of the
// rotation.
skipRotate = true
logrus.Debugf("Skipping already read events after log-file rotation: %v", event)
} else if end {
// This rotate event
skipRotate = false
}
default:
err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
options.EventChannel <- ReadResult{Error: err}
continue
}
if skipRotate {
continue
}
if applyFilters(event, filterMap) {
options.EventChannel <- ReadResult{Event: event}
}
default:
return fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
}
if skipRotate {
continue
}
if applyFilters(event, filterMap) {
options.EventChannel <- event
}
}
}()
return nil
}
// String returns a string representation of the logger

View File

@ -1,49 +0,0 @@
package events
import (
"context"
)
// EventMemory is the structure for event writing to a channel. It contains the eventer
// options and the event itself. Methods for reading and writing are also defined from it.
type EventMemory struct {
options EventerOptions
elements chan *Event
}
// Write event to memory queue
func (e EventMemory) Write(event Event) (err error) {
e.elements <- &event
return
}
// Read event(s) from memory queue
func (e EventMemory) Read(ctx context.Context, options ReadOptions) (err error) {
select {
case <-ctx.Done():
return
default:
}
select {
case event := <-e.elements:
options.EventChannel <- event
default:
}
return nil
}
// String returns eventer type
func (e EventMemory) String() string {
return e.options.EventerType
}
// NewMemoryEventer returns configured MemoryEventer
func NewMemoryEventer() Eventer {
return EventMemory{
options: EventerOptions{
EventerType: Memory.String(),
},
elements: make(chan *Event, 100),
}
}

View File

@ -106,6 +106,13 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
flush := func() {
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
flush()
var frame strings.Builder
header := make([]byte, 8)
@ -167,8 +174,6 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) {
if _, err := io.WriteString(w, frame.String()); err != nil {
log.Errorf("unable to write frame string: %q", err)
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
flush()
}
}

View File

@ -48,21 +48,21 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
utils.Error(w, http.StatusBadRequest, fmt.Errorf("failed to parse filters for %s: %w", r.URL.String(), err))
return
}
eventChannel := make(chan *events.Event)
errorChannel := make(chan error)
eventChannel := make(chan events.ReadResult)
// Start reading events.
go func() {
readOpts := events.ReadOptions{
FromStart: fromStart,
Stream: query.Stream,
Filters: libpodFilters,
EventChannel: eventChannel,
Since: query.Since,
Until: query.Until,
}
errorChannel <- runtime.Events(r.Context(), readOpts)
}()
readOpts := events.ReadOptions{
FromStart: fromStart,
Stream: query.Stream,
Filters: libpodFilters,
EventChannel: eventChannel,
Since: query.Since,
Until: query.Until,
}
err = runtime.Events(r.Context(), readOpts)
if err != nil {
utils.InternalServerError(w, err)
return
}
flush := func() {}
if flusher, ok := w.(http.Flusher); ok {
@ -70,31 +70,29 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Content-Type", "application/json")
wroteContent := false
defer func() {
if !wroteContent {
w.WriteHeader(http.StatusOK)
flush()
}
}()
w.WriteHeader(http.StatusOK)
flush()
coder := json.NewEncoder(w)
coder.SetEscapeHTML(true)
for {
select {
case err := <-errorChannel:
if err != nil {
utils.InternalServerError(w, err)
wroteContent = true
}
case <-r.Context().Done():
return
case evt := <-eventChannel:
if evt == nil {
case evt, ok := <-eventChannel:
if !ok {
return
}
if evt.Error != nil {
logrus.Errorf("Unable to read event: %q", err)
continue
}
if evt.Event == nil {
continue
}
e := entities.ConvertToEntitiesEvent(*evt)
e := entities.ConvertToEntitiesEvent(*evt.Event)
// Some events differ between Libpod and Docker endpoints.
// Handle these differences for Docker-compat.
if !utils.IsLibpodRequest(r) && e.Type == "image" && e.Status == "remove" {
@ -110,10 +108,7 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
if err := coder.Encode(e); err != nil {
logrus.Errorf("Unable to write json: %q", err)
}
wroteContent = true
flush()
case <-r.Context().Done():
return
}
}
}

View File

@ -236,8 +236,7 @@ func waitRemoved(ctrWait containerWaitFn) (int32, error) {
func waitNextExit(ctx context.Context, containerName string) (int32, error) {
runtime := ctx.Value(api.RuntimeKey).(*libpod.Runtime)
containerEngine := &abi.ContainerEngine{Libpod: runtime}
eventChannel := make(chan *events.Event)
errChannel := make(chan error)
eventChannel := make(chan events.ReadResult)
opts := entities.EventsOptions{
EventChan: eventChannel,
Filter: []string{"event=died", fmt.Sprintf("container=%s", containerName)},
@ -247,21 +246,22 @@ func waitNextExit(ctx context.Context, containerName string) (int32, error) {
// ctx is used to cancel event watching goroutine
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
errChannel <- containerEngine.Events(ctx, opts)
}()
evt, ok := <-eventChannel
if ok {
if evt.ContainerExitCode != nil {
return int32(*evt.ContainerExitCode), nil
}
return -1, nil
err := containerEngine.Events(ctx, opts)
if err != nil {
return -1, err
}
// if ok == false then containerEngine.Events() has exited
for evt := range eventChannel {
if evt.Error == nil {
if evt.Event.ContainerExitCode != nil {
return int32(*evt.Event.ContainerExitCode), nil
}
}
}
// if we are here then containerEngine.Events() has exited
// it may happen if request was canceled (e.g. client closed connection prematurely) or
// the server is in process of shutting down
return -1, <-errChannel
return -1, nil
}
func waitNotRunning(ctrWait containerWaitFn) (int32, error) {

View File

@ -3,9 +3,7 @@ package system
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
@ -31,7 +29,6 @@ func Events(ctx context.Context, eventChan chan types.Event, cancelChan chan boo
if err != nil {
return err
}
defer response.Body.Close()
if cancelChan != nil {
go func() {
@ -43,26 +40,23 @@ func Events(ctx context.Context, eventChan chan types.Event, cancelChan chan boo
}
if response.StatusCode != http.StatusOK {
defer response.Body.Close()
return response.Process(nil)
}
dec := json.NewDecoder(response.Body)
for err = (error)(nil); err == nil; {
var e = types.Event{}
err = dec.Decode(&e)
if err == nil {
eventChan <- e
go func() {
defer response.Body.Close()
defer close(eventChan)
dec := json.NewDecoder(response.Body)
for err = (error)(nil); err == nil; {
var e = types.Event{}
err = dec.Decode(&e)
if err == nil {
eventChan <- e
}
}
}
close(eventChan)
switch {
case err == nil:
return nil
case errors.Is(err, io.EOF):
return nil
default:
return fmt.Errorf("unable to decode event response: %w", err)
}
}()
return nil
}
// Prune removes all unused system data.

View File

@ -90,7 +90,7 @@ type DiffReport struct {
type EventsOptions struct {
FromStart bool
EventChan chan *events.Event
EventChan chan events.ReadResult
Filter []string
Stream bool
Since string

View File

@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"github.com/containers/podman/v5/libpod/events"
"github.com/containers/podman/v5/pkg/bindings/system"
"github.com/containers/podman/v5/pkg/domain/entities"
)
@ -23,7 +24,7 @@ func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptio
binChan := make(chan entities.Event)
go func() {
for e := range binChan {
opts.EventChan <- entities.ConvertToLibpodEvent(e)
opts.EventChan <- events.ReadResult{Event: entities.ConvertToLibpodEvent(e)}
}
close(opts.EventChan)
}()

View File

@ -150,6 +150,13 @@ podman run --name $CTRNAME -d $IMAGE sleep 25
t GET containers/$CTRNAME/top?stream=false 200 \
.Processes.[0].[6]="00:00:00" \
.Processes.[0].[7]="sleep 25"
# check logs output, IMPORTANT the container should write no logs to reproduce #23712
APIV2_TEST_EXPECT_TIMEOUT=1 t GET "containers/${CTRNAME}/logs?follow=true&stdout=true&stderr=true" 999
is "" "$(<$WORKDIR/curl.result.out)" "Container MUST NOT log output"
like "$(<$WORKDIR/curl.headers.out)" ".*HTTP.* 200 OK.*" \
"Received headers from /container/<id>/logs"
podman rm -f -t0 $CTRNAME
CTRNAME=test123

View File

@ -32,4 +32,8 @@ t GET "events?stream=false&since=$START&type=remove" 200 \
'select(.status | contains("remove")).Action=remove' \
'select(.status | contains("remove")).Actor.Attributes.containerExitCode=1'
APIV2_TEST_EXPECT_TIMEOUT=1 t GET "events?stream=true" 999
like "$(<$WORKDIR/curl.headers.out)" ".*HTTP.* 200 OK.*" \
"Received headers from /events"
# vim: filetype=sh