stats refactor

Refactor the entities' stats API to simplify using it and reduce the
risk of running into concurrency issues at the call sites.  Further
simplify the stats code by de-spaghetti-ing the logic and reducing
duplicate code.

`ContainerStats` now returns a data channel and an error.  If the error
is nil, callers can read from the channel.

Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
This commit is contained in:
Valentin Rothberg 2020-09-23 13:32:58 +02:00
parent 5cedd830f7
commit 376ba349bf
5 changed files with 87 additions and 73 deletions

View File

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"os" "os"
"strings" "strings"
"sync"
"text/tabwriter" "text/tabwriter"
"text/template" "text/template"
@ -107,32 +106,31 @@ func stats(cmd *cobra.Command, args []string) error {
return errors.New("stats is not supported in rootless mode without cgroups v2") return errors.New("stats is not supported in rootless mode without cgroups v2")
} }
} }
statsOptions.StatChan = make(chan []*define.ContainerStats, 1)
wg := sync.WaitGroup{} statsChan, err := registry.ContainerEngine().ContainerStats(registry.Context(), args, statsOptions)
wg.Add(1) if err != nil {
go func() { return err
for reports := range statsOptions.StatChan { }
if err := outputStats(reports); err != nil { for report := range statsChan {
if report.Error != nil {
return report.Error
}
if err := outputStats(report.Stats); err != nil {
logrus.Error(err) logrus.Error(err)
} }
} }
wg.Done() return nil
}()
err := registry.ContainerEngine().ContainerStats(registry.Context(), args, statsOptions)
wg.Wait()
return err
} }
func outputStats(reports []*define.ContainerStats) error { func outputStats(reports []define.ContainerStats) error {
if len(statsOptions.Format) < 1 && !statsOptions.NoReset { if len(statsOptions.Format) < 1 && !statsOptions.NoReset {
tm.Clear() tm.Clear()
tm.MoveCursor(1, 1) tm.MoveCursor(1, 1)
tm.Flush() tm.Flush()
} }
stats := make([]*containerStats, 0, len(reports)) stats := make([]containerStats, 0, len(reports))
for _, r := range reports { for _, r := range reports {
stats = append(stats, &containerStats{r}) stats = append(stats, containerStats{r})
} }
if statsOptions.Format == "json" { if statsOptions.Format == "json" {
return outputJSON(stats) return outputJSON(stats)
@ -163,7 +161,7 @@ func outputStats(reports []*define.ContainerStats) error {
} }
type containerStats struct { type containerStats struct {
*define.ContainerStats define.ContainerStats
} }
func (s *containerStats) ID() string { func (s *containerStats) ID() string {
@ -213,7 +211,7 @@ func combineHumanValues(a, b uint64) string {
return fmt.Sprintf("%s / %s", units.HumanSize(float64(a)), units.HumanSize(float64(b))) return fmt.Sprintf("%s / %s", units.HumanSize(float64(a)), units.HumanSize(float64(b)))
} }
func outputJSON(stats []*containerStats) error { func outputJSON(stats []containerStats) error {
type jstat struct { type jstat struct {
Id string `json:"id"` //nolint Id string `json:"id"` //nolint
Name string `json:"name"` Name string `json:"name"`

View File

@ -416,5 +416,10 @@ type ContainerStatsOptions struct {
Latest bool Latest bool
NoReset bool NoReset bool
NoStream bool NoStream bool
StatChan chan []*define.ContainerStats }
// ContainerStatsReport is used for streaming container stats.
type ContainerStatsReport struct {
Error error
Stats []define.ContainerStats
} }

View File

@ -38,7 +38,7 @@ type ContainerEngine interface {
ContainerRun(ctx context.Context, opts ContainerRunOptions) (*ContainerRunReport, error) ContainerRun(ctx context.Context, opts ContainerRunOptions) (*ContainerRunReport, error)
ContainerRunlabel(ctx context.Context, label string, image string, args []string, opts ContainerRunlabelOptions) error ContainerRunlabel(ctx context.Context, label string, image string, args []string, opts ContainerRunlabelOptions) error
ContainerStart(ctx context.Context, namesOrIds []string, options ContainerStartOptions) ([]*ContainerStartReport, error) ContainerStart(ctx context.Context, namesOrIds []string, options ContainerStartOptions) ([]*ContainerStartReport, error)
ContainerStats(ctx context.Context, namesOrIds []string, options ContainerStatsOptions) error ContainerStats(ctx context.Context, namesOrIds []string, options ContainerStatsOptions) (chan ContainerStatsReport, error)
ContainerStop(ctx context.Context, namesOrIds []string, options StopOptions) ([]*StopReport, error) ContainerStop(ctx context.Context, namesOrIds []string, options StopOptions) ([]*StopReport, error)
ContainerTop(ctx context.Context, options TopOptions) (*StringSliceReport, error) ContainerTop(ctx context.Context, options TopOptions) (*StringSliceReport, error)
ContainerUnmount(ctx context.Context, nameOrIDs []string, options ContainerUnmountOptions) ([]*ContainerUnmountReport, error) ContainerUnmount(ctx context.Context, nameOrIDs []string, options ContainerUnmountOptions) ([]*ContainerUnmountReport, error)

View File

@ -1142,12 +1142,11 @@ func (ic *ContainerEngine) Shutdown(_ context.Context) {
}) })
} }
func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []string, options entities.ContainerStatsOptions) error { func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []string, options entities.ContainerStatsOptions) (statsChan chan entities.ContainerStatsReport, err error) {
defer close(options.StatChan) statsChan = make(chan entities.ContainerStatsReport, 1)
containerFunc := ic.Libpod.GetRunningContainers containerFunc := ic.Libpod.GetRunningContainers
switch { switch {
case len(namesOrIds) > 0:
containerFunc = func() ([]*libpod.Container, error) { return ic.Libpod.GetContainersByList(namesOrIds) }
case options.Latest: case options.Latest:
containerFunc = func() ([]*libpod.Container, error) { containerFunc = func() ([]*libpod.Container, error) {
lastCtr, err := ic.Libpod.GetLatestContainer() lastCtr, err := ic.Libpod.GetLatestContainer()
@ -1156,19 +1155,47 @@ func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []stri
} }
return []*libpod.Container{lastCtr}, nil return []*libpod.Container{lastCtr}, nil
} }
case len(namesOrIds) > 0:
containerFunc = func() ([]*libpod.Container, error) { return ic.Libpod.GetContainersByList(namesOrIds) }
case options.All: case options.All:
containerFunc = ic.Libpod.GetAllContainers containerFunc = ic.Libpod.GetAllContainers
} }
ctrs, err := containerFunc() go func() {
if err != nil { defer close(statsChan)
return errors.Wrapf(err, "unable to get list of containers") var (
err error
containers []*libpod.Container
containerStats map[string]*define.ContainerStats
)
containerStats = make(map[string]*define.ContainerStats)
stream: // label to flatten the scope
select {
case <-ctx.Done():
// client cancelled
logrus.Debugf("Container stats stopped: context cancelled")
return
default:
// just fall through and do work
} }
containerStats := map[string]*define.ContainerStats{}
for _, ctr := range ctrs { // Anonymous func to easily use the return values for streaming.
initialStats, err := ctr.GetContainerStats(&define.ContainerStats{}) computeStats := func() ([]define.ContainerStats, error) {
containers, err = containerFunc()
if err != nil {
return nil, errors.Wrapf(err, "unable to get list of containers")
}
reportStats := []define.ContainerStats{}
for _, ctr := range containers {
prev, ok := containerStats[ctr.ID()]
if !ok {
prev = &define.ContainerStats{}
}
stats, err := ctr.GetContainerStats(prev)
if err != nil { if err != nil {
// when doing "all", don't worry about containers that are not running
cause := errors.Cause(err) cause := errors.Cause(err)
if options.All && (cause == define.ErrCtrRemoved || cause == define.ErrNoSuchCtr || cause == define.ErrCtrStateInvalid) { if options.All && (cause == define.ErrCtrRemoved || cause == define.ErrNoSuchCtr || cause == define.ErrCtrStateInvalid) {
continue continue
@ -1176,42 +1203,26 @@ func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []stri
if cause == cgroups.ErrCgroupV1Rootless { if cause == cgroups.ErrCgroupV1Rootless {
err = cause err = cause
} }
return err return nil, err
} }
containerStats[ctr.ID()] = initialStats
containerStats[ctr.ID()] = stats
reportStats = append(reportStats, *stats)
} }
for { return reportStats, nil
reportStats := []*define.ContainerStats{}
for _, ctr := range ctrs {
id := ctr.ID()
if _, ok := containerStats[ctr.ID()]; !ok {
initialStats, err := ctr.GetContainerStats(&define.ContainerStats{})
if errors.Cause(err) == define.ErrCtrRemoved || errors.Cause(err) == define.ErrNoSuchCtr || errors.Cause(err) == define.ErrCtrStateInvalid {
// skip dealing with a container that is gone
continue
} }
if err != nil {
return err report := entities.ContainerStatsReport{}
} report.Stats, report.Error = computeStats()
containerStats[id] = initialStats statsChan <- report
}
stats, err := ctr.GetContainerStats(containerStats[id])
if err != nil && errors.Cause(err) != define.ErrNoSuchCtr {
return err
}
// replace the previous measurement with the current one
containerStats[id] = stats
reportStats = append(reportStats, stats)
}
ctrs, err = containerFunc()
if err != nil {
return err
}
options.StatChan <- reportStats
if options.NoStream { if options.NoStream {
break return
} }
time.Sleep(time.Second) time.Sleep(time.Second)
} goto stream
return nil }()
return statsChan, nil
} }

View File

@ -682,6 +682,6 @@ func (ic *ContainerEngine) ContainerCp(ctx context.Context, source, dest string,
func (ic *ContainerEngine) Shutdown(_ context.Context) { func (ic *ContainerEngine) Shutdown(_ context.Context) {
} }
func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []string, options entities.ContainerStatsOptions) error { func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []string, options entities.ContainerStatsOptions) (statsChan chan entities.ContainerStatsReport, err error) {
return errors.New("not implemented") return nil, errors.New("not implemented")
} }