diff --git a/cli/cmd/root.go b/cli/cmd/root.go index bb210fe4d..4420efe42 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -170,6 +170,39 @@ func renderStats(buffer bytes.Buffer, options *statOptionsBase) string { return out } +// getRequestRate calculates request rate from Public API BasicStats. +func getRequestRate(stats *pb.BasicStats, timeWindow string) float64 { + success := stats.SuccessCount + failure := stats.FailureCount + windowLength, err := time.ParseDuration(timeWindow) + if err != nil { + log.Error(err.Error()) + return 0.0 + } + return float64(success+failure) / windowLength.Seconds() +} + +// getSuccessRate calculates success rate from Public API BasicStats. +func getSuccessRate(stats *pb.BasicStats) float64 { + success := stats.SuccessCount + failure := stats.FailureCount + + if success+failure == 0 { + return 0.0 + } + return float64(success) / float64(success+failure) +} + +// getPercentTLS calculates the percent of traffic that is TLS, from Public API +// BasicStats. +func getPercentTLS(stats *pb.BasicStats) float64 { + reqTotal := stats.SuccessCount + stats.FailureCount + if reqTotal == 0 { + return 0.0 + } + return float64(stats.TlsRequestCount) / float64(reqTotal) +} + type proxyConfigOptions struct { linkerdVersion string proxyImage string diff --git a/cli/cmd/routes.go b/cli/cmd/routes.go index c3612b413..1773f0569 100644 --- a/cli/cmd/routes.go +++ b/cli/cmd/routes.go @@ -111,8 +111,8 @@ func writeRouteStatsToBuffer(resp *pb.TopRoutesResponse, w *tabwriter.Writer, op table = append(table, &rowStats{ route: route, dst: r.GetAuthority(), - requestRate: util.GetRequestRate(r.Stats, r.TimeWindow), - successRate: util.GetSuccessRate(r.Stats), + requestRate: getRequestRate(r.Stats, r.TimeWindow), + successRate: getSuccessRate(r.Stats), latencyP50: r.Stats.LatencyMsP50, latencyP95: r.Stats.LatencyMsP95, latencyP99: r.Stats.LatencyMsP99, diff --git a/cli/cmd/stat.go b/cli/cmd/stat.go index 642d325c5..011263257 100644 --- a/cli/cmd/stat.go +++ b/cli/cmd/stat.go @@ -265,9 +265,9 @@ func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer, if r.Stats != nil { statTables[resourceKey][key].rowStats = &rowStats{ - requestRate: util.GetRequestRate(r.Stats, r.TimeWindow), - successRate: util.GetSuccessRate(r.Stats), - tlsPercent: util.GetPercentTLS(r.Stats), + requestRate: getRequestRate(r.Stats, r.TimeWindow), + successRate: getSuccessRate(r.Stats), + tlsPercent: getPercentTLS(r.Stats), latencyP50: r.Stats.LatencyMsP50, latencyP95: r.Stats.LatencyMsP95, latencyP99: r.Stats.LatencyMsP99, diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index 97b6babda..2f21aa842 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -11,8 +11,11 @@ import ( "github.com/linkerd/linkerd2/controller/api/util" pb "github.com/linkerd/linkerd2/controller/gen/public" + "github.com/linkerd/linkerd2/pkg/addr" + "github.com/linkerd/linkerd2/pkg/k8s" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "google.golang.org/grpc/codes" ) type tapOptions struct { @@ -165,7 +168,7 @@ func writeTapEventsToBuffer(tapClient pb.Api_TapByResourceClient, w *tabwriter.W fmt.Fprintln(os.Stderr, err) break } - _, err = fmt.Fprintln(w, util.RenderTapEvent(event, resource)) + _, err = fmt.Fprintln(w, renderTapEvent(event, resource)) if err != nil { return err } @@ -173,3 +176,179 @@ func writeTapEventsToBuffer(tapClient pb.Api_TapByResourceClient, w *tabwriter.W return nil } + +// renderTapEvent renders a Public API TapEvent to a string. +func renderTapEvent(event *pb.TapEvent, resource string) string { + dst := dst(event) + src := src(event) + + proxy := "???" + tls := "" + switch event.GetProxyDirection() { + case pb.TapEvent_INBOUND: + proxy = "in " // A space is added so it aligns with `out`. + tls = src.tlsStatus() + case pb.TapEvent_OUTBOUND: + proxy = "out" + tls = dst.tlsStatus() + default: + // Too old for TLS. + } + + flow := fmt.Sprintf("proxy=%s %s %s tls=%s", + proxy, + src.formatAddr(), + dst.formatAddr(), + tls, + ) + + // If `resource` is non-empty, then + resources := "" + if resource != "" { + resources = fmt.Sprintf( + "%s%s%s", + src.formatResource(resource), + dst.formatResource(resource), + routeLabels(event), + ) + } + + switch ev := event.GetHttp().GetEvent().(type) { + case *pb.TapEvent_Http_RequestInit_: + return fmt.Sprintf("req id=%d:%d %s :method=%s :authority=%s :path=%s%s", + ev.RequestInit.GetId().GetBase(), + ev.RequestInit.GetId().GetStream(), + flow, + ev.RequestInit.GetMethod().GetRegistered().String(), + ev.RequestInit.GetAuthority(), + ev.RequestInit.GetPath(), + resources, + ) + + case *pb.TapEvent_Http_ResponseInit_: + return fmt.Sprintf("rsp id=%d:%d %s :status=%d latency=%dµs%s", + ev.ResponseInit.GetId().GetBase(), + ev.ResponseInit.GetId().GetStream(), + flow, + ev.ResponseInit.GetHttpStatus(), + ev.ResponseInit.GetSinceRequestInit().GetNanos()/1000, + resources, + ) + + case *pb.TapEvent_Http_ResponseEnd_: + switch eos := ev.ResponseEnd.GetEos().GetEnd().(type) { + case *pb.Eos_GrpcStatusCode: + return fmt.Sprintf( + "end id=%d:%d %s grpc-status=%s duration=%dµs response-length=%dB%s", + ev.ResponseEnd.GetId().GetBase(), + ev.ResponseEnd.GetId().GetStream(), + flow, + codes.Code(eos.GrpcStatusCode), + ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, + ev.ResponseEnd.GetResponseBytes(), + resources, + ) + + case *pb.Eos_ResetErrorCode: + return fmt.Sprintf( + "end id=%d:%d %s reset-error=%+v duration=%dµs response-length=%dB%s", + ev.ResponseEnd.GetId().GetBase(), + ev.ResponseEnd.GetId().GetStream(), + flow, + eos.ResetErrorCode, + ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, + ev.ResponseEnd.GetResponseBytes(), + resources, + ) + + default: + return fmt.Sprintf("end id=%d:%d %s duration=%dµs response-length=%dB%s", + ev.ResponseEnd.GetId().GetBase(), + ev.ResponseEnd.GetId().GetStream(), + flow, + ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, + ev.ResponseEnd.GetResponseBytes(), + resources, + ) + } + + default: + return fmt.Sprintf("unknown %s", flow) + } +} + +// src returns the source peer of a `TapEvent`. +func src(event *pb.TapEvent) peer { + return peer{ + address: event.GetSource(), + labels: event.GetSourceMeta().GetLabels(), + direction: "src", + } +} + +// dst returns the destination peer of a `TapEvent`. +func dst(event *pb.TapEvent) peer { + return peer{ + address: event.GetDestination(), + labels: event.GetDestinationMeta().GetLabels(), + direction: "dst", + } +} + +type peer struct { + address *pb.TcpAddress + labels map[string]string + direction string +} + +// formatAddr formats the peer's TCP address for the `src` or `dst` element in +// the tap output corresponding to this peer. +func (p *peer) formatAddr() string { + return fmt.Sprintf( + "%s=%s", + p.direction, + addr.PublicAddressToString(p.address), + ) +} + +// formatResource returns a label describing what Kubernetes resources the peer +// belongs to. If the peer belongs to a resource of kind `resourceKind`, it will +// return a label for that resource; otherwise, it will fall back to the peer's +// pod name. Additionally, if the resource is not of type `namespace`, it will +// also add a label describing the peer's resource. +func (p *peer) formatResource(resourceKind string) string { + var s string + if resourceName, exists := p.labels[resourceKind]; exists { + kind := resourceKind + if short := k8s.ShortNameFromCanonicalResourceName(resourceKind); short != "" { + kind = short + } + s = fmt.Sprintf( + " %s_res=%s/%s", + p.direction, + kind, + resourceName, + ) + } else if pod, hasPod := p.labels[k8s.Pod]; hasPod { + s = fmt.Sprintf(" %s_pod=%s", p.direction, pod) + } + if resourceKind != k8s.Namespace { + if ns, hasNs := p.labels[k8s.Namespace]; hasNs { + s += fmt.Sprintf(" %s_ns=%s", p.direction, ns) + } + } + return s +} + +func (p *peer) tlsStatus() string { + return p.labels["tls"] +} + +func routeLabels(event *pb.TapEvent) string { + out := "" + for key, val := range event.GetRouteMeta().GetLabels() { + out = fmt.Sprintf("%s rt_%s=%s", out, key, val) + } + + return out +} diff --git a/cli/cmd/tap_test.go b/cli/cmd/tap_test.go index e97c8621c..e76d003a6 100644 --- a/cli/cmd/tap_test.go +++ b/cli/cmd/tap_test.go @@ -228,7 +228,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "req id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= :method=POST :authority=hello.default:7777 :path=/hello.v1.HelloService/Hello" - output := util.RenderTapEvent(event, "") + output := renderTapEvent(event, "") if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -245,7 +245,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "rsp id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= :status=200 latency=999µs" - output := util.RenderTapEvent(event, "") + output := renderTapEvent(event, "") if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -266,7 +266,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= grpc-status=OK duration=888µs response-length=111B" - output := util.RenderTapEvent(event, "") + output := renderTapEvent(event, "") if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -287,7 +287,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= reset-error=123 duration=888µs response-length=111B" - output := util.RenderTapEvent(event, "") + output := renderTapEvent(event, "") if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -306,7 +306,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= duration=888µs response-length=111B" - output := util.RenderTapEvent(event, "") + output := renderTapEvent(event, "") if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -324,7 +324,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= duration=888µs response-length=111B" - output := util.RenderTapEvent(event, "") + output := renderTapEvent(event, "") if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -334,7 +334,7 @@ func TestEventToString(t *testing.T) { event := toTapEvent(&pb.TapEvent_Http{}) expectedOutput := "unknown proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls=" - output := util.RenderTapEvent(event, "") + output := renderTapEvent(event, "") if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } diff --git a/controller/api/proxy/client.go b/controller/api/proxy/client.go deleted file mode 100644 index 07ac806af..000000000 --- a/controller/api/proxy/client.go +++ /dev/null @@ -1,17 +0,0 @@ -package proxy - -import ( - pb "github.com/linkerd/linkerd2-proxy-api/go/destination" - "google.golang.org/grpc" -) - -// NewClient creates a new gRPC client to the Destination service. -// TODO: consider moving this into destination-client, or removing altogether. -func NewClient(addr string) (pb.DestinationClient, *grpc.ClientConn, error) { - conn, err := grpc.Dial(addr, grpc.WithInsecure()) - if err != nil { - return nil, nil, err - } - - return pb.NewDestinationClient(conn), conn, nil -} diff --git a/controller/api/util/api_utils.go b/controller/api/util/api_utils.go index 07784b9bd..d347134d3 100644 --- a/controller/api/util/api_utils.go +++ b/controller/api/util/api_utils.go @@ -7,9 +7,7 @@ import ( "time" pb "github.com/linkerd/linkerd2/controller/gen/public" - "github.com/linkerd/linkerd2/pkg/addr" "github.com/linkerd/linkerd2/pkg/k8s" - log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/api/core/v1" @@ -470,216 +468,3 @@ func contains(list []string, s string) bool { } return false } - -type peer struct { - address *pb.TcpAddress - labels map[string]string - direction string -} - -// src returns the source peer of a `TapEvent`. -func src(event *pb.TapEvent) peer { - return peer{ - address: event.GetSource(), - labels: event.GetSourceMeta().GetLabels(), - direction: "src", - } -} - -// dst returns the destination peer of a `TapEvent`. -func dst(event *pb.TapEvent) peer { - return peer{ - address: event.GetDestination(), - labels: event.GetDestinationMeta().GetLabels(), - direction: "dst", - } -} - -// formatAddr formats the peer's TCP address for the `src` or `dst` element in -// the tap output corresponding to this peer. -func (p *peer) formatAddr() string { - return fmt.Sprintf( - "%s=%s", - p.direction, - addr.PublicAddressToString(p.address), - ) -} - -// formatResource returns a label describing what Kubernetes resources the peer -// belongs to. If the peer belongs to a resource of kind `resourceKind`, it will -// return a label for that resource; otherwise, it will fall back to the peer's -// pod name. Additionally, if the resource is not of type `namespace`, it will -// also add a label describing the peer's resource. -func (p *peer) formatResource(resourceKind string) string { - var s string - if resourceName, exists := p.labels[resourceKind]; exists { - kind := resourceKind - if short := k8s.ShortNameFromCanonicalResourceName(resourceKind); short != "" { - kind = short - } - s = fmt.Sprintf( - " %s_res=%s/%s", - p.direction, - kind, - resourceName, - ) - } else if pod, hasPod := p.labels[k8s.Pod]; hasPod { - s = fmt.Sprintf(" %s_pod=%s", p.direction, pod) - } - if resourceKind != k8s.Namespace { - if ns, hasNs := p.labels[k8s.Namespace]; hasNs { - s += fmt.Sprintf(" %s_ns=%s", p.direction, ns) - } - } - return s -} - -func (p *peer) tlsStatus() string { - return p.labels["tls"] -} - -func routeLabels(event *pb.TapEvent) string { - out := "" - for key, val := range event.GetRouteMeta().GetLabels() { - out = fmt.Sprintf("%s rt_%s=%s", out, key, val) - } - - return out -} - -// RenderTapEvent renders a Public API TapEvent to a string. -// TODO: consider moving this into cli/cmd/tap.go. -func RenderTapEvent(event *pb.TapEvent, resource string) string { - dst := dst(event) - src := src(event) - - proxy := "???" - tls := "" - switch event.GetProxyDirection() { - case pb.TapEvent_INBOUND: - proxy = "in " // A space is added so it aligns with `out`. - tls = src.tlsStatus() - case pb.TapEvent_OUTBOUND: - proxy = "out" - tls = dst.tlsStatus() - default: - // Too old for TLS. - } - - flow := fmt.Sprintf("proxy=%s %s %s tls=%s", - proxy, - src.formatAddr(), - dst.formatAddr(), - tls, - ) - - // If `resource` is non-empty, then - resources := "" - if resource != "" { - resources = fmt.Sprintf( - "%s%s%s", - src.formatResource(resource), - dst.formatResource(resource), - routeLabels(event), - ) - } - - switch ev := event.GetHttp().GetEvent().(type) { - case *pb.TapEvent_Http_RequestInit_: - return fmt.Sprintf("req id=%d:%d %s :method=%s :authority=%s :path=%s%s", - ev.RequestInit.GetId().GetBase(), - ev.RequestInit.GetId().GetStream(), - flow, - ev.RequestInit.GetMethod().GetRegistered().String(), - ev.RequestInit.GetAuthority(), - ev.RequestInit.GetPath(), - resources, - ) - - case *pb.TapEvent_Http_ResponseInit_: - return fmt.Sprintf("rsp id=%d:%d %s :status=%d latency=%dµs%s", - ev.ResponseInit.GetId().GetBase(), - ev.ResponseInit.GetId().GetStream(), - flow, - ev.ResponseInit.GetHttpStatus(), - ev.ResponseInit.GetSinceRequestInit().GetNanos()/1000, - resources, - ) - - case *pb.TapEvent_Http_ResponseEnd_: - switch eos := ev.ResponseEnd.GetEos().GetEnd().(type) { - case *pb.Eos_GrpcStatusCode: - return fmt.Sprintf( - "end id=%d:%d %s grpc-status=%s duration=%dµs response-length=%dB%s", - ev.ResponseEnd.GetId().GetBase(), - ev.ResponseEnd.GetId().GetStream(), - flow, - codes.Code(eos.GrpcStatusCode), - ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, - ev.ResponseEnd.GetResponseBytes(), - resources, - ) - - case *pb.Eos_ResetErrorCode: - return fmt.Sprintf( - "end id=%d:%d %s reset-error=%+v duration=%dµs response-length=%dB%s", - ev.ResponseEnd.GetId().GetBase(), - ev.ResponseEnd.GetId().GetStream(), - flow, - eos.ResetErrorCode, - ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, - ev.ResponseEnd.GetResponseBytes(), - resources, - ) - - default: - return fmt.Sprintf("end id=%d:%d %s duration=%dµs response-length=%dB%s", - ev.ResponseEnd.GetId().GetBase(), - ev.ResponseEnd.GetId().GetStream(), - flow, - ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, - ev.ResponseEnd.GetResponseBytes(), - resources, - ) - } - - default: - return fmt.Sprintf("unknown %s", flow) - } -} - -// GetRequestRate calculates request rate from Public API BasicStats. -// TODO: consider moving this into `/cli/cmd`. -func GetRequestRate(stats *pb.BasicStats, timeWindow string) float64 { - success := stats.SuccessCount - failure := stats.FailureCount - windowLength, err := time.ParseDuration(timeWindow) - if err != nil { - log.Error(err.Error()) - return 0.0 - } - return float64(success+failure) / windowLength.Seconds() -} - -// GetSuccessRate calculates success rate from Public API BasicStats. -// TODO: consider moving this into `/cli/cmd`. -func GetSuccessRate(stats *pb.BasicStats) float64 { - success := stats.SuccessCount - failure := stats.FailureCount - - if success+failure == 0 { - return 0.0 - } - return float64(success) / float64(success+failure) -} - -// GetPercentTLS calculates the percent of traffic that is TLS, from Public API -// BasicStats. -// TODO: consider moving this into `/cli/cmd/stat.go`. -func GetPercentTLS(stats *pb.BasicStats) float64 { - reqTotal := stats.SuccessCount + stats.FailureCount - if reqTotal == 0 { - return 0.0 - } - return float64(stats.TlsRequestCount) / float64(reqTotal) -} diff --git a/controller/script/destination-client/main.go b/controller/script/destination-client/main.go index 45662da71..fb405560e 100644 --- a/controller/script/destination-client/main.go +++ b/controller/script/destination-client/main.go @@ -8,9 +8,9 @@ import ( "time" pb "github.com/linkerd/linkerd2-proxy-api/go/destination" - "github.com/linkerd/linkerd2/controller/api/proxy" addrUtil "github.com/linkerd/linkerd2/pkg/addr" log "github.com/sirupsen/logrus" + "google.golang.org/grpc" ) // This is a throwaway script for testing the destination service @@ -23,7 +23,7 @@ func main() { method := flag.String("method", "get", "which gRPC method to invoke") flag.Parse() - client, conn, err := proxy.NewClient(*addr) + client, conn, err := newClient(*addr) if err != nil { log.Fatal(err.Error()) } @@ -44,6 +44,16 @@ func main() { } } +// newClient creates a new gRPC client to the Destination service. +func newClient(addr string) (pb.DestinationClient, *grpc.ClientConn, error) { + conn, err := grpc.Dial(addr, grpc.WithInsecure()) + if err != nil { + return nil, nil, err + } + + return pb.NewDestinationClient(conn), conn, nil +} + func get(client pb.DestinationClient, req *pb.GetDestination) { rsp, err := client.Get(context.Background(), req) if err != nil {