linkerd2/cli/cmd/top.go

417 lines
11 KiB
Go

package cmd
import (
"context"
"fmt"
"io"
"os"
"sort"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/linkerd/linkerd2/controller/api/util"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/addr"
runewidth "github.com/mattn/go-runewidth"
termbox "github.com/nsf/termbox-go"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
// topOptions holds the values of the command-line flags accepted by the
// "top" command.
type topOptions struct {
	// namespace is the namespace of the target resource; toResource and
	// toNamespace carry the values of the "--to" and "--to-namespace" flags.
	namespace, toResource, toNamespace string
	// maxRps is the value of the "--max-rps" flag.
	maxRps float32
	// scheme, method, authority and path are request filters that are copied
	// verbatim into the tap request parameters.
	scheme, method, authority, path string
	// hideSources, when true, removes the source column from the table.
	hideSources bool
}
// topRequest collects the tap events that belong to a single HTTP
// request/response exchange.
type topRequest struct {
	// event is the tap event that carried the request-init message; its
	// source and destination are used when the request is added to the table.
	event *pb.TapEvent
	// reqInit is the request-init message that opened the exchange.
	reqInit *pb.TapEvent_Http_RequestInit
	// rspInit is the response-init message; it stays nil until one is received.
	rspInit *pb.TapEvent_Http_ResponseInit
	// rspEnd is the response-end message that completes the exchange.
	rspEnd *pb.TapEvent_Http_ResponseEnd
}
// topRequestID identifies an in-flight request by its source address,
// destination address and stream number. It is comparable, so it can be used
// as a map key.
type topRequestID struct {
	src, dst string
	stream   uint64
}

// String renders the identifier as "src->dst(stream)".
func (id topRequestID) String() string {
	const layout = "%s->%s(%d)"
	return fmt.Sprintf(layout, id.src, id.dst, id.stream)
}
// tableRow is one aggregated line of the top table.
type tableRow struct {
	// by is the request path the row is grouped by; method, source and
	// destination complete the grouping key.
	by, method, source, destination string
	// count is the number of requests folded into this row.
	count int
	// best and worst are the lowest and highest latencies seen so far; last is
	// the latency of the most recent request.
	best, worst, last time.Duration
	// successes and failures count requests by outcome.
	successes, failures int
}
// headerHeight is the screen row on which the first table body row is drawn;
// body row i is printed at row i+headerHeight. The quit hint is printed on
// row 0 and the column headers on row 2.
const headerHeight = 3

var (
	// columnNames lists the column headers in display order.
	columnNames = []string{"Source", "Destination", "Method", "Path", "Count", "Best", "Worst", "Last", "Success Rate"}
	// columnWidths holds the width of each column, index-aligned with
	// columnNames. renderTableBody widens entries 0, 1 and 3 in place so that
	// they fit the longest source, destination and path seen so far, and
	// renderHeaders reads the widened values.
	columnWidths = []int{23, 23, 10, 37, 6, 6, 6, 6, 3}
)
// newTopOptions returns the default flag values for the "top" command: the
// "default" namespace and a cap of 100 requests per second. Every other
// option starts at its zero value (empty string / false).
func newTopOptions() *topOptions {
	opts := new(topOptions)
	opts.namespace = "default"
	opts.maxRps = 100.0
	return opts
}
// newCmdTop builds the "top" cobra command together with its persistent
// flags. Flag values are stored in a topOptions created by newTopOptions,
// whose fields also supply the flag defaults.
func newCmdTop() *cobra.Command {
	options := newTopOptions()

	// run turns the positional arguments and flag values into a tap request
	// and hands it to getTrafficByResourceFromAPI.
	run := func(cmd *cobra.Command, args []string) error {
		req, err := util.BuildTapByResourceRequest(util.TapRequestParams{
			// One or two positional arguments are joined into "TYPE/NAME" form.
			Resource:    strings.Join(args, "/"),
			Namespace:   options.namespace,
			ToResource:  options.toResource,
			ToNamespace: options.toNamespace,
			MaxRps:      options.maxRps,
			Scheme:      options.scheme,
			Method:      options.method,
			Authority:   options.authority,
			Path:        options.path,
		})
		if err != nil {
			return err
		}
		return getTrafficByResourceFromAPI(os.Stdout, validatedPublicAPIClient(time.Time{}), req, options)
	}

	cmd := &cobra.Command{
		Use:   "top [flags] (RESOURCE)",
		Short: "Display sorted information about live traffic",
		Long: `Display sorted information about live traffic.
The RESOURCE argument specifies the target resource(s) to view traffic for:
(TYPE [NAME] | TYPE/NAME)
Examples:
* deploy
* deploy/my-deploy
* deploy my-deploy
* ns/my-ns
Valid resource types include:
* deployments
* namespaces
* pods
* replicationcontrollers
* services (only supported as a "--to" resource)`,
		Example: ` # display traffic for the web deployment in the default namespace
linkerd top deploy/web
# display traffic for the web-dlbvj pod in the default namespace
linkerd top pod/web-dlbvj`,
		Args:      cobra.RangeArgs(1, 2),
		ValidArgs: util.ValidTargets,
		RunE:      run,
	}

	flags := cmd.PersistentFlags()
	flags.StringVarP(&options.namespace, "namespace", "n", options.namespace,
		"Namespace of the specified resource")
	flags.StringVar(&options.toResource, "to", options.toResource,
		"Display requests to this resource")
	flags.StringVar(&options.toNamespace, "to-namespace", options.toNamespace,
		"Sets the namespace used to lookup the \"--to\" resource; by default the current \"--namespace\" is used")
	flags.Float32Var(&options.maxRps, "max-rps", options.maxRps,
		"Maximum requests per second to tap.")
	flags.StringVar(&options.scheme, "scheme", options.scheme,
		"Display requests with this scheme")
	flags.StringVar(&options.method, "method", options.method,
		"Display requests with this HTTP method")
	flags.StringVar(&options.authority, "authority", options.authority,
		"Display requests with this :authority")
	flags.StringVar(&options.path, "path", options.path,
		"Display requests with paths that start with this prefix")
	flags.BoolVar(&options.hideSources, "hide-sources", options.hideSources, "Hide the source column")

	return cmd
}
// getTrafficByResourceFromAPI opens a tap stream for req, initialises the
// terminal UI and blocks, rendering the live table, until either the stream
// ends or the user quits.
//
// It returns an error if the tap request or terminal initialisation fails,
// and nil once rendering has stopped. w is currently unused.
func getTrafficByResourceFromAPI(w io.Writer, client pb.ApiClient, req *pb.TapByResourceRequest, options *topOptions) error {
	rsp, err := client.TapByResource(context.Background(), req)
	if err != nil {
		return err
	}

	if err := termbox.Init(); err != nil {
		return err
	}
	defer termbox.Close()

	requestCh := make(chan topRequest, 100)

	// recvEvents and pollInput each close the channel they are given when
	// they finish. Sharing a single channel between them would panic with
	// "close of closed channel" if the stream ended while the user was
	// quitting, so each goroutine gets its own channel and the first one to
	// close triggers done exactly once.
	streamDone := make(chan struct{})
	inputDone := make(chan struct{})
	done := make(chan struct{})
	go func() {
		select {
		case <-streamDone:
		case <-inputDone:
		}
		close(done)
	}()

	go recvEvents(rsp, requestCh, streamDone)
	go pollInput(inputDone)

	renderTable(requestCh, done, !options.hideSources)
	return nil
}
// recvEvents reads tap events from tapClient and assembles them into complete
// requests. Events are correlated by source address, destination address and
// stream number; once a response-end event arrives for a known request, the
// assembled topRequest is sent on requestCh.
//
// When the stream ends (io.EOF) or Recv fails, a message is printed, done is
// closed and the function returns.
func recvEvents(tapClient pb.Api_TapByResourceClient, requestCh chan<- topRequest, done chan<- struct{}) {
	outstandingRequests := make(map[topRequestID]topRequest)
	for {
		event, err := tapClient.Recv()
		if err == io.EOF {
			fmt.Println("Tap stream terminated")
			close(done)
			return
		}
		if err != nil {
			fmt.Println(err.Error())
			close(done)
			return
		}

		id := topRequestID{
			src: addr.PublicAddressToString(event.GetSource()),
			dst: addr.PublicAddressToString(event.GetDestination()),
		}
		// The stream number is read through the generated getters so that an
		// event without an id yields stream 0 instead of a nil dereference.
		switch ev := event.GetHttp().GetEvent().(type) {
		case *pb.TapEvent_Http_RequestInit_:
			id.stream = ev.RequestInit.GetId().GetStream()
			outstandingRequests[id] = topRequest{
				event:   event,
				reqInit: ev.RequestInit,
			}
		case *pb.TapEvent_Http_ResponseInit_:
			id.stream = ev.ResponseInit.GetId().GetStream()
			if req, ok := outstandingRequests[id]; ok {
				req.rspInit = ev.ResponseInit
				outstandingRequests[id] = req
			} else {
				log.Warnf("Got ResponseInit for unknown stream: %s", id)
			}
		case *pb.TapEvent_Http_ResponseEnd_:
			id.stream = ev.ResponseEnd.GetId().GetStream()
			if req, ok := outstandingRequests[id]; ok {
				req.rspEnd = ev.ResponseEnd
				// The exchange is complete: forget it so the map does not
				// grow for as long as the command keeps running.
				delete(outstandingRequests, id)
				requestCh <- req
			} else {
				log.Warnf("Got ResponseEnd for unknown stream: %s", id)
			}
		}
	}
}
// pollInput blocks on terminal events and closes done as soon as the user
// presses 'q' or Ctrl-C. All other events are ignored.
func pollInput(done chan<- struct{}) {
	for {
		ev := termbox.PollEvent()
		if ev.Type != termbox.EventKey {
			continue
		}
		if ev.Ch == 'q' || ev.Key == termbox.KeyCtrlC {
			close(done)
			return
		}
	}
}
// renderTable owns the table state and drives the display loop. Completed
// requests arriving on requestCh are folded into the table, and every 100ms
// the screen is cleared and redrawn. The function returns once done is closed.
//
// withSource controls whether rows are keyed by, and display, the source.
func renderTable(requestCh <-chan topRequest, done <-chan struct{}, withSource bool) {
	ticker := time.NewTicker(100 * time.Millisecond)
	// Release the ticker's resources when the loop exits.
	defer ticker.Stop()

	var table []tableRow
	for {
		select {
		case <-done:
			return
		case req := <-requestCh:
			tableInsert(&table, req, withSource)
		case <-ticker.C:
			termbox.Clear(termbox.ColorDefault, termbox.ColorDefault)
			renderHeaders(withSource)
			renderTableBody(&table, withSource)
			termbox.Flush()
		}
	}
}
// tableInsert folds a completed request into table.
//
// Rows are keyed by path, method and destination, plus source when withSource
// is true. If a matching row exists its counters and latencies are updated;
// otherwise a new row is appended. Source and destination are shown as the
// "pod" label when the event metadata carries one, and as the address with
// its port stripped otherwise.
//
// A request counts as a success when its HTTP status is below 500 and the
// stream did not end with a non-zero gRPC status or a reset. Requests whose
// latency cannot be parsed are logged and dropped.
func tableInsert(table *[]tableRow, req topRequest, withSource bool) {
	by := req.reqInit.GetPath()
	method := req.reqInit.GetMethod().GetRegistered().String()

	// The generated getters are nil-safe, so events without endpoint metadata
	// simply fall back to the address instead of panicking.
	source := stripPort(addr.PublicAddressToString(req.event.GetSource()))
	if pod := req.event.GetSourceMeta().GetLabels()["pod"]; pod != "" {
		source = pod
	}
	destination := stripPort(addr.PublicAddressToString(req.event.GetDestination()))
	if pod := req.event.GetDestinationMeta().GetLabels()["pod"]; pod != "" {
		destination = pod
	}

	latency, err := ptypes.Duration(req.rspEnd.GetSinceRequestInit())
	if err != nil {
		log.Errorf("error parsing duration %v: %s", req.rspEnd.GetSinceRequestInit(), err)
		return
	}

	success := req.rspInit.GetHttpStatus() < 500
	if success {
		switch eos := req.rspEnd.GetEos().GetEnd().(type) {
		case *pb.Eos_GrpcStatusCode:
			success = eos.GrpcStatusCode == 0
		case *pb.Eos_ResetErrorCode:
			success = false
		}
	}

	for i := range *table {
		row := &(*table)[i]
		if row.by != by || row.method != method || row.destination != destination {
			continue
		}
		if withSource && row.source != source {
			continue
		}

		row.count++
		if latency < row.best {
			row.best = latency
		}
		if latency > row.worst {
			row.worst = latency
		}
		row.last = latency
		if success {
			row.successes++
		} else {
			row.failures++
		}
		// A row is only appended when no existing row matches, so keys are
		// unique and at most one row can match.
		return
	}

	row := tableRow{
		by:          by,
		method:      method,
		source:      source,
		destination: destination,
		count:       1,
		best:        latency,
		worst:       latency,
		last:        latency,
	}
	if success {
		row.successes = 1
	} else {
		row.failures = 1
	}
	*table = append(*table, row)
}
// stripPort returns the part of address that precedes the first ':'. An
// address without a ':' is returned unchanged.
func stripPort(address string) string {
	colon := strings.IndexByte(address, ':')
	if colon < 0 {
		return address
	}
	return address[:colon]
}
// renderHeaders prints the quit hint on row 0 and the bold column headers on
// row 2. Each header is left-aligned and padded to its width in columnWidths
// plus one separating space. The first column (the source) is omitted when
// withSource is false.
func renderHeaders(withSource bool) {
	tbprint(0, 0, "(press q to quit)")

	first := 0
	if !withSource {
		first = 1
	}

	col := 0
	for i := first; i < len(columnNames); i++ {
		w := columnWidths[i]
		format := "%-" + strconv.Itoa(w) + "s "
		tbprintBold(col, 2, fmt.Sprintf(format, columnNames[i]))
		col += w + 1
	}
}
// max returns the larger of i and j.
func max(i, j int) int {
	if j >= i {
		return j
	}
	return i
}
// renderTableBody sorts table in place by descending request count (stable,
// so ties keep their relative order) and prints one line per row, starting at
// screen row headerHeight. The source cell is skipped when withSource is
// false.
func renderTableBody(table *[]tableRow, withSource bool) {
	rows := *table
	sort.SliceStable(rows, func(a, b int) bool {
		return rows[a].count > rows[b].count
	})

	// widths shares its backing array with the package-level columnWidths, so
	// the widening below persists across calls and is also what
	// renderHeaders reads when it lays out the header line.
	widths := columnWidths
	for _, r := range rows {
		widths[0] = max(widths[0], runewidth.StringWidth(r.source))
		widths[1] = max(widths[1], runewidth.StringWidth(r.destination))
		widths[3] = max(widths[3], runewidth.StringWidth(r.by))
	}

	for i, r := range rows {
		y := i + headerHeight
		total := r.successes + r.failures
		successRate := fmt.Sprintf("%.2f%%", 100.0*float32(r.successes)/float32(total))

		// Cells are listed in the same order as columnNames.
		cells := []string{
			r.source,
			r.destination,
			r.method,
			r.by,
			strconv.Itoa(r.count),
			formatDuration(r.best),
			formatDuration(r.worst),
			formatDuration(r.last),
			successRate,
		}

		x := 0
		for col, cell := range cells {
			if col == 0 && !withSource {
				continue
			}
			tbprint(x, y, cell)
			x += widths[col] + 1
		}
	}
}
// tbprint writes msg into the termbox cell buffer on row y, starting at
// column x, using the default foreground and background colours. The column
// advances by the display width of each rune.
func tbprint(x, y int, msg string) {
	col := x
	for _, r := range msg {
		termbox.SetCell(col, y, r, termbox.ColorDefault, termbox.ColorDefault)
		col += runewidth.RuneWidth(r)
	}
}
// tbprintBold writes msg into the termbox cell buffer on row y, starting at
// column x, with the bold attribute as foreground and the default background.
// The column advances by the display width of each rune.
func tbprintBold(x, y int, msg string) {
	col := x
	for _, r := range msg {
		termbox.SetCell(col, y, r, termbox.AttrBold, termbox.ColorDefault)
		col += runewidth.RuneWidth(r)
	}
}
func formatDuration(d time.Duration) string {
if d < time.Millisecond {
return d.Round(time.Microsecond).String()
}
if d < time.Second {
return d.Round(time.Millisecond).String()
}
return d.Round(time.Second).String()
}