mirror of https://github.com/linkerd/linkerd2.git
252 lines
6.9 KiB
Go
252 lines
6.9 KiB
Go
package heartbeat
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"time"
|
|
|
|
pkgK8s "github.com/linkerd/linkerd2/controller/k8s"
|
|
"github.com/linkerd/linkerd2/pkg/config"
|
|
"github.com/linkerd/linkerd2/pkg/k8s"
|
|
"github.com/linkerd/linkerd2/pkg/util"
|
|
"github.com/linkerd/linkerd2/pkg/version"
|
|
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
|
"github.com/prometheus/common/model"
|
|
log "github.com/sirupsen/logrus"
|
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
)
|
|
|
|
type containerMeta struct {
|
|
name model.LabelValue
|
|
ns model.LabelValue
|
|
}
|
|
|
|
// K8sValues gathers relevant heartbeat information from Kubernetes
|
|
func K8sValues(ctx context.Context, kubeAPI *k8s.KubernetesAPI, controlPlaneNamespace string) url.Values {
|
|
v := url.Values{}
|
|
|
|
cm, err := config.FetchLinkerdConfigMap(ctx, kubeAPI, controlPlaneNamespace)
|
|
if err != nil {
|
|
log.Errorf("Failed to fetch linkerd-config: %s", err)
|
|
} else {
|
|
v.Set("uuid", string(cm.GetUID()))
|
|
v.Set("install-time", strconv.FormatInt(cm.GetCreationTimestamp().Unix(), 10))
|
|
}
|
|
|
|
versionInfo, err := kubeAPI.GetVersionInfo()
|
|
if err != nil {
|
|
log.Errorf("Failed to fetch Kubernetes version info: %s", err)
|
|
} else {
|
|
v.Set("k8s-version", versionInfo.String())
|
|
}
|
|
|
|
namespaces, err := kubeAPI.GetAllNamespacesWithExtensionLabel(ctx)
|
|
if err != nil {
|
|
log.Errorf("Failed to fetch namespaces with %s label: %s", k8s.LinkerdExtensionLabel, err)
|
|
} else {
|
|
for _, ns := range namespaces {
|
|
extensionNameParam := fmt.Sprintf("ext-%s", ns.Labels[k8s.LinkerdExtensionLabel])
|
|
v.Set(extensionNameParam, "1")
|
|
}
|
|
}
|
|
|
|
err = k8s.ServiceProfilesAccess(ctx, kubeAPI)
|
|
if err != nil {
|
|
log.Errorf("Failed to verify service profile access: %s", err)
|
|
return v
|
|
}
|
|
|
|
l5dCrdClient, err := pkgK8s.NewL5DCRDClient(kubeAPI.Config)
|
|
if err != nil {
|
|
log.Errorf("Failed to create Linkerd CRD client: %s", err)
|
|
return v
|
|
}
|
|
|
|
spList, err := l5dCrdClient.LinkerdV1alpha2().ServiceProfiles("").List(ctx, v1.ListOptions{})
|
|
if err != nil {
|
|
log.Errorf("Failed to get service profiles: %s", err)
|
|
return v
|
|
}
|
|
|
|
v.Set("service-profile-count", strconv.Itoa(len(spList.Items)))
|
|
|
|
return v
|
|
}
|
|
|
|
// PromValues gathers relevant heartbeat information from Prometheus
|
|
func PromValues(promAPI promv1.API, controlPlaneNamespace string) url.Values {
|
|
v := url.Values{}
|
|
|
|
jobProxyLabels := model.LabelSet{"job": "linkerd-proxy"}
|
|
|
|
// total-rps
|
|
query := fmt.Sprintf("sum(rate(request_total%s[30s]))", jobProxyLabels.Merge(model.LabelSet{"direction": "inbound"}))
|
|
value, err := promQuery(promAPI, query, 0)
|
|
if err != nil {
|
|
log.Errorf("Prometheus query failed: %s", err)
|
|
} else {
|
|
v.Set("total-rps", value)
|
|
}
|
|
|
|
// meshed-pods
|
|
query = fmt.Sprintf("count(count by (pod) (request_total%s))", jobProxyLabels)
|
|
value, err = promQuery(promAPI, query, 0)
|
|
if err != nil {
|
|
log.Errorf("Prometheus query failed: %s", err)
|
|
} else {
|
|
v.Set("meshed-pods", value)
|
|
}
|
|
|
|
// p95-handle-us
|
|
query = fmt.Sprintf("histogram_quantile(0.99, sum(rate(request_handle_us_bucket%s[24h])) by (le))", jobProxyLabels)
|
|
value, err = promQuery(promAPI, query, 0)
|
|
if err != nil {
|
|
log.Errorf("Prometheus query failed: %s", err)
|
|
} else {
|
|
v.Set("p99-handle-us", value)
|
|
}
|
|
|
|
// proxy-injector-injections
|
|
jobInjectorLabels := model.LabelSet{
|
|
"job": "linkerd-controller",
|
|
"skip": "false",
|
|
}
|
|
query = fmt.Sprintf("sum(proxy_inject_admission_responses_total%s)", jobInjectorLabels)
|
|
value, err = promQuery(promAPI, query, 0)
|
|
if err != nil {
|
|
log.Errorf("Prometheus query failed: %s", err)
|
|
} else {
|
|
v.Set("proxy-injector-injections", value)
|
|
}
|
|
|
|
// container metrics
|
|
for _, container := range []containerMeta{
|
|
{
|
|
name: "linkerd-proxy",
|
|
},
|
|
{
|
|
name: "destination",
|
|
ns: "linkerd",
|
|
},
|
|
{
|
|
name: "prometheus",
|
|
ns: "linkerd",
|
|
},
|
|
} {
|
|
// as of k8s 1.16 cadvisor labels container names with just `container`
|
|
containerLabelsPre16 := getLabelSet(container, "container_name")
|
|
containerLabelsPost16 := getLabelSet(container, "container")
|
|
|
|
// max-mem
|
|
query = fmt.Sprintf("max(container_memory_working_set_bytes%s or container_memory_working_set_bytes%s)",
|
|
containerLabelsPre16, containerLabelsPost16)
|
|
value, err = promQuery(promAPI, query, 0)
|
|
if err != nil {
|
|
log.Errorf("Prometheus query failed: %s", err)
|
|
} else {
|
|
param := fmt.Sprintf("max-mem-%s", container.name)
|
|
v.Set(param, value)
|
|
}
|
|
|
|
// p95-cpu
|
|
query = fmt.Sprintf("max(quantile_over_time(0.95,rate(container_cpu_usage_seconds_total%s[5m])[24h:5m]) "+
|
|
"or quantile_over_time(0.95,rate(container_cpu_usage_seconds_total%s[5m])[24h:5m]))",
|
|
containerLabelsPre16, containerLabelsPost16)
|
|
value, err = promQuery(promAPI, query, 3)
|
|
if err != nil {
|
|
log.Errorf("Prometheus query failed: %s", err)
|
|
} else {
|
|
param := fmt.Sprintf("p95-cpu-%s", container.name)
|
|
v.Set(param, value)
|
|
}
|
|
}
|
|
|
|
return v
|
|
}
|
|
|
|
func getLabelSet(container containerMeta, containerKey model.LabelName) model.LabelSet {
|
|
containerLabels := model.LabelSet{
|
|
"job": "kubernetes-nodes-cadvisor",
|
|
containerKey: container.name,
|
|
}
|
|
if container.ns != "" {
|
|
containerLabels["namespace"] = container.ns
|
|
}
|
|
return containerLabels
|
|
}
|
|
|
|
func promQuery(promAPI promv1.API, query string, precision int) (string, error) {
|
|
log.Debugf("Prometheus query: %s", query)
|
|
|
|
res, warn, err := promAPI.Query(context.Background(), query, time.Time{})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if warn != nil {
|
|
log.Warnf("%v", warn)
|
|
}
|
|
|
|
if result, ok := res.(model.Vector); ok {
|
|
if len(result) != 1 {
|
|
return "", fmt.Errorf("unexpected result Prometheus result vector length: %d", len(result))
|
|
}
|
|
f := float64(result[0].Value)
|
|
if math.IsNaN(f) {
|
|
return "", fmt.Errorf("unexpected sample value: %v", result[0].Value)
|
|
}
|
|
|
|
return strconv.FormatFloat(f, 'f', precision, 64), nil
|
|
}
|
|
|
|
return "", fmt.Errorf("unexpected query result type (expected Vector): %s", res.Type())
|
|
}
|
|
|
|
// MergeValues merges two url.Values
|
|
func MergeValues(v1, v2 url.Values) url.Values {
|
|
v := url.Values{}
|
|
for key, val := range v1 {
|
|
v[key] = val
|
|
}
|
|
for key, val := range v2 {
|
|
v[key] = val
|
|
}
|
|
return v
|
|
}
|
|
|
|
// Send takes a map of url.Values and sends them to versioncheck.linkerd.io
|
|
func Send(v url.Values) error {
|
|
return send(http.DefaultClient, version.CheckURL, v)
|
|
}
|
|
|
|
func send(client *http.Client, baseURL string, v url.Values) error {
|
|
req, err := http.NewRequest("GET", baseURL, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create HTTP request for base URL [%s]: %w", baseURL, err)
|
|
}
|
|
req.URL.RawQuery = v.Encode()
|
|
|
|
log.Infof("Sending heartbeat: %s", req.URL.String())
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("check URL [%s] request failed with: %w", req.URL.String(), err)
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
body, err := util.ReadAllLimit(resp.Body, util.MB)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read response body: %w", err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("request failed with code %d; response body: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
log.Infof("Successfully sent heartbeat: %s", string(body))
|
|
|
|
return nil
|
|
}
|