docs/api/client/stats.go

383 lines
10 KiB
Go

package client
import (
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"text/tabwriter"
"time"
"golang.org/x/net/context"
Cli "github.com/docker/docker/cli"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
"github.com/docker/go-units"
)
type containerStats struct {
Name string
CPUPercentage float64
Memory float64
MemoryLimit float64
MemoryPercentage float64
NetworkRx float64
NetworkTx float64
BlockRead float64
BlockWrite float64
mu sync.RWMutex
err error
}
type stats struct {
mu sync.Mutex
cs []*containerStats
}
func (s *stats) isKnownContainer(cid string) bool {
for _, c := range s.cs {
if c.Name == cid {
return true
}
}
return false
}
func (s *containerStats) Collect(cli *DockerCli, streamStats bool) {
responseBody, err := cli.client.ContainerStats(context.Background(), s.Name, streamStats)
if err != nil {
s.mu.Lock()
s.err = err
s.mu.Unlock()
return
}
defer responseBody.Close()
var (
previousCPU uint64
previousSystem uint64
dec = json.NewDecoder(responseBody)
u = make(chan error, 1)
)
go func() {
for {
var v *types.StatsJSON
if err := dec.Decode(&v); err != nil {
u <- err
return
}
var memPercent = 0.0
var cpuPercent = 0.0
// MemoryStats.Limit will never be 0 unless the container is not running and we haven't
// got any data from cgroup
if v.MemoryStats.Limit != 0 {
memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
}
previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
previousSystem = v.PreCPUStats.SystemUsage
cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v)
blkRead, blkWrite := calculateBlockIO(v.BlkioStats)
s.mu.Lock()
s.CPUPercentage = cpuPercent
s.Memory = float64(v.MemoryStats.Usage)
s.MemoryLimit = float64(v.MemoryStats.Limit)
s.MemoryPercentage = memPercent
s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks)
s.BlockRead = float64(blkRead)
s.BlockWrite = float64(blkWrite)
s.mu.Unlock()
u <- nil
if !streamStats {
return
}
}
}()
for {
select {
case <-time.After(2 * time.Second):
// zero out the values if we have not received an update within
// the specified duration.
s.mu.Lock()
s.CPUPercentage = 0
s.Memory = 0
s.MemoryPercentage = 0
s.MemoryLimit = 0
s.NetworkRx = 0
s.NetworkTx = 0
s.BlockRead = 0
s.BlockWrite = 0
s.mu.Unlock()
case err := <-u:
if err != nil {
s.mu.Lock()
s.err = err
s.mu.Unlock()
return
}
}
if !streamStats {
return
}
}
}
func (s *containerStats) Display(w io.Writer) error {
s.mu.RLock()
defer s.mu.RUnlock()
if s.err != nil {
return s.err
}
fmt.Fprintf(w, "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\n",
s.Name,
s.CPUPercentage,
units.HumanSize(s.Memory), units.HumanSize(s.MemoryLimit),
s.MemoryPercentage,
units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx),
units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite))
return nil
}
// CmdStats displays a live stream of resource usage statistics for one or more containers.
//
// This shows real-time information on CPU usage, memory usage, and network I/O.
//
// Usage: docker stats [OPTIONS] [CONTAINER...]
func (cli *DockerCli) CmdStats(args ...string) error {
cmd := Cli.Subcmd("stats", []string{"[CONTAINER...]"}, Cli.DockerCommands["stats"].Description, true)
all := cmd.Bool([]string{"a", "-all"}, false, "Show all containers (default shows just running)")
noStream := cmd.Bool([]string{"-no-stream"}, false, "Disable streaming stats and only pull the first result")
cmd.ParseFlags(args, true)
names := cmd.Args()
showAll := len(names) == 0
// The containerChan is the central synchronization piece for this function,
// and all messages to either add or remove an element to the list of
// monitored containers go through this.
//
// - When watching all containers, a goroutine subscribes to the events
// API endpoint and messages this channel accordingly.
// - When watching a particular subset of containers, we feed the
// requested list of containers to this channel.
// - For both codepaths, a goroutine is responsible for watching this
// channel and subscribing to the stats API for containers.
type containerEvent struct {
id string
event string
err error
}
containerChan := make(chan containerEvent)
// monitorContainerEvents watches for container creation and removal (only
// used when calling `docker stats` without arguments).
monitorContainerEvents := func(started chan<- struct{}, c chan<- containerEvent) {
f := filters.NewArgs()
f.Add("type", "container")
options := types.EventsOptions{
Filters: f,
}
resBody, err := cli.client.Events(context.Background(), options)
// Whether we successfully subscribed to events or not, we can now
// unblock the main goroutine.
close(started)
if err != nil {
c <- containerEvent{err: err}
return
}
defer resBody.Close()
decodeEvents(resBody, func(event events.Message, err error) error {
if err != nil {
c <- containerEvent{"", "", err}
} else {
c <- containerEvent{event.ID[:12], event.Action, err}
}
return nil
})
}
// getContainerList simulates creation event for all previously existing
// containers (only used when calling `docker stats` without arguments).
getContainerList := func(c chan<- containerEvent) {
options := types.ContainerListOptions{
All: *all,
}
cs, err := cli.client.ContainerList(options)
if err != nil {
containerChan <- containerEvent{"", "", err}
}
for _, c := range cs {
containerChan <- containerEvent{c.ID[:12], "create", nil}
}
}
// Monitor the containerChan and start collection for each container.
cStats := stats{}
closeChan := make(chan error)
go func(stopChan chan<- error, c <-chan containerEvent) {
for {
event := <-c
if event.err != nil {
stopChan <- event.err
return
}
switch event.event {
case "create":
cStats.mu.Lock()
if !cStats.isKnownContainer(event.id) {
s := &containerStats{Name: event.id}
cStats.cs = append(cStats.cs, s)
go s.Collect(cli, !*noStream)
}
cStats.mu.Unlock()
case "stop":
case "die":
if !*all {
var remove int
// cStats cannot be O(1) with a map cause ranging over it would cause
// containers in stats to move up and down in the list...:(
cStats.mu.Lock()
for i, s := range cStats.cs {
if s.Name == event.id {
remove = i
break
}
}
cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...)
cStats.mu.Unlock()
}
}
}
}(closeChan, containerChan)
if showAll {
// If no names were specified, start a long running goroutine which
// monitors container events. We make sure we're subscribed before
// retrieving the list of running containers to avoid a race where we
// would "miss" a creation.
started := make(chan struct{})
go monitorContainerEvents(started, containerChan)
<-started
// Start a short-lived goroutine to retrieve the initial list of
// containers.
go getContainerList(containerChan)
} else {
// Artificially send creation events for the containers we were asked to
// monitor (same code path than we use when monitoring all containers).
for _, name := range names {
containerChan <- containerEvent{name, "create", nil}
}
// We don't expect any asynchronous errors: closeChan can be closed.
close(closeChan)
// Do a quick pause to detect any error with the provided list of
// container names.
time.Sleep(1500 * time.Millisecond)
var errs []string
cStats.mu.Lock()
for _, c := range cStats.cs {
c.mu.Lock()
if c.err != nil {
errs = append(errs, fmt.Sprintf("%s: %v", c.Name, c.err))
}
c.mu.Unlock()
}
cStats.mu.Unlock()
if len(errs) > 0 {
return fmt.Errorf("%s", strings.Join(errs, ", "))
}
}
w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
printHeader := func() {
if !*noStream {
fmt.Fprint(cli.out, "\033[2J")
fmt.Fprint(cli.out, "\033[H")
}
io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\n")
}
for range time.Tick(500 * time.Millisecond) {
printHeader()
toRemove := []int{}
cStats.mu.Lock()
for i, s := range cStats.cs {
if err := s.Display(w); err != nil && !*noStream {
toRemove = append(toRemove, i)
}
}
for j := len(toRemove) - 1; j >= 0; j-- {
i := toRemove[j]
cStats.cs = append(cStats.cs[:i], cStats.cs[i+1:]...)
}
if len(cStats.cs) == 0 && !showAll {
return nil
}
cStats.mu.Unlock()
w.Flush()
if *noStream {
break
}
select {
case err, ok := <-closeChan:
if ok {
if err != nil {
// this is suppressing "unexpected EOF" in the cli when the
// daemon restarts so it shutdowns cleanly
if err == io.ErrUnexpectedEOF {
return nil
}
return err
}
}
default:
// just skip
}
}
return nil
}
func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
var (
cpuPercent = 0.0
// calculate the change for the cpu usage of the container in between readings
cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
// calculate the change for the entire system between readings
systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
)
if systemDelta > 0.0 && cpuDelta > 0.0 {
cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
}
return cpuPercent
}
func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) {
for _, bioEntry := range blkio.IoServiceBytesRecursive {
switch strings.ToLower(bioEntry.Op) {
case "read":
blkRead = blkRead + bioEntry.Value
case "write":
blkWrite = blkWrite + bioEntry.Value
}
}
return
}
func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) {
var rx, tx float64
for _, v := range network {
rx += float64(v.RxBytes)
tx += float64(v.TxBytes)
}
return rx, tx
}