Merge pull request #6838 from mheon/fix_panic_events

Fix `system service` panic from early hangup in events
This commit is contained in:
OpenShift Merge Robot 2020-07-02 12:36:04 -04:00 committed by GitHub
commit 22def2e2d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 43 additions and 17 deletions

View File

@ -1,6 +1,7 @@
package libpod package libpod
import ( import (
"context"
"fmt" "fmt"
"github.com/containers/libpod/libpod/events" "github.com/containers/libpod/libpod/events"
@ -75,16 +76,16 @@ func (v *Volume) newVolumeEvent(status events.Status) {
// Events is a wrapper function for everyone to begin tailing the events log // Events is a wrapper function for everyone to begin tailing the events log
// with options // with options
func (r *Runtime) Events(options events.ReadOptions) error { func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error {
eventer, err := r.newEventer() eventer, err := r.newEventer()
if err != nil { if err != nil {
return err return err
} }
return eventer.Read(options) return eventer.Read(ctx, options)
} }
// GetEvents reads the event log and returns events based on input filters // GetEvents reads the event log and returns events based on input filters
func (r *Runtime) GetEvents(filters []string) ([]*events.Event, error) { func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) {
var readErr error var readErr error
eventChannel := make(chan *events.Event) eventChannel := make(chan *events.Event)
options := events.ReadOptions{ options := events.ReadOptions{
@ -98,7 +99,7 @@ func (r *Runtime) GetEvents(filters []string) ([]*events.Event, error) {
return nil, err return nil, err
} }
go func() { go func() {
readErr = eventer.Read(options) readErr = eventer.Read(ctx, options)
}() }()
if readErr != nil { if readErr != nil {
return nil, readErr return nil, readErr
@ -112,7 +113,7 @@ func (r *Runtime) GetEvents(filters []string) ([]*events.Event, error) {
// GetLastContainerEvent takes a container name or ID and an event status and returns // GetLastContainerEvent takes a container name or ID and an event status and returns
// the last occurrence of the container event // the last occurrence of the container event
func (r *Runtime) GetLastContainerEvent(nameOrID string, containerEvent events.Status) (*events.Event, error) { func (r *Runtime) GetLastContainerEvent(ctx context.Context, nameOrID string, containerEvent events.Status) (*events.Event, error) {
// check to make sure the event.Status is valid // check to make sure the event.Status is valid
if _, err := events.StringToStatus(containerEvent.String()); err != nil { if _, err := events.StringToStatus(containerEvent.String()); err != nil {
return nil, err return nil, err
@ -122,7 +123,7 @@ func (r *Runtime) GetLastContainerEvent(nameOrID string, containerEvent events.S
fmt.Sprintf("event=%s", containerEvent), fmt.Sprintf("event=%s", containerEvent),
"type=container", "type=container",
} }
containerEvents, err := r.GetEvents(filters) containerEvents, err := r.GetEvents(ctx, filters)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,6 +1,7 @@
package events package events
import ( import (
"context"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -52,7 +53,7 @@ type Eventer interface {
// Write an event to a backend // Write an event to a backend
Write(event Event) error Write(event Event) error
// Read an event from the backend // Read an event from the backend
Read(options ReadOptions) error Read(ctx context.Context, options ReadOptions) error
// String returns the type of event logger // String returns the type of event logger
String() string String() string
} }

View File

@ -3,6 +3,7 @@
package events package events
import ( import (
"context"
"fmt" "fmt"
"strconv" "strconv"
"time" "time"
@ -53,7 +54,7 @@ func (e EventJournalD) Write(ee Event) error {
} }
// Read reads events from the journal and sends qualified events to the event channel // Read reads events from the journal and sends qualified events to the event channel
func (e EventJournalD) Read(options ReadOptions) error { func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel) defer close(options.EventChannel)
eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until) eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until)
if err != nil { if err != nil {

View File

@ -1,6 +1,7 @@
package events package events
import ( import (
"context"
"fmt" "fmt"
"os" "os"
@ -40,7 +41,7 @@ func (e EventLogFile) Write(ee Event) error {
} }
// Reads from the log file // Reads from the log file
func (e EventLogFile) Read(options ReadOptions) error { func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel) defer close(options.EventChannel)
eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until) eventOptions, err := generateEventOptions(options.Filters, options.Since, options.Until)
if err != nil { if err != nil {
@ -50,6 +51,17 @@ func (e EventLogFile) Read(options ReadOptions) error {
if err != nil { if err != nil {
return err return err
} }
funcDone := make(chan bool)
copy := true
go func() {
select {
case <-funcDone:
// Do nothing
case <-ctx.Done():
copy = false
t.Kill(errors.New("hangup by client"))
}
}()
for line := range t.Lines { for line := range t.Lines {
event, err := newEventFromJSONString(line.Text) event, err := newEventFromJSONString(line.Text)
if err != nil { if err != nil {
@ -65,10 +77,11 @@ func (e EventLogFile) Read(options ReadOptions) error {
for _, filter := range eventOptions { for _, filter := range eventOptions {
include = include && filter(event) include = include && filter(event)
} }
if include { if include && copy {
options.EventChannel <- event options.EventChannel <- event
} }
} }
funcDone <- true
return nil return nil
} }

View File

@ -1,5 +1,9 @@
package events package events
import (
"context"
)
// EventToNull is an eventer type that only performs write operations // EventToNull is an eventer type that only performs write operations
// and only writes to /dev/null. It is meant for unittests only // and only writes to /dev/null. It is meant for unittests only
type EventToNull struct{} type EventToNull struct{}
@ -10,7 +14,7 @@ func (e EventToNull) Write(ee Event) error {
} }
// Read does nothing. Do not use it. // Read does nothing. Do not use it.
func (e EventToNull) Read(options ReadOptions) error { func (e EventToNull) Read(ctx context.Context, options ReadOptions) error {
return nil return nil
} }

View File

@ -1,6 +1,7 @@
package compat package compat
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
@ -45,13 +46,15 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
fromStart = true fromStart = true
} }
eventCtx, eventCancel := context.WithCancel(r.Context())
eventChannel := make(chan *events.Event) eventChannel := make(chan *events.Event)
go func() { go func() {
readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until} readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until}
eventsError = runtime.Events(readOpts) eventsError = runtime.Events(eventCtx, readOpts)
}() }()
if eventsError != nil { if eventsError != nil {
utils.InternalServerError(w, eventsError) utils.InternalServerError(w, eventsError)
eventCancel()
close(eventChannel) close(eventChannel)
return return
} }
@ -59,6 +62,7 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
// If client disappears we need to stop listening for events // If client disappears we need to stop listening for events
go func(done <-chan struct{}) { go func(done <-chan struct{}) {
<-done <-done
eventCancel()
if _, ok := <-eventChannel; ok { if _, ok := <-eventChannel; ok {
close(eventChannel) close(eventChannel)
} }

View File

@ -741,7 +741,7 @@ func (ic *ContainerEngine) ContainerStart(ctx context.Context, namesOrIds []stri
if ecode, err := ctr.Wait(); err != nil { if ecode, err := ctr.Wait(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr { if errors.Cause(err) == define.ErrNoSuchCtr {
// Check events // Check events
event, err := ic.Libpod.GetLastContainerEvent(ctr.ID(), events.Exited) event, err := ic.Libpod.GetLastContainerEvent(ctx, ctr.ID(), events.Exited)
if err != nil { if err != nil {
logrus.Errorf("Cannot get exit code: %v", err) logrus.Errorf("Cannot get exit code: %v", err)
exitCode = define.ExecErrorCodeNotFound exitCode = define.ExecErrorCodeNotFound
@ -871,7 +871,7 @@ func (ic *ContainerEngine) ContainerRun(ctx context.Context, opts entities.Conta
if ecode, err := ctr.Wait(); err != nil { if ecode, err := ctr.Wait(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr { if errors.Cause(err) == define.ErrNoSuchCtr {
// Check events // Check events
event, err := ic.Libpod.GetLastContainerEvent(ctr.ID(), events.Exited) event, err := ic.Libpod.GetLastContainerEvent(ctx, ctr.ID(), events.Exited)
if err != nil { if err != nil {
logrus.Errorf("Cannot get exit code: %v", err) logrus.Errorf("Cannot get exit code: %v", err)
report.ExitCode = define.ExecErrorCodeNotFound report.ExitCode = define.ExecErrorCodeNotFound

View File

@ -9,5 +9,5 @@ import (
func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptions) error { func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptions) error {
readOpts := events.ReadOptions{FromStart: opts.FromStart, Stream: opts.Stream, Filters: opts.Filter, EventChannel: opts.EventChan, Since: opts.Since, Until: opts.Until} readOpts := events.ReadOptions{FromStart: opts.FromStart, Stream: opts.Stream, Filters: opts.Filter, EventChannel: opts.EventChan, Since: opts.Since, Until: opts.Until}
return ic.Libpod.Events(readOpts) return ic.Libpod.Events(ctx, readOpts)
} }

View File

@ -4,6 +4,7 @@ package varlinkapi
import ( import (
"bufio" "bufio"
"context"
"io" "io"
"github.com/containers/libpod/libpod" "github.com/containers/libpod/libpod"
@ -89,7 +90,7 @@ func (i *VarlinkAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys s
if ecode, err := ctr.Wait(); err != nil { if ecode, err := ctr.Wait(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr { if errors.Cause(err) == define.ErrNoSuchCtr {
// Check events // Check events
event, err := i.Runtime.GetLastContainerEvent(ctr.ID(), events.Exited) event, err := i.Runtime.GetLastContainerEvent(context.Background(), ctr.ID(), events.Exited)
if err != nil { if err != nil {
logrus.Errorf("Cannot get exit code: %v", err) logrus.Errorf("Cannot get exit code: %v", err)
exitCode = define.ExecErrorCodeNotFound exitCode = define.ExecErrorCodeNotFound

View File

@ -3,6 +3,7 @@
package varlinkapi package varlinkapi
import ( import (
"context"
"time" "time"
"github.com/containers/libpod/libpod/events" "github.com/containers/libpod/libpod/events"
@ -27,7 +28,7 @@ func (i *VarlinkAPI) GetEvents(call iopodman.VarlinkCall, filter []string, since
eventChannel := make(chan *events.Event) eventChannel := make(chan *events.Event)
go func() { go func() {
readOpts := events.ReadOptions{FromStart: fromStart, Stream: stream, Filters: filter, EventChannel: eventChannel} readOpts := events.ReadOptions{FromStart: fromStart, Stream: stream, Filters: filter, EventChannel: eventChannel}
eventsError = i.Runtime.Events(readOpts) eventsError = i.Runtime.Events(context.Background(), readOpts)
}() }()
if eventsError != nil { if eventsError != nil {
return call.ReplyErrorOccurred(eventsError.Error()) return call.ReplyErrorOccurred(eventsError.Error())