Add TopRoutes method to the public API and a routes CLI command to consume it (#1860)

Add a routes command which displays per-route stats for services that have service profiles defined.

This change has three parts:
* A new public-api RPC called `TopRoutes` which serves per-route stat data about a service
* An implementation of TopRoutes in the public-api service.  This implementation reads per-route data from Prometheus.  This is very similar to how the StatSummary RPC works, so much of the code could be refactored and shared.
* A new CLI command called `routes` which displays the per-route data in a tabular or json format.  This is very similar to the `stat` command and much of the code was able to be refactored and shared.

Note that as of the currently targeted proxy version, only outbound route stats are supported so the `--from` flag must be included in order to see data.  This restriction will be lifted in an upcoming change once we add support for inbound route stats as well.

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2018-11-19 12:20:30 -08:00 committed by GitHub
parent 4d4b1ebc89
commit 7a7f6b6ecb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 2055 additions and 606 deletions

View File

@ -1,6 +1,7 @@
package cmd
import (
"bytes"
"fmt"
"os"
"regexp"
@ -76,6 +77,7 @@ func init() {
RootCmd.AddCommand(newCmdInject())
RootCmd.AddCommand(newCmdInstall())
RootCmd.AddCommand(newCmdProfile())
RootCmd.AddCommand(newCmdRoutes())
RootCmd.AddCommand(newCmdStat())
RootCmd.AddCommand(newCmdTap())
RootCmd.AddCommand(newCmdTop())
@ -130,6 +132,43 @@ func validatedPublicAPIClient(retryDeadline time.Time) pb.ApiClient {
return hc.PublicAPIClient()
}
// statOptionsBase holds the flag values shared by the stat-style commands:
// the namespace to query, the stat time window and the output format.
type statOptionsBase struct {
	namespace, timeWindow, outputFormat string
}

// newStatOptionsBase returns the shared options with their defaults: the
// "default" namespace, a "1m" time window and an empty output format.
func newStatOptionsBase() *statOptionsBase {
	base := new(statOptionsBase)
	base.namespace = "default"
	base.timeWindow = "1m"
	base.outputFormat = ""
	return base
}

// validateOutputFormat returns an error unless the configured output format is
// "table", "json" or empty.
func (o *statOptionsBase) validateOutputFormat() error {
	if o.outputFormat == "" || o.outputFormat == "table" || o.outputFormat == "json" {
		return nil
	}
	return fmt.Errorf("--output currently only supports table and json")
}
// renderStats turns the text collected in buffer into the final command output
// for the format selected in options.
//
// For the table format (also selected by an empty format) the first `padding`
// bytes of the output are dropped and the same amount of leading spaces is
// removed after every newline, which strips the left padding of the first
// column. For the json format the buffer content is returned unchanged. Any
// other format yields an empty string.
func renderStats(buffer bytes.Buffer, options *statOptionsBase) string {
	switch options.outputFormat {
	case "table", "":
		out := buffer.String()
		// strip left padding on the first column; output shorter than the
		// padding (e.g. an empty buffer) is left alone instead of panicking
		// on an out-of-range slice
		if len(out) >= padding {
			out = out[padding:]
		}
		return strings.Replace(out, "\n"+strings.Repeat(" ", padding), "\n", -1)
	case "json":
		return buffer.String()
	}
	return ""
}
type proxyConfigOptions struct {
linkerdVersion string
proxyImage string

244
cli/cmd/routes.go Normal file
View File

@ -0,0 +1,244 @@
package cmd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"sort"
"strings"
"text/tabwriter"
"time"
"github.com/linkerd/linkerd2/controller/api/util"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/k8s"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
// routesOptions holds the flags of the `routes` command: the shared stat
// options plus the optional "from" resource and the namespace it lives in.
type routesOptions struct {
	statOptionsBase
	fromNamespace, fromResource string
}

// newRoutesOptions returns the `routes` defaults: the shared stat defaults and
// an empty "from" resource and namespace.
func newRoutesOptions() *routesOptions {
	opts := new(routesOptions)
	opts.statOptionsBase = *newStatOptionsBase()
	opts.fromNamespace = ""
	opts.fromResource = ""
	return opts
}
// newCmdRoutes builds the `routes` command, which displays per-route stats for
// a single service.
//
// The command takes exactly one argument (the service name). It builds a
// TopRoutes request from that argument and the flags, sends it through a
// validated public API client and prints the rendered result to stdout.
func newCmdRoutes() *cobra.Command {
	options := newRoutesOptions()

	cmd := &cobra.Command{
		Use:   "routes [flags] (SERVICE)",
		Short: "Display route stats about a service",
		Long: `Display route stats about a service.
This command will only work for services that have a Service Profile defined.`,
		Example: ` # Routes for the webapp service in the test namespace.
linkerd routes webapp -n test
# Routes for calls from the traffic deployment to the webapp service in the test namespace.
linkerd routes webapp -n test --from deploy/traffic --from-namespace test`,
		Args:      cobra.ExactArgs(1),
		ValidArgs: util.ValidTargets,
		RunE: func(cmd *cobra.Command, args []string) error {
			req, err := buildTopRoutesRequest(args[0], options)
			if err != nil {
				return fmt.Errorf("error creating metrics request while making routes request: %v", err)
			}

			output, err := requestRouteStatsFromAPI(validatedPublicAPIClient(time.Time{}), req, options)
			if err != nil {
				return err
			}

			_, err = fmt.Print(output)
			return err
		},
	}

	// every flag defaults to the value already stored in options
	flags := cmd.PersistentFlags()
	flags.StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the specified resource")
	flags.StringVarP(&options.timeWindow, "time-window", "t", options.timeWindow, "Stat window (for example: \"10s\", \"1m\", \"10m\", \"1h\")")
	flags.StringVar(&options.fromResource, "from", options.fromResource, "If present, restricts outbound stats from the specified resource name")
	flags.StringVar(&options.fromNamespace, "from-namespace", options.fromNamespace, "Sets the namespace used to look up the \"--from\" resource; by default the current \"--namespace\" is used")
	flags.StringVarP(&options.outputFormat, "output", "o", options.outputFormat, "Output format; currently only \"table\" (default) and \"json\" are supported")

	return cmd
}
// requestRouteStatsFromAPI sends req to the TopRoutes RPC of client and returns
// the response rendered according to options. Both a failed call and an error
// carried inside the response are reported as an error with an empty string.
func requestRouteStatsFromAPI(client pb.ApiClient, req *pb.TopRoutesRequest, options *routesOptions) (string, error) {
	topRoutes, callErr := client.TopRoutes(context.Background(), req)
	switch {
	case callErr != nil:
		return "", fmt.Errorf("TopRoutes API error: %v", callErr)
	case topRoutes.GetError() != nil:
		return "", fmt.Errorf("TopRoutes API response error: %v", topRoutes.GetError().Error)
	}
	return renderRouteStats(topRoutes, options), nil
}
// renderRouteStats writes the route stats of resp through a right-aligned
// tabwriter into an in-memory buffer and returns the buffer post-processed by
// renderStats for the selected output format.
func renderRouteStats(resp *pb.TopRoutesResponse, options *routesOptions) string {
	buf := bytes.Buffer{}
	tw := tabwriter.NewWriter(&buf, 0, 0, padding, ' ', tabwriter.AlignRight)

	writeRouteStatsToBuffer(resp, tw, options)
	// the tabwriter only emits aligned output once it is flushed
	tw.Flush()

	return renderStats(buf, &options.statOptionsBase)
}
// writeRouteStatsToBuffer collects the stats of every route row in resp that
// carries stats, keyed by route name, and prints them to w in the format
// selected in options.
//
// In table format, when no row carries stats, a hint is printed to stderr and
// the process exits with status 0. Formats other than table, empty or json
// print nothing.
func writeRouteStatsToBuffer(resp *pb.TopRoutesResponse, w *tabwriter.Writer, options *routesOptions) {
	statsByRoute := make(map[string]*rowStats)
	for _, r := range resp.GetRoutes().Rows {
		if r.Stats == nil {
			// rows without stats are left out of the output entirely
			continue
		}
		statsByRoute[r.Route] = &rowStats{
			requestRate: util.GetRequestRate(r.Stats, r.TimeWindow),
			successRate: util.GetSuccessRate(r.Stats),
			tlsPercent:  util.GetPercentTls(r.Stats),
			latencyP50:  r.Stats.LatencyMsP50,
			latencyP95:  r.Stats.LatencyMsP95,
			latencyP99:  r.Stats.LatencyMsP99,
		}
	}

	format := options.outputFormat
	if format == "json" {
		printRouteJson(statsByRoute, w)
		return
	}
	if format != "table" && format != "" {
		return
	}

	if len(statsByRoute) == 0 {
		fmt.Fprintln(os.Stderr, "No traffic found. Does the service have a service profile? You can create one with the `linkerd profile` command.")
		os.Exit(0)
	}
	printRouteTable(statsByRoute, w, options)
}
// printRouteTable writes a header line followed by one tab-separated line per
// route to w, ordered by sortRoutesByRps. The route with the empty name is
// shown as "[UNKNOWN]". Success rate and TLS share are printed as percentages.
//
// Every route printed is a key of stats, so each one has a stats entry and no
// placeholder row is needed. options is currently unused.
func printRouteTable(stats map[string]*rowStats, w *tabwriter.Writer, options *routesOptions) {
	headers := []string{
		"ROUTE",
		"SUCCESS",
		"RPS",
		"LATENCY_P50",
		"LATENCY_P95",
		"LATENCY_P99",
		"TLS\t", // trailing \t is required to format last column
	}
	fmt.Fprintln(w, strings.Join(headers, "\t"))

	const rowFormat = "%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t%.f%%\t\n"

	for _, route := range sortRoutesByRps(stats) {
		row := stats[route]

		// keep the map key intact and only change the displayed name
		name := route
		if name == "" {
			name = "[UNKNOWN]"
		}

		fmt.Fprintf(w, rowFormat, name,
			row.successRate*100,
			row.requestRate,
			row.latencyP50,
			row.latencyP95,
			row.latencyP99,
			row.tlsPercent*100,
		)
	}
}
// jsonRouteStats is the JSON representation of the stats of a single route.
// The numeric fields are pointers so that a value that is not available can be
// left nil, which encoding/json marshals as null.
type jsonRouteStats struct {
Route string `json:"route"`
Success *float64 `json:"success"`
Rps *float64 `json:"rps"`
LatencyMSp50 *uint64 `json:"latency_ms_p50"`
LatencyMSp95 *uint64 `json:"latency_ms_p95"`
LatencyMSp99 *uint64 `json:"latency_ms_p99"`
Tls *float64 `json:"tls"`
}
// printRouteJson writes the stats to w as an indented JSON array with one
// object per route, ordered by sortRoutesByRps. The route with the empty name
// is reported as "[UNKNOWN]". A marshalling failure is logged and nothing is
// written.
func printRouteJson(stats map[string]*rowStats, w *tabwriter.Writer) {
	// start from an empty, non-nil slice so that "no stats" is marshalled as [] rather than null
	entries := make([]*jsonRouteStats, 0)

	for _, route := range sortRoutesByRps(stats) {
		name := route
		if name == "" {
			name = "[UNKNOWN]"
		}
		entry := &jsonRouteStats{Route: name}

		if row, found := stats[route]; found {
			entry.Success = &row.successRate
			entry.Rps = &row.requestRate
			entry.LatencyMSp50 = &row.latencyP50
			entry.LatencyMSp95 = &row.latencyP95
			entry.LatencyMSp99 = &row.latencyP99
			entry.Tls = &row.tlsPercent
		}

		entries = append(entries, entry)
	}

	encoded, err := json.MarshalIndent(entries, "", " ")
	if err != nil {
		log.Error(err.Error())
		return
	}
	fmt.Fprintf(w, "%s\n", encoded)
}
// buildTopRoutesRequest validates the output format in options and builds the
// TopRoutes request for the given service name.
//
// The target is the service resource in options.namespace. When a "from"
// resource is set, it is resolved with options.fromNamespace and passed along;
// otherwise the "from" name and type are left empty.
func buildTopRoutesRequest(service string, options *routesOptions) (*pb.TopRoutesRequest, error) {
	if err := options.validateOutputFormat(); err != nil {
		return nil, err
	}

	target, err := util.BuildResource(options.namespace, fmt.Sprintf("%s/%s", k8s.Service, service))
	if err != nil {
		return nil, err
	}

	var from pb.Resource
	if options.fromResource != "" {
		if from, err = util.BuildResource(options.fromNamespace, options.fromResource); err != nil {
			return nil, err
		}
	}

	return util.BuildTopRoutesRequest(util.StatsRequestParams{
		TimeWindow:    options.timeWindow,
		ResourceName:  target.Name,
		ResourceType:  target.Type,
		Namespace:     options.namespace,
		FromName:      from.Name,
		FromType:      from.Type,
		FromNamespace: options.fromNamespace,
	})
}
// sortRoutesByRps returns the route names of stats ordered by descending
// request rate. Routes with equal request rates are ordered by name, so the
// result does not depend on map iteration order or on the unstable sort.
// An empty map yields a nil slice.
func sortRoutesByRps(stats map[string]*rowStats) []string {
	if len(stats) == 0 {
		return nil
	}

	sortedRoutes := make([]string, 0, len(stats))
	for route := range stats {
		sortedRoutes = append(sortedRoutes, route)
	}

	sort.Slice(sortedRoutes, func(i, j int) bool {
		left, right := sortedRoutes[i], sortedRoutes[j]
		if stats[left].requestRate != stats[right].requestRate {
			return stats[left].requestRate > stats[right].requestRate
		}
		// tie-break on the name to keep the output deterministic
		return left < right
	})
	return sortedRoutes
}

56
cli/cmd/routes_test.go Normal file
View File

@ -0,0 +1,56 @@
package cmd
import (
"testing"
"github.com/linkerd/linkerd2/controller/api/public"
)
// routesParamsExp describes one TestRoutes case: the inputs used to generate
// the mocked TopRoutes response and the golden file holding the expected
// rendered output.
type routesParamsExp struct {
options *routesOptions // options used to build the request and render the response
routes []string // route names passed to public.GenTopRoutesResponse
counts []uint64 // counts passed to public.GenTopRoutesResponse alongside routes
file string // golden file the rendered output is compared against
}
// TestRoutes renders the same mocked route stats in the default format and
// then as json, and compares each result with its golden file. The cases share
// one options value whose output format is set before each sub-test runs.
func TestRoutes(t *testing.T) {
	options := newRoutesOptions()

	cases := []struct {
		name, format, golden string
	}{
		{"Returns route stats", options.outputFormat, "routes_one_output.golden"},
		{"Returns route stats (json)", "json", "routes_one_output_json.golden"},
	}

	for _, tc := range cases {
		options.outputFormat = tc.format
		golden := tc.golden
		t.Run(tc.name, func(t *testing.T) {
			testRoutesCall(routesParamsExp{
				routes:  []string{"/a", "/b", "/c", ""},
				counts:  []uint64{90, 60, 0, 30},
				options: options,
				file:    golden,
			}, t)
		})
	}
}
// testRoutesCall runs one routes case: it makes a mock API client return a
// response generated from exp.routes and exp.counts, requests the stats for
// the "foobar" service and compares the rendered output with the golden file.
func testRoutesCall(exp routesParamsExp, t *testing.T) {
	topRoutes := public.GenTopRoutesResponse(exp.routes, exp.counts)
	client := &public.MockApiClient{}
	client.TopRoutesResponseToReturn = &topRoutes

	request, err := buildTopRoutesRequest("foobar", exp.options)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	rendered, err := requestRouteStatsFromAPI(client, request, exp.options)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	diffCompareFile(t, rendered, exp.file)
}

View File

@ -19,14 +19,12 @@ import (
)
type statOptions struct {
namespace string
timeWindow string
statOptionsBase
toNamespace string
toResource string
fromNamespace string
fromResource string
allNamespaces bool
outputFormat string
}
type indexedResults struct {
@ -37,14 +35,12 @@ type indexedResults struct {
func newStatOptions() *statOptions {
return &statOptions{
namespace: "default",
timeWindow: "1m",
toNamespace: "",
toResource: "",
fromNamespace: "",
fromResource: "",
allNamespaces: false,
outputFormat: "",
statOptionsBase: *newStatOptionsBase(),
toNamespace: "",
toResource: "",
fromNamespace: "",
fromResource: "",
allNamespaces: false,
}
}
@ -148,7 +144,7 @@ If no resource name is specified, displays stats about all resources of the spec
}
}
output := renderStats(totalRows, options)
output := renderStatStats(totalRows, options)
_, err = fmt.Print(output)
return err
@ -189,23 +185,13 @@ func requestStatsFromAPI(client pb.ApiClient, req *pb.StatSummaryRequest, option
return resp, nil
}
func renderStats(rows []*pb.StatTable_PodGroup_Row, options *statOptions) string {
func renderStatStats(rows []*pb.StatTable_PodGroup_Row, options *statOptions) string {
var buffer bytes.Buffer
w := tabwriter.NewWriter(&buffer, 0, 0, padding, ' ', tabwriter.AlignRight)
writeStatsToBuffer(rows, w, options)
w.Flush()
var out string
switch options.outputFormat {
case "table", "":
// strip left padding on the first column
out = string(buffer.Bytes()[padding:])
out = strings.Replace(out, "\n"+strings.Repeat(" ", padding), "\n", -1)
case "json":
out = string(buffer.Bytes())
}
return out
return renderStats(buffer, &options.statOptionsBase)
}
const padding = 3
@ -276,9 +262,9 @@ func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer,
if r.Stats != nil {
statTables[resourceKey][key].rowStats = &rowStats{
requestRate: getRequestRate(*r),
successRate: getSuccessRate(*r),
tlsPercent: getPercentTls(*r),
requestRate: util.GetRequestRate(r.Stats, r.TimeWindow),
successRate: util.GetSuccessRate(r.Stats),
tlsPercent: util.GetPercentTls(r.Stats),
latencyP50: r.Stats.LatencyMsP50,
latencyP95: r.Stats.LatencyMsP95,
latencyP99: r.Stats.LatencyMsP99,
@ -471,7 +457,7 @@ func buildStatSummaryRequests(resources []string, options *statOptions) ([]*pb.S
return nil, err
}
requestParams := util.StatSummaryRequestParams{
requestParams := util.StatsRequestParams{
TimeWindow: options.timeWindow,
ResourceName: target.Name,
ResourceType: target.Type,
@ -494,35 +480,6 @@ func buildStatSummaryRequests(resources []string, options *statOptions) ([]*pb.S
return requests, nil
}
func getRequestRate(r pb.StatTable_PodGroup_Row) float64 {
success := r.Stats.SuccessCount
failure := r.Stats.FailureCount
windowLength, err := time.ParseDuration(r.TimeWindow)
if err != nil {
log.Error(err.Error())
return 0.0
}
return float64(success+failure) / windowLength.Seconds()
}
func getSuccessRate(r pb.StatTable_PodGroup_Row) float64 {
success := r.Stats.SuccessCount
failure := r.Stats.FailureCount
if success+failure == 0 {
return 0.0
}
return float64(success) / float64(success+failure)
}
func getPercentTls(r pb.StatTable_PodGroup_Row) float64 {
reqTotal := r.Stats.SuccessCount + r.Stats.FailureCount
if reqTotal == 0 {
return 0.0
}
return float64(r.Stats.TlsRequestCount) / float64(reqTotal)
}
func sortStatsKeys(stats map[string]*row) []string {
var sortedKeys []string
for key := range stats {
@ -547,11 +504,7 @@ func (o *statOptions) validate(resourceType string) error {
}
}
if err := o.validateOutputFormat(); err != nil {
return err
}
return nil
return o.validateOutputFormat()
}
// validateConflictingFlags validates that the options do not contain mutually
@ -587,12 +540,3 @@ func (o *statOptions) validateNamespaceFlags() error {
return nil
}
func (o *statOptions) validateOutputFormat() error {
switch o.outputFormat {
case "table", "json", "":
return nil
default:
return fmt.Errorf("--output currently only supports table and json")
}
}

View File

@ -1,7 +1,6 @@
package cmd
import (
"io/ioutil"
"testing"
"github.com/linkerd/linkerd2/controller/api/public"
@ -26,7 +25,7 @@ func TestStat(t *testing.T) {
},
options: options,
resNs: []string{"emojivoto1"},
file: "testdata/stat_one_output.golden",
file: "stat_one_output.golden",
}, t)
})
@ -40,7 +39,7 @@ func TestStat(t *testing.T) {
},
options: options,
resNs: []string{"emojivoto1"},
file: "testdata/stat_one_output_json.golden",
file: "stat_one_output_json.golden",
}, t)
})
@ -55,7 +54,7 @@ func TestStat(t *testing.T) {
},
options: options,
resNs: []string{"emojivoto1", "emojivoto2"},
file: "testdata/stat_all_output.golden",
file: "stat_all_output.golden",
}, t)
})
@ -69,7 +68,7 @@ func TestStat(t *testing.T) {
},
options: options,
resNs: []string{"emojivoto1", "emojivoto2"},
file: "testdata/stat_all_output_json.golden",
file: "stat_all_output_json.golden",
}, t)
})
@ -143,13 +142,6 @@ func testStatCall(exp paramsExp, t *testing.T) {
mockClient.StatSummaryResponseToReturn = &response
goldenFileBytes, err := ioutil.ReadFile(exp.file)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
expectedOutput := string(goldenFileBytes)
args := []string{"ns"}
reqs, err := buildStatSummaryRequests(args, exp.options)
if err != nil {
@ -162,9 +154,7 @@ func testStatCall(exp paramsExp, t *testing.T) {
}
rows := respToRows(resp)
output := renderStats(rows, exp.options)
output := renderStatStats(rows, exp.options)
if output != expectedOutput {
t.Fatalf("Wrong output:\n expected: \n%s\n, got: \n%s", expectedOutput, output)
}
diffCompareFile(t, output, exp.file)
}

View File

@ -15,7 +15,7 @@ func diffCompare(t *testing.T, actual string, expected string) {
diffs := dmp.DiffMain(expected, actual, true)
// colorized output for local testing
// t.Fatalf("Unexpected output:\n%+v", dmp.DiffPrettyText(diffs))
t.Fatalf("Unexpected output:\n%+v", dmp.DiffPrettyText(diffs))
diffs = dmp.DiffCleanupSemantic(diffs)
patches := dmp.PatchMake(diffs)
@ -47,3 +47,8 @@ func readOptionalTestFile(t *testing.T, fileName string) string {
return fileData
}
func diffCompareFile(t *testing.T, actual string, goldenFile string) {
expectedOutput := readOptionalTestFile(t, goldenFile)
diffCompare(t, actual, expectedOutput)
}

View File

@ -0,0 +1,5 @@
ROUTE SUCCESS RPS LATENCY_P50 LATENCY_P95 LATENCY_P99 TLS
/a 100.00% 1.5rps 123ms 123ms 123ms 100%
/b 100.00% 1.0rps 123ms 123ms 123ms 100%
[UNKNOWN] 100.00% 0.5rps 123ms 123ms 123ms 100%
/c 0.00% 0.0rps 123ms 123ms 123ms 0%

View File

@ -0,0 +1,38 @@
[
{
"route": "/a",
"success": 1,
"rps": 1.5,
"latency_ms_p50": 123,
"latency_ms_p95": 123,
"latency_ms_p99": 123,
"tls": 1
},
{
"route": "/b",
"success": 1,
"rps": 1,
"latency_ms_p50": 123,
"latency_ms_p95": 123,
"latency_ms_p99": 123,
"tls": 1
},
{
"route": "[UNKNOWN]",
"success": 1,
"rps": 0.5,
"latency_ms_p50": 123,
"latency_ms_p95": 123,
"latency_ms_p99": 123,
"tls": 1
},
{
"route": "/c",
"success": 0,
"rps": 0,
"latency_ms_p50": 123,
"latency_ms_p95": 123,
"latency_ms_p99": 123,
"tls": 0
}
]

View File

@ -38,6 +38,12 @@ func (c *grpcOverHttpClient) StatSummary(ctx context.Context, req *pb.StatSummar
return &msg, err
}
// TopRoutes sends req to the "TopRoutes" endpoint through apiRequest and
// returns the decoded response. The returned message pointer is never nil,
// even when an error is returned. Call options are ignored.
func (c *grpcOverHttpClient) TopRoutes(ctx context.Context, req *pb.TopRoutesRequest, _ ...grpc.CallOption) (*pb.TopRoutesResponse, error) {
	rsp := new(pb.TopRoutesResponse)
	err := c.apiRequest(ctx, "TopRoutes", req, rsp)
	return rsp, err
}
func (c *grpcOverHttpClient) Version(ctx context.Context, req *pb.Empty, _ ...grpc.CallOption) (*pb.VersionInfo, error) {
var msg pb.VersionInfo
err := c.apiRequest(ctx, "Version", req, &msg)

View File

@ -18,6 +18,7 @@ import (
var (
statSummaryPath = fullUrlPathFor("StatSummary")
topRoutesPath = fullUrlPathFor("TopRoutes")
versionPath = fullUrlPathFor("Version")
listPodsPath = fullUrlPathFor("ListPods")
tapByResourcePath = fullUrlPathFor("TapByResource")
@ -42,6 +43,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case statSummaryPath:
h.handleStatSummary(w, req)
case topRoutesPath:
h.handleTopRoutes(w, req)
case versionPath:
h.handleVersion(w, req)
case listPodsPath:
@ -77,6 +80,27 @@ func (h *handler) handleStatSummary(w http.ResponseWriter, req *http.Request) {
}
}
// handleTopRoutes serves the TopRoutes endpoint: it decodes the HTTP request
// into a TopRoutesRequest, forwards it to the gRPC server and writes the
// response back. A failure at any step is written to the HTTP response as an
// error.
func (h *handler) handleTopRoutes(w http.ResponseWriter, req *http.Request) {
	var protoRequest pb.TopRoutesRequest
	if err := httpRequestToProto(req, &protoRequest); err != nil {
		writeErrorToHttpResponse(w, err)
		return
	}

	rsp, err := h.grpcServer.TopRoutes(req.Context(), &protoRequest)
	if err != nil {
		writeErrorToHttpResponse(w, err)
		return
	}

	if err := writeProtoToHttpResponse(w, rsp); err != nil {
		writeErrorToHttpResponse(w, err)
	}
}
func (h *handler) handleVersion(w http.ResponseWriter, req *http.Request) {
var protoRequest pb.Empty
err := httpRequestToProto(req, &protoRequest)

View File

@ -24,6 +24,11 @@ func (m *mockGrpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryReq
return m.ResponseToReturn.(*pb.StatSummaryResponse), m.ErrorToReturn
}
// TopRoutes records req as the last request received and returns the canned
// response and error of the mock. It panics if ResponseToReturn does not hold
// a *pb.TopRoutesResponse, because the type assertion is unchecked.
func (m *mockGrpcServer) TopRoutes(ctx context.Context, req *pb.TopRoutesRequest) (*pb.TopRoutesResponse, error) {
	m.LastRequestReceived = req
	canned := m.ResponseToReturn.(*pb.TopRoutesResponse)
	return canned, m.ErrorToReturn
}
func (m *mockGrpcServer) Version(ctx context.Context, req *pb.Empty) (*pb.VersionInfo, error) {
m.LastRequestReceived = req
return m.ResponseToReturn.(*pb.VersionInfo), m.ErrorToReturn

View File

@ -0,0 +1,177 @@
package public
import (
"context"
"fmt"
"math"
"time"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/prometheus/common/model"
log "github.com/sirupsen/logrus"
)
// promType identifies which of the Prometheus queries a result belongs to.
type promType string
// promResult is the outcome of one Prometheus query: the query type, the
// resulting vector and the error, if the query failed.
type promResult struct {
prom promType
vec model.Vector
err error
}
const (
// promRequests tags the result of the request volume query.
promRequests = promType("QUERY_REQUESTS")
// The latency types double as the quantile value that getPrometheusMetrics
// substitutes into the latency query template.
promLatencyP50 = promType("0.5")
promLatencyP95 = promType("0.95")
promLatencyP99 = promType("0.99")
// Label names used for the namespace of a resource and of a destination.
namespaceLabel = model.LabelName("namespace")
dstNamespaceLabel = model.LabelName("dst_namespace")
)
// promTypes lists every query type; getPrometheusMetrics receives one result
// per entry.
var promTypes = []promType{promRequests, promLatencyP50, promLatencyP95, promLatencyP99}
// extractSampleValue returns the value of sample rounded to the nearest
// integer, or 0 when the value is NaN.
func extractSampleValue(sample *model.Sample) uint64 {
	raw := float64(sample.Value)
	if math.IsNaN(raw) {
		return 0
	}
	return uint64(math.Round(raw))
}
// queryProm runs query against the Prometheus API and returns the result as a
// vector. A failed query and a result of any type other than vector are both
// logged and returned as errors.
func (s *grpcServer) queryProm(ctx context.Context, query string) (model.Vector, error) {
	log.Debugf("Query request:\n\t%+v", query)

	// single data point (aka summary) query
	value, queryErr := s.prometheusAPI.Query(ctx, query, time.Time{})
	if queryErr != nil {
		log.Errorf("Query(%+v) failed with: %+v", query, queryErr)
		return nil, queryErr
	}
	log.Debugf("Query response:\n\t%+v", value)

	if value.Type() == model.ValVector {
		return value.(model.Vector), nil
	}

	typeErr := fmt.Errorf("Unexpected query result type (expected Vector): %s", value.Type())
	log.Error(typeErr)
	return nil, typeErr
}
// promGroupByLabelNames returns the label names to group a query by: the
// namespace label, followed by the label of the resource type unless the
// resource is itself a namespace.
// note that metricToKey assumes the label ordering (namespace, name)
func promGroupByLabelNames(resource *pb.Resource) model.LabelNames {
	if resource.Type == k8s.Namespace {
		return model.LabelNames{namespaceLabel}
	}
	return model.LabelNames{namespaceLabel, promResourceType(resource)}
}
// promDstGroupByLabelNames returns the label names to group a destination
// query by: the destination namespace label, followed by the resource type
// label for non-k8s resource queries, or by the "dst_"-prefixed resource type
// label for any other resource that is not a namespace.
// note that metricToKey assumes the label ordering (namespace, name)
func promDstGroupByLabelNames(resource *pb.Resource) model.LabelNames {
	switch {
	case isNonK8sResourceQuery(resource.GetType()):
		return model.LabelNames{dstNamespaceLabel, promResourceType(resource)}
	case resource.Type != k8s.Namespace:
		return model.LabelNames{dstNamespaceLabel, "dst_" + promResourceType(resource)}
	default:
		return model.LabelNames{dstNamespaceLabel}
	}
}
// promQueryLabels returns the labels that select a resource: its name under
// the resource type label when a name is set, and its namespace when
// shouldAddNamespaceLabel allows it.
func promQueryLabels(resource *pb.Resource) model.LabelSet {
	labels := make(model.LabelSet)
	if name := resource.Name; name != "" {
		labels[promResourceType(resource)] = model.LabelValue(name)
	}
	if shouldAddNamespaceLabel(resource) {
		labels[namespaceLabel] = model.LabelValue(resource.Namespace)
	}
	return labels
}
// promDstQueryLabels returns the labels that select a resource as a
// destination. Without a resource name the set is empty. Non-k8s resource
// queries use the plain resource type label; all others use the
// "dst_"-prefixed label plus, when shouldAddNamespaceLabel allows it, the
// destination namespace.
func promDstQueryLabels(resource *pb.Resource) model.LabelSet {
	labels := model.LabelSet{}
	if resource.Name == "" {
		return labels
	}

	if isNonK8sResourceQuery(resource.GetType()) {
		labels[promResourceType(resource)] = model.LabelValue(resource.Name)
		return labels
	}

	labels["dst_"+promResourceType(resource)] = model.LabelValue(resource.Name)
	if shouldAddNamespaceLabel(resource) {
		labels[dstNamespaceLabel] = model.LabelValue(resource.Namespace)
	}
	return labels
}
// shouldAddNamespaceLabel reports whether "namespace=<namespace>" should be
// added to a named query: the resource must not be a namespace itself and must
// have a namespace set.
func shouldAddNamespaceLabel(resource *pb.Resource) bool {
	if resource.Type == k8s.Namespace {
		return false
	}
	return resource.Namespace != ""
}
// promDirectionLabels returns a label set that carries only the "direction"
// label, set to the given direction (inbound or outbound requests).
func promDirectionLabels(direction string) model.LabelSet {
	labels := model.LabelSet{}
	labels[model.LabelName("direction")] = model.LabelValue(direction)
	return labels
}
// promResourceType returns the type of resource as a Prometheus label name.
func promResourceType(resource *pb.Resource) model.LabelName {
	kind := resource.Type
	return model.LabelName(kind)
}
// getPrometheusMetrics runs the request volume query and the three latency
// quantile queries concurrently and returns their results.
//
// volumeQueryTemplate is formatted with (labels, timeWindow, groupBy);
// latencyQueryTemplate is formatted with (quantile, labels, timeWindow,
// groupBy). One result is received per entry of promTypes. If any query fails,
// the failures are logged and nil is returned together with the error that was
// received last.
func (s *grpcServer) getPrometheusMetrics(ctx context.Context, volumeQueryTemplate, latencyQueryTemplate, labels, timeWindow, groupBy string) ([]promResult, error) {
	resultChan := make(chan promResult)

	// runQuery executes one query and reports its outcome on resultChan
	runQuery := func(kind promType, query string) {
		vec, err := s.queryProm(ctx, query)
		resultChan <- promResult{
			prom: kind,
			vec:  vec,
			err:  err,
		}
	}

	// kick off 4 asynchronous queries: 1 request volume + 3 latency
	// success/failure counts
	go runQuery(promRequests, fmt.Sprintf(volumeQueryTemplate, labels, timeWindow, groupBy))
	for _, quantile := range []promType{promLatencyP50, promLatencyP95, promLatencyP99} {
		go runQuery(quantile, fmt.Sprintf(latencyQueryTemplate, quantile, labels, timeWindow, groupBy))
	}

	// process results, receive one message per prometheus query type
	var lastErr error
	results := []promResult{}
	for range promTypes {
		result := <-resultChan
		if result.err != nil {
			log.Errorf("queryProm failed with: %s", result.err)
			lastErr = result.err
			continue
		}
		results = append(results, result)
	}

	if lastErr != nil {
		return nil, lastErr
	}
	return results, nil
}

View File

@ -2,29 +2,18 @@ package public
import (
"context"
"fmt"
"math"
"time"
proto "github.com/golang/protobuf/proto"
"github.com/linkerd/linkerd2/controller/api/util"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/prometheus/common/model"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
type promType string
type promResult struct {
prom promType
vec model.Vector
err error
}
type resourceResult struct {
res *pb.StatTable
err error
@ -44,18 +33,8 @@ type rKey struct {
const (
reqQuery = "sum(increase(response_total%s[%s])) by (%s, classification, tls)"
latencyQuantileQuery = "histogram_quantile(%s, sum(irate(response_latency_ms_bucket%s[%s])) by (le, %s))"
promRequests = promType("QUERY_REQUESTS")
promLatencyP50 = promType("0.5")
promLatencyP95 = promType("0.95")
promLatencyP99 = promType("0.99")
namespaceLabel = model.LabelName("namespace")
dstNamespaceLabel = model.LabelName("dst_namespace")
)
var promTypes = []promType{promRequests, promLatencyP50, promLatencyP95, promLatencyP99}
type podStats struct {
inMesh uint64
total uint64
@ -71,7 +50,7 @@ func (s *grpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest
}
// special case to check for services as outbound only
if isInvalidServiceRequest(req) {
if isInvalidServiceRequest(req.Selector, req.GetFromResource()) {
return statSummaryError(req, "service only supported as a target on 'from' queries, or as a destination on 'to' queries"), nil
}
@ -130,6 +109,14 @@ func (s *grpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest
return &rsp, nil
}
// isInvalidServiceRequest reports whether a request uses a service in a
// position that is not supported: as the "from" resource when one is given,
// otherwise as the selected resource itself.
//
// The generated getters are used so that a nil selector or a selector without
// a resource yields false instead of a nil pointer dereference.
func isInvalidServiceRequest(selector *pb.ResourceSelection, fromResource *pb.Resource) bool {
	if fromResource != nil {
		return fromResource.GetType() == k8s.Service
	}
	return selector.GetResource().GetType() == k8s.Service
}
func statSummaryError(req *pb.StatSummaryRequest, message string) *pb.StatSummaryResponse {
return &pb.StatSummaryResponse{
Response: &pb.StatSummaryResponse_Error{
@ -181,7 +168,7 @@ func (s *grpcServer) k8sResourceQuery(ctx context.Context, req *pb.StatSummaryRe
return resourceResult{res: nil, err: err}
}
requestMetrics, err := s.getPrometheusMetrics(ctx, req, req.TimeWindow)
requestMetrics, err := s.getStatMetrics(ctx, req, req.TimeWindow)
if err != nil {
return resourceResult{res: nil, err: err}
}
@ -226,7 +213,7 @@ func (s *grpcServer) k8sResourceQuery(ctx context.Context, req *pb.StatSummaryRe
}
func (s *grpcServer) nonK8sResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
requestMetrics, err := s.getPrometheusMetrics(ctx, req, req.TimeWindow)
requestMetrics, err := s.getStatMetrics(ctx, req, req.TimeWindow)
if err != nil {
return resourceResult{res: nil, err: err}
}
@ -284,75 +271,6 @@ func getResultKeys(
return keys
}
// add filtering by resource type
// note that metricToKey assumes the label ordering (namespace, name)
func promGroupByLabelNames(resource *pb.Resource) model.LabelNames {
names := model.LabelNames{namespaceLabel}
if resource.Type != k8s.Namespace {
names = append(names, promResourceType(resource))
}
return names
}
// add filtering by resource type
// note that metricToKey assumes the label ordering (namespace, name)
func promDstGroupByLabelNames(resource *pb.Resource) model.LabelNames {
names := model.LabelNames{dstNamespaceLabel}
if isNonK8sResourceQuery(resource.GetType()) {
names = append(names, promResourceType(resource))
} else if resource.Type != k8s.Namespace {
names = append(names, "dst_"+promResourceType(resource))
}
return names
}
// query a named resource
func promQueryLabels(resource *pb.Resource) model.LabelSet {
set := model.LabelSet{}
if resource.Name != "" {
set[promResourceType(resource)] = model.LabelValue(resource.Name)
}
if shouldAddNamespaceLabel(resource) {
set[namespaceLabel] = model.LabelValue(resource.Namespace)
}
return set
}
// query a named resource
func promDstQueryLabels(resource *pb.Resource) model.LabelSet {
set := model.LabelSet{}
if resource.Name != "" {
if isNonK8sResourceQuery(resource.GetType()) {
set[promResourceType(resource)] = model.LabelValue(resource.Name)
} else {
set["dst_"+promResourceType(resource)] = model.LabelValue(resource.Name)
if shouldAddNamespaceLabel(resource) {
set[dstNamespaceLabel] = model.LabelValue(resource.Namespace)
}
}
}
return set
}
// determine if we should add "namespace=<namespace>" to a named query
func shouldAddNamespaceLabel(resource *pb.Resource) bool {
return resource.Type != k8s.Namespace && resource.Namespace != ""
}
// query for inbound or outbound requests
func promDirectionLabels(direction string) model.LabelSet {
return model.LabelSet{
model.LabelName("direction"): model.LabelValue(direction),
}
}
func promResourceType(resource *pb.Resource) model.LabelName {
return model.LabelName(resource.Type)
}
func buildRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
// labelNames: the group by in the prometheus query
// labels: the labels for the resource we want to query for
@ -382,48 +300,10 @@ func buildRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labe
return
}
func (s *grpcServer) getPrometheusMetrics(ctx context.Context, req *pb.StatSummaryRequest, timeWindow string) (map[rKey]*pb.BasicStats, error) {
func (s *grpcServer) getStatMetrics(ctx context.Context, req *pb.StatSummaryRequest, timeWindow string) (map[rKey]*pb.BasicStats, error) {
reqLabels, groupBy := buildRequestLabels(req)
resultChan := make(chan promResult)
results, err := s.getPrometheusMetrics(ctx, reqQuery, latencyQuantileQuery, reqLabels.String(), timeWindow, groupBy.String())
// kick off 4 asynchronous queries: 1 request volume + 3 latency
go func() {
// success/failure counts
requestsQuery := fmt.Sprintf(reqQuery, reqLabels, timeWindow, groupBy)
resultVector, err := s.queryProm(ctx, requestsQuery)
resultChan <- promResult{
prom: promRequests,
vec: resultVector,
err: err,
}
}()
for _, quantile := range []promType{promLatencyP50, promLatencyP95, promLatencyP99} {
go func(quantile promType) {
latencyQuery := fmt.Sprintf(latencyQuantileQuery, quantile, reqLabels, timeWindow, groupBy)
latencyResult, err := s.queryProm(ctx, latencyQuery)
resultChan <- promResult{
prom: quantile,
vec: latencyResult,
err: err,
}
}(quantile)
}
// process results, receive one message per prometheus query type
var err error
results := []promResult{}
for i := 0; i < len(promTypes); i++ {
result := <-resultChan
if result.err != nil {
log.Errorf("queryProm failed with: %s", result.err)
err = result.err
} else {
results = append(results, result)
}
}
if err != nil {
return nil, err
}
@ -469,14 +349,6 @@ func processPrometheusMetrics(req *pb.StatSummaryRequest, results []promResult,
return basicStats
}
func extractSampleValue(sample *model.Sample) uint64 {
value := uint64(0)
if !math.IsNaN(float64(sample.Value)) {
value = uint64(math.Round(float64(sample.Value)))
}
return value
}
func metricToKey(req *pb.StatSummaryRequest, metric model.Metric, groupBy model.LabelNames) rKey {
// this key is used to match the metric stats we queried from prometheus
// with the k8s object stats we queried from k8s
@ -558,32 +430,3 @@ func checkContainerErrors(containerStatuses []apiv1.ContainerStatus, containerNa
}
return errors
}
// isInvalidServiceRequest reports whether the resource that drives the
// request is of type Service: the "from" resource when one is set, otherwise
// the selected resource.
func isInvalidServiceRequest(req *pb.StatSummaryRequest) bool {
	if from := req.GetFromResource(); from != nil {
		return from.Type == k8s.Service
	}
	return req.Selector.Resource.Type == k8s.Service
}
// queryProm runs a single query against the Prometheus API and returns the
// result as a vector. It logs the query and its response at debug level, and
// logs and returns an error when the query fails or when the result is not of
// vector type.
func (s *grpcServer) queryProm(ctx context.Context, query string) (model.Vector, error) {
	log.Debugf("Query request:\n\t%+v", query)

	// single data point (aka summary) query
	value, queryErr := s.prometheusAPI.Query(ctx, query, time.Time{})
	if queryErr != nil {
		log.Errorf("Query(%+v) failed with: %+v", query, queryErr)
		return nil, queryErr
	}
	log.Debugf("Query response:\n\t%+v", value)

	// Only vector results are supported; anything else is reported as an error.
	if valueType := value.Type(); valueType != model.ValVector {
		typeErr := fmt.Errorf("Unexpected query result type (expected Vector): %s", valueType)
		log.Error(typeErr)
		return nil, typeErr
	}

	return value.(model.Vector), nil
}

View File

@ -3,7 +3,6 @@ package public
import (
"context"
"errors"
"reflect"
"sort"
"testing"
@ -16,12 +15,9 @@ import (
)
type statSumExpected struct {
err error
k8sConfigs []string // k8s objects to seed the API
mockPromResponse model.Value // mock out a prometheus query response
expectedPrometheusQueries []string // queries we expect public-api to issue to prometheus
req pb.StatSummaryRequest // the request we would like to test
expectedResponse pb.StatSummaryResponse // the stat response we expect
expectedStatRpc
req pb.StatSummaryRequest // the request we would like to test
expectedResponse pb.StatSummaryResponse // the stat response we expect
}
func prometheusMetric(resName string, resType string, resNs string, classification string, isDst bool) model.Vector {
@ -69,36 +65,17 @@ func genEmptyResponse() pb.StatSummaryResponse {
func testStatSummary(t *testing.T, expectations []statSumExpected) {
for _, exp := range expectations {
k8sAPI, err := k8s.NewFakeAPI("", exp.k8sConfigs...)
mockProm, fakeGrpcServer, err := newMockGrpcServer(exp.expectedStatRpc)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
t.Fatalf("Error creating mock grpc server: %s", err)
}
mockProm := &MockProm{Res: exp.mockPromResponse}
fakeGrpcServer := newGrpcServer(
mockProm,
tap.NewTapClient(nil),
k8sAPI,
"linkerd",
[]string{},
)
k8sAPI.Sync(nil)
rsp, err := fakeGrpcServer.StatSummary(context.TODO(), &exp.req)
if err != exp.err {
t.Fatalf("Expected error: %s, Got: %s", exp.err, err)
}
if len(exp.expectedPrometheusQueries) > 0 {
sort.Strings(exp.expectedPrometheusQueries)
sort.Strings(mockProm.QueriesExecuted)
if !reflect.DeepEqual(exp.expectedPrometheusQueries, mockProm.QueriesExecuted) {
t.Fatalf("Prometheus queries incorrect. \nExpected:\n%+v \nGot:\n%+v",
exp.expectedPrometheusQueries, mockProm.QueriesExecuted)
}
}
exp.verifyPromQueries(mockProm)
rspStatTables := rsp.GetOk().StatTables
@ -156,8 +133,9 @@ func TestStatSummary(t *testing.T) {
t.Run("Successfully performs a query based on resource type", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: apps/v1beta2
kind: Deployment
metadata:
@ -205,8 +183,9 @@ metadata:
status:
phase: Completed
`,
},
mockPromResponse: prometheusMetric("emoji", "deployment", "emojivoto", "success", false),
},
mockPromResponse: prometheusMetric("emoji", "deployment", "emojivoto", "success", false),
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -230,8 +209,9 @@ status:
t.Run("Queries prometheus for a specific resource if name is specified", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -243,8 +223,15 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: prometheusMetric("emojivoto-1", "pod", "emojivoto", "success", false),
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`sum(increase(response_total{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod, classification, tls)`,
},
},
mockPromResponse: prometheusMetric("emojivoto-1", "pod", "emojivoto", "success", false),
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -255,12 +242,6 @@ status:
},
TimeWindow: "1m",
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`sum(increase(response_total{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod, classification, tls)`,
},
expectedResponse: GenStatSummaryResponse("emojivoto-1", pkgK8s.Pod, []string{"emojivoto"}, &PodCounts{
MeshedPods: 1,
RunningPods: 1,
@ -275,8 +256,9 @@ status:
t.Run("Queries prometheus for outbound metrics if from resource is specified, ignores resource name", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -288,8 +270,15 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: prometheusMetric("emojivoto-2", "pod", "emojivoto", "success", false),
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="emojivoto", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="emojivoto", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="emojivoto", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`sum(increase(response_total{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="emojivoto", pod="emojivoto-2"}[1m])) by (dst_namespace, dst_pod, classification, tls)`,
},
},
mockPromResponse: prometheusMetric("emojivoto-2", "pod", "emojivoto", "success", false),
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -307,12 +296,6 @@ status:
},
},
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="emojivoto", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="emojivoto", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="emojivoto", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`sum(increase(response_total{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="emojivoto", pod="emojivoto-2"}[1m])) by (dst_namespace, dst_pod, classification, tls)`,
},
expectedResponse: genEmptyResponse(),
},
}
@ -323,8 +306,9 @@ status:
t.Run("Queries prometheus for outbound metrics if --to resource is specified", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -336,9 +320,16 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: model.Vector{
genPromSample("emojivoto-1", "pod", "emojivoto", "success", false),
},
mockPromResponse: model.Vector{
genPromSample("emojivoto-1", "pod", "emojivoto", "success", false),
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`sum(increase(response_total{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod, classification, tls)`,
},
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
@ -357,12 +348,6 @@ status:
},
},
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`sum(increase(response_total{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod, classification, tls)`,
},
expectedResponse: GenStatSummaryResponse("emojivoto-1", pkgK8s.Pod, []string{"emojivoto"}, &PodCounts{
MeshedPods: 1,
RunningPods: 1,
@ -377,8 +362,9 @@ status:
t.Run("Queries prometheus for outbound metrics if --to resource is specified and --to-namespace is different from the resource namespace", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -390,9 +376,16 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: model.Vector{
genPromSample("emojivoto-1", "pod", "emojivoto", "success", false),
},
mockPromResponse: model.Vector{
genPromSample("emojivoto-1", "pod", "emojivoto", "success", false),
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="totallydifferent", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="totallydifferent", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="totallydifferent", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`sum(increase(response_total{direction="outbound", dst_namespace="totallydifferent", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod, classification, tls)`,
},
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
@ -411,12 +404,6 @@ status:
},
},
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="totallydifferent", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="totallydifferent", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="totallydifferent", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`sum(increase(response_total{direction="outbound", dst_namespace="totallydifferent", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod, classification, tls)`,
},
expectedResponse: GenStatSummaryResponse("emojivoto-1", pkgK8s.Pod, []string{"emojivoto"}, &PodCounts{
MeshedPods: 1,
RunningPods: 1,
@ -431,8 +418,9 @@ status:
t.Run("Queries prometheus for outbound metrics if --from resource is specified", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -455,9 +443,16 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: model.Vector{
genPromSample("emojivoto-1", "pod", "emojivoto", "success", true),
},
mockPromResponse: model.Vector{
genPromSample("emojivoto-1", "pod", "emojivoto", "success", true),
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`sum(increase(response_total{direction="outbound", pod="emojivoto-2"}[1m])) by (dst_namespace, dst_pod, classification, tls)`,
},
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
@ -476,12 +471,6 @@ status:
},
},
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`sum(increase(response_total{direction="outbound", pod="emojivoto-2"}[1m])) by (dst_namespace, dst_pod, classification, tls)`,
},
expectedResponse: GenStatSummaryResponse("emojivoto-1", pkgK8s.Pod, []string{"emojivoto"}, &PodCounts{
MeshedPods: 1,
RunningPods: 1,
@ -496,8 +485,9 @@ status:
t.Run("Queries prometheus for outbound metrics if --from resource is specified and --from-namespace is different from the resource namespace", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -520,9 +510,16 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: model.Vector{
genPromSample("emojivoto-1", "pod", "emojivoto", "success", true),
},
mockPromResponse: model.Vector{
genPromSample("emojivoto-1", "pod", "emojivoto", "success", true),
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="totallydifferent", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="totallydifferent", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="totallydifferent", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`sum(increase(response_total{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="totallydifferent", pod="emojivoto-2"}[1m])) by (dst_namespace, dst_pod, classification, tls)`,
},
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
@ -541,12 +538,6 @@ status:
},
},
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="totallydifferent", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="totallydifferent", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="totallydifferent", pod="emojivoto-2"}[1m])) by (le, dst_namespace, dst_pod))`,
`sum(increase(response_total{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-1", namespace="totallydifferent", pod="emojivoto-2"}[1m])) by (dst_namespace, dst_pod, classification, tls)`,
},
expectedResponse: GenStatSummaryResponse("emojivoto-1", pkgK8s.Pod, []string{"emojivoto"}, &PodCounts{
MeshedPods: 1,
RunningPods: 1,
@ -561,8 +552,9 @@ status:
t.Run("Successfully queries for resource type 'all'", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: apps/v1beta2
kind: Deployment
metadata:
@ -610,8 +602,9 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: prometheusMetric("emoji-deploy", "deployment", "emojivoto", "success", false),
},
mockPromResponse: prometheusMetric("emoji-deploy", "deployment", "emojivoto", "success", false),
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -621,6 +614,7 @@ status:
},
TimeWindow: "1m",
},
expectedResponse: pb.StatSummaryResponse{
Response: &pb.StatSummaryResponse_Ok_{ // https://github.com/golang/protobuf/issues/205
Ok: &pb.StatSummaryResponse_Ok{
@ -735,7 +729,9 @@ status:
expectations := []statSumExpected{
statSumExpected{
err: errors.New("rpc error: code = Unimplemented desc = unimplemented resource type: badtype"),
expectedStatRpc: expectedStatRpc{
err: errors.New("rpc error: code = Unimplemented desc = unimplemented resource type: badtype"),
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -745,7 +741,9 @@ status:
},
},
statSumExpected{
err: errors.New("rpc error: code = Unimplemented desc = unimplemented resource type: deployments"),
expectedStatRpc: expectedStatRpc{
err: errors.New("rpc error: code = Unimplemented desc = unimplemented resource type: deployments"),
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -755,7 +753,9 @@ status:
},
},
statSumExpected{
err: errors.New("rpc error: code = Unimplemented desc = unimplemented resource type: po"),
expectedStatRpc: expectedStatRpc{
err: errors.New("rpc error: code = Unimplemented desc = unimplemented resource type: po"),
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -894,8 +894,9 @@ status:
t.Run("when pod phase is succeeded or failed", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -918,7 +919,9 @@ metadata:
status:
phase: Failed
`},
mockPromResponse: model.Vector{},
mockPromResponse: model.Vector{},
expectedPrometheusQueries: []string{},
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -927,8 +930,7 @@ status:
},
},
},
expectedPrometheusQueries: []string{},
expectedResponse: genEmptyResponse(),
expectedResponse: genEmptyResponse(),
},
}
@ -938,8 +940,9 @@ status:
t.Run("for succeeded or failed replicas of a deployment", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: apps/v1beta2
kind: Deployment
metadata:
@ -998,7 +1001,8 @@ metadata:
status:
phase: Succeeded
`},
mockPromResponse: prometheusMetric("emoji", "deployment", "emojivoto", "success", false),
mockPromResponse: prometheusMetric("emoji", "deployment", "emojivoto", "success", false),
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -1023,8 +1027,9 @@ status:
t.Run("Queries prometheus for authority stats", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -1036,9 +1041,16 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: model.Vector{
genPromSample("10.1.1.239:9995", "authority", "linkerd", "success", false),
},
mockPromResponse: model.Vector{
genPromSample("10.1.1.239:9995", "authority", "linkerd", "success", false),
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`sum(increase(response_total{direction="inbound", namespace="linkerd"}[1m])) by (namespace, authority, classification, tls)`,
},
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
@ -1049,12 +1061,6 @@ status:
},
TimeWindow: "1m",
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`sum(increase(response_total{direction="inbound", namespace="linkerd"}[1m])) by (namespace, authority, classification, tls)`,
},
expectedResponse: GenStatSummaryResponse("10.1.1.239:9995", pkgK8s.Authority, []string{"linkerd"}, nil),
},
}
@ -1065,8 +1071,9 @@ status:
t.Run("Queries prometheus for authority stats when --from deployment is used", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -1078,9 +1085,16 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: model.Vector{
genPromSample("10.1.1.239:9995", "authority", "linkerd", "success", false),
},
mockPromResponse: model.Vector{
genPromSample("10.1.1.239:9995", "authority", "linkerd", "success", false),
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{deployment="emojivoto", direction="outbound"}[1m])) by (le, dst_namespace, authority))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{deployment="emojivoto", direction="outbound"}[1m])) by (le, dst_namespace, authority))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{deployment="emojivoto", direction="outbound"}[1m])) by (le, dst_namespace, authority))`,
`sum(increase(response_total{deployment="emojivoto", direction="outbound"}[1m])) by (dst_namespace, authority, classification, tls)`,
},
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
@ -1098,12 +1112,6 @@ status:
},
},
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{deployment="emojivoto", direction="outbound"}[1m])) by (le, dst_namespace, authority))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{deployment="emojivoto", direction="outbound"}[1m])) by (le, dst_namespace, authority))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{deployment="emojivoto", direction="outbound"}[1m])) by (le, dst_namespace, authority))`,
`sum(increase(response_total{deployment="emojivoto", direction="outbound"}[1m])) by (dst_namespace, authority, classification, tls)`,
},
expectedResponse: GenStatSummaryResponse("10.1.1.239:9995", pkgK8s.Authority, []string{""}, nil),
},
}
@ -1114,8 +1122,9 @@ status:
t.Run("Queries prometheus for a named authority", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: nil,
k8sConfigs: []string{`
expectedStatRpc: expectedStatRpc{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -1127,9 +1136,16 @@ metadata:
status:
phase: Running
`,
},
mockPromResponse: model.Vector{
genPromSample("10.1.1.239:9995", "authority", "linkerd", "success", false),
},
mockPromResponse: model.Vector{
genPromSample("10.1.1.239:9995", "authority", "linkerd", "success", false),
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{authority="10.1.1.239:9995", direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{authority="10.1.1.239:9995", direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{authority="10.1.1.239:9995", direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`sum(increase(response_total{authority="10.1.1.239:9995", direction="inbound", namespace="linkerd"}[1m])) by (namespace, authority, classification, tls)`,
},
},
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
@ -1141,12 +1157,6 @@ status:
},
TimeWindow: "1m",
},
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{authority="10.1.1.239:9995", direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{authority="10.1.1.239:9995", direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{authority="10.1.1.239:9995", direction="inbound", namespace="linkerd"}[1m])) by (le, namespace, authority))`,
`sum(increase(response_total{authority="10.1.1.239:9995", direction="inbound", namespace="linkerd"}[1m])) by (namespace, authority, classification, tls)`,
},
expectedResponse: GenStatSummaryResponse("10.1.1.239:9995", pkgK8s.Authority, []string{"linkerd"}, nil),
},
}

View File

@ -2,12 +2,17 @@ package public
import (
"context"
"fmt"
"io"
"reflect"
"sort"
"sync"
"time"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
tap "github.com/linkerd/linkerd2/controller/gen/controller/tap"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"google.golang.org/grpc"
@ -18,6 +23,7 @@ type MockApiClient struct {
VersionInfoToReturn *pb.VersionInfo
ListPodsResponseToReturn *pb.ListPodsResponse
StatSummaryResponseToReturn *pb.StatSummaryResponse
TopRoutesResponseToReturn *pb.TopRoutesResponse
SelfCheckResponseToReturn *healthcheckPb.SelfCheckResponse
Api_TapClientToReturn pb.Api_TapClient
Api_TapByResourceClientToReturn pb.Api_TapByResourceClient
@ -27,6 +33,10 @@ func (c *MockApiClient) StatSummary(ctx context.Context, in *pb.StatSummaryReque
return c.StatSummaryResponseToReturn, c.ErrorToReturn
}
// TopRoutes is the mock implementation of the TopRoutes RPC. It ignores its
// arguments and returns the canned TopRoutesResponseToReturn together with
// ErrorToReturn.
func (c *MockApiClient) TopRoutes(ctx context.Context, in *pb.TopRoutesRequest, opts ...grpc.CallOption) (*pb.TopRoutesResponse, error) {
return c.TopRoutesResponseToReturn, c.ErrorToReturn
}
func (c *MockApiClient) Version(ctx context.Context, in *pb.Empty, opts ...grpc.CallOption) (*pb.VersionInfo, error) {
return c.VersionInfoToReturn, c.ErrorToReturn
}
@ -174,3 +184,72 @@ func GenStatSummaryResponse(resName, resType string, resNs []string, counts *Pod
return resp
}
// GenTopRoutesResponse builds a TopRoutesResponse holding one row per entry
// in routes, in the same order. Row i uses counts[i] for both its success
// count and its TLS request count, has no failures, reports 123 for every
// latency percentile, and carries a "1m" time window.
//
// counts is indexed by position in routes, so it must contain at least
// len(routes) entries.
func GenTopRoutesResponse(routes []string, counts []uint64) pb.TopRoutesResponse {
	rows := make([]*pb.RouteTable_Row, len(routes))
	for idx := range routes {
		n := counts[idx]
		rows[idx] = &pb.RouteTable_Row{
			Route: routes[idx],
			Stats: &pb.BasicStats{
				SuccessCount:    n,
				FailureCount:    0,
				LatencyMsP50:    123,
				LatencyMsP95:    123,
				LatencyMsP99:    123,
				TlsRequestCount: n,
			},
			TimeWindow: "1m",
		}
	}

	return pb.TopRoutesResponse{
		Response: &pb.TopRoutesResponse_Routes{
			Routes: &pb.RouteTable{Rows: rows},
		},
	}
}
// expectedStatRpc holds the test fixture fields used to build a mock gRPC
// server (see newMockGrpcServer) and to check the Prometheus queries it
// issued (see verifyPromQueries).
type expectedStatRpc struct {
// err is the error a test case associates with the RPC under test.
err error
k8sConfigs []string // k8s objects to seed the API
mockPromResponse model.Value // mock out a prometheus query response
expectedPrometheusQueries []string // queries we expect public-api to issue to prometheus
}
// newMockGrpcServer creates a grpcServer for tests, backed by a fake
// Kubernetes API seeded with exp.k8sConfigs and a MockProm that answers with
// exp.mockPromResponse. It returns the mock Prometheus client alongside the
// server so callers can inspect it afterwards. An error is returned only when
// the fake Kubernetes API cannot be created.
func newMockGrpcServer(exp expectedStatRpc) (*MockProm, *grpcServer, error) {
	fakeAPI, err := k8s.NewFakeAPI("", exp.k8sConfigs...)
	if err != nil {
		return nil, nil, err
	}

	prom := &MockProm{Res: exp.mockPromResponse}
	server := newGrpcServer(prom, tap.NewTapClient(nil), fakeAPI, "linkerd", []string{})

	// Sync is called after the server has been constructed, as in the
	// original ordering.
	fakeAPI.Sync(nil)

	return prom, server, nil
}
// verifyPromQueries compares the queries recorded by mockProm with
// exp.expectedPrometheusQueries, ignoring order. Both slices are sorted in
// place before the comparison. When no expected queries are configured the
// check is skipped and nil is returned.
func (exp expectedStatRpc) verifyPromQueries(mockProm *MockProm) error {
	want := exp.expectedPrometheusQueries
	if len(want) == 0 {
		return nil
	}

	sort.Strings(want)
	sort.Strings(mockProm.QueriesExecuted)
	if reflect.DeepEqual(want, mockProm.QueriesExecuted) {
		return nil
	}

	return fmt.Errorf("Prometheus queries incorrect. \nExpected:\n%+v \nGot:\n%+v",
		want, mockProm.QueriesExecuted)
}

View File

@ -0,0 +1,160 @@
package public
import (
	"context"
	"fmt"
	"sort"
	"strings"

	pb "github.com/linkerd/linkerd2/controller/gen/public"
	"github.com/linkerd/linkerd2/pkg/k8s"
	"github.com/prometheus/common/model"
)
// Prometheus query templates for per-route metrics. They are format strings
// whose %s placeholders are filled in before the query is issued.
const (
// routeReqQuery sums the increase of route_response_total. Its placeholders
// sit, in order, directly after the metric name, inside the [...] range, and
// at the start of the "by" list (ahead of classification and tls).
routeReqQuery = "sum(increase(route_response_total%s[%s])) by (%s, classification, tls)"
// routeLatencyQuantileQuery computes a histogram quantile over
// route_response_latency_ms_bucket. Its placeholders are, in order, the
// quantile argument, the position directly after the metric name, the [...]
// range, and the end of the "by" list (after le).
routeLatencyQuantileQuery = "histogram_quantile(%s, sum(irate(route_response_latency_ms_bucket%s[%s])) by (le, %s))"
// dstLabel is a regex matcher on the dst label: the substituted value
// optionally followed by a colon and digits. The value is inserted as-is,
// without regex escaping.
dstLabel = `dst=~"%s(:\\d+)?"`
)
// TopRoutes serves per-route stats for a service.
//
// A malformed request is answered with an in-band error response (and a nil
// Go error) when:
//   - the selector resource is missing,
//   - the selector resource is not a service,
//   - the "from" resource is a service, or
//   - the "from" resource has type "all".
//
// Otherwise the route table produced by routeResourceQuery is returned; an
// error from that query is returned as the RPC error.
func (s *grpcServer) TopRoutes(ctx context.Context, req *pb.TopRoutesRequest) (*pb.TopRoutesResponse, error) {
	// check for well-formed request
	target := req.GetSelector().GetResource()
	if target == nil {
		return topRoutesError(req, "TopRoutes request missing Selector Resource"), nil
	}
	if target.Type != k8s.Service {
		return topRoutesError(req, "target resource must be a service"), nil
	}

	if from := req.GetFromResource(); from != nil && from.Type == k8s.Service {
		return topRoutesError(req, "'from' resource cannot be a service"), nil
	}

	if outbound, ok := req.Outbound.(*pb.TopRoutesRequest_FromResource); ok && outbound.FromResource.Type == k8s.All {
		return topRoutesError(req, "resource type 'all' is not supported as a filter"), nil
	}

	table, err := s.routeResourceQuery(ctx, req)
	if err != nil {
		return nil, err
	}

	rsp := &pb.TopRoutesResponse{
		Response: &pb.TopRoutesResponse_Routes{Routes: table},
	}
	return rsp, nil
}
// topRoutesError builds a TopRoutesResponse carrying the Error variant. The
// error is tagged with the resource selected by req (nil when the request has
// none) and carries message as its text.
func topRoutesError(req *pb.TopRoutesRequest, message string) *pb.TopRoutesResponse {
	resourceErr := &pb.ResourceError{
		Resource: req.GetSelector().GetResource(),
		Error:    message,
	}

	rsp := &pb.TopRoutesResponse{}
	rsp.Response = &pb.TopRoutesResponse_Error{Error: resourceErr}
	return rsp
}
// routeResourceQuery fetches the per-route metrics for req and converts them
// into a RouteTable with one row per route. Every row carries the request's
// time window. Rows are sorted by route name so that repeated calls return
// them in a stable order.
func (s *grpcServer) routeResourceQuery(ctx context.Context, req *pb.TopRoutesRequest) (*pb.RouteTable, error) {
	routeMetrics, err := s.getRouteMetrics(ctx, req, req.TimeWindow)
	if err != nil {
		return nil, err
	}

	rows := make([]*pb.RouteTable_Row, 0, len(routeMetrics))
	for route, metrics := range routeMetrics {
		rows = append(rows, &pb.RouteTable_Row{
			Route:      route,
			TimeWindow: req.TimeWindow,
			Stats:      metrics,
		})
	}

	// Map iteration order is randomized; without sorting, the row order would
	// change from one call to the next.
	sort.Slice(rows, func(i, j int) bool {
		return rows[i].Route < rows[j].Route
	})

	return &pb.RouteTable{Rows: rows}, nil
}
// getRouteMetrics runs the route request-count and latency queries for req
// over timeWindow, grouped by the rt_route label, and returns the resulting
// stats keyed by route.
func (s *grpcServer) getRouteMetrics(ctx context.Context, req *pb.TopRoutesRequest, timeWindow string) (map[string]*pb.BasicStats, error) {
	const routeLabel = "rt_route"

	selector := buildRouteLabels(req)
	promResults, err := s.getPrometheusMetrics(ctx, routeReqQuery, routeLatencyQuantileQuery, selector, timeWindow, routeLabel)
	if err != nil {
		return nil, err
	}

	return processRouteMetrics(promResults), nil
}
// buildRouteLabels renders the Prometheus label selector ("{k="v", ...}") used
// by the route queries for req.
//
// When the request names a 'from' resource, the selector matches outbound
// traffic of that resource; otherwise it matches inbound traffic of the
// selected resource. In both cases a dst matcher for the selected resource's
// name is appended last.
//
// The label pairs are sorted so that the same request always produces the
// same query string.
func buildRouteLabels(req *pb.TopRoutesRequest) string {
	// labels: the labels for the resource we want to query for
	var labels model.LabelSet

	switch out := req.Outbound.(type) {
	case *pb.TopRoutesRequest_FromResource:
		labels = labels.Merge(promQueryLabels(out.FromResource))
		labels = labels.Merge(promDirectionLabels("outbound"))
	default:
		labels = labels.Merge(promQueryLabels(req.Selector.Resource))
		labels = labels.Merge(promDirectionLabels("inbound"))
	}

	// One slot per label plus one for the dst matcher.
	pairs := make([]string, 0, len(labels)+1)
	for k, v := range labels {
		pairs = append(pairs, fmt.Sprintf("%s=%q", k, v))
	}
	// labels is a map, so iteration order is random; sort for a stable query.
	sort.Strings(pairs)

	pairs = append(pairs, fmt.Sprintf(dstLabel, req.Selector.Resource.Name))
	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
}
// processRouteMetrics folds Prometheus samples into one BasicStats per route,
// keyed by the sample's rt_route label.
//
// Request-count samples are added to SuccessCount or FailureCount according
// to their classification label, and to TlsRequestCount when their tls label
// is "true". Latency samples overwrite the matching quantile field.
func processRouteMetrics(results []promResult) map[string]*pb.BasicStats {
	statsByRoute := make(map[string]*pb.BasicStats)

	for _, res := range results {
		for _, sample := range res.vec {
			route := string(sample.Metric["rt_route"])

			stats := statsByRoute[route]
			if stats == nil {
				stats = &pb.BasicStats{}
				statsByRoute[route] = stats
			}

			value := extractSampleValue(sample)

			switch res.prom {
			case promRequests:
				classification := string(sample.Metric["classification"])
				if classification == "success" {
					stats.SuccessCount += value
				} else if classification == "failure" {
					stats.FailureCount += value
				}

				if string(sample.Metric["tls"]) == "true" {
					stats.TlsRequestCount += value
				}
			case promLatencyP50:
				stats.LatencyMsP50 = value
			case promLatencyP95:
				stats.LatencyMsP95 = value
			case promLatencyP99:
				stats.LatencyMsP99 = value
			}
		}
	}

	return statsByRoute
}

View File

@ -0,0 +1,163 @@
package public
import (
"context"
"testing"
"github.com/golang/protobuf/proto"
pb "github.com/linkerd/linkerd2/controller/gen/public"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/prometheus/common/model"
)
// topRoutesExpected describes one TopRoutes test case: the request to send,
// the response rows it should produce, and (through the embedded
// expectedStatRpc) the mock Prometheus setup and expected error.
type topRoutesExpected struct {
	expectedStatRpc
	req              pb.TopRoutesRequest  // the request we would like to test
	expectedResponse pb.TopRoutesResponse // the routes response we expect
}
// genEmptyTopRoutesResponse returns a TopRoutesResponse carrying the Routes
// variant with a route table that has no rows.
func genEmptyTopRoutesResponse() pb.TopRoutesResponse {
	emptyTable := &pb.RouteTable{Rows: nil}

	var rsp pb.TopRoutesResponse
	rsp.Response = &pb.TopRoutesResponse_Routes{Routes: emptyTable}
	return rsp
}
// routesMetric returns a Prometheus vector holding one generated sample per
// route, in the same order as routes.
func routesMetric(routes []string) model.Vector {
	vec := make(model.Vector, len(routes))
	for i := range routes {
		vec[i] = genRouteSample(routes[i])
	}
	return vec
}
// genRouteSample returns a fixed-value sample (value 123, timestamp 456) for
// route, labelled as a successful TLS request.
func genRouteSample(route string) *model.Sample {
	labels := model.Metric{
		"rt_route":       model.LabelValue(route),
		"classification": "success",
		"tls":            "true",
	}

	sample := new(model.Sample)
	sample.Metric = labels
	sample.Value = 123
	sample.Timestamp = 456
	return sample
}
// testTopRoutes runs every expectation against a fresh mock gRPC server. For
// each case it checks the returned Go error, the Prometheus queries that were
// executed, and that the response rows equal the expected rows pairwise and
// in order.
func testTopRoutes(t *testing.T, expectations []topRoutesExpected) {
	for _, exp := range expectations {
		mockProm, fakeGrpcServer, err := newMockGrpcServer(exp.expectedStatRpc)
		if err != nil {
			t.Fatalf("Error creating mock grpc server: %s", err)
		}

		rsp, err := fakeGrpcServer.TopRoutes(context.TODO(), &exp.req)
		if err != exp.err {
			// %v rather than %s: either side may legitimately be nil.
			t.Fatalf("Expected error: %v, Got: %v", exp.err, err)
		}

		if err := exp.verifyPromQueries(mockProm); err != nil {
			t.Fatal(err)
		}

		// Use the nil-safe getters throughout: the response may be nil or carry
		// the Error variant, in which case GetRoutes() returns nil and a direct
		// field access would panic instead of failing the test.
		rows := rsp.GetRoutes().GetRows()
		expectedRows := exp.expectedResponse.GetRoutes().GetRows()

		if len(rows) != len(expectedRows) {
			t.Fatalf(
				"Expected [%d] rows, got [%d].\nExpected:\n%s\nGot:\n%s",
				len(expectedRows),
				len(rows),
				expectedRows,
				rows,
			)
		}

		for i, row := range rows {
			expected := expectedRows[i]
			if !proto.Equal(row, expected) {
				t.Fatalf("Expected: %+v\n Got: %+v", expected, row)
			}
		}
	}
}
func TestTopRoutes(t *testing.T) {
t.Run("Successfully performs a routes query", func(t *testing.T) {
routes := []string{"/a"}
counts := []uint64{123}
expectations := []topRoutesExpected{
topRoutesExpected{
expectedStatRpc: expectedStatRpc{
err: nil,
mockPromResponse: routesMetric([]string{"/a"}),
// Comparing Prometheus queries is flakey because label order is
// non-deterministic.
/*expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(route_response_latency_ms_bucket{namespace="books", service="webapp", direction="inbound", dst=~"webapp(:\\d+)?"}[1m])) by (le, rt_route))`,
`histogram_quantile(0.95, sum(irate(route_response_latency_ms_bucket{namespace="books", service="webapp", direction="inbound", dst=~"webapp(:\\d+)?"}[1m])) by (le, rt_route))`,
`histogram_quantile(0.99, sum(irate(route_response_latency_ms_bucket{namespace="books", service="webapp", direction="inbound", dst=~"webapp(:\\d+)?"}[1m])) by (le, rt_route))`,
`sum(increase(route_response_total{namespace="books", service="webapp", direction="inbound", dst=~"webapp(:\\d+)?"}[1m])) by (rt_route, classification, tls)`,
},*/
},
req: pb.TopRoutesRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
Namespace: "books",
Type: pkgK8s.Service,
Name: "webapp",
},
},
TimeWindow: "1m",
},
expectedResponse: GenTopRoutesResponse(routes, counts),
},
}
testTopRoutes(t, expectations)
})
t.Run("Successfully performs an outbound routes query", func(t *testing.T) {
routes := []string{"/a"}
counts := []uint64{123}
expectations := []topRoutesExpected{
topRoutesExpected{
expectedStatRpc: expectedStatRpc{
err: nil,
mockPromResponse: routesMetric([]string{"/a"}),
// Comparing Prometheus queries is flakey because label order is
// non-deterministic.
/*expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(route_response_latency_ms_bucket{direction="outbound", namespace="books", deployment="traffic", dst=~"webapp(:\\d+)?"}[1m])) by (le, rt_route))`,
`histogram_quantile(0.95, sum(irate(route_response_latency_ms_bucket{direction="outbound", namespace="books", deployment="traffic", dst=~"webapp(:\\d+)?"}[1m])) by (le, rt_route))`,
`histogram_quantile(0.99, sum(irate(route_response_latency_ms_bucket{direction="outbound", namespace="books", deployment="traffic", dst=~"webapp(:\\d+)?"}[1m])) by (le, rt_route))`,
`sum(increase(route_response_total{direction="outbound", namespace="books", deployment="traffic", dst=~"webapp(:\\d+)?"}[1m])) by (rt_route, classification, tls)`,
},*/
},
req: pb.TopRoutesRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
Namespace: "books",
Type: pkgK8s.Service,
Name: "webapp",
},
},
Outbound: &pb.TopRoutesRequest_FromResource{
FromResource: &pb.Resource{
Namespace: "books",
Type: pkgK8s.Deployment,
Name: "traffic",
},
},
TimeWindow: "1m",
},
expectedResponse: GenTopRoutesResponse(routes, counts),
},
}
testTopRoutes(t, expectations)
})
}

View File

@ -9,6 +9,7 @@ import (
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/addr"
"github.com/linkerd/linkerd2/pkg/k8s"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/api/core/v1"
@ -47,7 +48,9 @@ var (
}
)
type StatSummaryRequestParams struct {
// StatsRequestParams holds the parameters used to build requests for metrics
// data. This includes requests to StatSummary and TopRoutes.
type StatsRequestParams struct {
TimeWindow string
Namespace string
ResourceType string
@ -105,7 +108,7 @@ func GRPCError(err error) error {
return err
}
func BuildStatSummaryRequest(p StatSummaryRequestParams) (*pb.StatSummaryRequest, error) {
func BuildStatSummaryRequest(p StatsRequestParams) (*pb.StatSummaryRequest, error) {
window := defaultMetricTimeWindow
if p.TimeWindow != "" {
_, err := time.ParseDuration(p.TimeWindow)
@ -191,6 +194,76 @@ func BuildStatSummaryRequest(p StatSummaryRequestParams) (*pb.StatSummaryRequest
return statRequest, nil
}
// BuildTopRoutesRequest validates p and converts it into a TopRoutesRequest.
//
// The time window defaults to defaultMetricTimeWindow when empty and must
// otherwise parse as a Go duration. The namespace defaults to the Kubernetes
// default namespace when empty. The target resource type must resolve to a
// service.
//
// When any of the From* fields is set, the request is restricted to traffic
// from that resource: a missing FromNamespace defaults to the target
// namespace and a missing FromType defaults to the target resource type.
//
// An error is returned when the time window is unparseable, AllNamespaces is
// set, the resource type is unknown or not a service, any To* field is set,
// or the 'from' resource type is rejected by validateFromResourceType.
func BuildTopRoutesRequest(p StatsRequestParams) (*pb.TopRoutesRequest, error) {
	window := defaultMetricTimeWindow
	if p.TimeWindow != "" {
		if _, err := time.ParseDuration(p.TimeWindow); err != nil {
			return nil, err
		}
		window = p.TimeWindow
	}

	if p.AllNamespaces {
		return nil, errors.New("all namespaces is not supported for routes request")
	}

	// AllNamespaces has been rejected above, so only the empty-namespace
	// default remains to be applied.
	targetNamespace := p.Namespace
	if targetNamespace == "" {
		targetNamespace = v1.NamespaceDefault
	}

	resourceType, err := k8s.CanonicalResourceNameFromFriendlyName(p.ResourceType)
	if err != nil {
		return nil, err
	}
	if resourceType != k8s.Service {
		return nil, errors.New("routes request must target a service")
	}

	if p.ToName != "" || p.ToType != "" || p.ToNamespace != "" {
		return nil, errors.New("to options are not supported for routes request")
	}

	topRoutesRequest := &pb.TopRoutesRequest{
		Selector: &pb.ResourceSelection{
			Resource: &pb.Resource{
				Namespace: targetNamespace,
				Name:      p.ResourceName,
				Type:      resourceType,
			},
		},
		TimeWindow: window,
	}

	if p.FromName != "" || p.FromType != "" || p.FromNamespace != "" {
		if p.FromNamespace == "" {
			p.FromNamespace = targetNamespace
		}
		if p.FromType == "" {
			// resourceType is always k8s.Service at this point.
			p.FromType = resourceType
		}

		fromType, err := validateFromResourceType(p.FromType)
		if err != nil {
			return nil, err
		}

		topRoutesRequest.Outbound = &pb.TopRoutesRequest_FromResource{
			FromResource: &pb.Resource{
				Namespace: p.FromNamespace,
				Type:      fromType,
				Name:      p.FromName,
			},
		}
	}

	return topRoutesRequest, nil
}
// An authority can only receive traffic, not send it, so it can't be a --from
func validateFromResourceType(resourceType string) (string, error) {
name, err := k8s.CanonicalResourceNameFromFriendlyName(resourceType)
@ -547,3 +620,32 @@ func RenderTapEvent(event *pb.TapEvent, resource string) string {
return fmt.Sprintf("unknown %s", flow)
}
}
// GetRequestRate returns the average number of requests per second recorded
// in stats (successes plus failures) over timeWindow, where timeWindow is a
// Go duration string such as "1m".
//
// If timeWindow cannot be parsed, or is zero or negative, the problem is
// logged and 0.0 is returned.
func GetRequestRate(stats *pb.BasicStats, timeWindow string) float64 {
	success := stats.SuccessCount
	failure := stats.FailureCount

	windowLength, err := time.ParseDuration(timeWindow)
	if err != nil {
		log.Error(err.Error())
		return 0.0
	}
	// "0s" and negative durations parse successfully but would yield
	// +Inf/NaN or a negative rate.
	if windowLength <= 0 {
		log.Errorf("time window must be positive, got %s", timeWindow)
		return 0.0
	}

	return float64(success+failure) / windowLength.Seconds()
}
// GetSuccessRate returns the fraction of requests in stats that succeeded, as
// a value between 0 and 1. It returns 0.0 when stats records no requests.
func GetSuccessRate(stats *pb.BasicStats) float64 {
	total := stats.SuccessCount + stats.FailureCount
	if total == 0 {
		return 0.0
	}

	return float64(stats.SuccessCount) / float64(total)
}
// GetPercentTls returns the ratio of TLS requests to all requests (successes
// plus failures) recorded in stats. It returns 0.0 when stats records no
// requests.
func GetPercentTls(stats *pb.BasicStats) float64 {
	total := stats.SuccessCount + stats.FailureCount
	if total != 0 {
		return float64(stats.TlsRequestCount) / float64(total)
	}
	return 0.0
}

View File

@ -50,7 +50,7 @@ func TestBuildStatSummaryRequest(t *testing.T) {
for friendly, canonical := range expectations {
statSummaryRequest, err := BuildStatSummaryRequest(
StatSummaryRequestParams{
StatsRequestParams{
ResourceType: friendly,
},
)
@ -72,7 +72,7 @@ func TestBuildStatSummaryRequest(t *testing.T) {
for _, timeWindow := range expectations {
statSummaryRequest, err := BuildStatSummaryRequest(
StatSummaryRequestParams{
StatsRequestParams{
TimeWindow: timeWindow,
ResourceType: k8s.Deployment,
},
@ -94,7 +94,7 @@ func TestBuildStatSummaryRequest(t *testing.T) {
for timeWindow, msg := range expectations {
_, err := BuildStatSummaryRequest(
StatSummaryRequestParams{
StatsRequestParams{
TimeWindow: timeWindow,
},
)
@ -115,7 +115,7 @@ func TestBuildStatSummaryRequest(t *testing.T) {
for input, msg := range expectations {
_, err := BuildStatSummaryRequest(
StatSummaryRequestParams{
StatsRequestParams{
ResourceType: input,
},
)
@ -129,6 +129,94 @@ func TestBuildStatSummaryRequest(t *testing.T) {
})
}
func TestBuildTopRoutesRequest(t *testing.T) {
t.Run("Parses valid time windows", func(t *testing.T) {
expectations := []string{
"1m",
"60s",
"1m",
}
for _, timeWindow := range expectations {
topRoutesRequest, err := BuildTopRoutesRequest(
StatsRequestParams{
TimeWindow: timeWindow,
ResourceType: k8s.Service,
},
)
if err != nil {
t.Fatalf("Unexpected error from BuildTopRoutesRequest [%s => %s]", timeWindow, err)
}
if topRoutesRequest.TimeWindow != timeWindow {
t.Fatalf("Unexpected TimeWindow from BuildTopRoutesRequest [%s => %s]", timeWindow, topRoutesRequest.TimeWindow)
}
}
})
t.Run("Rejects invalid time windows", func(t *testing.T) {
expectations := map[string]string{
"1": "time: missing unit in duration 1",
"s": "time: invalid duration s",
}
for timeWindow, msg := range expectations {
_, err := BuildTopRoutesRequest(
StatsRequestParams{
TimeWindow: timeWindow,
ResourceType: k8s.Service,
},
)
if err == nil {
t.Fatalf("BuildTopRoutesRequest(%s) unexpectedly succeeded, should have returned %s", timeWindow, msg)
}
if err.Error() != msg {
t.Fatalf("BuildTopRoutesRequest(%s) should have returned: %s but got unexpected message: %s", timeWindow, msg, err)
}
}
})
t.Run("Rejects non-service Kubernetes resource types", func(t *testing.T) {
resourceTypes := []string{
"deployment",
"pod",
"namespace",
}
msg := "routes request must target a service"
for _, input := range resourceTypes {
_, err := BuildTopRoutesRequest(
StatsRequestParams{
ResourceType: input,
},
)
if err == nil {
t.Fatalf("BuildTopRoutesRequest(%s) unexpectedly succeeded, should have returned %s", input, msg)
}
if err.Error() != msg {
t.Fatalf("BuildTopRoutesRequest(%s) should have returned: %s but got unexpected message: %s", input, msg, err)
}
}
})
t.Run("Rejects all-namespaces flag", func(t *testing.T) {
msg := "all namespaces is not supported for routes request"
_, err := BuildTopRoutesRequest(
StatsRequestParams{
ResourceType: k8s.Service,
AllNamespaces: true,
},
)
if err == nil {
t.Fatalf("BuildTopRoutesRequest unexpectedly succeeded, should have returned %s", msg)
}
if err.Error() != msg {
t.Fatalf("BuildTopRoutesRequest should have returned: %s but got unexpected message: %s", msg, err)
}
})
}
func TestBuildResource(t *testing.T) {
type resourceExp struct {
namespace string

File diff suppressed because it is too large Load Diff

View File

@ -343,9 +343,39 @@ message StatTable {
}
}
// TopRoutesRequest asks for per-route stats about the resource chosen by
// `selector`, measured over `time_window`.
message TopRoutesRequest {
  // The resource whose routes are being queried.
  ResourceSelection selector = 1;
  // The window to measure over, as a duration string such as "1m".
  string time_window = 2;

  // Optionally narrows the query to traffic sent by one resource.
  oneof outbound {
    // No source restriction.
    Empty none = 3;
    // Only report traffic originating from this resource.
    Resource from_resource = 5;
  }
}
// TopRoutesResponse is the result of a TopRoutes call: either the table of
// per-route stats or an error describing why the request was rejected.
message TopRoutesResponse {
  oneof response {
    // Per-route stats for the requested resource.
    RouteTable routes = 1;
    // Set instead of `routes` when the request could not be served.
    ResourceError error = 2;
  }
}
// RouteTable holds per-route stats, one row per route.
message RouteTable {
  repeated Row rows = 1;

  // Row carries the stats for a single route.
  message Row {
    // Name of the route the stats belong to.
    string route = 1;
    // The time window the stats were measured over.
    string time_window = 2;
    // Request counts and latency quantiles for the route.
    BasicStats stats = 5;
  }
}
service Api {
rpc StatSummary(StatSummaryRequest) returns (StatSummaryResponse) {}
rpc TopRoutes(TopRoutesRequest) returns (TopRoutesResponse) {}
rpc ListPods(ListPodsRequest) returns (ListPodsResponse) {}
// Superseded by `TapByResource`.

View File

@ -87,7 +87,7 @@ func (h *handler) handleApiStat(w http.ResponseWriter, req *http.Request, p http
if req.FormValue("all_namespaces") == "true" {
allNs = true
}
requestParams := util.StatSummaryRequestParams{
requestParams := util.StatsRequestParams{
TimeWindow: req.FormValue("window"),
ResourceName: req.FormValue("resource_name"),
ResourceType: req.FormValue("resource_type"),