Remove linkerd-viz dependency from linkerd-multicluster `gateways` command (#8467)

This changes linkerd-multicluster's `gateways` command to use the service mirror component's `/metrics` endpoint so that there is no longer a dependency on linkerd-viz. The linkerd-viz dependency is a leftover from when those components were part of the default installation, which meant we could always rely on the Prometheus component being present.

Now, the `gateways` command starts a port-forward to each service mirror component (for each linked cluster) and queries the `/metrics` endpoint for the `gateway_alive` and `gateway_latency` metrics. It then queries the local cluster for the number of mirror services that correspond to the target cluster of that service mirror. Using these three data points, it creates the output table for the command.
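
For illustration, these are roughly the two series the command reads from each service mirror's `/metrics` output; only the `target_cluster_name` label the command matches on is shown, other labels are omitted, and the values are hypothetical.

```text
# gateway_alive is 1 while probes to the target cluster's gateway are succeeding.
gateway_alive{target_cluster_name="k3d-x"} 1
# gateway_latency is the last observed probe latency to the gateway, in milliseconds.
gateway_latency{target_cluster_name="k3d-x"} 2
```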

### Output changes

Currently, the `gateways` command displays the P50, P95, and P99 latencies for each gateway:

```shell
$ linkerd multicluster gateways 
CLUSTER  ALIVE    NUM_SVC  LATENCY_P50  LATENCY_P95  LATENCY_P99  
k3d-x    True           1          1ms          3ms          3ms  
k3d-z    True           0          1ms          3ms          3ms
```

With this change, we now show only the last observed latency. This required adding a new `gateway_latency` Gauge metric, separate from the existing `gateway_probe_latency_ms` histogram Observer:

```shell
$ linkerd multicluster gateways
CLUSTER  ALIVE    NUM_SVC      LATENCY  
k3d-x    True           1          2ms  
k3d-z    True           0          3ms
```
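
As a minimal sketch of the distinction (not the actual service-mirror code; the metric names and `target_cluster_name` label come from the diff below, while the bucket boundaries and helper are made up), the gauge is simply overwritten with the most recent probe round-trip time, whereas the histogram accumulates a distribution from which Prometheus can compute P50/P95/P99:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// Gauge: holds only the latest observation, readable directly from /metrics.
	latency = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "gateway_latency",
			Help: "A gauge which is the latency of the last probe to the gateway.",
		},
		[]string{"target_cluster_name"},
	)

	// Histogram: accumulates bucketed counts, from which percentiles are derived.
	latencies = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "gateway_probe_latency_ms",
			Buckets: []float64{1, 2, 5, 10, 50, 100}, // hypothetical boundaries
		},
		[]string{"target_cluster_name"},
	)
)

func recordProbe(cluster string, rtt time.Duration) {
	ms := float64(rtt.Milliseconds())
	latency.WithLabelValues(cluster).Set(ms)       // overwrite with the latest value
	latencies.WithLabelValues(cluster).Observe(ms) // add to the distribution
}

func main() {
	recordProbe("k3d-x", 2*time.Millisecond)
}
```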

The reason for dropping the percentile columns is that I have not yet found a way, using the Prometheus common libraries, to turn the parsed `/metrics` output back into a histogram from which percentiles could be computed; I think this should be possible, but I'm leaving it as a follow-up for now.
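
A possible shape for that follow-up, assuming the raw histogram series (`gateway_probe_latency_ms`) is scraped the same way: parse the `/metrics` text with `expfmt` from `prometheus/common` and read the cumulative bucket counts, from which approximate percentiles could then be interpolated. The helper name and the printed format below are only for illustration, and the sample input is hypothetical:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/prometheus/common/expfmt"
)

// printHistogramBuckets parses Prometheus text exposition output and prints the
// cumulative bucket counts of the gateway_probe_latency_ms histogram.
func printHistogramBuckets(metricsText []byte) error {
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(bytes.NewReader(metricsText))
	if err != nil {
		return err
	}
	family, ok := families["gateway_probe_latency_ms"]
	if !ok {
		return fmt.Errorf("gateway_probe_latency_ms not found in metrics output")
	}
	for _, m := range family.GetMetric() {
		h := m.GetHistogram()
		for _, b := range h.GetBucket() {
			// Count of observations less than or equal to the bucket's upper bound.
			fmt.Printf("le=%v count=%d\n", b.GetUpperBound(), b.GetCumulativeCount())
		}
		fmt.Printf("sum=%vms count=%d\n", h.GetSampleSum(), h.GetSampleCount())
	}
	return nil
}

func main() {
	sample := []byte(`# TYPE gateway_probe_latency_ms histogram
gateway_probe_latency_ms_bucket{target_cluster_name="k3d-x",le="1"} 3
gateway_probe_latency_ms_bucket{target_cluster_name="k3d-x",le="5"} 9
gateway_probe_latency_ms_bucket{target_cluster_name="k3d-x",le="+Inf"} 10
gateway_probe_latency_ms_sum{target_cluster_name="k3d-x"} 42
gateway_probe_latency_ms_count{target_cluster_name="k3d-x"} 10
`)
	if err := printHistogramBuckets(sample); err != nil {
		fmt.Println(err)
	}
}
```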

Signed-off-by: Kevin Leimkuhler <kleimkuhler@icloud.com>
9 changed files with 251 additions and 123 deletions


@@ -50,7 +50,7 @@ func newCmdControllerMetrics() *cobra.Command {
return err
}
results := getMetrics(k8sAPI, pods.Items, adminHTTPPortName, options.wait, verbose)
results := getMetrics(k8sAPI, pods.Items, k8s.AdminHTTPPortName, options.wait, verbose)
var buf bytes.Buffer
for i, result := range results {


@@ -4,10 +4,6 @@ import (
"github.com/spf13/cobra"
)
const (
adminHTTPPortName string = "admin-http"
)
// newCmdDiagnostics creates a new cobra command `diagnostics` which contains commands to fetch Linkerd diagnostics
func newCmdDiagnostics() *cobra.Command {
@@ -24,7 +20,7 @@ This command provides subcommands to diagnose the functionality of Linkerd.`,
# Get metrics from the web deployment in the emojivoto namespace.
linkerd diagnostics proxy-metrics -n emojivoto deploy/web
# Get the endpoints for authorities in Linkerd's control-plane itself
linkerd diagnostics endpoints web.linkerd-viz.svc.cluster.local:8084
`,


@@ -4,9 +4,6 @@ import (
"bytes"
"crypto/sha256"
"fmt"
"io/ioutil"
"net/http"
"os"
"sort"
"sync/atomic"
"time"
@@ -35,42 +32,6 @@ func (s byResult) Less(i, j int) bool {
return s[i].pod < s[j].pod || ((s[i].pod == s[j].pod) && s[i].container < s[j].container)
}
// getResponse makes a http Get request to the passed url and returns the response/error
func getResponse(url string) ([]byte, error) {
// url has been constructed by k8s.newPortForward and is not passed in by
// the user.
//nolint:gosec
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
// getContainerMetrics returns the metrics exposed by a container on the passed in portName
func getContainerMetrics(
k8sAPI *k8s.KubernetesAPI,
pod corev1.Pod,
container corev1.Container,
emitLogs bool,
portName string,
) ([]byte, error) {
portForward, err := k8s.NewContainerMetricsForward(k8sAPI, pod, container, emitLogs, portName)
if err != nil {
return nil, err
}
defer portForward.Stop()
if err = portForward.Init(); err != nil {
fmt.Fprintf(os.Stderr, "Error running port-forward: %s", err)
return nil, err
}
metricsURL := portForward.URLFor("/metrics")
return getResponse(metricsURL)
}
// getAllContainersWithPort returns all the containers within
// a pod which exposes metrics at a port with name portName
func getAllContainersWithPort(
@@ -119,7 +80,7 @@ func getMetrics(
}
for _, c := range containers {
bytes, err := getContainerMetrics(k8sAPI, p, c, emitLogs, portName)
bytes, err := k8s.GetContainerMetrics(k8sAPI, p, c, emitLogs, portName)
resultChan <- metricsResult{
pod: p.GetName(),


@@ -1,95 +1,218 @@
package cmd
import (
"bytes"
"context"
"fmt"
"io"
"os"
"sync/atomic"
"time"
"github.com/linkerd/linkerd2/cli/table"
"github.com/linkerd/linkerd2/pkg/k8s"
vizCmd "github.com/linkerd/linkerd2/viz/cmd"
"github.com/linkerd/linkerd2/viz/metrics-api/client"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type (
gatewaysOptions struct {
gatewayNamespace string
clusterName string
wait time.Duration
}
gatewayMetrics struct {
clusterName string
metrics []byte
err error
}
gatewayStatus struct {
clusterName string
timeWindow string
alive bool
numberOfServices int
latency uint64
}
)
func newGatewaysOptions() *gatewaysOptions {
return &gatewaysOptions{
wait: 30 * time.Second,
}
}
func newGatewaysCommand() *cobra.Command {
opts := gatewaysOptions{}
opts := newGatewaysOptions()
cmd := &cobra.Command{
Use: "gateways",
Short: "Display stats information about the gateways in target clusters",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
req := &pb.GatewaysRequest{
RemoteClusterName: opts.clusterName,
GatewayNamespace: opts.gatewayNamespace,
TimeWindow: opts.timeWindow,
}
k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
if err != nil {
return err
}
ctx := cmd.Context()
vizNs, err := k8sAPI.GetNamespaceWithExtensionLabel(ctx, vizCmd.ExtensionName)
// Get all the service mirror components in the
// linkerd-multicluster namespace which we'll collect gateway
// metrics from.
multiclusterNs, err := k8sAPI.GetNamespaceWithExtensionLabel(cmd.Context(), MulticlusterExtensionName)
if err != nil {
return fmt.Errorf("make sure the linkerd-viz extension is installed, using 'linkerd viz install' (%w)", err)
return fmt.Errorf("make sure the linkerd-multicluster extension is installed, using 'linkerd multicluster install' (%w)", err)
}
client, err := client.NewExternalClient(ctx, vizNs.Name, k8sAPI)
if err != nil {
return err
selector := fmt.Sprintf("component=%s", "linkerd-service-mirror")
if opts.clusterName != "" {
selector = fmt.Sprintf("%s,mirror.linkerd.io/cluster-name=%s", selector, opts.clusterName)
}
resp, err := requestGatewaysFromAPI(client, req)
pods, err := k8sAPI.CoreV1().Pods(multiclusterNs.Name).List(cmd.Context(), metav1.ListOptions{LabelSelector: selector})
if err != nil {
fmt.Fprintln(os.Stderr, err)
fmt.Fprintf(os.Stderr, "failed to list pods in namespace %s: %s", multiclusterNs.Name, err)
os.Exit(1)
}
renderGateways(resp.GetOk().GatewaysTable.Rows, stdout)
var statuses []gatewayStatus
gatewayMetrics := getGatewayMetrics(k8sAPI, pods.Items, opts.wait)
for _, gateway := range gatewayMetrics {
if gateway.err != nil {
fmt.Fprintf(os.Stderr, "Failed to get gateway status for %s: %s\n", gateway.clusterName, gateway.err)
continue
}
gatewayStatus := gatewayStatus{
clusterName: gateway.clusterName,
}
// Parse the gateway metrics so that we can extract liveness
// and latency information.
var metricsParser expfmt.TextParser
parsedMetrics, err := metricsParser.TextToMetricFamilies(bytes.NewReader(gateway.metrics))
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to parse metrics for %s: %s\n", gateway.clusterName, err)
continue
}
// Check if the gateway is alive by using the gateway_alive
// metric and ensuring the label matches the target cluster.
for _, metrics := range parsedMetrics["gateway_alive"].GetMetric() {
if !isTargetClusterMetric(metrics, gateway.clusterName) {
continue
}
if metrics.GetGauge().GetValue() == 1 {
gatewayStatus.alive = true
break
}
}
// Search the local cluster for mirror services that are
// mirrored from the target cluster.
selector := fmt.Sprintf("%s=%s,%s=%s",
k8s.MirroredResourceLabel, "true",
k8s.RemoteClusterNameLabel, gateway.clusterName,
)
services, err := k8sAPI.CoreV1().Services(corev1.NamespaceAll).List(context.Background(), metav1.ListOptions{LabelSelector: selector})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to list services for %s: %s\n", gateway.clusterName, err)
continue
}
gatewayStatus.numberOfServices = len(services.Items)
// Check the last observed latency by using the
// gateway_latency metric and ensuring the label matches
// the target cluster.
for _, metrics := range parsedMetrics["gateway_latency"].GetMetric() {
if !isTargetClusterMetric(metrics, gateway.clusterName) {
continue
}
gatewayStatus.latency = uint64(metrics.GetGauge().GetValue())
break
}
statuses = append(statuses, gatewayStatus)
}
renderGateways(statuses, stdout)
return nil
},
}
cmd.Flags().StringVar(&opts.clusterName, "cluster-name", "", "the name of the target cluster")
cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", "", "the namespace in which the gateway resides on the target cluster")
cmd.Flags().StringVarP(&opts.timeWindow, "time-window", "t", "1m", "Time window (for example: \"15s\", \"1m\", \"10m\", \"1h\"). Needs to be at least 15s.")
cmd.Flags().DurationVarP(&opts.wait, "wait", "w", opts.wait, "time allowed to fetch diagnostics")
return cmd
}
func requestGatewaysFromAPI(client pb.ApiClient, req *pb.GatewaysRequest) (*pb.GatewaysResponse, error) {
resp, err := client.Gateways(context.Background(), req)
if err != nil {
return nil, fmt.Errorf("Gateways API error: %w", err)
func getGatewayMetrics(k8sAPI *k8s.KubernetesAPI, pods []corev1.Pod, wait time.Duration) []gatewayMetrics {
var metrics []gatewayMetrics
metricsChan := make(chan gatewayMetrics)
var activeRoutines int32
for _, pod := range pods {
atomic.AddInt32(&activeRoutines, 1)
go func(p corev1.Pod) {
defer atomic.AddInt32(&activeRoutines, -1)
name := p.Labels[k8s.RemoteClusterNameLabel]
container, err := getServiceMirrorContainer(p)
if err != nil {
metricsChan <- gatewayMetrics{
clusterName: name,
err: err,
}
return
}
metrics, err := k8s.GetContainerMetrics(k8sAPI, p, container, false, k8s.AdminHTTPPortName)
metricsChan <- gatewayMetrics{
clusterName: name,
metrics: metrics,
err: err,
}
}(pod)
}
if e := resp.GetError(); e != nil {
return nil, fmt.Errorf("Gateways API response error: %v", e.Error)
timeout := time.NewTimer(wait)
defer timeout.Stop()
wait:
for {
select {
case metric := <-metricsChan:
metrics = append(metrics, metric)
case <-timeout.C:
break wait
}
if atomic.LoadInt32(&activeRoutines) == 0 {
break
}
}
return resp, nil
return metrics
}
func renderGateways(rows []*pb.GatewaysTable_Row, w io.Writer) {
func getServiceMirrorContainer(pod corev1.Pod) (corev1.Container, error) {
if pod.Status.Phase != corev1.PodRunning {
return corev1.Container{}, fmt.Errorf("pod not running: %s", pod.GetName())
}
for _, c := range pod.Spec.Containers {
if c.Name == "service-mirror" {
return c, nil
}
}
return corev1.Container{}, fmt.Errorf("pod %s did not have 'service-mirror' container", pod.GetName())
}
func isTargetClusterMetric(metric *io_prometheus_client.Metric, targetClusterName string) bool {
for _, label := range metric.GetLabel() {
if label.GetName() == "target_cluster_name" {
return label.GetValue() == targetClusterName
}
}
return false
}
func renderGateways(statuses []gatewayStatus, w io.Writer) {
t := buildGatewaysTable()
t.Data = []table.Row{}
for _, row := range rows {
row := row // Copy to satisfy golint.
t.Data = append(t.Data, gatewaysRowToTableRow(row))
for _, status := range statuses {
status := status
t.Data = append(t.Data, gatewayStatusToTableRow(status))
}
t.Render(w)
}
@@ -98,79 +221,53 @@ var (
clusterNameHeader = "CLUSTER"
aliveHeader = "ALIVE"
pairedServicesHeader = "NUM_SVC"
latencyP50Header = "LATENCY_P50"
latencyP95Header = "LATENCY_P95"
latencyP99Header = "LATENCY_P99"
latencyHeader = "LATENCY"
)
func buildGatewaysTable() table.Table {
columns := []table.Column{
table.Column{
{
Header: clusterNameHeader,
Width: 7,
Flexible: true,
LeftAlign: true,
},
table.Column{
{
Header: aliveHeader,
Width: 5,
Flexible: true,
LeftAlign: true,
},
table.Column{
{
Header: pairedServicesHeader,
Width: 9,
},
table.Column{
Header: latencyP50Header,
Width: 11,
},
table.Column{
Header: latencyP95Header,
Width: 11,
},
table.Column{
Header: latencyP99Header,
{
Header: latencyHeader,
Width: 11,
},
}
t := table.NewTable(columns, []table.Row{})
t.Sort = []int{0, 1} // Sort by namespace, then name.
t.Sort = []int{0} // sort by cluster name
return t
}
func gatewaysRowToTableRow(row *pb.GatewaysTable_Row) []string {
func gatewayStatusToTableRow(status gatewayStatus) []string {
valueOrPlaceholder := func(value string) string {
if row.Alive {
if status.alive {
return value
}
return "-"
}
alive := "False"
if row.Alive {
if status.alive {
alive = "True"
}
return []string{
row.ClusterName,
status.clusterName,
alive,
fmt.Sprint(row.PairedServices),
valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP50)),
valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP95)),
valueOrPlaceholder(fmt.Sprintf("%dms", row.LatencyMsP99)),
fmt.Sprint(status.numberOfServices),
valueOrPlaceholder(fmt.Sprintf("%dms", status.latency)),
}
}
func extractGatewayPort(gateway *corev1.Service) (uint32, error) {
for _, port := range gateway.Spec.Ports {
if port.Name == k8s.GatewayPortName {
if gateway.Spec.Type == "NodePort" {
return uint32(port.NodePort), nil
}
return uint32(port.Port), nil
}
}
return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName)
}


@@ -415,3 +415,15 @@ func buildServiceMirrorValues(opts *linkOptions) (*multicluster.Values, error) {
return defaults, nil
}
func extractGatewayPort(gateway *corev1.Service) (uint32, error) {
for _, port := range gateway.Spec.Ports {
if port.Name == k8s.GatewayPortName {
if gateway.Spec.Type == "NodePort" {
return uint32(port.NodePort), nil
}
return uint32(port.Port), nil
}
}
return 0, fmt.Errorf("gateway service %s has no gateway port named %s", gateway.Name, k8s.GatewayPortName)
}


@@ -16,6 +16,7 @@ const (
// workers.
type ProbeMetricVecs struct {
alive *prometheus.GaugeVec
latency *prometheus.GaugeVec
latencies *prometheus.HistogramVec
enqueues *prometheus.CounterVec
dequeues *prometheus.CounterVec
@@ -26,6 +27,7 @@ type ProbeMetricVecs struct {
// probe worker.
type ProbeMetrics struct {
alive prometheus.Gauge
latency prometheus.Gauge
latencies prometheus.Observer
probes *prometheus.CounterVec
unregister func()
@@ -79,6 +81,14 @@ func NewProbeMetricVecs() ProbeMetricVecs {
labelNames,
)
latency := promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gateway_latency",
Help: "A gauge which is the latency of the last probe to the gateway.",
},
labelNames,
)
latencies := promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "gateway_probe_latency_ms",
@@ -95,6 +105,7 @@ func NewProbeMetricVecs() ProbeMetricVecs {
return ProbeMetricVecs{
alive: alive,
latency: latency,
latencies: latencies,
enqueues: enqueues,
dequeues: dequeues,
@@ -116,6 +127,7 @@ func (mv ProbeMetricVecs) NewWorkerMetrics(remoteClusterName string) (*ProbeMetr
}
return &ProbeMetrics{
alive: mv.alive.With(labels),
latency: mv.latency.With(labels),
latencies: mv.latencies.With(labels),
probes: curriedProbes,
unregister: func() {


@@ -117,6 +117,7 @@ func (pw *ProbeWorker) doProbe() {
} else {
pw.log.Debug("Gateway is healthy")
pw.metrics.alive.Set(1)
pw.metrics.latency.Set(float64(end.Milliseconds()))
pw.metrics.latencies.Observe(float64(end.Milliseconds()))
pw.metrics.probes.With(successLabel).Inc()
if !pw.alive {

pkg/k8s/metrics.go (new file, 49 lines)

@@ -0,0 +1,49 @@
package k8s
import (
"fmt"
"io/ioutil"
"net/http"
"os"
corev1 "k8s.io/api/core/v1"
)
// AdminHTTPPortName is the name of the port used by the admin http server.
const AdminHTTPPortName string = "admin-http"
// GetContainerMetrics returns the metrics exposed by a container on the passed in portName
func GetContainerMetrics(
k8sAPI *KubernetesAPI,
pod corev1.Pod,
container corev1.Container,
emitLogs bool,
portName string,
) ([]byte, error) {
portForward, err := NewContainerMetricsForward(k8sAPI, pod, container, emitLogs, portName)
if err != nil {
return nil, err
}
defer portForward.Stop()
if err = portForward.Init(); err != nil {
fmt.Fprintf(os.Stderr, "Error running port-forward: %s", err)
return nil, err
}
metricsURL := portForward.URLFor("/metrics")
return getResponse(metricsURL)
}
// getResponse makes a http Get request to the passed url and returns the response/error
func getResponse(url string) ([]byte, error) {
// url has been constructed by k8s.newPortForward and is not passed in by
// the user.
//nolint:gosec
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}


@@ -86,7 +86,7 @@ func TestGateways(t *testing.T) {
return errors.New("response is empty")
}
fields := strings.Fields(rows[1])
if len(fields) < 6 {
if len(fields) < 4 {
return fmt.Errorf("unexpected number of columns: %d", len(fields))
}
if fields[0] != "target" {