diff --git a/metrics/config.go b/metrics/config.go index 16d7e5f33..8c24e788a 100644 --- a/metrics/config.go +++ b/metrics/config.go @@ -84,7 +84,7 @@ type metricsConfig struct { // recorder provides a hook for performing custom transformations before // writing the metrics to the stats.RecordWithOptions interface. - recorder func(context.Context, stats.Measurement, ...stats.Options) error + recorder func(context.Context, []stats.Measurement, ...stats.Options) error // ---- OpenCensus specific below ---- // collectorAddress is the address of the collector, if not `localhost:55678` @@ -144,13 +144,13 @@ func NewStackdriverClientConfigFromMap(config map[string]string) *StackdriverCli } } -// Record applies the `ros` Options to `ms` and then records the resulting +// record applies the `ros` Options to each measurement in `mss` and then records the resulting // measurements in the metricsConfig's designated backend. -func (mc *metricsConfig) Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error { +func (mc *metricsConfig) record(ctx context.Context, mss []stats.Measurement, ros ...stats.Options) error { if mc == nil || mc.recorder == nil { - return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(ms))...) + return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(mss...))...) } - return mc.recorder(ctx, ms, ros...) + return mc.recorder(ctx, mss, ros...) } func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metricsConfig, error) { @@ -235,14 +235,24 @@ func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metri if !allowCustomMetrics { servingOrEventing := metricskey.KnativeRevisionMetrics.Union( metricskey.KnativeTriggerMetrics).Union(metricskey.KnativeBrokerMetrics) - mc.recorder = func(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error { - metricType := path.Join(mc.stackdriverMetricTypePrefix, ms.Measure().Name()) - - if servingOrEventing.Has(metricType) { - return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(ms))...) + mc.recorder = func(ctx context.Context, mss []stats.Measurement, ros ...stats.Options) error { + // Perform array filtering in place using two indices: w(rite)Index and r(ead)Index. + wIdx := 0 + for rIdx := 0; rIdx < len(mss); rIdx++ { + metricType := path.Join(mc.stackdriverMetricTypePrefix, mss[rIdx].Measure().Name()) + if servingOrEventing.Has(metricType) { + mss[wIdx] = mss[rIdx] + wIdx++ + } + // Otherwise, skip the measurement (because it won't be accepted). } - // Otherwise, skip (because it won't be accepted) - return nil + // Found no matched metrics. + if wIdx == 0 { + return nil + } + // Trim the list to the number of written objects. + mss = mss[:wIdx] + return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(mss...))...) } } } diff --git a/metrics/record.go b/metrics/record.go index 397f9be77..d6e433a17 100644 --- a/metrics/record.go +++ b/metrics/record.go @@ -27,9 +27,12 @@ import ( // Record stores the given Measurement from `ms` in the current metrics backend. func Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) { - mc := getCurMetricsConfig() + getCurMetricsConfig().record(ctx, []stats.Measurement{ms}, ros...) +} - mc.Record(ctx, ms, ros...) +// RecordBatch stores the given Measurements from `mss` in the current metrics backend. +func RecordBatch(ctx context.Context, mss ...stats.Measurement) { + getCurMetricsConfig().record(ctx, mss) } // Buckets125 generates an array of buckets with approximate powers-of-two diff --git a/metrics/record_test.go b/metrics/record_test.go index dbfebad76..44bd2eddf 100644 --- a/metrics/record_test.go +++ b/metrics/record_test.go @@ -38,68 +38,87 @@ type cases struct { func TestRecordServing(t *testing.T) { measure := stats.Int64("request_count", "Number of reconcile operations", stats.UnitNone) - shouldReportCases := []cases{ - // Increase the measurement value for each test case so that checking - // the last value ensures the measurement has been recorded. - { - name: "none stackdriver backend", - metricsConfig: &metricsConfig{}, - measurement: measure.M(1), - }, { - name: "stackdriver backend with supported metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/internal/serving/activator", - }, - measurement: measure.M(2), - }, { - name: "stackdriver backend with unsupported metric and allow custom metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/unsupported", - }, - measurement: measure.M(3), - }, { - name: "empty metricsConfig", - measurement: measure.M(4), + // Increase the measurement value for each test case so that checking + // the last value ensures the measurement has been recorded. + shouldReportCases := []cases{{ + name: "none stackdriver backend", + metricsConfig: &metricsConfig{}, + measurement: measure.M(1), + }, { + name: "stackdriver backend with supported metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/internal/serving/activator", }, - } + measurement: measure.M(2), + }, { + name: "stackdriver backend with unsupported metric and allow custom metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/unsupported", + }, + measurement: measure.M(3), + }, { + name: "empty metricsConfig", + measurement: measure.M(4), + }} testRecord(t, measure, shouldReportCases) } func TestRecordEventing(t *testing.T) { measure := stats.Int64("event_count", "Number of event received", stats.UnitNone) - shouldReportCases := []cases{ - // Increase the measurement value for each test case so that checking - // the last value ensures the measurement has been recorded. - { - name: "none stackdriver backend", - metricsConfig: &metricsConfig{}, - measurement: measure.M(1), - }, { - name: "stackdriver backend with supported metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/eventing/broker", - }, - measurement: measure.M(5), - }, { - name: "stackdriver backend with unsupported metric and allow custom metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/unsupported", - }, - measurement: measure.M(3), - }, { - name: "empty metricsConfig", - measurement: measure.M(4), + // Increase the measurement value for each test case so that checking + // the last value ensures the measurement has been recorded. + shouldReportCases := []cases{{ + name: "none stackdriver backend", + metricsConfig: &metricsConfig{}, + measurement: measure.M(1), + }, { + name: "stackdriver backend with supported metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/eventing/broker", }, - } + measurement: measure.M(2), + }, { + name: "stackdriver backend with unsupported metric and allow custom metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/unsupported", + }, + measurement: measure.M(3), + }, { + name: "empty metricsConfig", + measurement: measure.M(4), + }} testRecord(t, measure, shouldReportCases) } +func TestRecordBatch(t *testing.T) { + ctx := context.Background() + measure1 := stats.Int64("count1", "First counter", stats.UnitNone) + measure2 := stats.Int64("count2", "Second counter", stats.UnitNone) + v := []*view.View{&view.View{ + Measure: measure1, + Aggregation: view.LastValue(), + }, &view.View{ + Measure: measure2, + Aggregation: view.LastValue(), + }} + view.Register(v...) + defer view.Unregister(v...) + metricsConfig := &metricsConfig{} + measurement1 := measure1.M(1984) + measurement2 := measure2.M(42) + setCurMetricsConfig(metricsConfig) + RecordBatch(ctx, measurement1, measurement2) + metricstest.CheckLastValueData(t, measurement1.Measure().Name(), map[string]string{}, 1984) + metricstest.CheckLastValueData(t, measurement2.Measure().Name(), map[string]string{}, 42) +} + func testRecord(t *testing.T, measure *stats.Int64Measure, shouldReportCases []cases) { - ctx := context.TODO() + t.Helper() + ctx := context.Background() v := &view.View{ Measure: measure, Aggregation: view.LastValue(), @@ -117,30 +136,28 @@ func testRecord(t *testing.T, measure *stats.Int64Measure, shouldReportCases []c name string metricsConfig *metricsConfig measurement stats.Measurement - }{ - // Use a different value for the measurement other than the last one of shouldReportCases - { - name: "stackdriver backend with unsupported metric but not allow custom metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/unsupported", - recorder: func(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error { - metricType := path.Join("knative.dev/unsupported", ms.Measure().Name()) - if metricskey.KnativeRevisionMetrics.Has(metricType) || metricskey.KnativeTriggerMetrics.Has(metricType) { - ros = append(ros, stats.WithMeasurements(ms)) - return stats.RecordWithOptions(ctx, ros...) - } - return nil - }, + }{{ // Use a different value for the measurement other than the last one of shouldReportCases + name: "stackdriver backend with unsupported metric but not allow custom metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/unsupported", + recorder: func(ctx context.Context, mss []stats.Measurement, ros ...stats.Options) error { + metricType := path.Join("knative.dev/unsupported", mss[0].Measure().Name()) + if metricskey.KnativeRevisionMetrics.Has(metricType) || metricskey.KnativeTriggerMetrics.Has(metricType) { + ros = append(ros, stats.WithMeasurements(mss[0])) + return stats.RecordWithOptions(ctx, ros...) + } + return nil }, - measurement: measure.M(5), }, - } + measurement: measure.M(5), + }} for _, test := range shouldNotReportCases { setCurMetricsConfig(test.metricsConfig) Record(ctx, test.measurement) - metricstest.CheckLastValueData(t, test.measurement.Measure().Name(), map[string]string{}, 4) // The value is still the last one of shouldReportCases + metricstest.CheckLastValueData(t, test.measurement.Measure().Name(), map[string]string{}, + float64(len(shouldReportCases))) // The value is still the last one of shouldReportCases } }