This commit is contained in:
Yanwei Guo 2019-12-06 07:16:39 -08:00 committed by Knative Prow Robot
parent c97a7bc3ee
commit f9189ebff3
27 changed files with 750 additions and 1340 deletions

12
Gopkg.lock generated
View File

@ -24,15 +24,14 @@
version = "v0.1.0" version = "v0.1.0"
[[projects]] [[projects]]
digest = "1:c1e8c027918e8a7cf79327f35ca2919cb997ecaa928116e3c2b470d06d0e9c12" digest = "1:c3fd5ddaad733530174bba5dd787d98a45d181851a95a0b7362be7bce7144f56"
name = "contrib.go.opencensus.io/exporter/stackdriver" name = "contrib.go.opencensus.io/exporter/stackdriver"
packages = [ packages = [
".", ".",
"monitoredresource", "monitoredresource",
] ]
pruneopts = "NUT" pruneopts = "NUT"
revision = "bf39ce456bd8c6e2e3e37ef9775ed8b10628feca" revision = "59d068f8d8ff5b653916aa30cdc4e13c7f15d56e"
version = "v0.12.5"
[[projects]] [[projects]]
digest = "1:fb85fce01f5a35c570e02749234c1cfe4bb8d31570cbe6b62d7beb27710c5954" digest = "1:fb85fce01f5a35c570e02749234c1cfe4bb8d31570cbe6b62d7beb27710c5954"
@ -147,11 +146,10 @@
version = "v1.3.2" version = "v1.3.2"
[[projects]] [[projects]]
digest = "1:010d46ea3c1e730897e53058d1013a963f3f987675dda87df64f891b945281db" digest = "1:bf40199583e5143d1472fc34d10d6f4b69d97572142acf343b3e43136da40823"
name = "github.com/google/go-cmp" name = "github.com/google/go-cmp"
packages = [ packages = [
"cmp", "cmp",
"cmp/cmpopts",
"cmp/internal/diff", "cmp/internal/diff",
"cmp/internal/flags", "cmp/internal/flags",
"cmp/internal/function", "cmp/internal/function",
@ -933,7 +931,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:34e554f20eb006126d25432770ceb39b1d21940e38fd3a1e443b04e1dc8ea52d" digest = "1:bba09479e5ea003f3d14749f6e0105b6b7b6c696dc1ef76707fd138610890d63"
name = "knative.dev/pkg" name = "knative.dev/pkg"
packages = [ packages = [
"apis", "apis",
@ -952,7 +950,7 @@
"metrics/metricskey", "metrics/metricskey",
] ]
pruneopts = "T" pruneopts = "T"
revision = "3444316bdeef6a0533db7cabcc1a5152b8b7311d" revision = "eec28e7bbdc9ffe4fbdc3584daa5a102d08f5cbb"
[[projects]] [[projects]]
branch = "master" branch = "master"

View File

@ -45,12 +45,6 @@ required = [
name = "google.golang.org/genproto" name = "google.golang.org/genproto"
revision = "e7d98fc518a78c9f8b5ee77be7b0b317475d89e1" revision = "e7d98fc518a78c9f8b5ee77be7b0b317475d89e1"
[[override]]
name = "contrib.go.opencensus.io/exporter/stackdriver"
# The build fails against 0.12.6 and newer because
# stackdriver.Options.GetMonitoredResource was removed.
version = "<=v0.12.5"
[[override]] [[override]]
name = "go.opencensus.io" name = "go.opencensus.io"
# Needed because this includes the appropriate version of metricsdata # Needed because this includes the appropriate version of metricsdata

View File

@ -21,8 +21,9 @@ directly to Stackdriver Metrics.
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"strings"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/any" "github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
@ -34,15 +35,11 @@ import (
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricdata"
"go.opencensus.io/resource" "go.opencensus.io/resource"
) )
var (
errLableExtraction = errors.New("error extracting labels")
errUnspecifiedMetricKind = errors.New("metric kind is unpsecified")
)
const ( const (
exemplarAttachmentTypeString = "type.googleapis.com/google.protobuf.StringValue" exemplarAttachmentTypeString = "type.googleapis.com/google.protobuf.StringValue"
exemplarAttachmentTypeSpanCtx = "type.googleapis.com/google.monitoring.v3.SpanContext" exemplarAttachmentTypeSpanCtx = "type.googleapis.com/google.monitoring.v3.SpanContext"
@ -73,9 +70,11 @@ func (se *statsExporter) handleMetricsUpload(metrics []*metricdata.Metric) {
} }
func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error { func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error {
ctx, cancel := se.o.newContextWithTimeout() ctx, cancel := newContextWithTimeout(se.o.Context, se.o.Timeout)
defer cancel() defer cancel()
var errors []error
ctx, span := trace.StartSpan( ctx, span := trace.StartSpan(
ctx, ctx,
"contrib.go.opencensus.io/exporter/stackdriver.uploadMetrics", "contrib.go.opencensus.io/exporter/stackdriver.uploadMetrics",
@ -87,7 +86,7 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error {
// Now create the metric descriptor remotely. // Now create the metric descriptor remotely.
if err := se.createMetricDescriptorFromMetric(ctx, metric); err != nil { if err := se.createMetricDescriptorFromMetric(ctx, metric); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
//TODO: [rghetia] record error metrics. errors = append(errors, err)
continue continue
} }
} }
@ -97,7 +96,7 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error {
tsl, err := se.metricToMpbTs(ctx, metric) tsl, err := se.metricToMpbTs(ctx, metric)
if err != nil { if err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
//TODO: [rghetia] record error metrics. errors = append(errors, err)
continue continue
} }
if tsl != nil { if tsl != nil {
@ -116,26 +115,35 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error {
for _, ctsreq := range ctsreql { for _, ctsreq := range ctsreql {
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil { if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
// TODO(@rghetia): record error metrics errors = append(errors, err)
// return err
} }
} }
} }
numErrors := len(errors)
if numErrors == 0 {
return nil return nil
} else if numErrors == 1 {
return errors[0]
}
errMsgs := make([]string, 0, numErrors)
for _, err := range errors {
errMsgs = append(errMsgs, err.Error())
}
return fmt.Errorf("[%s]", strings.Join(errMsgs, "; "))
} }
// metricToMpbTs converts a metric into a list of Stackdriver Monitoring v3 API TimeSeries // metricToMpbTs converts a metric into a list of Stackdriver Monitoring v3 API TimeSeries
// but it doesn't invoke any remote API. // but it doesn't invoke any remote API.
func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.Metric) ([]*monitoringpb.TimeSeries, error) { func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.Metric) ([]*monitoringpb.TimeSeries, error) {
if metric == nil { if metric == nil {
return nil, errNilMetric return nil, errNilMetricOrMetricDescriptor
} }
resource := se.metricRscToMpbRsc(metric.Resource) resource := se.metricRscToMpbRsc(metric.Resource)
metricName := metric.Descriptor.Name metricName := metric.Descriptor.Name
metricType, _ := se.metricTypeFromProto(metricName) metricType := se.metricTypeFromProto(metricName)
metricLabelKeys := metric.Descriptor.LabelKeys metricLabelKeys := metric.Descriptor.LabelKeys
metricKind, _ := metricDescriptorTypeToMetricKind(metric) metricKind, _ := metricDescriptorTypeToMetricKind(metric)
@ -159,12 +167,26 @@ func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.M
// TODO: (@rghetia) perhaps log this error from labels extraction, if non-nil. // TODO: (@rghetia) perhaps log this error from labels extraction, if non-nil.
continue continue
} }
var rsc *monitoredrespb.MonitoredResource
var mr monitoredresource.Interface
if se.o.ResourceByDescriptor != nil {
labels, mr = se.o.ResourceByDescriptor(&metric.Descriptor, labels)
// TODO(rghetia): optimize this. It is inefficient to convert this for all metrics.
rsc = convertMonitoredResourceToPB(mr)
if rsc.Type == "" {
rsc.Type = "global"
rsc.Labels = nil
}
} else {
rsc = resource
}
timeSeries = append(timeSeries, &monitoringpb.TimeSeries{ timeSeries = append(timeSeries, &monitoringpb.TimeSeries{
Metric: &googlemetricpb.Metric{ Metric: &googlemetricpb.Metric{
Type: metricType, Type: metricType,
Labels: labels, Labels: labels,
}, },
Resource: resource, Resource: rsc,
Points: sdPoints, Points: sdPoints,
}) })
} }
@ -173,17 +195,21 @@ func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.M
} }
func metricLabelsToTsLabels(defaults map[string]labelValue, labelKeys []metricdata.LabelKey, labelValues []metricdata.LabelValue) (map[string]string, error) { func metricLabelsToTsLabels(defaults map[string]labelValue, labelKeys []metricdata.LabelKey, labelValues []metricdata.LabelValue) (map[string]string, error) {
// Perform this sanity check now.
if len(labelKeys) != len(labelValues) {
return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues))
}
if len(defaults)+len(labelKeys) == 0 {
return nil, nil
}
labels := make(map[string]string) labels := make(map[string]string)
// Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched. // Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched.
for key, label := range defaults { for key, label := range defaults {
labels[sanitize(key)] = label.val labels[sanitize(key)] = label.val
} }
// Perform this sanity check now.
if len(labelKeys) != len(labelValues) {
return labels, fmt.Errorf("Length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues))
}
for i, labelKey := range labelKeys { for i, labelKey := range labelKeys {
labelValue := labelValues[i] labelValue := labelValues[i]
labels[sanitize(labelKey.Key)] = labelValue.Value labels[sanitize(labelKey.Key)] = labelValue.Value
@ -195,6 +221,11 @@ func metricLabelsToTsLabels(defaults map[string]labelValue, labelKeys []metricda
// createMetricDescriptorFromMetric creates a metric descriptor from the OpenCensus metric // createMetricDescriptorFromMetric creates a metric descriptor from the OpenCensus metric
// and then creates it remotely using Stackdriver's API. // and then creates it remotely using Stackdriver's API.
func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, metric *metricdata.Metric) error { func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, metric *metricdata.Metric) error {
// Skip create metric descriptor if configured
if se.o.SkipCMD {
return nil
}
se.metricMu.Lock() se.metricMu.Lock()
defer se.metricMu.Unlock() defer se.metricMu.Unlock()
@ -203,6 +234,11 @@ func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, m
return nil return nil
} }
if builtinMetric(se.metricTypeFromProto(name)) {
se.metricDescriptors[name] = true
return nil
}
// Otherwise, we encountered a cache-miss and // Otherwise, we encountered a cache-miss and
// should create the metric descriptor remotely. // should create the metric descriptor remotely.
inMD, err := se.metricToMpbMetricDescriptor(metric) inMD, err := se.metricToMpbMetricDescriptor(metric)
@ -210,35 +246,21 @@ func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, m
return err return err
} }
var md *googlemetricpb.MetricDescriptor if err = se.createMetricDescriptor(ctx, inMD); err != nil {
if builtinMetric(inMD.Type) {
gmrdesc := &monitoringpb.GetMetricDescriptorRequest{
Name: inMD.Name,
}
md, err = getMetricDescriptor(ctx, se.c, gmrdesc)
} else {
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", se.o.ProjectID),
MetricDescriptor: inMD,
}
md, err = createMetricDescriptor(ctx, se.c, cmrdesc)
}
if err == nil {
// Now record the metric as having been created.
se.metricDescriptors[name] = md
}
return err return err
} }
// Now record the metric as having been created.
se.metricDescriptors[name] = true
return nil
}
func (se *statsExporter) metricToMpbMetricDescriptor(metric *metricdata.Metric) (*googlemetricpb.MetricDescriptor, error) { func (se *statsExporter) metricToMpbMetricDescriptor(metric *metricdata.Metric) (*googlemetricpb.MetricDescriptor, error) {
if metric == nil { if metric == nil {
return nil, errNilMetric return nil, errNilMetricOrMetricDescriptor
} }
metricType, _ := se.metricTypeFromProto(metric.Descriptor.Name) metricType := se.metricTypeFromProto(metric.Descriptor.Name)
displayName := se.displayName(metric.Descriptor.Name) displayName := se.displayName(metric.Descriptor.Name)
metricKind, valueType := metricDescriptorTypeToMetricKind(metric) metricKind, valueType := metricDescriptorTypeToMetricKind(metric)
@ -466,11 +488,9 @@ func metricExemplarToPbExemplar(exemplar *metricdata.Exemplar, projectID string)
func attachmentsToPbAttachments(attachments metricdata.Attachments, projectID string) []*any.Any { func attachmentsToPbAttachments(attachments metricdata.Attachments, projectID string) []*any.Any {
var pbAttachments []*any.Any var pbAttachments []*any.Any
for _, v := range attachments { for _, v := range attachments {
switch v.(type) { if spanCtx, succ := v.(trace.SpanContext); succ {
case trace.SpanContext:
spanCtx, _ := v.(trace.SpanContext)
pbAttachments = append(pbAttachments, toPbSpanCtxAttachment(spanCtx, projectID)) pbAttachments = append(pbAttachments, toPbSpanCtxAttachment(spanCtx, projectID))
default: } else {
// Treat everything else as plain string for now. // Treat everything else as plain string for now.
// TODO(songy23): add support for dropped label attachments. // TODO(songy23): add support for dropped label attachments.
pbAttachments = append(pbAttachments, toPbStringAttachment(v)) pbAttachments = append(pbAttachments, toPbStringAttachment(v))

View File

@ -0,0 +1,201 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stackdriver
import (
"context"
"fmt"
"strings"
"sync"
"time"
monitoring "cloud.google.com/go/monitoring/apiv3"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)
const (
minNumWorkers = 1
minReqsChanSize = 5
)
type metricsBatcher struct {
projectName string
allTss []*monitoringpb.TimeSeries
allErrs []error
// Counts all dropped TimeSeries by this metricsBatcher.
droppedTimeSeries int
workers []*worker
// reqsChan, respsChan and wg are shared between metricsBatcher and worker goroutines.
reqsChan chan *monitoringpb.CreateTimeSeriesRequest
respsChan chan *response
wg *sync.WaitGroup
}
func newMetricsBatcher(ctx context.Context, projectID string, numWorkers int, mc *monitoring.MetricClient, timeout time.Duration) *metricsBatcher {
if numWorkers < minNumWorkers {
numWorkers = minNumWorkers
}
workers := make([]*worker, 0, numWorkers)
reqsChanSize := numWorkers
if reqsChanSize < minReqsChanSize {
reqsChanSize = minReqsChanSize
}
reqsChan := make(chan *monitoringpb.CreateTimeSeriesRequest, reqsChanSize)
respsChan := make(chan *response, numWorkers)
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
w := newWorker(ctx, mc, reqsChan, respsChan, &wg, timeout)
workers = append(workers, w)
go w.start()
}
return &metricsBatcher{
projectName: fmt.Sprintf("projects/%s", projectID),
allTss: make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload),
droppedTimeSeries: 0,
workers: workers,
wg: &wg,
reqsChan: reqsChan,
respsChan: respsChan,
}
}
func (mb *metricsBatcher) recordDroppedTimeseries(numTimeSeries int, errs ...error) {
mb.droppedTimeSeries += numTimeSeries
for _, err := range errs {
if err != nil {
mb.allErrs = append(mb.allErrs, err)
}
}
}
func (mb *metricsBatcher) addTimeSeries(ts *monitoringpb.TimeSeries) {
mb.allTss = append(mb.allTss, ts)
if len(mb.allTss) == maxTimeSeriesPerUpload {
mb.sendReqToChan()
mb.allTss = make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload)
}
}
func (mb *metricsBatcher) close(ctx context.Context) error {
// Send any remaining time series, must be <200
if len(mb.allTss) > 0 {
mb.sendReqToChan()
}
close(mb.reqsChan)
mb.wg.Wait()
for i := 0; i < len(mb.workers); i++ {
resp := <-mb.respsChan
mb.recordDroppedTimeseries(resp.droppedTimeSeries, resp.errs...)
}
close(mb.respsChan)
numErrors := len(mb.allErrs)
if numErrors == 0 {
return nil
}
if numErrors == 1 {
return mb.allErrs[0]
}
errMsgs := make([]string, 0, numErrors)
for _, err := range mb.allErrs {
errMsgs = append(errMsgs, err.Error())
}
return fmt.Errorf("[%s]", strings.Join(errMsgs, "; "))
}
// sendReqToChan grabs all the timeseies in this metricsBatcher, puts them
// to a CreateTimeSeriesRequest and sends the request to reqsChan.
func (mb *metricsBatcher) sendReqToChan() {
req := &monitoringpb.CreateTimeSeriesRequest{
Name: mb.projectName,
TimeSeries: mb.allTss,
}
mb.reqsChan <- req
}
// sendReq sends create time series requests to Stackdriver,
// and returns the count of dropped time series and error.
func sendReq(ctx context.Context, c *monitoring.MetricClient, req *monitoringpb.CreateTimeSeriesRequest) (int, error) {
if c != nil { // c==nil only happens in unit tests where we don't make real calls to Stackdriver server
err := createTimeSeries(ctx, c, req)
if err != nil {
return len(req.TimeSeries), err
}
}
return 0, nil
}
type worker struct {
ctx context.Context
timeout time.Duration
mc *monitoring.MetricClient
resp *response
respsChan chan *response
reqsChan chan *monitoringpb.CreateTimeSeriesRequest
wg *sync.WaitGroup
}
func newWorker(
ctx context.Context,
mc *monitoring.MetricClient,
reqsChan chan *monitoringpb.CreateTimeSeriesRequest,
respsChan chan *response,
wg *sync.WaitGroup,
timeout time.Duration) *worker {
return &worker{
ctx: ctx,
mc: mc,
resp: &response{},
reqsChan: reqsChan,
respsChan: respsChan,
wg: wg,
}
}
func (w *worker) start() {
for req := range w.reqsChan {
w.sendReqWithTimeout(req)
}
w.respsChan <- w.resp
w.wg.Done()
}
func (w *worker) sendReqWithTimeout(req *monitoringpb.CreateTimeSeriesRequest) {
ctx, cancel := newContextWithTimeout(w.ctx, w.timeout)
defer cancel()
w.recordDroppedTimeseries(sendReq(ctx, w.mc, req))
}
func (w *worker) recordDroppedTimeseries(numTimeSeries int, err error) {
w.resp.droppedTimeSeries += numTimeSeries
if err != nil {
w.resp.errs = append(w.resp.errs, err)
}
}
type response struct {
droppedTimeSeries int
errs []error
}

View File

@ -24,81 +24,74 @@ import (
"errors" "errors"
"fmt" "fmt"
"path" "path"
"sort"
"strings" "strings"
"github.com/golang/protobuf/ptypes/timestamp" "go.opencensus.io/resource"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
"cloud.google.com/go/monitoring/apiv3"
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
labelpb "google.golang.org/genproto/googleapis/api/label"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opencensus.io/resource" timestamppb "github.com/golang/protobuf/ptypes/timestamp"
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
labelpb "google.golang.org/genproto/googleapis/api/label"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
) )
var errNilMetric = errors.New("expecting a non-nil metric") var errNilMetricOrMetricDescriptor = errors.New("non-nil metric or metric descriptor")
var errNilMetricDescriptor = errors.New("expecting a non-nil metric descriptor")
var percentileLabelKey = &metricspb.LabelKey{ var percentileLabelKey = &metricspb.LabelKey{
Key: "percentile", Key: "percentile",
Description: "the value at a given percentile of a distribution", Description: "the value at a given percentile of a distribution",
} }
var globalResource = &resource.Resource{Type: "global"}
var domains = []string{"googleapis.com", "kubernetes.io", "istio.io", "knative.dev"}
type metricProtoPayload struct { // PushMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously,
node *commonpb.Node // without de-duping or adding proto metrics to the bundler.
resource *resourcepb.Resource func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) {
metric *metricspb.Metric
additionalLabels map[string]labelValue
}
func (se *statsExporter) addPayload(node *commonpb.Node, rsc *resourcepb.Resource, labels map[string]labelValue, metrics ...*metricspb.Metric) {
for _, metric := range metrics {
payload := &metricProtoPayload{
metric: metric,
resource: rsc,
node: node,
additionalLabels: labels,
}
se.protoMetricsBundler.Add(payload, 1)
}
}
// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring.
func (se *statsExporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error {
if len(metrics) == 0 { if len(metrics) == 0 {
return errNilMetric return 0, errNilMetricOrMetricDescriptor
} }
additionalLabels := se.defaultLabels // Caches the resources seen so far
if additionalLabels == nil { seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)
// additionalLabels must be stateless because each node is different
additionalLabels = getDefaultLabelsFromNode(node)
}
mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, se.o.Timeout)
for _, metric := range metrics { for _, metric := range metrics {
if len(metric.GetTimeseries()) == 0 {
// No TimeSeries to export, skip this metric.
continue
}
mappedRsc := se.getResource(rsc, metric, seenResources)
if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY { if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY {
se.addPayload(node, rsc, additionalLabels, se.convertSummaryMetrics(metric)...) summaryMtcs := se.convertSummaryMetrics(metric)
for _, summaryMtc := range summaryMtcs {
if err := se.createMetricDescriptorFromMetricProto(ctx, summaryMtc); err != nil {
mb.recordDroppedTimeseries(len(summaryMtc.GetTimeseries()), err)
continue
}
se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, mb)
}
} else { } else {
se.addPayload(node, rsc, additionalLabels, metric) if err := se.createMetricDescriptorFromMetricProto(ctx, metric); err != nil {
mb.recordDroppedTimeseries(len(metric.GetTimeseries()), err)
continue
}
se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb)
} }
} }
return nil return mb.droppedTimeSeries, mb.close(ctx)
} }
func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*metricspb.Metric { func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*metricspb.Metric {
var metrics []*metricspb.Metric var metrics []*metricspb.Metric
for _, ts := range summary.Timeseries {
var percentileTss []*metricspb.TimeSeries var percentileTss []*metricspb.TimeSeries
var countTss []*metricspb.TimeSeries var countTss []*metricspb.TimeSeries
var sumTss []*metricspb.TimeSeries var sumTss []*metricspb.TimeSeries
for _, ts := range summary.Timeseries {
lvs := ts.GetLabelValues() lvs := ts.GetLabelValues()
startTime := ts.StartTimestamp startTime := ts.StartTimestamp
@ -141,6 +134,7 @@ func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*met
for _, percentileValue := range snapshot.GetPercentileValues() { for _, percentileValue := range snapshot.GetPercentileValues() {
lvsWithPercentile := lvs[0:] lvsWithPercentile := lvs[0:]
lvsWithPercentile = append(lvsWithPercentile, &metricspb.LabelValue{ lvsWithPercentile = append(lvsWithPercentile, &metricspb.LabelValue{
HasValue: true,
Value: fmt.Sprintf("%f", percentileValue.Percentile), Value: fmt.Sprintf("%f", percentileValue.Percentile),
}) })
percentileTs := &metricspb.TimeSeries{ percentileTs := &metricspb.TimeSeries{
@ -207,142 +201,22 @@ func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*met
return metrics return metrics
} }
func (se *statsExporter) handleMetricsProtoUpload(payloads []*metricProtoPayload) { func (se *statsExporter) getResource(rsc *resourcepb.Resource, metric *metricspb.Metric, seenRscs map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) *monitoredrespb.MonitoredResource {
err := se.uploadMetricsProto(payloads) var resource = rsc
if err != nil { if metric.Resource != nil {
se.o.handleError(err) resource = metric.Resource
} }
mappedRsc, ok := seenRscs[resource]
if !ok {
mappedRsc = se.o.MapResource(resourcepbToResource(resource))
seenRscs[resource] = mappedRsc
} }
return mappedRsc
func (se *statsExporter) uploadMetricsProto(payloads []*metricProtoPayload) error {
ctx, cancel := se.o.newContextWithTimeout()
defer cancel()
ctx, span := trace.StartSpan(
ctx,
"contrib.go.opencensus.io/exporter/stackdriver.uploadMetrics",
trace.WithSampler(trace.NeverSample()),
)
defer span.End()
for _, payload := range payloads {
// Now create the metric descriptor remotely.
if err := se.createMetricDescriptor(ctx, payload.metric, payload.additionalLabels); err != nil {
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
return err
}
}
var allTimeSeries []*monitoringpb.TimeSeries
for _, payload := range payloads {
tsl, err := se.protoMetricToTimeSeries(ctx, payload.node, payload.resource, payload.metric, payload.additionalLabels)
if err != nil {
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
return err
}
allTimeSeries = append(allTimeSeries, tsl...)
}
// Now batch timeseries up and then export.
for start, end := 0, 0; start < len(allTimeSeries); start = end {
end = start + maxTimeSeriesPerUpload
if end > len(allTimeSeries) {
end = len(allTimeSeries)
}
batch := allTimeSeries[start:end]
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(batch)
for _, ctsreq := range ctsreql {
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
// TODO(@odeke-em): Don't fail fast here, perhaps batch errors?
// return err
}
}
}
return nil
}
// metricSignature creates a unique signature consisting of a
// metric's type and its lexicographically sorted label values
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/120
func metricSignature(metric *googlemetricpb.Metric) string {
labels := metric.GetLabels()
labelValues := make([]string, 0, len(labels))
for _, labelValue := range labels {
labelValues = append(labelValues, labelValue)
}
sort.Strings(labelValues)
return fmt.Sprintf("%s:%s", metric.GetType(), strings.Join(labelValues, ","))
}
func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) (ctsreql []*monitoringpb.CreateTimeSeriesRequest) {
if len(ts) == 0 {
return nil
}
// Since there are scenarios in which Metrics with the same Type
// can be bunched in the same TimeSeries, we have to ensure that
// we create a unique CreateTimeSeriesRequest with entirely unique Metrics
// per TimeSeries, lest we'll encounter:
//
// err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written:
// Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered.
// Only one point can be written per TimeSeries per request.: timeSeries[2]
//
// This scenario happens when we are using the OpenCensus Agent in which multiple metrics
// are streamed by various client applications.
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73
uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
seenMetrics := make(map[string]struct{})
for _, tti := range ts {
key := metricSignature(tti.Metric)
if _, alreadySeen := seenMetrics[key]; !alreadySeen {
uniqueTimeSeries = append(uniqueTimeSeries, tti)
seenMetrics[key] = struct{}{}
} else {
nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti)
}
}
// UniqueTimeSeries can be bunched up together
// While for each nonUniqueTimeSeries, we have
// to make a unique CreateTimeSeriesRequest.
ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(se.o.ProjectID),
TimeSeries: uniqueTimeSeries,
})
// Now recursively also combine the non-unique TimeSeries
// that were singly added to nonUniqueTimeSeries.
// The reason is that we need optimal combinations
// for optimal combinations because:
// * "a/b/c"
// * "a/b/c"
// * "x/y/z"
// * "a/b/c"
// * "x/y/z"
// * "p/y/z"
// * "d/y/z"
//
// should produce:
// CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"]
nonUniqueRequests := se.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries)
ctsreql = append(ctsreql, nonUniqueRequests...)
return ctsreql
} }
func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource { func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource {
if rsc == nil { if rsc == nil {
return &resource.Resource{ return globalResource
Type: "global",
}
} }
res := &resource.Resource{ res := &resource.Resource{
Type: rsc.Type, Type: rsc.Type,
@ -357,92 +231,87 @@ func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource {
// protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest // protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest
// but it doesn't invoke any remote API. // but it doesn't invoke any remote API.
func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) { func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, mb *metricsBatcher) {
if metric == nil { if metric == nil || metric.MetricDescriptor == nil {
return nil, errNilMetric mb.recordDroppedTimeseries(len(metric.GetTimeseries()), errNilMetricOrMetricDescriptor)
} }
var resource = rsc metricType := se.metricTypeFromProto(metric.GetMetricDescriptor().GetName())
if metric.Resource != nil {
resource = metric.Resource
}
mappedRes := se.o.MapResource(resourcepbToResource(resource))
metricName, _, _, err := metricProseFromProto(metric)
if err != nil {
return nil, err
}
metricType, _ := se.metricTypeFromProto(metricName)
metricLabelKeys := metric.GetMetricDescriptor().GetLabelKeys() metricLabelKeys := metric.GetMetricDescriptor().GetLabelKeys()
metricKind, _ := protoMetricDescriptorTypeToMetricKind(metric) metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric)
labelKeys := make([]string, 0, len(metricLabelKeys))
for _, key := range metricLabelKeys {
labelKeys = append(labelKeys, sanitize(key.GetKey()))
}
timeSeries := make([]*monitoringpb.TimeSeries, 0, len(metric.Timeseries))
for _, protoTimeSeries := range metric.Timeseries { for _, protoTimeSeries := range metric.Timeseries {
if len(protoTimeSeries.Points) == 0 {
// No points to send just move forward.
continue
}
sdPoints, err := se.protoTimeSeriesToMonitoringPoints(protoTimeSeries, metricKind) sdPoints, err := se.protoTimeSeriesToMonitoringPoints(protoTimeSeries, metricKind)
if err != nil { if err != nil {
return nil, err mb.recordDroppedTimeseries(1, err)
continue
} }
// Each TimeSeries has labelValues which MUST be correlated // Each TimeSeries has labelValues which MUST be correlated
// with that from the MetricDescriptor // with that from the MetricDescriptor
labels, err := labelsPerTimeSeries(additionalLabels, metricLabelKeys, protoTimeSeries.GetLabelValues()) labels, err := labelsPerTimeSeries(se.defaultLabels, labelKeys, protoTimeSeries.GetLabelValues())
if err != nil { if err != nil {
// TODO: (@odeke-em) perhaps log this error from labels extraction, if non-nil. mb.recordDroppedTimeseries(1, err)
continue continue
} }
timeSeries = append(timeSeries, &monitoringpb.TimeSeries{ mb.addTimeSeries(&monitoringpb.TimeSeries{
Metric: &googlemetricpb.Metric{ Metric: &googlemetricpb.Metric{
Type: metricType, Type: metricType,
Labels: labels, Labels: labels,
}, },
Resource: mappedRes, MetricKind: metricKind,
ValueType: valueType,
Resource: mappedRsc,
Points: sdPoints, Points: sdPoints,
}) })
} }
return timeSeries, nil
} }
func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []*metricspb.LabelKey, labelValues []*metricspb.LabelValue) (map[string]string, error) { func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, labelValues []*metricspb.LabelValue) (map[string]string, error) {
if len(labelKeys) != len(labelValues) {
return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues))
}
if len(defaults)+len(labelKeys) == 0 {
// No labels for this metric
return nil, nil
}
labels := make(map[string]string) labels := make(map[string]string)
// Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched. // Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched.
for key, label := range defaults { for key, label := range defaults {
labels[sanitize(key)] = label.val labels[key] = label.val
}
// Perform this sanity check now.
if len(labelKeys) != len(labelValues) {
return labels, fmt.Errorf("Length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues))
} }
for i, labelKey := range labelKeys { for i, labelKey := range labelKeys {
labelValue := labelValues[i] labelValue := labelValues[i]
labels[sanitize(labelKey.GetKey())] = labelValue.GetValue() if !labelValue.GetHasValue() {
continue
}
labels[labelKey] = labelValue.GetValue()
} }
return labels, nil return labels, nil
} }
func (se *statsExporter) protoMetricDescriptorToCreateMetricDescriptorRequest(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) (*monitoringpb.CreateMetricDescriptorRequest, error) { func (se *statsExporter) createMetricDescriptorFromMetricProto(ctx context.Context, metric *metricspb.Metric) error {
// Otherwise, we encountered a cache-miss and // Skip create metric descriptor if configured
// should create the metric descriptor remotely. if se.o.SkipCMD {
inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels) return nil
if err != nil {
return nil, err
} }
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{ ctx, cancel := newContextWithTimeout(ctx, se.o.Timeout)
Name: fmt.Sprintf("projects/%s", se.o.ProjectID), defer cancel()
MetricDescriptor: inMD,
}
return cmrdesc, nil
}
// createMetricDescriptor creates a metric descriptor from the OpenCensus proto metric
// and then creates it remotely using Stackdriver's API.
func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) error {
se.protoMu.Lock() se.protoMu.Lock()
defer se.protoMu.Unlock() defer se.protoMu.Unlock()
@ -451,46 +320,35 @@ func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *met
return nil return nil
} }
if builtinMetric(se.metricTypeFromProto(name)) {
se.protoMetricDescriptors[name] = true
return nil
}
// Otherwise, we encountered a cache-miss and // Otherwise, we encountered a cache-miss and
// should create the metric descriptor remotely. // should create the metric descriptor remotely.
inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels) inMD, err := se.protoToMonitoringMetricDescriptor(metric, se.defaultLabels)
if err != nil { if err != nil {
return err return err
} }
var md *googlemetricpb.MetricDescriptor if err = se.createMetricDescriptor(ctx, inMD); err != nil {
if builtinMetric(inMD.Type) {
gmrdesc := &monitoringpb.GetMetricDescriptorRequest{
Name: inMD.Name,
}
md, err = getMetricDescriptor(ctx, se.c, gmrdesc)
} else {
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", se.o.ProjectID),
MetricDescriptor: inMD,
}
md, err = createMetricDescriptor(ctx, se.c, cmrdesc)
}
if err == nil {
// Now record the metric as having been created.
se.protoMetricDescriptors[name] = md
}
return err return err
} }
func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) (sptl []*monitoringpb.Point, err error) { se.protoMetricDescriptors[name] = true
for _, pt := range ts.Points { return nil
}
func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) ([]*monitoringpb.Point, error) {
sptl := make([]*monitoringpb.Point, 0, len(ts.Points))
for _, pt := range ts.Points {
// If we have a last value aggregation point i.e. MetricDescriptor_GAUGE // If we have a last value aggregation point i.e. MetricDescriptor_GAUGE
// StartTime should be nil. // StartTime should be nil.
startTime := ts.StartTimestamp startTime := ts.StartTimestamp
if metricKind == googlemetricpb.MetricDescriptor_GAUGE { if metricKind == googlemetricpb.MetricDescriptor_GAUGE {
startTime = nil startTime = nil
} }
spt, err := fromProtoPoint(startTime, pt) spt, err := fromProtoPoint(startTime, pt)
if err != nil { if err != nil {
return nil, err return nil, err
@ -501,15 +359,15 @@ func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSer
} }
func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Metric, additionalLabels map[string]labelValue) (*googlemetricpb.MetricDescriptor, error) { func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Metric, additionalLabels map[string]labelValue) (*googlemetricpb.MetricDescriptor, error) {
if metric == nil { if metric == nil || metric.MetricDescriptor == nil {
return nil, errNilMetric return nil, errNilMetricOrMetricDescriptor
} }
metricName, description, unit, err := metricProseFromProto(metric) md := metric.GetMetricDescriptor()
if err != nil { metricName := md.GetName()
return nil, err unit := md.GetUnit()
} description := md.GetDescription()
metricType, _ := se.metricTypeFromProto(metricName) metricType := se.metricTypeFromProto(metricName)
displayName := se.displayName(metricName) displayName := se.displayName(metricName)
metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric) metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric)
@ -550,32 +408,32 @@ func labelDescriptorsFromProto(defaults map[string]labelValue, protoLabelKeys []
return labelDescriptors return labelDescriptors
} }
func metricProseFromProto(metric *metricspb.Metric) (name, description, unit string, err error) { func (se *statsExporter) metricTypeFromProto(name string) string {
md := metric.GetMetricDescriptor() prefix := se.o.MetricPrefix
if md == nil { if se.o.GetMetricPrefix != nil {
return "", "", "", errNilMetricDescriptor prefix = se.o.GetMetricPrefix(name)
}
if prefix != "" {
name = path.Join(prefix, name)
}
if !hasDomain(name) {
// Still needed because the name may or may not have a "/" at the beginning.
name = path.Join(defaultDomain, name)
}
return name
} }
name = md.GetName() // hasDomain checks if the metric name already has a domain in it.
unit = md.GetUnit() func hasDomain(name string) bool {
description = md.GetDescription() for _, domain := range domains {
if strings.Contains(name, domain) {
if md.Type == metricspb.MetricDescriptor_CUMULATIVE_INT64 { return true
// If the aggregation type is count, which counts the number of recorded measurements, the unit must be "1", }
// because this view does not apply to the recorded values. }
unit = stats.UnitDimensionless return false
} }
return name, description, unit, nil func fromProtoPoint(startTime *timestamppb.Timestamp, pt *metricspb.Point) (*monitoringpb.Point, error) {
}
func (se *statsExporter) metricTypeFromProto(name string) (string, bool) {
// TODO: (@odeke-em) support non-"custom.googleapis.com" metrics names.
name = path.Join("custom.googleapis.com", "opencensus", name)
return name, true
}
func fromProtoPoint(startTime *timestamp.Timestamp, pt *metricspb.Point) (*monitoringpb.Point, error) {
if pt == nil { if pt == nil {
return nil, nil return nil, nil
} }
@ -585,14 +443,13 @@ func fromProtoPoint(startTime *timestamp.Timestamp, pt *metricspb.Point) (*monit
return nil, err return nil, err
} }
mpt := &monitoringpb.Point{ return &monitoringpb.Point{
Value: mptv, Value: mptv,
Interval: &monitoringpb.TimeInterval{ Interval: &monitoringpb.TimeInterval{
StartTime: startTime, StartTime: startTime,
EndTime: pt.Timestamp, EndTime: pt.Timestamp,
}, },
} }, nil
return mpt, nil
} }
func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) { func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) {
@ -600,8 +457,6 @@ func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) {
return nil, nil return nil, nil
} }
var err error
var tval *monitoringpb.TypedValue
switch v := value.(type) { switch v := value.(type) {
default: default:
// All the other types are not yet handled. // All the other types are not yet handled.
@ -617,21 +472,21 @@ func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) {
// TODO: Add conversion from SummaryValue when // TODO: Add conversion from SummaryValue when
// https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/66 // https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/66
// has been figured out. // has been figured out.
err = fmt.Errorf("protoToMetricPoint: unknown Data type: %T", value) return nil, fmt.Errorf("protoToMetricPoint: unknown Data type: %T", value)
case *metricspb.Point_Int64Value: case *metricspb.Point_Int64Value:
tval = &monitoringpb.TypedValue{ return &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{ Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: v.Int64Value, Int64Value: v.Int64Value,
}, },
} }, nil
case *metricspb.Point_DoubleValue: case *metricspb.Point_DoubleValue:
tval = &monitoringpb.TypedValue{ return &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_DoubleValue{ Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: v.DoubleValue, DoubleValue: v.DoubleValue,
}, },
} }, nil
case *metricspb.Point_DistributionValue: case *metricspb.Point_DistributionValue:
dv := v.DistributionValue dv := v.DistributionValue
@ -669,10 +524,8 @@ func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) {
mv.DistributionValue.BucketCounts = addZeroBucketCountOnCondition(insertZeroBound, bucketCounts(dv.Buckets)...) mv.DistributionValue.BucketCounts = addZeroBucketCountOnCondition(insertZeroBound, bucketCounts(dv.Buckets)...)
} }
tval = &monitoringpb.TypedValue{Value: mv} return &monitoringpb.TypedValue{Value: mv}, nil
} }
return tval, err
} }
func bucketCounts(buckets []*metricspb.DistributionValue_Bucket) []int64 { func bucketCounts(buckets []*metricspb.DistributionValue_Bucket) []int64 {
@ -714,13 +567,3 @@ func protoMetricDescriptorTypeToMetricKind(m *metricspb.Metric) (googlemetricpb.
return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED
} }
} }
func getDefaultLabelsFromNode(node *commonpb.Node) map[string]labelValue {
taskValue := fmt.Sprintf("%s-%d@%s", strings.ToLower(node.LibraryInfo.GetLanguage().String()), node.Identifier.Pid, node.Identifier.HostName)
return map[string]labelValue{
opencensusTaskKey: {
val: taskValue,
desc: opencensusTaskDescription,
},
}
}

View File

@ -1,62 +0,0 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stackdriver
/*
Common test utilities for comparing Stackdriver metrics.
*/
import (
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"time"
)
func timestampToTime(ts *timestamp.Timestamp) time.Time {
if ts == nil {
return time.Unix(0, 0).UTC()
}
return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
}
func cmpResource(got, want *monitoredrespb.MonitoredResource) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoredrespb.MonitoredResource{}))
}
func cmpTSReqs(got, want []*monitoringpb.CreateTimeSeriesRequest) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.CreateTimeSeriesRequest{}))
}
func cmpMD(got, want *googlemetricpb.MetricDescriptor) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(googlemetricpb.MetricDescriptor{}))
}
func cmpMDReq(got, want *monitoringpb.CreateMetricDescriptorRequest) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.CreateMetricDescriptorRequest{}))
}
func cmpMDReqs(got, want []*monitoringpb.CreateMetricDescriptorRequest) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.CreateMetricDescriptorRequest{}))
}
func cmpPoint(got, want *monitoringpb.Point) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.Point{}))
}

View File

@ -37,8 +37,12 @@ type awsIdentityDocument struct {
// This is only done once. // This is only done once.
func retrieveAWSIdentityDocument() *awsIdentityDocument { func retrieveAWSIdentityDocument() *awsIdentityDocument {
awsIdentityDoc := awsIdentityDocument{} awsIdentityDoc := awsIdentityDocument{}
c := ec2metadata.New(session.New()) sesion, err := session.NewSession()
if c.Available() == false { if err != nil {
return nil
}
c := ec2metadata.New(sesion)
if !c.Available() {
return nil return nil
} }
ec2InstanceIdentifyDocument, err := c.GetInstanceIdentityDocument() ec2InstanceIdentifyDocument, err := c.GetInstanceIdentityDocument()

View File

@ -22,7 +22,7 @@ import (
"strings" "strings"
"cloud.google.com/go/compute/metadata" "cloud.google.com/go/compute/metadata"
"cloud.google.com/go/container/apiv1" container "cloud.google.com/go/container/apiv1"
containerpb "google.golang.org/genproto/googleapis/container/v1" containerpb "google.golang.org/genproto/googleapis/container/v1"
) )

View File

@ -22,13 +22,6 @@ import (
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
) )
type resourceMap struct {
// Mapping from the input resource type to the monitored resource type in Stackdriver.
srcType, dstType string
// Mapping from Stackdriver monitored resource label to an OpenCensus resource label.
labels map[string]string
}
// Resource labels that are generally internal to the exporter. // Resource labels that are generally internal to the exporter.
// Consider exposing these labels and a type identifier in the future to allow // Consider exposing these labels and a type identifier in the future to allow
// for customization. // for customization.
@ -41,7 +34,7 @@ const (
) )
// Mappings for the well-known OpenCensus resources to applicable Stackdriver resources. // Mappings for the well-known OpenCensus resources to applicable Stackdriver resources.
var k8sResourceMap = map[string]string{ var k8sContainerMap = map[string]string{
"project_id": stackdriverProjectID, "project_id": stackdriverProjectID,
"location": resourcekeys.CloudKeyZone, "location": resourcekeys.CloudKeyZone,
"cluster_name": resourcekeys.K8SKeyClusterName, "cluster_name": resourcekeys.K8SKeyClusterName,
@ -50,6 +43,21 @@ var k8sResourceMap = map[string]string{
"container_name": resourcekeys.ContainerKeyName, "container_name": resourcekeys.ContainerKeyName,
} }
var k8sPodMap = map[string]string{
"project_id": stackdriverProjectID,
"location": resourcekeys.CloudKeyZone,
"cluster_name": resourcekeys.K8SKeyClusterName,
"namespace_name": resourcekeys.K8SKeyNamespaceName,
"pod_name": resourcekeys.K8SKeyPodName,
}
var k8sNodeMap = map[string]string{
"project_id": stackdriverProjectID,
"location": resourcekeys.CloudKeyZone,
"cluster_name": resourcekeys.K8SKeyClusterName,
"node_name": resourcekeys.HostKeyName,
}
var gcpResourceMap = map[string]string{ var gcpResourceMap = map[string]string{
"project_id": stackdriverProjectID, "project_id": stackdriverProjectID,
"instance_id": resourcekeys.HostKeyID, "instance_id": resourcekeys.HostKeyID,
@ -72,14 +80,20 @@ var genericResourceMap = map[string]string{
"task_id": stackdriverGenericTaskID, "task_id": stackdriverGenericTaskID,
} }
func transformResource(match, input map[string]string) map[string]string { // returns transformed label map and true if all labels in match are found
// in input except optional project_id. It returns false if at least one label
// other than project_id is missing.
func transformResource(match, input map[string]string) (map[string]string, bool) {
output := make(map[string]string, len(input)) output := make(map[string]string, len(input))
for dst, src := range match { for dst, src := range match {
if v, ok := input[src]; ok { v, ok := input[src]
if ok {
output[dst] = v output[dst] = v
} else if dst != "project_id" {
return nil, true
} }
} }
return output return output, false
} }
func defaultMapResource(res *resource.Resource) *monitoredrespb.MonitoredResource { func defaultMapResource(res *resource.Resource) *monitoredrespb.MonitoredResource {
@ -94,10 +108,13 @@ func defaultMapResource(res *resource.Resource) *monitoredrespb.MonitoredResourc
switch { switch {
case res.Type == resourcekeys.ContainerType: case res.Type == resourcekeys.ContainerType:
result.Type = "k8s_container" result.Type = "k8s_container"
match = k8sResourceMap match = k8sContainerMap
case res.Type == resourcekeys.K8SType: case res.Type == resourcekeys.K8SType:
result.Type = "k8s_pod" result.Type = "k8s_pod"
match = k8sResourceMap match = k8sPodMap
case res.Type == resourcekeys.HostType && res.Labels[resourcekeys.K8SKeyClusterName] != "":
result.Type = "k8s_node"
match = k8sNodeMap
case res.Labels[resourcekeys.CloudKeyProvider] == resourcekeys.CloudProviderGCP: case res.Labels[resourcekeys.CloudKeyProvider] == resourcekeys.CloudProviderGCP:
result.Type = "gce_instance" result.Type = "gce_instance"
match = gcpResourceMap match = gcpResourceMap
@ -106,7 +123,17 @@ func defaultMapResource(res *resource.Resource) *monitoredrespb.MonitoredResourc
match = awsResourceMap match = awsResourceMap
} }
result.Labels = transformResource(match, res.Labels) var missing bool
result.Labels, missing = transformResource(match, res.Labels)
if missing {
result.Type = "global"
// if project id specified then transform it.
if v, ok := res.Labels[stackdriverProjectID]; ok {
result.Labels = make(map[string]string, 1)
result.Labels["project_id"] = v
}
return result
}
if result.Type == "aws_ec2_instance" { if result.Type == "aws_ec2_instance" {
if v, ok := result.Labels["region"]; ok { if v, ok := result.Labels["region"]; ok {
result.Labels["region"] = fmt.Sprintf("aws:%s", v) result.Labels["region"] = fmt.Sprintf("aws:%s", v)

View File

@ -54,6 +54,7 @@ import (
"log" "log"
"os" "os"
"path" "path"
"strings"
"time" "time"
metadataapi "cloud.google.com/go/compute/metadata" metadataapi "cloud.google.com/go/compute/metadata"
@ -61,7 +62,6 @@ import (
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/resource" "go.opencensus.io/resource"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"golang.org/x/oauth2/google" "golang.org/x/oauth2/google"
"google.golang.org/api/option" "google.golang.org/api/option"
@ -186,11 +186,9 @@ type Options struct {
// conversions from auto-detected resources to well-known Stackdriver monitored resources. // conversions from auto-detected resources to well-known Stackdriver monitored resources.
MapResource func(*resource.Resource) *monitoredrespb.MonitoredResource MapResource func(*resource.Resource) *monitoredrespb.MonitoredResource
// MetricPrefix overrides the prefix of a Stackdriver metric display names. // MetricPrefix overrides the prefix of a Stackdriver metric names.
// Optional. If unset defaults to "OpenCensus/". // Optional. If unset defaults to "custom.googleapis.com/opencensus/".
// Deprecated: Provide GetMetricDisplayName to change the display name of // If GetMetricPrefix is non-nil, this option is ignored.
// the metric.
// If GetMetricDisplayName is non-nil, this option is ignored.
MetricPrefix string MetricPrefix string
// GetMetricDisplayName allows customizing the display name for the metric // GetMetricDisplayName allows customizing the display name for the metric
@ -203,8 +201,16 @@ type Options struct {
// "custom.googleapis.com/opencensus/" + view.Name // "custom.googleapis.com/opencensus/" + view.Name
// //
// See: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricDescriptor // See: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricDescriptor
// Depreacted. Use GetMetricPrefix instead.
GetMetricType func(view *view.View) string GetMetricType func(view *view.View) string
// GetMetricPrefix allows customizing the metric prefix for the given metric name.
// If it is not set, MetricPrefix is used. If MetricPrefix is not set, it defaults to:
// "custom.googleapis.com/opencensus/"
//
// See: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricDescriptor
GetMetricPrefix func(name string) string
// DefaultTraceAttributes will be appended to every span that is exported to // DefaultTraceAttributes will be appended to every span that is exported to
// Stackdriver Trace. // Stackdriver Trace.
DefaultTraceAttributes map[string]interface{} DefaultTraceAttributes map[string]interface{}
@ -238,31 +244,47 @@ type Options struct {
// If unset, context.Background() will be used. // If unset, context.Background() will be used.
Context context.Context Context context.Context
// SkipCMD enforces to skip all the CreateMetricDescriptor calls.
// These calls are important in order to configure the unit of the metrics,
// but in some cases all the exported metrics are builtin (unit is configured)
// or the unit is not important.
SkipCMD bool
// Timeout for all API calls. If not set, defaults to 5 seconds. // Timeout for all API calls. If not set, defaults to 5 seconds.
Timeout time.Duration Timeout time.Duration
// GetMonitoredResource may be provided to supply the details of the
// monitored resource dynamically based on the tags associated with each
// data point. Most users will not need to set this, but should instead
// set the MonitoredResource field.
//
// GetMonitoredResource may add or remove tags by returning a new set of
// tags. It is safe for the function to mutate its argument and return it.
//
// See the documentation on the MonitoredResource field for guidance on the
// interaction between monitored resources and labels.
//
// The MonitoredResource field is ignored if this field is set to a non-nil
// value.
GetMonitoredResource func(*view.View, []tag.Tag) ([]tag.Tag, monitoredresource.Interface)
// ReportingInterval sets the interval between reporting metrics. // ReportingInterval sets the interval between reporting metrics.
// If it is set to zero then default value is used. // If it is set to zero then default value is used.
ReportingInterval time.Duration ReportingInterval time.Duration
// NumberOfWorkers sets the number of go rountines that send requests
// to Stackdriver Monitoring. This is only used for Proto metrics export
// for now. The minimum number of workers is 1.
NumberOfWorkers int
// ResourceByDescriptor may be provided to supply monitored resource dynamically
// based on the metric Descriptor. Most users will not need to set this,
// but should instead set ResourceDetector.
//
// The MonitoredResource and ResourceDetector fields are ignored if this
// field is set to a non-nil value.
//
// The ResourceByDescriptor is called to derive monitored resources from
// metric.Descriptor and the label map associated with the time-series.
// If any label is used for the derived resource then it will be removed
// from the label map. The remaining labels in the map are returned to
// be used with the time-series.
//
// If the func set to this field does not return valid resource even for one
// time-series then it will result into an error for the entire CreateTimeSeries request
// which may contain more than one time-series.
ResourceByDescriptor func(*metricdata.Descriptor, map[string]string) (map[string]string, monitoredresource.Interface)
} }
const defaultTimeout = 5 * time.Second const defaultTimeout = 5 * time.Second
var defaultDomain = path.Join("custom.googleapis.com", "opencensus")
// Exporter is a stats and trace exporter that uploads data to Stackdriver. // Exporter is a stats and trace exporter that uploads data to Stackdriver.
// //
// You can create a single Exporter and register it as both a trace exporter // You can create a single Exporter and register it as both a trace exporter
@ -292,10 +314,6 @@ func NewExporter(o Options) (*Exporter, error) {
} }
if o.Location == "" { if o.Location == "" {
if metadataapi.OnGCE() { if metadataapi.OnGCE() {
ctx := o.Context
if ctx == nil {
ctx = context.Background()
}
zone, err := metadataapi.Zone() zone, err := metadataapi.Zone()
if err != nil { if err != nil {
// This error should be logged with a warning level. // This error should be logged with a warning level.
@ -336,6 +354,9 @@ func NewExporter(o Options) (*Exporter, error) {
o.Resource = o.MapResource(res) o.Resource = o.MapResource(res)
} }
if o.MetricPrefix != "" && !strings.HasSuffix(o.MetricPrefix, "/") {
o.MetricPrefix = o.MetricPrefix + "/"
}
se, err := newStatsExporter(o) se, err := newStatsExporter(o)
if err != nil { if err != nil {
@ -353,13 +374,21 @@ func NewExporter(o Options) (*Exporter, error) {
// ExportView exports to the Stackdriver Monitoring if view data // ExportView exports to the Stackdriver Monitoring if view data
// has one or more rows. // has one or more rows.
// Deprecated: use ExportMetrics and StartMetricsExporter instead.
func (e *Exporter) ExportView(vd *view.Data) { func (e *Exporter) ExportView(vd *view.Data) {
e.statsExporter.ExportView(vd) e.statsExporter.ExportView(vd)
} }
// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring. // ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously,
// without de-duping or adding proto metrics to the bundler.
func (e *Exporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error { func (e *Exporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error {
return e.statsExporter.ExportMetricsProto(ctx, node, rsc, metrics) _, err := e.statsExporter.PushMetricsProto(ctx, node, rsc, metrics)
return err
}
// PushMetricsProto simliar with ExportMetricsProto but returns the number of dropped timeseries.
func (e *Exporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) {
return e.statsExporter.PushMetricsProto(ctx, node, rsc, metrics)
} }
// ExportMetrics exports OpenCensus Metrics to Stackdriver Monitoring // ExportMetrics exports OpenCensus Metrics to Stackdriver Monitoring
@ -427,12 +456,10 @@ func (o Options) handleError(err error) {
log.Printf("Failed to export to Stackdriver: %v", err) log.Printf("Failed to export to Stackdriver: %v", err)
} }
func (o Options) newContextWithTimeout() (context.Context, func()) { func newContextWithTimeout(ctx context.Context, timeout time.Duration) (context.Context, func()) {
ctx := o.Context
if ctx == nil { if ctx == nil {
ctx = context.Background() ctx = context.Background()
} }
timeout := o.Timeout
if timeout <= 0 { if timeout <= 0 {
timeout = defaultTimeout timeout = defaultTimeout
} }

View File

@ -20,18 +20,19 @@ import (
"fmt" "fmt"
"os" "os"
"path" "path"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"go.opencensus.io" opencensus "go.opencensus.io"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"cloud.google.com/go/monitoring/apiv3" monitoring "cloud.google.com/go/monitoring/apiv3"
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
"go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricexport" "go.opencensus.io/metric/metricexport"
@ -40,6 +41,7 @@ import (
distributionpb "google.golang.org/genproto/googleapis/api/distribution" distributionpb "google.golang.org/genproto/googleapis/api/distribution"
labelpb "google.golang.org/genproto/googleapis/api/label" labelpb "google.golang.org/genproto/googleapis/api/label"
"google.golang.org/genproto/googleapis/api/metric" "google.golang.org/genproto/googleapis/api/metric"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
metricpb "google.golang.org/genproto/googleapis/api/metric" metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
@ -60,17 +62,13 @@ type statsExporter struct {
o Options o Options
viewDataBundler *bundler.Bundler viewDataBundler *bundler.Bundler
protoMetricsBundler *bundler.Bundler
metricsBundler *bundler.Bundler metricsBundler *bundler.Bundler
createdViewsMu sync.Mutex
createdViews map[string]*metricpb.MetricDescriptor // Views already created remotely
protoMu sync.Mutex protoMu sync.Mutex
protoMetricDescriptors map[string]*metricpb.MetricDescriptor // Saves the metric descriptors that were already created remotely protoMetricDescriptors map[string]bool // Metric descriptors that were already created remotely
metricMu sync.Mutex metricMu sync.Mutex
metricDescriptors map[string]*metricpb.MetricDescriptor // Saves the metric descriptors that were already created remotely metricDescriptors map[string]bool // Metric descriptors that were already created remotely
c *monitoring.MetricClient c *monitoring.MetricClient
defaultLabels map[string]labelValue defaultLabels map[string]labelValue
@ -103,39 +101,39 @@ func newStatsExporter(o Options) (*statsExporter, error) {
e := &statsExporter{ e := &statsExporter{
c: client, c: client,
o: o, o: o,
createdViews: make(map[string]*metricpb.MetricDescriptor), protoMetricDescriptors: make(map[string]bool),
protoMetricDescriptors: make(map[string]*metricpb.MetricDescriptor), metricDescriptors: make(map[string]bool),
metricDescriptors: make(map[string]*metricpb.MetricDescriptor),
} }
var defaultLablesNotSanitized map[string]labelValue
if o.DefaultMonitoringLabels != nil { if o.DefaultMonitoringLabels != nil {
e.defaultLabels = o.DefaultMonitoringLabels.m defaultLablesNotSanitized = o.DefaultMonitoringLabels.m
} else { } else {
e.defaultLabels = map[string]labelValue{ defaultLablesNotSanitized = map[string]labelValue{
opencensusTaskKey: {val: getTaskValue(), desc: opencensusTaskDescription}, opencensusTaskKey: {val: getTaskValue(), desc: opencensusTaskDescription},
} }
} }
e.defaultLabels = make(map[string]labelValue)
// Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched.
for key, label := range defaultLablesNotSanitized {
e.defaultLabels[sanitize(key)] = label
}
e.viewDataBundler = bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) { e.viewDataBundler = bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) {
vds := bundle.([]*view.Data) vds := bundle.([]*view.Data)
e.handleUpload(vds...) e.handleUpload(vds...)
}) })
e.protoMetricsBundler = bundler.NewBundler((*metricProtoPayload)(nil), func(bundle interface{}) {
payloads := bundle.([]*metricProtoPayload)
e.handleMetricsProtoUpload(payloads)
})
e.metricsBundler = bundler.NewBundler((*metricdata.Metric)(nil), func(bundle interface{}) { e.metricsBundler = bundler.NewBundler((*metricdata.Metric)(nil), func(bundle interface{}) {
metrics := bundle.([]*metricdata.Metric) metrics := bundle.([]*metricdata.Metric)
e.handleMetricsUpload(metrics) e.handleMetricsUpload(metrics)
}) })
if delayThreshold := e.o.BundleDelayThreshold; delayThreshold > 0 { if delayThreshold := e.o.BundleDelayThreshold; delayThreshold > 0 {
e.viewDataBundler.DelayThreshold = delayThreshold e.viewDataBundler.DelayThreshold = delayThreshold
e.protoMetricsBundler.DelayThreshold = delayThreshold
e.metricsBundler.DelayThreshold = delayThreshold e.metricsBundler.DelayThreshold = delayThreshold
} }
if countThreshold := e.o.BundleCountThreshold; countThreshold > 0 { if countThreshold := e.o.BundleCountThreshold; countThreshold > 0 {
e.viewDataBundler.BundleCountThreshold = countThreshold e.viewDataBundler.BundleCountThreshold = countThreshold
e.protoMetricsBundler.BundleCountThreshold = countThreshold
e.metricsBundler.BundleCountThreshold = countThreshold e.metricsBundler.BundleCountThreshold = countThreshold
} }
return e, nil return e, nil
@ -143,7 +141,7 @@ func newStatsExporter(o Options) (*statsExporter, error) {
func (e *statsExporter) startMetricsReader() error { func (e *statsExporter) startMetricsReader() error {
e.initReaderOnce.Do(func() { e.initReaderOnce.Do(func() {
e.ir, _ = metricexport.NewIntervalReader(&metricexport.Reader{}, e) e.ir, _ = metricexport.NewIntervalReader(metricexport.NewReader(), e)
}) })
e.ir.ReportingInterval = e.o.ReportingInterval e.ir.ReportingInterval = e.o.ReportingInterval
return e.ir.Start() return e.ir.Start()
@ -156,10 +154,6 @@ func (e *statsExporter) stopMetricsReader() {
} }
func (e *statsExporter) getMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, *monitoredrespb.MonitoredResource) { func (e *statsExporter) getMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, *monitoredrespb.MonitoredResource) {
if get := e.o.GetMonitoredResource; get != nil {
newTags, mr := get(v, tags)
return newTags, convertMonitoredResourceToPB(mr)
}
resource := e.o.Resource resource := e.o.Resource
if resource == nil { if resource == nil {
resource = &monitoredrespb.MonitoredResource{ resource = &monitoredrespb.MonitoredResource{
@ -210,12 +204,11 @@ func (e *statsExporter) handleUpload(vds ...*view.Data) {
// want to lose data that hasn't yet been exported. // want to lose data that hasn't yet been exported.
func (e *statsExporter) Flush() { func (e *statsExporter) Flush() {
e.viewDataBundler.Flush() e.viewDataBundler.Flush()
e.protoMetricsBundler.Flush()
e.metricsBundler.Flush() e.metricsBundler.Flush()
} }
func (e *statsExporter) uploadStats(vds []*view.Data) error { func (e *statsExporter) uploadStats(vds []*view.Data) error {
ctx, cancel := e.o.newContextWithTimeout() ctx, cancel := newContextWithTimeout(e.o.Context, e.o.Timeout)
defer cancel() defer cancel()
ctx, span := trace.StartSpan( ctx, span := trace.StartSpan(
ctx, ctx,
@ -225,7 +218,7 @@ func (e *statsExporter) uploadStats(vds []*view.Data) error {
defer span.End() defer span.End()
for _, vd := range vds { for _, vd := range vds {
if err := e.createMeasure(ctx, vd.View); err != nil { if err := e.createMetricDescriptorFromView(ctx, vd.View); err != nil {
span.SetStatus(trace.Status{Code: 2, Message: err.Error()}) span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
return err return err
} }
@ -334,34 +327,27 @@ func (e *statsExporter) viewToMetricDescriptor(ctx context.Context, v *view.View
return res, nil return res, nil
} }
func (e *statsExporter) viewToCreateMetricDescriptorRequest(ctx context.Context, v *view.View) (*monitoringpb.CreateMetricDescriptorRequest, error) { // createMetricDescriptorFromView creates a MetricDescriptor for the given view data in Stackdriver Monitoring.
inMD, err := e.viewToMetricDescriptor(ctx, v)
if err != nil {
return nil, err
}
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
MetricDescriptor: inMD,
}
return cmrdesc, nil
}
// createMeasure creates a MetricDescriptor for the given view data in Stackdriver Monitoring.
// An error will be returned if there is already a metric descriptor created with the same name // An error will be returned if there is already a metric descriptor created with the same name
// but it has a different aggregation or keys. // but it has a different aggregation or keys.
func (e *statsExporter) createMeasure(ctx context.Context, v *view.View) error { func (e *statsExporter) createMetricDescriptorFromView(ctx context.Context, v *view.View) error {
e.createdViewsMu.Lock() // Skip create metric descriptor if configured
defer e.createdViewsMu.Unlock() if e.o.SkipCMD {
return nil
}
e.metricMu.Lock()
defer e.metricMu.Unlock()
viewName := v.Name viewName := v.Name
if md, ok := e.createdViews[viewName]; ok { if _, created := e.metricDescriptors[viewName]; created {
// [TODO:rghetia] Temporary fix for https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/76#issuecomment-459459091
if builtinMetric(md.Type) {
return nil return nil
} }
return e.equalMeasureAggTagKeys(md, v.Measure, v.Aggregation, v.TagKeys)
if builtinMetric(e.metricType(v)) {
e.metricDescriptors[viewName] = true
return nil
} }
inMD, err := e.viewToMetricDescriptor(ctx, v) inMD, err := e.viewToMetricDescriptor(ctx, v)
@ -369,34 +355,92 @@ func (e *statsExporter) createMeasure(ctx context.Context, v *view.View) error {
return err return err
} }
var dmd *metric.MetricDescriptor if err = e.createMetricDescriptor(ctx, inMD); err != nil {
if builtinMetric(inMD.Type) {
gmrdesc := &monitoringpb.GetMetricDescriptorRequest{
Name: inMD.Name,
}
dmd, err = getMetricDescriptor(ctx, e.c, gmrdesc)
} else {
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
MetricDescriptor: inMD,
}
dmd, err = createMetricDescriptor(ctx, e.c, cmrdesc)
}
if err != nil {
return err return err
} }
// Now cache the metric descriptor // Now cache the metric descriptor
e.createdViews[viewName] = dmd e.metricDescriptors[viewName] = true
return err return nil
} }
func (e *statsExporter) displayName(suffix string) string { func (e *statsExporter) displayName(suffix string) string {
displayNamePrefix := defaultDisplayNamePrefix return path.Join(defaultDisplayNamePrefix, suffix)
if e.o.MetricPrefix != "" {
displayNamePrefix = e.o.MetricPrefix
} }
return path.Join(displayNamePrefix, suffix)
func (e *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) (ctsreql []*monitoringpb.CreateTimeSeriesRequest) {
if len(ts) == 0 {
return nil
}
// Since there are scenarios in which Metrics with the same Type
// can be bunched in the same TimeSeries, we have to ensure that
// we create a unique CreateTimeSeriesRequest with entirely unique Metrics
// per TimeSeries, lest we'll encounter:
//
// err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written:
// Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered.
// Only one point can be written per TimeSeries per request.: timeSeries[2]
//
// This scenario happens when we are using the OpenCensus Agent in which multiple metrics
// are streamed by various client applications.
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73
uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
seenMetrics := make(map[string]struct{})
for _, tti := range ts {
key := metricSignature(tti.Metric)
if _, alreadySeen := seenMetrics[key]; !alreadySeen {
uniqueTimeSeries = append(uniqueTimeSeries, tti)
seenMetrics[key] = struct{}{}
} else {
nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti)
}
}
// UniqueTimeSeries can be bunched up together
// While for each nonUniqueTimeSeries, we have
// to make a unique CreateTimeSeriesRequest.
ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{
Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
TimeSeries: uniqueTimeSeries,
})
// Now recursively also combine the non-unique TimeSeries
// that were singly added to nonUniqueTimeSeries.
// The reason is that we need optimal combinations
// for optimal combinations because:
// * "a/b/c"
// * "a/b/c"
// * "x/y/z"
// * "a/b/c"
// * "x/y/z"
// * "p/y/z"
// * "d/y/z"
//
// should produce:
// CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"]
nonUniqueRequests := e.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries)
ctsreql = append(ctsreql, nonUniqueRequests...)
return ctsreql
}
// metricSignature creates a unique signature consisting of a
// metric's type and its lexicographically sorted label values
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/120
func metricSignature(metric *googlemetricpb.Metric) string {
labels := metric.GetLabels()
labelValues := make([]string, 0, len(labels))
for _, labelValue := range labels {
labelValues = append(labelValues, labelValue)
}
sort.Strings(labelValues)
return fmt.Sprintf("%s:%s", metric.GetType(), strings.Join(labelValues, ","))
} }
func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point { func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point {
@ -548,61 +592,21 @@ func newLabelDescriptors(defaults map[string]labelValue, keys []tag.Key) []*labe
return labelDescriptors return labelDescriptors
} }
func (e *statsExporter) equalMeasureAggTagKeys(md *metricpb.MetricDescriptor, m stats.Measure, agg *view.Aggregation, keys []tag.Key) error { func (e *statsExporter) createMetricDescriptor(ctx context.Context, md *metric.MetricDescriptor) error {
var aggTypeMatch bool ctx, cancel := newContextWithTimeout(ctx, e.o.Timeout)
switch md.ValueType { defer cancel()
case metricpb.MetricDescriptor_INT64: cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
if _, ok := m.(*stats.Int64Measure); !(ok || agg.Type == view.AggTypeCount) { Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
return fmt.Errorf("stackdriver metric descriptor was not created as int64") MetricDescriptor: md,
} }
aggTypeMatch = agg.Type == view.AggTypeCount || agg.Type == view.AggTypeSum || agg.Type == view.AggTypeLastValue _, err := createMetricDescriptor(ctx, e.c, cmrdesc)
case metricpb.MetricDescriptor_DOUBLE: return err
if _, ok := m.(*stats.Float64Measure); !ok {
return fmt.Errorf("stackdriver metric descriptor was not created as double")
}
aggTypeMatch = agg.Type == view.AggTypeSum || agg.Type == view.AggTypeLastValue
case metricpb.MetricDescriptor_DISTRIBUTION:
aggTypeMatch = agg.Type == view.AggTypeDistribution
}
if !aggTypeMatch {
return fmt.Errorf("stackdriver metric descriptor was not created with aggregation type %T", agg.Type)
}
labels := make(map[string]struct{}, len(keys)+len(e.defaultLabels))
for _, k := range keys {
labels[sanitize(k.Name())] = struct{}{}
}
for k := range e.defaultLabels {
labels[sanitize(k)] = struct{}{}
}
for _, k := range md.Labels {
if _, ok := labels[k.Key]; !ok {
return fmt.Errorf("stackdriver metric descriptor %q was not created with label %q", md.Type, k)
}
delete(labels, k.Key)
}
if len(labels) > 0 {
extra := make([]string, 0, len(labels))
for k := range labels {
extra = append(extra, k)
}
return fmt.Errorf("stackdriver metric descriptor %q contains unexpected labels: %s", md.Type, strings.Join(extra, ", "))
}
return nil
} }
var createMetricDescriptor = func(ctx context.Context, c *monitoring.MetricClient, mdr *monitoringpb.CreateMetricDescriptorRequest) (*metric.MetricDescriptor, error) { var createMetricDescriptor = func(ctx context.Context, c *monitoring.MetricClient, mdr *monitoringpb.CreateMetricDescriptorRequest) (*metric.MetricDescriptor, error) {
return c.CreateMetricDescriptor(ctx, mdr) return c.CreateMetricDescriptor(ctx, mdr)
} }
var getMetricDescriptor = func(ctx context.Context, c *monitoring.MetricClient, mdr *monitoringpb.GetMetricDescriptorRequest) (*metric.MetricDescriptor, error) {
return c.GetMetricDescriptor(ctx, mdr)
}
var createTimeSeries = func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error { var createTimeSeries = func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
return c.CreateTimeSeries(ctx, ts) return c.CreateTimeSeries(ctx, ts)
} }

View File

@ -121,7 +121,7 @@ func (e *traceExporter) uploadSpans(spans []*tracepb.Span) {
Spans: spans, Spans: spans,
} }
// Create a never-sampled span to prevent traces associated with exporter. // Create a never-sampled span to prevent traces associated with exporter.
ctx, cancel := e.o.newContextWithTimeout() ctx, cancel := newContextWithTimeout(e.o.Context, e.o.Timeout)
defer cancel() defer cancel()
ctx, span := trace.StartSpan( ctx, span := trace.StartSpan(
ctx, ctx,

View File

@ -1,89 +0,0 @@
// Copyright 2017, The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE.md file.
// Package cmpopts provides common options for the cmp package.
package cmpopts
import (
"math"
"reflect"
"github.com/google/go-cmp/cmp"
)
func equateAlways(_, _ interface{}) bool { return true }
// EquateEmpty returns a Comparer option that determines all maps and slices
// with a length of zero to be equal, regardless of whether they are nil.
//
// EquateEmpty can be used in conjunction with SortSlices and SortMaps.
func EquateEmpty() cmp.Option {
return cmp.FilterValues(isEmpty, cmp.Comparer(equateAlways))
}
func isEmpty(x, y interface{}) bool {
vx, vy := reflect.ValueOf(x), reflect.ValueOf(y)
return (x != nil && y != nil && vx.Type() == vy.Type()) &&
(vx.Kind() == reflect.Slice || vx.Kind() == reflect.Map) &&
(vx.Len() == 0 && vy.Len() == 0)
}
// EquateApprox returns a Comparer option that determines float32 or float64
// values to be equal if they are within a relative fraction or absolute margin.
// This option is not used when either x or y is NaN or infinite.
//
// The fraction determines that the difference of two values must be within the
// smaller fraction of the two values, while the margin determines that the two
// values must be within some absolute margin.
// To express only a fraction or only a margin, use 0 for the other parameter.
// The fraction and margin must be non-negative.
//
// The mathematical expression used is equivalent to:
// |x-y| ≤ max(fraction*min(|x|, |y|), margin)
//
// EquateApprox can be used in conjunction with EquateNaNs.
func EquateApprox(fraction, margin float64) cmp.Option {
if margin < 0 || fraction < 0 || math.IsNaN(margin) || math.IsNaN(fraction) {
panic("margin or fraction must be a non-negative number")
}
a := approximator{fraction, margin}
return cmp.Options{
cmp.FilterValues(areRealF64s, cmp.Comparer(a.compareF64)),
cmp.FilterValues(areRealF32s, cmp.Comparer(a.compareF32)),
}
}
type approximator struct{ frac, marg float64 }
func areRealF64s(x, y float64) bool {
return !math.IsNaN(x) && !math.IsNaN(y) && !math.IsInf(x, 0) && !math.IsInf(y, 0)
}
func areRealF32s(x, y float32) bool {
return areRealF64s(float64(x), float64(y))
}
func (a approximator) compareF64(x, y float64) bool {
relMarg := a.frac * math.Min(math.Abs(x), math.Abs(y))
return math.Abs(x-y) <= math.Max(a.marg, relMarg)
}
func (a approximator) compareF32(x, y float32) bool {
return a.compareF64(float64(x), float64(y))
}
// EquateNaNs returns a Comparer option that determines float32 and float64
// NaN values to be equal.
//
// EquateNaNs can be used in conjunction with EquateApprox.
func EquateNaNs() cmp.Option {
return cmp.Options{
cmp.FilterValues(areNaNsF64s, cmp.Comparer(equateAlways)),
cmp.FilterValues(areNaNsF32s, cmp.Comparer(equateAlways)),
}
}
func areNaNsF64s(x, y float64) bool {
return math.IsNaN(x) && math.IsNaN(y)
}
func areNaNsF32s(x, y float32) bool {
return areNaNsF64s(float64(x), float64(y))
}

View File

@ -1,207 +0,0 @@
// Copyright 2017, The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE.md file.
package cmpopts
import (
"fmt"
"reflect"
"unicode"
"unicode/utf8"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/internal/function"
)
// IgnoreFields returns an Option that ignores exported fields of the
// given names on a single struct type.
// The struct type is specified by passing in a value of that type.
//
// The name may be a dot-delimited string (e.g., "Foo.Bar") to ignore a
// specific sub-field that is embedded or nested within the parent struct.
//
// This does not handle unexported fields; use IgnoreUnexported instead.
func IgnoreFields(typ interface{}, names ...string) cmp.Option {
sf := newStructFilter(typ, names...)
return cmp.FilterPath(sf.filter, cmp.Ignore())
}
// IgnoreTypes returns an Option that ignores all values assignable to
// certain types, which are specified by passing in a value of each type.
func IgnoreTypes(typs ...interface{}) cmp.Option {
tf := newTypeFilter(typs...)
return cmp.FilterPath(tf.filter, cmp.Ignore())
}
type typeFilter []reflect.Type
func newTypeFilter(typs ...interface{}) (tf typeFilter) {
for _, typ := range typs {
t := reflect.TypeOf(typ)
if t == nil {
// This occurs if someone tries to pass in sync.Locker(nil)
panic("cannot determine type; consider using IgnoreInterfaces")
}
tf = append(tf, t)
}
return tf
}
func (tf typeFilter) filter(p cmp.Path) bool {
if len(p) < 1 {
return false
}
t := p.Last().Type()
for _, ti := range tf {
if t.AssignableTo(ti) {
return true
}
}
return false
}
// IgnoreInterfaces returns an Option that ignores all values or references of
// values assignable to certain interface types. These interfaces are specified
// by passing in an anonymous struct with the interface types embedded in it.
// For example, to ignore sync.Locker, pass in struct{sync.Locker}{}.
func IgnoreInterfaces(ifaces interface{}) cmp.Option {
tf := newIfaceFilter(ifaces)
return cmp.FilterPath(tf.filter, cmp.Ignore())
}
type ifaceFilter []reflect.Type
func newIfaceFilter(ifaces interface{}) (tf ifaceFilter) {
t := reflect.TypeOf(ifaces)
if ifaces == nil || t.Name() != "" || t.Kind() != reflect.Struct {
panic("input must be an anonymous struct")
}
for i := 0; i < t.NumField(); i++ {
fi := t.Field(i)
switch {
case !fi.Anonymous:
panic("struct cannot have named fields")
case fi.Type.Kind() != reflect.Interface:
panic("embedded field must be an interface type")
case fi.Type.NumMethod() == 0:
// This matches everything; why would you ever want this?
panic("cannot ignore empty interface")
default:
tf = append(tf, fi.Type)
}
}
return tf
}
func (tf ifaceFilter) filter(p cmp.Path) bool {
if len(p) < 1 {
return false
}
t := p.Last().Type()
for _, ti := range tf {
if t.AssignableTo(ti) {
return true
}
if t.Kind() != reflect.Ptr && reflect.PtrTo(t).AssignableTo(ti) {
return true
}
}
return false
}
// IgnoreUnexported returns an Option that only ignores the immediate unexported
// fields of a struct, including anonymous fields of unexported types.
// In particular, unexported fields within the struct's exported fields
// of struct types, including anonymous fields, will not be ignored unless the
// type of the field itself is also passed to IgnoreUnexported.
//
// Avoid ignoring unexported fields of a type which you do not control (i.e. a
// type from another repository), as changes to the implementation of such types
// may change how the comparison behaves. Prefer a custom Comparer instead.
func IgnoreUnexported(typs ...interface{}) cmp.Option {
ux := newUnexportedFilter(typs...)
return cmp.FilterPath(ux.filter, cmp.Ignore())
}
type unexportedFilter struct{ m map[reflect.Type]bool }
func newUnexportedFilter(typs ...interface{}) unexportedFilter {
ux := unexportedFilter{m: make(map[reflect.Type]bool)}
for _, typ := range typs {
t := reflect.TypeOf(typ)
if t == nil || t.Kind() != reflect.Struct {
panic(fmt.Sprintf("invalid struct type: %T", typ))
}
ux.m[t] = true
}
return ux
}
func (xf unexportedFilter) filter(p cmp.Path) bool {
sf, ok := p.Index(-1).(cmp.StructField)
if !ok {
return false
}
return xf.m[p.Index(-2).Type()] && !isExported(sf.Name())
}
// isExported reports whether the identifier is exported.
func isExported(id string) bool {
r, _ := utf8.DecodeRuneInString(id)
return unicode.IsUpper(r)
}
// IgnoreSliceElements returns an Option that ignores elements of []V.
// The discard function must be of the form "func(T) bool" which is used to
// ignore slice elements of type V, where V is assignable to T.
// Elements are ignored if the function reports true.
func IgnoreSliceElements(discardFunc interface{}) cmp.Option {
vf := reflect.ValueOf(discardFunc)
if !function.IsType(vf.Type(), function.ValuePredicate) || vf.IsNil() {
panic(fmt.Sprintf("invalid discard function: %T", discardFunc))
}
return cmp.FilterPath(func(p cmp.Path) bool {
si, ok := p.Index(-1).(cmp.SliceIndex)
if !ok {
return false
}
if !si.Type().AssignableTo(vf.Type().In(0)) {
return false
}
vx, vy := si.Values()
if vx.IsValid() && vf.Call([]reflect.Value{vx})[0].Bool() {
return true
}
if vy.IsValid() && vf.Call([]reflect.Value{vy})[0].Bool() {
return true
}
return false
}, cmp.Ignore())
}
// IgnoreMapEntries returns an Option that ignores entries of map[K]V.
// The discard function must be of the form "func(T, R) bool" which is used to
// ignore map entries of type K and V, where K and V are assignable to T and R.
// Entries are ignored if the function reports true.
func IgnoreMapEntries(discardFunc interface{}) cmp.Option {
vf := reflect.ValueOf(discardFunc)
if !function.IsType(vf.Type(), function.KeyValuePredicate) || vf.IsNil() {
panic(fmt.Sprintf("invalid discard function: %T", discardFunc))
}
return cmp.FilterPath(func(p cmp.Path) bool {
mi, ok := p.Index(-1).(cmp.MapIndex)
if !ok {
return false
}
if !mi.Key().Type().AssignableTo(vf.Type().In(0)) || !mi.Type().AssignableTo(vf.Type().In(1)) {
return false
}
k := mi.Key()
vx, vy := mi.Values()
if vx.IsValid() && vf.Call([]reflect.Value{k, vx})[0].Bool() {
return true
}
if vy.IsValid() && vf.Call([]reflect.Value{k, vy})[0].Bool() {
return true
}
return false
}, cmp.Ignore())
}

View File

@ -1,147 +0,0 @@
// Copyright 2017, The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE.md file.
package cmpopts
import (
"fmt"
"reflect"
"sort"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/internal/function"
)
// SortSlices returns a Transformer option that sorts all []V.
// The less function must be of the form "func(T, T) bool" which is used to
// sort any slice with element type V that is assignable to T.
//
// The less function must be:
// • Deterministic: less(x, y) == less(x, y)
// • Irreflexive: !less(x, x)
// • Transitive: if !less(x, y) and !less(y, z), then !less(x, z)
//
// The less function does not have to be "total". That is, if !less(x, y) and
// !less(y, x) for two elements x and y, their relative order is maintained.
//
// SortSlices can be used in conjunction with EquateEmpty.
func SortSlices(lessFunc interface{}) cmp.Option {
vf := reflect.ValueOf(lessFunc)
if !function.IsType(vf.Type(), function.Less) || vf.IsNil() {
panic(fmt.Sprintf("invalid less function: %T", lessFunc))
}
ss := sliceSorter{vf.Type().In(0), vf}
return cmp.FilterValues(ss.filter, cmp.Transformer("cmpopts.SortSlices", ss.sort))
}
type sliceSorter struct {
in reflect.Type // T
fnc reflect.Value // func(T, T) bool
}
func (ss sliceSorter) filter(x, y interface{}) bool {
vx, vy := reflect.ValueOf(x), reflect.ValueOf(y)
if !(x != nil && y != nil && vx.Type() == vy.Type()) ||
!(vx.Kind() == reflect.Slice && vx.Type().Elem().AssignableTo(ss.in)) ||
(vx.Len() <= 1 && vy.Len() <= 1) {
return false
}
// Check whether the slices are already sorted to avoid an infinite
// recursion cycle applying the same transform to itself.
ok1 := sort.SliceIsSorted(x, func(i, j int) bool { return ss.less(vx, i, j) })
ok2 := sort.SliceIsSorted(y, func(i, j int) bool { return ss.less(vy, i, j) })
return !ok1 || !ok2
}
func (ss sliceSorter) sort(x interface{}) interface{} {
src := reflect.ValueOf(x)
dst := reflect.MakeSlice(src.Type(), src.Len(), src.Len())
for i := 0; i < src.Len(); i++ {
dst.Index(i).Set(src.Index(i))
}
sort.SliceStable(dst.Interface(), func(i, j int) bool { return ss.less(dst, i, j) })
ss.checkSort(dst)
return dst.Interface()
}
func (ss sliceSorter) checkSort(v reflect.Value) {
start := -1 // Start of a sequence of equal elements.
for i := 1; i < v.Len(); i++ {
if ss.less(v, i-1, i) {
// Check that first and last elements in v[start:i] are equal.
if start >= 0 && (ss.less(v, start, i-1) || ss.less(v, i-1, start)) {
panic(fmt.Sprintf("incomparable values detected: want equal elements: %v", v.Slice(start, i)))
}
start = -1
} else if start == -1 {
start = i
}
}
}
func (ss sliceSorter) less(v reflect.Value, i, j int) bool {
vx, vy := v.Index(i), v.Index(j)
return ss.fnc.Call([]reflect.Value{vx, vy})[0].Bool()
}
// SortMaps returns a Transformer option that flattens map[K]V types to be a
// sorted []struct{K, V}. The less function must be of the form
// "func(T, T) bool" which is used to sort any map with key K that is
// assignable to T.
//
// Flattening the map into a slice has the property that cmp.Equal is able to
// use Comparers on K or the K.Equal method if it exists.
//
// The less function must be:
// • Deterministic: less(x, y) == less(x, y)
// • Irreflexive: !less(x, x)
// • Transitive: if !less(x, y) and !less(y, z), then !less(x, z)
// • Total: if x != y, then either less(x, y) or less(y, x)
//
// SortMaps can be used in conjunction with EquateEmpty.
func SortMaps(lessFunc interface{}) cmp.Option {
vf := reflect.ValueOf(lessFunc)
if !function.IsType(vf.Type(), function.Less) || vf.IsNil() {
panic(fmt.Sprintf("invalid less function: %T", lessFunc))
}
ms := mapSorter{vf.Type().In(0), vf}
return cmp.FilterValues(ms.filter, cmp.Transformer("cmpopts.SortMaps", ms.sort))
}
type mapSorter struct {
in reflect.Type // T
fnc reflect.Value // func(T, T) bool
}
func (ms mapSorter) filter(x, y interface{}) bool {
vx, vy := reflect.ValueOf(x), reflect.ValueOf(y)
return (x != nil && y != nil && vx.Type() == vy.Type()) &&
(vx.Kind() == reflect.Map && vx.Type().Key().AssignableTo(ms.in)) &&
(vx.Len() != 0 || vy.Len() != 0)
}
func (ms mapSorter) sort(x interface{}) interface{} {
src := reflect.ValueOf(x)
outType := reflect.StructOf([]reflect.StructField{
{Name: "K", Type: src.Type().Key()},
{Name: "V", Type: src.Type().Elem()},
})
dst := reflect.MakeSlice(reflect.SliceOf(outType), src.Len(), src.Len())
for i, k := range src.MapKeys() {
v := reflect.New(outType).Elem()
v.Field(0).Set(k)
v.Field(1).Set(src.MapIndex(k))
dst.Index(i).Set(v)
}
sort.Slice(dst.Interface(), func(i, j int) bool { return ms.less(dst, i, j) })
ms.checkSort(dst)
return dst.Interface()
}
func (ms mapSorter) checkSort(v reflect.Value) {
for i := 1; i < v.Len(); i++ {
if !ms.less(v, i-1, i) {
panic(fmt.Sprintf("partial order detected: want %v < %v", v.Index(i-1), v.Index(i)))
}
}
}
func (ms mapSorter) less(v reflect.Value, i, j int) bool {
vx, vy := v.Index(i).Field(0), v.Index(j).Field(0)
return ms.fnc.Call([]reflect.Value{vx, vy})[0].Bool()
}

View File

@ -1,182 +0,0 @@
// Copyright 2017, The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE.md file.
package cmpopts
import (
"fmt"
"reflect"
"strings"
"github.com/google/go-cmp/cmp"
)
// filterField returns a new Option where opt is only evaluated on paths that
// include a specific exported field on a single struct type.
// The struct type is specified by passing in a value of that type.
//
// The name may be a dot-delimited string (e.g., "Foo.Bar") to select a
// specific sub-field that is embedded or nested within the parent struct.
func filterField(typ interface{}, name string, opt cmp.Option) cmp.Option {
// TODO: This is currently unexported over concerns of how helper filters
// can be composed together easily.
// TODO: Add tests for FilterField.
sf := newStructFilter(typ, name)
return cmp.FilterPath(sf.filter, opt)
}
type structFilter struct {
t reflect.Type // The root struct type to match on
ft fieldTree // Tree of fields to match on
}
func newStructFilter(typ interface{}, names ...string) structFilter {
// TODO: Perhaps allow * as a special identifier to allow ignoring any
// number of path steps until the next field match?
// This could be useful when a concrete struct gets transformed into
// an anonymous struct where it is not possible to specify that by type,
// but the transformer happens to provide guarantees about the names of
// the transformed fields.
t := reflect.TypeOf(typ)
if t == nil || t.Kind() != reflect.Struct {
panic(fmt.Sprintf("%T must be a struct", typ))
}
var ft fieldTree
for _, name := range names {
cname, err := canonicalName(t, name)
if err != nil {
panic(fmt.Sprintf("%s: %v", strings.Join(cname, "."), err))
}
ft.insert(cname)
}
return structFilter{t, ft}
}
func (sf structFilter) filter(p cmp.Path) bool {
for i, ps := range p {
if ps.Type().AssignableTo(sf.t) && sf.ft.matchPrefix(p[i+1:]) {
return true
}
}
return false
}
// fieldTree represents a set of dot-separated identifiers.
//
// For example, inserting the following selectors:
// Foo
// Foo.Bar.Baz
// Foo.Buzz
// Nuka.Cola.Quantum
//
// Results in a tree of the form:
// {sub: {
// "Foo": {ok: true, sub: {
// "Bar": {sub: {
// "Baz": {ok: true},
// }},
// "Buzz": {ok: true},
// }},
// "Nuka": {sub: {
// "Cola": {sub: {
// "Quantum": {ok: true},
// }},
// }},
// }}
type fieldTree struct {
ok bool // Whether this is a specified node
sub map[string]fieldTree // The sub-tree of fields under this node
}
// insert inserts a sequence of field accesses into the tree.
func (ft *fieldTree) insert(cname []string) {
if ft.sub == nil {
ft.sub = make(map[string]fieldTree)
}
if len(cname) == 0 {
ft.ok = true
return
}
sub := ft.sub[cname[0]]
sub.insert(cname[1:])
ft.sub[cname[0]] = sub
}
// matchPrefix reports whether any selector in the fieldTree matches
// the start of path p.
func (ft fieldTree) matchPrefix(p cmp.Path) bool {
	for _, step := range p {
		switch s := step.(type) {
		case cmp.StructField:
			ft = ft.sub[s.Name()]
			switch {
			case ft.ok:
				return true
			case len(ft.sub) == 0:
				return false
			}
		case cmp.Indirect:
			// Transparently step through pointer indirections.
		default:
			return false
		}
	}
	return false
}
// canonicalName returns a list of identifiers where any struct field access
// through an embedded field is expanded to include the names of the embedded
// types themselves.
//
// For example, suppose field "Foo" is not directly in the parent struct,
// but actually from an embedded struct of type "Bar". Then, the canonical name
// of "Foo" is actually "Bar.Foo".
//
// Suppose field "Foo" is not directly in the parent struct, but actually
// a field in two different embedded structs of types "Bar" and "Baz".
// Then the selector "Foo" causes a panic since it is ambiguous which one it
// refers to. The user must specify either "Bar.Foo" or "Baz.Foo".
func canonicalName(t reflect.Type, sel string) ([]string, error) {
	sel = strings.TrimPrefix(sel, ".")
	if sel == "" {
		return nil, fmt.Errorf("name must not be empty")
	}

	// Split off the leading identifier; the remainder keeps its leading dot,
	// which the recursive call strips via TrimPrefix.
	var head string
	if i := strings.IndexByte(sel, '.'); i >= 0 {
		head, sel = sel[:i], sel[i:]
	} else {
		head, sel = sel, ""
	}

	// Type must be a struct or pointer to struct.
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil, fmt.Errorf("%v must be a struct", t)
	}

	// Find the canonical name for this current field name.
	// If the field exists in an embedded struct, then it will be expanded.
	if !isExported(head) {
		// Disallow unexported fields:
		// * To discourage people from actually touching unexported fields
		// * FieldByName is buggy (https://golang.org/issue/4876)
		return []string{head}, fmt.Errorf("name must be exported")
	}
	sf, ok := t.FieldByName(head)
	if !ok {
		return []string{head}, fmt.Errorf("does not exist")
	}

	// Expand the field's index path into the name of each traversed field,
	// which includes the names of any embedded structs along the way.
	names := make([]string, 0, len(sf.Index))
	for i := range sf.Index {
		names = append(names, t.FieldByIndex(sf.Index[:i+1]).Name)
	}
	if sel == "" {
		return names, nil
	}
	rest, err := canonicalName(sf.Type, sel)
	return append(names, rest...), err
}

View File

@ -1,35 +0,0 @@
// Copyright 2018, The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE.md file.
package cmpopts
import (
"github.com/google/go-cmp/cmp"
)
// xformFilter wraps a Transformer option so that its own applications
// can be recognized within a cmp.Path and filtered out.
type xformFilter struct{ xform cmp.Option }
// filter reports whether path p does not already contain an application of
// the wrapped transformer, thereby blocking recursive re-application.
func (xf xformFilter) filter(p cmp.Path) bool {
	for _, step := range p {
		tf, ok := step.(cmp.Transform)
		if ok && tf.Option() == xf.xform {
			return false
		}
	}
	return true
}
// AcyclicTransformer returns a Transformer with a filter applied that ensures
// that the transformer cannot be recursively applied upon its own output.
//
// An example use case is a transformer that splits a string by lines:
//	AcyclicTransformer("SplitLines", func(s string) []string{
//		return strings.Split(s, "\n")
//	})
//
// Had this been an unfiltered Transformer instead, this would result in an
// infinite cycle converting a string to []string to [][]string and so on.
func AcyclicTransformer(name string, xformFunc interface{}) cmp.Option {
	xform := cmp.Transformer(name, xformFunc)
	xf := xformFilter{xform: xform}
	return cmp.FilterPath(xf.filter, xf.xform)
}

10
vendor/knative.dev/pkg/Gopkg.lock generated vendored
View File

@ -29,15 +29,14 @@
version = "v0.1.0" version = "v0.1.0"
[[projects]] [[projects]]
digest = "1:83bd9ccdcc61bb43f45e4336cf9622849b5a867ef137f8b53303968202970225" digest = "1:c3fd5ddaad733530174bba5dd787d98a45d181851a95a0b7362be7bce7144f56"
name = "contrib.go.opencensus.io/exporter/stackdriver" name = "contrib.go.opencensus.io/exporter/stackdriver"
packages = [ packages = [
".", ".",
"monitoredresource", "monitoredresource",
] ]
pruneopts = "NUT" pruneopts = "NUT"
revision = "68e3d742b03c099c35428443886e65d9587c8d76" revision = "59d068f8d8ff5b653916aa30cdc4e13c7f15d56e"
version = "v0.12.2"
[[projects]] [[projects]]
digest = "1:7b5f423f5b0dd3dfa32a19a6183b0ab9129bff371ebf3f9efae32f87e4986d8f" digest = "1:7b5f423f5b0dd3dfa32a19a6183b0ab9129bff371ebf3f9efae32f87e4986d8f"
@ -1296,14 +1295,14 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:6e839a9b4183b0fbf0df609fe9a6d226c941fa9221052ba0075b25cb353588a5" digest = "1:052dcbb5c89b1843e0f5130dc9a0e815a89079c0329bc93c86f7d8c8569b8132"
name = "knative.dev/test-infra" name = "knative.dev/test-infra"
packages = [ packages = [
"scripts", "scripts",
"tools/dep-collector", "tools/dep-collector",
] ]
pruneopts = "UT" pruneopts = "UT"
revision = "9363d2f4cafd2ad3c1bfc925f447c91a1776be00" revision = "30f1a1f7b77ea46a34091c518b10ceed2c43fb3b"
[[projects]] [[projects]]
digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c" digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c"
@ -1354,6 +1353,7 @@
"github.com/rogpeppe/go-internal/semver", "github.com/rogpeppe/go-internal/semver",
"github.com/spf13/pflag", "github.com/spf13/pflag",
"github.com/tsenart/vegeta/lib", "github.com/tsenart/vegeta/lib",
"go.opencensus.io/metric/metricdata",
"go.opencensus.io/plugin/ochttp", "go.opencensus.io/plugin/ochttp",
"go.opencensus.io/plugin/ochttp/propagation/b3", "go.opencensus.io/plugin/ochttp/propagation/b3",
"go.opencensus.io/stats", "go.opencensus.io/stats",

View File

@ -75,9 +75,8 @@ required = [
[[constraint]] [[constraint]]
name = "contrib.go.opencensus.io/exporter/stackdriver" name = "contrib.go.opencensus.io/exporter/stackdriver"
# The build fails against 0.12.6 and newer because # With the fix of https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/237
# stackdriver.Options.GetMonitoredResource was removed. revision = "59d068f8d8ff5b653916aa30cdc4e13c7f15d56e"
version = "<=v0.12.5"
[[constraint]] [[constraint]]
name = "github.com/google/mako" name = "github.com/google/mako"

View File

@ -33,6 +33,11 @@ type flushable interface {
Flush() Flush()
} }
type stoppable interface {
// StopMetricsExporter stops the exporter
StopMetricsExporter()
}
// ExporterOptions contains options for configuring the exporter. // ExporterOptions contains options for configuring the exporter.
type ExporterOptions struct { type ExporterOptions struct {
// Domain is the metrics domain. e.g. "knative.dev". Must be present. // Domain is the metrics domain. e.g. "knative.dev". Must be present.
@ -126,11 +131,12 @@ func newMetricsExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.
// If there is a Prometheus Exporter server running, stop it. // If there is a Prometheus Exporter server running, stop it.
resetCurPromSrv() resetCurPromSrv()
if ce != nil { // TODO(https://github.com/knative/pkg/issues/866): Move Stackdriver and Promethus
// UnregisterExporter is idempotent and it can be called multiple times for the same exporter // operations before stopping to an interface.
// without side effects. if se, ok := ce.(stoppable); ok {
view.UnregisterExporter(ce) se.StopMetricsExporter()
} }
var err error var err error
var e view.Exporter var e view.Exporter
switch config.backendDestination { switch config.backendDestination {
@ -156,7 +162,6 @@ func getCurMetricsExporter() view.Exporter {
func setCurMetricsExporter(e view.Exporter) { func setCurMetricsExporter(e view.Exporter) {
metricsMux.Lock() metricsMux.Lock()
defer metricsMux.Unlock() defer metricsMux.Unlock()
view.RegisterExporter(e)
curMetricsExporter = e curMetricsExporter = e
} }

View File

@ -20,8 +20,7 @@ package metrics
import ( import (
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/stats/view" "go.opencensus.io/metric/metricdata"
"go.opencensus.io/tag"
"knative.dev/pkg/metrics/metricskey" "knative.dev/pkg/metrics/metricskey"
) )
@ -89,75 +88,72 @@ func (ki *KnativeSource) MonitoredResource() (resType string, labels map[string]
} }
func GetKnativeBrokerMonitoredResource( func GetKnativeBrokerMonitoredResource(
v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) {
tagsMap := getTagsMap(tags)
kb := &KnativeBroker{ kb := &KnativeBroker{
// The first three resource labels are from metadata. // The first three resource labels are from metadata.
Project: gm.project, Project: gm.project,
Location: gm.location, Location: gm.location,
ClusterName: gm.cluster, ClusterName: gm.cluster,
// The rest resource labels are from metrics labels. // The rest resource labels are from metrics labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags),
BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap), BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tags),
} }
var newTags []tag.Tag metricLabels := map[string]string{}
for _, t := range tags { for k, v := range tags {
// Keep the metrics labels that are not resource labels // Keep the metrics labels that are not resource labels
if !metricskey.KnativeBrokerLabels.Has(t.Key.Name()) { if !metricskey.KnativeBrokerLabels.Has(k) {
newTags = append(newTags, t) metricLabels[k] = v
} }
} }
return newTags, kb return metricLabels, kb
} }
func GetKnativeTriggerMonitoredResource( func GetKnativeTriggerMonitoredResource(
v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) {
tagsMap := getTagsMap(tags)
kt := &KnativeTrigger{ kt := &KnativeTrigger{
// The first three resource labels are from metadata. // The first three resource labels are from metadata.
Project: gm.project, Project: gm.project,
Location: gm.location, Location: gm.location,
ClusterName: gm.cluster, ClusterName: gm.cluster,
// The rest resource labels are from metrics labels. // The rest resource labels are from metrics labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags),
BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap), BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tags),
TriggerName: valueOrUnknown(metricskey.LabelTriggerName, tagsMap), TriggerName: valueOrUnknown(metricskey.LabelTriggerName, tags),
} }
var newTags []tag.Tag metricLabels := map[string]string{}
for _, t := range tags { for k, v := range tags {
// Keep the metrics labels that are not resource labels // Keep the metrics labels that are not resource labels
if !metricskey.KnativeTriggerLabels.Has(t.Key.Name()) { if !metricskey.KnativeTriggerLabels.Has(k) {
newTags = append(newTags, t) metricLabels[k] = v
} }
} }
return newTags, kt return metricLabels, kt
} }
func GetKnativeSourceMonitoredResource( func GetKnativeSourceMonitoredResource(
v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) {
tagsMap := getTagsMap(tags) ks := &KnativeSource{
ki := &KnativeSource{
// The first three resource labels are from metadata. // The first three resource labels are from metadata.
Project: gm.project, Project: gm.project,
Location: gm.location, Location: gm.location,
ClusterName: gm.cluster, ClusterName: gm.cluster,
// The rest resource labels are from metrics labels. // The rest resource labels are from metrics labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags),
SourceName: valueOrUnknown(metricskey.LabelName, tagsMap), SourceName: valueOrUnknown(metricskey.LabelName, tags),
SourceResourceGroup: valueOrUnknown(metricskey.LabelResourceGroup, tagsMap), SourceResourceGroup: valueOrUnknown(metricskey.LabelResourceGroup, tags),
} }
var newTags []tag.Tag metricLabels := map[string]string{}
for _, t := range tags { for k, v := range tags {
// Keep the metrics labels that are not resource labels // Keep the metrics labels that are not resource labels
if !metricskey.KnativeSourceLabels.Has(t.Key.Name()) { if !metricskey.KnativeSourceLabels.Has(k) {
newTags = append(newTags, t) metricLabels[k] = v
} }
} }
return newTags, ki return metricLabels, ks
} }

View File

@ -18,8 +18,7 @@ package metrics
import ( import (
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/stats/view" "go.opencensus.io/metric/metricdata"
"go.opencensus.io/tag"
"knative.dev/pkg/metrics/metricskey" "knative.dev/pkg/metrics/metricskey"
) )
@ -49,27 +48,26 @@ func (kr *KnativeRevision) MonitoredResource() (resType string, labels map[strin
} }
func GetKnativeRevisionMonitoredResource( func GetKnativeRevisionMonitoredResource(
v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) {
tagsMap := getTagsMap(tags)
kr := &KnativeRevision{ kr := &KnativeRevision{
// The first three resource labels are from metadata. // The first three resource labels are from metadata.
Project: gm.project, Project: gm.project,
Location: gm.location, Location: gm.location,
ClusterName: gm.cluster, ClusterName: gm.cluster,
// The rest resource labels are from metrics labels. // The rest resource labels are from metrics labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags),
ServiceName: valueOrUnknown(metricskey.LabelServiceName, tagsMap), ServiceName: valueOrUnknown(metricskey.LabelServiceName, tags),
ConfigurationName: valueOrUnknown(metricskey.LabelConfigurationName, tagsMap), ConfigurationName: valueOrUnknown(metricskey.LabelConfigurationName, tags),
RevisionName: valueOrUnknown(metricskey.LabelRevisionName, tagsMap), RevisionName: valueOrUnknown(metricskey.LabelRevisionName, tags),
} }
var newTags []tag.Tag metricLabels := map[string]string{}
for _, t := range tags { for k, v := range tags {
// Keep the metrics labels that are not resource labels // Keep the metrics labels that are not resource labels
if !metricskey.KnativeRevisionLabels.Has(t.Key.Name()) { if !metricskey.KnativeRevisionLabels.Has(k) {
newTags = append(newTags, t) metricLabels[k] = v
} }
} }
return newTags, kr return metricLabels, kr
} }

View File

@ -23,8 +23,8 @@ import (
"contrib.go.opencensus.io/exporter/stackdriver" "contrib.go.opencensus.io/exporter/stackdriver"
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/api/option" "google.golang.org/api/option"
"knative.dev/pkg/metrics/metricskey" "knative.dev/pkg/metrics/metricskey"
@ -101,14 +101,20 @@ func init() {
} }
func newOpencensusSDExporter(o stackdriver.Options) (view.Exporter, error) { func newOpencensusSDExporter(o stackdriver.Options) (view.Exporter, error) {
return stackdriver.NewExporter(o) e, err := stackdriver.NewExporter(o)
if err == nil {
// Start the exporter.
// TODO(https://github.com/knative/pkg/issues/866): Move this to an interface.
e.StartMetricsExporter()
}
return e, nil
} }
// TODO should be properly refactored to be able to inject the getMonitoredResourceFunc function. // TODO should be properly refactored to be able to inject the getResourceByDescriptorFunc function.
// See https://github.com/knative/pkg/issues/608 // See https://github.com/knative/pkg/issues/608
func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.Exporter, error) { func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.Exporter, error) {
gm := getMergedGCPMetadata(config) gm := getMergedGCPMetadata(config)
mtf := getMetricTypeFunc(config.stackdriverMetricTypePrefix, config.stackdriverCustomMetricTypePrefix) mpf := getMetricPrefixFunc(config.stackdriverMetricTypePrefix, config.stackdriverCustomMetricTypePrefix)
co, err := getStackdriverExporterClientOptions(&config.stackdriverClientConfig) co, err := getStackdriverExporterClientOptions(&config.stackdriverClientConfig)
if err != nil { if err != nil {
logger.Warnw("Issue configuring Stackdriver exporter client options, no additional client options will be used: ", zap.Error(err)) logger.Warnw("Issue configuring Stackdriver exporter client options, no additional client options will be used: ", zap.Error(err))
@ -119,9 +125,9 @@ func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (v
Location: gm.location, Location: gm.location,
MonitoringClientOptions: co, MonitoringClientOptions: co,
TraceClientOptions: co, TraceClientOptions: co,
GetMetricDisplayName: mtf, // Use metric type for display name for custom metrics. No impact on built-in metrics. GetMetricPrefix: mpf,
GetMetricType: mtf, ResourceByDescriptor: getResourceByDescriptorFunc(config.stackdriverMetricTypePrefix, gm),
GetMonitoredResource: getMonitoredResourceFunc(config.stackdriverMetricTypePrefix, gm), ReportingInterval: config.reportingPeriod,
DefaultMonitoringLabels: &stackdriver.Labels{}, DefaultMonitoringLabels: &stackdriver.Labels{},
}) })
if err != nil { if err != nil {
@ -172,39 +178,39 @@ func getMergedGCPMetadata(config *metricsConfig) *gcpMetadata {
return gm return gm
} }
func getMonitoredResourceFunc(metricTypePrefix string, gm *gcpMetadata) func(v *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) { func getResourceByDescriptorFunc(metricTypePrefix string, gm *gcpMetadata) func(*metricdata.Descriptor, map[string]string) (map[string]string, monitoredresource.Interface) {
return func(view *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) { return func(des *metricdata.Descriptor, tags map[string]string) (map[string]string, monitoredresource.Interface) {
metricType := path.Join(metricTypePrefix, view.Measure.Name()) metricType := path.Join(metricTypePrefix, des.Name)
if metricskey.KnativeRevisionMetrics.Has(metricType) { if metricskey.KnativeRevisionMetrics.Has(metricType) {
return GetKnativeRevisionMonitoredResource(view, tags, gm) return GetKnativeRevisionMonitoredResource(des, tags, gm)
} else if metricskey.KnativeBrokerMetrics.Has(metricType) { } else if metricskey.KnativeBrokerMetrics.Has(metricType) {
return GetKnativeBrokerMonitoredResource(view, tags, gm) return GetKnativeBrokerMonitoredResource(des, tags, gm)
} else if metricskey.KnativeTriggerMetrics.Has(metricType) { } else if metricskey.KnativeTriggerMetrics.Has(metricType) {
return GetKnativeTriggerMonitoredResource(view, tags, gm) return GetKnativeTriggerMonitoredResource(des, tags, gm)
} else if metricskey.KnativeSourceMetrics.Has(metricType) { } else if metricskey.KnativeSourceMetrics.Has(metricType) {
return GetKnativeSourceMonitoredResource(view, tags, gm) return GetKnativeSourceMonitoredResource(des, tags, gm)
} }
// Unsupported metric by knative_revision, knative_broker, knative_trigger, and knative_source, use "global" resource type. // Unsupported metric by knative_revision, knative_broker, knative_trigger, and knative_source, use "global" resource type.
return getGlobalMonitoredResource(view, tags) return getGlobalMonitoredResource(des, tags)
} }
} }
func getGlobalMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) { func getGlobalMonitoredResource(des *metricdata.Descriptor, tags map[string]string) (map[string]string, monitoredresource.Interface) {
return tags, &Global{} return tags, &Global{}
} }
func getMetricTypeFunc(metricTypePrefix, customMetricTypePrefix string) func(view *view.View) string { func getMetricPrefixFunc(metricTypePrefix, customMetricTypePrefix string) func(name string) string {
return func(view *view.View) string { return func(name string) string {
metricType := path.Join(metricTypePrefix, view.Measure.Name()) metricType := path.Join(metricTypePrefix, name)
inServing := metricskey.KnativeRevisionMetrics.Has(metricType) inServing := metricskey.KnativeRevisionMetrics.Has(metricType)
inEventing := metricskey.KnativeBrokerMetrics.Has(metricType) || inEventing := metricskey.KnativeBrokerMetrics.Has(metricType) ||
metricskey.KnativeTriggerMetrics.Has(metricType) || metricskey.KnativeTriggerMetrics.Has(metricType) ||
metricskey.KnativeSourceMetrics.Has(metricType) metricskey.KnativeSourceMetrics.Has(metricType)
if inServing || inEventing { if inServing || inEventing {
return metricType return metricTypePrefix
} }
// Unsupported metric by knative_revision, use custom domain. // Unsupported metric by knative_revision, use custom domain.
return path.Join(customMetricTypePrefix, view.Measure.Name()) return customMetricTypePrefix
} }
} }

View File

@ -38,7 +38,7 @@ var (
) )
type Operation interface { type Operation interface {
AcquireGKEProject(*string) (*boskoscommon.Resource, error) AcquireGKEProject(*string, string) (*boskoscommon.Resource, error)
ReleaseGKEProject(*string, string) error ReleaseGKEProject(*string, string) error
} }
@ -57,15 +57,15 @@ func newClient(host *string) *boskosclient.Client {
// AcquireGKEProject acquires GKE Boskos Project with "free" state, and not // AcquireGKEProject acquires GKE Boskos Project with "free" state, and not
// owned by anyone, sets its state to "busy" and assign it an owner of *host, // owned by anyone, sets its state to "busy" and assign it an owner of *host,
// which by default is env var `JOB_NAME`. // which by default is env var `JOB_NAME`.
func (c *Client) AcquireGKEProject(host *string) (*boskoscommon.Resource, error) { func (c *Client) AcquireGKEProject(host *string, resType string) (*boskoscommon.Resource, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultWaitDuration) ctx, cancel := context.WithTimeout(context.Background(), defaultWaitDuration)
defer cancel() defer cancel()
p, err := newClient(host).AcquireWait(ctx, GKEProjectResource, boskoscommon.Free, boskoscommon.Busy) p, err := newClient(host).AcquireWait(ctx, resType, boskoscommon.Free, boskoscommon.Busy)
if err != nil { if err != nil {
return nil, fmt.Errorf("boskos failed to acquire GKE project: %v", err) return nil, fmt.Errorf("boskos failed to acquire GKE project: %v", err)
} }
if p == nil { if p == nil {
return nil, fmt.Errorf("boskos does not have a free %s at the moment", GKEProjectResource) return nil, fmt.Errorf("boskos does not have a free %s at the moment", resType)
} }
return p, nil return p, nil
} }

View File

@ -44,11 +44,12 @@ func (c *FakeBoskosClient) GetResources() []*boskoscommon.Resource {
} }
// AcquireGKEProject fakes to be no op // AcquireGKEProject fakes to be no op
func (c *FakeBoskosClient) AcquireGKEProject(host *string) (*boskoscommon.Resource, error) { func (c *FakeBoskosClient) AcquireGKEProject(host *string, resType string) (*boskoscommon.Resource, error) {
for _, res := range c.resources { for _, res := range c.resources {
if res.State == boskoscommon.Free { if res.State == boskoscommon.Free {
res.State = boskoscommon.Busy res.State = boskoscommon.Busy
res.Owner = c.getOwner(host) res.Owner = c.getOwner(host)
res.Type = resType
return res, nil return res, nil
} }
} }

View File

@ -37,6 +37,7 @@ const (
regionEnv = "E2E_CLUSTER_REGION" regionEnv = "E2E_CLUSTER_REGION"
backupRegionEnv = "E2E_CLUSTER_BACKUP_REGIONS" backupRegionEnv = "E2E_CLUSTER_BACKUP_REGIONS"
defaultGKEVersion = "latest" defaultGKEVersion = "latest"
DefaultResourceType = boskos.GKEProjectResource
ClusterRunning = "RUNNING" ClusterRunning = "RUNNING"
) )
@ -66,6 +67,9 @@ type GKERequest struct {
// NeedsCleanup: enforce clean up if given this option, used when running // NeedsCleanup: enforce clean up if given this option, used when running
// locally // locally
NeedsCleanup bool NeedsCleanup bool
// ResourceType: the boskos resource type to acquire to hold the cluster in create
ResourceType string
} }
// GKECluster implements ClusterOperations // GKECluster implements ClusterOperations
@ -123,6 +127,10 @@ func (gs *GKEClient) Setup(r GKERequest) ClusterOperations {
r.BackupRegions = make([]string, 0) r.BackupRegions = make([]string, 0)
} }
if r.ResourceType == "" {
r.ResourceType = DefaultResourceType
}
gc.Request = &r gc.Request = &r
client, err := gke.NewSDKClient() client, err := gke.NewSDKClient()
@ -168,7 +176,7 @@ func (gc *GKECluster) Acquire() error {
// Get project name from boskos if running in Prow, otherwise it should fail // Get project name from boskos if running in Prow, otherwise it should fail
// since we don't know which project to use // since we don't know which project to use
if common.IsProw() { if common.IsProw() {
project, err := gc.boskosOps.AcquireGKEProject(nil) project, err := gc.boskosOps.AcquireGKEProject(nil, gc.Request.ResourceType)
if err != nil { if err != nil {
return fmt.Errorf("failed acquiring boskos project: '%v'", err) return fmt.Errorf("failed acquiring boskos project: '%v'", err)
} }

View File

@ -55,6 +55,7 @@ func (rw *RequestWrapper) addOptions() {
flag.StringVar(&rw.Request.Zone, "zone", "", "GCP zone") flag.StringVar(&rw.Request.Zone, "zone", "", "GCP zone")
flag.StringVar(&rw.Request.Project, "project", "", "GCP project") flag.StringVar(&rw.Request.Project, "project", "", "GCP project")
flag.StringVar(&rw.Request.ClusterName, "name", "", "cluster name") flag.StringVar(&rw.Request.ClusterName, "name", "", "cluster name")
flag.StringVar(&rw.Request.ResourceType, "resource-type", "", "Boskos Resource Type")
flag.StringVar(&rw.BackupRegionsStr, "backup-regions", "", "GCP regions as backup, separated by comma") flag.StringVar(&rw.BackupRegionsStr, "backup-regions", "", "GCP regions as backup, separated by comma")
flag.StringVar(&rw.AddonsStr, "addons", "", "addons to be added, separated by comma") flag.StringVar(&rw.AddonsStr, "addons", "", "addons to be added, separated by comma")
flag.BoolVar(&rw.Request.SkipCreation, "skip-creation", false, "should skip creation or not") flag.BoolVar(&rw.Request.SkipCreation, "skip-creation", false, "should skip creation or not")