Bytes-based batching for metrics (#12550)

#### Description

This PR implements byte-size-based batching for metrics, where batch size is measured in serialized proto bytes rather than data points.
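For illustration, a minimal sketch of how the new sizer is exercised, mirroring the tests added in this diff (`newMetricsRequest` is internal to `exporterhelper`, and the 1024-byte limit is an assumption chosen only for the example):

```go
cfg := exporterbatcher.SizeConfig{
	Sizer:   exporterbatcher.SizerTypeBytes, // size requests by serialized proto bytes, not data points
	MaxSize: 1024,                           // illustrative limit, not a recommended value
}
req := newMetricsRequest(testdata.GenerateMetrics(10), nil)
// MergeSplit returns one or more requests, each serializing to at most MaxSize bytes.
res, err := req.MergeSplit(context.Background(), cfg, nil)
```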

#### Link to tracking issue

https://github.com/open-telemetry/opentelemetry-collector/issues/3262

Continuation of #12299 made by @sfc-gh-sili.

Signed-off-by: Israel Blancas <iblancasa@gmail.com>
Israel Blancas 2025-03-10 21:39:26 +01:00 committed by GitHub
parent eabab751a0
commit 6ae1c98f63
11 changed files with 864 additions and 134 deletions

View File

@@ -4,8 +4,6 @@
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
import (
math_bits "math/bits"
"go.opentelemetry.io/collector/pdata/plog"
)
@@ -22,22 +20,7 @@ type LogsSizer interface {
// LogsBytesSizer returns the byte size of serialized protos.
type LogsBytesSizer struct {
plog.ProtoMarshaler
protoDeltaSizer
}
// DeltaSize returns the delta size of a proto slice when a new item is added.
// Example:
//
// prevSize := proto1.Size()
// proto1.RepeatedField().AppendEmpty() = proto2
//
// Then currSize of proto1 can be calculated as
//
// currSize := (prevSize + sizer.DeltaSize(proto2.Size()))
//
// This is derived from opentelemetry-collector/pdata/internal/data/protogen/logs/v1/logs.pb.go
// which is generated with gogo/protobuf.
func (s *LogsBytesSizer) DeltaSize(newItemSize int) int {
return 1 + newItemSize + sov(uint64(newItemSize)) //nolint:gosec // disable G115
}
// LogsCountSizer returns the number of log entries.
@@ -66,7 +49,3 @@ func (s *LogsCountSizer) LogRecordSize(_ plog.LogRecord) int {
func (s *LogsCountSizer) DeltaSize(newItemSize int) int {
return newItemSize
}
func sov(x uint64) int {
return (math_bits.Len64(x|1) + 6) / 7
}

View File

@@ -55,10 +55,3 @@ func TestLogsBytesSizer(t *testing.T) {
lr.CopyTo(sl.LogRecords().AppendEmpty())
require.Equal(t, sizer.ScopeLogsSize(sl), prevSize+sizer.DeltaSize(sizer.LogRecordSize(lr)))
}
func TestLogsBytesDeltaSize(t *testing.T) {
sizer := LogsBytesSizer{}
require.Equal(t, 129, sizer.DeltaSize(127))
require.Equal(t, 131, sizer.DeltaSize(128))
require.Equal(t, 242, sizer.DeltaSize(239))
}

View File

@@ -0,0 +1,85 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
import (
"go.opentelemetry.io/collector/pdata/pmetric"
)
type MetricsSizer interface {
MetricsSize(md pmetric.Metrics) (count int)
ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int)
ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int)
MetricSize(m pmetric.Metric) int
DeltaSize(newItemSize int) int
NumberDataPointSize(ndp pmetric.NumberDataPoint) int
HistogramDataPointSize(hdp pmetric.HistogramDataPoint) int
ExponentialHistogramDataPointSize(ehdp pmetric.ExponentialHistogramDataPoint) int
SummaryDataPointSize(sdps pmetric.SummaryDataPoint) int
}
// MetricsBytesSizer returns the byte size of serialized protos.
type MetricsBytesSizer struct {
pmetric.ProtoMarshaler
protoDeltaSizer
}
var _ MetricsSizer = &MetricsBytesSizer{}
// MetricsCountSizer returns the number of metric entries.
type MetricsCountSizer struct{}
var _ MetricsSizer = &MetricsCountSizer{}
func (s *MetricsCountSizer) MetricsSize(md pmetric.Metrics) int {
return md.DataPointCount()
}
func (s *MetricsCountSizer) ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int) {
for i := 0; i < rm.ScopeMetrics().Len(); i++ {
count += s.ScopeMetricsSize(rm.ScopeMetrics().At(i))
}
return count
}
func (s *MetricsCountSizer) ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int) {
for i := 0; i < sm.Metrics().Len(); i++ {
count += s.MetricSize(sm.Metrics().At(i))
}
return count
}
func (s *MetricsCountSizer) MetricSize(m pmetric.Metric) int {
switch m.Type() {
case pmetric.MetricTypeGauge:
return m.Gauge().DataPoints().Len()
case pmetric.MetricTypeSum:
return m.Sum().DataPoints().Len()
case pmetric.MetricTypeHistogram:
return m.Histogram().DataPoints().Len()
case pmetric.MetricTypeExponentialHistogram:
return m.ExponentialHistogram().DataPoints().Len()
case pmetric.MetricTypeSummary:
return m.Summary().DataPoints().Len()
}
return 0
}
func (s *MetricsCountSizer) DeltaSize(newItemSize int) int {
return newItemSize
}
func (s *MetricsCountSizer) NumberDataPointSize(_ pmetric.NumberDataPoint) int {
return 1
}
func (s *MetricsCountSizer) HistogramDataPointSize(_ pmetric.HistogramDataPoint) int {
return 1
}
func (s *MetricsCountSizer) ExponentialHistogramDataPointSize(_ pmetric.ExponentialHistogramDataPoint) int {
return 1
}
func (s *MetricsCountSizer) SummaryDataPointSize(_ pmetric.SummaryDataPoint) int {
return 1
}

View File

@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/testdata"
)
func TestMetricsCountSizer(t *testing.T) {
md := testdata.GenerateMetrics(7)
sizer := MetricsCountSizer{}
require.Equal(t, 14, sizer.MetricsSize(md))
rm := md.ResourceMetrics().At(0)
require.Equal(t, 14, sizer.ResourceMetricsSize(rm))
sm := rm.ScopeMetrics().At(0)
require.Equal(t, 14, sizer.ScopeMetricsSize(sm))
// Test different metric types
require.Equal(t, 2, sizer.MetricSize(sm.Metrics().At(0)))
// Test data point sizes
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(0).Gauge().DataPoints().At(0)))
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(1).Gauge().DataPoints().At(0)))
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(2).Sum().DataPoints().At(0)))
require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(3).Sum().DataPoints().At(0)))
require.Equal(t, 1, sizer.HistogramDataPointSize(sm.Metrics().At(4).Histogram().DataPoints().At(0)))
require.Equal(t, 1, sizer.ExponentialHistogramDataPointSize(sm.Metrics().At(5).ExponentialHistogram().DataPoints().At(0)))
require.Equal(t, 1, sizer.SummaryDataPointSize(sm.Metrics().At(6).Summary().DataPoints().At(0)))
prevSize := sizer.ScopeMetricsSize(sm)
sm.Metrics().At(0).CopyTo(sm.Metrics().AppendEmpty())
require.Equal(t, sizer.ScopeMetricsSize(sm), prevSize+sizer.DeltaSize(sizer.MetricSize(sm.Metrics().At(0))))
}
func TestMetricsBytesSizer(t *testing.T) {
md := testdata.GenerateMetrics(7)
sizer := MetricsBytesSizer{}
require.Equal(t, 1594, sizer.MetricsSize(md))
rm := md.ResourceMetrics().At(0)
require.Equal(t, 1591, sizer.ResourceMetricsSize(rm))
sm := rm.ScopeMetrics().At(0)
require.Equal(t, 1546, sizer.ScopeMetricsSize(sm))
// Test different metric types
require.Equal(t, 130, sizer.MetricSize(sm.Metrics().At(0)))
// Test data point sizes
require.Equal(t, 55, sizer.NumberDataPointSize(sm.Metrics().At(0).Gauge().DataPoints().At(0)))
require.Equal(t, 83, sizer.NumberDataPointSize(sm.Metrics().At(1).Gauge().DataPoints().At(0)))
require.Equal(t, 55, sizer.NumberDataPointSize(sm.Metrics().At(2).Sum().DataPoints().At(0)))
require.Equal(t, 83, sizer.NumberDataPointSize(sm.Metrics().At(3).Sum().DataPoints().At(0)))
require.Equal(t, 92, sizer.HistogramDataPointSize(sm.Metrics().At(4).Histogram().DataPoints().At(0)))
require.Equal(t, 119, sizer.ExponentialHistogramDataPointSize(sm.Metrics().At(5).ExponentialHistogram().DataPoints().At(0)))
require.Equal(t, 92, sizer.SummaryDataPointSize(sm.Metrics().At(6).Summary().DataPoints().At(0)))
prevSize := sizer.ScopeMetricsSize(sm)
sm.Metrics().At(0).CopyTo(sm.Metrics().AppendEmpty())
require.Equal(t, sizer.ScopeMetricsSize(sm), prevSize+sizer.DeltaSize(sizer.MetricSize(sm.Metrics().At(0))))
}

View File

@@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
import (
math_bits "math/bits"
)
type protoDeltaSizer struct{}
// DeltaSize() returns the delta size of a proto slice when a new item is added.
// Example:
//
// prevSize := proto1.Size()
// proto1.RepeatedField().AppendEmpty() = proto2
//
// Then currSize of proto1 can be calculated as
//
// currSize := (prevSize + sizer.DeltaSize(proto2.Size()))
//
// This is derived from:
// - opentelemetry-collector/pdata/internal/data/protogen/metrics/v1/metrics.pb.go
// - opentelemetry-collector/pdata/internal/data/protogen/logs/v1/logs.pb.go
// - opentelemetry-collector/pdata/internal/data/protogen/traces/v1/traces.pb.go
// - opentelemetry-collector/pdata/internal/data/protogen/profiles/v1development/profiles.pb.go
// which is generated with gogo/protobuf.
func (s *protoDeltaSizer) DeltaSize(newItemSize int) int {
return 1 + newItemSize + sov(uint64(newItemSize)) //nolint:gosec // disable G115
}
func sov(x uint64) int {
return (math_bits.Len64(x|1) + 6) / 7
}
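To make the formula concrete: appending a length-delimited sub-message of n bytes to its parent costs one tag byte, plus sov(n) bytes for the varint-encoded length, plus the n payload bytes. The values asserted in the test file below follow directly:

// Worked examples (tag byte + varint-encoded length + payload):
// DeltaSize(127) = 1 + 127 + sov(127) = 1 + 127 + 1 = 129 (127 fits in one varint byte)
// DeltaSize(128) = 1 + 128 + sov(128) = 1 + 128 + 2 = 131 (128 needs a second varint byte)
// DeltaSize(239) = 1 + 239 + sov(239) = 1 + 239 + 2 = 242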

View File

@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package sizer
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMetricsBytesDeltaSize(t *testing.T) {
sizer := protoDeltaSizer{}
require.Equal(t, 129, sizer.DeltaSize(127))
require.Equal(t, 131, sizer.DeltaSize(128))
require.Equal(t, 242, sizer.DeltaSize(239))
}

View File

@@ -371,3 +371,20 @@ func BenchmarkSplittingBasedOnByteSizeHugeLogs(b *testing.B) {
assert.Len(b, merged, 10)
}
}
func TestLogsRequest_MergeSplit_UnknownSizerType(t *testing.T) {
// Create a logs request
req := newLogsRequest(plog.NewLogs(), nil)
// Create a config with an invalid sizer type by using the zero value
cfg := exporterbatcher.SizeConfig{
Sizer: exporterbatcher.SizerType{}, // the zero value has an empty string as its val
}
// Call MergeSplit with invalid sizer
result, err := req.MergeSplit(context.Background(), cfg, nil)
// Verify results
assert.Nil(t, result)
assert.EqualError(t, err, "unknown sizer type")
}

View File

@@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pipeline"
)
@@ -24,16 +25,16 @@
)
type metricsRequest struct {
md pmetric.Metrics
pusher consumer.ConsumeMetricsFunc
cachedItemsCount int
md pmetric.Metrics
pusher consumer.ConsumeMetricsFunc
cachedSize int
}
func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request {
return &metricsRequest{
md: md,
pusher: pusher,
cachedItemsCount: md.DataPointCount(),
md: md,
pusher: pusher,
cachedSize: -1,
}
}
@@ -66,11 +67,18 @@ func (req *metricsRequest) Export(ctx context.Context) error {
}
func (req *metricsRequest) ItemsCount() int {
return req.cachedItemsCount
return req.md.DataPointCount()
}
func (req *metricsRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
func (req *metricsRequest) size(sizer sizer.MetricsSizer) int {
if req.cachedSize == -1 {
req.cachedSize = sizer.MetricsSize(req.md)
}
return req.cachedSize
}
func (req *metricsRequest) setCachedSize(count int) {
req.cachedSize = count
}
type metricsExporter struct {

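The request above computes its serialized size lazily: the sentinel -1 marks the size as not yet computed, and merge/split maintain the cache through setCachedSize rather than re-marshaling. A test-style sketch using only names from this diff:

req := newMetricsRequest(testdata.GenerateMetrics(5), nil)
bs := &sizer.MetricsBytesSizer{}
first := req.size(bs)  // marshals once and caches the result
second := req.size(bs) // served from the cache, no re-marshaling
// first == second until setCachedSize adjusts the cache during merge/split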
View File

@@ -8,202 +8,319 @@ import (
"errors"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
"go.opentelemetry.io/collector/pdata/pmetric"
)
// MergeSplit splits and/or merges the provided metrics request and the current request into one or more requests
// conforming with the provided SizeConfig.
func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) {
var sz sizer.MetricsSizer
switch cfg.Sizer {
case exporterbatcher.SizerTypeItems:
sz = &sizer.MetricsCountSizer{}
case exporterbatcher.SizerTypeBytes:
sz = &sizer.MetricsBytesSizer{}
default:
return nil, errors.New("unknown sizer type")
}
if r2 != nil {
req2, ok := r2.(*metricsRequest)
if !ok {
return nil, errors.New("invalid input type")
}
req2.mergeTo(req)
req2.mergeTo(req, sz)
}
// If there is no limit, simply merge the new request into the current one and return.
if cfg.MaxSize == 0 {
return []Request{req}, nil
}
return req.split(cfg)
return req.split(cfg.MaxSize, sz), nil
}
func (req *metricsRequest) mergeTo(dst *metricsRequest) {
dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
req.setCachedItemsCount(0)
func (req *metricsRequest) mergeTo(dst *metricsRequest, sz sizer.MetricsSizer) {
if sz != nil {
dst.setCachedSize(dst.size(sz) + req.size(sz))
req.setCachedSize(0)
}
req.md.ResourceMetrics().MoveAndAppendTo(dst.md.ResourceMetrics())
}
func (req *metricsRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) {
func (req *metricsRequest) split(maxSize int, sz sizer.MetricsSizer) []Request {
var res []Request
for req.ItemsCount() > cfg.MaxSize {
md := extractMetrics(req.md, cfg.MaxSize)
size := md.DataPointCount()
req.setCachedItemsCount(req.ItemsCount() - size)
res = append(res, &metricsRequest{md: md, pusher: req.pusher, cachedItemsCount: size})
for req.size(sz) > maxSize {
md, rmSize := extractMetrics(req.md, maxSize, sz)
req.setCachedSize(req.size(sz) - rmSize)
res = append(res, newMetricsRequest(md, req.pusher))
}
res = append(res, req)
return res, nil
return res
}
// extractMetrics extracts metrics from srcMetrics until count of data points is reached.
func extractMetrics(srcMetrics pmetric.Metrics, count int) pmetric.Metrics {
// extractMetrics extracts metrics from srcMetrics until capacity is reached.
func extractMetrics(srcMetrics pmetric.Metrics, capacity int, sz sizer.MetricsSizer) (pmetric.Metrics, int) {
destMetrics := pmetric.NewMetrics()
capacityLeft := capacity - sz.MetricsSize(destMetrics)
removedSize := 0
srcMetrics.ResourceMetrics().RemoveIf(func(srcRM pmetric.ResourceMetrics) bool {
if count == 0 {
// If no more capacity is left, just return.
if capacityLeft == 0 {
return false
}
needToExtract := resourceDataPointsCount(srcRM) > count
if needToExtract {
srcRM = extractResourceMetrics(srcRM, count)
rawRlSize := sz.ResourceMetricsSize(srcRM)
rlSize := sz.DeltaSize(rawRlSize)
if rlSize > capacityLeft {
extSrcRM, extRmSize := extractResourceMetrics(srcRM, capacityLeft, sz)
// The bytes sizer may not land exactly on 0 here,
// so force it to 0 since that is the stopping condition.
capacityLeft = 0
removedSize += extRmSize
// This accounts for the difference between the delta sizes.
removedSize += rlSize - rawRlSize - (sz.DeltaSize(rawRlSize-extRmSize) - (rawRlSize - extRmSize))
// For the bytes sizer, the extracted resource metrics may contain no scope metrics;
// do not add them to the destination in that case.
if extSrcRM.ScopeMetrics().Len() > 0 {
extSrcRM.MoveTo(destMetrics.ResourceMetrics().AppendEmpty())
}
return extSrcRM.ScopeMetrics().Len() != 0
}
count -= resourceDataPointsCount(srcRM)
capacityLeft -= rlSize
removedSize += rlSize
srcRM.MoveTo(destMetrics.ResourceMetrics().AppendEmpty())
return !needToExtract
return true
})
return destMetrics
return destMetrics, removedSize
}
// extractResourceMetrics extracts resource metrics and returns a new resource metrics with the specified number of data points.
func extractResourceMetrics(srcRM pmetric.ResourceMetrics, count int) pmetric.ResourceMetrics {
func extractResourceMetrics(srcRM pmetric.ResourceMetrics, capacity int, sz sizer.MetricsSizer) (pmetric.ResourceMetrics, int) {
destRM := pmetric.NewResourceMetrics()
destRM.SetSchemaUrl(srcRM.SchemaUrl())
srcRM.Resource().CopyTo(destRM.Resource())
// Take into account that this can be at most "capacity" in size; when added to the parent it will need space for the extra delta size.
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ResourceMetricsSize(destRM)
removedSize := 0
srcRM.ScopeMetrics().RemoveIf(func(srcSM pmetric.ScopeMetrics) bool {
if count == 0 {
// If no more capacity is left, just return.
if capacityLeft == 0 {
return false
}
needToExtract := scopeDataPointsCount(srcSM) > count
if needToExtract {
srcSM = extractScopeMetrics(srcSM, count)
rawSmSize := sz.ScopeMetricsSize(srcSM)
smSize := sz.DeltaSize(rawSmSize)
if smSize > capacityLeft {
extSrcSM, extSmSize := extractScopeMetrics(srcSM, capacityLeft, sz)
// The bytes sizer may not land exactly on 0 here,
// so force it to 0 since that is the stopping condition.
capacityLeft = 0
removedSize += extSmSize
// This accounts for the difference between the delta sizes.
removedSize += smSize - rawSmSize - (sz.DeltaSize(rawSmSize-extSmSize) - (rawSmSize - extSmSize))
// For the bytes sizer, the extracted scope metrics may contain no metrics;
// do not add them to the destination in that case.
if extSrcSM.Metrics().Len() > 0 {
extSrcSM.MoveTo(destRM.ScopeMetrics().AppendEmpty())
}
return extSrcSM.Metrics().Len() != 0
}
count -= scopeDataPointsCount(srcSM)
capacityLeft -= smSize
removedSize += smSize
srcSM.MoveTo(destRM.ScopeMetrics().AppendEmpty())
return !needToExtract
return true
})
return destRM
return destRM, removedSize
}
// extractScopeMetrics extracts scope metrics and returns a new scope metrics with the specified number of data points.
func extractScopeMetrics(srcSM pmetric.ScopeMetrics, count int) pmetric.ScopeMetrics {
func extractScopeMetrics(srcSM pmetric.ScopeMetrics, capacity int, sz sizer.MetricsSizer) (pmetric.ScopeMetrics, int) {
destSM := pmetric.NewScopeMetrics()
destSM.SetSchemaUrl(srcSM.SchemaUrl())
srcSM.Scope().CopyTo(destSM.Scope())
// Take into account that this can be at most "capacity" in size; when added to the parent it will need space for the extra delta size.
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ScopeMetricsSize(destSM)
removedSize := 0
srcSM.Metrics().RemoveIf(func(srcMetric pmetric.Metric) bool {
if count == 0 {
// If no more capacity is left, just return.
if capacityLeft == 0 {
return false
}
needToExtract := metricDataPointCount(srcMetric) > count
if needToExtract {
srcMetric = extractMetricDataPoints(srcMetric, count)
rawMetricSize := sz.MetricSize(srcMetric)
metricSize := sz.DeltaSize(rawMetricSize)
if metricSize > capacityLeft {
extSrcMetric, extMetricSize := extractMetricDataPoints(srcMetric, capacityLeft, sz)
// The bytes sizer may not land exactly on 0 here,
// so force it to 0 since that is the stopping condition.
capacityLeft = 0
removedSize += extMetricSize
// This accounts for the difference between the delta sizes.
removedSize += metricSize - rawMetricSize - (sz.DeltaSize(rawMetricSize-extMetricSize) - (rawMetricSize - extMetricSize))
// For the bytes sizer, the extracted metric may contain no data points;
// do not add it to the destination in that case.
if sz.MetricSize(extSrcMetric) > 0 {
extSrcMetric.MoveTo(destSM.Metrics().AppendEmpty())
}
return sz.MetricSize(extSrcMetric) != 0
}
count -= metricDataPointCount(srcMetric)
srcMetric.MoveTo(destSM.Metrics().AppendEmpty())
return !needToExtract
capacityLeft -= metricSize
removedSize += metricSize
srcMetric.MoveTo(destSM.Metrics().AppendEmpty())
return true
})
return destSM
return destSM, removedSize
}
func extractMetricDataPoints(srcMetric pmetric.Metric, count int) pmetric.Metric {
destMetric := pmetric.NewMetric()
func extractMetricDataPoints(srcMetric pmetric.Metric, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) {
var destMetric pmetric.Metric
var removedSize int
switch srcMetric.Type() {
case pmetric.MetricTypeGauge:
extractGaugeDataPoints(srcMetric.Gauge(), count, destMetric.SetEmptyGauge())
destMetric, removedSize = extractGaugeDataPoints(srcMetric.Gauge(), capacity, sz)
case pmetric.MetricTypeSum:
extractSumDataPoints(srcMetric.Sum(), count, destMetric.SetEmptySum())
destMetric, removedSize = extractSumDataPoints(srcMetric.Sum(), capacity, sz)
case pmetric.MetricTypeHistogram:
extractHistogramDataPoints(srcMetric.Histogram(), count, destMetric.SetEmptyHistogram())
destMetric, removedSize = extractHistogramDataPoints(srcMetric.Histogram(), capacity, sz)
case pmetric.MetricTypeExponentialHistogram:
extractExponentialHistogramDataPoints(srcMetric.ExponentialHistogram(), count,
destMetric.SetEmptyExponentialHistogram())
destMetric, removedSize = extractExponentialHistogramDataPoints(srcMetric.ExponentialHistogram(), capacity, sz)
case pmetric.MetricTypeSummary:
extractSummaryDataPoints(srcMetric.Summary(), count, destMetric.SetEmptySummary())
destMetric, removedSize = extractSummaryDataPoints(srcMetric.Summary(), capacity, sz)
}
return destMetric
return destMetric, removedSize
}
func extractGaugeDataPoints(srcGauge pmetric.Gauge, count int, destGauge pmetric.Gauge) {
func extractGaugeDataPoints(srcGauge pmetric.Gauge, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) {
m := pmetric.NewMetric()
destGauge := m.SetEmptyGauge()
// Take into account that this can be at most "capacity" in size; when added to the parent it will need space for the extra delta size.
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m)
removedSize := 0
srcGauge.DataPoints().RemoveIf(func(srcDP pmetric.NumberDataPoint) bool {
if count == 0 {
// If no more capacity is left, just return.
if capacityLeft == 0 {
return false
}
rdSize := sz.DeltaSize(sz.NumberDataPointSize(srcDP))
if rdSize > capacityLeft {
// The bytes sizer may not land exactly on 0 here,
// so force it to 0 since that is the stopping condition.
capacityLeft = 0
return false
}
capacityLeft -= rdSize
removedSize += rdSize
srcDP.MoveTo(destGauge.DataPoints().AppendEmpty())
count--
return true
})
return m, removedSize
}
func extractSumDataPoints(srcSum pmetric.Sum, count int, destSum pmetric.Sum) {
func extractSumDataPoints(srcSum pmetric.Sum, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) {
m := pmetric.NewMetric()
destSum := m.SetEmptySum()
// Take into account that this can be at most "capacity" in size; when added to the parent it will need space for the extra delta size.
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m)
removedSize := 0
srcSum.DataPoints().RemoveIf(func(srcDP pmetric.NumberDataPoint) bool {
if count == 0 {
// If no more capacity is left, just return.
if capacityLeft == 0 {
return false
}
rdSize := sz.DeltaSize(sz.NumberDataPointSize(srcDP))
if rdSize > capacityLeft {
// The bytes sizer may not land exactly on 0 here,
// so force it to 0 since that is the stopping condition.
capacityLeft = 0
return false
}
capacityLeft -= rdSize
removedSize += rdSize
srcDP.MoveTo(destSum.DataPoints().AppendEmpty())
count--
return true
})
return m, removedSize
}
func extractHistogramDataPoints(srcHistogram pmetric.Histogram, count int, destHistogram pmetric.Histogram) {
func extractHistogramDataPoints(srcHistogram pmetric.Histogram, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) {
m := pmetric.NewMetric()
destHistogram := m.SetEmptyHistogram()
// Take into account that this can be at most "capacity" in size; when added to the parent it will need space for the extra delta size.
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m)
removedSize := 0
srcHistogram.DataPoints().RemoveIf(func(srcDP pmetric.HistogramDataPoint) bool {
if count == 0 {
// If no more capacity is left, just return.
if capacityLeft == 0 {
return false
}
rdSize := sz.DeltaSize(sz.HistogramDataPointSize(srcDP))
if rdSize > capacityLeft {
// The bytes sizer may not land exactly on 0 here,
// so force it to 0 since that is the stopping condition.
capacityLeft = 0
return false
}
capacityLeft -= rdSize
removedSize += rdSize
srcDP.MoveTo(destHistogram.DataPoints().AppendEmpty())
count--
return true
})
return m, removedSize
}
func extractExponentialHistogramDataPoints(srcExponentialHistogram pmetric.ExponentialHistogram, count int, destExponentialHistogram pmetric.ExponentialHistogram) {
func extractExponentialHistogramDataPoints(srcExponentialHistogram pmetric.ExponentialHistogram, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) {
m := pmetric.NewMetric()
destExponentialHistogram := m.SetEmptyExponentialHistogram()
// Take into account that this can be at most "capacity" in size; when added to the parent it will need space for the extra delta size.
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m)
removedSize := 0
srcExponentialHistogram.DataPoints().RemoveIf(func(srcDP pmetric.ExponentialHistogramDataPoint) bool {
if count == 0 {
// If no more capacity is left, just return.
if capacityLeft == 0 {
return false
}
rdSize := sz.DeltaSize(sz.ExponentialHistogramDataPointSize(srcDP))
if rdSize > capacityLeft {
// The bytes sizer may not land exactly on 0 here,
// so force it to 0 since that is the stopping condition.
capacityLeft = 0
return false
}
capacityLeft -= rdSize
removedSize += rdSize
srcDP.MoveTo(destExponentialHistogram.DataPoints().AppendEmpty())
count--
return true
})
return m, removedSize
}
func extractSummaryDataPoints(srcSummary pmetric.Summary, count int, destSummary pmetric.Summary) {
func extractSummaryDataPoints(srcSummary pmetric.Summary, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) {
m := pmetric.NewMetric()
destSummary := m.SetEmptySummary()
// Take into account that this can be at most "capacity" in size; when added to the parent it will need space for the extra delta size.
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m)
removedSize := 0
srcSummary.DataPoints().RemoveIf(func(srcDP pmetric.SummaryDataPoint) bool {
if count == 0 {
// If no more capacity is left, just return.
if capacityLeft == 0 {
return false
}
rdSize := sz.DeltaSize(sz.SummaryDataPointSize(srcDP))
if rdSize > capacityLeft {
// The bytes sizer may not land exactly on 0 here,
// so force it to 0 since that is the stopping condition.
capacityLeft = 0
return false
}
capacityLeft -= rdSize
removedSize += rdSize
srcDP.MoveTo(destSummary.DataPoints().AppendEmpty())
count--
return true
})
}
func resourceDataPointsCount(rm pmetric.ResourceMetrics) (count int) {
for i := 0; i < rm.ScopeMetrics().Len(); i++ {
count += scopeDataPointsCount(rm.ScopeMetrics().At(i))
}
return count
}
func scopeDataPointsCount(sm pmetric.ScopeMetrics) (count int) {
for i := 0; i < sm.Metrics().Len(); i++ {
count += metricDataPointCount(sm.Metrics().At(i))
}
return count
}
func metricDataPointCount(m pmetric.Metric) int {
switch m.Type() {
case pmetric.MetricTypeGauge:
return m.Gauge().DataPoints().Len()
case pmetric.MetricTypeSum:
return m.Sum().DataPoints().Len()
case pmetric.MetricTypeHistogram:
return m.Histogram().DataPoints().Len()
case pmetric.MetricTypeExponentialHistogram:
return m.ExponentialHistogram().DataPoints().Len()
case pmetric.MetricTypeSummary:
return m.Summary().DataPoints().Len()
}
return 0
return m, removedSize
}
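One subtlety in the extract helpers above: each receives a capacity measured inside the parent message, so it first reserves the framing overhead its result will cost once re-embedded. A sketch of that reservation under the bytes sizer (the numbers are illustrative):

// With capacity = 1000: DeltaSize(1000) = 1 + 1000 + sov(1000) = 1 + 1000 + 2 = 1003,
// so DeltaSize(capacity) - capacity = 3 bytes are reserved up front for the tag byte
// and varint length that the extracted child will add inside its parent.
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) // 1000 - 3 = 997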

View File

@@ -11,10 +11,13 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/testdata"
)
var metricsBytesSizer = &sizer.MetricsBytesSizer{}
func TestMergeMetrics(t *testing.T) {
mr1 := newMetricsRequest(testdata.GenerateMetrics(2), nil)
mr2 := newMetricsRequest(testdata.GenerateMetrics(3), nil)
@@ -32,6 +35,7 @@ func TestMergeMetricsInvalidInput(t *testing.T) {
}
func TestMergeSplitMetrics(t *testing.T) {
s := sizer.MetricsCountSizer{}
tests := []struct {
name string
cfg exporterbatcher.SizeConfig
@@ -124,7 +128,9 @@ func TestMergeSplitMetrics(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i := range res {
assert.Equal(t, tt.expected[i], res[i])
expected := tt.expected[i].(*metricsRequest)
actual := res[i].(*metricsRequest)
assert.Equal(t, expected.size(&s), actual.size(&s))
}
})
}
@@ -148,7 +154,7 @@ func TestMergeSplitMetricsInvalidInput(t *testing.T) {
func TestExtractMetrics(t *testing.T) {
for i := 0; i < 20; i++ {
md := testdata.GenerateMetrics(10)
extractedMetrics := extractMetrics(md, i)
extractedMetrics, _ := extractMetrics(md, i, &sizer.MetricsCountSizer{})
assert.Equal(t, i, extractedMetrics.DataPointCount())
assert.Equal(t, 20-i, md.DataPointCount())
}
@@ -156,7 +162,7 @@ func TestExtractMetrics(t *testing.T) {
func TestExtractMetricsInvalidMetric(t *testing.T) {
md := testdata.GenerateMetricsMetricTypeInvalid()
extractedMetrics := extractMetrics(md, 10)
extractedMetrics, _ := extractMetrics(md, 10, &sizer.MetricsCountSizer{})
assert.Equal(t, testdata.GenerateMetricsMetricTypeInvalid(), extractedMetrics)
assert.Equal(t, 0, md.ResourceMetrics().Len())
}
@@ -215,3 +221,382 @@ func BenchmarkSplittingBasedOnItemCountHugeMetrics(b *testing.B) {
assert.Len(b, merged, 10)
}
}
func TestMergeSplitMetricsBasedOnByteSize(t *testing.T) {
s := sizer.MetricsBytesSizer{}
tests := []struct {
name string
cfg exporterbatcher.SizeConfig
mr1 Request
mr2 Request
expectedSizes []int
}{
{
name: "both_requests_empty",
cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(10))},
mr1: newMetricsRequest(pmetric.NewMetrics(), nil),
mr2: newMetricsRequest(pmetric.NewMetrics(), nil),
expectedSizes: []int{0},
},
{
name: "first_request_empty",
cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(10))},
mr1: newMetricsRequest(pmetric.NewMetrics(), nil),
mr2: newMetricsRequest(testdata.GenerateMetrics(5), nil),
expectedSizes: []int{1035},
},
{
name: "first_empty_second_nil",
cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(10))},
mr1: newMetricsRequest(pmetric.NewMetrics(), nil),
mr2: nil,
expectedSizes: []int{0},
},
{
name: "merge_only",
cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(11))},
mr1: newMetricsRequest(testdata.GenerateMetrics(4), nil),
mr2: newMetricsRequest(testdata.GenerateMetrics(6), nil),
expectedSizes: []int{2102},
},
{
name: "split_only",
cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: s.MetricsSize(testdata.GenerateMetrics(4))},
mr1: newMetricsRequest(pmetric.NewMetrics(), nil),
mr2: newMetricsRequest(testdata.GenerateMetrics(10), nil),
expectedSizes: []int{706, 504, 625, 378},
},
{
name: "merge_and_split",
cfg: exporterbatcher.SizeConfig{
Sizer: exporterbatcher.SizerTypeBytes,
MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(10))/2 + metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(11))/2,
},
mr1: newMetricsRequest(testdata.GenerateMetrics(8), nil),
mr2: newMetricsRequest(testdata.GenerateMetrics(20), nil),
expectedSizes: []int{2107, 2022, 1954, 290},
},
{
name: "scope_metrics_split",
cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(4))},
mr1: newMetricsRequest(func() pmetric.Metrics {
md := testdata.GenerateMetrics(4)
extraScopeMetrics := md.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty()
testdata.GenerateMetrics(4).ResourceMetrics().At(0).ScopeMetrics().At(0).MoveTo(extraScopeMetrics)
extraScopeMetrics.Scope().SetName("extra scope")
return md
}(), nil),
mr2: nil,
expectedSizes: []int{706, 700, 85},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := tt.mr1.MergeSplit(context.Background(), tt.cfg, tt.mr2)
require.NoError(t, err)
assert.Equal(t, len(tt.expectedSizes), len(res))
for i := range res {
assert.Equal(t, tt.expectedSizes[i], res[i].(*metricsRequest).size(&s))
}
})
}
}
func TestMetricsRequest_MergeSplit_UnknownSizerType(t *testing.T) {
// Create a metrics request
req := newMetricsRequest(pmetric.NewMetrics(), nil)
// Create a config with an invalid sizer type by using the zero value
cfg := exporterbatcher.SizeConfig{
Sizer: exporterbatcher.SizerType{}, // the zero value has an empty string as its val
}
// Call MergeSplit with invalid sizer
result, err := req.MergeSplit(context.Background(), cfg, nil)
// Verify results
assert.Nil(t, result)
assert.EqualError(t, err, "unknown sizer type")
}
func TestExtractGaugeDataPoints(t *testing.T) {
tests := []struct {
name string
capacity int
numDataPoints int
expectedPoints int
}{
{
name: "extract_all_points",
capacity: 100,
numDataPoints: 2,
expectedPoints: 2,
},
{
name: "extract_partial_points",
capacity: 1,
numDataPoints: 2,
expectedPoints: 1,
},
{
name: "no_capacity",
capacity: 0,
numDataPoints: 2,
expectedPoints: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srcMetric := pmetric.NewMetric()
gauge := srcMetric.SetEmptyGauge()
for i := 0; i < tt.numDataPoints; i++ {
dp := gauge.DataPoints().AppendEmpty()
dp.SetIntValue(int64(i))
}
sz := &mockMetricsSizer{dpSize: 1}
destMetric, removedSize := extractGaugeDataPoints(gauge, tt.capacity, sz)
assert.Equal(t, tt.expectedPoints, destMetric.Gauge().DataPoints().Len())
if tt.expectedPoints > 0 {
assert.Equal(t, tt.expectedPoints, removedSize)
}
})
}
}
func TestExtractSumDataPoints(t *testing.T) {
tests := []struct {
name string
capacity int
numDataPoints int
expectedPoints int
}{
{
name: "extract_all_points",
capacity: 100,
numDataPoints: 2,
expectedPoints: 2,
},
{
name: "extract_partial_points",
capacity: 1,
numDataPoints: 2,
expectedPoints: 1,
},
{
name: "no_capacity",
capacity: 0,
numDataPoints: 2,
expectedPoints: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srcMetric := pmetric.NewMetric()
sum := srcMetric.SetEmptySum()
for i := 0; i < tt.numDataPoints; i++ {
dp := sum.DataPoints().AppendEmpty()
dp.SetIntValue(int64(i))
}
sz := &mockMetricsSizer{dpSize: 1}
destMetric, removedSize := extractSumDataPoints(sum, tt.capacity, sz)
assert.Equal(t, tt.expectedPoints, destMetric.Sum().DataPoints().Len())
if tt.expectedPoints > 0 {
assert.Equal(t, tt.expectedPoints, removedSize)
}
})
}
}
func TestExtractHistogramDataPoints(t *testing.T) {
tests := []struct {
name string
capacity int
numDataPoints int
expectedPoints int
}{
{
name: "extract_all_points",
capacity: 100,
numDataPoints: 2,
expectedPoints: 2,
},
{
name: "extract_partial_points",
capacity: 1,
numDataPoints: 2,
expectedPoints: 1,
},
{
name: "no_capacity",
capacity: 0,
numDataPoints: 2,
expectedPoints: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srcMetric := pmetric.NewMetric()
histogram := srcMetric.SetEmptyHistogram()
for i := 0; i < tt.numDataPoints; i++ {
dp := histogram.DataPoints().AppendEmpty()
dp.SetCount(uint64(i)) //nolint:gosec // disable G115
}
sz := &mockMetricsSizer{dpSize: 1}
destMetric, removedSize := extractHistogramDataPoints(histogram, tt.capacity, sz)
assert.Equal(t, tt.expectedPoints, destMetric.Histogram().DataPoints().Len())
if tt.expectedPoints > 0 {
assert.Equal(t, tt.expectedPoints, removedSize)
}
})
}
}
func TestExtractExponentialHistogramDataPoints(t *testing.T) {
tests := []struct {
name string
capacity int
numDataPoints int
expectedPoints int
}{
{
name: "extract_all_points",
capacity: 100,
numDataPoints: 2,
expectedPoints: 2,
},
{
name: "extract_partial_points",
capacity: 1,
numDataPoints: 2,
expectedPoints: 1,
},
{
name: "no_capacity",
capacity: 0,
numDataPoints: 2,
expectedPoints: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srcMetric := pmetric.NewMetric()
expHistogram := srcMetric.SetEmptyExponentialHistogram()
for i := 0; i < tt.numDataPoints; i++ {
dp := expHistogram.DataPoints().AppendEmpty()
dp.SetCount(uint64(i)) //nolint:gosec // disable G115
}
sz := &mockMetricsSizer{dpSize: 1}
destMetric, removedSize := extractExponentialHistogramDataPoints(expHistogram, tt.capacity, sz)
assert.Equal(t, tt.expectedPoints, destMetric.ExponentialHistogram().DataPoints().Len())
if tt.expectedPoints > 0 {
assert.Equal(t, tt.expectedPoints, removedSize)
}
})
}
}
func TestExtractSummaryDataPoints(t *testing.T) {
tests := []struct {
name string
capacity int
numDataPoints int
expectedPoints int
}{
{
name: "extract_all_points",
capacity: 100,
numDataPoints: 2,
expectedPoints: 2,
},
{
name: "extract_partial_points",
capacity: 1,
numDataPoints: 2,
expectedPoints: 1,
},
{
name: "no_capacity",
capacity: 0,
numDataPoints: 2,
expectedPoints: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srcMetric := pmetric.NewMetric()
summary := srcMetric.SetEmptySummary()
for i := 0; i < tt.numDataPoints; i++ {
dp := summary.DataPoints().AppendEmpty()
dp.SetCount(uint64(i)) //nolint:gosec // disable G115
}
sz := &mockMetricsSizer{dpSize: 1}
destMetric, removedSize := extractSummaryDataPoints(summary, tt.capacity, sz)
assert.Equal(t, tt.expectedPoints, destMetric.Summary().DataPoints().Len())
if tt.expectedPoints > 0 {
assert.Equal(t, tt.expectedPoints, removedSize)
}
})
}
}
// mockMetricsSizer implements the sizer.MetricsSizer interface for testing.
type mockMetricsSizer struct {
dpSize int
}
func (m *mockMetricsSizer) MetricsSize(_ pmetric.Metrics) int {
return 0
}
func (m *mockMetricsSizer) MetricSize(_ pmetric.Metric) int {
return 0
}
func (m *mockMetricsSizer) NumberDataPointSize(_ pmetric.NumberDataPoint) int {
return m.dpSize
}
func (m *mockMetricsSizer) HistogramDataPointSize(_ pmetric.HistogramDataPoint) int {
return m.dpSize
}
func (m *mockMetricsSizer) ExponentialHistogramDataPointSize(_ pmetric.ExponentialHistogramDataPoint) int {
return m.dpSize
}
func (m *mockMetricsSizer) SummaryDataPointSize(_ pmetric.SummaryDataPoint) int {
return m.dpSize
}
func (m *mockMetricsSizer) ResourceMetricsSize(_ pmetric.ResourceMetrics) int {
return 0
}
func (m *mockMetricsSizer) ScopeMetricsSize(_ pmetric.ScopeMetrics) int {
return 0
}
func (m *mockMetricsSizer) DeltaSize(size int) int {
return size
}

View File

@@ -22,6 +22,34 @@ func (e *ProtoMarshaler) MetricsSize(md Metrics) int {
return pb.Size()
}
func (e *ProtoMarshaler) ResourceMetricsSize(rm ResourceMetrics) int {
return rm.orig.Size()
}
func (e *ProtoMarshaler) ScopeMetricsSize(sm ScopeMetrics) int {
return sm.orig.Size()
}
func (e *ProtoMarshaler) MetricSize(m Metric) int {
return m.orig.Size()
}
func (e *ProtoMarshaler) NumberDataPointSize(ndp NumberDataPoint) int {
return ndp.orig.Size()
}
func (e *ProtoMarshaler) SummaryDataPointSize(sdps SummaryDataPoint) int {
return sdps.orig.Size()
}
func (e *ProtoMarshaler) HistogramDataPointSize(hdp HistogramDataPoint) int {
return hdp.orig.Size()
}
func (e *ProtoMarshaler) ExponentialHistogramDataPointSize(ehdp ExponentialHistogramDataPoint) int {
return ehdp.orig.Size()
}
type ProtoUnmarshaler struct{}
func (d *ProtoUnmarshaler) UnmarshalMetrics(buf []byte) (Metrics, error) {