From 92c9752a4c44fabaa385a59a0f9c2ea245aa0f16 Mon Sep 17 00:00:00 2001 From: Victor Agababov Date: Fri, 31 Jan 2020 11:26:28 -0800 Subject: [PATCH] Add RecordN method to permit recording of N measurements simultaneously. (#1025) * Add RecordN method to permit recording of N measurements simultaneously. In serving we have many places where we try to record several metrics at the same time. Besides very repetetive code, this also brings inefficiences, since metrics code has to do _some_ work for every call, which is completely avoidable. This way we can record N measurements with one hit. * tests and nits * review items per discussion with Evan: - remove ros - make measurements ... * review --- metrics/config.go | 34 +++++---- metrics/record.go | 7 +- metrics/record_test.go | 155 +++++++++++++++++++++++------------------ 3 files changed, 113 insertions(+), 83 deletions(-) diff --git a/metrics/config.go b/metrics/config.go index 16d7e5f33..8c24e788a 100644 --- a/metrics/config.go +++ b/metrics/config.go @@ -84,7 +84,7 @@ type metricsConfig struct { // recorder provides a hook for performing custom transformations before // writing the metrics to the stats.RecordWithOptions interface. - recorder func(context.Context, stats.Measurement, ...stats.Options) error + recorder func(context.Context, []stats.Measurement, ...stats.Options) error // ---- OpenCensus specific below ---- // collectorAddress is the address of the collector, if not `localhost:55678` @@ -144,13 +144,13 @@ func NewStackdriverClientConfigFromMap(config map[string]string) *StackdriverCli } } -// Record applies the `ros` Options to `ms` and then records the resulting +// record applies the `ros` Options to each measurement in `mss` and then records the resulting // measurements in the metricsConfig's designated backend. -func (mc *metricsConfig) Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error { +func (mc *metricsConfig) record(ctx context.Context, mss []stats.Measurement, ros ...stats.Options) error { if mc == nil || mc.recorder == nil { - return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(ms))...) + return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(mss...))...) } - return mc.recorder(ctx, ms, ros...) + return mc.recorder(ctx, mss, ros...) } func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metricsConfig, error) { @@ -235,14 +235,24 @@ func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metri if !allowCustomMetrics { servingOrEventing := metricskey.KnativeRevisionMetrics.Union( metricskey.KnativeTriggerMetrics).Union(metricskey.KnativeBrokerMetrics) - mc.recorder = func(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error { - metricType := path.Join(mc.stackdriverMetricTypePrefix, ms.Measure().Name()) - - if servingOrEventing.Has(metricType) { - return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(ms))...) + mc.recorder = func(ctx context.Context, mss []stats.Measurement, ros ...stats.Options) error { + // Perform array filtering in place using two indices: w(rite)Index and r(ead)Index. + wIdx := 0 + for rIdx := 0; rIdx < len(mss); rIdx++ { + metricType := path.Join(mc.stackdriverMetricTypePrefix, mss[rIdx].Measure().Name()) + if servingOrEventing.Has(metricType) { + mss[wIdx] = mss[rIdx] + wIdx++ + } + // Otherwise, skip the measurement (because it won't be accepted). } - // Otherwise, skip (because it won't be accepted) - return nil + // Found no matched metrics. + if wIdx == 0 { + return nil + } + // Trim the list to the number of written objects. + mss = mss[:wIdx] + return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(mss...))...) } } } diff --git a/metrics/record.go b/metrics/record.go index 397f9be77..d6e433a17 100644 --- a/metrics/record.go +++ b/metrics/record.go @@ -27,9 +27,12 @@ import ( // Record stores the given Measurement from `ms` in the current metrics backend. func Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) { - mc := getCurMetricsConfig() + getCurMetricsConfig().record(ctx, []stats.Measurement{ms}, ros...) +} - mc.Record(ctx, ms, ros...) +// RecordBatch stores the given Measurements from `mss` in the current metrics backend. +func RecordBatch(ctx context.Context, mss ...stats.Measurement) { + getCurMetricsConfig().record(ctx, mss) } // Buckets125 generates an array of buckets with approximate powers-of-two diff --git a/metrics/record_test.go b/metrics/record_test.go index dbfebad76..44bd2eddf 100644 --- a/metrics/record_test.go +++ b/metrics/record_test.go @@ -38,68 +38,87 @@ type cases struct { func TestRecordServing(t *testing.T) { measure := stats.Int64("request_count", "Number of reconcile operations", stats.UnitNone) - shouldReportCases := []cases{ - // Increase the measurement value for each test case so that checking - // the last value ensures the measurement has been recorded. - { - name: "none stackdriver backend", - metricsConfig: &metricsConfig{}, - measurement: measure.M(1), - }, { - name: "stackdriver backend with supported metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/internal/serving/activator", - }, - measurement: measure.M(2), - }, { - name: "stackdriver backend with unsupported metric and allow custom metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/unsupported", - }, - measurement: measure.M(3), - }, { - name: "empty metricsConfig", - measurement: measure.M(4), + // Increase the measurement value for each test case so that checking + // the last value ensures the measurement has been recorded. + shouldReportCases := []cases{{ + name: "none stackdriver backend", + metricsConfig: &metricsConfig{}, + measurement: measure.M(1), + }, { + name: "stackdriver backend with supported metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/internal/serving/activator", }, - } + measurement: measure.M(2), + }, { + name: "stackdriver backend with unsupported metric and allow custom metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/unsupported", + }, + measurement: measure.M(3), + }, { + name: "empty metricsConfig", + measurement: measure.M(4), + }} testRecord(t, measure, shouldReportCases) } func TestRecordEventing(t *testing.T) { measure := stats.Int64("event_count", "Number of event received", stats.UnitNone) - shouldReportCases := []cases{ - // Increase the measurement value for each test case so that checking - // the last value ensures the measurement has been recorded. - { - name: "none stackdriver backend", - metricsConfig: &metricsConfig{}, - measurement: measure.M(1), - }, { - name: "stackdriver backend with supported metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/eventing/broker", - }, - measurement: measure.M(5), - }, { - name: "stackdriver backend with unsupported metric and allow custom metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/unsupported", - }, - measurement: measure.M(3), - }, { - name: "empty metricsConfig", - measurement: measure.M(4), + // Increase the measurement value for each test case so that checking + // the last value ensures the measurement has been recorded. + shouldReportCases := []cases{{ + name: "none stackdriver backend", + metricsConfig: &metricsConfig{}, + measurement: measure.M(1), + }, { + name: "stackdriver backend with supported metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/eventing/broker", }, - } + measurement: measure.M(2), + }, { + name: "stackdriver backend with unsupported metric and allow custom metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/unsupported", + }, + measurement: measure.M(3), + }, { + name: "empty metricsConfig", + measurement: measure.M(4), + }} testRecord(t, measure, shouldReportCases) } +func TestRecordBatch(t *testing.T) { + ctx := context.Background() + measure1 := stats.Int64("count1", "First counter", stats.UnitNone) + measure2 := stats.Int64("count2", "Second counter", stats.UnitNone) + v := []*view.View{&view.View{ + Measure: measure1, + Aggregation: view.LastValue(), + }, &view.View{ + Measure: measure2, + Aggregation: view.LastValue(), + }} + view.Register(v...) + defer view.Unregister(v...) + metricsConfig := &metricsConfig{} + measurement1 := measure1.M(1984) + measurement2 := measure2.M(42) + setCurMetricsConfig(metricsConfig) + RecordBatch(ctx, measurement1, measurement2) + metricstest.CheckLastValueData(t, measurement1.Measure().Name(), map[string]string{}, 1984) + metricstest.CheckLastValueData(t, measurement2.Measure().Name(), map[string]string{}, 42) +} + func testRecord(t *testing.T, measure *stats.Int64Measure, shouldReportCases []cases) { - ctx := context.TODO() + t.Helper() + ctx := context.Background() v := &view.View{ Measure: measure, Aggregation: view.LastValue(), @@ -117,30 +136,28 @@ func testRecord(t *testing.T, measure *stats.Int64Measure, shouldReportCases []c name string metricsConfig *metricsConfig measurement stats.Measurement - }{ - // Use a different value for the measurement other than the last one of shouldReportCases - { - name: "stackdriver backend with unsupported metric but not allow custom metric", - metricsConfig: &metricsConfig{ - isStackdriverBackend: true, - stackdriverMetricTypePrefix: "knative.dev/unsupported", - recorder: func(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error { - metricType := path.Join("knative.dev/unsupported", ms.Measure().Name()) - if metricskey.KnativeRevisionMetrics.Has(metricType) || metricskey.KnativeTriggerMetrics.Has(metricType) { - ros = append(ros, stats.WithMeasurements(ms)) - return stats.RecordWithOptions(ctx, ros...) - } - return nil - }, + }{{ // Use a different value for the measurement other than the last one of shouldReportCases + name: "stackdriver backend with unsupported metric but not allow custom metric", + metricsConfig: &metricsConfig{ + isStackdriverBackend: true, + stackdriverMetricTypePrefix: "knative.dev/unsupported", + recorder: func(ctx context.Context, mss []stats.Measurement, ros ...stats.Options) error { + metricType := path.Join("knative.dev/unsupported", mss[0].Measure().Name()) + if metricskey.KnativeRevisionMetrics.Has(metricType) || metricskey.KnativeTriggerMetrics.Has(metricType) { + ros = append(ros, stats.WithMeasurements(mss[0])) + return stats.RecordWithOptions(ctx, ros...) + } + return nil }, - measurement: measure.M(5), }, - } + measurement: measure.M(5), + }} for _, test := range shouldNotReportCases { setCurMetricsConfig(test.metricsConfig) Record(ctx, test.measurement) - metricstest.CheckLastValueData(t, test.measurement.Measure().Name(), map[string]string{}, 4) // The value is still the last one of shouldReportCases + metricstest.CheckLastValueData(t, test.measurement.Measure().Name(), map[string]string{}, + float64(len(shouldReportCases))) // The value is still the last one of shouldReportCases } }