Add RecordN method to permit recording of N measurements simultaneously. (#1025)

* Add RecordN method to permit recording of N measurements simultaneously.

In serving we have many places where we try to record several metrics at the same time.
Besides very repetetive code, this also brings inefficiences, since metrics code has to do _some_ work
for every call, which is completely avoidable.

This way we can record N measurements with one hit.

* tests and nits

* review items per discussion with Evan:

- remove ros
- make measurements ...

* review
This commit is contained in:
Victor Agababov 2020-01-31 11:26:28 -08:00 committed by GitHub
parent d0787646f1
commit 92c9752a4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 113 additions and 83 deletions

View File

@ -84,7 +84,7 @@ type metricsConfig struct {
// recorder provides a hook for performing custom transformations before
// writing the metrics to the stats.RecordWithOptions interface.
recorder func(context.Context, stats.Measurement, ...stats.Options) error
recorder func(context.Context, []stats.Measurement, ...stats.Options) error
// ---- OpenCensus specific below ----
// collectorAddress is the address of the collector, if not `localhost:55678`
@ -144,13 +144,13 @@ func NewStackdriverClientConfigFromMap(config map[string]string) *StackdriverCli
}
}
// Record applies the `ros` Options to `ms` and then records the resulting
// record applies the `ros` Options to each measurement in `mss` and then records the resulting
// measurements in the metricsConfig's designated backend.
func (mc *metricsConfig) Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error {
func (mc *metricsConfig) record(ctx context.Context, mss []stats.Measurement, ros ...stats.Options) error {
if mc == nil || mc.recorder == nil {
return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(ms))...)
return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(mss...))...)
}
return mc.recorder(ctx, ms, ros...)
return mc.recorder(ctx, mss, ros...)
}
func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metricsConfig, error) {
@ -235,15 +235,25 @@ func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metri
if !allowCustomMetrics {
servingOrEventing := metricskey.KnativeRevisionMetrics.Union(
metricskey.KnativeTriggerMetrics).Union(metricskey.KnativeBrokerMetrics)
mc.recorder = func(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error {
metricType := path.Join(mc.stackdriverMetricTypePrefix, ms.Measure().Name())
mc.recorder = func(ctx context.Context, mss []stats.Measurement, ros ...stats.Options) error {
// Perform array filtering in place using two indices: w(rite)Index and r(ead)Index.
wIdx := 0
for rIdx := 0; rIdx < len(mss); rIdx++ {
metricType := path.Join(mc.stackdriverMetricTypePrefix, mss[rIdx].Measure().Name())
if servingOrEventing.Has(metricType) {
return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(ms))...)
mss[wIdx] = mss[rIdx]
wIdx++
}
// Otherwise, skip (because it won't be accepted)
// Otherwise, skip the measurement (because it won't be accepted).
}
// Found no matched metrics.
if wIdx == 0 {
return nil
}
// Trim the list to the number of written objects.
mss = mss[:wIdx]
return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(mss...))...)
}
}
}

View File

@ -27,9 +27,12 @@ import (
// Record stores the given Measurement from `ms` in the current metrics backend.
func Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) {
mc := getCurMetricsConfig()
getCurMetricsConfig().record(ctx, []stats.Measurement{ms}, ros...)
}
mc.Record(ctx, ms, ros...)
// RecordBatch stores the given Measurements from `mss` in the current metrics backend.
func RecordBatch(ctx context.Context, mss ...stats.Measurement) {
getCurMetricsConfig().record(ctx, mss)
}
// Buckets125 generates an array of buckets with approximate powers-of-two

View File

@ -38,10 +38,9 @@ type cases struct {
func TestRecordServing(t *testing.T) {
measure := stats.Int64("request_count", "Number of reconcile operations", stats.UnitNone)
shouldReportCases := []cases{
// Increase the measurement value for each test case so that checking
// the last value ensures the measurement has been recorded.
{
shouldReportCases := []cases{{
name: "none stackdriver backend",
metricsConfig: &metricsConfig{},
measurement: measure.M(1),
@ -62,17 +61,15 @@ func TestRecordServing(t *testing.T) {
}, {
name: "empty metricsConfig",
measurement: measure.M(4),
},
}
}}
testRecord(t, measure, shouldReportCases)
}
func TestRecordEventing(t *testing.T) {
measure := stats.Int64("event_count", "Number of event received", stats.UnitNone)
shouldReportCases := []cases{
// Increase the measurement value for each test case so that checking
// the last value ensures the measurement has been recorded.
{
shouldReportCases := []cases{{
name: "none stackdriver backend",
metricsConfig: &metricsConfig{},
measurement: measure.M(1),
@ -82,7 +79,7 @@ func TestRecordEventing(t *testing.T) {
isStackdriverBackend: true,
stackdriverMetricTypePrefix: "knative.dev/eventing/broker",
},
measurement: measure.M(5),
measurement: measure.M(2),
}, {
name: "stackdriver backend with unsupported metric and allow custom metric",
metricsConfig: &metricsConfig{
@ -93,13 +90,35 @@ func TestRecordEventing(t *testing.T) {
}, {
name: "empty metricsConfig",
measurement: measure.M(4),
},
}
}}
testRecord(t, measure, shouldReportCases)
}
func TestRecordBatch(t *testing.T) {
ctx := context.Background()
measure1 := stats.Int64("count1", "First counter", stats.UnitNone)
measure2 := stats.Int64("count2", "Second counter", stats.UnitNone)
v := []*view.View{&view.View{
Measure: measure1,
Aggregation: view.LastValue(),
}, &view.View{
Measure: measure2,
Aggregation: view.LastValue(),
}}
view.Register(v...)
defer view.Unregister(v...)
metricsConfig := &metricsConfig{}
measurement1 := measure1.M(1984)
measurement2 := measure2.M(42)
setCurMetricsConfig(metricsConfig)
RecordBatch(ctx, measurement1, measurement2)
metricstest.CheckLastValueData(t, measurement1.Measure().Name(), map[string]string{}, 1984)
metricstest.CheckLastValueData(t, measurement2.Measure().Name(), map[string]string{}, 42)
}
func testRecord(t *testing.T, measure *stats.Int64Measure, shouldReportCases []cases) {
ctx := context.TODO()
t.Helper()
ctx := context.Background()
v := &view.View{
Measure: measure,
Aggregation: view.LastValue(),
@ -117,30 +136,28 @@ func testRecord(t *testing.T, measure *stats.Int64Measure, shouldReportCases []c
name string
metricsConfig *metricsConfig
measurement stats.Measurement
}{
// Use a different value for the measurement other than the last one of shouldReportCases
{
}{{ // Use a different value for the measurement other than the last one of shouldReportCases
name: "stackdriver backend with unsupported metric but not allow custom metric",
metricsConfig: &metricsConfig{
isStackdriverBackend: true,
stackdriverMetricTypePrefix: "knative.dev/unsupported",
recorder: func(ctx context.Context, ms stats.Measurement, ros ...stats.Options) error {
metricType := path.Join("knative.dev/unsupported", ms.Measure().Name())
recorder: func(ctx context.Context, mss []stats.Measurement, ros ...stats.Options) error {
metricType := path.Join("knative.dev/unsupported", mss[0].Measure().Name())
if metricskey.KnativeRevisionMetrics.Has(metricType) || metricskey.KnativeTriggerMetrics.Has(metricType) {
ros = append(ros, stats.WithMeasurements(ms))
ros = append(ros, stats.WithMeasurements(mss[0]))
return stats.RecordWithOptions(ctx, ros...)
}
return nil
},
},
measurement: measure.M(5),
},
}
}}
for _, test := range shouldNotReportCases {
setCurMetricsConfig(test.metricsConfig)
Record(ctx, test.measurement)
metricstest.CheckLastValueData(t, test.measurement.Measure().Name(), map[string]string{}, 4) // The value is still the last one of shouldReportCases
metricstest.CheckLastValueData(t, test.measurement.Measure().Name(), map[string]string{},
float64(len(shouldReportCases))) // The value is still the last one of shouldReportCases
}
}