Upgrade OpenCensus Stackdriver exporter (#865)

* upgrade

* change unit tests

* upgrade os sd again

* change unit tests

* add comment

* add comment

* address comments
Yanwei Guo 2019-12-03 14:12:37 -08:00 committed by Knative Prow Robot
parent 3444316bde
commit 94a34e416c
18 changed files with 864 additions and 818 deletions

Gopkg.lock (generated)

@ -29,15 +29,14 @@
version = "v0.1.0"
[[projects]]
digest = "1:83bd9ccdcc61bb43f45e4336cf9622849b5a867ef137f8b53303968202970225"
digest = "1:c3fd5ddaad733530174bba5dd787d98a45d181851a95a0b7362be7bce7144f56"
name = "contrib.go.opencensus.io/exporter/stackdriver"
packages = [
".",
"monitoredresource",
]
pruneopts = "NUT"
revision = "68e3d742b03c099c35428443886e65d9587c8d76"
version = "v0.12.2"
revision = "59d068f8d8ff5b653916aa30cdc4e13c7f15d56e"
[[projects]]
digest = "1:7b5f423f5b0dd3dfa32a19a6183b0ab9129bff371ebf3f9efae32f87e4986d8f"
@ -1354,6 +1353,7 @@
"github.com/rogpeppe/go-internal/semver",
"github.com/spf13/pflag",
"github.com/tsenart/vegeta/lib",
"go.opencensus.io/metric/metricdata",
"go.opencensus.io/plugin/ochttp",
"go.opencensus.io/plugin/ochttp/propagation/b3",
"go.opencensus.io/stats",


@ -75,9 +75,8 @@ required = [
[[constraint]]
name = "contrib.go.opencensus.io/exporter/stackdriver"
# The build fails against 0.12.6 and newer because
# stackdriver.Options.GetMonitoredResource was removed.
version = "<=v0.12.5"
# Pinned to a revision that contains the fix for https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/237
revision = "59d068f8d8ff5b653916aa30cdc4e13c7f15d56e"
[[constraint]]
name = "github.com/google/mako"


@ -33,6 +33,11 @@ type flushable interface {
Flush()
}
type stoppable interface {
// StopMetricsExporter stops the exporter
StopMetricsExporter()
}
// ExporterOptions contains options for configuring the exporter.
type ExporterOptions struct {
// Domain is the metrics domain. e.g. "knative.dev". Must be present.
@ -126,11 +131,12 @@ func newMetricsExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.
// If there is a Prometheus Exporter server running, stop it.
resetCurPromSrv()
if ce != nil {
// UnregisterExporter is idempotent and it can be called multiple times for the same exporter
// without side effects.
view.UnregisterExporter(ce)
// TODO(https://github.com/knative/pkg/issues/866): Move the Stackdriver and Prometheus
// shutdown operations behind a common interface.
if se, ok := ce.(stoppable); ok {
se.StopMetricsExporter()
}
var err error
var e view.Exporter
switch config.backendDestination {
@ -156,7 +162,6 @@ func getCurMetricsExporter() view.Exporter {
func setCurMetricsExporter(e view.Exporter) {
metricsMux.Lock()
defer metricsMux.Unlock()
view.RegisterExporter(e)
curMetricsExporter = e
}
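
For readers following this hunk, a minimal in-package sketch of the new shutdown path (not part of the commit; stopExporter is a hypothetical helper, while stoppable, StopMetricsExporter and view.UnregisterExporter are the names used above):

package metrics

import "go.opencensus.io/stats/view"

// stopExporter illustrates what newMetricsExporter now does before building a
// replacement exporter: unregister the old one, then stop it if the backend
// supports stopping (the upgraded OpenCensus Stackdriver exporter does).
func stopExporter(e view.Exporter) {
    if e == nil {
        return
    }
    // UnregisterExporter is idempotent, so calling it on an exporter that was
    // never registered is harmless.
    view.UnregisterExporter(e)
    if se, ok := e.(stoppable); ok {
        se.StopMetricsExporter()
    }
}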


@ -18,11 +18,8 @@ import (
"testing"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/metrics/metricskey"
)
// TODO UTs should move to eventing and serving, as appropriate.
@ -44,35 +41,6 @@ const (
testSourceResourceGroup = "test-source-rg"
)
var (
testView = &view.View{
Description: "Test View",
Measure: stats.Int64("test", "Test Measure", stats.UnitNone),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{},
}
nsKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelNamespaceName), Value: testNS}
serviceKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelServiceName), Value: testService}
routeKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelRouteName), Value: testRoute}
revisionKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelRevisionName), Value: testRevision}
brokerKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelBrokerName), Value: testBroker}
triggerKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelTriggerName), Value: testTrigger}
filterTypeKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelFilterType), Value: testFilterType}
sourceKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelName), Value: testSource}
sourceResourceGroupKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelResourceGroup), Value: testSourceResourceGroup}
eventTypeKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelEventType), Value: testEventType}
eventSourceKey = tag.Tag{Key: mustNewTagKey(metricskey.LabelEventSource), Value: testEventSource}
revisionTestTags = []tag.Tag{nsKey, serviceKey, routeKey, revisionKey}
brokerTestTags = []tag.Tag{nsKey, brokerKey, eventTypeKey}
triggerTestTags = []tag.Tag{nsKey, triggerKey, brokerKey, filterTypeKey}
sourceTestTags = []tag.Tag{nsKey, sourceKey, sourceResourceGroupKey, eventTypeKey, eventSourceKey}
)
func mustNewTagKey(s string) tag.Key {
tagKey, err := tag.NewKey(s)
if err != nil {
@ -81,15 +49,6 @@ func mustNewTagKey(s string) tag.Key {
return tagKey
}
func getResourceLabelValue(key string, tags []tag.Tag) string {
for _, t := range tags {
if t.Key.Name() == key {
return t.Value
}
}
return ""
}
func TestMain(m *testing.M) {
resetCurPromSrv()
// Set gcpMetadataFunc and newStackdriverExporterFunc for testing


@ -20,8 +20,7 @@ package metrics
import (
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/metric/metricdata"
"knative.dev/pkg/metrics/metricskey"
)
@ -89,75 +88,72 @@ func (ki *KnativeSource) MonitoredResource() (resType string, labels map[string]
}
func GetKnativeBrokerMonitoredResource(
v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) {
tagsMap := getTagsMap(tags)
des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) {
kb := &KnativeBroker{
// The first three resource labels are from metadata.
Project: gm.project,
Location: gm.location,
ClusterName: gm.cluster,
// The remaining resource labels come from the metric labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap),
BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap),
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags),
BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tags),
}
var newTags []tag.Tag
for _, t := range tags {
metricLabels := map[string]string{}
for k, v := range tags {
// Keep the metrics labels that are not resource labels
if !metricskey.KnativeBrokerLabels.Has(t.Key.Name()) {
newTags = append(newTags, t)
if !metricskey.KnativeBrokerLabels.Has(k) {
metricLabels[k] = v
}
}
return newTags, kb
return metricLabels, kb
}
func GetKnativeTriggerMonitoredResource(
v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) {
tagsMap := getTagsMap(tags)
des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) {
kt := &KnativeTrigger{
// The first three resource labels are from metadata.
Project: gm.project,
Location: gm.location,
ClusterName: gm.cluster,
// The remaining resource labels come from the metric labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap),
BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tagsMap),
TriggerName: valueOrUnknown(metricskey.LabelTriggerName, tagsMap),
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags),
BrokerName: valueOrUnknown(metricskey.LabelBrokerName, tags),
TriggerName: valueOrUnknown(metricskey.LabelTriggerName, tags),
}
var newTags []tag.Tag
for _, t := range tags {
metricLabels := map[string]string{}
for k, v := range tags {
// Keep the metrics labels that are not resource labels
if !metricskey.KnativeTriggerLabels.Has(t.Key.Name()) {
newTags = append(newTags, t)
if !metricskey.KnativeTriggerLabels.Has(k) {
metricLabels[k] = v
}
}
return newTags, kt
return metricLabels, kt
}
func GetKnativeSourceMonitoredResource(
v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) {
tagsMap := getTagsMap(tags)
ki := &KnativeSource{
des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) {
ks := &KnativeSource{
// The first three resource labels are from metadata.
Project: gm.project,
Location: gm.location,
ClusterName: gm.cluster,
// The remaining resource labels come from the metric labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap),
SourceName: valueOrUnknown(metricskey.LabelName, tagsMap),
SourceResourceGroup: valueOrUnknown(metricskey.LabelResourceGroup, tagsMap),
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags),
SourceName: valueOrUnknown(metricskey.LabelName, tags),
SourceResourceGroup: valueOrUnknown(metricskey.LabelResourceGroup, tags),
}
var newTags []tag.Tag
for _, t := range tags {
metricLabels := map[string]string{}
for k, v := range tags {
// Keep the metrics labels that are not resource labels
if !metricskey.KnativeSourceLabels.Has(t.Key.Name()) {
newTags = append(newTags, t)
if !metricskey.KnativeSourceLabels.Has(k) {
metricLabels[k] = v
}
}
return newTags, ki
return metricLabels, ks
}
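
A hedged usage sketch of the new descriptor/label-map signature (not part of the commit; the descriptor name, label values and the exampleBrokerMapping helper are made up, everything else comes from the hunks above):

package metrics

import (
    "fmt"

    "go.opencensus.io/metric/metricdata"
    "knative.dev/pkg/metrics/metricskey"
)

func exampleBrokerMapping() {
    des := &metricdata.Descriptor{Name: "event_count"}
    tags := map[string]string{
        metricskey.LabelNamespaceName: "default",
        metricskey.LabelBrokerName:    "default-broker",
        metricskey.LabelEventType:     "example.event.type", // not a knative_broker resource label
    }
    gm := &gcpMetadata{project: "my-project", location: "us-central1", cluster: "my-cluster"}

    // Resource labels (project/location/cluster/namespace/broker) move onto the
    // monitored resource; everything else stays on the metric.
    metricLabels, mr := GetKnativeBrokerMonitoredResource(des, tags, gm)
    resType, resLabels := mr.MonitoredResource()
    fmt.Println(resType)      // "knative_broker"
    fmt.Println(resLabels)    // resource labels; keys missing from tags are filled by valueOrUnknown
    fmt.Println(metricLabels) // only the non-resource labels, e.g. event_type
}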


@ -18,8 +18,7 @@ package metrics
import (
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/metric/metricdata"
"knative.dev/pkg/metrics/metricskey"
)
@ -49,27 +48,26 @@ func (kr *KnativeRevision) MonitoredResource() (resType string, labels map[strin
}
func GetKnativeRevisionMonitoredResource(
v *view.View, tags []tag.Tag, gm *gcpMetadata) ([]tag.Tag, monitoredresource.Interface) {
tagsMap := getTagsMap(tags)
des *metricdata.Descriptor, tags map[string]string, gm *gcpMetadata) (map[string]string, monitoredresource.Interface) {
kr := &KnativeRevision{
// The first three resource labels are from metadata.
Project: gm.project,
Location: gm.location,
ClusterName: gm.cluster,
// The remaining resource labels come from the metric labels.
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tagsMap),
ServiceName: valueOrUnknown(metricskey.LabelServiceName, tagsMap),
ConfigurationName: valueOrUnknown(metricskey.LabelConfigurationName, tagsMap),
RevisionName: valueOrUnknown(metricskey.LabelRevisionName, tagsMap),
NamespaceName: valueOrUnknown(metricskey.LabelNamespaceName, tags),
ServiceName: valueOrUnknown(metricskey.LabelServiceName, tags),
ConfigurationName: valueOrUnknown(metricskey.LabelConfigurationName, tags),
RevisionName: valueOrUnknown(metricskey.LabelRevisionName, tags),
}
var newTags []tag.Tag
for _, t := range tags {
metricLabels := map[string]string{}
for k, v := range tags {
// Keep the metrics labels that are not resource labels
if !metricskey.KnativeRevisionLabels.Has(t.Key.Name()) {
newTags = append(newTags, t)
if !metricskey.KnativeRevisionLabels.Has(k) {
metricLabels[k] = v
}
}
return newTags, kr
return metricLabels, kr
}


@ -23,8 +23,8 @@ import (
"contrib.go.opencensus.io/exporter/stackdriver"
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"google.golang.org/api/option"
"knative.dev/pkg/metrics/metricskey"
@ -101,14 +101,20 @@ func init() {
}
func newOpencensusSDExporter(o stackdriver.Options) (view.Exporter, error) {
return stackdriver.NewExporter(o)
e, err := stackdriver.NewExporter(o)
if err == nil {
// Start the exporter.
// TODO(https://github.com/knative/pkg/issues/866): Move this to an interface.
e.StartMetricsExporter()
}
return e, err
}
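
The upgraded exporter separates construction from the export loop: NewExporter builds it, StartMetricsExporter begins periodic export, and StopMetricsExporter (via the stoppable interface above) ends it. A lifecycle sketch, not the committed code:

package metrics

import "contrib.go.opencensus.io/exporter/stackdriver"

// exampleSDLifecycle is illustrative only; error handling around
// StartMetricsExporter is omitted, matching the call in the hunk above.
func exampleSDLifecycle(o stackdriver.Options) (*stackdriver.Exporter, func(), error) {
    e, err := stackdriver.NewExporter(o)
    if err != nil {
        return nil, nil, err
    }
    e.StartMetricsExporter() // begin the periodic metrics export loop
    stop := func() {
        e.StopMetricsExporter() // called when the exporter is swapped out
    }
    return e, stop, nil
}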
// TODO should be properly refactored to be able to inject the getMonitoredResourceFunc function.
// TODO should be properly refactored to be able to inject the getResourceByDescriptorFunc function.
// See https://github.com/knative/pkg/issues/608
func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.Exporter, error) {
gm := getMergedGCPMetadata(config)
mtf := getMetricTypeFunc(config.stackdriverMetricTypePrefix, config.stackdriverCustomMetricTypePrefix)
mpf := getMetricPrefixFunc(config.stackdriverMetricTypePrefix, config.stackdriverCustomMetricTypePrefix)
co, err := getStackdriverExporterClientOptions(&config.stackdriverClientConfig)
if err != nil {
logger.Warnw("Issue configuring Stackdriver exporter client options, no additional client options will be used: ", zap.Error(err))
@ -119,9 +125,9 @@ func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (v
Location: gm.location,
MonitoringClientOptions: co,
TraceClientOptions: co,
GetMetricDisplayName: mtf, // Use metric type for display name for custom metrics. No impact on built-in metrics.
GetMetricType: mtf,
GetMonitoredResource: getMonitoredResourceFunc(config.stackdriverMetricTypePrefix, gm),
GetMetricPrefix: mpf,
ResourceByDescriptor: getResourceByDescriptorFunc(config.stackdriverMetricTypePrefix, gm),
ReportingInterval: config.reportingPeriod,
DefaultMonitoringLabels: &stackdriver.Labels{},
})
if err != nil {
@ -172,39 +178,39 @@ func getMergedGCPMetadata(config *metricsConfig) *gcpMetadata {
return gm
}
func getMonitoredResourceFunc(metricTypePrefix string, gm *gcpMetadata) func(v *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) {
return func(view *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) {
metricType := path.Join(metricTypePrefix, view.Measure.Name())
func getResourceByDescriptorFunc(metricTypePrefix string, gm *gcpMetadata) func(*metricdata.Descriptor, map[string]string) (map[string]string, monitoredresource.Interface) {
return func(des *metricdata.Descriptor, tags map[string]string) (map[string]string, monitoredresource.Interface) {
metricType := path.Join(metricTypePrefix, des.Name)
if metricskey.KnativeRevisionMetrics.Has(metricType) {
return GetKnativeRevisionMonitoredResource(view, tags, gm)
return GetKnativeRevisionMonitoredResource(des, tags, gm)
} else if metricskey.KnativeBrokerMetrics.Has(metricType) {
return GetKnativeBrokerMonitoredResource(view, tags, gm)
return GetKnativeBrokerMonitoredResource(des, tags, gm)
} else if metricskey.KnativeTriggerMetrics.Has(metricType) {
return GetKnativeTriggerMonitoredResource(view, tags, gm)
return GetKnativeTriggerMonitoredResource(des, tags, gm)
} else if metricskey.KnativeSourceMetrics.Has(metricType) {
return GetKnativeSourceMonitoredResource(view, tags, gm)
return GetKnativeSourceMonitoredResource(des, tags, gm)
}
// Unsupported metric by knative_revision, knative_broker, knative_trigger, and knative_source, use "global" resource type.
return getGlobalMonitoredResource(view, tags)
return getGlobalMonitoredResource(des, tags)
}
}
func getGlobalMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, monitoredresource.Interface) {
func getGlobalMonitoredResource(des *metricdata.Descriptor, tags map[string]string) (map[string]string, monitoredresource.Interface) {
return tags, &Global{}
}
func getMetricTypeFunc(metricTypePrefix, customMetricTypePrefix string) func(view *view.View) string {
return func(view *view.View) string {
metricType := path.Join(metricTypePrefix, view.Measure.Name())
func getMetricPrefixFunc(metricTypePrefix, customMetricTypePrefix string) func(name string) string {
return func(name string) string {
metricType := path.Join(metricTypePrefix, name)
inServing := metricskey.KnativeRevisionMetrics.Has(metricType)
inEventing := metricskey.KnativeBrokerMetrics.Has(metricType) ||
metricskey.KnativeTriggerMetrics.Has(metricType) ||
metricskey.KnativeSourceMetrics.Has(metricType)
if inServing || inEventing {
return metricType
return metricTypePrefix
}
// Unsupported metric by knative_revision, use custom domain.
return path.Join(customMetricTypePrefix, view.Measure.Name())
return customMetricTypePrefix
}
}
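
A note on the renamed hook: getMetricTypeFunc used to return the full metric type, whereas getMetricPrefixFunc returns only the prefix, and the exporter is assumed to join prefix and metric name itself. A small equivalence sketch (not part of the commit; the prefixes and metric name are placeholders):

package metrics

import "path"

func exampleMetricTypeComposition() string {
    knativePrefix := "knative.dev/some_domain/some_component" // placeholder
    customPrefix := "custom.example.dev/some_component"       // placeholder
    name := "some_metric"                                     // placeholder

    // New shape: the hook picks a prefix per metric name...
    mpf := getMetricPrefixFunc(knativePrefix, customPrefix)
    // ...and the final type is the prefix joined with the name, which is what
    // the removed getMetricTypeFunc computed in one step.
    return path.Join(mpf(name), name)
}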


@ -22,9 +22,8 @@ import (
"time"
"contrib.go.opencensus.io/exporter/stackdriver"
"go.opencensus.io/stats"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/metrics/metricskey"
)
@ -33,6 +32,31 @@ import (
// See https://github.com/knative/pkg/issues/608
var (
revisionTestTags = map[string]string{
metricskey.LabelNamespaceName: testNS,
metricskey.LabelServiceName: testService,
metricskey.LabelRouteName: testRoute, // Not a label key for knative_revision resource
metricskey.LabelRevisionName: testRevision,
}
brokerTestTags = map[string]string{
metricskey.LabelNamespaceName: testNS,
metricskey.LabelBrokerName: testBroker,
metricskey.LabelEventType: testEventType, // Not a label key for knative_broker resource
}
triggerTestTags = map[string]string{
metricskey.LabelNamespaceName: testNS,
metricskey.LabelTriggerName: testTrigger,
metricskey.LabelBrokerName: testBroker,
metricskey.LabelFilterType: testFilterType, // Not a label key for knative_trigger resource
}
sourceTestTags = map[string]string{
metricskey.LabelNamespaceName: testNS,
metricskey.LabelName: testSource,
metricskey.LabelResourceGroup: testSourceResourceGroup,
metricskey.LabelEventType: testEventType, // Not a label key for knative_source resource
metricskey.LabelEventSource: testEventSource, // Not a label key for knative_source resource
}
testGcpMetadata = gcpMetadata{
project: "test-project",
location: "test-location",
@ -148,200 +172,175 @@ func newFakeExporter(o stackdriver.Options) (view.Exporter, error) {
return &fakeExporter{}, nil
}
func TestGetMonitoredResourceFunc_UseKnativeRevision(t *testing.T) {
func TestGetResourceByDescriptorFunc_UseKnativeRevision(t *testing.T) {
for _, testCase := range supportedServingMetricsTestCases {
testView = &view.View{
testDescriptor := &metricdata.Descriptor{
Name: testCase.metricName,
Description: "Test View",
Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitNone),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{},
Type: metricdata.TypeGaugeInt64,
Unit: metricdata.UnitDimensionless,
}
mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
rbd := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
newTags, monitoredResource := mrf(testView, revisionTestTags)
gotResType, labels := monitoredResource.MonitoredResource()
metricLabels, monitoredResource := rbd(testDescriptor, revisionTestTags)
gotResType, resourceLabels := monitoredResource.MonitoredResource()
wantedResType := "knative_revision"
if gotResType != wantedResType {
t.Fatalf("MonitoredResource=%v, want %v", gotResType, wantedResType)
}
got := getResourceLabelValue(metricskey.LabelRouteName, newTags)
if got != testRoute {
t.Errorf("expected new tag: %v, got: %v", routeKey, newTags)
// revisionTestTags includes route_name, which is not a key for knative_revision resource.
if got := metricLabels[metricskey.LabelRouteName]; got != testRoute {
t.Errorf("expected metrics label: %v, got: %v", testRoute, got)
}
got, ok := labels[metricskey.LabelNamespaceName]
if !ok || got != testNS {
t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got)
if got := resourceLabels[metricskey.LabelNamespaceName]; got != testNS {
t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got)
}
got, ok = labels[metricskey.LabelConfigurationName]
if !ok || got != metricskey.ValueUnknown {
t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelConfigurationName, metricskey.ValueUnknown, got)
// configuration_name is a key required by knative_revision but missing from revisionTestTags
if got := resourceLabels[metricskey.LabelConfigurationName]; got != metricskey.ValueUnknown {
t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelConfigurationName, metricskey.ValueUnknown, got)
}
}
}
func TestGetMonitoredResourceFunc_UseKnativeBroker(t *testing.T) {
func TestGetResourceByDescriptorFunc_UseKnativeBroker(t *testing.T) {
for _, testCase := range supportedEventingBrokerMetricsTestCases {
testView = &view.View{
testDescriptor := &metricdata.Descriptor{
Name: testCase.metricName,
Description: "Test View",
Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitDimensionless),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{},
Type: metricdata.TypeGaugeInt64,
Unit: metricdata.UnitDimensionless,
}
mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
rbd := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
newTags, monitoredResource := mrf(testView, brokerTestTags)
gotResType, labels := monitoredResource.MonitoredResource()
metricLabels, monitoredResource := rbd(testDescriptor, brokerTestTags)
gotResType, resourceLabels := monitoredResource.MonitoredResource()
wantedResType := "knative_broker"
if gotResType != wantedResType {
t.Fatalf("MonitoredResource=%v, want %v", gotResType, wantedResType)
}
got := getResourceLabelValue(metricskey.LabelEventType, newTags)
if got != testEventType {
t.Errorf("expected new tag: %v, got: %v", eventTypeKey, newTags)
// brokerTestTags includes event_type, which is not a key for knative_broker resource.
if got := metricLabels[metricskey.LabelEventType]; got != testEventType {
t.Errorf("expected metrics label: %v, got: %v", testEventType, got)
}
got, ok := labels[metricskey.LabelNamespaceName]
if !ok || got != testNS {
t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got)
if got := resourceLabels[metricskey.LabelNamespaceName]; got != testNS {
t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got)
}
got, ok = labels[metricskey.LabelBrokerName]
if !ok || got != testBroker {
t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelBrokerName, testBroker, got)
if got := resourceLabels[metricskey.LabelBrokerName]; got != testBroker {
t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelBrokerName, testBroker, got)
}
}
}
func TestGetMonitoredResourceFunc_UseKnativeTrigger(t *testing.T) {
func TestGetResourceByDescriptorFunc_UseKnativeTrigger(t *testing.T) {
for _, testCase := range supportedEventingTriggerMetricsTestCases {
testView = &view.View{
testDescriptor := &metricdata.Descriptor{
Name: testCase.metricName,
Description: "Test View",
Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitDimensionless),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{},
Type: metricdata.TypeGaugeInt64,
Unit: metricdata.UnitDimensionless,
}
mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
rbd := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
newTags, monitoredResource := mrf(testView, triggerTestTags)
gotResType, labels := monitoredResource.MonitoredResource()
metricLabels, monitoredResource := rbd(testDescriptor, triggerTestTags)
gotResType, resourceLabels := monitoredResource.MonitoredResource()
wantedResType := "knative_trigger"
if gotResType != wantedResType {
t.Fatalf("MonitoredResource=%v, want %v", gotResType, wantedResType)
}
got := getResourceLabelValue(metricskey.LabelFilterType, newTags)
if got != testFilterType {
t.Errorf("expected new tag: %v, got: %v", filterTypeKey, newTags)
// triggerTestTags includes filter_type, which is not a key for knative_trigger resource.
if got := metricLabels[metricskey.LabelFilterType]; got != testFilterType {
t.Errorf("expected metrics label: %v, got: %v", testFilterType, got)
}
got, ok := labels[metricskey.LabelNamespaceName]
if !ok || got != testNS {
t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got)
if got := resourceLabels[metricskey.LabelNamespaceName]; got != testNS {
t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got)
}
got, ok = labels[metricskey.LabelBrokerName]
if !ok || got != testBroker {
t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelBrokerName, testBroker, got)
if got := resourceLabels[metricskey.LabelBrokerName]; got != testBroker {
t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelBrokerName, testBroker, got)
}
}
}
func TestGetMonitoredResourceFunc_UseKnativeSource(t *testing.T) {
func TestGetResourceByDescriptorFunc_UseKnativeSource(t *testing.T) {
for _, testCase := range supportedEventingSourceMetricsTestCases {
testView = &view.View{
testDescriptor := &metricdata.Descriptor{
Name: testCase.metricName,
Description: "Test View",
Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitDimensionless),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{},
Type: metricdata.TypeGaugeInt64,
Unit: metricdata.UnitDimensionless,
}
mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
rbd := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
newTags, monitoredResource := mrf(testView, sourceTestTags)
gotResType, labels := monitoredResource.MonitoredResource()
metricLabels, monitoredResource := rbd(testDescriptor, sourceTestTags)
gotResType, resourceLabels := monitoredResource.MonitoredResource()
wantedResType := "knative_source"
if gotResType != wantedResType {
t.Fatalf("MonitoredResource=%v, want %v", gotResType, wantedResType)
}
got := getResourceLabelValue(metricskey.LabelEventType, newTags)
if got != testEventType {
t.Errorf("expected new tag: %v, got: %v", eventTypeKey, newTags)
// sourceTestTags includes event_type, which is not a key for knative_source resource.
if got := metricLabels[metricskey.LabelEventType]; got != testEventType {
t.Errorf("expected metrics label: %v, got: %v", testEventType, got)
}
got = getResourceLabelValue(metricskey.LabelEventSource, newTags)
if got != testEventSource {
t.Errorf("expected new tag: %v, got: %v", eventSourceKey, newTags)
// sourceTestTags includes event_source, which is not a key for knative_source resource.
if got := metricLabels[metricskey.LabelEventSource]; got != testEventSource {
t.Errorf("expected metrics label: %v, got: %v", testEventSource, got)
}
got, ok := labels[metricskey.LabelNamespaceName]
if !ok || got != testNS {
t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got)
if got := resourceLabels[metricskey.LabelNamespaceName]; got != testNS {
t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got)
}
got, ok = labels[metricskey.LabelName]
if !ok || got != testSource {
t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelName, testSource, got)
if got := resourceLabels[metricskey.LabelName]; got != testSource {
t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelName, testSource, got)
}
got, ok = labels[metricskey.LabelResourceGroup]
if !ok || got != testSourceResourceGroup {
t.Errorf("expected label %v with value %v, got: %v", metricskey.LabelResourceGroup, testSourceResourceGroup, got)
if got := resourceLabels[metricskey.LabelResourceGroup]; got != testSourceResourceGroup {
t.Errorf("expected resource label %v with value %v, got: %v", metricskey.LabelResourceGroup, testSourceResourceGroup, got)
}
}
}
func TestGetMonitoredResourceFunc_UseGlobal(t *testing.T) {
func TestGetResourceByDescriptorFunc_UseGlobal(t *testing.T) {
for _, testCase := range unsupportedMetricsTestCases {
testView = &view.View{
testDescriptor := &metricdata.Descriptor{
Name: testCase.metricName,
Description: "Test View",
Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitNone),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{},
Type: metricdata.TypeGaugeInt64,
Unit: metricdata.UnitDimensionless,
}
mrf := getMonitoredResourceFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
mrf := getResourceByDescriptorFunc(path.Join(testCase.domain, testCase.component), &testGcpMetadata)
newTags, monitoredResource := mrf(testView, revisionTestTags)
gotResType, labels := monitoredResource.MonitoredResource()
metricLabels, monitoredResource := mrf(testDescriptor, revisionTestTags)
gotResType, resourceLabels := monitoredResource.MonitoredResource()
wantedResType := "global"
if gotResType != wantedResType {
t.Fatalf("MonitoredResource=%v, want: %v", gotResType, wantedResType)
}
got := getResourceLabelValue(metricskey.LabelNamespaceName, newTags)
if got != testNS {
t.Errorf("expected new tag %v with value %v, got: %v", routeKey, testNS, newTags)
if got := metricLabels[metricskey.LabelNamespaceName]; got != testNS {
t.Errorf("expected new tag %v with value %v, got: %v", metricskey.LabelNamespaceName, testNS, got)
}
if len(labels) != 0 {
t.Errorf("expected no label, got: %v", labels)
if len(resourceLabels) != 0 {
t.Errorf("expected no label, got: %v", resourceLabels)
}
}
}
func TestGetMetricTypeFunc_UseKnativeDomain(t *testing.T) {
func TestGetMetricPrefixFunc_UseKnativeDomain(t *testing.T) {
for _, testCase := range supportedServingMetricsTestCases {
testView = &view.View{
Description: "Test View",
Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitNone),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{},
}
mtf := getMetricTypeFunc(
path.Join(testCase.domain, testCase.component),
path.Join(defaultCustomMetricSubDomain, testCase.component))
knativePrefix := path.Join(testCase.domain, testCase.component)
customPrefix := path.Join(defaultCustomMetricSubDomain, testCase.component)
mpf := getMetricPrefixFunc(knativePrefix, customPrefix)
gotMetricType := mtf(testView)
wantedMetricType := path.Join(testCase.domain, testCase.component, testView.Measure.Name())
if gotMetricType != wantedMetricType {
t.Fatalf("getMetricType=%v, want %v", gotMetricType, wantedMetricType)
if got, want := mpf(testCase.metricName), knativePrefix; got != want {
t.Fatalf("getMetricPrefixFunc=%v, want %v", got, want)
}
}
}
func TestGetgetMetricTypeFunc_UseCustomDomain(t *testing.T) {
func TestGetMetricPrefixFunc_UseCustomDomain(t *testing.T) {
for _, testCase := range unsupportedMetricsTestCases {
testView = &view.View{
Description: "Test View",
Measure: stats.Int64(testCase.metricName, "Test Measure", stats.UnitNone),
Aggregation: view.LastValue(),
TagKeys: []tag.Key{},
}
mtf := getMetricTypeFunc(
path.Join(testCase.domain, testCase.component),
path.Join(defaultCustomMetricSubDomain, testCase.component))
knativePrefix := path.Join(testCase.domain, testCase.component)
customPrefix := path.Join(defaultCustomMetricSubDomain, testCase.component)
mpf := getMetricPrefixFunc(knativePrefix, customPrefix)
gotMetricType := mtf(testView)
wantedMetricType := path.Join(defaultCustomMetricSubDomain, testCase.component, testView.Measure.Name())
if gotMetricType != wantedMetricType {
t.Fatalf("getMetricType=%v, want %v", gotMetricType, wantedMetricType)
if got, want := mpf(testCase.metricName), customPrefix; got != want {
t.Fatalf("getMetricPrefixFunc=%v, want %v", got, want)
}
}
}


@ -21,8 +21,9 @@ directly to Stackdriver Metrics.
import (
"context"
"errors"
"fmt"
"strings"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/timestamp"
@ -34,15 +35,11 @@ import (
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/resource"
)
var (
errLableExtraction = errors.New("error extracting labels")
errUnspecifiedMetricKind = errors.New("metric kind is unpsecified")
)
const (
exemplarAttachmentTypeString = "type.googleapis.com/google.protobuf.StringValue"
exemplarAttachmentTypeSpanCtx = "type.googleapis.com/google.monitoring.v3.SpanContext"
@ -73,9 +70,11 @@ func (se *statsExporter) handleMetricsUpload(metrics []*metricdata.Metric) {
}
func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error {
ctx, cancel := se.o.newContextWithTimeout()
ctx, cancel := newContextWithTimeout(se.o.Context, se.o.Timeout)
defer cancel()
var errors []error
ctx, span := trace.StartSpan(
ctx,
"contrib.go.opencensus.io/exporter/stackdriver.uploadMetrics",
@ -87,7 +86,7 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error {
// Now create the metric descriptor remotely.
if err := se.createMetricDescriptorFromMetric(ctx, metric); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
//TODO: [rghetia] record error metrics.
errors = append(errors, err)
continue
}
}
@ -97,7 +96,7 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error {
tsl, err := se.metricToMpbTs(ctx, metric)
if err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
//TODO: [rghetia] record error metrics.
errors = append(errors, err)
continue
}
if tsl != nil {
@ -116,26 +115,35 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error {
for _, ctsreq := range ctsreql {
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
// TODO(@rghetia): record error metrics
// return err
errors = append(errors, err)
}
}
}
numErrors := len(errors)
if numErrors == 0 {
return nil
} else if numErrors == 1 {
return errors[0]
}
errMsgs := make([]string, 0, numErrors)
for _, err := range errors {
errMsgs = append(errMsgs, err.Error())
}
return fmt.Errorf("[%s]", strings.Join(errMsgs, "; "))
}
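
The upload path now collects per-metric errors instead of dropping them behind TODOs. Restated as a standalone helper for clarity (illustrative only; the vendored code inlines this logic in uploadMetrics above and in metricsBatcher.close below):

package stackdriver

import (
    "fmt"
    "strings"
)

// combineErrors mirrors the aggregation pattern above: nil for no errors, the
// single error unchanged, otherwise one error joining all messages.
func combineErrors(errs []error) error {
    switch len(errs) {
    case 0:
        return nil
    case 1:
        return errs[0]
    }
    msgs := make([]string, 0, len(errs))
    for _, err := range errs {
        msgs = append(msgs, err.Error())
    }
    return fmt.Errorf("[%s]", strings.Join(msgs, "; "))
}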
// metricToMpbTs converts a metric into a list of Stackdriver Monitoring v3 API TimeSeries
// but it doesn't invoke any remote API.
func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.Metric) ([]*monitoringpb.TimeSeries, error) {
if metric == nil {
return nil, errNilMetric
return nil, errNilMetricOrMetricDescriptor
}
resource := se.metricRscToMpbRsc(metric.Resource)
metricName := metric.Descriptor.Name
metricType, _ := se.metricTypeFromProto(metricName)
metricType := se.metricTypeFromProto(metricName)
metricLabelKeys := metric.Descriptor.LabelKeys
metricKind, _ := metricDescriptorTypeToMetricKind(metric)
@ -159,12 +167,26 @@ func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.M
// TODO: (@rghetia) perhaps log this error from labels extraction, if non-nil.
continue
}
var rsc *monitoredrespb.MonitoredResource
var mr monitoredresource.Interface
if se.o.ResourceByDescriptor != nil {
labels, mr = se.o.ResourceByDescriptor(&metric.Descriptor, labels)
// TODO(rghetia): optimize this. It is inefficient to convert this for all metrics.
rsc = convertMonitoredResourceToPB(mr)
if rsc.Type == "" {
rsc.Type = "global"
rsc.Labels = nil
}
} else {
rsc = resource
}
timeSeries = append(timeSeries, &monitoringpb.TimeSeries{
Metric: &googlemetricpb.Metric{
Type: metricType,
Labels: labels,
},
Resource: resource,
Resource: rsc,
Points: sdPoints,
})
}
@ -173,17 +195,21 @@ func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.M
}
func metricLabelsToTsLabels(defaults map[string]labelValue, labelKeys []metricdata.LabelKey, labelValues []metricdata.LabelValue) (map[string]string, error) {
// Perform this sanity check now.
if len(labelKeys) != len(labelValues) {
return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues))
}
if len(defaults)+len(labelKeys) == 0 {
return nil, nil
}
labels := make(map[string]string)
// Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched.
for key, label := range defaults {
labels[sanitize(key)] = label.val
}
// Perform this sanity check now.
if len(labelKeys) != len(labelValues) {
return labels, fmt.Errorf("Length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues))
}
for i, labelKey := range labelKeys {
labelValue := labelValues[i]
labels[sanitize(labelKey.Key)] = labelValue.Value
@ -195,6 +221,11 @@ func metricLabelsToTsLabels(defaults map[string]labelValue, labelKeys []metricda
// createMetricDescriptorFromMetric creates a metric descriptor from the OpenCensus metric
// and then creates it remotely using Stackdriver's API.
func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, metric *metricdata.Metric) error {
// Skip create metric descriptor if configured
if se.o.SkipCMD {
return nil
}
se.metricMu.Lock()
defer se.metricMu.Unlock()
@ -203,6 +234,11 @@ func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, m
return nil
}
if builtinMetric(se.metricTypeFromProto(name)) {
se.metricDescriptors[name] = true
return nil
}
// Otherwise, we encountered a cache-miss and
// should create the metric descriptor remotely.
inMD, err := se.metricToMpbMetricDescriptor(metric)
@ -210,35 +246,21 @@ func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, m
return err
}
var md *googlemetricpb.MetricDescriptor
if builtinMetric(inMD.Type) {
gmrdesc := &monitoringpb.GetMetricDescriptorRequest{
Name: inMD.Name,
}
md, err = getMetricDescriptor(ctx, se.c, gmrdesc)
} else {
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", se.o.ProjectID),
MetricDescriptor: inMD,
}
md, err = createMetricDescriptor(ctx, se.c, cmrdesc)
}
if err == nil {
// Now record the metric as having been created.
se.metricDescriptors[name] = md
}
if err = se.createMetricDescriptor(ctx, inMD); err != nil {
return err
}
// Now record the metric as having been created.
se.metricDescriptors[name] = true
return nil
}
func (se *statsExporter) metricToMpbMetricDescriptor(metric *metricdata.Metric) (*googlemetricpb.MetricDescriptor, error) {
if metric == nil {
return nil, errNilMetric
return nil, errNilMetricOrMetricDescriptor
}
metricType, _ := se.metricTypeFromProto(metric.Descriptor.Name)
metricType := se.metricTypeFromProto(metric.Descriptor.Name)
displayName := se.displayName(metric.Descriptor.Name)
metricKind, valueType := metricDescriptorTypeToMetricKind(metric)
@ -466,11 +488,9 @@ func metricExemplarToPbExemplar(exemplar *metricdata.Exemplar, projectID string)
func attachmentsToPbAttachments(attachments metricdata.Attachments, projectID string) []*any.Any {
var pbAttachments []*any.Any
for _, v := range attachments {
switch v.(type) {
case trace.SpanContext:
spanCtx, _ := v.(trace.SpanContext)
if spanCtx, succ := v.(trace.SpanContext); succ {
pbAttachments = append(pbAttachments, toPbSpanCtxAttachment(spanCtx, projectID))
default:
} else {
// Treat everything else as plain string for now.
// TODO(songy23): add support for dropped label attachments.
pbAttachments = append(pbAttachments, toPbStringAttachment(v))


@ -0,0 +1,201 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stackdriver
import (
"context"
"fmt"
"strings"
"sync"
"time"
monitoring "cloud.google.com/go/monitoring/apiv3"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)
const (
minNumWorkers = 1
minReqsChanSize = 5
)
type metricsBatcher struct {
projectName string
allTss []*monitoringpb.TimeSeries
allErrs []error
// Counts all dropped TimeSeries by this metricsBatcher.
droppedTimeSeries int
workers []*worker
// reqsChan, respsChan and wg are shared between metricsBatcher and worker goroutines.
reqsChan chan *monitoringpb.CreateTimeSeriesRequest
respsChan chan *response
wg *sync.WaitGroup
}
func newMetricsBatcher(ctx context.Context, projectID string, numWorkers int, mc *monitoring.MetricClient, timeout time.Duration) *metricsBatcher {
if numWorkers < minNumWorkers {
numWorkers = minNumWorkers
}
workers := make([]*worker, 0, numWorkers)
reqsChanSize := numWorkers
if reqsChanSize < minReqsChanSize {
reqsChanSize = minReqsChanSize
}
reqsChan := make(chan *monitoringpb.CreateTimeSeriesRequest, reqsChanSize)
respsChan := make(chan *response, numWorkers)
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
w := newWorker(ctx, mc, reqsChan, respsChan, &wg, timeout)
workers = append(workers, w)
go w.start()
}
return &metricsBatcher{
projectName: fmt.Sprintf("projects/%s", projectID),
allTss: make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload),
droppedTimeSeries: 0,
workers: workers,
wg: &wg,
reqsChan: reqsChan,
respsChan: respsChan,
}
}
func (mb *metricsBatcher) recordDroppedTimeseries(numTimeSeries int, errs ...error) {
mb.droppedTimeSeries += numTimeSeries
for _, err := range errs {
if err != nil {
mb.allErrs = append(mb.allErrs, err)
}
}
}
func (mb *metricsBatcher) addTimeSeries(ts *monitoringpb.TimeSeries) {
mb.allTss = append(mb.allTss, ts)
if len(mb.allTss) == maxTimeSeriesPerUpload {
mb.sendReqToChan()
mb.allTss = make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload)
}
}
func (mb *metricsBatcher) close(ctx context.Context) error {
// Send any remaining time series, must be <200
if len(mb.allTss) > 0 {
mb.sendReqToChan()
}
close(mb.reqsChan)
mb.wg.Wait()
for i := 0; i < len(mb.workers); i++ {
resp := <-mb.respsChan
mb.recordDroppedTimeseries(resp.droppedTimeSeries, resp.errs...)
}
close(mb.respsChan)
numErrors := len(mb.allErrs)
if numErrors == 0 {
return nil
}
if numErrors == 1 {
return mb.allErrs[0]
}
errMsgs := make([]string, 0, numErrors)
for _, err := range mb.allErrs {
errMsgs = append(errMsgs, err.Error())
}
return fmt.Errorf("[%s]", strings.Join(errMsgs, "; "))
}
// sendReqToChan grabs all the timeseries in this metricsBatcher, puts them
// into a CreateTimeSeriesRequest and sends the request to reqsChan.
func (mb *metricsBatcher) sendReqToChan() {
req := &monitoringpb.CreateTimeSeriesRequest{
Name: mb.projectName,
TimeSeries: mb.allTss,
}
mb.reqsChan <- req
}
// sendReq sends create time series requests to Stackdriver,
// and returns the count of dropped time series and error.
func sendReq(ctx context.Context, c *monitoring.MetricClient, req *monitoringpb.CreateTimeSeriesRequest) (int, error) {
if c != nil { // c==nil only happens in unit tests where we don't make real calls to Stackdriver server
err := createTimeSeries(ctx, c, req)
if err != nil {
return len(req.TimeSeries), err
}
}
return 0, nil
}
type worker struct {
ctx context.Context
timeout time.Duration
mc *monitoring.MetricClient
resp *response
respsChan chan *response
reqsChan chan *monitoringpb.CreateTimeSeriesRequest
wg *sync.WaitGroup
}
func newWorker(
ctx context.Context,
mc *monitoring.MetricClient,
reqsChan chan *monitoringpb.CreateTimeSeriesRequest,
respsChan chan *response,
wg *sync.WaitGroup,
timeout time.Duration) *worker {
return &worker{
ctx: ctx,
mc: mc,
resp: &response{},
reqsChan: reqsChan,
respsChan: respsChan,
wg: wg,
}
}
func (w *worker) start() {
for req := range w.reqsChan {
w.sendReqWithTimeout(req)
}
w.respsChan <- w.resp
w.wg.Done()
}
func (w *worker) sendReqWithTimeout(req *monitoringpb.CreateTimeSeriesRequest) {
ctx, cancel := newContextWithTimeout(w.ctx, w.timeout)
defer cancel()
w.recordDroppedTimeseries(sendReq(ctx, w.mc, req))
}
func (w *worker) recordDroppedTimeseries(numTimeSeries int, err error) {
w.resp.droppedTimeSeries += numTimeSeries
if err != nil {
w.resp.errs = append(w.resp.errs, err)
}
}
type response struct {
droppedTimeSeries int
errs []error
}
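
How the exporter is expected to drive this new batcher, as a sketch (not part of the vendored file; a nil MetricClient skips real Stackdriver calls, the same shortcut sendReq takes for unit tests):

package stackdriver

import (
    "context"
    "time"

    monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

func exampleBatcherUsage(ctx context.Context, tss []*monitoringpb.TimeSeries) (int, error) {
    // Two workers, no real client, five-second per-request timeout.
    mb := newMetricsBatcher(ctx, "my-project", 2, nil, 5*time.Second)
    for _, ts := range tss {
        // Requests are flushed to the worker channel every maxTimeSeriesPerUpload entries.
        mb.addTimeSeries(ts)
    }
    // close flushes the remainder, waits for the workers, and combines their errors.
    err := mb.close(ctx)
    return mb.droppedTimeSeries, err
}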


@ -24,81 +24,74 @@ import (
"errors"
"fmt"
"path"
"sort"
"strings"
"github.com/golang/protobuf/ptypes/timestamp"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
"cloud.google.com/go/monitoring/apiv3"
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
labelpb "google.golang.org/genproto/googleapis/api/label"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"go.opencensus.io/resource"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opencensus.io/resource"
timestamppb "github.com/golang/protobuf/ptypes/timestamp"
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
labelpb "google.golang.org/genproto/googleapis/api/label"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)
var errNilMetric = errors.New("expecting a non-nil metric")
var errNilMetricDescriptor = errors.New("expecting a non-nil metric descriptor")
var errNilMetricOrMetricDescriptor = errors.New("non-nil metric or metric descriptor")
var percentileLabelKey = &metricspb.LabelKey{
Key: "percentile",
Description: "the value at a given percentile of a distribution",
}
var globalResource = &resource.Resource{Type: "global"}
var domains = []string{"googleapis.com", "kubernetes.io", "istio.io", "knative.dev"}
type metricProtoPayload struct {
node *commonpb.Node
resource *resourcepb.Resource
metric *metricspb.Metric
additionalLabels map[string]labelValue
}
func (se *statsExporter) addPayload(node *commonpb.Node, rsc *resourcepb.Resource, labels map[string]labelValue, metrics ...*metricspb.Metric) {
for _, metric := range metrics {
payload := &metricProtoPayload{
metric: metric,
resource: rsc,
node: node,
additionalLabels: labels,
}
se.protoMetricsBundler.Add(payload, 1)
}
}
// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring.
func (se *statsExporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error {
// PushMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously,
// without de-duping or adding proto metrics to the bundler.
func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) {
if len(metrics) == 0 {
return errNilMetric
return 0, errNilMetricOrMetricDescriptor
}
additionalLabels := se.defaultLabels
if additionalLabels == nil {
// additionalLabels must be stateless because each node is different
additionalLabels = getDefaultLabelsFromNode(node)
}
// Caches the resources seen so far
seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)
mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, se.o.Timeout)
for _, metric := range metrics {
if len(metric.GetTimeseries()) == 0 {
// No TimeSeries to export, skip this metric.
continue
}
mappedRsc := se.getResource(rsc, metric, seenResources)
if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY {
se.addPayload(node, rsc, additionalLabels, se.convertSummaryMetrics(metric)...)
summaryMtcs := se.convertSummaryMetrics(metric)
for _, summaryMtc := range summaryMtcs {
if err := se.createMetricDescriptorFromMetricProto(ctx, summaryMtc); err != nil {
mb.recordDroppedTimeseries(len(summaryMtc.GetTimeseries()), err)
continue
}
se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, mb)
}
} else {
se.addPayload(node, rsc, additionalLabels, metric)
if err := se.createMetricDescriptorFromMetricProto(ctx, metric); err != nil {
mb.recordDroppedTimeseries(len(metric.GetTimeseries()), err)
continue
}
se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb)
}
}
return nil
return mb.droppedTimeSeries, mb.close(ctx)
}
func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*metricspb.Metric {
var metrics []*metricspb.Metric
for _, ts := range summary.Timeseries {
var percentileTss []*metricspb.TimeSeries
var countTss []*metricspb.TimeSeries
var sumTss []*metricspb.TimeSeries
for _, ts := range summary.Timeseries {
lvs := ts.GetLabelValues()
startTime := ts.StartTimestamp
@ -141,6 +134,7 @@ func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*met
for _, percentileValue := range snapshot.GetPercentileValues() {
lvsWithPercentile := lvs[0:]
lvsWithPercentile = append(lvsWithPercentile, &metricspb.LabelValue{
HasValue: true,
Value: fmt.Sprintf("%f", percentileValue.Percentile),
})
percentileTs := &metricspb.TimeSeries{
@ -207,135 +201,22 @@ func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*met
return metrics
}
func (se *statsExporter) handleMetricsProtoUpload(payloads []*metricProtoPayload) error {
ctx, cancel := se.o.newContextWithTimeout()
defer cancel()
ctx, span := trace.StartSpan(
ctx,
"contrib.go.opencensus.io/exporter/stackdriver.uploadMetrics",
trace.WithSampler(trace.NeverSample()),
)
defer span.End()
for _, payload := range payloads {
// Now create the metric descriptor remotely.
if err := se.createMetricDescriptor(ctx, payload.metric, payload.additionalLabels); err != nil {
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
return err
func (se *statsExporter) getResource(rsc *resourcepb.Resource, metric *metricspb.Metric, seenRscs map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) *monitoredrespb.MonitoredResource {
var resource = rsc
if metric.Resource != nil {
resource = metric.Resource
}
mappedRsc, ok := seenRscs[resource]
if !ok {
mappedRsc = se.o.MapResource(resourcepbToResource(resource))
seenRscs[resource] = mappedRsc
}
var allTimeSeries []*monitoringpb.TimeSeries
for _, payload := range payloads {
tsl, err := se.protoMetricToTimeSeries(ctx, payload.node, payload.resource, payload.metric, payload.additionalLabels)
if err != nil {
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
return err
}
allTimeSeries = append(allTimeSeries, tsl...)
}
// Now batch timeseries up and then export.
for start, end := 0, 0; start < len(allTimeSeries); start = end {
end = start + maxTimeSeriesPerUpload
if end > len(allTimeSeries) {
end = len(allTimeSeries)
}
batch := allTimeSeries[start:end]
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(batch)
for _, ctsreq := range ctsreql {
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
// TODO(@odeke-em): Don't fail fast here, perhaps batch errors?
// return err
}
}
}
return nil
}
// metricSignature creates a unique signature consisting of a
// metric's type and its lexicographically sorted label values
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/120
func metricSignature(metric *googlemetricpb.Metric) string {
labels := metric.GetLabels()
labelValues := make([]string, 0, len(labels))
for _, labelValue := range labels {
labelValues = append(labelValues, labelValue)
}
sort.Strings(labelValues)
return fmt.Sprintf("%s:%s", metric.GetType(), strings.Join(labelValues, ","))
}
func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) (ctsreql []*monitoringpb.CreateTimeSeriesRequest) {
if len(ts) == 0 {
return nil
}
// Since there are scenarios in which Metrics with the same Type
// can be bunched in the same TimeSeries, we have to ensure that
// we create a unique CreateTimeSeriesRequest with entirely unique Metrics
// per TimeSeries, lest we'll encounter:
//
// err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written:
// Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered.
// Only one point can be written per TimeSeries per request.: timeSeries[2]
//
// This scenario happens when we are using the OpenCensus Agent in which multiple metrics
// are streamed by various client applications.
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73
uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
seenMetrics := make(map[string]struct{})
for _, tti := range ts {
key := metricSignature(tti.Metric)
if _, alreadySeen := seenMetrics[key]; !alreadySeen {
uniqueTimeSeries = append(uniqueTimeSeries, tti)
seenMetrics[key] = struct{}{}
} else {
nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti)
}
}
// UniqueTimeSeries can be bunched up together
// While for each nonUniqueTimeSeries, we have
// to make a unique CreateTimeSeriesRequest.
ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(se.o.ProjectID),
TimeSeries: uniqueTimeSeries,
})
// Now recursively also combine the non-unique TimeSeries
// that were singly added to nonUniqueTimeSeries.
// The reason is that we need optimal combinations
// for optimal combinations because:
// * "a/b/c"
// * "a/b/c"
// * "x/y/z"
// * "a/b/c"
// * "x/y/z"
// * "p/y/z"
// * "d/y/z"
//
// should produce:
// CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"]
nonUniqueRequests := se.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries)
ctsreql = append(ctsreql, nonUniqueRequests...)
return ctsreql
return mappedRsc
}
func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource {
if rsc == nil {
return &resource.Resource{
Type: "global",
}
return globalResource
}
res := &resource.Resource{
Type: rsc.Type,
@ -350,92 +231,87 @@ func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource {
// protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest
// but it doesn't invoke any remote API.
func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) {
if metric == nil {
return nil, errNilMetric
func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, mb *metricsBatcher) {
if metric == nil || metric.MetricDescriptor == nil {
mb.recordDroppedTimeseries(len(metric.GetTimeseries()), errNilMetricOrMetricDescriptor)
}
var resource = rsc
if metric.Resource != nil {
resource = metric.Resource
}
mappedRes := se.o.MapResource(resourcepbToResource(resource))
metricName, _, _, err := metricProseFromProto(metric)
if err != nil {
return nil, err
}
metricType, _ := se.metricTypeFromProto(metricName)
metricType := se.metricTypeFromProto(metric.GetMetricDescriptor().GetName())
metricLabelKeys := metric.GetMetricDescriptor().GetLabelKeys()
metricKind, _ := protoMetricDescriptorTypeToMetricKind(metric)
metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric)
labelKeys := make([]string, 0, len(metricLabelKeys))
for _, key := range metricLabelKeys {
labelKeys = append(labelKeys, sanitize(key.GetKey()))
}
timeSeries := make([]*monitoringpb.TimeSeries, 0, len(metric.Timeseries))
for _, protoTimeSeries := range metric.Timeseries {
if len(protoTimeSeries.Points) == 0 {
// No points to send just move forward.
continue
}
sdPoints, err := se.protoTimeSeriesToMonitoringPoints(protoTimeSeries, metricKind)
if err != nil {
return nil, err
mb.recordDroppedTimeseries(1, err)
continue
}
// Each TimeSeries has labelValues which MUST be correlated
// with that from the MetricDescriptor
labels, err := labelsPerTimeSeries(additionalLabels, metricLabelKeys, protoTimeSeries.GetLabelValues())
labels, err := labelsPerTimeSeries(se.defaultLabels, labelKeys, protoTimeSeries.GetLabelValues())
if err != nil {
// TODO: (@odeke-em) perhaps log this error from labels extraction, if non-nil.
mb.recordDroppedTimeseries(1, err)
continue
}
timeSeries = append(timeSeries, &monitoringpb.TimeSeries{
mb.addTimeSeries(&monitoringpb.TimeSeries{
Metric: &googlemetricpb.Metric{
Type: metricType,
Labels: labels,
},
Resource: mappedRes,
MetricKind: metricKind,
ValueType: valueType,
Resource: mappedRsc,
Points: sdPoints,
})
}
return timeSeries, nil
}
func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []*metricspb.LabelKey, labelValues []*metricspb.LabelValue) (map[string]string, error) {
func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, labelValues []*metricspb.LabelValue) (map[string]string, error) {
if len(labelKeys) != len(labelValues) {
return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues))
}
if len(defaults)+len(labelKeys) == 0 {
// No labels for this metric
return nil, nil
}
labels := make(map[string]string)
// Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched.
for key, label := range defaults {
labels[sanitize(key)] = label.val
}
// Perform this sanity check now.
if len(labelKeys) != len(labelValues) {
return labels, fmt.Errorf("Length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues))
labels[key] = label.val
}
for i, labelKey := range labelKeys {
labelValue := labelValues[i]
labels[sanitize(labelKey.GetKey())] = labelValue.GetValue()
if !labelValue.GetHasValue() {
continue
}
labels[labelKey] = labelValue.GetValue()
}
return labels, nil
}
func (se *statsExporter) protoMetricDescriptorToCreateMetricDescriptorRequest(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) (*monitoringpb.CreateMetricDescriptorRequest, error) {
// Otherwise, we encountered a cache-miss and
// should create the metric descriptor remotely.
inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels)
if err != nil {
return nil, err
func (se *statsExporter) createMetricDescriptorFromMetricProto(ctx context.Context, metric *metricspb.Metric) error {
// Skip creating the metric descriptor if configured to do so
if se.o.SkipCMD {
return nil
}
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", se.o.ProjectID),
MetricDescriptor: inMD,
}
ctx, cancel := newContextWithTimeout(ctx, se.o.Timeout)
defer cancel()
return cmrdesc, nil
}
// createMetricDescriptor creates a metric descriptor from the OpenCensus proto metric
// and then creates it remotely using Stackdriver's API.
func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) error {
se.protoMu.Lock()
defer se.protoMu.Unlock()
@ -444,46 +320,35 @@ func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *met
return nil
}
if builtinMetric(se.metricTypeFromProto(name)) {
se.protoMetricDescriptors[name] = true
return nil
}
// Otherwise, we encountered a cache-miss and
// should create the metric descriptor remotely.
inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels)
inMD, err := se.protoToMonitoringMetricDescriptor(metric, se.defaultLabels)
if err != nil {
return err
}
var md *googlemetricpb.MetricDescriptor
if builtinMetric(inMD.Type) {
gmrdesc := &monitoringpb.GetMetricDescriptorRequest{
Name: inMD.Name,
}
md, err = getMetricDescriptor(ctx, se.c, gmrdesc)
} else {
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", se.o.ProjectID),
MetricDescriptor: inMD,
}
md, err = createMetricDescriptor(ctx, se.c, cmrdesc)
}
if err == nil {
// Now record the metric as having been created.
se.protoMetricDescriptors[name] = md
}
if err = se.createMetricDescriptor(ctx, inMD); err != nil {
return err
}
func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) (sptl []*monitoringpb.Point, err error) {
for _, pt := range ts.Points {
se.protoMetricDescriptors[name] = true
return nil
}
func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) ([]*monitoringpb.Point, error) {
sptl := make([]*monitoringpb.Point, 0, len(ts.Points))
for _, pt := range ts.Points {
// If we have a last-value aggregation point, i.e. MetricDescriptor_GAUGE,
// StartTime should be nil.
startTime := ts.StartTimestamp
if metricKind == googlemetricpb.MetricDescriptor_GAUGE {
startTime = nil
}
spt, err := fromProtoPoint(startTime, pt)
if err != nil {
return nil, err
@ -494,15 +359,15 @@ func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSer
}
func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Metric, additionalLabels map[string]labelValue) (*googlemetricpb.MetricDescriptor, error) {
if metric == nil {
return nil, errNilMetric
if metric == nil || metric.MetricDescriptor == nil {
return nil, errNilMetricOrMetricDescriptor
}
metricName, description, unit, err := metricProseFromProto(metric)
if err != nil {
return nil, err
}
metricType, _ := se.metricTypeFromProto(metricName)
md := metric.GetMetricDescriptor()
metricName := md.GetName()
unit := md.GetUnit()
description := md.GetDescription()
metricType := se.metricTypeFromProto(metricName)
displayName := se.displayName(metricName)
metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric)
@ -543,32 +408,32 @@ func labelDescriptorsFromProto(defaults map[string]labelValue, protoLabelKeys []
return labelDescriptors
}
func metricProseFromProto(metric *metricspb.Metric) (name, description, unit string, err error) {
md := metric.GetMetricDescriptor()
if md == nil {
return "", "", "", errNilMetricDescriptor
func (se *statsExporter) metricTypeFromProto(name string) string {
prefix := se.o.MetricPrefix
if se.o.GetMetricPrefix != nil {
prefix = se.o.GetMetricPrefix(name)
}
if prefix != "" {
name = path.Join(prefix, name)
}
if !hasDomain(name) {
// Still needed because the name may or may not have a "/" at the beginning.
name = path.Join(defaultDomain, name)
}
return name
}
name = md.GetName()
unit = md.GetUnit()
description = md.GetDescription()
if md.Type == metricspb.MetricDescriptor_CUMULATIVE_INT64 {
// If the aggregation type is count, which counts the number of recorded measurements, the unit must be "1",
// because this view does not apply to the recorded values.
unit = stats.UnitDimensionless
// hasDomain checks if the metric name already has a domain in it.
func hasDomain(name string) bool {
for _, domain := range domains {
if strings.Contains(name, domain) {
return true
}
}
return false
}
return name, description, unit, nil
}
func (se *statsExporter) metricTypeFromProto(name string) (string, bool) {
// TODO: (@odeke-em) support non-"custom.googleapis.com" metrics names.
name = path.Join("custom.googleapis.com", "opencensus", name)
return name, true
}
func fromProtoPoint(startTime *timestamp.Timestamp, pt *metricspb.Point) (*monitoringpb.Point, error) {
func fromProtoPoint(startTime *timestamppb.Timestamp, pt *metricspb.Point) (*monitoringpb.Point, error) {
if pt == nil {
return nil, nil
}
@ -578,14 +443,13 @@ func fromProtoPoint(startTime *timestamp.Timestamp, pt *metricspb.Point) (*monit
return nil, err
}
mpt := &monitoringpb.Point{
return &monitoringpb.Point{
Value: mptv,
Interval: &monitoringpb.TimeInterval{
StartTime: startTime,
EndTime: pt.Timestamp,
},
}
return mpt, nil
}, nil
}
func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) {
@ -593,8 +457,6 @@ func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) {
return nil, nil
}
var err error
var tval *monitoringpb.TypedValue
switch v := value.(type) {
default:
// All the other types are not yet handled.
@ -610,21 +472,21 @@ func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) {
// TODO: Add conversion from SummaryValue when
// https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/66
// has been figured out.
err = fmt.Errorf("protoToMetricPoint: unknown Data type: %T", value)
return nil, fmt.Errorf("protoToMetricPoint: unknown Data type: %T", value)
case *metricspb.Point_Int64Value:
tval = &monitoringpb.TypedValue{
return &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: v.Int64Value,
},
}
}, nil
case *metricspb.Point_DoubleValue:
tval = &monitoringpb.TypedValue{
return &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: v.DoubleValue,
},
}
}, nil
case *metricspb.Point_DistributionValue:
dv := v.DistributionValue
@ -662,10 +524,8 @@ func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) {
mv.DistributionValue.BucketCounts = addZeroBucketCountOnCondition(insertZeroBound, bucketCounts(dv.Buckets)...)
}
tval = &monitoringpb.TypedValue{Value: mv}
return &monitoringpb.TypedValue{Value: mv}, nil
}
return tval, err
}
func bucketCounts(buckets []*metricspb.DistributionValue_Bucket) []int64 {
@ -707,13 +567,3 @@ func protoMetricDescriptorTypeToMetricKind(m *metricspb.Metric) (googlemetricpb.
return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED
}
}
func getDefaultLabelsFromNode(node *commonpb.Node) map[string]labelValue {
taskValue := fmt.Sprintf("%s-%d@%s", strings.ToLower(node.LibraryInfo.GetLanguage().String()), node.Identifier.Pid, node.Identifier.HostName)
return map[string]labelValue{
opencensusTaskKey: {
val: taskValue,
desc: opencensusTaskDescription,
},
}
}
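
For orientation, here is a minimal, standalone sketch of the prefix and domain composition performed by the new metricTypeFromProto/hasDomain above; the domains list and the example names are assumptions for illustration only.

package main

import (
	"fmt"
	"path"
	"strings"
)

// Assumed domain list for the sketch; the real exporter keeps its own.
var domains = []string{"googleapis.com", "kubernetes.io", "istio.io"}

var defaultDomain = path.Join("custom.googleapis.com", "opencensus")

func hasDomain(name string) bool {
	for _, domain := range domains {
		if strings.Contains(name, domain) {
			return true
		}
	}
	return false
}

// metricType mirrors the composition described above: apply the prefix if any,
// then fall back to the default domain when no known domain is present.
func metricType(prefix, name string) string {
	if prefix != "" {
		name = path.Join(prefix, name)
	}
	if !hasDomain(name) {
		name = path.Join(defaultDomain, name)
	}
	return name
}

func main() {
	fmt.Println(metricType("", "request_count"))
	// custom.googleapis.com/opencensus/request_count
	fmt.Println(metricType("kubernetes.io/internal", "request_count"))
	// kubernetes.io/internal/request_count
}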

View File

@ -1,62 +0,0 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stackdriver
/*
Common test utilities for comparing Stackdriver metrics.
*/
import (
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"time"
)
func timestampToTime(ts *timestamp.Timestamp) time.Time {
if ts == nil {
return time.Unix(0, 0).UTC()
}
return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
}
func cmpResource(got, want *monitoredrespb.MonitoredResource) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoredrespb.MonitoredResource{}))
}
func cmpTSReqs(got, want []*monitoringpb.CreateTimeSeriesRequest) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.CreateTimeSeriesRequest{}))
}
func cmpMD(got, want *googlemetricpb.MetricDescriptor) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(googlemetricpb.MetricDescriptor{}))
}
func cmpMDReq(got, want *monitoringpb.CreateMetricDescriptorRequest) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.CreateMetricDescriptorRequest{}))
}
func cmpMDReqs(got, want []*monitoringpb.CreateMetricDescriptorRequest) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.CreateMetricDescriptorRequest{}))
}
func cmpPoint(got, want *monitoringpb.Point) string {
return cmp.Diff(got, want, cmpopts.IgnoreUnexported(monitoringpb.Point{}))
}

View File

@ -37,8 +37,12 @@ type awsIdentityDocument struct {
// This is only done once.
func retrieveAWSIdentityDocument() *awsIdentityDocument {
awsIdentityDoc := awsIdentityDocument{}
c := ec2metadata.New(session.New())
if c.Available() == false {
sess, err := session.NewSession()
if err != nil {
return nil
}
c := ec2metadata.New(sess)
if !c.Available() {
return nil
}
ec2InstanceIdentifyDocument, err := c.GetInstanceIdentityDocument()

View File

@ -22,7 +22,7 @@ import (
"strings"
"cloud.google.com/go/compute/metadata"
"cloud.google.com/go/container/apiv1"
container "cloud.google.com/go/container/apiv1"
containerpb "google.golang.org/genproto/googleapis/container/v1"
)

View File

@ -22,13 +22,6 @@ import (
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
)
type resourceMap struct {
// Mapping from the input resource type to the monitored resource type in Stackdriver.
srcType, dstType string
// Mapping from Stackdriver monitored resource label to an OpenCensus resource label.
labels map[string]string
}
// Resource labels that are generally internal to the exporter.
// Consider exposing these labels and a type identifier in the future to allow
// for customization.
@ -41,7 +34,7 @@ const (
)
// Mappings for the well-known OpenCensus resources to applicable Stackdriver resources.
var k8sResourceMap = map[string]string{
var k8sContainerMap = map[string]string{
"project_id": stackdriverProjectID,
"location": resourcekeys.CloudKeyZone,
"cluster_name": resourcekeys.K8SKeyClusterName,
@ -50,6 +43,21 @@ var k8sResourceMap = map[string]string{
"container_name": resourcekeys.ContainerKeyName,
}
var k8sPodMap = map[string]string{
"project_id": stackdriverProjectID,
"location": resourcekeys.CloudKeyZone,
"cluster_name": resourcekeys.K8SKeyClusterName,
"namespace_name": resourcekeys.K8SKeyNamespaceName,
"pod_name": resourcekeys.K8SKeyPodName,
}
var k8sNodeMap = map[string]string{
"project_id": stackdriverProjectID,
"location": resourcekeys.CloudKeyZone,
"cluster_name": resourcekeys.K8SKeyClusterName,
"node_name": resourcekeys.HostKeyName,
}
var gcpResourceMap = map[string]string{
"project_id": stackdriverProjectID,
"instance_id": resourcekeys.HostKeyID,
@ -72,14 +80,20 @@ var genericResourceMap = map[string]string{
"task_id": stackdriverGenericTaskID,
}
func transformResource(match, input map[string]string) map[string]string {
// transformResource returns the transformed label map and a bool that is true
// when at least one label other than the optional project_id is missing from
// the input (i.e. the mapping is incomplete), and false otherwise.
func transformResource(match, input map[string]string) (map[string]string, bool) {
output := make(map[string]string, len(input))
for dst, src := range match {
if v, ok := input[src]; ok {
v, ok := input[src]
if ok {
output[dst] = v
} else if dst != "project_id" {
return nil, true
}
}
return output
return output, false
}
func defaultMapResource(res *resource.Resource) *monitoredrespb.MonitoredResource {
@ -90,19 +104,36 @@ func defaultMapResource(res *resource.Resource) *monitoredrespb.MonitoredResourc
if res == nil || res.Labels == nil {
return result
}
if res.Type == resourcekeys.ContainerType {
switch {
case res.Type == resourcekeys.ContainerType:
result.Type = "k8s_container"
match = k8sResourceMap
} else if v, ok := res.Labels[resourcekeys.CloudKeyProvider]; ok {
if v == resourcekeys.CloudProviderGCP {
match = k8sContainerMap
case res.Type == resourcekeys.K8SType:
result.Type = "k8s_pod"
match = k8sPodMap
case res.Type == resourcekeys.HostType && res.Labels[resourcekeys.K8SKeyClusterName] != "":
result.Type = "k8s_node"
match = k8sNodeMap
case res.Labels[resourcekeys.CloudKeyProvider] == resourcekeys.CloudProviderGCP:
result.Type = "gce_instance"
match = gcpResourceMap
} else if v == resourcekeys.CloudProviderAWS {
case res.Labels[resourcekeys.CloudKeyProvider] == resourcekeys.CloudProviderAWS:
result.Type = "aws_ec2_instance"
match = awsResourceMap
}
var missing bool
result.Labels, missing = transformResource(match, res.Labels)
if missing {
result.Type = "global"
// If a project ID is specified, carry it over to the fallback labels.
if v, ok := res.Labels[stackdriverProjectID]; ok {
result.Labels = make(map[string]string, 1)
result.Labels["project_id"] = v
}
return result
}
result.Labels = transformResource(match, res.Labels)
if result.Type == "aws_ec2_instance" {
if v, ok := result.Labels["region"]; ok {
result.Labels["region"] = fmt.Sprintf("aws:%s", v)

View File

@ -54,6 +54,7 @@ import (
"log"
"os"
"path"
"strings"
"time"
metadataapi "cloud.google.com/go/compute/metadata"
@ -61,7 +62,6 @@ import (
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/resource"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
@ -186,11 +186,9 @@ type Options struct {
// conversions from auto-detected resources to well-known Stackdriver monitored resources.
MapResource func(*resource.Resource) *monitoredrespb.MonitoredResource
// MetricPrefix overrides the prefix of a Stackdriver metric display names.
// Optional. If unset defaults to "OpenCensus/".
// Deprecated: Provide GetMetricDisplayName to change the display name of
// the metric.
// If GetMetricDisplayName is non-nil, this option is ignored.
// MetricPrefix overrides the prefix of Stackdriver metric names.
// Optional. If unset defaults to "custom.googleapis.com/opencensus/".
// If GetMetricPrefix is non-nil, this option is ignored.
MetricPrefix string
// GetMetricDisplayName allows customizing the display name for the metric
@ -203,8 +201,16 @@ type Options struct {
// "custom.googleapis.com/opencensus/" + view.Name
//
// See: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricDescriptor
// Deprecated: Use GetMetricPrefix instead.
GetMetricType func(view *view.View) string
// GetMetricPrefix allows customizing the metric prefix for the given metric name.
// If it is not set, MetricPrefix is used. If MetricPrefix is not set, it defaults to:
// "custom.googleapis.com/opencensus/"
//
// See: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricDescriptor
GetMetricPrefix func(name string) string
// DefaultTraceAttributes will be appended to every span that is exported to
// Stackdriver Trace.
DefaultTraceAttributes map[string]interface{}
@ -238,31 +244,47 @@ type Options struct {
// If unset, context.Background() will be used.
Context context.Context
// SkipCMD skips all CreateMetricDescriptor calls.
// These calls are important for configuring the unit of the metrics,
// but in some cases all of the exported metrics are built-in (the unit is
// already configured) or the unit is not important.
SkipCMD bool
// Timeout for all API calls. If not set, defaults to 5 seconds.
Timeout time.Duration
// GetMonitoredResource may be provided to supply the details of the
// monitored resource dynamically based on the tags associated with each
// data point. Most users will not need to set this, but should instead
// set the MonitoredResource field.
//
// GetMonitoredResource may add or remove tags by returning a new set of
// tags. It is safe for the function to mutate its argument and return it.
//
// See the documentation on the MonitoredResource field for guidance on the
// interaction between monitored resources and labels.
//
// The MonitoredResource field is ignored if this field is set to a non-nil
// value.
GetMonitoredResource func(*view.View, []tag.Tag) ([]tag.Tag, monitoredresource.Interface)
// ReportingInterval sets the interval between reporting metrics.
// If it is set to zero, the default value is used.
ReportingInterval time.Duration
// NumberOfWorkers sets the number of goroutines that send requests
// to Stackdriver Monitoring. This is only used for Proto metrics export
// for now. The minimum number of workers is 1.
NumberOfWorkers int
// ResourceByDescriptor may be provided to supply monitored resource dynamically
// based on the metric Descriptor. Most users will not need to set this,
// but should instead set ResourceDetector.
//
// The MonitoredResource and ResourceDetector fields are ignored if this
// field is set to a non-nil value.
//
// The ResourceByDescriptor is called to derive monitored resources from
// metric.Descriptor and the label map associated with the time-series.
// If any label is used for the derived resource then it will be removed
// from the label map. The remaining labels in the map are returned to
// be used with the time-series.
//
// If the function set on this field fails to return a valid resource for even
// one time-series, the entire CreateTimeSeries request, which may contain more
// than one time-series, will result in an error.
ResourceByDescriptor func(*metricdata.Descriptor, map[string]string) (map[string]string, monitoredresource.Interface)
}
const defaultTimeout = 5 * time.Second
var defaultDomain = path.Join("custom.googleapis.com", "opencensus")
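
As a hedged usage sketch of the options introduced above (SkipCMD, ReportingInterval, GetMetricPrefix), with placeholder project ID and prefix values:

package main

import (
	"log"
	"time"

	"contrib.go.opencensus.io/exporter/stackdriver"
)

func main() {
	// Placeholder values; the project ID and prefix are assumptions.
	sd, err := stackdriver.NewExporter(stackdriver.Options{
		ProjectID:         "my-project",
		SkipCMD:           true,        // skip all CreateMetricDescriptor calls
		ReportingInterval: time.Minute, // zero would fall back to the default
		GetMetricPrefix: func(name string) string {
			// A bare prefix like this still gets the default
			// "custom.googleapis.com/opencensus" domain prepended downstream.
			return "demo"
		},
	})
	if err != nil {
		log.Fatalf("failed to create Stackdriver exporter: %v", err)
	}
	defer sd.Flush()
	defer sd.StopMetricsExporter()

	if err := sd.StartMetricsExporter(); err != nil {
		log.Fatalf("failed to start metrics exporter: %v", err)
	}
	// ... record metrics with OpenCensus as usual ...
	time.Sleep(time.Minute)
}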
// Exporter is a stats and trace exporter that uploads data to Stackdriver.
//
// You can create a single Exporter and register it as both a trace exporter
@ -291,18 +313,21 @@ func NewExporter(o Options) (*Exporter, error) {
o.ProjectID = creds.ProjectID
}
if o.Location == "" {
ctx := o.Context
if ctx == nil {
ctx = context.Background()
}
if metadataapi.OnGCE() {
zone, err := metadataapi.Zone()
if err != nil {
log.Printf("Setting Stackdriver default location failed: %s", err)
// This error should be logged with a warning level.
err = fmt.Errorf("setting Stackdriver default location failed: %s", err)
if o.OnError != nil {
o.OnError(err)
} else {
log.Print(err)
}
} else {
log.Printf("Setting Stackdriver default location to %q", zone)
o.Location = zone
}
}
}
if o.MonitoredResource != nil {
o.Resource = convertMonitoredResourceToPB(o.MonitoredResource)
@ -329,6 +354,9 @@ func NewExporter(o Options) (*Exporter, error) {
o.Resource = o.MapResource(res)
}
if o.MetricPrefix != "" && !strings.HasSuffix(o.MetricPrefix, "/") {
o.MetricPrefix = o.MetricPrefix + "/"
}
se, err := newStatsExporter(o)
if err != nil {
@ -346,13 +374,21 @@ func NewExporter(o Options) (*Exporter, error) {
// ExportView exports to the Stackdriver Monitoring if view data
// has one or more rows.
// Deprecated: use ExportMetrics and StartMetricsExporter instead.
func (e *Exporter) ExportView(vd *view.Data) {
e.statsExporter.ExportView(vd)
}
// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring.
// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously,
// without de-duping or adding proto metrics to the bundler.
func (e *Exporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error {
return e.statsExporter.ExportMetricsProto(ctx, node, rsc, metrics)
_, err := e.statsExporter.PushMetricsProto(ctx, node, rsc, metrics)
return err
}
// PushMetricsProto is similar to ExportMetricsProto but additionally returns the number of dropped timeseries.
func (e *Exporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) {
return e.statsExporter.PushMetricsProto(ctx, node, rsc, metrics)
}
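
A hedged sketch of how a caller might prefer PushMetricsProto over ExportMetricsProto when it wants the dropped-timeseries count; the helper name and the metrics slice are assumptions.

package metricspush

import (
	"context"
	"log"

	"contrib.go.opencensus.io/exporter/stackdriver"
	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
)

// pushOnce is a hypothetical helper: it synchronously pushes a batch of proto
// metrics and reports how many timeseries were dropped along the way.
func pushOnce(ctx context.Context, exp *stackdriver.Exporter, metrics []*metricspb.Metric) {
	dropped, err := exp.PushMetricsProto(ctx, nil /* node */, nil /* resource */, metrics)
	if err != nil {
		log.Printf("stackdriver push failed, %d timeseries dropped: %v", dropped, err)
		return
	}
	log.Printf("stackdriver push succeeded, %d timeseries dropped", dropped)
}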
// ExportMetrics exports OpenCensus Metrics to Stackdriver Monitoring
@ -420,12 +456,10 @@ func (o Options) handleError(err error) {
log.Printf("Failed to export to Stackdriver: %v", err)
}
func (o Options) newContextWithTimeout() (context.Context, func()) {
ctx := o.Context
func newContextWithTimeout(ctx context.Context, timeout time.Duration) (context.Context, func()) {
if ctx == nil {
ctx = context.Background()
}
timeout := o.Timeout
if timeout <= 0 {
timeout = defaultTimeout
}

View File

@ -20,18 +20,19 @@ import (
"fmt"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
"go.opencensus.io"
opencensus "go.opencensus.io"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"cloud.google.com/go/monitoring/apiv3"
monitoring "cloud.google.com/go/monitoring/apiv3"
"github.com/golang/protobuf/ptypes/timestamp"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricexport"
@ -40,6 +41,7 @@ import (
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
labelpb "google.golang.org/genproto/googleapis/api/label"
"google.golang.org/genproto/googleapis/api/metric"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
@ -60,17 +62,13 @@ type statsExporter struct {
o Options
viewDataBundler *bundler.Bundler
protoMetricsBundler *bundler.Bundler
metricsBundler *bundler.Bundler
createdViewsMu sync.Mutex
createdViews map[string]*metricpb.MetricDescriptor // Views already created remotely
protoMu sync.Mutex
protoMetricDescriptors map[string]*metricpb.MetricDescriptor // Saves the metric descriptors that were already created remotely
protoMetricDescriptors map[string]bool // Metric descriptors that were already created remotely
metricMu sync.Mutex
metricDescriptors map[string]*metricpb.MetricDescriptor // Saves the metric descriptors that were already created remotely
metricDescriptors map[string]bool // Metric descriptors that were already created remotely
c *monitoring.MetricClient
defaultLabels map[string]labelValue
@ -92,8 +90,10 @@ func newStatsExporter(o Options) (*statsExporter, error) {
}
opts := append(o.MonitoringClientOptions, option.WithUserAgent(userAgent))
ctx, cancel := o.newContextWithTimeout()
defer cancel()
ctx := o.Context
if ctx == nil {
ctx = context.Background()
}
client, err := monitoring.NewMetricClient(ctx, opts...)
if err != nil {
return nil, err
@ -101,39 +101,39 @@ func newStatsExporter(o Options) (*statsExporter, error) {
e := &statsExporter{
c: client,
o: o,
createdViews: make(map[string]*metricpb.MetricDescriptor),
protoMetricDescriptors: make(map[string]*metricpb.MetricDescriptor),
metricDescriptors: make(map[string]*metricpb.MetricDescriptor),
protoMetricDescriptors: make(map[string]bool),
metricDescriptors: make(map[string]bool),
}
var defaultLabelsNotSanitized map[string]labelValue
if o.DefaultMonitoringLabels != nil {
e.defaultLabels = o.DefaultMonitoringLabels.m
defaultLabelsNotSanitized = o.DefaultMonitoringLabels.m
} else {
e.defaultLabels = map[string]labelValue{
defaultLabelsNotSanitized = map[string]labelValue{
opencensusTaskKey: {val: getTaskValue(), desc: opencensusTaskDescription},
}
}
e.defaultLabels = make(map[string]labelValue)
// Fill in the defaults first, sanitizing the label keys as we go.
for key, label := range defaultLabelsNotSanitized {
e.defaultLabels[sanitize(key)] = label
}
e.viewDataBundler = bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) {
vds := bundle.([]*view.Data)
e.handleUpload(vds...)
})
e.protoMetricsBundler = bundler.NewBundler((*metricProtoPayload)(nil), func(bundle interface{}) {
payloads := bundle.([]*metricProtoPayload)
e.handleMetricsProtoUpload(payloads)
})
e.metricsBundler = bundler.NewBundler((*metricdata.Metric)(nil), func(bundle interface{}) {
metrics := bundle.([]*metricdata.Metric)
e.handleMetricsUpload(metrics)
})
if delayThreshold := e.o.BundleDelayThreshold; delayThreshold > 0 {
e.viewDataBundler.DelayThreshold = delayThreshold
e.protoMetricsBundler.DelayThreshold = delayThreshold
e.metricsBundler.DelayThreshold = delayThreshold
}
if countThreshold := e.o.BundleCountThreshold; countThreshold > 0 {
e.viewDataBundler.BundleCountThreshold = countThreshold
e.protoMetricsBundler.BundleCountThreshold = countThreshold
e.metricsBundler.BundleCountThreshold = countThreshold
}
return e, nil
@ -141,7 +141,7 @@ func newStatsExporter(o Options) (*statsExporter, error) {
func (e *statsExporter) startMetricsReader() error {
e.initReaderOnce.Do(func() {
e.ir, _ = metricexport.NewIntervalReader(&metricexport.Reader{}, e)
e.ir, _ = metricexport.NewIntervalReader(metricexport.NewReader(), e)
})
e.ir.ReportingInterval = e.o.ReportingInterval
return e.ir.Start()
@ -154,10 +154,6 @@ func (e *statsExporter) stopMetricsReader() {
}
func (e *statsExporter) getMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, *monitoredrespb.MonitoredResource) {
if get := e.o.GetMonitoredResource; get != nil {
newTags, mr := get(v, tags)
return newTags, convertMonitoredResourceToPB(mr)
}
resource := e.o.Resource
if resource == nil {
resource = &monitoredrespb.MonitoredResource{
@ -208,12 +204,11 @@ func (e *statsExporter) handleUpload(vds ...*view.Data) {
// want to lose data that hasn't yet been exported.
func (e *statsExporter) Flush() {
e.viewDataBundler.Flush()
e.protoMetricsBundler.Flush()
e.metricsBundler.Flush()
}
func (e *statsExporter) uploadStats(vds []*view.Data) error {
ctx, cancel := e.o.newContextWithTimeout()
ctx, cancel := newContextWithTimeout(e.o.Context, e.o.Timeout)
defer cancel()
ctx, span := trace.StartSpan(
ctx,
@ -223,7 +218,7 @@ func (e *statsExporter) uploadStats(vds []*view.Data) error {
defer span.End()
for _, vd := range vds {
if err := e.createMeasure(ctx, vd.View); err != nil {
if err := e.createMetricDescriptorFromView(ctx, vd.View); err != nil {
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
return err
}
@ -332,34 +327,27 @@ func (e *statsExporter) viewToMetricDescriptor(ctx context.Context, v *view.View
return res, nil
}
func (e *statsExporter) viewToCreateMetricDescriptorRequest(ctx context.Context, v *view.View) (*monitoringpb.CreateMetricDescriptorRequest, error) {
inMD, err := e.viewToMetricDescriptor(ctx, v)
if err != nil {
return nil, err
}
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
MetricDescriptor: inMD,
}
return cmrdesc, nil
}
// createMeasure creates a MetricDescriptor for the given view data in Stackdriver Monitoring.
// createMetricDescriptorFromView creates a MetricDescriptor for the given view data in Stackdriver Monitoring.
// An error will be returned if there is already a metric descriptor created with the same name
// but it has a different aggregation or keys.
func (e *statsExporter) createMeasure(ctx context.Context, v *view.View) error {
e.createdViewsMu.Lock()
defer e.createdViewsMu.Unlock()
func (e *statsExporter) createMetricDescriptorFromView(ctx context.Context, v *view.View) error {
// Skip creating the metric descriptor if configured to do so
if e.o.SkipCMD {
return nil
}
e.metricMu.Lock()
defer e.metricMu.Unlock()
viewName := v.Name
if md, ok := e.createdViews[viewName]; ok {
// [TODO:rghetia] Temporary fix for https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/76#issuecomment-459459091
if builtinMetric(md.Type) {
if _, created := e.metricDescriptors[viewName]; created {
return nil
}
return e.equalMeasureAggTagKeys(md, v.Measure, v.Aggregation, v.TagKeys)
if builtinMetric(e.metricType(v)) {
e.metricDescriptors[viewName] = true
return nil
}
inMD, err := e.viewToMetricDescriptor(ctx, v)
@ -367,34 +355,92 @@ func (e *statsExporter) createMeasure(ctx context.Context, v *view.View) error {
return err
}
var dmd *metric.MetricDescriptor
if builtinMetric(inMD.Type) {
gmrdesc := &monitoringpb.GetMetricDescriptorRequest{
Name: inMD.Name,
}
dmd, err = getMetricDescriptor(ctx, e.c, gmrdesc)
} else {
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
MetricDescriptor: inMD,
}
dmd, err = createMetricDescriptor(ctx, e.c, cmrdesc)
}
if err != nil {
if err = e.createMetricDescriptor(ctx, inMD); err != nil {
return err
}
// Now cache the metric descriptor
e.createdViews[viewName] = dmd
return err
e.metricDescriptors[viewName] = true
return nil
}
func (e *statsExporter) displayName(suffix string) string {
displayNamePrefix := defaultDisplayNamePrefix
if e.o.MetricPrefix != "" {
displayNamePrefix = e.o.MetricPrefix
return path.Join(defaultDisplayNamePrefix, suffix)
}
return path.Join(displayNamePrefix, suffix)
func (e *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) (ctsreql []*monitoringpb.CreateTimeSeriesRequest) {
if len(ts) == 0 {
return nil
}
// Since there are scenarios in which Metrics with the same Type
// can be bunched in the same TimeSeries, we have to ensure that
// we create a unique CreateTimeSeriesRequest with entirely unique Metrics
// per TimeSeries, lest we encounter:
//
// err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written:
// Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered.
// Only one point can be written per TimeSeries per request.: timeSeries[2]
//
// This scenario happens when we are using the OpenCensus Agent in which multiple metrics
// are streamed by various client applications.
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73
uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
seenMetrics := make(map[string]struct{})
for _, tti := range ts {
key := metricSignature(tti.Metric)
if _, alreadySeen := seenMetrics[key]; !alreadySeen {
uniqueTimeSeries = append(uniqueTimeSeries, tti)
seenMetrics[key] = struct{}{}
} else {
nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti)
}
}
// uniqueTimeSeries can be bunched up together in a single request,
// while the nonUniqueTimeSeries entries have to be deferred to
// follow-up CreateTimeSeriesRequests.
ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{
Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
TimeSeries: uniqueTimeSeries,
})
// Now recursively combine the TimeSeries that were deferred to
// nonUniqueTimeSeries. The recursion is needed so that each follow-up
// request is also optimally packed; for example, the input:
// * "a/b/c"
// * "a/b/c"
// * "x/y/z"
// * "a/b/c"
// * "x/y/z"
// * "p/y/z"
// * "d/y/z"
//
// should produce:
// CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"]
nonUniqueRequests := e.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries)
ctsreql = append(ctsreql, nonUniqueRequests...)
return ctsreql
}
// metricSignature creates a unique signature consisting of a
// metric's type and its lexicographically sorted label values
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/120
func metricSignature(metric *googlemetricpb.Metric) string {
labels := metric.GetLabels()
labelValues := make([]string, 0, len(labels))
for _, labelValue := range labels {
labelValues = append(labelValues, labelValue)
}
sort.Strings(labelValues)
return fmt.Sprintf("%s:%s", metric.GetType(), strings.Join(labelValues, ","))
}
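
To make the batching above concrete, a small standalone sketch that reproduces the split described in the comment, using plain strings in place of the real monitoring types:

package main

import "fmt"

// batchByDuplicates mimics combineTimeSeriesToCreateTimeSeriesRequest: the
// first occurrence of each signature goes into the current request, and the
// duplicates are recursively re-batched into follow-up requests. Here the
// signature is just a string; the real exporter also folds in sorted label
// values (see metricSignature above).
func batchByDuplicates(sigs []string) [][]string {
	if len(sigs) == 0 {
		return nil
	}
	seen := make(map[string]bool)
	var unique, duplicates []string
	for _, s := range sigs {
		if seen[s] {
			duplicates = append(duplicates, s)
			continue
		}
		seen[s] = true
		unique = append(unique, s)
	}
	return append([][]string{unique}, batchByDuplicates(duplicates)...)
}

func main() {
	in := []string{"a/b/c", "a/b/c", "x/y/z", "a/b/c", "x/y/z", "p/y/z", "d/y/z"}
	for i, req := range batchByDuplicates(in) {
		fmt.Printf("CreateTimeSeriesRequest #%d: %v\n", i+1, req)
	}
	// #1: [a/b/c x/y/z p/y/z d/y/z]
	// #2: [a/b/c x/y/z]
	// #3: [a/b/c]
}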
func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point {
@ -546,61 +592,21 @@ func newLabelDescriptors(defaults map[string]labelValue, keys []tag.Key) []*labe
return labelDescriptors
}
func (e *statsExporter) equalMeasureAggTagKeys(md *metricpb.MetricDescriptor, m stats.Measure, agg *view.Aggregation, keys []tag.Key) error {
var aggTypeMatch bool
switch md.ValueType {
case metricpb.MetricDescriptor_INT64:
if _, ok := m.(*stats.Int64Measure); !(ok || agg.Type == view.AggTypeCount) {
return fmt.Errorf("stackdriver metric descriptor was not created as int64")
func (e *statsExporter) createMetricDescriptor(ctx context.Context, md *metric.MetricDescriptor) error {
ctx, cancel := newContextWithTimeout(ctx, e.o.Timeout)
defer cancel()
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
MetricDescriptor: md,
}
aggTypeMatch = agg.Type == view.AggTypeCount || agg.Type == view.AggTypeSum || agg.Type == view.AggTypeLastValue
case metricpb.MetricDescriptor_DOUBLE:
if _, ok := m.(*stats.Float64Measure); !ok {
return fmt.Errorf("stackdriver metric descriptor was not created as double")
}
aggTypeMatch = agg.Type == view.AggTypeSum || agg.Type == view.AggTypeLastValue
case metricpb.MetricDescriptor_DISTRIBUTION:
aggTypeMatch = agg.Type == view.AggTypeDistribution
}
if !aggTypeMatch {
return fmt.Errorf("stackdriver metric descriptor was not created with aggregation type %T", agg.Type)
}
labels := make(map[string]struct{}, len(keys)+len(e.defaultLabels))
for _, k := range keys {
labels[sanitize(k.Name())] = struct{}{}
}
for k := range e.defaultLabels {
labels[sanitize(k)] = struct{}{}
}
for _, k := range md.Labels {
if _, ok := labels[k.Key]; !ok {
return fmt.Errorf("stackdriver metric descriptor %q was not created with label %q", md.Type, k)
}
delete(labels, k.Key)
}
if len(labels) > 0 {
extra := make([]string, 0, len(labels))
for k := range labels {
extra = append(extra, k)
}
return fmt.Errorf("stackdriver metric descriptor %q contains unexpected labels: %s", md.Type, strings.Join(extra, ", "))
}
return nil
_, err := createMetricDescriptor(ctx, e.c, cmrdesc)
return err
}
var createMetricDescriptor = func(ctx context.Context, c *monitoring.MetricClient, mdr *monitoringpb.CreateMetricDescriptorRequest) (*metric.MetricDescriptor, error) {
return c.CreateMetricDescriptor(ctx, mdr)
}
var getMetricDescriptor = func(ctx context.Context, c *monitoring.MetricClient, mdr *monitoringpb.GetMetricDescriptorRequest) (*metric.MetricDescriptor, error) {
return c.GetMetricDescriptor(ctx, mdr)
}
var createTimeSeries = func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
return c.CreateTimeSeries(ctx, ts)
}

View File

@ -121,7 +121,7 @@ func (e *traceExporter) uploadSpans(spans []*tracepb.Span) {
Spans: spans,
}
// Create a never-sampled span to prevent traces associated with exporter.
ctx, cancel := e.o.newContextWithTimeout()
ctx, cancel := newContextWithTimeout(e.o.Context, e.o.Timeout)
defer cancel()
ctx, span := trace.StartSpan(
ctx,