stat_summary: support service metrics using `authority` label (#6514)

Currently, `viz stat` on services is quite limited because a Service is not
a pod-owner resource. This PR fixes that by querying the Prometheus metrics
with `direction="outbound"` and an `authority` label that matches the
service. This means that for services, we generate metrics from the
*meshed* client side.

`StatSummary` metrics on a service are divided into two kinds, depending on
whether the Service has a `ServiceProfile` with `dstOverrides`:

### Service has no `ServiceProfile.dstOverrides`

In this case, we simply return the metrics obtained by querying for
`direction="outbound"` and the service's `authority`, along with any
`--from` resources added as client query labels.
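
As a rough illustration, here is a condensed, self-contained sketch of how that
Prometheus selector is assembled. The helper name `buildServiceSelector`, the
client label names, and the `response_total` query template are assumptions for
illustration; the actual code paths are `buildServiceRequestLabels`,
`generateLabelStringWithRegex`, and `getServiceMetrics` in the diff below.

```go
package main

import (
	"fmt"
	"sort"
	"strings"

	"github.com/prometheus/common/model"
)

// buildServiceSelector is an illustrative stand-in for how the metrics-api
// builds the Prometheus label selector for a Service target: direction="outbound",
// any --from/--to labels, and an authority regex for the service FQDN.
func buildServiceSelector(svc, ns, clusterDomain string, extra model.LabelSet) string {
	labels := model.LabelSet{"direction": "outbound"}.Merge(extra)

	pairs := make([]string, 0, len(labels)+1)
	for name, value := range labels {
		pairs = append(pairs, fmt.Sprintf("%s=%q", name, value))
	}
	// The authority is matched by regex so that a port suffix (e.g. ":8081") still matches.
	authority := fmt.Sprintf("%s.%s.svc.%s", svc, ns, clusterDomain)
	pairs = append(pairs, fmt.Sprintf(`authority=~"^%s.*"`, authority))
	sort.Strings(pairs)
	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
}

func main() {
	// With --from deploy/slow-cooker, client workload labels narrow the query
	// (the label names used here are illustrative).
	sel := buildServiceSelector("backend-svc", "linkerd-trafficsplit-test-sp", "cluster.local",
		model.LabelSet{"deployment": "slow-cooker", "namespace": "linkerd-trafficsplit-test-sp"})

	// Results are grouped by dst_namespace and dst_service, one row per destination.
	fmt.Printf("sum(increase(response_total%s[30s])) by (dst_namespace, dst_service)\n", sel)
}
```

Printed out, the query has the shape
`sum(increase(response_total{...}[30s])) by (dst_namespace, dst_service)`,
which matches the `dst_namespace`/`dst_service` grouping used in the diff below.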

We also gate this path so that requests with `--from` as a service, or of the
form `svc/* --to xyz`, fail as invalid, i.e. we cannot render metrics with a
service as the client (sketched below).
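
A minimal sketch of that gating follows. Only the `--from`-as-service check is
verbatim in the diff below; the `--to` rejection is shown here as described in
this paragraph, and the `kind` type and function name are illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// kind is an illustrative stand-in for the resource types used by the API.
type kind string

const service kind = "service"

// validateServiceQuery sketches the gating described above: a service may not
// act as the client, so `--from svc/...` is rejected, and so is
// `svc/<name> --to <resource>`.
func validateServiceQuery(target kind, from, to *kind) error {
	if from != nil && *from == service {
		return errors.New("service is not supported as a target on 'from' queries, or as a target with 'to' queries")
	}
	if target == service && to != nil {
		return errors.New("service is not supported as a target on 'from' queries, or as a target with 'to' queries")
	}
	return nil
}

func main() {
	from := service
	fmt.Println(validateServiceQuery(service, &from, nil)) // rejected: service as client
	fmt.Println(validateServiceQuery(service, nil, nil))   // allowed: plain `viz stat svc/...`
}
```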

### Service has `ServiceProfile.dstOverrides`

Here we follow a path similar to the `TrafficSplit` one, except that we use a
`ServiceProfile` resource instead.

_The TrafficSplit path will be removed or merged into the
`Service` path in a separate PR for simplification._
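
To make the mapping concrete, here is a small, runnable sketch of how each
`dstOverrides` entry becomes an `APEX`/`LEAF`/`WEIGHT` row. `dstFromAuthority`
is copied from the diff below; the `WeightedDst` struct and the sample
authorities/weights (borrowed from the test `ServiceProfile` in the Testing
section) are illustrative stand-ins for the real `ServiceProfile` types.

```go
package main

import (
	"fmt"
	"strings"
)

// WeightedDst is an illustrative stand-in for a ServiceProfile dstOverrides entry.
type WeightedDst struct {
	Authority string
	Weight    string
}

// dstFromAuthority reduces "name.namespace.svc.suffix" to "name";
// this helper is copied from the diff below.
func dstFromAuthority(authority string) string {
	labels := strings.Split(authority, ".")
	if len(labels) >= 3 && labels[2] == "svc" {
		return labels[0]
	}
	return authority
}

func main() {
	apex := "backend-svc"
	overrides := []WeightedDst{
		{Authority: "backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local", Weight: "500m"},
		{Authority: "failing-svc.linkerd-trafficsplit-test-sp.svc.cluster.local:8081", Weight: "500m"},
	}

	// Each override becomes one APEX/LEAF/WEIGHT row; the per-leaf stats come
	// from the Prometheus results keyed by dst_service.
	for _, o := range overrides {
		fmt.Printf("apex=%s leaf=%s weight=%s\n", apex, dstFromAuthority(o.Authority), o.Weight)
	}
}
```

The `viz stat svc/backend-svc` output in the Testing section below renders
exactly these apex/leaf/weight triples alongside the per-destination stats.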


## Testing

### Apply Traffic Splitting through `ServiceProfiles`

```bash
➜ k create ns linkerd-trafficsplit-test-sp
namespace/linkerd-trafficsplit-test-sp created

➜ ./bin/linkerd inject ./test/integration/trafficsplit/testdata/application.yaml | k -n linkerd-trafficsplit-test-sp apply -f -

document missing "kind" field, skipped
deployment "backend" injected
service "backend-svc" skipped
deployment "failing" injected
service "failing-svc" skipped
deployment "slow-cooker" injected
service "slow-cooker" skipped

deployment.apps/backend created
service/backend-svc created
deployment.apps/failing created
service/failing-svc created
deployment.apps/slow-cooker created
service/slow-cooker created

➜ k apply -f ./test/integration/trafficsplit/testdata/sp/updated-traffic-split-leaf-weights.yaml -n linkerd-trafficsplit-test-sp
serviceprofile.linkerd.io/backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local created

➜ k describe sp -n linkerd-trafficsplit-test-sp
Name:         backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local
Namespace:    linkerd-trafficsplit-test-sp
Labels:       <none>
Annotations:  <none>
API Version:  linkerd.io/v1alpha2
Kind:         ServiceProfile
Metadata:
  Creation Timestamp:  2021-07-01T11:05:06Z
  Generation:          1
  Managed Fields:
    API Version:  linkerd.io/v1alpha2
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:dstOverrides:
    Manager:         kubectl-client-side-apply
    Operation:       Update
    Time:            2021-07-01T11:05:06Z
  Resource Version:  1398
  UID:               fce0a250-1396-4a14-9729-e19030048c7a
Spec:
  Dst Overrides:
    Authority:  backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local
    Weight:     500m
    Authority:  failing-svc.linkerd-trafficsplit-test-sp.svc.cluster.local:8081
    Weight:     500m
Events:         <none>
```

### CLI Output

```bash
➜ ./bin/go-run cli viz stat svc/backend-svc -n linkerd-trafficsplit-test-sp
NAME                                                         APEX                                                         LEAF          WEIGHT   SUCCESS      RPS   LATENCY_P50   LATENCY_P95   LATENCY_P99
backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   backend-svc     500m   100.00%   0.9rps           1ms           2ms           2ms
backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   failing-svc     500m     0.00%   1.1rps           1ms           2ms           2ms

➜ ./bin/go-run cli viz stat svc/backend-svc -n linkerd-trafficsplit-test-sp --from deploy/slow-cooker
NAME                                                         APEX                                                         LEAF          WEIGHT   SUCCESS      RPS   LATENCY_P50   LATENCY_P95   LATENCY_P99
backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   backend-svc     500m   100.00%   0.4rps           1ms           2ms           2ms
backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   failing-svc     500m     0.00%   0.6rps           1ms           2ms           2ms

➜ ./bin/go-run cli viz stat svc/backend-svc -n linkerd-trafficsplit-test-sp --from deploy/slow-cooker-1
NAME                                                         APEX                                                         LEAF          WEIGHT   SUCCESS      RPS   LATENCY_P50   LATENCY_P95   LATENCY_P99
backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   backend-svc     500m   100.00%   0.5rps           1ms           2ms           2ms
backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local   failing-svc     500m     0.00%   0.5rps           1ms           2ms           2ms

➜ ./bin/go-run cli viz stat svc/prometheus -n linkerd-viz
StatSummary API error: service only supported as a target on 'from' queries, or as a destination on 'to' queries


# With no `sp.dstOverrides`

➜ k -n linkerd-trafficsplit-test-sp delete sp backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local
serviceprofile.linkerd.io "backend-svc.linkerd-trafficsplit-test-sp.svc.cluster.local" deleted

➜ ./bin/go-run cli viz stat svc/backend-svc -n linkerd-trafficsplit-test-sp
NAME          MESHED   SUCCESS      RPS   LATENCY_P50   LATENCY_P95   LATENCY_P99
backend-svc        -   100.00%   1.2rps           1ms           2ms           2ms

➜ ./bin/go-run cli viz stat svc/backend-svc -n linkerd-trafficsplit-test-sp --from deploy/slow-cooker-1 --from-namespace linkerd-trafficsplit-test-sp
NAME          MESHED   SUCCESS      RPS   LATENCY_P50   LATENCY_P95   LATENCY_P99
backend-svc        -   100.00%   0.6rps           1ms           2ms           2ms

➜ ./bin/go-run cli viz stat svc/backend-svc -n linkerd-trafficsplit-test-sp --from deploy/slow-cooker --from-namespace linkerd-trafficsplit-test-sp
NAME          MESHED   SUCCESS      RPS   LATENCY_P50   LATENCY_P95   LATENCY_P99
backend-svc        -   100.00%   0.7rps           1ms           2ms           2ms

➜ ./bin/go-run cli viz stat deploy/slow-cooker -n linkerd-trafficsplit-test-sp --to svc/backend-svc
No traffic found.

```

Note: _This means that we need documentation changes to let users know that
`viz stat` on a service reports client-side metrics and will be missing
metrics from unmeshed clients._


Signed-off-by: Tarun Pothulapati <tarunpothulapati@outlook.com>
6 changed files with 406 additions and 118 deletions


@ -118,6 +118,12 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
prometheusAuthority: "-",
},
},
{
args: []string{"viz", "stat", "au", "-n", TestHelper.GetVizNamespace(), "--to", fmt.Sprintf("po/%s", prometheusPod), "--to-namespace", prometheusNamespace},
expectedRows: map[string]string{
prometheusAuthority: "-",
},
},
}
if !TestHelper.ExternalPrometheus() {
@ -146,7 +152,7 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
{
args: []string{"viz", "stat", "svc", "prometheus", "-n", TestHelper.GetVizNamespace(), "--from", "deploy/metrics-api", "--from-namespace", TestHelper.GetVizNamespace()},
expectedRows: map[string]string{
"prometheus": "1/1",
"prometheus": "-",
},
},
}...,
@ -177,42 +183,87 @@ func TestCliStatForLinkerdNamespace(t *testing.T) {
)
}
for _, tt := range testCases {
tt := tt // pin
timeout := 20 * time.Second
t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) {
err := TestHelper.RetryFor(timeout, func() error {
// Use a short time window so that transient errors at startup
// fall out of the window.
tt.args = append(tt.args, "-t", "30s")
out, err := TestHelper.LinkerdRun(tt.args...)
if err != nil {
testutil.AnnotatedFatalf(t, "unexpected stat error",
"unexpected stat error: %s\n%s", err, out)
}
// Apply a sample application
TestHelper.WithDataPlaneNamespace(ctx, "stat-test", map[string]string{}, t, func(t *testing.T, prefixedNs string) {
out, err := TestHelper.LinkerdRun("inject", "--manual", "../trafficsplit/testdata/application.yaml")
if err != nil {
testutil.AnnotatedFatal(t, "'linkerd inject' command failed", err)
}
expectedColumnCount := 8
if tt.status != "" {
expectedColumnCount++
}
rowStats, err := testutil.ParseRows(out, len(tt.expectedRows), expectedColumnCount)
if err != nil {
return err
}
out, err = TestHelper.KubectlApply(out, prefixedNs)
if err != nil {
testutil.AnnotatedFatalf(t, "'kubectl apply' command failed",
"'kubectl apply' command failed\n%s", out)
}
for name, meshed := range tt.expectedRows {
if err := validateRowStats(name, meshed, tt.status, rowStats); err != nil {
// wait for deployments to start
for _, deploy := range []string{"backend", "failing", "slow-cooker"} {
if err := TestHelper.CheckPods(ctx, prefixedNs, deploy, 1); err != nil {
if rce, ok := err.(*testutil.RestartCountError); ok {
testutil.AnnotatedWarn(t, "CheckPods timed-out", rce)
} else {
testutil.AnnotatedError(t, "CheckPods timed-out", err)
}
}
}
testCases = append(testCases, []struct {
args []string
expectedRows map[string]string
status string
}{
{
args: []string{"viz", "stat", "svc", "-n", prefixedNs},
expectedRows: map[string]string{
"backend-svc": "-",
},
},
{
args: []string{"viz", "stat", "svc/backend-svc", "-n", prefixedNs, "--from", "deploy/slow-cooker"},
expectedRows: map[string]string{
"backend-svc": "-",
},
},
}...,
)
for _, tt := range testCases {
tt := tt // pin
timeout := 20 * time.Second
t.Run("linkerd "+strings.Join(tt.args, " "), func(t *testing.T) {
err := TestHelper.RetryFor(timeout, func() error {
// Use a short time window so that transient errors at startup
// fall out of the window.
tt.args = append(tt.args, "-t", "30s")
out, err := TestHelper.LinkerdRun(tt.args...)
if err != nil {
testutil.AnnotatedFatalf(t, "unexpected stat error",
"unexpected stat error: %s\n%s", err, out)
}
expectedColumnCount := 8
if tt.status != "" {
expectedColumnCount++
}
rowStats, err := testutil.ParseRows(out, len(tt.expectedRows), expectedColumnCount)
if err != nil {
return err
}
}
return nil
for name, meshed := range tt.expectedRows {
if err := validateRowStats(name, meshed, tt.status, rowStats); err != nil {
return err
}
}
return nil
})
if err != nil {
testutil.AnnotatedFatal(t, fmt.Sprintf("timed-out checking stats (%s)", timeout), err)
}
})
if err != nil {
testutil.AnnotatedFatal(t, fmt.Sprintf("timed-out checking stats (%s)", timeout), err)
}
})
}
}
})
}
func validateRowStats(name, expectedMeshCount, expectedStatus string, rowStats map[string]*testutil.RowStat) error {


@ -310,6 +310,7 @@ type row struct {
status string
*rowStats
*tsStats
*dstStats
}
type tsStats struct {
@ -318,6 +319,11 @@ type tsStats struct {
weight string
}
type dstStats struct {
dst string
weight string
}
var (
nameHeader = "NAME"
namespaceHeader = "NAMESPACE"
@ -331,7 +337,7 @@ func statHasRequestData(stat *pb.BasicStats) bool {
}
func isPodOwnerResource(typ string) bool {
return typ != k8s.TrafficSplit && typ != k8s.Authority
return typ != k8s.TrafficSplit && typ != k8s.Authority && typ != k8s.Service
}
func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer, options *statOptions) {
@ -339,6 +345,7 @@ func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer,
maxNamespaceLength := len(namespaceHeader)
maxApexLength := len(apexHeader)
maxLeafLength := len(leafHeader)
maxDstLength := len(dstHeader)
maxWeightLength := len(weightHeader)
statTables := make(map[string]map[string]*row)
@ -373,7 +380,7 @@ func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer,
namespace := r.Resource.Namespace
key := fmt.Sprintf("%s/%s", namespace, name)
if r.Resource.Type == k8s.TrafficSplit {
if r.Resource.Type == k8s.TrafficSplit || (r.Resource.Type == k8s.Service && r.TsStats != nil) {
key = fmt.Sprintf("%s/%s/%s", namespace, name, r.TsStats.Leaf)
}
resourceKey := r.Resource.Type
@ -391,7 +398,7 @@ func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer,
}
meshedCount := fmt.Sprintf("%d/%d", r.MeshedPodCount, r.RunningPodCount)
if resourceKey == k8s.Authority {
if resourceKey == k8s.Authority || resourceKey == k8s.Service {
meshedCount = "-"
}
statTables[resourceKey][key] = &row{
@ -412,26 +419,40 @@ func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer,
}
}
if r.TsStats != nil {
leaf := r.TsStats.Leaf
apex := r.TsStats.Apex
weight := r.TsStats.Weight
if r.GetResource().GetType() == k8s.TrafficSplit {
leaf := r.TsStats.Leaf
apex := r.TsStats.Apex
weight := r.TsStats.Weight
if len(leaf) > maxLeafLength {
maxLeafLength = len(leaf)
}
if len(leaf) > maxLeafLength {
maxLeafLength = len(leaf)
}
if len(apex) > maxApexLength {
maxApexLength = len(apex)
}
if len(apex) > maxApexLength {
maxApexLength = len(apex)
}
if len(weight) > maxWeightLength {
maxWeightLength = len(weight)
}
statTables[resourceKey][key].tsStats = &tsStats{
apex: apex,
leaf: leaf,
weight: weight,
}
} else {
dst := r.TsStats.Leaf
weight := r.TsStats.Weight
statTables[resourceKey][key].tsStats = &tsStats{
apex: apex,
leaf: leaf,
weight: weight,
if len(dst) > maxDstLength {
maxDstLength = len(dst)
}
if len(weight) > maxWeightLength {
maxWeightLength = len(weight)
}
statTables[resourceKey][key].dstStats = &dstStats{
dst: dst,
weight: weight,
}
}
}
}
@ -442,13 +463,13 @@ func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer,
fmt.Fprintln(os.Stderr, "No traffic found.")
return
}
printStatTables(statTables, w, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxWeightLength, options)
printStatTables(statTables, w, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength, options)
case jsonOutput:
printStatJSON(statTables, w)
}
}
func printStatTables(statTables map[string]map[string]*row, w *tabwriter.Writer, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxWeightLength int, options *statOptions) {
func printStatTables(statTables map[string]map[string]*row, w *tabwriter.Writer, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength int, options *statOptions) {
usePrefix := false
if len(statTables) > 1 {
usePrefix = true
@ -465,7 +486,7 @@ func printStatTables(statTables map[string]map[string]*row, w *tabwriter.Writer,
if !usePrefix {
resourceTypeLabel = ""
}
printSingleStatTable(stats, resourceTypeLabel, resourceType, w, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxWeightLength, options)
printSingleStatTable(stats, resourceTypeLabel, resourceType, w, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength, options)
}
}
}
@ -479,14 +500,29 @@ func showTCPConns(resourceType string) bool {
return resourceType != k8s.Authority && resourceType != k8s.TrafficSplit
}
func printSingleStatTable(stats map[string]*row, resourceTypeLabel, resourceType string, w *tabwriter.Writer, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxWeightLength int, options *statOptions) {
func printSingleStatTable(stats map[string]*row, resourceTypeLabel, resourceType string, w *tabwriter.Writer, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength int, options *statOptions) {
headers := make([]string, 0)
nameTemplate := fmt.Sprintf("%%-%ds", maxNameLength)
namespaceTemplate := fmt.Sprintf("%%-%ds", maxNamespaceLength)
apexTemplate := fmt.Sprintf("%%-%ds", maxApexLength)
leafTemplate := fmt.Sprintf("%%-%ds", maxLeafLength)
dstTemplate := fmt.Sprintf("%%-%ds", maxDstLength)
weightTemplate := fmt.Sprintf("%%-%ds", maxWeightLength)
hasDstStats := false
for _, r := range stats {
if r.dstStats != nil {
hasDstStats = true
}
}
hasTsStats := false
for _, r := range stats {
if r.tsStats != nil {
hasTsStats = true
}
}
if options.allNamespaces {
headers = append(headers,
fmt.Sprintf(namespaceTemplate, namespaceHeader))
@ -499,7 +535,11 @@ func printSingleStatTable(stats map[string]*row, resourceTypeLabel, resourceType
headers = append(headers, "STATUS")
}
if resourceType == k8s.TrafficSplit {
if hasDstStats {
headers = append(headers,
fmt.Sprintf(dstTemplate, dstHeader),
fmt.Sprintf(weightTemplate, weightHeader))
} else if hasTsStats {
headers = append(headers,
fmt.Sprintf(apexTemplate, apexHeader),
fmt.Sprintf(leafTemplate, leafHeader),
@ -542,9 +582,12 @@ func printSingleStatTable(stats map[string]*row, resourceTypeLabel, resourceType
templateStringEmpty = "%s\t" + templateStringEmpty
}
if resourceType == k8s.TrafficSplit {
if hasTsStats {
templateString = "%s\t%s\t%s\t%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
templateStringEmpty = "%s\t%s\t%s\t%s\t-\t-\t-\t-\t-\t"
} else if hasDstStats {
templateString = "%s\t%s\t%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
templateStringEmpty = "%s\t%s\t%s\t-\t-\t-\t-\t-\t"
}
if !showTCPConns(resourceType) {
@ -578,6 +621,7 @@ func printSingleStatTable(stats map[string]*row, resourceTypeLabel, resourceType
apexPadding := 0
leafPadding := 0
dstPadding := 0
if stats[key].tsStats != nil {
if maxApexLength > len(stats[key].tsStats.apex) {
@ -586,6 +630,10 @@ func printSingleStatTable(stats map[string]*row, resourceTypeLabel, resourceType
if maxLeafLength > len(stats[key].tsStats.leaf) {
leafPadding = maxLeafLength - len(stats[key].tsStats.leaf)
}
} else if stats[key].dstStats != nil {
if maxDstLength > len(stats[key].dstStats.dst) {
dstPadding = maxDstLength - len(stats[key].dstStats.dst)
}
}
values = append(values, name+strings.Repeat(" ", padding))
@ -593,12 +641,17 @@ func printSingleStatTable(stats map[string]*row, resourceTypeLabel, resourceType
values = append(values, stats[key].status)
}
if resourceType == k8s.TrafficSplit {
if hasTsStats {
values = append(values,
stats[key].tsStats.apex+strings.Repeat(" ", apexPadding),
stats[key].tsStats.leaf+strings.Repeat(" ", leafPadding),
stats[key].tsStats.weight,
)
} else if hasDstStats {
values = append(values,
stats[key].dstStats.dst+strings.Repeat(" ", dstPadding),
stats[key].dstStats.weight,
)
} else {
values = append(values, []interface{}{
stats[key].meshed,
@ -656,6 +709,7 @@ type jsonStats struct {
TCPWriteBytes *float64 `json:"tcp_write_bytes_rate,omitempty"`
Apex string `json:"apex,omitempty"`
Leaf string `json:"leaf,omitempty"`
Dst string `json:"dst,omitempty"`
Weight string `json:"weight,omitempty"`
}
@ -692,7 +746,10 @@ func printStatJSON(statTables map[string]map[string]*row, w *tabwriter.Writer) {
if stats[key].tsStats != nil {
entry.Apex = stats[key].apex
entry.Leaf = stats[key].leaf
entry.Weight = stats[key].weight
entry.Weight = stats[key].tsStats.weight
} else if stats[key].dstStats != nil {
entry.Dst = stats[key].dstStats.dst
entry.Weight = stats[key].dstStats.weight
}
entries = append(entries, entry)
}


@ -39,6 +39,7 @@ const (
gatewayNameLabel = model.LabelName("gateway_name")
gatewayNamespaceLabel = model.LabelName("gateway_namespace")
remoteClusterNameLabel = model.LabelName("target_cluster_name")
authorityLabel = model.LabelName("authority")
)
var (
@ -163,7 +164,7 @@ func generateLabelStringWithRegex(l model.LabelSet, labelName string, stringToMa
for l, v := range l {
lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v))
}
lstrs = append(lstrs, fmt.Sprintf(`%s=~"^%s.+"`, labelName, stringToMatch))
lstrs = append(lstrs, fmt.Sprintf(`%s=~"^%s.*"`, labelName, stringToMatch))
sort.Strings(lstrs)
return fmt.Sprintf("{%s}", strings.Join(lstrs, ", "))


@ -0,0 +1,14 @@
package api
import (
"testing"
"github.com/prometheus/common/model"
)
func TestGenerateLabelStringWithRegex(t *testing.T) {
query := generateLabelStringWithRegex(model.LabelSet{}, "key", "value")
if query != "{key=~\"^value.*\"}" {
t.Errorf("Expected 'key=~\"^value.+\"', got '%s'", query)
}
}


@ -5,14 +5,17 @@ import (
"fmt"
"reflect"
"sort"
"strings"
proto "github.com/golang/protobuf/proto"
"github.com/linkerd/linkerd2/pkg/k8s"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
"github.com/linkerd/linkerd2/viz/pkg/util"
vizutil "github.com/linkerd/linkerd2/viz/pkg/util"
"github.com/prometheus/common/model"
"github.com/servicemeshinterface/smi-sdk-go/pkg/apis/split/v1alpha1"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -23,7 +26,6 @@ type resourceResult struct {
res *pb.StatTable
err error
}
type k8sStat struct {
object metav1.Object
podStats *podStats
@ -44,6 +46,13 @@ type tsKey struct {
Weight string
}
type dstKey struct {
Namespace string
Service string
Dst string
Weight string
}
const (
success = "success"
failure = "failure"
@ -53,6 +62,8 @@ const (
tcpConnectionsQuery = "sum(tcp_open_connections%s) by (%s)"
tcpReadBytesQuery = "sum(increase(tcp_read_bytes_total%s[%s])) by (%s)"
tcpWriteBytesQuery = "sum(increase(tcp_write_bytes_total%s[%s])) by (%s)"
regexAny = ".+"
)
type podStats struct {
@ -76,9 +87,9 @@ func (s *grpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest
return statSummaryError(req, "StatSummary request missing Selector Resource"), nil
}
// special case to check for services as outbound only
if isInvalidServiceRequest(req.Selector, req.GetFromResource()) {
return statSummaryError(req, "service only supported as a target on 'from' queries, or as a destination on 'to' queries"), nil
// err if --from is a service
if req.GetFromResource() != nil && req.GetFromResource().GetType() == k8s.Service {
return statSummaryError(req, "service is not supported as a target on 'from' queries, or as a target with 'to' queries"), nil
}
switch req.Outbound.(type) {
@ -111,6 +122,8 @@ func (s *grpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest
go func() {
if isNonK8sResourceQuery(statReq.GetSelector().GetResource().GetType()) {
resultChan <- s.nonK8sResourceQuery(ctx, statReq)
} else if statReq.GetSelector().GetResource().GetType() == k8s.Service {
resultChan <- s.serviceResourceQuery(ctx, statReq)
} else if isTrafficSplitQuery(statReq.GetSelector().GetResource().GetType()) {
resultChan <- s.trafficSplitResourceQuery(ctx, statReq)
} else {
@ -122,7 +135,7 @@ func (s *grpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest
for i := 0; i < len(resourcesToQuery); i++ {
result := <-resultChan
if result.err != nil {
return nil, util.GRPCError(result.err)
return nil, vizutil.GRPCError(result.err)
}
statTables = append(statTables, result.res)
}
@ -135,17 +148,10 @@ func (s *grpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest
},
}
log.Debugf("Sent response as %+v\n", statTables)
return &rsp, nil
}
func isInvalidServiceRequest(selector *pb.ResourceSelection, fromResource *pb.Resource) bool {
if fromResource != nil {
return fromResource.Type == k8s.Service
}
return selector.Resource.Type == k8s.Service
}
func statSummaryError(req *pb.StatSummaryRequest, message string) *pb.StatSummaryResponse {
return &pb.StatSummaryResponse{
Response: &pb.StatSummaryResponse_Error{
@ -198,6 +204,7 @@ func (s *grpcServer) getKubernetesObjectStats(req *pb.StatSummaryRequest) (map[r
}
func (s *grpcServer) k8sResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
k8sObjects, err := s.getKubernetesObjectStats(req)
if err != nil {
return resourceResult{res: nil, err: err}
@ -288,8 +295,8 @@ func (s *grpcServer) getTrafficSplits(req *pb.StatSummaryRequest) ([]*v1alpha1.T
}
func (s *grpcServer) trafficSplitResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
tss, err := s.getTrafficSplits(req)
tss, err := s.getTrafficSplits(req)
if err != nil {
return resourceResult{res: nil, err: err}
}
@ -359,11 +366,88 @@ func (s *grpcServer) trafficSplitResourceQuery(ctx context.Context, req *pb.Stat
return resourceResult{res: &rsp, err: nil}
}
func (s *grpcServer) serviceResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
rows := make([]*pb.StatTable_PodGroup_Row, 0)
dstBasicStats := make(map[dstKey]*pb.BasicStats)
dstTCPStats := make(map[dstKey]*pb.TcpStats)
if !req.SkipStats {
var err error
dstBasicStats, dstTCPStats, err = s.getServiceMetrics(ctx, req, req.TimeWindow)
if err != nil {
return resourceResult{res: nil, err: err}
}
}
weights := make(map[dstKey]string)
for k := range dstBasicStats {
weights[k] = ""
}
name := req.GetSelector().GetResource().GetName()
namespace := req.GetSelector().GetResource().GetNamespace()
// Check if a ServiceProfile exists for the Service
spName := fmt.Sprintf("%s.%s.svc.%s", name, namespace, s.clusterDomain)
sp, err := s.k8sAPI.SP().Lister().ServiceProfiles(namespace).Get(spName)
if err == nil {
for _, weightedDst := range sp.Spec.DstOverrides {
weights[dstKey{
Namespace: namespace,
Service: name,
Dst: dstFromAuthority(weightedDst.Authority),
}] = weightedDst.Weight.String()
}
} else if !kerrors.IsNotFound(err) {
log.Errorf("Failed to get weights from ServiceProfile %s: %v", spName, err)
}
for k, weight := range weights {
row := pb.StatTable_PodGroup_Row{
Resource: &pb.Resource{
Name: k.Service,
Namespace: k.Namespace,
Type: req.GetSelector().GetResource().GetType(),
},
TimeWindow: req.TimeWindow,
Stats: dstBasicStats[k],
TcpStats: dstTCPStats[k],
}
// Set TrafficSplitStats only when weight is not empty
if weight != "" {
row.TsStats = &pb.TrafficSplitStats{
Apex: k.Service,
Leaf: k.Dst,
Weight: weight,
}
}
rows = append(rows, &row)
}
// sort rows before returning in order to have a consistent order for tests
rows = sortTrafficSplitRows(rows)
rsp := pb.StatTable{
Table: &pb.StatTable_PodGroup_{
PodGroup: &pb.StatTable_PodGroup{
Rows: rows,
},
},
}
return resourceResult{res: &rsp, err: nil}
}
func sortTrafficSplitRows(rows []*pb.StatTable_PodGroup_Row) []*pb.StatTable_PodGroup_Row {
sort.Slice(rows, func(i, j int) bool {
key1 := rows[i].TsStats.Apex + rows[i].TsStats.Leaf
key2 := rows[j].TsStats.Apex + rows[j].TsStats.Leaf
return key1 < key2
if rows[i].TsStats != nil && rows[j].TsStats != nil {
key1 := rows[i].TsStats.Apex + rows[i].TsStats.Leaf
key2 := rows[j].TsStats.Apex + rows[j].TsStats.Leaf
return key1 < key2
}
return false
})
return rows
}
@ -464,28 +548,30 @@ func buildRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labe
return
}
func buildTrafficSplitRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
// Trafficsplit labels are always direction="outbound". If the --from or --to flags were used,
// we merge an additional ToResource or FromResource label. Trafficsplit metrics results are
// always grouped by dst_service.
// N.b. requests to a traffic split may come from any namespace so we do not do any filtering
// by namespace.
func buildServiceRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
// Service Request labels are always direction="outbound". If the --from or --to flags were used,
// we merge an additional ToResource or FromResource label. Service metrics results are
// always grouped by dst_service, and dst_namespace (to avoid conflicts) .
labels = model.LabelSet{
"direction": model.LabelValue("outbound"),
}
switch out := req.Outbound.(type) {
case *pb.StatSummaryRequest_ToResource:
// if --to flag is passed, Calculate traffic sent to the service
// with additional filtering narrowing down to the workload
// it is sent to.
labels = labels.Merge(promDstQueryLabels(out.ToResource))
case *pb.StatSummaryRequest_FromResource:
// if --from flag is passed, FromResource is never a service here
labels = labels.Merge(promQueryLabels(out.FromResource))
default:
// no extra labels needed
}
groupBy := model.LabelNames{model.LabelName("dst_service")}
groupBy := model.LabelNames{model.LabelName("dst_namespace"), model.LabelName("dst_service")}
return labels, groupBy
}
@ -530,26 +616,23 @@ func (s *grpcServer) getStatMetrics(ctx context.Context, req *pb.StatSummaryRequ
func (s *grpcServer) getTrafficSplitMetrics(ctx context.Context, req *pb.StatSummaryRequest, tsStats *trafficSplitStats, timeWindow string) (map[tsKey]*pb.BasicStats, error) {
tsBasicStats := make(map[tsKey]*pb.BasicStats)
labels, groupBy := buildTrafficSplitRequestLabels(req)
labels, groupBy := buildServiceRequestLabels(req)
apex := tsStats.apex
namespace := tsStats.namespace
// TODO: add cluster domain to stringToMatch
stringToMatch := fmt.Sprintf("%s.%s.svc", apex, namespace)
reqLabels := generateLabelStringWithRegex(labels, "authority", stringToMatch)
reqLabels := generateLabelStringWithRegex(labels, string(authorityLabel), stringToMatch)
promQueries := map[promType]string{
promRequests: fmt.Sprintf(reqQuery, reqLabels, timeWindow, groupBy.String()),
}
quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels, timeWindow, groupBy.String())
results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
if err != nil {
return nil, err
}
basicStats, _ := processPrometheusMetrics(req, results, groupBy) // we don't need tcpStat info for traffic split
for rKey, basicStatsVal := range basicStats {
@ -564,6 +647,75 @@ func (s *grpcServer) getTrafficSplitMetrics(ctx context.Context, req *pb.StatSum
return tsBasicStats, nil
}
func (s *grpcServer) getServiceMetrics(ctx context.Context, req *pb.StatSummaryRequest, timeWindow string) (map[dstKey]*pb.BasicStats, map[dstKey]*pb.TcpStats, error) {
dstBasicStats := make(map[dstKey]*pb.BasicStats)
dstTCPStats := make(map[dstKey]*pb.TcpStats)
labels, groupBy := buildServiceRequestLabels(req)
service := req.GetSelector().GetResource().GetName()
namespace := req.GetSelector().GetResource().GetNamespace()
if service == "" {
service = regexAny
}
authority := fmt.Sprintf("%s.%s.svc.%s", service, namespace, s.clusterDomain)
reqLabels := generateLabelStringWithRegex(labels, string(authorityLabel), authority)
promQueries := map[promType]string{
promRequests: fmt.Sprintf(reqQuery, reqLabels, timeWindow, groupBy.String()),
}
if req.TcpStats {
// Service stats always need to have `peer=dst`, cuz there is no `src` with `authority` label
tcpLabels := labels.Merge(promPeerLabel("dst"))
tcpLabelString := generateLabelStringWithRegex(tcpLabels, string(authorityLabel), authority)
promQueries[promTCPConnections] = fmt.Sprintf(tcpConnectionsQuery, tcpLabelString, groupBy.String())
promQueries[promTCPReadBytes] = fmt.Sprintf(tcpReadBytesQuery, tcpLabelString, timeWindow, groupBy.String())
promQueries[promTCPWriteBytes] = fmt.Sprintf(tcpWriteBytesQuery, tcpLabelString, timeWindow, groupBy.String())
}
quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels, timeWindow, groupBy.String())
results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
if err != nil {
return nil, nil, err
}
basicStats, tcpStats := processPrometheusMetrics(req, results, groupBy)
for rKey, basicStatsVal := range basicStats {
// Use the returned `dst_service` in the `all` svc case
svcName := service
if svcName == regexAny {
svcName = rKey.Name
}
dstBasicStats[dstKey{
Namespace: rKey.Namespace,
Service: svcName,
Dst: rKey.Name,
}] = basicStatsVal
}
for rKey, tcpStatsVal := range tcpStats {
// Use the returned `dst_service` in the `all` svc case
svcName := service
if svcName == regexAny {
svcName = rKey.Name
}
dstTCPStats[dstKey{
Namespace: rKey.Namespace,
Service: svcName,
Dst: rKey.Name,
}] = tcpStatsVal
}
return dstBasicStats, dstTCPStats, nil
}
func processPrometheusMetrics(req *pb.StatSummaryRequest, results []promResult, groupBy model.LabelNames) (map[rKey]*pb.BasicStats, map[rKey]*pb.TcpStats) {
basicStats := make(map[rKey]*pb.BasicStats)
tcpStats := make(map[rKey]*pb.TcpStats)
@ -717,3 +869,13 @@ func getLabelSelector(req *pb.StatSummaryRequest) (labels.Selector, error) {
}
return labelSelector, nil
}
func dstFromAuthority(authority string) string {
// name.namespace.svc.suffix
labels := strings.Split(authority, ".")
if len(labels) >= 3 && labels[2] == "svc" {
// name
return labels[0]
}
return authority
}


@ -1406,13 +1406,16 @@ status:
Rows: []*pb.StatTable_PodGroup_Row{
{
Resource: &pb.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Service,
Name: "emoji-svc",
Type: pkgK8s.Service,
},
TimeWindow: "1m",
Stats: &pb.BasicStats{
SuccessCount: 123,
FailureCount: 0,
LatencyMsP50: 123,
LatencyMsP95: 123,
LatencyMsP99: 123,
},
TimeWindow: "1m",
MeshedPodCount: 1,
RunningPodCount: 1,
},
},
},
@ -1510,29 +1513,6 @@ status:
{
req: &pb.StatSummaryRequest{},
},
{
req: &pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
Type: pkgK8s.Service,
},
},
},
},
{
req: &pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
Type: pkgK8s.Service,
},
},
Outbound: &pb.StatSummaryRequest_ToResource{
ToResource: &pb.Resource{
Type: pkgK8s.Pod,
},
},
},
},
{
req: &pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
@ -1572,6 +1552,29 @@ status:
},
},
},
{
req: &pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
Type: pkgK8s.Service,
},
},
},
},
{
req: &pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
Type: pkgK8s.Service,
},
},
Outbound: &pb.StatSummaryRequest_ToResource{
ToResource: &pb.Resource{
Type: pkgK8s.Pod,
},
},
},
},
{
req: &pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{