package public import ( "context" "fmt" "math" "runtime" "sort" "strings" "time" "github.com/golang/protobuf/ptypes/duration" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/runconduit/conduit/controller/api/util" healthcheckPb "github.com/runconduit/conduit/controller/gen/common/healthcheck" tapPb "github.com/runconduit/conduit/controller/gen/controller/tap" telemPb "github.com/runconduit/conduit/controller/gen/controller/telemetry" pb "github.com/runconduit/conduit/controller/gen/public" pkgK8s "github.com/runconduit/conduit/pkg/k8s" "github.com/runconduit/conduit/pkg/version" log "github.com/sirupsen/logrus" k8sV1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" applisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" ) type ( grpcServer struct { prometheusAPI promv1.API telemetryClient telemPb.TelemetryClient tapClient tapPb.TapClient deployLister applisters.DeploymentLister replicaSetLister applisters.ReplicaSetLister podLister corelisters.PodLister controllerNamespace string ignoredNamespaces []string } successRate struct { success float64 failure float64 } // these structs couple responses with an error, useful when returning results via channels metricResult struct { series []pb.MetricSeries err error } queryResult struct { res telemPb.QueryResponse err error } queryResultWithLabel struct { label pb.HistogramLabel queryResult } // sortable slice of unix ms timestamps timestamps []int64 ) const ( podQuery = "sum(request_total) by (pod)" countQuery = "sum(irate(responses_total{%s}[%s])) by (%s)" countHttpQuery = "sum(irate(http_requests_total{%s}[%s])) by (%s)" countGrpcQuery = "sum(irate(grpc_server_handled_total{%s}[%s])) by (%s)" latencyQuery = "sum(irate(response_latency_ms_bucket{%s}[%s])) by (%s)" quantileQuery = "histogram_quantile(%s, %s)" defaultVectorRange = "30s" // 3x scrape_interval in prometheus config targetPodLabel = "target" targetDeployLabel = "target_deployment" sourcePodLabel = "source" sourceDeployLabel = "source_deployment" jobLabel = "job" K8sClientSubsystemName = "kubernetes" K8sClientCheckDescription = "control plane can talk to Kubernetes" PromClientSubsystemName = "prometheus" PromClientCheckDescription = "control plane can talk to Prometheus" ) var ( quantileMap = map[string]pb.HistogramLabel{ "0.5": pb.HistogramLabel_P50, "0.95": pb.HistogramLabel_P95, "0.99": pb.HistogramLabel_P99, } stepMap = map[pb.TimeWindow]string{ pb.TimeWindow_TEN_SEC: "10s", pb.TimeWindow_ONE_MIN: "10s", pb.TimeWindow_TEN_MIN: "10s", pb.TimeWindow_ONE_HOUR: "1m", } aggregationMap = map[pb.AggregationType]string{ pb.AggregationType_TARGET_DEPLOY: targetDeployLabel, pb.AggregationType_SOURCE_DEPLOY: sourceDeployLabel, pb.AggregationType_MESH: jobLabel, } emptyMetadata = pb.MetricMetadata{} controlPlaneComponents = []string{"web", "controller", "prometheus", "grafana"} ) func newGrpcServer( promAPI promv1.API, telemetryClient telemPb.TelemetryClient, tapClient tapPb.TapClient, deployLister applisters.DeploymentLister, replicaSetLister applisters.ReplicaSetLister, podLister corelisters.PodLister, controllerNamespace string, ignoredNamespaces []string, ) *grpcServer { return &grpcServer{ prometheusAPI: promAPI, telemetryClient: telemetryClient, tapClient: tapClient, deployLister: deployLister, replicaSetLister: replicaSetLister, podLister: podLister, controllerNamespace: controllerNamespace, ignoredNamespaces: ignoredNamespaces, } } func (s *grpcServer) Stat(ctx context.Context, req *pb.MetricRequest) (*pb.MetricResponse, error) { var err error resultsCh := make(chan metricResult) metrics := make([]*pb.MetricSeries, 0) // kick off requests for _, metric := range req.Metrics { go func(metric pb.MetricName) { resultsCh <- s.queryMetric(ctx, req, metric) }(metric) } // process results for _ = range req.Metrics { result := <-resultsCh if result.err != nil { log.Errorf("Stat -> queryMetric failed with: %s", result.err) err = result.err } else { for i := range result.series { metrics = append(metrics, &result.series[i]) } } } // if an error occurred, return the error, along with partial results return &pb.MetricResponse{Metrics: metrics}, err } func (s *grpcServer) queryMetric(ctx context.Context, req *pb.MetricRequest, metric pb.MetricName) metricResult { result := metricResult{} switch metric { case pb.MetricName_REQUEST_RATE: if req.GroupBy == pb.AggregationType_MESH { result.series, result.err = s.requestRateMesh(ctx, req) } else { result.series, result.err = s.requestRate(ctx, req) } case pb.MetricName_SUCCESS_RATE: if req.GroupBy == pb.AggregationType_MESH { result.series, result.err = s.successRateMesh(ctx, req) } else { result.series, result.err = s.successRate(ctx, req) } case pb.MetricName_LATENCY: if req.GroupBy == pb.AggregationType_MESH { result.series = nil result.err = fmt.Errorf("latency not supported for MESH queries") } else { result.series, result.err = s.latency(ctx, req) } default: result.series = nil result.err = fmt.Errorf("unsupported metric: %s", metric) } return result } func (_ *grpcServer) Version(ctx context.Context, req *pb.Empty) (*pb.VersionInfo, error) { return &pb.VersionInfo{GoVersion: runtime.Version(), ReleaseVersion: version.Version, BuildDate: "1970-01-01T00:00:00Z"}, nil } func (s *grpcServer) ListPods(ctx context.Context, req *pb.Empty) (*pb.ListPodsResponse, error) { log.Debugf("ListPods request: %+v", req) // Reports is a map from instance name to the absolute time of the most recent // report from that instance. reports := make(map[string]time.Time) // Query Prometheus for all pods present vec, err := s.queryProm(ctx, podQuery) if err != nil { return nil, err } for _, sample := range vec { pod := string(sample.Metric["pod"]) timestamp := sample.Timestamp reports[pod] = time.Unix(0, int64(timestamp)*int64(time.Millisecond)) } pods, err := s.podLister.List(labels.Everything()) if err != nil { return nil, err } podList := make([]*pb.Pod, 0) for _, pod := range pods { if s.shouldIgnore(pod) { continue } deployment, err := s.getDeploymentFor(pod) if err != nil { log.Debugf("Cannot get deployment for pod %s: %s", pod.Name, err) deployment = "" } updated, added := reports[pod.Name] status := string(pod.Status.Phase) if pod.DeletionTimestamp != nil { status = "Terminating" } controllerComponent := pod.Labels[pkgK8s.ControllerComponentLabel] controllerNS := pod.Labels[pkgK8s.ControllerNSLabel] item := &pb.Pod{ Name: pod.Namespace + "/" + pod.Name, Deployment: deployment, // TODO: this is of the form `namespace/deployment`, it should just be `deployment` Status: status, PodIP: pod.Status.PodIP, Added: added, ControllerNamespace: controllerNS, ControlPlane: controllerComponent != "", } if added { since := time.Since(updated) item.SinceLastReport = &duration.Duration{ Seconds: int64(since / time.Second), Nanos: int32(since % time.Second), } } podList = append(podList, item) } rsp := pb.ListPodsResponse{Pods: podList} log.Debugf("ListPods response: %+v", rsp) return &rsp, nil } func (s *grpcServer) SelfCheck(ctx context.Context, in *healthcheckPb.SelfCheckRequest) (*healthcheckPb.SelfCheckResponse, error) { k8sClientCheck := &healthcheckPb.CheckResult{ SubsystemName: K8sClientSubsystemName, CheckDescription: K8sClientCheckDescription, Status: healthcheckPb.CheckStatus_OK, } _, err := s.podLister.List(labels.Everything()) if err != nil { k8sClientCheck.Status = healthcheckPb.CheckStatus_ERROR k8sClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error talking to Kubernetes from control plane: %s", err.Error()) } promClientCheck := &healthcheckPb.CheckResult{ SubsystemName: PromClientSubsystemName, CheckDescription: PromClientCheckDescription, Status: healthcheckPb.CheckStatus_OK, } _, err = s.queryProm(ctx, podQuery) if err != nil { promClientCheck.Status = healthcheckPb.CheckStatus_ERROR promClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error talking to Prometheus from control plane: %s", err.Error()) } response := &healthcheckPb.SelfCheckResponse{ Results: []*healthcheckPb.CheckResult{ k8sClientCheck, promClientCheck, }, } return response, nil } // Pass through to tap service func (s *grpcServer) Tap(req *pb.TapRequest, stream pb.Api_TapServer) error { tapStream := stream.(tapServer) tapClient, err := s.tapClient.Tap(tapStream.Context(), req) if err != nil { //TODO: why not return the error? log.Errorf("Unexpected error tapping [%v]: %v", req, err) return nil } for { select { case <-tapStream.Context().Done(): return nil default: event, err := tapClient.Recv() if err != nil { return err } tapStream.Send(event) } } } func (s *grpcServer) requestRate(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) { result := s.queryCount(ctx, req, countQuery, "") if result.err != nil { return nil, result.err } return processRequestRate(result.res.Metrics, extractMetadata) } func (s *grpcServer) requestRateMesh(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) { var err error resultsCh := make(chan queryResult) metrics := make([]*telemPb.Sample, 0) // kick off requests go func() { resultsCh <- s.queryCount(ctx, req, countHttpQuery, "") }() go func() { resultsCh <- s.queryCount(ctx, req, countGrpcQuery, "") }() // process results, loop twice, for countHttpQuery and countGrpcQuery for i := 0; i < 2; i++ { result := <-resultsCh if result.err != nil { log.Errorf("requestRateMesh -> queryCount failed with: %s", err) err = result.err } else { metrics = append(metrics, result.res.Metrics...) } } // if any errors occurred, return no results if err != nil { return nil, err } return processRequestRate(metrics, extractMetadataMesh) } func (s *grpcServer) successRate(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) { result := s.queryCount(ctx, req, countQuery, "classification") if result.err != nil { return nil, result.err } return processSuccessRate(result.res.Metrics, extractMetadata, isSuccess) } func (s *grpcServer) successRateMesh(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) { var err error resultsCh := make(chan queryResult) metrics := make([]*telemPb.Sample, 0) // kick off requests go func() { resultsCh <- s.queryCount(ctx, req, countHttpQuery, "code") }() go func() { resultsCh <- s.queryCount(ctx, req, countGrpcQuery, "grpc_code") }() // process results, loop twice, for countHttpQuery and countGrpcQuery for i := 0; i < 2; i++ { result := <-resultsCh if result.err != nil { log.Errorf("successRateMesh -> queryCount failed with: %s", err) err = result.err } else { metrics = append(metrics, result.res.Metrics...) } } // if any errors occurred, return no results if err != nil { return nil, err } return processSuccessRate(metrics, extractMetadataMesh, isSuccessMesh) } func (s *grpcServer) latency(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) { timestamps := make(map[int64]struct{}) latencies := make(map[pb.MetricMetadata]map[int64][]*pb.HistogramValue) series := make([]pb.MetricSeries, 0) queryRsps, err := s.queryLatency(ctx, req) if err != nil { return nil, err } for label, queryRsp := range queryRsps { for _, metric := range queryRsp.Metrics { if len(metric.Values) == 0 { continue } metadata := extractMetadata(metric) if metadata == emptyMetadata { continue } if _, ok := latencies[metadata]; !ok { latencies[metadata] = make(map[int64][]*pb.HistogramValue) } for _, value := range metric.Values { if math.IsNaN(value.Value) { continue } timestamp := value.TimestampMs timestamps[timestamp] = struct{}{} if _, ok := latencies[metadata][timestamp]; !ok { latencies[metadata][timestamp] = make([]*pb.HistogramValue, 0) } hv := &pb.HistogramValue{ Label: label, Value: int64(value.Value), } latencies[metadata][timestamp] = append(latencies[metadata][timestamp], hv) } } } sortedTimestamps := sortTimestamps(timestamps) for metadata, latenciesByTime := range latencies { m := metadata datapoints := make([]*pb.MetricDatapoint, 0) for _, ts := range sortedTimestamps { if histogram, ok := latenciesByTime[ts]; ok { datapoint := &pb.MetricDatapoint{ Value: &pb.MetricValue{ Value: &pb.MetricValue_Histogram{ Histogram: &pb.Histogram{Values: histogram}, }, }, TimestampMs: ts, } datapoints = append(datapoints, datapoint) } } s := pb.MetricSeries{ Name: pb.MetricName_LATENCY, Metadata: &m, Datapoints: datapoints, } series = append(series, s) } return series, nil } func (s *grpcServer) queryCount(ctx context.Context, req *pb.MetricRequest, rawQuery, sumBy string) queryResult { query, err := formatQuery(rawQuery, req, sumBy, s.controllerNamespace) if err != nil { return queryResult{res: telemPb.QueryResponse{}, err: err} } queryReq, err := reqToQueryReq(req, query) if err != nil { return queryResult{res: telemPb.QueryResponse{}, err: err} } return s.query(ctx, queryReq) } func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (map[pb.HistogramLabel]telemPb.QueryResponse, error) { queryRsps := make(map[pb.HistogramLabel]telemPb.QueryResponse) query, err := formatQuery(latencyQuery, req, "le", s.controllerNamespace) if err != nil { return nil, err } // omit query string, we'll fill it in later queryReq, err := reqToQueryReq(req, "") if err != nil { return nil, err } results := make(chan queryResultWithLabel) // kick off requests for quantile, label := range quantileMap { go func(quantile string, label pb.HistogramLabel) { // copy queryReq, gets us StartMS, EndMS, and Step qr := queryReq // insert our quantile-specific query qr.Query = fmt.Sprintf(quantileQuery, quantile, query) results <- queryResultWithLabel{ queryResult: s.query(ctx, qr), label: label, } }(quantile, label) } // process results for _ = range quantileMap { result := <-results if result.err != nil { log.Errorf("queryLatency -> query failed with: %s", err) err = result.err } else { queryRsps[result.label] = result.res } } // if an error occurred, return the error, along with partial results return queryRsps, err } func (s *grpcServer) query(ctx context.Context, queryReq telemPb.QueryRequest) queryResult { queryRsp, err := s.telemetryClient.Query(ctx, &queryReq) if err != nil { return queryResult{res: telemPb.QueryResponse{}, err: err} } return queryResult{res: *queryRsp, err: nil} } func (s *grpcServer) shouldIgnore(pod *k8sV1.Pod) bool { for _, namespace := range s.ignoredNamespaces { if pod.Namespace == namespace { return true } } return false } func (s *grpcServer) getDeploymentFor(pod *k8sV1.Pod) (string, error) { namespace := pod.Namespace if len(pod.GetOwnerReferences()) == 0 { return "", fmt.Errorf("Pod %s has no owner", pod.Name) } parent := pod.GetOwnerReferences()[0] if parent.Kind != "ReplicaSet" { return "", fmt.Errorf("Pod %s parent is not a ReplicaSet", pod.Name) } rs, err := s.replicaSetLister.GetPodReplicaSets(pod) if err != nil { return "", err } if len(rs) == 0 || len(rs[0].GetOwnerReferences()) == 0 { return "", fmt.Errorf("Pod %s has no replicasets", pod.Name) } for _, r := range rs { for _, owner := range r.GetOwnerReferences() { switch owner.Kind { case "Deployment": return namespace + "/" + owner.Name, nil } } } return "", fmt.Errorf("Pod %s owner is not a Deployment", pod.Name) } func reqToQueryReq(req *pb.MetricRequest, query string) (telemPb.QueryRequest, error) { start, end, step, err := queryParams(req) if err != nil { return telemPb.QueryRequest{}, err } // EndMs always required to ensure deterministic timestamps queryReq := telemPb.QueryRequest{ Query: query, EndMs: end, } if !req.Summarize { queryReq.StartMs = start queryReq.Step = step } return queryReq, nil } func formatQuery(query string, req *pb.MetricRequest, sumBy string, controlPlaneNamespace string) (string, error) { sumLabels := make([]string, 0) filterLabels := make([]string, 0) if str, ok := aggregationMap[req.GroupBy]; ok { sumLabels = append(sumLabels, str) } else { return "", fmt.Errorf("unsupported AggregationType") } if sumBy != "" { sumLabels = append(sumLabels, sumBy) } if metadata := req.FilterBy; metadata != nil { if metadata.TargetDeploy != "" { filterLabels = append(filterLabels, fmt.Sprintf("%s=\"%s\"", targetDeployLabel, metadata.TargetDeploy)) sumLabels = append(sumLabels, targetDeployLabel) } if metadata.SourceDeploy != "" { filterLabels = append(filterLabels, fmt.Sprintf("%s=\"%s\"", sourceDeployLabel, metadata.SourceDeploy)) sumLabels = append(sumLabels, sourceDeployLabel) } if metadata.Component != "" { filterLabels = append(filterLabels, fmt.Sprintf("%s=\"%s\"", jobLabel, metadata.Component)) sumLabels = append(sumLabels, jobLabel) } } combinedComponentNames := strings.Join(controlPlaneComponents, "|") filterLabels = append(filterLabels, fmt.Sprintf("%s!~\"%s/(%s)\"", targetDeployLabel, controlPlaneNamespace, combinedComponentNames)) filterLabels = append(filterLabels, fmt.Sprintf("%s!~\"%s/(%s)\"", sourceDeployLabel, controlPlaneNamespace, combinedComponentNames)) return fmt.Sprintf( query, strings.Join(filterLabels, ","), defaultVectorRange, strings.Join(sumLabels, ","), ), nil } func queryParams(req *pb.MetricRequest) (int64, int64, string, error) { durationStr, err := util.GetWindowString(req.Window) if err != nil { return 0, 0, "", err } duration, err := time.ParseDuration(durationStr) if err != nil { return 0, 0, "", err } end := time.Now() start := end.Add(-1 * duration) step, ok := stepMap[req.Window] if !ok { return 0, 0, "", fmt.Errorf("unsupported Window") } ms := int64(time.Millisecond) return start.UnixNano() / ms, end.UnixNano() / ms, step, nil } func extractMetadata(metric *telemPb.Sample) pb.MetricMetadata { return pb.MetricMetadata{ TargetDeploy: metric.Labels[targetDeployLabel], SourceDeploy: metric.Labels[sourceDeployLabel], } } func extractMetadataMesh(metric *telemPb.Sample) pb.MetricMetadata { return pb.MetricMetadata{ Component: metric.Labels[jobLabel], } } func isSuccess(labels map[string]string) bool { return labels["classification"] == "success" } func isSuccessMesh(labels map[string]string) (success bool) { // check to see if the http status code is anything but a 5xx error if v, ok := labels["code"]; ok && !strings.HasPrefix(v, "5") { success = true } // or check to see if the grpc status code is OK if v, ok := labels["grpc_code"]; ok && v == "OK" { success = true } return } func processRequestRate( metrics []*telemPb.Sample, metadataFn func(*telemPb.Sample) pb.MetricMetadata, ) ([]pb.MetricSeries, error) { series := make([]pb.MetricSeries, 0) for _, metric := range metrics { if len(metric.Values) == 0 { continue } datapoints := make([]*pb.MetricDatapoint, 0) for _, value := range metric.Values { if value.Value == 0 { continue } datapoint := pb.MetricDatapoint{ Value: &pb.MetricValue{ Value: &pb.MetricValue_Gauge{Gauge: value.Value}, }, TimestampMs: value.TimestampMs, } datapoints = append(datapoints, &datapoint) } metadata := metadataFn(metric) if metadata == emptyMetadata { continue } s := pb.MetricSeries{ Name: pb.MetricName_REQUEST_RATE, Metadata: &metadata, Datapoints: datapoints, } series = append(series, s) } return series, nil } func processSuccessRate( metrics []*telemPb.Sample, metadataFn func(*telemPb.Sample) pb.MetricMetadata, successRateFn func(map[string]string) bool, ) ([]pb.MetricSeries, error) { timestamps := make(map[int64]struct{}) successRates := make(map[pb.MetricMetadata]map[int64]*successRate) series := make([]pb.MetricSeries, 0) for _, metric := range metrics { if len(metric.Values) == 0 { continue } isSuccess := successRateFn(metric.Labels) metadata := metadataFn(metric) if metadata == emptyMetadata { continue } if _, ok := successRates[metadata]; !ok { successRates[metadata] = make(map[int64]*successRate) } for _, value := range metric.Values { timestamp := value.TimestampMs timestamps[timestamp] = struct{}{} if _, ok := successRates[metadata][timestamp]; !ok { successRates[metadata][timestamp] = &successRate{} } if isSuccess { successRates[metadata][timestamp].success += value.Value } else { successRates[metadata][timestamp].failure += value.Value } } } sortedTimestamps := sortTimestamps(timestamps) for metadata, successRateByTime := range successRates { m := metadata datapoints := make([]*pb.MetricDatapoint, 0) for _, ts := range sortedTimestamps { if sr, ok := successRateByTime[ts]; ok { if requests := sr.success + sr.failure; requests > 0 { datapoint := &pb.MetricDatapoint{ Value: &pb.MetricValue{ Value: &pb.MetricValue_Gauge{Gauge: sr.success / requests}, }, TimestampMs: ts, } datapoints = append(datapoints, datapoint) } } } s := pb.MetricSeries{ Name: pb.MetricName_SUCCESS_RATE, Metadata: &m, Datapoints: datapoints, } series = append(series, s) } return series, nil } func (a timestamps) Len() int { return len(a) } func (a timestamps) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a timestamps) Less(i, j int) bool { return a[i] < a[j] } func sortTimestamps(timestampMap map[int64]struct{}) timestamps { sorted := make(timestamps, len(timestampMap)) for t, _ := range timestampMap { sorted = append(sorted, t) } sort.Sort(sorted) return sorted }