vendor: update 'psn' for 'TopStream'

This commit is contained in:
Gyu-Ho Lee 2017-02-04 02:31:14 -08:00
parent 53510c2965
commit d6083d853c
No known key found for this signature in database
GPG Key ID: 1DDD39C7EB70C24C
8 changed files with 620 additions and 280 deletions

View File

@ -21,6 +21,7 @@ type EntryFilter struct {
// for ps
TopCommandPath string
TopStream *TopStream
// for Proc
DiskDevice string
@ -83,6 +84,11 @@ func WithTopCommandPath(path string) FilterFunc {
return func(ft *EntryFilter) { ft.TopCommandPath = path }
}
// WithTopStream gets the PSEntry from the 'top' stream.
func WithTopStream(str *TopStream) FilterFunc {
return func(ft *EntryFilter) { ft.TopStream = str }
}
// WithDiskDevice to filter entries by disk device.
func WithDiskDevice(name string) FilterFunc {
return func(ft *EntryFilter) { ft.DiskDevice = name }

View File

@ -71,27 +71,32 @@ func GetPS(opts ...FilterFunc) (pss []PSEntry, err error) {
ft.ProgramMatchFunc = func(string) bool { return true }
}
var topRows []TopCommandRow
if len(pids) == 1 {
topRows, err = GetTop(ft.TopCommandPath, pids[0])
if err != nil {
return
var topM map[int64]TopCommandRow
if ft.TopStream == nil {
var topRows []TopCommandRow
if len(pids) == 1 {
topRows, err = GetTop(ft.TopCommandPath, pids[0])
if err != nil {
return
}
} else {
topRows, err = GetTop(ft.TopCommandPath, 0)
if err != nil {
return
}
}
topM = make(map[int64]TopCommandRow, len(topRows))
for _, row := range topRows {
topM[row.PID] = row
}
for _, pid := range pids {
if _, ok := topM[pid]; !ok {
topM[pid] = TopCommandRow{PID: pid}
log.Printf("PID %d is not found at 'top' command output", pid)
}
}
} else {
topRows, err = GetTop(ft.TopCommandPath, 0)
if err != nil {
return
}
}
topM := make(map[int64]TopCommandRow, len(topRows))
for _, row := range topRows {
topM[row.PID] = row
}
for _, pid := range pids {
if _, ok := topM[pid]; !ok {
topM[pid] = TopCommandRow{PID: pid}
log.Printf("PID %d is not found at 'top' command output", pid)
}
topM = ft.TopStream.Latest()
}
var pmu sync.RWMutex

View File

@ -36,6 +36,8 @@ type Proc struct {
Extra []byte
}
// ProcSlice is a slice of 'Proc' and implements
// the sort.Sort interface in unix nano/second ascending order.
type ProcSlice []Proc
func (p ProcSlice) Len() int { return len(p) }
@ -60,10 +62,13 @@ func GetProc(opts ...FilterFunc) (Proc, error) {
ts := time.Now().UnixNano()
proc := Proc{UnixNanosecond: ts, UnixSecond: ConvertUnixNano(ts)}
toFinish := 0
errc := make(chan error)
toFinish++
go func() {
// get process stats
ets, err := GetPS(WithPID(ft.PID))
ets, err := GetPS(WithPID(ft.PID), WithTopStream(ft.TopStream))
if err != nil {
errc <- err
return
@ -76,6 +81,7 @@ func GetProc(opts ...FilterFunc) (Proc, error) {
errc <- nil
}()
toFinish++
go func() {
lvg, err := GetProcLoadAvg()
if err != nil {
@ -87,6 +93,7 @@ func GetProc(opts ...FilterFunc) (Proc, error) {
}()
if ft.DiskDevice != "" {
toFinish++
go func() {
// get diskstats
ds, err := GetDS()
@ -105,6 +112,7 @@ func GetProc(opts ...FilterFunc) (Proc, error) {
}
if ft.NetworkInterface != "" {
toFinish++
go func() {
// get network I/O stats
ns, err := GetNS()
@ -123,6 +131,7 @@ func GetProc(opts ...FilterFunc) (Proc, error) {
}
if ft.ExtraPath != "" {
toFinish++
go func() {
f, err := openToRead(ft.ExtraPath)
if err != nil {
@ -140,7 +149,7 @@ func GetProc(opts ...FilterFunc) (Proc, error) {
}
cnt := 0
for cnt != len(opts)+1 { // include load avg query
for cnt != toFinish { // include load avg query
err := <-errc
if err != nil {
return Proc{}, err
@ -150,7 +159,7 @@ func GetProc(opts ...FilterFunc) (Proc, error) {
if ft.DiskDevice != "" {
if proc.DSEntry.Device == "" {
return Proc{}, fmt.Errorf("disk device %q was not found", ft.DiskDevice)
return Proc{}, fmt.Errorf("disk device %q was not found (%+v)", ft.DiskDevice, proc.DSEntry)
}
}
if ft.NetworkInterface != "" {

View File

@ -3,6 +3,7 @@ package psn
import (
"encoding/csv"
"fmt"
"log"
"strconv"
humanize "github.com/dustin/go-humanize"
@ -26,6 +27,11 @@ type CSV struct {
// ExtraPath contains extra information.
ExtraPath string
// TopStream feeds realtime 'top' command data in the background, every second.
// And whenver 'Add' gets called, returns the latest 'top' data.
// Use this to provide more accurate CPU usage.
TopStream *TopStream
// Rows are sorted by unix time in nanoseconds.
// It's the number of nanoseconds (not seconds) elapsed
// since January 1, 1970 UTC.
@ -33,8 +39,8 @@ type CSV struct {
}
// NewCSV returns a new CSV.
func NewCSV(fpath string, pid int64, diskDevice string, networkInterface string, extraPath string) *CSV {
return &CSV{
func NewCSV(fpath string, pid int64, diskDevice string, networkInterface string, extraPath string, tcfg *TopConfig) (c *CSV, err error) {
c = &CSV{
FilePath: fpath,
PID: pid,
DiskDevice: diskDevice,
@ -51,6 +57,10 @@ func NewCSV(fpath string, pid int64, diskDevice string, networkInterface string,
ExtraPath: extraPath,
Rows: []Proc{},
}
if tcfg != nil {
c.TopStream, err = tcfg.StartStream()
}
return
}
// Add is called periodically to append a new entry to CSV; it only appends.
@ -62,6 +72,7 @@ func (c *CSV) Add() error {
WithDiskDevice(c.DiskDevice),
WithNetworkInterface(c.NetworkInterface),
WithExtraPath(c.ExtraPath),
WithTopStream(c.TopStream),
)
if err != nil {
return err
@ -106,6 +117,18 @@ func (c *CSV) Add() error {
// Save saves CSV to disk.
func (c *CSV) Save() error {
if c.TopStream != nil {
if err := c.TopStream.Stop(); err != nil {
log.Println(err)
}
select {
case err := <-c.TopStream.ErrChan():
log.Println(err)
default:
log.Println("TopStream has stopped")
}
}
f, err := openToAppend(c.FilePath)
if err != nil {
return err

332
vendor/github.com/gyuho/psn/top.go generated vendored
View File

@ -5,273 +5,103 @@ import (
"fmt"
"io"
"os/exec"
"reflect"
"strconv"
"strings"
humanize "github.com/dustin/go-humanize"
)
// GetTop returns all entries in 'top' command.
// If pid<1, it reads all processes in 'top' command.
func GetTop(topPath string, pid int64) ([]TopCommandRow, error) {
o, err := ReadTop(topPath, pid)
if err != nil {
return nil, err
}
return ParseTopOutput(o)
}
// GetTopDefault returns all entries in 'top' command.
// If pid<1, it reads all processes in 'top' command.
func GetTopDefault(pid int64) ([]TopCommandRow, error) {
o, err := ReadTop(DefaultTopPath, pid)
if err != nil {
return nil, err
}
return ParseTopOutput(o)
}
// DefaultTopPath is the default 'top' command path.
var DefaultTopPath = "/usr/bin/top"
// ReadTopDefault reads Linux 'top' command output.
func ReadTopDefault(pid int64) (string, error) {
return ReadTop(DefaultTopPath, pid)
// TopConfig configures 'top' command runs.
type TopConfig struct {
Exec string
// MAKE THIS TRUE BY DEFAULT
// OTHERWISE PARSER HAS TO DEAL WITH HIGHLIGHTED TEXTS
//
// BatchMode is true to start 'top' in batch mode, which could be useful
// for sending output from 'top' to other programs or to a file.
// In this mode, 'top' will not accept input and runs until the interations
// limit ('-n' flag) or until killed.
// It's '-b' flag.
// BatchMode bool
// Limit limits the iteration of 'top' commands to run before exit.
// If 1, 'top' prints out the current processes and exits.
// It's '-n' flag.
Limit int
// IntervalSecond is the delay time between updates.
// Default is 1 second.
// It's '-d' flag.
IntervalSecond float64
// PID specifies the PID to monitor.
// It's '-p' flag.
PID int64
// Writer stores 'top' command outputs.
Writer io.Writer
cmd *exec.Cmd
}
// ReadTop reads Linux 'top' command output.
func ReadTop(topPath string, pid int64) (string, error) {
buf := new(bytes.Buffer)
err := readTop(topPath, pid, buf)
o := strings.TrimSpace(buf.String())
return o, err
}
// Flags returns the 'top' command flags.
func (cfg *TopConfig) Flags() (fs []string) {
// batch mode by default
fs = append(fs, "-b")
func readTop(topPath string, pid int64, w io.Writer) error {
if !exist(topPath) {
return fmt.Errorf("%q does not exist", topPath)
}
topFlags := []string{"-b", "-n", "1"}
if pid > 0 {
topFlags = append(topFlags, "-p", fmt.Sprint(pid))
}
cmd := exec.Command(topPath, topFlags...)
cmd.Stdout = w
cmd.Stderr = w
return cmd.Run()
}
func convertProcStatus(s string) string {
ns := strings.TrimSpace(s)
if len(s) > 1 {
ns = ns[:1]
}
switch ns {
case "D":
return "D (uninterruptible sleep)"
case "R":
return "R (running)"
case "S":
return "S (sleeping)"
case "T":
return "T (stopped by job control signal)"
case "t":
return "t (stopped by debugger during trace)"
case "Z":
return "Z (zombie)"
default:
return fmt.Sprintf("unknown process %q", s)
}
}
// parses KiB strings, returns bytes in int64, and humanized bytes.
//
// KiB = kibibyte = 1024 bytes
// MiB = mebibyte = 1024 KiB = 1,048,576 bytes
// GiB = gibibyte = 1024 MiB = 1,073,741,824 bytes
// TiB = tebibyte = 1024 GiB = 1,099,511,627,776 bytes
// PiB = pebibyte = 1024 TiB = 1,125,899,906,842,624 bytes
// EiB = exbibyte = 1024 PiB = 1,152,921,504,606,846,976 bytes
//
func parseKiBInTop(s string) (bts uint64, hs string, err error) {
s = strings.TrimSpace(s)
switch {
// suffix 'm' means megabytes
case strings.HasSuffix(s, "m"):
ns := s[:len(s)-1]
var mib float64
mib, err = strconv.ParseFloat(ns, 64)
if err != nil {
return 0, "", err
}
bts = uint64(mib) * 1024 * 1024
// suffix 'g' means gigabytes
case strings.HasSuffix(s, "g"):
ns := s[:len(s)-1]
var gib float64
gib, err = strconv.ParseFloat(ns, 64)
if err != nil {
return 0, "", err
}
bts = uint64(gib) * 1024 * 1024 * 1024
default:
var kib float64
kib, err = strconv.ParseFloat(s, 64)
if err != nil {
return 0, "", err
}
bts = uint64(kib) * 1024
if cfg.Limit > 0 { // if 1, command just exists after one output
fs = append(fs, "-n", fmt.Sprintf("%d", cfg.Limit))
}
if cfg.IntervalSecond > 0 {
fs = append(fs, "-d", fmt.Sprintf("%.2f", cfg.IntervalSecond))
}
if cfg.PID > 0 {
fs = append(fs, "-p", fmt.Sprintf("%d", cfg.PID))
}
hs = humanize.Bytes(bts)
return
}
// TopRowHeaders is the headers in 'top' output.
var TopRowHeaders = []string{
"PID",
"USER",
"PR",
"NI",
"VIRT",
"RES",
"SHR",
"S",
"%CPU",
"%MEM",
"TIME+",
"COMMAND",
// process updates with '*exec.Cmd' for the given 'TopConfig'.
func (cfg *TopConfig) createCmd() error {
if cfg == nil {
return fmt.Errorf("TopConfig is nil")
}
if !exist(cfg.Exec) {
return fmt.Errorf("%q does not exist", cfg.Exec)
}
flags := cfg.Flags()
c := exec.Command(cfg.Exec, flags...)
c.Stdout = cfg.Writer
c.Stderr = cfg.Writer
cfg.cmd = c
return nil
}
type topCommandOutputRowIdx int
const (
top_command_output_row_idx_pid topCommandOutputRowIdx = iota
top_command_output_row_idx_user
top_command_output_row_idx_pr
top_command_output_row_idx_ni
top_command_output_row_idx_virt
top_command_output_row_idx_res
top_command_output_row_idx_shr
top_command_output_row_idx_s
top_command_output_row_idx_cpu
top_command_output_row_idx_mem
top_command_output_row_idx_time
top_command_output_row_idx_command
)
// ParseTopOutput parses 'top' command output and returns the rows.
func ParseTopOutput(s string) ([]TopCommandRow, error) {
lines := strings.Split(s, "\n")
rows := make([][]string, 0, len(lines))
headerFound := false
for _, line := range lines {
if len(line) == 0 {
continue
}
ds := strings.Fields(strings.TrimSpace(line))
if ds[0] == "PID" { // header line
if !reflect.DeepEqual(ds, TopRowHeaders) {
return nil, fmt.Errorf("unexpected 'top' command header order (%v, expected %v)", ds, TopRowHeaders)
}
headerFound = true
continue
}
if !headerFound {
continue
}
row := strings.Fields(strings.TrimSpace(line))
if len(row) != len(TopRowHeaders) {
return nil, fmt.Errorf("unexpected row column number %v (expected %v)", row, TopRowHeaders)
}
rows = append(rows, row)
// GetTop returns all entries in 'top' command.
// If pid<1, it reads all processes in 'top' command.
// This is one-time command.
func GetTop(topPath string, pid int64) ([]TopCommandRow, error) {
buf := new(bytes.Buffer)
cfg := &TopConfig{
Exec: topPath,
Limit: 1,
IntervalSecond: 1,
PID: pid,
Writer: buf,
cmd: nil,
}
if err := cfg.createCmd(); err != nil {
return nil, err
}
type result struct {
row TopCommandRow
err error
// run starts the 'top' command and waits for it to complete.
if err := cfg.cmd.Run(); err != nil {
return nil, err
}
rc := make(chan result, len(rows))
for _, row := range rows {
go func(row []string) {
tr, err := parseTopRow(row)
rc <- result{row: tr, err: err}
}(row)
}
tcRows := make([]TopCommandRow, 0, len(rows))
for len(tcRows) != len(rows) {
select {
case rs := <-rc:
if rs.err != nil {
return nil, rs.err
}
tcRows = append(tcRows, rs.row)
}
}
return tcRows, nil
}
func parseTopRow(row []string) (TopCommandRow, error) {
trow := TopCommandRow{
USER: strings.TrimSpace(row[top_command_output_row_idx_user]),
}
pv, err := strconv.ParseInt(row[top_command_output_row_idx_pid], 10, 64)
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.PID = pv
trow.PR = strings.TrimSpace(row[top_command_output_row_idx_pr])
trow.NI = strings.TrimSpace(row[top_command_output_row_idx_ni])
virt, virtTxt, err := parseKiBInTop(row[top_command_output_row_idx_virt])
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.VIRT = row[top_command_output_row_idx_virt]
trow.VIRTBytesN = virt
trow.VIRTParsedBytes = virtTxt
res, resTxt, err := parseKiBInTop(row[top_command_output_row_idx_res])
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.RES = row[top_command_output_row_idx_res]
trow.RESBytesN = res
trow.RESParsedBytes = resTxt
shr, shrTxt, err := parseKiBInTop(row[top_command_output_row_idx_shr])
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.SHR = row[top_command_output_row_idx_shr]
trow.SHRBytesN = shr
trow.SHRParsedBytes = shrTxt
trow.S = row[top_command_output_row_idx_s]
trow.SParsedStatus = convertProcStatus(row[top_command_output_row_idx_s])
cnum, err := strconv.ParseFloat(row[top_command_output_row_idx_cpu], 64)
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.CPUPercent = cnum
mnum, err := strconv.ParseFloat(row[top_command_output_row_idx_mem], 64)
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.MEMPercent = mnum
trow.TIME = row[top_command_output_row_idx_time]
return trow, nil
return ParseTopOutput(buf.String())
}

212
vendor/github.com/gyuho/psn/top_parse_output.go generated vendored Normal file
View File

@ -0,0 +1,212 @@
package psn
import (
"bytes"
"fmt"
"strconv"
"strings"
humanize "github.com/dustin/go-humanize"
)
// parses KiB strings, returns bytes in int64, and humanized bytes.
//
// KiB = kibibyte = 1024 bytes
// MiB = mebibyte = 1024 KiB = 1,048,576 bytes
// GiB = gibibyte = 1024 MiB = 1,073,741,824 bytes
// TiB = tebibyte = 1024 GiB = 1,099,511,627,776 bytes
// PiB = pebibyte = 1024 TiB = 1,125,899,906,842,624 bytes
// EiB = exbibyte = 1024 PiB = 1,152,921,504,606,846,976 bytes
//
func parseTopCommandKiB(s string) (bts uint64, hs string, err error) {
s = strings.TrimSpace(s)
switch {
// suffix 'm' means megabytes
case strings.HasSuffix(s, "m"):
ns := s[:len(s)-1]
var mib float64
mib, err = strconv.ParseFloat(ns, 64)
if err != nil {
return 0, "", err
}
bts = uint64(mib) * 1024 * 1024
// suffix 'g' means gigabytes
case strings.HasSuffix(s, "g"):
ns := s[:len(s)-1]
var gib float64
gib, err = strconv.ParseFloat(ns, 64)
if err != nil {
return 0, "", err
}
bts = uint64(gib) * 1024 * 1024 * 1024
default:
var kib float64
kib, err = strconv.ParseFloat(s, 64)
if err != nil {
return 0, "", err
}
bts = uint64(kib) * 1024
}
hs = humanize.Bytes(bts)
return
}
// TopRowHeaders is the headers in 'top' output.
var TopRowHeaders = []string{
"PID",
"USER",
"PR",
"NI",
"VIRT",
"RES",
"SHR",
"S",
"%CPU",
"%MEM",
"TIME+",
"COMMAND",
}
type topCommandOutputRowIdx int
const (
top_command_output_row_idx_pid topCommandOutputRowIdx = iota
top_command_output_row_idx_user
top_command_output_row_idx_pr
top_command_output_row_idx_ni
top_command_output_row_idx_virt
top_command_output_row_idx_res
top_command_output_row_idx_shr
top_command_output_row_idx_s
top_command_output_row_idx_cpu
top_command_output_row_idx_mem
top_command_output_row_idx_time
top_command_output_row_idx_command
)
var bytesToSkip = [][]byte{
{116, 111, 112, 32, 45}, // 'top -'
{84, 97, 115, 107, 115, 58, 32}, // 'Tasks: '
{37, 67, 112, 117, 40, 115, 41, 58, 32}, // '%Cpu(s): '
{67, 112, 117, 40, 115, 41, 58, 32}, // 'Cpu(s): '
{75, 105, 66, 32, 77, 101, 109, 32, 58, 32}, // 'KiB Mem : '
{75, 105, 66, 32, 83, 119, 97, 112, 58, 32}, // 'KiB Swap: '
{77, 101, 109, 58, 32}, // 'Mem: '
{83, 119, 97, 112, 58, 32}, // 'Swap: '
{80, 73, 68, 32}, // 'PID '
}
func topRowToSkip(data []byte) bool {
for _, prefix := range bytesToSkip {
if bytes.HasPrefix(data, prefix) {
return true
}
}
return false
}
// ParseTopOutput parses 'top' command output and returns the rows.
func ParseTopOutput(s string) ([]TopCommandRow, error) {
lines := strings.Split(s, "\n")
rows := make([][]string, 0, len(lines))
for _, line := range lines {
line = strings.TrimSpace(line)
if len(line) == 0 {
continue
}
if topRowToSkip([]byte(line)) {
continue
}
row := strings.Fields(strings.TrimSpace(line))
if len(row) != len(TopRowHeaders) {
return nil, fmt.Errorf("unexpected row column number %v (expected %v)", row, TopRowHeaders)
}
rows = append(rows, row)
}
type result struct {
row TopCommandRow
err error
}
rc := make(chan result, len(rows))
for _, row := range rows {
go func(row []string) {
tr, err := parseTopRow(row)
rc <- result{row: tr, err: err}
}(row)
}
tcRows := make([]TopCommandRow, 0, len(rows))
for len(tcRows) != len(rows) {
select {
case rs := <-rc:
if rs.err != nil {
return nil, rs.err
}
tcRows = append(tcRows, rs.row)
}
}
return tcRows, nil
}
func parseTopRow(row []string) (TopCommandRow, error) {
trow := TopCommandRow{
USER: strings.TrimSpace(row[top_command_output_row_idx_user]),
}
pv, err := strconv.ParseInt(row[top_command_output_row_idx_pid], 10, 64)
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.PID = pv
trow.PR = strings.TrimSpace(row[top_command_output_row_idx_pr])
trow.NI = strings.TrimSpace(row[top_command_output_row_idx_ni])
virt, virtTxt, err := parseTopCommandKiB(row[top_command_output_row_idx_virt])
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.VIRT = row[top_command_output_row_idx_virt]
trow.VIRTBytesN = virt
trow.VIRTParsedBytes = virtTxt
res, resTxt, err := parseTopCommandKiB(row[top_command_output_row_idx_res])
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.RES = row[top_command_output_row_idx_res]
trow.RESBytesN = res
trow.RESParsedBytes = resTxt
shr, shrTxt, err := parseTopCommandKiB(row[top_command_output_row_idx_shr])
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.SHR = row[top_command_output_row_idx_shr]
trow.SHRBytesN = shr
trow.SHRParsedBytes = shrTxt
trow.S = row[top_command_output_row_idx_s]
trow.SParsedStatus = convertProcStatus(row[top_command_output_row_idx_s])
cnum, err := strconv.ParseFloat(row[top_command_output_row_idx_cpu], 64)
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.CPUPercent = cnum
mnum, err := strconv.ParseFloat(row[top_command_output_row_idx_mem], 64)
if err != nil {
return TopCommandRow{}, fmt.Errorf("parse error %v (row %v)", err, row)
}
trow.MEMPercent = mnum
trow.TIME = row[top_command_output_row_idx_time]
return trow, nil
}

231
vendor/github.com/gyuho/psn/top_stream.go generated vendored Normal file
View File

@ -0,0 +1,231 @@
package psn
import (
"bufio"
"bytes"
"os"
"os/exec"
"strings"
"sync"
"github.com/kr/pty"
)
// TopStream provides top command output stream.
type TopStream struct {
cmd *exec.Cmd
pmu sync.Mutex
pt *os.File
// broadcast updates whenver available available
wg sync.WaitGroup
rcond *sync.Cond
rmu sync.RWMutex // protect results
queue []TopCommandRow
pid2TopCommandRow map[int64]TopCommandRow
err error
errc chan error
// signal only once at initial, once the first line is ready
readymu sync.RWMutex
ready bool
readyc chan struct{}
}
// StartStream starts 'top' command stream.
func (cfg *TopConfig) StartStream() (*TopStream, error) {
if err := cfg.createCmd(); err != nil {
return nil, err
}
pt, err := pty.Start(cfg.cmd)
if err != nil {
return nil, err
}
str := &TopStream{
cmd: cfg.cmd,
pmu: sync.Mutex{},
pt: pt,
wg: sync.WaitGroup{},
rmu: sync.RWMutex{},
// pre-allocate
queue: make([]TopCommandRow, 0, 500),
pid2TopCommandRow: make(map[int64]TopCommandRow, 500),
err: nil,
errc: make(chan error, 1),
ready: false,
readyc: make(chan struct{}, 1),
}
str.rcond = sync.NewCond(&str.rmu)
str.wg.Add(1)
go str.enqueue()
go str.dequeue()
<-str.readyc
return str, nil
}
// Stop kills the 'top' process and waits for it to exit.
func (str *TopStream) Stop() error {
return str.close(true)
}
// Wait just waits for the 'top' process to exit.
func (str *TopStream) Wait() error {
return str.close(false)
}
// ErrChan returns the error from stream.
func (str *TopStream) ErrChan() <-chan error {
return str.errc
}
// Latest returns the latest top command outputs.
func (str *TopStream) Latest() map[int64]TopCommandRow {
str.rmu.RLock()
cm := make(map[int64]TopCommandRow, len(str.pid2TopCommandRow))
for k, v := range str.pid2TopCommandRow {
cm[k] = v
}
str.rmu.RUnlock()
return cm
}
func (str *TopStream) noError() (noErr bool) {
str.rmu.RLock()
noErr = str.err == nil
str.rmu.RUnlock()
return
}
// feed new top results into the queue
func (str *TopStream) enqueue() {
defer str.wg.Done()
reader := bufio.NewReader(str.pt)
for str.noError() {
// lock for pty
str.pmu.Lock()
data, _, lerr := reader.ReadLine()
str.pmu.Unlock()
data = bytes.TrimSpace(data)
if topRowToSkip(data) {
continue
}
line := string(data)
// lock for results
str.rmu.Lock()
str.err = lerr
if line == "" {
str.rmu.Unlock()
continue
}
row := strings.Fields(line)
if len(row) != len(TopRowHeaders) {
str.rmu.Unlock()
continue
}
r, rerr := parseTopRow(row)
if rerr != nil {
str.err = rerr
str.rmu.Unlock()
continue
}
str.queue = append(str.queue, r)
if len(str.queue) == 1 {
// we have a new output; signal!
str.rcond.Signal()
}
str.rmu.Unlock()
}
// we got error; signal!
str.rcond.Signal()
}
// dequeue polls from 'top' process.
// And signals error channel if any.
func (str *TopStream) dequeue() {
str.rmu.Lock()
for {
// wait until there's output
for len(str.queue) == 0 && str.err == nil {
str.rcond.Wait()
}
// no output; should be error
if len(str.queue) == 0 {
break
}
row := str.queue[0]
str.queue = str.queue[1:]
str.pid2TopCommandRow[row.PID] = row
str.readymu.RLock()
rd := str.ready
str.readymu.RUnlock()
if !rd {
str.readymu.Lock()
str.ready = true
str.readymu.Unlock()
close(str.readyc)
}
}
if expectedErr(str.err) {
str.err = nil
}
if str.err != nil {
str.errc <- str.err
}
str.rmu.Unlock()
}
func (str *TopStream) close(kill bool) (err error) {
if str.cmd == nil {
return str.err
}
if kill {
str.cmd.Process.Kill()
}
err = str.cmd.Wait()
str.pmu.Lock()
str.pt.Close() // close file
str.pmu.Unlock()
str.wg.Wait()
if err != nil {
if !kill && strings.Contains(err.Error(), "exit status") {
err = nil // non-zero exit code
} else if kill && expectedErr(err) {
err = nil
}
}
str.cmd = nil
return err
}
func expectedErr(err error) bool {
if err == nil {
return true
}
es := err.Error()
return strings.Contains(es, "signal:") ||
strings.Contains(es, "/dev/ptmx: input/output error") ||
strings.Contains(es, "/dev/ptmx: file already closed")
}

View File

@ -1,12 +1,41 @@
package psn
import (
"fmt"
"io/ioutil"
"path/filepath"
"strconv"
"strings"
)
func convertProcStatus(s string) string {
ns := strings.TrimSpace(s)
if len(s) > 1 {
ns = ns[:1]
}
switch ns {
case "D":
return "D (uninterruptible sleep)"
case "R":
return "R (running)"
case "S":
return "S (sleeping)"
case "T":
return "T (stopped by job control signal)"
case "t":
return "t (stopped by debugger during trace)"
case "Z":
return "Z (zombie)"
default:
return fmt.Sprintf("unknown process %q", s)
}
}
func pidFromFd(s string) (int64, error) {
// get 5261 from '/proc/5261/fd/69'
return strconv.ParseInt(strings.Split(s, "/")[2], 10, 64)
}
// ListPIDs reads all PIDs in '/proc'.
func ListPIDs() ([]int64, error) {
ds, err := ioutil.ReadDir("/proc")
@ -38,11 +67,6 @@ func ListProcFds() ([]string, error) {
return fs, nil
}
func pidFromFd(s string) (int64, error) {
// get 5261 from '/proc/5261/fd/69'
return strconv.ParseInt(strings.Split(s, "/")[2], 10, 64)
}
// GetProgram returns the program name.
func GetProgram(pid int64) (string, error) {
// Readlink needs root permission