Make the batch processor limit data points rather than metrics. (#3141)

**Link to tracking Issue:** #2754

This change also introduces BenchmarkBatchMetricProcessor, a benchmark that stress-tests the batching logic.
Results before:
`BenchmarkBatchMetricProcessor-12           20000             80614 ns/op`
Results after the change:
`BenchmarkBatchMetricProcessor-12           20000             96184 ns/op`
This commit is contained in:
kisieland 2021-06-02 16:52:24 +02:00 committed by GitHub
parent 5b73ece6d3
commit 6d44f0d303
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 252 additions and 33 deletions

View File

@ -13,6 +13,7 @@
- Replace `ProcessorCreateParams` with `ProcessorCreateSettings`. (#3181)
- Replace `ExporterCreateParams` with `ExporterCreateSettings` (#3164)
- Replace `ReceiverCreateParams` with `ReceiverCreateSettings`. (#3167)
- Change `batchprocessor` logic to limit data points rather than metrics (#3141)
## 💡 Enhancements 💡

View File

@ -15,8 +15,8 @@ any data drops such as sampling.
Please refer to [config.go](./config.go) for the config spec.
The following configuration options can be modified:
- `send_batch_size` (default = 8192): Number of spans or metrics after which a
batch will be sent regardless of the timeout.
- `send_batch_size` (default = 8192): Number of spans, metric data points, or log
records after which a batch will be sent regardless of the timeout.
- `timeout` (default = 200ms): Time duration after which a batch will be sent
regardless of size.
- `send_batch_max_size` (default = 0): The upper limit of the batch size.

View File

@ -263,9 +263,9 @@ func (bt *batchTraces) size() int {
}
type batchMetrics struct {
nextConsumer consumer.Metrics
metricData pdata.Metrics
metricCount int
nextConsumer consumer.Metrics
metricData pdata.Metrics
dataPointCount int
}
func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
@ -274,19 +274,19 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
var req pdata.Metrics
if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize {
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
req = splitMetrics(sendBatchMaxSize, bm.metricData)
bm.metricCount -= sendBatchMaxSize
bm.dataPointCount -= sendBatchMaxSize
} else {
req = bm.metricData
bm.metricData = pdata.NewMetrics()
bm.metricCount = 0
bm.dataPointCount = 0
}
return bm.nextConsumer.ConsumeMetrics(ctx, req)
}
func (bm *batchMetrics) itemCount() int {
return bm.metricCount
return bm.dataPointCount
}
func (bm *batchMetrics) size() int {
@ -296,11 +296,11 @@ func (bm *batchMetrics) size() int {
func (bm *batchMetrics) add(item interface{}) {
md := item.(pdata.Metrics)
newMetricsCount := md.MetricCount()
if newMetricsCount == 0 {
_, newDataPointCount := md.MetricAndDataPointCount()
if newDataPointCount == 0 {
return
}
bm.metricCount += newMetricsCount
bm.dataPointCount += newDataPointCount
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
}

View File

@ -17,6 +17,7 @@ package batchprocessor
import (
"context"
"fmt"
"sync"
"testing"
"time"
@ -29,6 +30,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
@ -320,6 +322,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
requestCount := 100
metricsPerRequest := 5
dataPointsPerMetric := 2 // Since the int counter uses two datapoints.
dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
sink := new(consumertest.MetricsSink)
creationSet := component.ProcessorCreateSettings{Logger: zap.NewNop()}
@ -339,8 +343,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
expectedBatchesNum := requestCount * metricsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / metricsPerRequest
expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest
require.Equal(t, requestCount*metricsPerRequest, sink.MetricsCount())
receivedMds := sink.AllMetrics()
@ -357,7 +361,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
assert.Equal(t, 1, len(viewData))
distData := viewData[0].Data.(*view.DistributionData)
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
assert.Equal(t, sink.MetricsCount(), int(distData.Sum()))
assert.Equal(t, sink.MetricsCount()*dataPointsPerMetric, int(distData.Sum()))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max))
@ -369,6 +373,23 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
assert.Equal(t, size, int(distData.Sum()))
}
func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
ctx := context.Background()
sink := new(metricsSink)
metricsCount := 50
dataPointsPerMetric := 2
sendBatchMaxSize := 99
batchMetrics := newBatchMetrics(sink)
md := testdata.GenerateMetricsManyMetricsSameResource(metricsCount)
batchMetrics.add(md)
require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize))
remainingDataPointsCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
require.Equal(t, remainingDataPointsCount, batchMetrics.dataPointCount)
}
func TestBatchMetricsProcessor_Timeout(t *testing.T) {
cfg := Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
@ -502,6 +523,55 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
}
}
// BenchmarkBatchMetricProcessor measures how long the batch metrics processor
// takes to consume b.N requests of metricsPerRequest metrics each.
//
// Processor construction, start-up and generation of the input payloads happen
// while the timer is stopped, so only the ConsumeMetrics calls are timed.
// After shutdown the benchmark verifies that every generated metric reached
// the sink.
func BenchmarkBatchMetricProcessor(b *testing.B) {
	b.StopTimer()
	cfg := Config{
		ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
		Timeout:           100 * time.Millisecond,
		SendBatchSize:     2000,
	}
	ctx := context.Background()
	sink := new(metricsSink)
	creationSet := component.ProcessorCreateSettings{Logger: zap.NewNop()}
	metricsPerRequest := 1000

	batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed)
	require.NoError(b, err)
	require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost()))

	// Pre-build all payloads so that data generation is not part of the
	// measured region.
	mds := make([]pdata.Metrics, 0, b.N)
	for n := 0; n < b.N; n++ {
		mds = append(mds,
			testdata.GenerateMetricsManyMetricsSameResource(metricsPerRequest),
		)
	}

	b.StartTimer()
	for n := 0; n < b.N; n++ {
		// Do not silently drop a failure: a processor that rejects data
		// would otherwise still produce a (meaningless) timing.
		require.NoError(b, batcher.ConsumeMetrics(ctx, mds[n]))
	}
	b.StopTimer()

	require.NoError(b, batcher.Shutdown(ctx))
	require.Equal(b, b.N*metricsPerRequest, sink.metricsCount)
}
// metricsSink is a minimal metrics consumer used by the tests and benchmarks
// in this file: it only keeps a running total of the metrics it was handed.
type metricsSink struct {
// mu guards metricsCount; ConsumeMetrics holds it while updating the total.
mu sync.Mutex
// metricsCount accumulates md.MetricCount() over all ConsumeMetrics calls.
metricsCount int
}
// Capabilities reports that this sink does not mutate the data it receives.
func (sme *metricsSink) Capabilities() consumer.Capabilities {
	var caps consumer.Capabilities
	caps.MutatesData = false
	return caps
}
// ConsumeMetrics adds the number of metrics in md to the sink's running total.
// The context is ignored and the returned error is always nil.
func (sme *metricsSink) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
	received := md.MetricCount()

	sme.mu.Lock()
	sme.metricsCount += received
	sme.mu.Unlock()

	return nil
}
func TestBatchLogProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.

View File

@ -20,15 +20,16 @@ import (
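// splitMetrics removes data points from the input data and returns a new data holding at most size data points
// (the input itself is returned when it already holds no more than size data points).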
func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
if src.MetricCount() <= size {
_, dataPoints := src.MetricAndDataPointCount()
if dataPoints <= size {
return src
}
totalCopiedMetrics := 0
totalCopiedDataPoints := 0
dest := pdata.NewMetrics()
src.ResourceMetrics().RemoveIf(func(srcRs pdata.ResourceMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if totalCopiedDataPoints == size {
return false
}
@ -37,7 +38,7 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
srcRs.InstrumentationLibraryMetrics().RemoveIf(func(srcIlm pdata.InstrumentationLibraryMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if totalCopiedDataPoints == size {
return false
}
@ -45,21 +46,22 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
srcIlm.InstrumentationLibrary().CopyTo(destIlm.InstrumentationLibrary())
// If possible to move all metrics do that.
srcMetricsLen := srcIlm.Metrics().Len()
if size-totalCopiedMetrics >= srcMetricsLen {
totalCopiedMetrics += srcMetricsLen
srcDataPointCount := metricSliceDataPointCount(srcIlm.Metrics())
if size-totalCopiedDataPoints >= srcDataPointCount {
totalCopiedDataPoints += srcDataPointCount
srcIlm.Metrics().MoveAndAppendTo(destIlm.Metrics())
return true
}
srcIlm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if totalCopiedDataPoints == size {
return false
}
srcMetric.CopyTo(destIlm.Metrics().AppendEmpty())
totalCopiedMetrics++
return true
// If the metric has more data points than free slots we should split it.
copiedDataPoints, remove := splitMetric(srcMetric, destIlm.Metrics().AppendEmpty(), size-totalCopiedDataPoints)
totalCopiedDataPoints += copiedDataPoints
return remove
})
return false
})
@ -68,3 +70,82 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
return dest
}
// metricSliceDataPointCount returns the number of data points summed over
// every metric in ms.
func metricSliceDataPointCount(ms pdata.MetricSlice) (dataPointCount int) {
	total := 0
	for idx, n := 0, ms.Len(); idx < n; idx++ {
		total += metricDataPointCount(ms.At(idx))
	}
	return total
}
// metricDataPointCount returns the number of data points held by a single
// metric. A metric whose data type is not one of the handled kinds counts as
// zero data points.
func metricDataPointCount(ms pdata.Metric) (dataPointCount int) {
	switch ms.DataType() {
	case pdata.MetricDataTypeIntGauge:
		return ms.IntGauge().DataPoints().Len()
	case pdata.MetricDataTypeDoubleGauge:
		return ms.DoubleGauge().DataPoints().Len()
	case pdata.MetricDataTypeIntSum:
		return ms.IntSum().DataPoints().Len()
	case pdata.MetricDataTypeDoubleSum:
		return ms.DoubleSum().DataPoints().Len()
	case pdata.MetricDataTypeIntHistogram:
		return ms.IntHistogram().DataPoints().Len()
	case pdata.MetricDataTypeHistogram:
		return ms.Histogram().DataPoints().Len()
	case pdata.MetricDataTypeSummary:
		return ms.Summary().DataPoints().Len()
	}
	return 0
}
// splitMetric moves up to size data points from ms into dest.
//
// ms is first copied into dest. If ms holds no more than size data points the
// copy is the whole result: the number of points is returned together with
// true, telling the caller to remove ms from its slice.
//
// Otherwise the metric is split: dest is resized down to its first size data
// points and the same leading size points are removed from ms, so that the
// points left in ms and the points in dest together make up the original set
// with no loss and no duplication. In that case size and false are returned,
// meaning ms must stay in the original slice with its remaining points.
//
// NOTE(review): this relies on Resize keeping the leading elements and on
// RemoveIf visiting elements in index order — confirm against the pdata
// slice implementations.
func splitMetric(ms, dest pdata.Metric, size int) (int, bool) {
	ms.CopyTo(dest)

	total := metricDataPointCount(ms)
	if total <= size {
		return total, true
	}

	// dest keeps points [0, size), so exactly those points have to leave ms.
	// (Dropping total-size points here instead would lose or duplicate data
	// whenever total != 2*size.)
	visited := 0
	filterDataPoints := func() bool { visited++; return visited <= size }

	switch ms.DataType() {
	case pdata.MetricDataTypeIntGauge:
		dest.IntGauge().DataPoints().Resize(size)
		ms.IntGauge().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
			return filterDataPoints()
		})
	case pdata.MetricDataTypeDoubleGauge:
		dest.DoubleGauge().DataPoints().Resize(size)
		ms.DoubleGauge().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
			return filterDataPoints()
		})
	case pdata.MetricDataTypeIntSum:
		dest.IntSum().DataPoints().Resize(size)
		ms.IntSum().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
			return filterDataPoints()
		})
	case pdata.MetricDataTypeDoubleSum:
		dest.DoubleSum().DataPoints().Resize(size)
		ms.DoubleSum().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
			return filterDataPoints()
		})
	case pdata.MetricDataTypeIntHistogram:
		dest.IntHistogram().DataPoints().Resize(size)
		ms.IntHistogram().DataPoints().RemoveIf(func(_ pdata.IntHistogramDataPoint) bool {
			return filterDataPoints()
		})
	case pdata.MetricDataTypeHistogram:
		dest.Histogram().DataPoints().Resize(size)
		ms.Histogram().DataPoints().RemoveIf(func(_ pdata.HistogramDataPoint) bool {
			return filterDataPoints()
		})
	case pdata.MetricDataTypeSummary:
		dest.Summary().DataPoints().Resize(size)
		ms.Summary().DataPoints().RemoveIf(func(_ pdata.SummaryDataPoint) bool {
			return filterDataPoints()
		})
	}
	return size, false
}

View File

@ -36,8 +36,10 @@ func TestSplitMetrics_noop(t *testing.T) {
func TestSplitMetrics(t *testing.T) {
md := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
dataPointCount := metricDataPointCount(metrics.At(0))
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(0, i))
assert.Equal(t, dataPointCount, metricDataPointCount(metrics.At(i)))
}
cp := pdata.NewMetrics()
cpMetrics := cp.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics()
@ -52,9 +54,10 @@ func TestSplitMetrics(t *testing.T) {
metrics.At(3).CopyTo(cpMetrics.At(3))
metrics.At(4).CopyTo(cpMetrics.At(4))
splitSize := 5
splitMetricCount := 5
splitSize := splitMetricCount * dataPointCount
split := splitMetrics(splitSize, md)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, splitMetricCount, split.MetricCount())
assert.Equal(t, cp, split)
assert.Equal(t, 15, md.MetricCount())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
@ -79,8 +82,10 @@ func TestSplitMetrics(t *testing.T) {
func TestSplitMetricsMultipleResourceSpans(t *testing.T) {
md := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
dataPointCount := metricDataPointCount(metrics.At(0))
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(0, i))
assert.Equal(t, dataPointCount, metricDataPointCount(metrics.At(i)))
}
// add second index to resource metrics
testdata.GenerateMetricsManyMetricsSameResource(20).
@ -90,9 +95,10 @@ func TestSplitMetricsMultipleResourceSpans(t *testing.T) {
metrics.At(i).SetName(getTestMetricName(1, i))
}
splitSize := 5
splitMetricCount := 5
splitSize := splitMetricCount * dataPointCount
split := splitMetrics(splitSize, md)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, splitMetricCount, split.MetricCount())
assert.Equal(t, 35, md.MetricCount())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
@ -101,8 +107,10 @@ func TestSplitMetricsMultipleResourceSpans(t *testing.T) {
func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *testing.T) {
td := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := td.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
dataPointCount := metricDataPointCount(metrics.At(0))
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(0, i))
assert.Equal(t, dataPointCount, metricDataPointCount(metrics.At(i)))
}
td.ResourceMetrics().Resize(2)
// add second index to resource metrics
@ -113,10 +121,11 @@ func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *tes
metrics.At(i).SetName(getTestMetricName(1, i))
}
splitSize := 25
splitMetricCount := 25
splitSize := splitMetricCount * dataPointCount
split := splitMetrics(splitSize, td)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, 40-splitSize, td.MetricCount())
assert.Equal(t, splitMetricCount, split.MetricCount())
assert.Equal(t, 40-splitMetricCount, td.MetricCount())
assert.Equal(t, 1, td.ResourceMetrics().Len())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(19).Name())
@ -166,3 +175,61 @@ func BenchmarkCloneMetrics(b *testing.B) {
}
}
}
// TestSplitMetricsUneven splits 10 metrics of 2 data points each with a split
// size of 9, which forces every batch boundary to fall in the middle of a
// metric: the boundary metric must show up both at the end of one batch and
// at the start of the next.
func TestSplitMetricsUneven(t *testing.T) {
	const (
		pointsPerMetric = 2
		splitSize       = 9
	)

	md := testdata.GenerateMetricsManyMetricsSameResource(10)
	srcMetrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
	for idx := 0; idx < srcMetrics.Len(); idx++ {
		srcMetrics.At(idx).SetName(getTestMetricName(0, idx))
		assert.Equal(t, pointsPerMetric, metricDataPointCount(srcMetrics.At(idx)))
	}

	// nameAt returns the name of the idx-th metric of the first resource /
	// first instrumentation library of m.
	nameAt := func(m pdata.Metrics, idx int) string {
		return m.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(idx).Name()
	}

	// First batch: metrics 0..3 in full plus one point of metric 4.
	first := splitMetrics(splitSize, md)
	assert.Equal(t, 5, first.MetricCount())
	assert.Equal(t, 6, md.MetricCount())
	assert.Equal(t, "test-metric-int-0-0", nameAt(first, 0))
	assert.Equal(t, "test-metric-int-0-4", nameAt(first, 4))

	// Second batch: the rest of metric 4, metrics 5..8 in full.
	second := splitMetrics(splitSize, md)
	assert.Equal(t, 5, second.MetricCount())
	assert.Equal(t, 1, md.MetricCount())
	assert.Equal(t, "test-metric-int-0-4", nameAt(second, 0))
	assert.Equal(t, "test-metric-int-0-8", nameAt(second, 4))

	// Final batch: only metric 9 is left.
	last := splitMetrics(splitSize, md)
	assert.Equal(t, 1, last.MetricCount())
	assert.Equal(t, "test-metric-int-0-9", nameAt(last, 0))
}
// TestSplitMetricsBatchSizeSmallerThanDataPointCount uses a split size (1)
// smaller than the number of data points per metric (2), so each of the two
// metrics has to be emitted across two consecutive splits.
func TestSplitMetricsBatchSizeSmallerThanDataPointCount(t *testing.T) {
	const (
		pointsPerMetric = 2
		splitSize       = 1
	)

	md := testdata.GenerateMetricsManyMetricsSameResource(2)
	srcMetrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
	for idx := 0; idx < srcMetrics.Len(); idx++ {
		srcMetrics.At(idx).SetName(getTestMetricName(0, idx))
		assert.Equal(t, pointsPerMetric, metricDataPointCount(srcMetrics.At(idx)))
	}

	// Each step is one splitMetrics call: the metric count expected in md
	// afterwards and the name of the single metric expected in the split.
	steps := []struct {
		mdMetricCount int
		splitName     string
	}{
		{2, "test-metric-int-0-0"},
		{1, "test-metric-int-0-0"},
		{1, "test-metric-int-0-1"},
		{1, "test-metric-int-0-1"},
	}
	for _, step := range steps {
		split := splitMetrics(splitSize, md)
		assert.Equal(t, 1, split.MetricCount())
		assert.Equal(t, step.mdMetricCount, md.MetricCount())
		assert.Equal(t, step.splitName, split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
	}
}