Merge pull request #7753 from vrothberg/fix-7689

remote stats
This commit is contained in:
OpenShift Merge Robot 2020-09-24 15:32:36 +00:00 committed by GitHub
commit 3957058f29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 272 additions and 84 deletions

View File

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"os" "os"
"strings" "strings"
"sync"
"text/tabwriter" "text/tabwriter"
"text/template" "text/template"
@ -48,8 +47,18 @@ var (
} }
) )
// statsOptionsCLI is used for storing CLI arguments. Some fields are later
// used in the backend.
type statsOptionsCLI struct {
All bool
Format string
Latest bool
NoReset bool
NoStream bool
}
var ( var (
statsOptions entities.ContainerStatsOptions statsOptions statsOptionsCLI
defaultStatsRow = "{{.ID}}\t{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}\t{{.NetIO}}\t{{.BlockIO}}\t{{.PIDS}}\n" defaultStatsRow = "{{.ID}}\t{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}\t{{.NetIO}}\t{{.BlockIO}}\t{{.PIDS}}\n"
defaultStatsHeader = "ID\tNAME\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET IO\tBLOCK IO\tPIDS\n" defaultStatsHeader = "ID\tNAME\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET IO\tBLOCK IO\tPIDS\n"
) )
@ -107,32 +116,37 @@ func stats(cmd *cobra.Command, args []string) error {
return errors.New("stats is not supported in rootless mode without cgroups v2") return errors.New("stats is not supported in rootless mode without cgroups v2")
} }
} }
statsOptions.StatChan = make(chan []*define.ContainerStats, 1)
wg := sync.WaitGroup{} // Convert to the entities options. We should not leak CLI-only
wg.Add(1) // options into the backend and separate concerns.
go func() { opts := entities.ContainerStatsOptions{
for reports := range statsOptions.StatChan { Latest: statsOptions.Latest,
if err := outputStats(reports); err != nil { Stream: !statsOptions.NoStream,
}
statsChan, err := registry.ContainerEngine().ContainerStats(registry.Context(), args, opts)
if err != nil {
return err
}
for report := range statsChan {
if report.Error != nil {
return report.Error
}
if err := outputStats(report.Stats); err != nil {
logrus.Error(err) logrus.Error(err)
} }
} }
wg.Done() return nil
}()
err := registry.ContainerEngine().ContainerStats(registry.Context(), args, statsOptions)
wg.Wait()
return err
} }
func outputStats(reports []*define.ContainerStats) error { func outputStats(reports []define.ContainerStats) error {
if len(statsOptions.Format) < 1 && !statsOptions.NoReset { if len(statsOptions.Format) < 1 && !statsOptions.NoReset {
tm.Clear() tm.Clear()
tm.MoveCursor(1, 1) tm.MoveCursor(1, 1)
tm.Flush() tm.Flush()
} }
stats := make([]*containerStats, 0, len(reports)) stats := make([]containerStats, 0, len(reports))
for _, r := range reports { for _, r := range reports {
stats = append(stats, &containerStats{r}) stats = append(stats, containerStats{r})
} }
if statsOptions.Format == "json" { if statsOptions.Format == "json" {
return outputJSON(stats) return outputJSON(stats)
@ -163,7 +177,7 @@ func outputStats(reports []*define.ContainerStats) error {
} }
type containerStats struct { type containerStats struct {
*define.ContainerStats define.ContainerStats
} }
func (s *containerStats) ID() string { func (s *containerStats) ID() string {
@ -213,7 +227,7 @@ func combineHumanValues(a, b uint64) string {
return fmt.Sprintf("%s / %s", units.HumanSize(float64(a)), units.HumanSize(float64(b))) return fmt.Sprintf("%s / %s", units.HumanSize(float64(a)), units.HumanSize(float64(b)))
} }
func outputJSON(stats []*containerStats) error { func outputJSON(stats []containerStats) error {
type jstat struct { type jstat struct {
Id string `json:"id"` //nolint Id string `json:"id"` //nolint
Name string `json:"name"` Name string `json:"name"`

View File

@ -0,0 +1,72 @@
package libpod
import (
"encoding/json"
"net/http"
"time"
"github.com/containers/podman/v2/libpod"
"github.com/containers/podman/v2/pkg/api/handlers/utils"
"github.com/containers/podman/v2/pkg/domain/entities"
"github.com/containers/podman/v2/pkg/domain/infra/abi"
"github.com/gorilla/schema"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const DefaultStatsPeriod = 5 * time.Second
func StatsContainer(w http.ResponseWriter, r *http.Request) {
runtime := r.Context().Value("runtime").(*libpod.Runtime)
decoder := r.Context().Value("decoder").(*schema.Decoder)
query := struct {
Containers []string `schema:"containers"`
Stream bool `schema:"stream"`
}{
Stream: true,
}
if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, "Something went wrong.", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String()))
return
}
// Reduce code duplication and use the local/abi implementation of
// container stats.
containerEngine := abi.ContainerEngine{Libpod: runtime}
statsOptions := entities.ContainerStatsOptions{
Stream: query.Stream,
}
// Stats will stop if the connection is closed.
statsChan, err := containerEngine.ContainerStats(r.Context(), query.Containers, statsOptions)
if err != nil {
utils.InternalServerError(w, err)
return
}
// Write header and content type.
w.WriteHeader(http.StatusOK)
w.Header().Add("Content-Type", "application/json")
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
// Setup JSON encoder for streaming.
coder := json.NewEncoder(w)
coder.SetEscapeHTML(true)
for stats := range statsChan {
if err := coder.Encode(stats); err != nil {
// Note: even when streaming, the stats goroutine will
// be notified (and stop) as the connection will be
// closed.
logrus.Errorf("Unable to encode stats: %v", err)
return
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
}

View File

@ -1013,7 +1013,7 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// tags: // tags:
// - containers // - containers
// summary: Get stats for a container // summary: Get stats for a container
// description: This returns a live stream of a containers resource usage statistics. // description: DEPRECATED. This endpoint will be removed with the next major release. Please use /libpod/containers/stats instead.
// parameters: // parameters:
// - in: path // - in: path
// name: name // name: name
@ -1035,6 +1035,35 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// 500: // 500:
// $ref: "#/responses/InternalError" // $ref: "#/responses/InternalError"
r.HandleFunc(VersionedPath("/libpod/containers/{name}/stats"), s.APIHandler(compat.StatsContainer)).Methods(http.MethodGet) r.HandleFunc(VersionedPath("/libpod/containers/{name}/stats"), s.APIHandler(compat.StatsContainer)).Methods(http.MethodGet)
// swagger:operation GET /libpod/containers/stats libpod libpodStatsContainers
// ---
// tags:
// - containers
// summary: Get stats for one or more containers
// description: Return a live stream of resource usage statistics of one or more container. If no container is specified, the statistics of all containers are returned.
// parameters:
// - in: query
// name: containers
// description: names or IDs of containers
// type: array
// items:
// type: string
// - in: query
// name: stream
// type: boolean
// default: true
// description: Stream the output
// produces:
// - application/json
// responses:
// 200:
// description: no error
// 404:
// $ref: "#/responses/NoSuchContainer"
// 500:
// $ref: "#/responses/InternalError"
r.HandleFunc(VersionedPath("/libpod/containers/stats"), s.APIHandler(libpod.StatsContainer)).Methods(http.MethodGet)
// swagger:operation GET /libpod/containers/{name}/top libpod libpodTopContainer // swagger:operation GET /libpod/containers/{name}/top libpod libpodTopContainer
// --- // ---
// tags: // tags:

View File

@ -197,7 +197,56 @@ func Start(ctx context.Context, nameOrID string, detachKeys *string) error {
return response.Process(nil) return response.Process(nil)
} }
func Stats() {} func Stats(ctx context.Context, containers []string, stream *bool) (chan entities.ContainerStatsReport, error) {
conn, err := bindings.GetClient(ctx)
if err != nil {
return nil, err
}
params := url.Values{}
if stream != nil {
params.Set("stream", strconv.FormatBool(*stream))
}
for _, c := range containers {
params.Add("containers", c)
}
response, err := conn.DoRequest(nil, http.MethodGet, "/containers/stats", params, nil)
if err != nil {
return nil, err
}
statsChan := make(chan entities.ContainerStatsReport)
go func() {
defer close(statsChan)
dec := json.NewDecoder(response.Body)
doStream := true
if stream != nil {
doStream = *stream
}
streamLabel: // label to flatten the scope
select {
case <-response.Request.Context().Done():
return // lost connection - maybe the server quit
default:
// fall through and do some work
}
var report entities.ContainerStatsReport
if err := dec.Decode(&report); err != nil {
report = entities.ContainerStatsReport{Error: err}
}
statsChan <- report
if report.Error != nil || !doStream {
return
}
goto streamLabel
}()
return statsChan, nil
}
// Top gathers statistics about the running processes in a container. The nameOrID can be a container name // Top gathers statistics about the running processes in a container. The nameOrID can be a container name
// or a partial/full ID. The descriptors allow for specifying which data to collect from the process. // or a partial/full ID. The descriptors allow for specifying which data to collect from the process.

View File

@ -411,10 +411,17 @@ type ContainerCpReport struct {
// ContainerStatsOptions describes input options for getting // ContainerStatsOptions describes input options for getting
// stats on containers // stats on containers
type ContainerStatsOptions struct { type ContainerStatsOptions struct {
All bool // Operate on the latest known container. Only supported for local
Format string // clients.
Latest bool Latest bool
NoReset bool // Stream stats.
NoStream bool Stream bool
StatChan chan []*define.ContainerStats }
// ContainerStatsReport is used for streaming container stats.
type ContainerStatsReport struct {
// Error from reading stats.
Error error
// Results, set when there is no error.
Stats []define.ContainerStats
} }

View File

@ -38,7 +38,7 @@ type ContainerEngine interface {
ContainerRun(ctx context.Context, opts ContainerRunOptions) (*ContainerRunReport, error) ContainerRun(ctx context.Context, opts ContainerRunOptions) (*ContainerRunReport, error)
ContainerRunlabel(ctx context.Context, label string, image string, args []string, opts ContainerRunlabelOptions) error ContainerRunlabel(ctx context.Context, label string, image string, args []string, opts ContainerRunlabelOptions) error
ContainerStart(ctx context.Context, namesOrIds []string, options ContainerStartOptions) ([]*ContainerStartReport, error) ContainerStart(ctx context.Context, namesOrIds []string, options ContainerStartOptions) ([]*ContainerStartReport, error)
ContainerStats(ctx context.Context, namesOrIds []string, options ContainerStatsOptions) error ContainerStats(ctx context.Context, namesOrIds []string, options ContainerStatsOptions) (chan ContainerStatsReport, error)
ContainerStop(ctx context.Context, namesOrIds []string, options StopOptions) ([]*StopReport, error) ContainerStop(ctx context.Context, namesOrIds []string, options StopOptions) ([]*StopReport, error)
ContainerTop(ctx context.Context, options TopOptions) (*StringSliceReport, error) ContainerTop(ctx context.Context, options TopOptions) (*StringSliceReport, error)
ContainerUnmount(ctx context.Context, nameOrIDs []string, options ContainerUnmountOptions) ([]*ContainerUnmountReport, error) ContainerUnmount(ctx context.Context, nameOrIDs []string, options ContainerUnmountOptions) ([]*ContainerUnmountReport, error)

View File

@ -1142,12 +1142,12 @@ func (ic *ContainerEngine) Shutdown(_ context.Context) {
}) })
} }
func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []string, options entities.ContainerStatsOptions) error { func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []string, options entities.ContainerStatsOptions) (statsChan chan entities.ContainerStatsReport, err error) {
defer close(options.StatChan) statsChan = make(chan entities.ContainerStatsReport, 1)
containerFunc := ic.Libpod.GetRunningContainers containerFunc := ic.Libpod.GetRunningContainers
queryAll := false
switch { switch {
case len(namesOrIds) > 0:
containerFunc = func() ([]*libpod.Container, error) { return ic.Libpod.GetContainersByList(namesOrIds) }
case options.Latest: case options.Latest:
containerFunc = func() ([]*libpod.Container, error) { containerFunc = func() ([]*libpod.Container, error) {
lastCtr, err := ic.Libpod.GetLatestContainer() lastCtr, err := ic.Libpod.GetLatestContainer()
@ -1156,62 +1156,76 @@ func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []stri
} }
return []*libpod.Container{lastCtr}, nil return []*libpod.Container{lastCtr}, nil
} }
case options.All: case len(namesOrIds) > 0:
containerFunc = func() ([]*libpod.Container, error) { return ic.Libpod.GetContainersByList(namesOrIds) }
default:
// No containers, no latest -> query all!
queryAll = true
containerFunc = ic.Libpod.GetAllContainers containerFunc = ic.Libpod.GetAllContainers
} }
ctrs, err := containerFunc() go func() {
if err != nil { defer close(statsChan)
return errors.Wrapf(err, "unable to get list of containers") var (
err error
containers []*libpod.Container
containerStats map[string]*define.ContainerStats
)
containerStats = make(map[string]*define.ContainerStats)
stream: // label to flatten the scope
select {
case <-ctx.Done():
// client cancelled
logrus.Debugf("Container stats stopped: context cancelled")
return
default:
// just fall through and do work
} }
containerStats := map[string]*define.ContainerStats{}
for _, ctr := range ctrs { // Anonymous func to easily use the return values for streaming.
initialStats, err := ctr.GetContainerStats(&define.ContainerStats{}) computeStats := func() ([]define.ContainerStats, error) {
containers, err = containerFunc()
if err != nil {
return nil, errors.Wrapf(err, "unable to get list of containers")
}
reportStats := []define.ContainerStats{}
for _, ctr := range containers {
prev, ok := containerStats[ctr.ID()]
if !ok {
prev = &define.ContainerStats{}
}
stats, err := ctr.GetContainerStats(prev)
if err != nil { if err != nil {
// when doing "all", don't worry about containers that are not running
cause := errors.Cause(err) cause := errors.Cause(err)
if options.All && (cause == define.ErrCtrRemoved || cause == define.ErrNoSuchCtr || cause == define.ErrCtrStateInvalid) { if queryAll && (cause == define.ErrCtrRemoved || cause == define.ErrNoSuchCtr || cause == define.ErrCtrStateInvalid) {
continue continue
} }
if cause == cgroups.ErrCgroupV1Rootless { if cause == cgroups.ErrCgroupV1Rootless {
err = cause err = cause
} }
return err return nil, err
} }
containerStats[ctr.ID()] = initialStats
containerStats[ctr.ID()] = stats
reportStats = append(reportStats, *stats)
} }
for { return reportStats, nil
reportStats := []*define.ContainerStats{}
for _, ctr := range ctrs {
id := ctr.ID()
if _, ok := containerStats[ctr.ID()]; !ok {
initialStats, err := ctr.GetContainerStats(&define.ContainerStats{})
if errors.Cause(err) == define.ErrCtrRemoved || errors.Cause(err) == define.ErrNoSuchCtr || errors.Cause(err) == define.ErrCtrStateInvalid {
// skip dealing with a container that is gone
continue
} }
if err != nil {
return err report := entities.ContainerStatsReport{}
} report.Stats, report.Error = computeStats()
containerStats[id] = initialStats statsChan <- report
}
stats, err := ctr.GetContainerStats(containerStats[id]) if report.Error != nil || !options.Stream {
if err != nil && errors.Cause(err) != define.ErrNoSuchCtr { return
return err
}
// replace the previous measurement with the current one
containerStats[id] = stats
reportStats = append(reportStats, stats)
}
ctrs, err = containerFunc()
if err != nil {
return err
}
options.StatChan <- reportStats
if options.NoStream {
break
} }
time.Sleep(time.Second) time.Sleep(time.Second)
} goto stream
return nil }()
return statsChan, nil
} }

View File

@ -722,6 +722,9 @@ func (ic *ContainerEngine) ContainerCp(ctx context.Context, source, dest string,
func (ic *ContainerEngine) Shutdown(_ context.Context) { func (ic *ContainerEngine) Shutdown(_ context.Context) {
} }
func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []string, options entities.ContainerStatsOptions) error { func (ic *ContainerEngine) ContainerStats(ctx context.Context, namesOrIds []string, options entities.ContainerStatsOptions) (statsChan chan entities.ContainerStatsReport, err error) {
return errors.New("not implemented") if options.Latest {
return nil, errors.New("latest is not supported for the remote client")
}
return containers.Stats(ic.ClientCxt, namesOrIds, &options.Stream)
} }

View File

@ -1,4 +1,4 @@
// +build !remote // +build
package integration package integration