diff --git a/Gopkg.lock b/Gopkg.lock
index 816a51b8e..1f611ce4e 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -29,15 +29,14 @@
   version = "v0.1.0"
 
 [[projects]]
-  digest = "1:83bd9ccdcc61bb43f45e4336cf9622849b5a867ef137f8b53303968202970225"
+  digest = "1:c3fd5ddaad733530174bba5dd787d98a45d181851a95a0b7362be7bce7144f56"
   name = "contrib.go.opencensus.io/exporter/stackdriver"
   packages = [
     ".",
     "monitoredresource",
   ]
   pruneopts = "NUT"
-  revision = "68e3d742b03c099c35428443886e65d9587c8d76"
-  version = "v0.12.2"
+  revision = "59d068f8d8ff5b653916aa30cdc4e13c7f15d56e"
 
 [[projects]]
   digest = "1:7b5f423f5b0dd3dfa32a19a6183b0ab9129bff371ebf3f9efae32f87e4986d8f"
@@ -1354,6 +1353,7 @@
     "github.com/rogpeppe/go-internal/semver",
     "github.com/spf13/pflag",
     "github.com/tsenart/vegeta/lib",
+    "go.opencensus.io/metric/metricdata",
     "go.opencensus.io/plugin/ochttp",
     "go.opencensus.io/plugin/ochttp/propagation/b3",
     "go.opencensus.io/stats",
diff --git a/Gopkg.toml b/Gopkg.toml
index 5d41f9a42..8964cd599 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -75,9 +75,8 @@ required = [
 
 [[constraint]]
   name = "contrib.go.opencensus.io/exporter/stackdriver"
-  # The build fails against 0.12.6 and newer because
-  # stackdriver.Options.GetMonitoredResource was removed.
-  version = "<=v0.12.5"
+  # Pin to a revision that includes the fix for https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/237
+  revision = "59d068f8d8ff5b653916aa30cdc4e13c7f15d56e"
 
 [[constraint]]
   name = "github.com/google/mako"
diff --git a/metrics/exporter.go b/metrics/exporter.go
index ddbb6b7d1..1a697c15c 100644
--- a/metrics/exporter.go
+++ b/metrics/exporter.go
@@ -33,6 +33,11 @@ type flushable interface {
 	Flush()
 }
 
+type stoppable interface {
+	// StopMetricsExporter stops the exporter
+	StopMetricsExporter()
+}
+
 // ExporterOptions contains options for configuring the exporter.
 type ExporterOptions struct {
 	// Domain is the metrics domain. e.g. "knative.dev". Must be present.
@@ -126,11 +131,12 @@ func newMetricsExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.
 
 	// If there is a Prometheus Exporter server running, stop it.
 	resetCurPromSrv()
-	if ce != nil {
-		// UnregisterExporter is idempotent and it can be called multiple times for the same exporter
-		// without side effects.
-		view.UnregisterExporter(ce)
+	// TODO(https://github.com/knative/pkg/issues/866): Move Stackdriver and Prometheus
+	// operations before stopping to an interface.
+	if se, ok := ce.(stoppable); ok {
+		se.StopMetricsExporter()
 	}
+
 	var err error
 	var e view.Exporter
 	switch config.backendDestination {
@@ -156,7 +162,6 @@ func getCurMetricsExporter() view.Exporter {
 func setCurMetricsExporter(e view.Exporter) {
 	metricsMux.Lock()
 	defer metricsMux.Unlock()
-	view.RegisterExporter(e)
 	curMetricsExporter = e
 }
 
diff --git a/metrics/exporter_test.go b/metrics/exporter_test.go
index ea52b5bd9..7bd8bee53 100644
--- a/metrics/exporter_test.go
+++ b/metrics/exporter_test.go
@@ -18,11 +18,8 @@ import (
 	"testing"
 	"time"
 
-	"go.opencensus.io/stats"
-	"go.opencensus.io/stats/view"
 	"go.opencensus.io/tag"
 	. "knative.dev/pkg/logging/testing"
-	"knative.dev/pkg/metrics/metricskey"
 )
 
 // TODO UTs should move to eventing and serving, as appropriate.
@@ -44,35 +41,6 @@ const ( testSourceResourceGroup = "test-source-rg" ) -var ( - testView = &view.View{ - Description: "Test View", - Measure: stats.Int64("test", "Test Measure", stats.UnitNone), - Aggregation: view.LastValue(), - TagKeys: []tag.Key{}, - } - - nsKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelNamespaceName), Value: testNS} - - serviceKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelServiceName), Value: testService} - routeKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelRouteName), Value: testRoute} - revisionKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelRevisionName), Value: testRevision} - - brokerKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelBrokerName), Value: testBroker} - triggerKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelTriggerName), Value: testTrigger} - filterTypeKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelFilterType), Value: testFilterType} - sourceKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelName), Value: testSource} - sourceResourceGroupKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelResourceGroup), Value: testSourceResourceGroup} - eventTypeKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelEventType), Value: testEventType} - eventSourceKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelEventSource), Value: testEventSource} - - revisionTestTags = []tag.Tag{nsKey, serviceKey, routeKey, revisionKey} - - brokerTestTags = []tag.Tag{nsKey, brokerKey, eventTypeKey} - triggerTestTags = []tag.Tag{nsKey, triggerKey, brokerKey, filterTypeKey} - sourceTestTags = []tag.Tag{nsKey, sourceKey, sourceResourceGroupKey, eventTypeKey, eventSourceKey} -) - func mustNewTagKey(s string) tag.Key { tagKey, err := tag.NewKey(s) if err != nil { @@ -81,15 +49,6 @@ func mustNewTagKey(s string) tag.Key { return tagKey } -func getResourceLabelValue(key string, tags []tag.Tag) string { - for _, t := range tags { - if t.Key.Name() == key { - return t.Value - } - } - return "" -} - func TestMain(m *testing.M) { resetCurPromSrv() // Set gcpMetadataFunc and newStackdriverExporterFunc for testing diff --git a/metrics/monitored_resources_eventing.go b/metrics/monitored_resources_eventing.go index 87c2b55a9..32742e773 100644 --- a/metrics/monitored_resources_eventing.go +++ b/metrics/monitored_resources_eventing.go @@ -20,8 +20,7 @@ package metrics import ( "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" + "go.opencensus.io/metric/metricdata" "knative.dev/pkg/metrics/metricskey" ) @@ -89,75 +88,72 @@ func (ki *KnativeSource) MonitoredResource() (resType string, labels map[string] } func GetKnativeBrokerMonitoredResource( - v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { - tagsMap := getTagsMap(tags) + des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) { kb := &KnativeBroker{ // The first three resource labels are from metadata. Project: gm.project, Location: gm.location, ClusterName: gm.cluster, // The rest resource labels are from metrics labels. 
- NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), - BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap), + NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags), + BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tags), } - var newTags []tag.Tag - for _, t := range tags { + metricLabels := map[string]string{} + for k, v := range tags { // Keep the metrics labels that are not resource labels - if !metricskey.KnativeBrokerLabels.Has(t.Key.Name()) { - newTags = append(newTags, t) + if !metricskey.KnativeBrokerLabels.Has(k) { + metricLabels[k] = v } } - return newTags, kb + return metricLabels, kb } func GetKnativeTriggerMonitoredResource( - v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { - tagsMap := getTagsMap(tags) + des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) { kt := &KnativeTrigger{ // The first three resource labels are from metadata. Project: gm.project, Location: gm.location, ClusterName: gm.cluster, // The rest resource labels are from metrics labels. - NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), - BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap), - TriggerName: valueOrUnknown(metricskey.LabelTriggerName, tagsMap), + NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags), + BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tags), + TriggerName: valueOrUnknown(metricskey.LabelTriggerName, tags), } - var newTags []tag.Tag - for _, t := range tags { + metricLabels := map[string]string{} + for k, v := range tags { // Keep the metrics labels that are not resource labels - if !metricskey.KnativeTriggerLabels.Has(t.Key.Name()) { - newTags = append(newTags, t) + if !metricskey.KnativeTriggerLabels.Has(k) { + metricLabels[k] = v } } - return newTags, kt + return metricLabels, kt } func GetKnativeSourceMonitoredResource( - v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { - tagsMap := getTagsMap(tags) - ki := &KnativeSource{ + des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) { + ks := &KnativeSource{ // The first three resource labels are from metadata. Project: gm.project, Location: gm.location, ClusterName: gm.cluster, // The rest resource labels are from metrics labels. 
- NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap), - SourceName: valueOrUnknown(metricskey.LabelName, tagsMap), - SourceResourceGroup: valueOrUnknown(metricskey.LabelResourceGroup, tagsMap), + NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags), + SourceName: valueOrUnknown(metricskey.LabelName, tags), + SourceResourceGroup: valueOrUnknown(metricskey.LabelResourceGroup, tags), } - var newTags []tag.Tag - for _, t := range tags { + metricLabels := map[string]string{} + for k, v := range tags { // Keep the metrics labels that are not resource labels - if !metricskey.KnativeSourceLabels.Has(t.Key.Name()) { - newTags = append(newTags, t) + if !metricskey.KnativeSourceLabels.Has(k) { + metricLabels[k] = v } } - return newTags, ki + return metricLabels, ks } diff --git a/metrics/monitored_resources_serving.go b/metrics/monitored_resources_serving.go index b2a1d33f5..eae42408b 100644 --- a/metrics/monitored_resources_serving.go +++ b/metrics/monitored_resources_serving.go @@ -18,8 +18,7 @@ package metrics import ( "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" + "go.opencensus.io/metric/metricdata" "knative.dev/pkg/metrics/metricskey" ) @@ -49,27 +48,26 @@ func (kr *KnativeRevision) MonitoredResource() (resType string, labels map[strin } func GetKnativeRevisionMonitoredResource( - v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) { - tagsMap := getTagsMap(tags) + des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) { kr := &KnativeRevision{ // The first three resource labels are from metadata. Project: gm.project, Location: gm.location, ClusterName: gm.cluster, // The rest resource labels are from metrics labels. 
-		NamespaceName:     valueOrUnknown(metricskey.LabelNamespaceName, tagsMap),
-		ServiceName:       valueOrUnknown(metricskey.LabelServiceName, tagsMap),
-		ConfigurationName: valueOrUnknown(metricskey.LabelConfigurationName, tagsMap),
-		RevisionName:      valueOrUnknown(metricskey.LabelRevisionName, tagsMap),
+		NamespaceName:     valueOrUnknown(metricskey.LabelNamespaceName, tags),
+		ServiceName:       valueOrUnknown(metricskey.LabelServiceName, tags),
+		ConfigurationName: valueOrUnknown(metricskey.LabelConfigurationName, tags),
+		RevisionName:      valueOrUnknown(metricskey.LabelRevisionName, tags),
 	}
-	var newTags []tag.Tag
-	for _, t := range tags {
+	metricLabels := map[string]string{}
+	for k, v := range tags {
 		// Keep the metrics labels that are not resource labels
-		if !metricskey.KnativeRevisionLabels.Has(t.Key.Name()) {
-			newTags = append(newTags, t)
+		if !metricskey.KnativeRevisionLabels.Has(k) {
+			metricLabels[k] = v
 		}
 	}
-	return newTags, kr
+	return metricLabels, kr
 }
diff --git a/metrics/stackdriver_exporter.go b/metrics/stackdriver_exporter.go
index ff5deb581..90122f6fd 100644
--- a/metrics/stackdriver_exporter.go
+++ b/metrics/stackdriver_exporter.go
@@ -23,8 +23,8 @@ import (
 
 	"contrib.go.opencensus.io/exporter/stackdriver"
 	"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
+	"go.opencensus.io/metric/metricdata"
 	"go.opencensus.io/stats/view"
-	"go.opencensus.io/tag"
 	"go.uber.org/zap"
 	"google.golang.org/api/option"
 	"knative.dev/pkg/metrics/metricskey"
@@ -101,14 +101,20 @@ func init() {
 }
 
 func newOpencensusSDExporter(o stackdriver.Options) (view.Exporter, error) {
-	return stackdriver.NewExporter(o)
+	e, err := stackdriver.NewExporter(o)
+	if err == nil {
+		// Start the exporter.
+		// TODO(https://github.com/knative/pkg/issues/866): Move this to an interface.
+		e.StartMetricsExporter()
+	}
+	return e, err
 }
 
-// TODO should be properly refactored to be able to inject the getMonitoredResourceFunc function.
+// TODO should be properly refactored to be able to inject the getResourceByDescriptorFunc function.
 // See https://github.com/knative/pkg/issues/608
 func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.Exporter, error) {
 	gm := getMergedGCPMetadata(config)
-	mtf := getMetricTypeFunc(config.stackdriverMetricTypePrefix, config.stackdriverCustomMetricTypePrefix)
+	mpf := getMetricPrefixFunc(config.stackdriverMetricTypePrefix, config.stackdriverCustomMetricTypePrefix)
 	co, err := getStackdriverExporterClientOptions(&config.stackdriverClientConfig)
 	if err != nil {
 		logger.Warnw("Issue configuring Stackdriver exporter client options, no additional client options will be used: ", zap.Error(err))
 	}
@@ -119,9 +125,9 @@ func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (v
 		Location:                gm.location,
 		MonitoringClientOptions: co,
 		TraceClientOptions:      co,
-		GetMetricDisplayName:    mtf, // Use metric type for display name for custom metrics. No impact on built-in metrics.
- GetMetricType: mtf, - GetMonitoredResource: getMonitoredResourceFunc(config.stackdriverMetricTypePrefix, gm), + GetMetricPrefix: mpf, + ResourceByDescriptor: getResourceByDescriptorFunc(config.stackdriverMetricTypePrefix, gm), + ReportingInterval: config.reportingPeriod, DefaultMonitoringLabels: &stackdriver.Labels{}, }) if err != nil { @@ -172,39 +178,39 @@ func getMergedGCPMetadata(config *metricsConfig) *gcpMetadata { return gm } -func getMonitoredResourceFunc(metricTypePrefix string, gm *gcpMetadata) func(v *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) { - return func(view *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) { - metricType := path.Join(metricTypePrefix, view.Measure.Name()) +func getResourceByDescriptorFunc(metricTypePrefix string, gm *gcpMetadata) func(*metricdata.Descriptor, map[string]string) (map[string]string, monitoredresource.Interface) { + return func(des *metricdata.Descriptor, tags map[string]string) (map[string]string, monitoredresource.Interface) { + metricType := path.Join(metricTypePrefix, des.Name) if metricskey.KnativeRevisionMetrics.Has(metricType) { - return GetKnativeRevisionMonitoredResource(view, tags, gm) + return GetKnativeRevisionMonitoredResource(des, tags, gm) } else if metricskey.KnativeBrokerMetrics.Has(metricType) { - return GetKnativeBrokerMonitoredResource(view, tags, gm) + return GetKnativeBrokerMonitoredResource(des, tags, gm) } else if metricskey.KnativeTriggerMetrics.Has(metricType) { - return GetKnativeTriggerMonitoredResource(view, tags, gm) + return GetKnativeTriggerMonitoredResource(des, tags, gm) } else if metricskey.KnativeSourceMetrics.Has(metricType) { - return GetKnativeSourceMonitoredResource(view, tags, gm) + return GetKnativeSourceMonitoredResource(des, tags, gm) } // Unsupported metric by knative_revision, knative_broker, knative_trigger, and knative_source, use "global" resource type. - return getGlobalMonitoredResource(view, tags) + return getGlobalMonitoredResource(des, tags) } } -func getGlobalMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) { +func getGlobalMonitoredResource(des *metricdata.Descriptor, tags map[string]string) (map[string]string, monitoredresource.Interface) { return tags, &Global{} } -func getMetricTypeFunc(metricTypePrefix, customMetricTypePrefix string) func(view *view.View) string { - return func(view *view.View) string { - metricType := path.Join(metricTypePrefix, view.Measure.Name()) +func getMetricPrefixFunc(metricTypePrefix, customMetricTypePrefix string) func(name string) string { + return func(name string) string { + metricType := path.Join(metricTypePrefix, name) inServing := metricskey.KnativeRevisionMetrics.Has(metricType) inEventing := metricskey.KnativeBrokerMetrics.Has(metricType) || metricskey.KnativeTriggerMetrics.Has(metricType) || metricskey.KnativeSourceMetrics.Has(metricType) if inServing || inEventing { - return metricType + return metricTypePrefix } // Unsupported metric by knative_revision, use custom domain. 
- return path.Join(customMetricTypePrefix, view.Measure.Name()) + return customMetricTypePrefix } } diff --git a/metrics/stackdriver_exporter_test.go b/metrics/stackdriver_exporter_test.go index b508d99cf..6f4663d03 100644 --- a/metrics/stackdriver_exporter_test.go +++ b/metrics/stackdriver_exporter_test.go @@ -22,9 +22,8 @@ import ( "time" "contrib.go.opencensus.io/exporter/stackdriver" - "go.opencensus.io/stats" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/stats/view" - "go.opencensus.io/tag" . "knative.dev/pkg/logging/testing" "knative.dev/pkg/metrics/metricskey" ) @@ -33,6 +32,31 @@ import ( // See https://github.com/knative/pkg/issues/608 var ( + revisionTestTags = map[string]string{ + metricskey.LabelNamespaceName: testNS, + metricskey.LabelServiceName: testService, + metricskey.LabelRouteName: testRoute, // Not a label key for knative_revision resource + metricskey.LabelRevisionName: testRevision, + } + brokerTestTags = map[string]string{ + metricskey.LabelNamespaceName: testNS, + metricskey.LabelBrokerName: testBroker, + metricskey.LabelEventType: testEventType, // Not a label key for knative_broker resource + } + triggerTestTags = map[string]string{ + metricskey.LabelNamespaceName: testNS, + metricskey.LabelTriggerName: testTrigger, + metricskey.LabelBrokerName: testBroker, + metricskey.LabelFilterType: testFilterType, // Not a label key for knative_trigger resource + } + sourceTestTags = map[string]string{ + metricskey.LabelNamespaceName: testNS, + metricskey.LabelName: testSource, + metricskey.LabelResourceGroup: testSourceResourceGroup, + metricskey.LabelEventType: testEventType, // Not a label key for knative_source resource + metricskey.LabelEventSource: testEventSource, // Not a label key for knative_source resource + } + testGcpMetadata = gcpMetadata{ project: "test-project", location: "test-location", @@ -148,200 +172,175 @@ func newFakeExporter(o stackdriver.Options) (view.Exporter, error) { return &fakeExporter{}, nil } -func TestGetMonitoredResourceFunc_UseKnativeRevision(t *testing.T) { +func TestGetResourceByDescriptorFunc_UseKnativeRevision(t *testing.T) { for _, testCase := range supportedServingMetricsTestCases { - testView = &view.View{ + testDescriptor := &metricdata.Descriptor{ + Name: testCase.metricName, Description: "Test View", - Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitNone), - Aggregation: view.LastValue(), - TagKeys: []tag.Key{}, + Type: metricdata.TypeGaugeInt64, + Unit: metricdata.UnitDimensionless, } - mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) + rbd := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) - newTags, monitoredResource := mrf(testView, revisionTestTags) - gotResType, labels := monitoredResource.MonitoredResource() + metricLabels, monitoredResource := rbd(testDescriptor, revisionTestTags) + gotResType, resourceLabels := monitoredResource.MonitoredResource() wantedResType := "knative_revision" if gotResType != wantedResType { t.Fatalf("MonitoredResource=%v, want %v", gotResType, wantedResType) } - got := getResourceLabelValue(metricskey.LabelRouteName, newTags) - if got != testRoute { - t.Errorf("expected new tag: %v, got: %v", routeKey, newTags) + // revisionTestTags includes route_name, which is not a key for knative_revision resource. 
+ if got := metricLabels[metricskey.LabelRouteName]; got != testRoute { + t.Errorf("expected metrics label: %v, got: %v", testRoute, got) } - got, ok := labels[metricskey.LabelNamespaceName] - if !ok || got != testNS { - t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got) + if got := resourceLabels[metricskey.LabelNamespaceName]; got != testNS { + t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got) } - got, ok = labels[metricskey.LabelConfigurationName] - if !ok || got != metricskey.ValueUnknown { - t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelConfigurationName, metricskey.ValueUnknown, got) + // configuration_name is a key required by knative_revision but missed in revisionTestTags + if got := resourceLabels[metricskey.LabelConfigurationName]; got != metricskey.ValueUnknown { + t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelConfigurationName, metricskey.ValueUnknown, got) } } } -func TestGetMonitoredResourceFunc_UseKnativeBroker(t *testing.T) { +func TestGetResourceByDescriptorFunc_UseKnativeBroker(t *testing.T) { for _, testCase := range supportedEventingBrokerMetricsTestCases { - testView = &view.View{ + testDescriptor := &metricdata.Descriptor{ + Name: testCase.metricName, Description: "Test View", - Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitDimensionless), - Aggregation: view.LastValue(), - TagKeys: []tag.Key{}, + Type: metricdata.TypeGaugeInt64, + Unit: metricdata.UnitDimensionless, } - mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) + rbd := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) - newTags, monitoredResource := mrf(testView, brokerTestTags) - gotResType, labels := monitoredResource.MonitoredResource() + metricLabels, monitoredResource := rbd(testDescriptor, brokerTestTags) + gotResType, resourceLabels := monitoredResource.MonitoredResource() wantedResType := "knative_broker" if gotResType != wantedResType { t.Fatalf("MonitoredResource=%v, want %v", gotResType, wantedResType) } - got := getResourceLabelValue(metricskey.LabelEventType, newTags) - if got != testEventType { - t.Errorf("expected new tag: %v, got: %v", eventTypeKey, newTags) + // brokerTestTags includes event_type, which is not a key for knative_broker resource. 
+ if got := metricLabels[metricskey.LabelEventType]; got != testEventType { + t.Errorf("expected metrics label: %v, got: %v", testEventType, got) } - got, ok := labels[metricskey.LabelNamespaceName] - if !ok || got != testNS { - t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got) + if got := resourceLabels[metricskey.LabelNamespaceName]; got != testNS { + t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got) } - got, ok = labels[metricskey.LabelBrokerName] - if !ok || got != testBroker { - t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelBrokerName, testBroker, got) + if got := resourceLabels[metricskey.LabelBrokerName]; got != testBroker { + t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelBrokerName, testBroker, got) } } } -func TestGetMonitoredResourceFunc_UseKnativeTrigger(t *testing.T) { +func TestGetResourceByDescriptorFunc_UseKnativeTrigger(t *testing.T) { for _, testCase := range supportedEventingTriggerMetricsTestCases { - testView = &view.View{ + testDescriptor := &metricdata.Descriptor{ + Name: testCase.metricName, Description: "Test View", - Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitDimensionless), - Aggregation: view.LastValue(), - TagKeys: []tag.Key{}, + Type: metricdata.TypeGaugeInt64, + Unit: metricdata.UnitDimensionless, } - mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) + rbd := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) - newTags, monitoredResource := mrf(testView, triggerTestTags) - gotResType, labels := monitoredResource.MonitoredResource() + metricLabels, monitoredResource := rbd(testDescriptor, triggerTestTags) + gotResType, resourceLabels := monitoredResource.MonitoredResource() wantedResType := "knative_trigger" if gotResType != wantedResType { t.Fatalf("MonitoredResource=%v, want %v", gotResType, wantedResType) } - got := getResourceLabelValue(metricskey.LabelFilterType, newTags) - if got != testFilterType { - t.Errorf("expected new tag: %v, got: %v", filterTypeKey, newTags) + // triggerTestTags includes filter_type, which is not a key for knative_trigger resource. 
+ if got := metricLabels[metricskey.LabelFilterType]; got != testFilterType { + t.Errorf("expected metrics label: %v, got: %v", testFilterType, got) } - got, ok := labels[metricskey.LabelNamespaceName] - if !ok || got != testNS { - t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got) + if got := resourceLabels[metricskey.LabelNamespaceName]; got != testNS { + t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got) } - got, ok = labels[metricskey.LabelBrokerName] - if !ok || got != testBroker { - t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelBrokerName, testBroker, got) + if got := resourceLabels[metricskey.LabelBrokerName]; got != testBroker { + t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelBrokerName, testBroker, got) } } } -func TestGetMonitoredResourceFunc_UseKnativeSource(t *testing.T) { +func TestGetResourceByDescriptorFunc_UseKnativeSource(t *testing.T) { for _, testCase := range supportedEventingSourceMetricsTestCases { - testView = &view.View{ + testDescriptor := &metricdata.Descriptor{ + Name: testCase.metricName, Description: "Test View", - Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitDimensionless), - Aggregation: view.LastValue(), - TagKeys: []tag.Key{}, + Type: metricdata.TypeGaugeInt64, + Unit: metricdata.UnitDimensionless, } - mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) + rbd := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) - newTags, monitoredResource := mrf(testView, sourceTestTags) - gotResType, labels := monitoredResource.MonitoredResource() + metricLabels, monitoredResource := rbd(testDescriptor, sourceTestTags) + gotResType, resourceLabels := monitoredResource.MonitoredResource() wantedResType := "knative_source" if gotResType != wantedResType { t.Fatalf("MonitoredResource=%v, want %v", gotResType, wantedResType) } - got := getResourceLabelValue(metricskey.LabelEventType, newTags) - if got != testEventType { - t.Errorf("expected new tag: %v, got: %v", eventTypeKey, newTags) + // sourceTestTags includes event_type, which is not a key for knative_trigger resource. + if got := metricLabels[metricskey.LabelEventType]; got != testEventType { + t.Errorf("expected metrics label: %v, got: %v", testEventType, got) } - got = getResourceLabelValue(metricskey.LabelEventSource, newTags) - if got != testEventSource { - t.Errorf("expected new tag: %v, got: %v", eventSourceKey, newTags) + // sourceTestTags includes event_source, which is not a key for knative_trigger resource. 
+ if got := metricLabels[metricskey.LabelEventSource]; got != testEventSource { + t.Errorf("expected metrics label: %v, got: %v", testEventSource, got) } - got, ok := labels[metricskey.LabelNamespaceName] - if !ok || got != testNS { - t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got) + if got := resourceLabels[metricskey.LabelNamespaceName]; got != testNS { + t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got) } - got, ok = labels[metricskey.LabelName] - if !ok || got != testSource { - t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelName, testSource, got) + if got := resourceLabels[metricskey.LabelName]; got != testSource { + t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelName, testSource, got) } - got, ok = labels[metricskey.LabelResourceGroup] - if !ok || got != testSourceResourceGroup { - t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelResourceGroup, testSourceResourceGroup, got) + if got := resourceLabels[metricskey.LabelResourceGroup]; got != testSourceResourceGroup { + t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelResourceGroup, testSourceResourceGroup, got) } } } -func TestGetMonitoredResourceFunc_UseGlobal(t *testing.T) { +func TestGetResourceByDescriptorFunc_UseGlobal(t *testing.T) { for _, testCase := range unsupportedMetricsTestCases { - testView = &view.View{ + testDescriptor := &metricdata.Descriptor{ + Name: testCase.metricName, Description: "Test View", - Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitNone), - Aggregation: view.LastValue(), - TagKeys: []tag.Key{}, + Type: metricdata.TypeGaugeInt64, + Unit: metricdata.UnitDimensionless, } - mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) + mrf := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata) - newTags, monitoredResource := mrf(testView, revisionTestTags) - gotResType, labels := monitoredResource.MonitoredResource() + metricLabels, monitoredResource := mrf(testDescriptor, revisionTestTags) + gotResType, resourceLabels := monitoredResource.MonitoredResource() wantedResType := "global" if gotResType != wantedResType { t.Fatalf("MonitoredResource=%v, want: %v", gotResType, wantedResType) } - got := getResourceLabelValue(metricskey.LabelNamespaceName, newTags) - if got != testNS { - t.Errorf("expected new tag %v with value %v, got: %v", routeKey, testNS, newTags) + if got := metricLabels[metricskey.LabelNamespaceName]; got != testNS { + t.Errorf("expected new tag %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got) } - if len(labels) != 0 { - t.Errorf("expected no label, got: %v", labels) + if len(resourceLabels) != 0 { + t.Errorf("expected no label, got: %v", resourceLabels) } } } -func TestGetMetricTypeFunc_UseKnativeDomain(t *testing.T) { +func TestGetMetricPrefixFunc_UseKnativeDomain(t *testing.T) { for _, testCase := range supportedServingMetricsTestCases { - testView = &view.View{ - Description: "Test View", - Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitNone), - Aggregation: view.LastValue(), - TagKeys: []tag.Key{}, - } - mtf := getMetricTypeFunc( - path.Join(testCase.domain, testCase.component), - path.Join(defaultCustomMetricSubDomain, testCase.component)) + knativePrefix := path.Join(testCase.domain, testCase.component) + customPrefix := 
path.Join(defaultCustomMetricSubDomain, testCase.component) + mpf := getMetricPrefixFunc(knativePrefix, customPrefix) - gotMetricType := mtf(testView) - wantedMetricType := path.Join(testCase.domain, testCase.component, testView.Measure.Name()) - if gotMetricType != wantedMetricType { - t.Fatalf("getMetricType=%v, want %v", gotMetricType, wantedMetricType) + if got, want := mpf(testCase.metricName), knativePrefix; got != want { + t.Fatalf("getMetricPrefixFunc=%v, want %v", got, want) } } } -func TestGetgetMetricTypeFunc_UseCustomDomain(t *testing.T) { +func TestGetMetricPrefixFunc_UseCustomDomain(t *testing.T) { for _, testCase := range unsupportedMetricsTestCases { - testView = &view.View{ - Description: "Test View", - Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitNone), - Aggregation: view.LastValue(), - TagKeys: []tag.Key{}, - } - mtf := getMetricTypeFunc( - path.Join(testCase.domain, testCase.component), - path.Join(defaultCustomMetricSubDomain, testCase.component)) + knativePrefix := path.Join(testCase.domain, testCase.component) + customPrefix := path.Join(defaultCustomMetricSubDomain, testCase.component) + mpf := getMetricPrefixFunc(knativePrefix, customPrefix) - gotMetricType := mtf(testView) - wantedMetricType := path.Join(defaultCustomMetricSubDomain, testCase.component, testView.Measure.Name()) - if gotMetricType != wantedMetricType { - t.Fatalf("getMetricType=%v, want %v", gotMetricType, wantedMetricType) + if got, want := mpf(testCase.metricName), customPrefix; got != want { + t.Fatalf("getMetricPrefixFunc=%v, want %v", got, want) } } } diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics.go b/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics.go index 0e69eb81b..a2df93f05 100644 --- a/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics.go +++ b/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics.go @@ -21,8 +21,9 @@ directly to Stackdriver Metrics. import ( "context" - "errors" "fmt" + "strings" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/any" "github.com/golang/protobuf/ptypes/timestamp" @@ -34,15 +35,11 @@ import ( monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" "go.opencensus.io/metric/metricdata" "go.opencensus.io/resource" ) -var ( - errLableExtraction = errors.New("error extracting labels") - errUnspecifiedMetricKind = errors.New("metric kind is unpsecified") -) - const ( exemplarAttachmentTypeString = "type.googleapis.com/google.protobuf.StringValue" exemplarAttachmentTypeSpanCtx = "type.googleapis.com/google.monitoring.v3.SpanContext" @@ -73,9 +70,11 @@ func (se *statsExporter) handleMetricsUpload(metrics []*metricdata.Metric) { } func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error { - ctx, cancel := se.o.newContextWithTimeout() + ctx, cancel := newContextWithTimeout(se.o.Context, se.o.Timeout) defer cancel() + var errors []error + ctx, span := trace.StartSpan( ctx, "contrib.go.opencensus.io/exporter/stackdriver.uploadMetrics", @@ -87,7 +86,7 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error { // Now create the metric descriptor remotely. if err := se.createMetricDescriptorFromMetric(ctx, metric); err != nil { span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - //TODO: [rghetia] record error metrics. 
+ errors = append(errors, err) continue } } @@ -97,7 +96,7 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error { tsl, err := se.metricToMpbTs(ctx, metric) if err != nil { span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - //TODO: [rghetia] record error metrics. + errors = append(errors, err) continue } if tsl != nil { @@ -116,26 +115,35 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error { for _, ctsreq := range ctsreql { if err := createTimeSeries(ctx, se.c, ctsreq); err != nil { span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - // TODO(@rghetia): record error metrics - // return err + errors = append(errors, err) } } } - return nil + numErrors := len(errors) + if numErrors == 0 { + return nil + } else if numErrors == 1 { + return errors[0] + } + errMsgs := make([]string, 0, numErrors) + for _, err := range errors { + errMsgs = append(errMsgs, err.Error()) + } + return fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) } // metricToMpbTs converts a metric into a list of Stackdriver Monitoring v3 API TimeSeries // but it doesn't invoke any remote API. func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.Metric) ([]*monitoringpb.TimeSeries, error) { if metric == nil { - return nil, errNilMetric + return nil, errNilMetricOrMetricDescriptor } resource := se.metricRscToMpbRsc(metric.Resource) metricName := metric.Descriptor.Name - metricType, _ := se.metricTypeFromProto(metricName) + metricType := se.metricTypeFromProto(metricName) metricLabelKeys := metric.Descriptor.LabelKeys metricKind, _ := metricDescriptorTypeToMetricKind(metric) @@ -159,12 +167,26 @@ func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.M // TODO: (@rghetia) perhaps log this error from labels extraction, if non-nil. continue } + + var rsc *monitoredrespb.MonitoredResource + var mr monitoredresource.Interface + if se.o.ResourceByDescriptor != nil { + labels, mr = se.o.ResourceByDescriptor(&metric.Descriptor, labels) + // TODO(rghetia): optimize this. It is inefficient to convert this for all metrics. + rsc = convertMonitoredResourceToPB(mr) + if rsc.Type == "" { + rsc.Type = "global" + rsc.Labels = nil + } + } else { + rsc = resource + } timeSeries = append(timeSeries, &monitoringpb.TimeSeries{ Metric: &googlemetricpb.Metric{ Type: metricType, Labels: labels, }, - Resource: resource, + Resource: rsc, Points: sdPoints, }) } @@ -173,17 +195,21 @@ func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.M } func metricLabelsToTsLabels(defaults map[string]labelValue, labelKeys []metricdata.LabelKey, labelValues []metricdata.LabelValue) (map[string]string, error) { + // Perform this sanity check now. + if len(labelKeys) != len(labelValues) { + return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues)) + } + + if len(defaults)+len(labelKeys) == 0 { + return nil, nil + } + labels := make(map[string]string) // Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched. for key, label := range defaults { labels[sanitize(key)] = label.val } - // Perform this sanity check now. 
- if len(labelKeys) != len(labelValues) { - return labels, fmt.Errorf("Length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues)) - } - for i, labelKey := range labelKeys { labelValue := labelValues[i] labels[sanitize(labelKey.Key)] = labelValue.Value @@ -195,6 +221,11 @@ func metricLabelsToTsLabels(defaults map[string]labelValue, labelKeys []metricda // createMetricDescriptorFromMetric creates a metric descriptor from the OpenCensus metric // and then creates it remotely using Stackdriver's API. func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, metric *metricdata.Metric) error { + // Skip create metric descriptor if configured + if se.o.SkipCMD { + return nil + } + se.metricMu.Lock() defer se.metricMu.Unlock() @@ -203,6 +234,11 @@ func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, m return nil } + if builtinMetric(se.metricTypeFromProto(name)) { + se.metricDescriptors[name] = true + return nil + } + // Otherwise, we encountered a cache-miss and // should create the metric descriptor remotely. inMD, err := se.metricToMpbMetricDescriptor(metric) @@ -210,35 +246,21 @@ func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, m return err } - var md *googlemetricpb.MetricDescriptor - if builtinMetric(inMD.Type) { - gmrdesc := &monitoringpb.GetMetricDescriptorRequest{ - Name: inMD.Name, - } - md, err = getMetricDescriptor(ctx, se.c, gmrdesc) - } else { - - cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{ - Name: fmt.Sprintf("projects/%s", se.o.ProjectID), - MetricDescriptor: inMD, - } - md, err = createMetricDescriptor(ctx, se.c, cmrdesc) + if err = se.createMetricDescriptor(ctx, inMD); err != nil { + return err } - if err == nil { - // Now record the metric as having been created. - se.metricDescriptors[name] = md - } - - return err + // Now record the metric as having been created. + se.metricDescriptors[name] = true + return nil } func (se *statsExporter) metricToMpbMetricDescriptor(metric *metricdata.Metric) (*googlemetricpb.MetricDescriptor, error) { if metric == nil { - return nil, errNilMetric + return nil, errNilMetricOrMetricDescriptor } - metricType, _ := se.metricTypeFromProto(metric.Descriptor.Name) + metricType := se.metricTypeFromProto(metric.Descriptor.Name) displayName := se.displayName(metric.Descriptor.Name) metricKind, valueType := metricDescriptorTypeToMetricKind(metric) @@ -466,11 +488,9 @@ func metricExemplarToPbExemplar(exemplar *metricdata.Exemplar, projectID string) func attachmentsToPbAttachments(attachments metricdata.Attachments, projectID string) []*any.Any { var pbAttachments []*any.Any for _, v := range attachments { - switch v.(type) { - case trace.SpanContext: - spanCtx, _ := v.(trace.SpanContext) + if spanCtx, succ := v.(trace.SpanContext); succ { pbAttachments = append(pbAttachments, toPbSpanCtxAttachment(spanCtx, projectID)) - default: + } else { // Treat everything else as plain string for now. // TODO(songy23): add support for dropped label attachments. 
pbAttachments = append(pbAttachments, toPbStringAttachment(v)) diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_batcher.go b/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_batcher.go new file mode 100644 index 000000000..ccd6ee4a6 --- /dev/null +++ b/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_batcher.go @@ -0,0 +1,201 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + monitoring "cloud.google.com/go/monitoring/apiv3" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +const ( + minNumWorkers = 1 + minReqsChanSize = 5 +) + +type metricsBatcher struct { + projectName string + allTss []*monitoringpb.TimeSeries + allErrs []error + + // Counts all dropped TimeSeries by this metricsBatcher. + droppedTimeSeries int + + workers []*worker + // reqsChan, respsChan and wg are shared between metricsBatcher and worker goroutines. + reqsChan chan *monitoringpb.CreateTimeSeriesRequest + respsChan chan *response + wg *sync.WaitGroup +} + +func newMetricsBatcher(ctx context.Context, projectID string, numWorkers int, mc *monitoring.MetricClient, timeout time.Duration) *metricsBatcher { + if numWorkers < minNumWorkers { + numWorkers = minNumWorkers + } + workers := make([]*worker, 0, numWorkers) + reqsChanSize := numWorkers + if reqsChanSize < minReqsChanSize { + reqsChanSize = minReqsChanSize + } + reqsChan := make(chan *monitoringpb.CreateTimeSeriesRequest, reqsChanSize) + respsChan := make(chan *response, numWorkers) + var wg sync.WaitGroup + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + w := newWorker(ctx, mc, reqsChan, respsChan, &wg, timeout) + workers = append(workers, w) + go w.start() + } + return &metricsBatcher{ + projectName: fmt.Sprintf("projects/%s", projectID), + allTss: make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload), + droppedTimeSeries: 0, + workers: workers, + wg: &wg, + reqsChan: reqsChan, + respsChan: respsChan, + } +} + +func (mb *metricsBatcher) recordDroppedTimeseries(numTimeSeries int, errs ...error) { + mb.droppedTimeSeries += numTimeSeries + for _, err := range errs { + if err != nil { + mb.allErrs = append(mb.allErrs, err) + } + } +} + +func (mb *metricsBatcher) addTimeSeries(ts *monitoringpb.TimeSeries) { + mb.allTss = append(mb.allTss, ts) + if len(mb.allTss) == maxTimeSeriesPerUpload { + mb.sendReqToChan() + mb.allTss = make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload) + } +} + +func (mb *metricsBatcher) close(ctx context.Context) error { + // Send any remaining time series, must be <200 + if len(mb.allTss) > 0 { + mb.sendReqToChan() + } + + close(mb.reqsChan) + mb.wg.Wait() + for i := 0; i < len(mb.workers); i++ { + resp := <-mb.respsChan + mb.recordDroppedTimeseries(resp.droppedTimeSeries, resp.errs...) 
+ } + close(mb.respsChan) + + numErrors := len(mb.allErrs) + if numErrors == 0 { + return nil + } + + if numErrors == 1 { + return mb.allErrs[0] + } + + errMsgs := make([]string, 0, numErrors) + for _, err := range mb.allErrs { + errMsgs = append(errMsgs, err.Error()) + } + return fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) +} + +// sendReqToChan grabs all the timeseies in this metricsBatcher, puts them +// to a CreateTimeSeriesRequest and sends the request to reqsChan. +func (mb *metricsBatcher) sendReqToChan() { + req := &monitoringpb.CreateTimeSeriesRequest{ + Name: mb.projectName, + TimeSeries: mb.allTss, + } + mb.reqsChan <- req +} + +// sendReq sends create time series requests to Stackdriver, +// and returns the count of dropped time series and error. +func sendReq(ctx context.Context, c *monitoring.MetricClient, req *monitoringpb.CreateTimeSeriesRequest) (int, error) { + if c != nil { // c==nil only happens in unit tests where we don't make real calls to Stackdriver server + err := createTimeSeries(ctx, c, req) + if err != nil { + return len(req.TimeSeries), err + } + } + return 0, nil +} + +type worker struct { + ctx context.Context + timeout time.Duration + mc *monitoring.MetricClient + + resp *response + + respsChan chan *response + reqsChan chan *monitoringpb.CreateTimeSeriesRequest + + wg *sync.WaitGroup +} + +func newWorker( + ctx context.Context, + mc *monitoring.MetricClient, + reqsChan chan *monitoringpb.CreateTimeSeriesRequest, + respsChan chan *response, + wg *sync.WaitGroup, + timeout time.Duration) *worker { + return &worker{ + ctx: ctx, + mc: mc, + resp: &response{}, + reqsChan: reqsChan, + respsChan: respsChan, + wg: wg, + } +} + +func (w *worker) start() { + for req := range w.reqsChan { + w.sendReqWithTimeout(req) + } + w.respsChan <- w.resp + w.wg.Done() +} + +func (w *worker) sendReqWithTimeout(req *monitoringpb.CreateTimeSeriesRequest) { + ctx, cancel := newContextWithTimeout(w.ctx, w.timeout) + defer cancel() + + w.recordDroppedTimeseries(sendReq(ctx, w.mc, req)) +} + +func (w *worker) recordDroppedTimeseries(numTimeSeries int, err error) { + w.resp.droppedTimeSeries += numTimeSeries + if err != nil { + w.resp.errs = append(w.resp.errs, err) + } +} + +type response struct { + droppedTimeSeries int + errs []error +} diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_proto.go b/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_proto.go index efa2b525b..bcc1f0ee9 100644 --- a/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_proto.go +++ b/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_proto.go @@ -24,81 +24,74 @@ import ( "errors" "fmt" "path" - "sort" "strings" - "github.com/golang/protobuf/ptypes/timestamp" - "go.opencensus.io/stats" - "go.opencensus.io/trace" - - "cloud.google.com/go/monitoring/apiv3" - distributionpb "google.golang.org/genproto/googleapis/api/distribution" - labelpb "google.golang.org/genproto/googleapis/api/label" - googlemetricpb "google.golang.org/genproto/googleapis/api/metric" - monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + "go.opencensus.io/resource" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - "go.opencensus.io/resource" + timestamppb "github.com/golang/protobuf/ptypes/timestamp" + distributionpb 
"google.golang.org/genproto/googleapis/api/distribution" + labelpb "google.golang.org/genproto/googleapis/api/label" + googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" ) -var errNilMetric = errors.New("expecting a non-nil metric") -var errNilMetricDescriptor = errors.New("expecting a non-nil metric descriptor") +var errNilMetricOrMetricDescriptor = errors.New("non-nil metric or metric descriptor") var percentileLabelKey = &metricspb.LabelKey{ Key: "percentile", Description: "the value at a given percentile of a distribution", } +var globalResource = &resource.Resource{Type: "global"} +var domains = []string{"googleapis.com", "kubernetes.io", "istio.io", "knative.dev"} -type metricProtoPayload struct { - node *commonpb.Node - resource *resourcepb.Resource - metric *metricspb.Metric - additionalLabels map[string]labelValue -} - -func (se *statsExporter) addPayload(node *commonpb.Node, rsc *resourcepb.Resource, labels map[string]labelValue, metrics ...*metricspb.Metric) { - for _, metric := range metrics { - payload := &metricProtoPayload{ - metric: metric, - resource: rsc, - node: node, - additionalLabels: labels, - } - se.protoMetricsBundler.Add(payload, 1) - } -} - -// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring. -func (se *statsExporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error { +// PushMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously, +// without de-duping or adding proto metrics to the bundler. +func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) { if len(metrics) == 0 { - return errNilMetric + return 0, errNilMetricOrMetricDescriptor } - additionalLabels := se.defaultLabels - if additionalLabels == nil { - // additionalLabels must be stateless because each node is different - additionalLabels = getDefaultLabelsFromNode(node) - } + // Caches the resources seen so far + seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) + mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, se.o.Timeout) for _, metric := range metrics { + if len(metric.GetTimeseries()) == 0 { + // No TimeSeries to export, skip this metric. + continue + } + mappedRsc := se.getResource(rsc, metric, seenResources) if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY { - se.addPayload(node, rsc, additionalLabels, se.convertSummaryMetrics(metric)...) 
+ summaryMtcs := se.convertSummaryMetrics(metric) + for _, summaryMtc := range summaryMtcs { + if err := se.createMetricDescriptorFromMetricProto(ctx, summaryMtc); err != nil { + mb.recordDroppedTimeseries(len(summaryMtc.GetTimeseries()), err) + continue + } + se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, mb) + } } else { - se.addPayload(node, rsc, additionalLabels, metric) + if err := se.createMetricDescriptorFromMetricProto(ctx, metric); err != nil { + mb.recordDroppedTimeseries(len(metric.GetTimeseries()), err) + continue + } + se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb) } } - return nil + return mb.droppedTimeSeries, mb.close(ctx) } func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*metricspb.Metric { var metrics []*metricspb.Metric - var percentileTss []*metricspb.TimeSeries - var countTss []*metricspb.TimeSeries - var sumTss []*metricspb.TimeSeries for _, ts := range summary.Timeseries { + var percentileTss []*metricspb.TimeSeries + var countTss []*metricspb.TimeSeries + var sumTss []*metricspb.TimeSeries lvs := ts.GetLabelValues() startTime := ts.StartTimestamp @@ -141,7 +134,8 @@ func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*met for _, percentileValue := range snapshot.GetPercentileValues() { lvsWithPercentile := lvs[0:] lvsWithPercentile = append(lvsWithPercentile, &metricspb.LabelValue{ - Value: fmt.Sprintf("%f", percentileValue.Percentile), + HasValue: true, + Value: fmt.Sprintf("%f", percentileValue.Percentile), }) percentileTs := &metricspb.TimeSeries{ LabelValues: lvsWithPercentile, @@ -207,135 +201,22 @@ func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*met return metrics } -func (se *statsExporter) handleMetricsProtoUpload(payloads []*metricProtoPayload) error { - ctx, cancel := se.o.newContextWithTimeout() - defer cancel() - - ctx, span := trace.StartSpan( - ctx, - "contrib.go.opencensus.io/exporter/stackdriver.uploadMetrics", - trace.WithSampler(trace.NeverSample()), - ) - defer span.End() - - for _, payload := range payloads { - // Now create the metric descriptor remotely. - if err := se.createMetricDescriptor(ctx, payload.metric, payload.additionalLabels); err != nil { - span.SetStatus(trace.Status{Code: 2, Message: err.Error()}) - return err - } +func (se *statsExporter) getResource(rsc *resourcepb.Resource, metric *metricspb.Metric, seenRscs map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) *monitoredrespb.MonitoredResource { + var resource = rsc + if metric.Resource != nil { + resource = metric.Resource } - - var allTimeSeries []*monitoringpb.TimeSeries - for _, payload := range payloads { - tsl, err := se.protoMetricToTimeSeries(ctx, payload.node, payload.resource, payload.metric, payload.additionalLabels) - if err != nil { - span.SetStatus(trace.Status{Code: 2, Message: err.Error()}) - return err - } - allTimeSeries = append(allTimeSeries, tsl...) + mappedRsc, ok := seenRscs[resource] + if !ok { + mappedRsc = se.o.MapResource(resourcepbToResource(resource)) + seenRscs[resource] = mappedRsc } - - // Now batch timeseries up and then export. 
- for start, end := 0, 0; start < len(allTimeSeries); start = end { - end = start + maxTimeSeriesPerUpload - if end > len(allTimeSeries) { - end = len(allTimeSeries) - } - batch := allTimeSeries[start:end] - ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(batch) - for _, ctsreq := range ctsreql { - if err := createTimeSeries(ctx, se.c, ctsreq); err != nil { - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - // TODO(@odeke-em): Don't fail fast here, perhaps batch errors? - // return err - } - } - } - - return nil -} - -// metricSignature creates a unique signature consisting of a -// metric's type and its lexicographically sorted label values -// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/120 -func metricSignature(metric *googlemetricpb.Metric) string { - labels := metric.GetLabels() - labelValues := make([]string, 0, len(labels)) - - for _, labelValue := range labels { - labelValues = append(labelValues, labelValue) - } - sort.Strings(labelValues) - return fmt.Sprintf("%s:%s", metric.GetType(), strings.Join(labelValues, ",")) -} - -func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) (ctsreql []*monitoringpb.CreateTimeSeriesRequest) { - if len(ts) == 0 { - return nil - } - - // Since there are scenarios in which Metrics with the same Type - // can be bunched in the same TimeSeries, we have to ensure that - // we create a unique CreateTimeSeriesRequest with entirely unique Metrics - // per TimeSeries, lest we'll encounter: - // - // err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written: - // Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered. - // Only one point can be written per TimeSeries per request.: timeSeries[2] - // - // This scenario happens when we are using the OpenCensus Agent in which multiple metrics - // are streamed by various client applications. - // See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73 - uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts)) - nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts)) - seenMetrics := make(map[string]struct{}) - - for _, tti := range ts { - key := metricSignature(tti.Metric) - if _, alreadySeen := seenMetrics[key]; !alreadySeen { - uniqueTimeSeries = append(uniqueTimeSeries, tti) - seenMetrics[key] = struct{}{} - } else { - nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti) - } - } - - // UniqueTimeSeries can be bunched up together - // While for each nonUniqueTimeSeries, we have - // to make a unique CreateTimeSeriesRequest. - ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{ - Name: monitoring.MetricProjectPath(se.o.ProjectID), - TimeSeries: uniqueTimeSeries, - }) - - // Now recursively also combine the non-unique TimeSeries - // that were singly added to nonUniqueTimeSeries. - // The reason is that we need optimal combinations - // for optimal combinations because: - // * "a/b/c" - // * "a/b/c" - // * "x/y/z" - // * "a/b/c" - // * "x/y/z" - // * "p/y/z" - // * "d/y/z" - // - // should produce: - // CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"] - // CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"] - // CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"] - nonUniqueRequests := se.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries) - ctsreql = append(ctsreql, nonUniqueRequests...) 
- - return ctsreql + return mappedRsc } func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource { if rsc == nil { - return &resource.Resource{ - Type: "global", - } + return globalResource } res := &resource.Resource{ Type: rsc.Type, @@ -350,92 +231,87 @@ func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource { // protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest // but it doesn't invoke any remote API. -func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) { - if metric == nil { - return nil, errNilMetric +func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, mb *metricsBatcher) { + if metric == nil || metric.MetricDescriptor == nil { + mb.recordDroppedTimeseries(len(metric.GetTimeseries()), errNilMetricOrMetricDescriptor) } - var resource = rsc - if metric.Resource != nil { - resource = metric.Resource - } - - mappedRes := se.o.MapResource(resourcepbToResource(resource)) - - metricName, _, _, err := metricProseFromProto(metric) - if err != nil { - return nil, err - } - metricType, _ := se.metricTypeFromProto(metricName) + metricType := se.metricTypeFromProto(metric.GetMetricDescriptor().GetName()) metricLabelKeys := metric.GetMetricDescriptor().GetLabelKeys() - metricKind, _ := protoMetricDescriptorTypeToMetricKind(metric) + metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric) + labelKeys := make([]string, 0, len(metricLabelKeys)) + for _, key := range metricLabelKeys { + labelKeys = append(labelKeys, sanitize(key.GetKey())) + } - timeSeries := make([]*monitoringpb.TimeSeries, 0, len(metric.Timeseries)) for _, protoTimeSeries := range metric.Timeseries { + if len(protoTimeSeries.Points) == 0 { + // No points to send just move forward. + continue + } + sdPoints, err := se.protoTimeSeriesToMonitoringPoints(protoTimeSeries, metricKind) if err != nil { - return nil, err + mb.recordDroppedTimeseries(1, err) + continue } // Each TimeSeries has labelValues which MUST be correlated // with that from the MetricDescriptor - labels, err := labelsPerTimeSeries(additionalLabels, metricLabelKeys, protoTimeSeries.GetLabelValues()) + labels, err := labelsPerTimeSeries(se.defaultLabels, labelKeys, protoTimeSeries.GetLabelValues()) if err != nil { - // TODO: (@odeke-em) perhaps log this error from labels extraction, if non-nil. 
+ mb.recordDroppedTimeseries(1, err) continue } - timeSeries = append(timeSeries, &monitoringpb.TimeSeries{ + mb.addTimeSeries(&monitoringpb.TimeSeries{ Metric: &googlemetricpb.Metric{ Type: metricType, Labels: labels, }, - Resource: mappedRes, - Points: sdPoints, + MetricKind: metricKind, + ValueType: valueType, + Resource: mappedRsc, + Points: sdPoints, }) } - - return timeSeries, nil } -func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []*metricspb.LabelKey, labelValues []*metricspb.LabelValue) (map[string]string, error) { +func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, labelValues []*metricspb.LabelValue) (map[string]string, error) { + if len(labelKeys) != len(labelValues) { + return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues)) + } + + if len(defaults)+len(labelKeys) == 0 { + // No labels for this metric + return nil, nil + } + labels := make(map[string]string) // Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched. for key, label := range defaults { - labels[sanitize(key)] = label.val - } - - // Perform this sanity check now. - if len(labelKeys) != len(labelValues) { - return labels, fmt.Errorf("Length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues)) + labels[key] = label.val } for i, labelKey := range labelKeys { labelValue := labelValues[i] - labels[sanitize(labelKey.GetKey())] = labelValue.GetValue() + if !labelValue.GetHasValue() { + continue + } + labels[labelKey] = labelValue.GetValue() } return labels, nil } -func (se *statsExporter) protoMetricDescriptorToCreateMetricDescriptorRequest(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) (*monitoringpb.CreateMetricDescriptorRequest, error) { - // Otherwise, we encountered a cache-miss and - // should create the metric descriptor remotely. - inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels) - if err != nil { - return nil, err +func (se *statsExporter) createMetricDescriptorFromMetricProto(ctx context.Context, metric *metricspb.Metric) error { + // Skip create metric descriptor if configured + if se.o.SkipCMD { + return nil } - cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{ - Name: fmt.Sprintf("projects/%s", se.o.ProjectID), - MetricDescriptor: inMD, - } + ctx, cancel := newContextWithTimeout(ctx, se.o.Timeout) + defer cancel() - return cmrdesc, nil -} - -// createMetricDescriptor creates a metric descriptor from the OpenCensus proto metric -// and then creates it remotely using Stackdriver's API. -func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) error { se.protoMu.Lock() defer se.protoMu.Unlock() @@ -444,46 +320,35 @@ func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *met return nil } + if builtinMetric(se.metricTypeFromProto(name)) { + se.protoMetricDescriptors[name] = true + return nil + } + // Otherwise, we encountered a cache-miss and // should create the metric descriptor remotely. 
- inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels) + inMD, err := se.protoToMonitoringMetricDescriptor(metric, se.defaultLabels) if err != nil { return err } - var md *googlemetricpb.MetricDescriptor - if builtinMetric(inMD.Type) { - gmrdesc := &monitoringpb.GetMetricDescriptorRequest{ - Name: inMD.Name, - } - md, err = getMetricDescriptor(ctx, se.c, gmrdesc) - } else { - - cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{ - Name: fmt.Sprintf("projects/%s", se.o.ProjectID), - MetricDescriptor: inMD, - } - md, err = createMetricDescriptor(ctx, se.c, cmrdesc) + if err = se.createMetricDescriptor(ctx, inMD); err != nil { + return err } - if err == nil { - // Now record the metric as having been created. - se.protoMetricDescriptors[name] = md - } - - return err + se.protoMetricDescriptors[name] = true + return nil } -func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) (sptl []*monitoringpb.Point, err error) { +func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) ([]*monitoringpb.Point, error) { + sptl := make([]*monitoringpb.Point, 0, len(ts.Points)) for _, pt := range ts.Points { - // If we have a last value aggregation point i.e. MetricDescriptor_GAUGE // StartTime should be nil. startTime := ts.StartTimestamp if metricKind == googlemetricpb.MetricDescriptor_GAUGE { startTime = nil } - spt, err := fromProtoPoint(startTime, pt) if err != nil { return nil, err @@ -494,15 +359,15 @@ func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSer } func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Metric, additionalLabels map[string]labelValue) (*googlemetricpb.MetricDescriptor, error) { - if metric == nil { - return nil, errNilMetric + if metric == nil || metric.MetricDescriptor == nil { + return nil, errNilMetricOrMetricDescriptor } - metricName, description, unit, err := metricProseFromProto(metric) - if err != nil { - return nil, err - } - metricType, _ := se.metricTypeFromProto(metricName) + md := metric.GetMetricDescriptor() + metricName := md.GetName() + unit := md.GetUnit() + description := md.GetDescription() + metricType := se.metricTypeFromProto(metricName) displayName := se.displayName(metricName) metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric) @@ -543,32 +408,32 @@ func labelDescriptorsFromProto(defaults map[string]labelValue, protoLabelKeys [] return labelDescriptors } -func metricProseFromProto(metric *metricspb.Metric) (name, description, unit string, err error) { - md := metric.GetMetricDescriptor() - if md == nil { - return "", "", "", errNilMetricDescriptor +func (se *statsExporter) metricTypeFromProto(name string) string { + prefix := se.o.MetricPrefix + if se.o.GetMetricPrefix != nil { + prefix = se.o.GetMetricPrefix(name) } - - name = md.GetName() - unit = md.GetUnit() - description = md.GetDescription() - - if md.Type == metricspb.MetricDescriptor_CUMULATIVE_INT64 { - // If the aggregation type is count, which counts the number of recorded measurements, the unit must be "1", - // because this view does not apply to the recorded values. - unit = stats.UnitDimensionless + if prefix != "" { + name = path.Join(prefix, name) } - - return name, description, unit, nil + if !hasDomain(name) { + // Still needed because the name may or may not have a "/" at the beginning. 
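+		// e.g. path.Join(defaultDomain, "myorg/request_count") yields
+		// "custom.googleapis.com/opencensus/myorg/request_count" (illustrative; "myorg" is a hypothetical prefix).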
+ name = path.Join(defaultDomain, name) + } + return name } -func (se *statsExporter) metricTypeFromProto(name string) (string, bool) { - // TODO: (@odeke-em) support non-"custom.googleapis.com" metrics names. - name = path.Join("custom.googleapis.com", "opencensus", name) - return name, true +// hasDomain checks if the metric name already has a domain in it. +func hasDomain(name string) bool { + for _, domain := range domains { + if strings.Contains(name, domain) { + return true + } + } + return false } -func fromProtoPoint(startTime *timestamp.Timestamp, pt *metricspb.Point) (*monitoringpb.Point, error) { +func fromProtoPoint(startTime *timestamppb.Timestamp, pt *metricspb.Point) (*monitoringpb.Point, error) { if pt == nil { return nil, nil } @@ -578,14 +443,13 @@ func fromProtoPoint(startTime *timestamp.Timestamp, pt *metricspb.Point) (*monit return nil, err } - mpt := &monitoringpb.Point{ + return &monitoringpb.Point{ Value: mptv, Interval: &monitoringpb.TimeInterval{ StartTime: startTime, EndTime: pt.Timestamp, }, - } - return mpt, nil + }, nil } func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) { @@ -593,8 +457,6 @@ func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) { return nil, nil } - var err error - var tval *monitoringpb.TypedValue switch v := value.(type) { default: // All the other types are not yet handled. @@ -610,21 +472,21 @@ func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) { // TODO: Add conversion from SummaryValue when // https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/66 // has been figured out. - err = fmt.Errorf("protoToMetricPoint: unknown Data type: %T", value) + return nil, fmt.Errorf("protoToMetricPoint: unknown Data type: %T", value) case *metricspb.Point_Int64Value: - tval = &monitoringpb.TypedValue{ + return &monitoringpb.TypedValue{ Value: &monitoringpb.TypedValue_Int64Value{ Int64Value: v.Int64Value, }, - } + }, nil case *metricspb.Point_DoubleValue: - tval = &monitoringpb.TypedValue{ + return &monitoringpb.TypedValue{ Value: &monitoringpb.TypedValue_DoubleValue{ DoubleValue: v.DoubleValue, }, - } + }, nil case *metricspb.Point_DistributionValue: dv := v.DistributionValue @@ -662,10 +524,8 @@ func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) { mv.DistributionValue.BucketCounts = addZeroBucketCountOnCondition(insertZeroBound, bucketCounts(dv.Buckets)...) } - tval = &monitoringpb.TypedValue{Value: mv} + return &monitoringpb.TypedValue{Value: mv}, nil } - - return tval, err } func bucketCounts(buckets []*metricspb.DistributionValue_Bucket) []int64 { @@ -707,13 +567,3 @@ func protoMetricDescriptorTypeToMetricKind(m *metricspb.Metric) (googlemetricpb. 
return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED } } - -func getDefaultLabelsFromNode(node *commonpb.Node) map[string]labelValue { - taskValue := fmt.Sprintf("%s-%d@%s", strings.ToLower(node.LibraryInfo.GetLanguage().String()), node.Identifier.Pid, node.Identifier.HostName) - return map[string]labelValue{ - opencensusTaskKey: { - val: taskValue, - desc: opencensusTaskDescription, - }, - } -} diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_test_utils.go b/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_test_utils.go deleted file mode 100644 index bc7a1ff4d..000000000 --- a/vendor/contrib.go.opencensus.io/exporter/stackdriver/metrics_test_utils.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2019, OpenCensus Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package stackdriver - -/* -Common test utilities for comparing Stackdriver metrics. -*/ - -import ( - "github.com/golang/protobuf/ptypes/timestamp" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - - googlemetricpb "google.golang.org/genproto/googleapis/api/metric" - monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" - monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" - - "time" -) - -func timestampToTime(ts *timestamp.Timestamp) time.Time { - if ts == nil { - return time.Unix(0, 0).UTC() - } - return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC() -} - -func cmpResource(got, want *monitoredrespb.MonitoredResource) string { - return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoredrespb.MonitoredResource{})) -} - -func cmpTSReqs(got, want []*monitoringpb.CreateTimeSeriesRequest) string { - return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.CreateTimeSeriesRequest{})) -} - -func cmpMD(got, want *googlemetricpb.MetricDescriptor) string { - return cmp.Diff(got, want, cmpopts.IgnoreUnexported(googlemetricpb.MetricDescriptor{})) -} - -func cmpMDReq(got, want *monitoringpb.CreateMetricDescriptorRequest) string { - return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.CreateMetricDescriptorRequest{})) -} - -func cmpMDReqs(got, want []*monitoringpb.CreateMetricDescriptorRequest) string { - return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.CreateMetricDescriptorRequest{})) -} - -func cmpPoint(got, want *monitoringpb.Point) string { - return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.Point{})) -} diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/monitoredresource/aws_identity_doc_utils.go b/vendor/contrib.go.opencensus.io/exporter/stackdriver/monitoredresource/aws_identity_doc_utils.go index d6a23a8cf..ee519a4bf 100644 --- a/vendor/contrib.go.opencensus.io/exporter/stackdriver/monitoredresource/aws_identity_doc_utils.go +++ b/vendor/contrib.go.opencensus.io/exporter/stackdriver/monitoredresource/aws_identity_doc_utils.go @@ -37,8 +37,12 @@ type awsIdentityDocument struct { // This is only 
done once.
 func retrieveAWSIdentityDocument() *awsIdentityDocument {
 	awsIdentityDoc := awsIdentityDocument{}
-	c := ec2metadata.New(session.New())
-	if c.Available() == false {
+	sess, err := session.NewSession()
+	if err != nil {
+		return nil
+	}
+	c := ec2metadata.New(sess)
+	if !c.Available() {
 		return nil
 	}
 	ec2InstanceIdentifyDocument, err := c.GetInstanceIdentityDocument()
diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/monitoredresource/gcp_metadata_config.go b/vendor/contrib.go.opencensus.io/exporter/stackdriver/monitoredresource/gcp_metadata_config.go
index 412e34772..f0d88856b 100644
--- a/vendor/contrib.go.opencensus.io/exporter/stackdriver/monitoredresource/gcp_metadata_config.go
+++ b/vendor/contrib.go.opencensus.io/exporter/stackdriver/monitoredresource/gcp_metadata_config.go
@@ -22,7 +22,7 @@ import (
 	"strings"
 
 	"cloud.google.com/go/compute/metadata"
-	"cloud.google.com/go/container/apiv1"
+	container "cloud.google.com/go/container/apiv1"
 	containerpb "google.golang.org/genproto/googleapis/container/v1"
 )
 
diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/resource.go b/vendor/contrib.go.opencensus.io/exporter/stackdriver/resource.go
index 5e2fbce58..782011cb6 100644
--- a/vendor/contrib.go.opencensus.io/exporter/stackdriver/resource.go
+++ b/vendor/contrib.go.opencensus.io/exporter/stackdriver/resource.go
@@ -22,13 +22,6 @@ import (
 	monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
 )
 
-type resourceMap struct {
-	// Mapping from the input resource type to the monitored resource type in Stackdriver.
-	srcType, dstType string
-	// Mapping from Stackdriver monitored resource label to an OpenCensus resource label.
-	labels map[string]string
-}
-
 // Resource labels that are generally internal to the exporter.
 // Consider exposing these labels and a type identifier in the future to allow
 // for customization.
@@ -41,7 +34,7 @@ const (
 )
 
 // Mappings for the well-known OpenCensus resources to applicable Stackdriver resources.
-var k8sResourceMap = map[string]string{
+var k8sContainerMap = map[string]string{
 	"project_id":   stackdriverProjectID,
 	"location":     resourcekeys.CloudKeyZone,
 	"cluster_name": resourcekeys.K8SKeyClusterName,
@@ -50,6 +43,21 @@ var k8sResourceMap = map[string]string{
 	"container_name": resourcekeys.ContainerKeyName,
 }
 
+var k8sPodMap = map[string]string{
+	"project_id":     stackdriverProjectID,
+	"location":       resourcekeys.CloudKeyZone,
+	"cluster_name":   resourcekeys.K8SKeyClusterName,
+	"namespace_name": resourcekeys.K8SKeyNamespaceName,
+	"pod_name":       resourcekeys.K8SKeyPodName,
+}
+
+var k8sNodeMap = map[string]string{
+	"project_id":   stackdriverProjectID,
+	"location":     resourcekeys.CloudKeyZone,
+	"cluster_name": resourcekeys.K8SKeyClusterName,
+	"node_name":    resourcekeys.HostKeyName,
+}
+
 var gcpResourceMap = map[string]string{
 	"project_id":  stackdriverProjectID,
 	"instance_id": resourcekeys.HostKeyID,
@@ -72,14 +80,20 @@ var genericResourceMap = map[string]string{
 	"task_id":    stackdriverGenericTaskID,
 }
 
-func transformResource(match, input map[string]string) map[string]string {
+// transformResource maps the Stackdriver monitored-resource labels in match from the
+// OpenCensus resource labels in input. The second return value reports whether a
+// required label (any label other than the optional project_id) was missing from input.
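+//
+// For example (illustrative): a single-entry match of {"node_name": resourcekeys.HostKeyName}
+// returns {"node_name": <host name>}, false when the input carries that key, and nil, true
+// when it does not.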
+func transformResource(match, input map[string]string) (map[string]string, bool) { output := make(map[string]string, len(input)) for dst, src := range match { - if v, ok := input[src]; ok { + v, ok := input[src] + if ok { output[dst] = v + } else if dst != "project_id" { + return nil, true } } - return output + return output, false } func defaultMapResource(res *resource.Resource) *monitoredrespb.MonitoredResource { @@ -90,19 +104,36 @@ func defaultMapResource(res *resource.Resource) *monitoredrespb.MonitoredResourc if res == nil || res.Labels == nil { return result } - if res.Type == resourcekeys.ContainerType { + + switch { + case res.Type == resourcekeys.ContainerType: result.Type = "k8s_container" - match = k8sResourceMap - } else if v, ok := res.Labels[resourcekeys.CloudKeyProvider]; ok { - if v == resourcekeys.CloudProviderGCP { - result.Type = "gce_instance" - match = gcpResourceMap - } else if v == resourcekeys.CloudProviderAWS { - result.Type = "aws_ec2_instance" - match = awsResourceMap - } + match = k8sContainerMap + case res.Type == resourcekeys.K8SType: + result.Type = "k8s_pod" + match = k8sPodMap + case res.Type == resourcekeys.HostType && res.Labels[resourcekeys.K8SKeyClusterName] != "": + result.Type = "k8s_node" + match = k8sNodeMap + case res.Labels[resourcekeys.CloudKeyProvider] == resourcekeys.CloudProviderGCP: + result.Type = "gce_instance" + match = gcpResourceMap + case res.Labels[resourcekeys.CloudKeyProvider] == resourcekeys.CloudProviderAWS: + result.Type = "aws_ec2_instance" + match = awsResourceMap + } + + var missing bool + result.Labels, missing = transformResource(match, res.Labels) + if missing { + result.Type = "global" + // if project id specified then transform it. + if v, ok := res.Labels[stackdriverProjectID]; ok { + result.Labels = make(map[string]string, 1) + result.Labels["project_id"] = v + } + return result } - result.Labels = transformResource(match, res.Labels) if result.Type == "aws_ec2_instance" { if v, ok := result.Labels["region"]; ok { result.Labels["region"] = fmt.Sprintf("aws:%s", v) diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/stackdriver.go b/vendor/contrib.go.opencensus.io/exporter/stackdriver/stackdriver.go index 7672a3e06..fafd06c28 100644 --- a/vendor/contrib.go.opencensus.io/exporter/stackdriver/stackdriver.go +++ b/vendor/contrib.go.opencensus.io/exporter/stackdriver/stackdriver.go @@ -54,6 +54,7 @@ import ( "log" "os" "path" + "strings" "time" metadataapi "cloud.google.com/go/compute/metadata" @@ -61,7 +62,6 @@ import ( "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" "go.opencensus.io/resource" "go.opencensus.io/stats/view" - "go.opencensus.io/tag" "go.opencensus.io/trace" "golang.org/x/oauth2/google" "google.golang.org/api/option" @@ -186,11 +186,9 @@ type Options struct { // conversions from auto-detected resources to well-known Stackdriver monitored resources. MapResource func(*resource.Resource) *monitoredrespb.MonitoredResource - // MetricPrefix overrides the prefix of a Stackdriver metric display names. - // Optional. If unset defaults to "OpenCensus/". - // Deprecated: Provide GetMetricDisplayName to change the display name of - // the metric. - // If GetMetricDisplayName is non-nil, this option is ignored. + // MetricPrefix overrides the prefix of a Stackdriver metric names. + // Optional. If unset defaults to "custom.googleapis.com/opencensus/". + // If GetMetricPrefix is non-nil, this option is ignored. 
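+	//
+	// For example (illustrative, not from the upstream docs): with MetricPrefix set to
+	// "myorg", a metric named "request_count" is exported with the type
+	// "custom.googleapis.com/opencensus/myorg/request_count"; a prefix that already
+	// contains a recognized domain is used as-is.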
	MetricPrefix string

	// GetMetricDisplayName allows customizing the display name for the metric
@@ -203,8 +201,16 @@ type Options struct {
	// "custom.googleapis.com/opencensus/" + view.Name
	//
	// See: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricDescriptor
+	// Deprecated: use GetMetricPrefix instead.
	GetMetricType func(view *view.View) string

+	// GetMetricPrefix allows customizing the metric prefix for the given metric name.
+	// If it is not set, MetricPrefix is used. If MetricPrefix is not set, it defaults to:
+	// "custom.googleapis.com/opencensus/"
+	//
+	// See: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricDescriptor
+	GetMetricPrefix func(name string) string
+
	// DefaultTraceAttributes will be appended to every span that is exported to
	// Stackdriver Trace.
	DefaultTraceAttributes map[string]interface{}
@@ -238,31 +244,47 @@ type Options struct {
	// If unset, context.Background() will be used.
	Context context.Context

+	// SkipCMD, when true, skips all CreateMetricDescriptor calls.
+	// These calls are needed to configure the unit of a metric, but they can be
+	// skipped when all the exported metrics are builtin (their unit is already
+	// configured) or when the unit is not important.
+	SkipCMD bool
+
	// Timeout for all API calls. If not set, defaults to 5 seconds.
	Timeout time.Duration

-	// GetMonitoredResource may be provided to supply the details of the
-	// monitored resource dynamically based on the tags associated with each
-	// data point. Most users will not need to set this, but should instead
-	// set the MonitoredResource field.
-	//
-	// GetMonitoredResource may add or remove tags by returning a new set of
-	// tags. It is safe for the function to mutate its argument and return it.
-	//
-	// See the documentation on the MonitoredResource field for guidance on the
-	// interaction between monitored resources and labels.
-	//
-	// The MonitoredResource field is ignored if this field is set to a non-nil
-	// value.
-	GetMonitoredResource func(*view.View, []tag.Tag) ([]tag.Tag, monitoredresource.Interface)
-
	// ReportingInterval sets the interval between reporting metrics.
	// If it is set to zero then default value is used.
	ReportingInterval time.Duration
+
+	// NumberOfWorkers sets the number of goroutines that send requests
+	// to Stackdriver Monitoring. This is only used for Proto metrics export
+	// for now. The minimum number of workers is 1.
+	NumberOfWorkers int
+
+	// ResourceByDescriptor may be provided to supply a monitored resource dynamically
+	// based on the metric Descriptor. Most users will not need to set this,
+	// but should instead set ResourceDetector.
+	//
+	// The MonitoredResource and ResourceDetector fields are ignored if this
+	// field is set to a non-nil value.
+	//
+	// ResourceByDescriptor is called to derive monitored resources from the
+	// metric.Descriptor and the label map associated with the time-series.
+	// If any label is used for the derived resource then it will be removed
+	// from the label map. The remaining labels in the map are returned to
+	// be used with the time-series.
+	//
+	// If the func set to this field does not return a valid resource for even one
+	// time-series, it will result in an error for the entire CreateTimeSeries request,
+	// which may contain more than one time-series.
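+	//
+	// A minimal sketch (illustrative; the label and resource values are hypothetical,
+	// not part of the upstream documentation):
+	//
+	//	opts.ResourceByDescriptor = func(d *metricdata.Descriptor, labels map[string]string) (map[string]string, monitoredresource.Interface) {
+	//		delete(labels, "pod_name") // label consumed by the resource, removed from the time-series labels
+	//		return labels, &monitoredresource.GKEContainer{ProjectID: "my-project"}
+	//	}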
+	ResourceByDescriptor func(*metricdata.Descriptor, map[string]string) (map[string]string, monitoredresource.Interface)
 }
 
 const defaultTimeout = 5 * time.Second
 
+var defaultDomain = path.Join("custom.googleapis.com", "opencensus")
+
 // Exporter is a stats and trace exporter that uploads data to Stackdriver.
 //
 // You can create a single Exporter and register it as both a trace exporter
@@ -291,16 +313,19 @@ func NewExporter(o Options) (*Exporter, error) {
 		o.ProjectID = creds.ProjectID
 	}
 	if o.Location == "" {
-		ctx := o.Context
-		if ctx == nil {
-			ctx = context.Background()
-		}
-		zone, err := metadataapi.Zone()
-		if err != nil {
-			log.Printf("Setting Stackdriver default location failed: %s", err)
-		} else {
-			log.Printf("Setting Stackdriver default location to %q", zone)
-			o.Location = zone
+		if metadataapi.OnGCE() {
+			zone, err := metadataapi.Zone()
+			if err != nil {
+				// This error should be logged with a warning level.
+				err = fmt.Errorf("setting Stackdriver default location failed: %s", err)
+				if o.OnError != nil {
+					o.OnError(err)
+				} else {
+					log.Print(err)
+				}
+			} else {
+				o.Location = zone
+			}
 		}
 	}
 
@@ -329,6 +354,9 @@ func NewExporter(o Options) (*Exporter, error) {
 		o.Resource = o.MapResource(res)
 	}
 
+	if o.MetricPrefix != "" && !strings.HasSuffix(o.MetricPrefix, "/") {
+		o.MetricPrefix = o.MetricPrefix + "/"
+	}
 	se, err := newStatsExporter(o)
 	if err != nil {
@@ -346,13 +374,21 @@ func NewExporter(o Options) (*Exporter, error) {
 
 // ExportView exports to the Stackdriver Monitoring if view data
 // has one or more rows.
+// Deprecated: use ExportMetrics and StartMetricsExporter instead.
 func (e *Exporter) ExportView(vd *view.Data) {
 	e.statsExporter.ExportView(vd)
 }
 
-// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring.
+// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously,
+// without de-duping or adding proto metrics to the bundler.
 func (e *Exporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error {
-	return e.statsExporter.ExportMetricsProto(ctx, node, rsc, metrics)
+	_, err := e.statsExporter.PushMetricsProto(ctx, node, rsc, metrics)
+	return err
+}
+
+// PushMetricsProto is similar to ExportMetricsProto but also returns the number of dropped timeseries.
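+//
+// Illustrative use (sd is an *Exporter; the variable names are hypothetical):
+//
+//	dropped, err := sd.PushMetricsProto(ctx, node, rsc, metrics)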
+func (e *Exporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) { + return e.statsExporter.PushMetricsProto(ctx, node, rsc, metrics) } // ExportMetrics exports OpenCensus Metrics to Stackdriver Monitoring @@ -420,12 +456,10 @@ func (o Options) handleError(err error) { log.Printf("Failed to export to Stackdriver: %v", err) } -func (o Options) newContextWithTimeout() (context.Context, func()) { - ctx := o.Context +func newContextWithTimeout(ctx context.Context, timeout time.Duration) (context.Context, func()) { if ctx == nil { ctx = context.Background() } - timeout := o.Timeout if timeout <= 0 { timeout = defaultTimeout } diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/stats.go b/vendor/contrib.go.opencensus.io/exporter/stackdriver/stats.go index 47f54ce6e..e0a02ca9a 100644 --- a/vendor/contrib.go.opencensus.io/exporter/stackdriver/stats.go +++ b/vendor/contrib.go.opencensus.io/exporter/stackdriver/stats.go @@ -20,18 +20,19 @@ import ( "fmt" "os" "path" + "sort" "strconv" "strings" "sync" "time" - "go.opencensus.io" + opencensus "go.opencensus.io" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" "go.opencensus.io/trace" - "cloud.google.com/go/monitoring/apiv3" + monitoring "cloud.google.com/go/monitoring/apiv3" "github.com/golang/protobuf/ptypes/timestamp" "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricexport" @@ -40,6 +41,7 @@ import ( distributionpb "google.golang.org/genproto/googleapis/api/distribution" labelpb "google.golang.org/genproto/googleapis/api/label" "google.golang.org/genproto/googleapis/api/metric" + googlemetricpb "google.golang.org/genproto/googleapis/api/metric" metricpb "google.golang.org/genproto/googleapis/api/metric" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" @@ -59,18 +61,14 @@ var userAgent = fmt.Sprintf("opencensus-go %s; stackdriver-exporter %s", opencen type statsExporter struct { o Options - viewDataBundler *bundler.Bundler - protoMetricsBundler *bundler.Bundler - metricsBundler *bundler.Bundler - - createdViewsMu sync.Mutex - createdViews map[string]*metricpb.MetricDescriptor // Views already created remotely + viewDataBundler *bundler.Bundler + metricsBundler *bundler.Bundler protoMu sync.Mutex - protoMetricDescriptors map[string]*metricpb.MetricDescriptor // Saves the metric descriptors that were already created remotely + protoMetricDescriptors map[string]bool // Metric descriptors that were already created remotely metricMu sync.Mutex - metricDescriptors map[string]*metricpb.MetricDescriptor // Saves the metric descriptors that were already created remotely + metricDescriptors map[string]bool // Metric descriptors that were already created remotely c *monitoring.MetricClient defaultLabels map[string]labelValue @@ -92,8 +90,10 @@ func newStatsExporter(o Options) (*statsExporter, error) { } opts := append(o.MonitoringClientOptions, option.WithUserAgent(userAgent)) - ctx, cancel := o.newContextWithTimeout() - defer cancel() + ctx := o.Context + if ctx == nil { + ctx = context.Background() + } client, err := monitoring.NewMetricClient(ctx, opts...) 
if err != nil { return nil, err @@ -101,39 +101,39 @@ func newStatsExporter(o Options) (*statsExporter, error) { e := &statsExporter{ c: client, o: o, - createdViews: make(map[string]*metricpb.MetricDescriptor), - protoMetricDescriptors: make(map[string]*metricpb.MetricDescriptor), - metricDescriptors: make(map[string]*metricpb.MetricDescriptor), + protoMetricDescriptors: make(map[string]bool), + metricDescriptors: make(map[string]bool), } + var defaultLablesNotSanitized map[string]labelValue if o.DefaultMonitoringLabels != nil { - e.defaultLabels = o.DefaultMonitoringLabels.m + defaultLablesNotSanitized = o.DefaultMonitoringLabels.m } else { - e.defaultLabels = map[string]labelValue{ + defaultLablesNotSanitized = map[string]labelValue{ opencensusTaskKey: {val: getTaskValue(), desc: opencensusTaskDescription}, } } + e.defaultLabels = make(map[string]labelValue) + // Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched. + for key, label := range defaultLablesNotSanitized { + e.defaultLabels[sanitize(key)] = label + } + e.viewDataBundler = bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) { vds := bundle.([]*view.Data) e.handleUpload(vds...) }) - e.protoMetricsBundler = bundler.NewBundler((*metricProtoPayload)(nil), func(bundle interface{}) { - payloads := bundle.([]*metricProtoPayload) - e.handleMetricsProtoUpload(payloads) - }) e.metricsBundler = bundler.NewBundler((*metricdata.Metric)(nil), func(bundle interface{}) { metrics := bundle.([]*metricdata.Metric) e.handleMetricsUpload(metrics) }) if delayThreshold := e.o.BundleDelayThreshold; delayThreshold > 0 { e.viewDataBundler.DelayThreshold = delayThreshold - e.protoMetricsBundler.DelayThreshold = delayThreshold e.metricsBundler.DelayThreshold = delayThreshold } if countThreshold := e.o.BundleCountThreshold; countThreshold > 0 { e.viewDataBundler.BundleCountThreshold = countThreshold - e.protoMetricsBundler.BundleCountThreshold = countThreshold e.metricsBundler.BundleCountThreshold = countThreshold } return e, nil @@ -141,7 +141,7 @@ func newStatsExporter(o Options) (*statsExporter, error) { func (e *statsExporter) startMetricsReader() error { e.initReaderOnce.Do(func() { - e.ir, _ = metricexport.NewIntervalReader(&metricexport.Reader{}, e) + e.ir, _ = metricexport.NewIntervalReader(metricexport.NewReader(), e) }) e.ir.ReportingInterval = e.o.ReportingInterval return e.ir.Start() @@ -154,10 +154,6 @@ func (e *statsExporter) stopMetricsReader() { } func (e *statsExporter) getMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, *monitoredrespb.MonitoredResource) { - if get := e.o.GetMonitoredResource; get != nil { - newTags, mr := get(v, tags) - return newTags, convertMonitoredResourceToPB(mr) - } resource := e.o.Resource if resource == nil { resource = &monitoredrespb.MonitoredResource{ @@ -208,12 +204,11 @@ func (e *statsExporter) handleUpload(vds ...*view.Data) { // want to lose data that hasn't yet been exported. 
func (e *statsExporter) Flush() { e.viewDataBundler.Flush() - e.protoMetricsBundler.Flush() e.metricsBundler.Flush() } func (e *statsExporter) uploadStats(vds []*view.Data) error { - ctx, cancel := e.o.newContextWithTimeout() + ctx, cancel := newContextWithTimeout(e.o.Context, e.o.Timeout) defer cancel() ctx, span := trace.StartSpan( ctx, @@ -223,7 +218,7 @@ func (e *statsExporter) uploadStats(vds []*view.Data) error { defer span.End() for _, vd := range vds { - if err := e.createMeasure(ctx, vd.View); err != nil { + if err := e.createMetricDescriptorFromView(ctx, vd.View); err != nil { span.SetStatus(trace.Status{Code: 2, Message: err.Error()}) return err } @@ -332,34 +327,27 @@ func (e *statsExporter) viewToMetricDescriptor(ctx context.Context, v *view.View return res, nil } -func (e *statsExporter) viewToCreateMetricDescriptorRequest(ctx context.Context, v *view.View) (*monitoringpb.CreateMetricDescriptorRequest, error) { - inMD, err := e.viewToMetricDescriptor(ctx, v) - if err != nil { - return nil, err - } - - cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{ - Name: fmt.Sprintf("projects/%s", e.o.ProjectID), - MetricDescriptor: inMD, - } - return cmrdesc, nil -} - -// createMeasure creates a MetricDescriptor for the given view data in Stackdriver Monitoring. +// createMetricDescriptorFromView creates a MetricDescriptor for the given view data in Stackdriver Monitoring. // An error will be returned if there is already a metric descriptor created with the same name // but it has a different aggregation or keys. -func (e *statsExporter) createMeasure(ctx context.Context, v *view.View) error { - e.createdViewsMu.Lock() - defer e.createdViewsMu.Unlock() +func (e *statsExporter) createMetricDescriptorFromView(ctx context.Context, v *view.View) error { + // Skip create metric descriptor if configured + if e.o.SkipCMD { + return nil + } + + e.metricMu.Lock() + defer e.metricMu.Unlock() viewName := v.Name - if md, ok := e.createdViews[viewName]; ok { - // [TODO:rghetia] Temporary fix for https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/76#issuecomment-459459091 - if builtinMetric(md.Type) { - return nil - } - return e.equalMeasureAggTagKeys(md, v.Measure, v.Aggregation, v.TagKeys) + if _, created := e.metricDescriptors[viewName]; created { + return nil + } + + if builtinMetric(e.metricType(v)) { + e.metricDescriptors[viewName] = true + return nil } inMD, err := e.viewToMetricDescriptor(ctx, v) @@ -367,34 +355,92 @@ func (e *statsExporter) createMeasure(ctx context.Context, v *view.View) error { return err } - var dmd *metric.MetricDescriptor - if builtinMetric(inMD.Type) { - gmrdesc := &monitoringpb.GetMetricDescriptorRequest{ - Name: inMD.Name, - } - dmd, err = getMetricDescriptor(ctx, e.c, gmrdesc) - } else { - cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{ - Name: fmt.Sprintf("projects/%s", e.o.ProjectID), - MetricDescriptor: inMD, - } - dmd, err = createMetricDescriptor(ctx, e.c, cmrdesc) - } - if err != nil { + if err = e.createMetricDescriptor(ctx, inMD); err != nil { return err } // Now cache the metric descriptor - e.createdViews[viewName] = dmd - return err + e.metricDescriptors[viewName] = true + return nil } func (e *statsExporter) displayName(suffix string) string { - displayNamePrefix := defaultDisplayNamePrefix - if e.o.MetricPrefix != "" { - displayNamePrefix = e.o.MetricPrefix + return path.Join(defaultDisplayNamePrefix, suffix) +} + +func (e *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) 
(ctsreql []*monitoringpb.CreateTimeSeriesRequest) { + if len(ts) == 0 { + return nil } - return path.Join(displayNamePrefix, suffix) + + // Since there are scenarios in which Metrics with the same Type + // can be bunched in the same TimeSeries, we have to ensure that + // we create a unique CreateTimeSeriesRequest with entirely unique Metrics + // per TimeSeries, lest we'll encounter: + // + // err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written: + // Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered. + // Only one point can be written per TimeSeries per request.: timeSeries[2] + // + // This scenario happens when we are using the OpenCensus Agent in which multiple metrics + // are streamed by various client applications. + // See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73 + uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts)) + nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts)) + seenMetrics := make(map[string]struct{}) + + for _, tti := range ts { + key := metricSignature(tti.Metric) + if _, alreadySeen := seenMetrics[key]; !alreadySeen { + uniqueTimeSeries = append(uniqueTimeSeries, tti) + seenMetrics[key] = struct{}{} + } else { + nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti) + } + } + + // UniqueTimeSeries can be bunched up together + // While for each nonUniqueTimeSeries, we have + // to make a unique CreateTimeSeriesRequest. + ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{ + Name: fmt.Sprintf("projects/%s", e.o.ProjectID), + TimeSeries: uniqueTimeSeries, + }) + + // Now recursively also combine the non-unique TimeSeries + // that were singly added to nonUniqueTimeSeries. + // The reason is that we need optimal combinations + // for optimal combinations because: + // * "a/b/c" + // * "a/b/c" + // * "x/y/z" + // * "a/b/c" + // * "x/y/z" + // * "p/y/z" + // * "d/y/z" + // + // should produce: + // CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"] + // CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"] + // CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"] + nonUniqueRequests := e.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries) + ctsreql = append(ctsreql, nonUniqueRequests...) 
+ + return ctsreql +} + +// metricSignature creates a unique signature consisting of a +// metric's type and its lexicographically sorted label values +// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/120 +func metricSignature(metric *googlemetricpb.Metric) string { + labels := metric.GetLabels() + labelValues := make([]string, 0, len(labels)) + + for _, labelValue := range labels { + labelValues = append(labelValues, labelValue) + } + sort.Strings(labelValues) + return fmt.Sprintf("%s:%s", metric.GetType(), strings.Join(labelValues, ",")) } func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point { @@ -546,61 +592,21 @@ func newLabelDescriptors(defaults map[string]labelValue, keys []tag.Key) []*labe return labelDescriptors } -func (e *statsExporter) equalMeasureAggTagKeys(md *metricpb.MetricDescriptor, m stats.Measure, agg *view.Aggregation, keys []tag.Key) error { - var aggTypeMatch bool - switch md.ValueType { - case metricpb.MetricDescriptor_INT64: - if _, ok := m.(*stats.Int64Measure); !(ok || agg.Type == view.AggTypeCount) { - return fmt.Errorf("stackdriver metric descriptor was not created as int64") - } - aggTypeMatch = agg.Type == view.AggTypeCount || agg.Type == view.AggTypeSum || agg.Type == view.AggTypeLastValue - case metricpb.MetricDescriptor_DOUBLE: - if _, ok := m.(*stats.Float64Measure); !ok { - return fmt.Errorf("stackdriver metric descriptor was not created as double") - } - aggTypeMatch = agg.Type == view.AggTypeSum || agg.Type == view.AggTypeLastValue - case metricpb.MetricDescriptor_DISTRIBUTION: - aggTypeMatch = agg.Type == view.AggTypeDistribution +func (e *statsExporter) createMetricDescriptor(ctx context.Context, md *metric.MetricDescriptor) error { + ctx, cancel := newContextWithTimeout(ctx, e.o.Timeout) + defer cancel() + cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{ + Name: fmt.Sprintf("projects/%s", e.o.ProjectID), + MetricDescriptor: md, } - - if !aggTypeMatch { - return fmt.Errorf("stackdriver metric descriptor was not created with aggregation type %T", agg.Type) - } - - labels := make(map[string]struct{}, len(keys)+len(e.defaultLabels)) - for _, k := range keys { - labels[sanitize(k.Name())] = struct{}{} - } - for k := range e.defaultLabels { - labels[sanitize(k)] = struct{}{} - } - - for _, k := range md.Labels { - if _, ok := labels[k.Key]; !ok { - return fmt.Errorf("stackdriver metric descriptor %q was not created with label %q", md.Type, k) - } - delete(labels, k.Key) - } - - if len(labels) > 0 { - extra := make([]string, 0, len(labels)) - for k := range labels { - extra = append(extra, k) - } - return fmt.Errorf("stackdriver metric descriptor %q contains unexpected labels: %s", md.Type, strings.Join(extra, ", ")) - } - - return nil + _, err := createMetricDescriptor(ctx, e.c, cmrdesc) + return err } var createMetricDescriptor = func(ctx context.Context, c *monitoring.MetricClient, mdr *monitoringpb.CreateMetricDescriptorRequest) (*metric.MetricDescriptor, error) { return c.CreateMetricDescriptor(ctx, mdr) } -var getMetricDescriptor = func(ctx context.Context, c *monitoring.MetricClient, mdr *monitoringpb.GetMetricDescriptorRequest) (*metric.MetricDescriptor, error) { - return c.GetMetricDescriptor(ctx, mdr) -} - var createTimeSeries = func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error { return c.CreateTimeSeries(ctx, ts) } diff --git a/vendor/contrib.go.opencensus.io/exporter/stackdriver/trace.go 
b/vendor/contrib.go.opencensus.io/exporter/stackdriver/trace.go index 71e7f36d2..ee6535eef 100644 --- a/vendor/contrib.go.opencensus.io/exporter/stackdriver/trace.go +++ b/vendor/contrib.go.opencensus.io/exporter/stackdriver/trace.go @@ -121,7 +121,7 @@ func (e *traceExporter) uploadSpans(spans []*tracepb.Span) { Spans: spans, } // Create a never-sampled span to prevent traces associated with exporter. - ctx, cancel := e.o.newContextWithTimeout() + ctx, cancel := newContextWithTimeout(e.o.Context, e.o.Timeout) defer cancel() ctx, span := trace.StartSpan( ctx,