Merge pull request #6902 from vrothberg/events-endpoint

events endpoint: fix panic and race condition
This commit is contained in:
OpenShift Merge Robot 2020-07-21 10:55:40 -04:00 committed by GitHub
commit f8e2a3500e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 142 additions and 64 deletions

View File

@ -90,6 +90,13 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
return err return err
} }
for { for {
select {
case <-ctx.Done():
// the consumer has cancelled
return nil
default:
// fallthrough
}
if _, err := j.Next(); err != nil { if _, err := j.Next(); err != nil {
return err return err
} }

View File

@ -63,6 +63,14 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
} }
}() }()
for line := range t.Lines { for line := range t.Lines {
select {
case <-ctx.Done():
// the consumer has cancelled
return nil
default:
// fallthrough
}
event, err := newEventFromJSONString(line.Text) event, err := newEventFromJSONString(line.Text)
if err != nil { if err != nil {
return err return err

View File

@ -1,9 +1,10 @@
package compat package compat
import ( import (
"context" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"github.com/containers/libpod/v2/libpod" "github.com/containers/libpod/v2/libpod"
"github.com/containers/libpod/v2/libpod/events" "github.com/containers/libpod/v2/libpod/events"
@ -15,77 +16,132 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// filtersFromRequests extracts the "filters" parameter from the specified
// http.Request. The paramater can either be a `map[string][]string` as done
// in new versions of Docker and libpod, or a `map[string]map[string]bool` as
// done in older versions of Docker. We have to do a bit of Yoga to support
// both - just as Docker does as well.
//
// Please refer to https://github.com/containers/podman/issues/6899 for some
// background.
func filtersFromRequest(r *http.Request) ([]string, error) {
var (
compatFilters map[string]map[string]bool
filters map[string][]string
libpodFilters []string
)
raw := []byte(r.Form.Get("filters"))
// Backwards compat with older versions of Docker.
if err := json.Unmarshal(raw, &compatFilters); err == nil {
for filterKey, filterMap := range compatFilters {
for filterValue, toAdd := range filterMap {
if toAdd {
libpodFilters = append(libpodFilters, fmt.Sprintf("%s=%s", filterKey, filterValue))
}
}
}
return libpodFilters, nil
}
if err := json.Unmarshal(raw, &filters); err != nil {
return nil, err
}
for filterKey, filterSlice := range filters {
for _, filterValue := range filterSlice {
libpodFilters = append(libpodFilters, fmt.Sprintf("%s=%s", filterKey, filterValue))
}
}
return libpodFilters, nil
}
// NOTE: this endpoint serves both the docker-compatible one and the new libpod
// one.
func GetEvents(w http.ResponseWriter, r *http.Request) { func GetEvents(w http.ResponseWriter, r *http.Request) {
var ( var (
fromStart bool fromStart bool
eventsError error decoder = r.Context().Value("decoder").(*schema.Decoder)
decoder = r.Context().Value("decoder").(*schema.Decoder) runtime = r.Context().Value("runtime").(*libpod.Runtime)
runtime = r.Context().Value("runtime").(*libpod.Runtime) json = jsoniter.ConfigCompatibleWithStandardLibrary // FIXME: this should happen on the package level
) )
// NOTE: the "filters" parameter is extracted separately for backwards
// compat via `fitlerFromRequest()`.
query := struct { query := struct {
Since string `schema:"since"` Since string `schema:"since"`
Until string `schema:"until"` Until string `schema:"until"`
Filters map[string][]string `schema:"filters"` Stream bool `schema:"stream"`
Stream bool `schema:"stream"`
}{ }{
Stream: true, Stream: true,
} }
if err := decoder.Decode(&query, r.URL.Query()); err != nil { if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String())) utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String()))
} return
var libpodFilters = []string{}
if _, found := r.URL.Query()["filters"]; found {
for k, v := range query.Filters {
libpodFilters = append(libpodFilters, fmt.Sprintf("%s=%s", k, v[0]))
}
} }
if len(query.Since) > 0 || len(query.Until) > 0 { if len(query.Since) > 0 || len(query.Until) > 0 {
fromStart = true fromStart = true
} }
eventCtx, eventCancel := context.WithCancel(r.Context()) libpodFilters, err := filtersFromRequest(r)
eventChannel := make(chan *events.Event) if err != nil {
go func() { utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String()))
readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until}
eventsError = runtime.Events(eventCtx, readOpts)
}()
if eventsError != nil {
utils.InternalServerError(w, eventsError)
eventCancel()
close(eventChannel)
return return
} }
// If client disappears we need to stop listening for events eventChannel := make(chan *events.Event)
go func(done <-chan struct{}) { errorChannel := make(chan error)
<-done
eventCancel()
if _, ok := <-eventChannel; ok {
close(eventChannel)
}
}(r.Context().Done())
// Headers need to be written out before turning Writer() over to json encoder // Start reading events.
w.Header().Set("Content-Type", "application/json") go func() {
w.WriteHeader(http.StatusOK) readOpts := events.ReadOptions{
if flusher, ok := w.(http.Flusher); ok { FromStart: fromStart,
flusher.Flush() Stream: query.Stream,
} Filters: libpodFilters,
EventChannel: eventChannel,
json := jsoniter.ConfigCompatibleWithStandardLibrary Since: query.Since,
coder := json.NewEncoder(w) Until: query.Until,
coder.SetEscapeHTML(true)
for event := range eventChannel {
e := entities.ConvertToEntitiesEvent(*event)
if err := coder.Encode(e); err != nil {
logrus.Errorf("unable to write json: %q", err)
} }
if flusher, ok := w.(http.Flusher); ok { errorChannel <- runtime.Events(r.Context(), readOpts)
flusher.Flush() }()
var coder *jsoniter.Encoder
var writeHeader sync.Once
for stream := true; stream; stream = query.Stream {
select {
case err := <-errorChannel:
if err != nil {
utils.InternalServerError(w, err)
return
}
case evt := <-eventChannel:
writeHeader.Do(func() {
// Use a sync.Once so that we write the header
// only once.
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
coder = json.NewEncoder(w)
coder.SetEscapeHTML(true)
})
if evt == nil {
continue
}
e := entities.ConvertToEntitiesEvent(*evt)
if err := coder.Encode(e); err != nil {
logrus.Errorf("unable to write json: %q", err)
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
} }
} }
} }

View File

@ -1,6 +1,7 @@
package test_bindings package test_bindings
import ( import (
"sync"
"time" "time"
"github.com/containers/libpod/v2/pkg/bindings" "github.com/containers/libpod/v2/pkg/bindings"
@ -38,22 +39,28 @@ var _ = Describe("Podman system", func() {
}) })
It("podman events", func() { It("podman events", func() {
eChan := make(chan entities.Event, 1) var name = "top"
var messages []entities.Event _, err := bt.RunTopContainer(&name, bindings.PFalse, nil)
cancelChan := make(chan bool, 1) Expect(err).To(BeNil())
filters := make(map[string][]string)
filters["container"] = []string{name}
binChan := make(chan entities.Event)
done := sync.Mutex{}
done.Lock()
eventCounter := 0
go func() { go func() {
for e := range eChan { defer done.Unlock()
messages = append(messages, e) for range binChan {
eventCounter++
} }
}() }()
go func() {
system.Events(bt.conn, eChan, cancelChan, nil, nil, nil, bindings.PFalse)
}()
_, err := bt.RunTopContainer(nil, nil, nil) err = system.Events(bt.conn, binChan, nil, nil, nil, filters, bindings.PFalse)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
cancelChan <- true done.Lock()
Expect(len(messages)).To(BeNumerically("==", 5)) Expect(eventCounter).To(BeNumerically(">", 0))
}) })
It("podman system prune - pod,container stopped", func() { It("podman system prune - pod,container stopped", func() {

View File

@ -136,6 +136,7 @@ var _ = Describe("Podman events", func() {
Expect(ec).To(Equal(0)) Expect(ec).To(Equal(0))
test := podmanTest.Podman([]string{"events", "--stream=false", "--format", "json"}) test := podmanTest.Podman([]string{"events", "--stream=false", "--format", "json"})
test.WaitWithDefaultTimeout() test.WaitWithDefaultTimeout()
Expect(test.ExitCode()).To(BeZero())
jsonArr := test.OutputToStringArray() jsonArr := test.OutputToStringArray()
Expect(len(jsonArr)).To(Not(BeZero())) Expect(len(jsonArr)).To(Not(BeZero()))
eventsMap := make(map[string]string) eventsMap := make(map[string]string)
@ -143,10 +144,10 @@ var _ = Describe("Podman events", func() {
Expect(err).To(BeNil()) Expect(err).To(BeNil())
_, exist := eventsMap["Status"] _, exist := eventsMap["Status"]
Expect(exist).To(BeTrue()) Expect(exist).To(BeTrue())
Expect(test.ExitCode()).To(BeZero())
test = podmanTest.Podman([]string{"events", "--stream=false", "--format", "{{json.}}"}) test = podmanTest.Podman([]string{"events", "--stream=false", "--format", "{{json.}}"})
test.WaitWithDefaultTimeout() test.WaitWithDefaultTimeout()
Expect(test.ExitCode()).To(BeZero())
jsonArr = test.OutputToStringArray() jsonArr = test.OutputToStringArray()
Expect(len(jsonArr)).To(Not(BeZero())) Expect(len(jsonArr)).To(Not(BeZero()))
eventsMap = make(map[string]string) eventsMap = make(map[string]string)
@ -154,6 +155,5 @@ var _ = Describe("Podman events", func() {
Expect(err).To(BeNil()) Expect(err).To(BeNil())
_, exist = eventsMap["Status"] _, exist = eventsMap["Status"]
Expect(exist).To(BeTrue()) Expect(exist).To(BeTrue())
Expect(test.ExitCode()).To(BeZero())
}) })
}) })