From 6ae1c98f63e62a38a9aaad098ed3289a151f5759 Mon Sep 17 00:00:00 2001 From: Israel Blancas Date: Mon, 10 Mar 2025 21:39:26 +0100 Subject: [PATCH] Bytes based batching for metrics (#12550) #### Description This PR implements serialized bytes based batching for metrics. #### Link to tracking issue https://github.com/open-telemetry/opentelemetry-collector/issues/3262 Continuation of #12299 made by @sfc-gh-sili. Signed-off-by: Israel Blancas --- .../internal/sizer/logs_sizer.go | 23 +- .../internal/sizer/logs_sizer_test.go | 7 - .../internal/sizer/metrics_sizer.go | 85 ++++ .../internal/sizer/metrics_sizer_test.go | 67 +++ .../internal/sizer/proto_delta_sizer.go | 34 ++ .../internal/sizer/proto_delta_sizer_test.go | 17 + exporter/exporterhelper/logs_batch_test.go | 17 + exporter/exporterhelper/metrics.go | 26 +- exporter/exporterhelper/metrics_batch.go | 303 +++++++++----- exporter/exporterhelper/metrics_batch_test.go | 391 +++++++++++++++++- pdata/pmetric/pb.go | 28 ++ 11 files changed, 864 insertions(+), 134 deletions(-) create mode 100644 exporter/exporterhelper/internal/sizer/metrics_sizer.go create mode 100644 exporter/exporterhelper/internal/sizer/metrics_sizer_test.go create mode 100644 exporter/exporterhelper/internal/sizer/proto_delta_sizer.go create mode 100644 exporter/exporterhelper/internal/sizer/proto_delta_sizer_test.go diff --git a/exporter/exporterhelper/internal/sizer/logs_sizer.go b/exporter/exporterhelper/internal/sizer/logs_sizer.go index 90667cacbc..197fadabb0 100644 --- a/exporter/exporterhelper/internal/sizer/logs_sizer.go +++ b/exporter/exporterhelper/internal/sizer/logs_sizer.go @@ -4,8 +4,6 @@ package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" import ( - math_bits "math/bits" - "go.opentelemetry.io/collector/pdata/plog" ) @@ -22,22 +20,7 @@ type LogsSizer interface { // LogsBytesSizer returns the byte size of serialized protos. type LogsBytesSizer struct { plog.ProtoMarshaler -} - -// DeltaSize returns the delta size of a proto slice when a new item is added. -// Example: -// -// prevSize := proto1.Size() -// proto1.RepeatedField().AppendEmpty() = proto2 -// -// Then currSize of proto1 can be calculated as -// -// currSize := (prevSize + sizer.DeltaSize(proto2.Size())) -// -// This is derived from opentelemetry-collector/pdata/internal/data/protogen/logs/v1/logs.pb.go -// which is generated with gogo/protobuf. -func (s *LogsBytesSizer) DeltaSize(newItemSize int) int { - return 1 + newItemSize + sov(uint64(newItemSize)) //nolint:gosec // disable G115 + protoDeltaSizer } // LogsCountSizer returns the nunmber of logs entries. @@ -66,7 +49,3 @@ func (s *LogsCountSizer) LogRecordSize(_ plog.LogRecord) int { func (s *LogsCountSizer) DeltaSize(newItemSize int) int { return newItemSize } - -func sov(x uint64) int { - return (math_bits.Len64(x|1) + 6) / 7 -} diff --git a/exporter/exporterhelper/internal/sizer/logs_sizer_test.go b/exporter/exporterhelper/internal/sizer/logs_sizer_test.go index d254e3d5a5..196d1655ed 100644 --- a/exporter/exporterhelper/internal/sizer/logs_sizer_test.go +++ b/exporter/exporterhelper/internal/sizer/logs_sizer_test.go @@ -55,10 +55,3 @@ func TestLogsBytesSizer(t *testing.T) { lr.CopyTo(sl.LogRecords().AppendEmpty()) require.Equal(t, sizer.ScopeLogsSize(sl), prevSize+sizer.DeltaSize(sizer.LogRecordSize(lr))) } - -func TestLogsBytesDeltaSize(t *testing.T) { - sizer := LogsBytesSizer{} - require.Equal(t, 129, sizer.DeltaSize(127)) - require.Equal(t, 131, sizer.DeltaSize(128)) - require.Equal(t, 242, sizer.DeltaSize(239)) -} diff --git a/exporter/exporterhelper/internal/sizer/metrics_sizer.go b/exporter/exporterhelper/internal/sizer/metrics_sizer.go new file mode 100644 index 0000000000..4b97aab4cd --- /dev/null +++ b/exporter/exporterhelper/internal/sizer/metrics_sizer.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" +) // MetricsCountSizer returns the nunmber of metrics entries. + +type MetricsSizer interface { + MetricsSize(md pmetric.Metrics) (count int) + ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int) + ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int) + MetricSize(m pmetric.Metric) int + DeltaSize(newItemSize int) int + NumberDataPointSize(ndp pmetric.NumberDataPoint) int + HistogramDataPointSize(hdp pmetric.HistogramDataPoint) int + ExponentialHistogramDataPointSize(ehdp pmetric.ExponentialHistogramDataPoint) int + SummaryDataPointSize(sdps pmetric.SummaryDataPoint) int +} + +type MetricsBytesSizer struct { + pmetric.ProtoMarshaler + protoDeltaSizer +} + +var _ MetricsSizer = &MetricsBytesSizer{} + +type MetricsCountSizer struct{} + +var _ MetricsSizer = &MetricsCountSizer{} + +func (s *MetricsCountSizer) MetricsSize(md pmetric.Metrics) int { + return md.DataPointCount() +} + +func (s *MetricsCountSizer) ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int) { + for i := 0; i < rm.ScopeMetrics().Len(); i++ { + count += s.ScopeMetricsSize(rm.ScopeMetrics().At(i)) + } + return count +} + +func (s *MetricsCountSizer) ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int) { + for i := 0; i < sm.Metrics().Len(); i++ { + count += s.MetricSize(sm.Metrics().At(i)) + } + return count +} + +func (s *MetricsCountSizer) MetricSize(m pmetric.Metric) int { + switch m.Type() { + case pmetric.MetricTypeGauge: + return m.Gauge().DataPoints().Len() + case pmetric.MetricTypeSum: + return m.Sum().DataPoints().Len() + case pmetric.MetricTypeHistogram: + return m.Histogram().DataPoints().Len() + case pmetric.MetricTypeExponentialHistogram: + return m.ExponentialHistogram().DataPoints().Len() + case pmetric.MetricTypeSummary: + return m.Summary().DataPoints().Len() + } + return 0 +} + +func (s *MetricsCountSizer) DeltaSize(newItemSize int) int { + return newItemSize +} + +func (s *MetricsCountSizer) NumberDataPointSize(_ pmetric.NumberDataPoint) int { + return 1 +} + +func (s *MetricsCountSizer) HistogramDataPointSize(_ pmetric.HistogramDataPoint) int { + return 1 +} + +func (s *MetricsCountSizer) ExponentialHistogramDataPointSize(_ pmetric.ExponentialHistogramDataPoint) int { + return 1 +} + +func (s *MetricsCountSizer) SummaryDataPointSize(_ pmetric.SummaryDataPoint) int { + return 1 +} diff --git a/exporter/exporterhelper/internal/sizer/metrics_sizer_test.go b/exporter/exporterhelper/internal/sizer/metrics_sizer_test.go new file mode 100644 index 0000000000..48df9728fa --- /dev/null +++ b/exporter/exporterhelper/internal/sizer/metrics_sizer_test.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/pdata/testdata" +) + +func TestMetricsCountSizer(t *testing.T) { + md := testdata.GenerateMetrics(7) + sizer := MetricsCountSizer{} + require.Equal(t, 14, sizer.MetricsSize(md)) + + rm := md.ResourceMetrics().At(0) + require.Equal(t, 14, sizer.ResourceMetricsSize(rm)) + + sm := rm.ScopeMetrics().At(0) + require.Equal(t, 14, sizer.ScopeMetricsSize(sm)) + + // Test different metric types + require.Equal(t, 2, sizer.MetricSize(sm.Metrics().At(0))) + + // Test data point sizes + require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(0).Gauge().DataPoints().At(0))) + require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(1).Gauge().DataPoints().At(0))) + require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(2).Sum().DataPoints().At(0))) + require.Equal(t, 1, sizer.NumberDataPointSize(sm.Metrics().At(3).Sum().DataPoints().At(0))) + require.Equal(t, 1, sizer.HistogramDataPointSize(sm.Metrics().At(4).Histogram().DataPoints().At(0))) + require.Equal(t, 1, sizer.ExponentialHistogramDataPointSize(sm.Metrics().At(5).ExponentialHistogram().DataPoints().At(0))) + require.Equal(t, 1, sizer.SummaryDataPointSize(sm.Metrics().At(6).Summary().DataPoints().At(0))) + + prevSize := sizer.ScopeMetricsSize(sm) + sm.Metrics().At(0).CopyTo(sm.Metrics().AppendEmpty()) + require.Equal(t, sizer.ScopeMetricsSize(sm), prevSize+sizer.DeltaSize(sizer.MetricSize(sm.Metrics().At(0)))) +} + +func TestMetricsBytesSizer(t *testing.T) { + md := testdata.GenerateMetrics(7) + sizer := MetricsBytesSizer{} + require.Equal(t, 1594, sizer.MetricsSize(md)) + + rm := md.ResourceMetrics().At(0) + require.Equal(t, 1591, sizer.ResourceMetricsSize(rm)) + + sm := rm.ScopeMetrics().At(0) + require.Equal(t, 1546, sizer.ScopeMetricsSize(sm)) + + // Test different metric types + require.Equal(t, 130, sizer.MetricSize(sm.Metrics().At(0))) + + // Test data point sizes + require.Equal(t, 55, sizer.NumberDataPointSize(sm.Metrics().At(0).Gauge().DataPoints().At(0))) + require.Equal(t, 83, sizer.NumberDataPointSize(sm.Metrics().At(1).Gauge().DataPoints().At(0))) + require.Equal(t, 55, sizer.NumberDataPointSize(sm.Metrics().At(2).Sum().DataPoints().At(0))) + require.Equal(t, 83, sizer.NumberDataPointSize(sm.Metrics().At(3).Sum().DataPoints().At(0))) + require.Equal(t, 92, sizer.HistogramDataPointSize(sm.Metrics().At(4).Histogram().DataPoints().At(0))) + require.Equal(t, 119, sizer.ExponentialHistogramDataPointSize(sm.Metrics().At(5).ExponentialHistogram().DataPoints().At(0))) + require.Equal(t, 92, sizer.SummaryDataPointSize(sm.Metrics().At(6).Summary().DataPoints().At(0))) + + prevSize := sizer.ScopeMetricsSize(sm) + sm.Metrics().At(0).CopyTo(sm.Metrics().AppendEmpty()) + require.Equal(t, sizer.ScopeMetricsSize(sm), prevSize+sizer.DeltaSize(sizer.MetricSize(sm.Metrics().At(0)))) +} diff --git a/exporter/exporterhelper/internal/sizer/proto_delta_sizer.go b/exporter/exporterhelper/internal/sizer/proto_delta_sizer.go new file mode 100644 index 0000000000..1ed9c5a896 --- /dev/null +++ b/exporter/exporterhelper/internal/sizer/proto_delta_sizer.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" + +import ( + math_bits "math/bits" +) + +type protoDeltaSizer struct{} + +// DeltaSize() returns the delta size of a proto slice when a new item is added. +// Example: +// +// prevSize := proto1.Size() +// proto1.RepeatedField().AppendEmpty() = proto2 +// +// Then currSize of proto1 can be calculated as +// +// currSize := (prevSize + sizer.DeltaSize(proto2.Size())) +// +// This is derived from: +// - opentelemetry-collector/pdata/internal/data/protogen/metrics/v1/metrics.pb.go +// - opentelemetry-collector/pdata/internal/data/protogen/logs/v1/logs.pb.go +// - opentelemetry-collector/pdata/internal/data/protogen/traces/v1/traces.pb.go +// - opentelemetry-collector/pdata/internal/data/protogen/profiles/v1development/profiles.pb.go +// which is generated with gogo/protobuf. +func (s *protoDeltaSizer) DeltaSize(newItemSize int) int { + return 1 + newItemSize + sov(uint64(newItemSize)) //nolint:gosec // disable G115 +} + +func sov(x uint64) int { + return (math_bits.Len64(x|1) + 6) / 7 +} diff --git a/exporter/exporterhelper/internal/sizer/proto_delta_sizer_test.go b/exporter/exporterhelper/internal/sizer/proto_delta_sizer_test.go new file mode 100644 index 0000000000..afca10cc99 --- /dev/null +++ b/exporter/exporterhelper/internal/sizer/proto_delta_sizer_test.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sizer + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMetricsBytesDeltaSize(t *testing.T) { + sizer := protoDeltaSizer{} + require.Equal(t, 129, sizer.DeltaSize(127)) + require.Equal(t, 131, sizer.DeltaSize(128)) + require.Equal(t, 242, sizer.DeltaSize(239)) +} diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index ba088124ed..d9b0994398 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -371,3 +371,20 @@ func BenchmarkSplittingBasedOnByteSizeHugeLogs(b *testing.B) { assert.Len(b, merged, 10) } } + +func TestLogsRequest_MergeSplit_UnknownSizerType(t *testing.T) { + // Create a logs request + req := newLogsRequest(plog.NewLogs(), nil) + + // Create config with invalid sizer type by using zero value + cfg := exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerType{}, // Empty struct will have empty string as val + } + + // Call MergeSplit with invalid sizer + result, err := req.MergeSplit(context.Background(), cfg, nil) + + // Verify results + assert.Nil(t, result) + assert.EqualError(t, err, "unknown sizer type") +} diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index b374b8aa68..96e960aa57 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pipeline" ) @@ -24,16 +25,16 @@ var ( ) type metricsRequest struct { - md pmetric.Metrics - pusher consumer.ConsumeMetricsFunc - cachedItemsCount int + md pmetric.Metrics + pusher consumer.ConsumeMetricsFunc + cachedSize int } func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request { return &metricsRequest{ - md: md, - pusher: pusher, - cachedItemsCount: md.DataPointCount(), + md: md, + pusher: pusher, + cachedSize: -1, } } @@ -66,11 +67,18 @@ func (req *metricsRequest) Export(ctx context.Context) error { } func (req *metricsRequest) ItemsCount() int { - return req.cachedItemsCount + return req.md.DataPointCount() } -func (req *metricsRequest) setCachedItemsCount(count int) { - req.cachedItemsCount = count +func (req *metricsRequest) size(sizer sizer.MetricsSizer) int { + if req.cachedSize == -1 { + req.cachedSize = sizer.MetricsSize(req.md) + } + return req.cachedSize +} + +func (req *metricsRequest) setCachedSize(count int) { + req.cachedSize = count } type metricsExporter struct { diff --git a/exporter/exporterhelper/metrics_batch.go b/exporter/exporterhelper/metrics_batch.go index 8ded9fa36d..e259600f75 100644 --- a/exporter/exporterhelper/metrics_batch.go +++ b/exporter/exporterhelper/metrics_batch.go @@ -8,202 +8,319 @@ import ( "errors" "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/pdata/pmetric" ) // MergeSplit splits and/or merges the provided metrics request and the current request into one or more requests // conforming with the MaxSizeConfig. func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) { + var sz sizer.MetricsSizer + switch cfg.Sizer { + case exporterbatcher.SizerTypeItems: + sz = &sizer.MetricsCountSizer{} + case exporterbatcher.SizerTypeBytes: + sz = &sizer.MetricsBytesSizer{} + default: + return nil, errors.New("unknown sizer type") + } + if r2 != nil { req2, ok := r2.(*metricsRequest) if !ok { return nil, errors.New("invalid input type") } - req2.mergeTo(req) + req2.mergeTo(req, sz) } // If no limit we can simply merge the new request into the current and return. if cfg.MaxSize == 0 { return []Request{req}, nil } - return req.split(cfg) + return req.split(cfg.MaxSize, sz), nil } -func (req *metricsRequest) mergeTo(dst *metricsRequest) { - dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount()) - req.setCachedItemsCount(0) +func (req *metricsRequest) mergeTo(dst *metricsRequest, sz sizer.MetricsSizer) { + if sz != nil { + dst.setCachedSize(dst.size(sz) + req.size(sz)) + req.setCachedSize(0) + } req.md.ResourceMetrics().MoveAndAppendTo(dst.md.ResourceMetrics()) } -func (req *metricsRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) { +func (req *metricsRequest) split(maxSize int, sz sizer.MetricsSizer) []Request { var res []Request - for req.ItemsCount() > cfg.MaxSize { - md := extractMetrics(req.md, cfg.MaxSize) - size := md.DataPointCount() - req.setCachedItemsCount(req.ItemsCount() - size) - res = append(res, &metricsRequest{md: md, pusher: req.pusher, cachedItemsCount: size}) + for req.size(sz) > maxSize { + md, rmSize := extractMetrics(req.md, maxSize, sz) + req.setCachedSize(req.size(sz) - rmSize) + res = append(res, newMetricsRequest(md, req.pusher)) } res = append(res, req) - return res, nil + return res } -// extractMetrics extracts metrics from srcMetrics until count of data points is reached. -func extractMetrics(srcMetrics pmetric.Metrics, count int) pmetric.Metrics { +// extractMetrics extracts metrics from srcMetrics until capacity is reached. +func extractMetrics(srcMetrics pmetric.Metrics, capacity int, sz sizer.MetricsSizer) (pmetric.Metrics, int) { destMetrics := pmetric.NewMetrics() + capacityLeft := capacity - sz.MetricsSize(destMetrics) + removedSize := 0 srcMetrics.ResourceMetrics().RemoveIf(func(srcRM pmetric.ResourceMetrics) bool { - if count == 0 { + // If the no more capacity left just return. + if capacityLeft == 0 { return false } - needToExtract := resourceDataPointsCount(srcRM) > count - if needToExtract { - srcRM = extractResourceMetrics(srcRM, count) + rawRlSize := sz.ResourceMetricsSize(srcRM) + rlSize := sz.DeltaSize(rawRlSize) + if rlSize > capacityLeft { + extSrcRM, extRmSize := extractResourceMetrics(srcRM, capacityLeft, sz) + // This cannot make it to exactly 0 for the bytes, + // force it to be 0 since that is the stopping condition. + capacityLeft = 0 + removedSize += extRmSize + // There represents the delta between the delta sizes. + removedSize += rlSize - rawRlSize - (sz.DeltaSize(rawRlSize-extRmSize) - (rawRlSize - extRmSize)) + // It is possible that for the bytes scenario, the extracted field contains no scope metrics. + // Do not add it to the destination if that is the case. + if extSrcRM.ScopeMetrics().Len() > 0 { + extSrcRM.MoveTo(destMetrics.ResourceMetrics().AppendEmpty()) + } + return extSrcRM.ScopeMetrics().Len() != 0 } - count -= resourceDataPointsCount(srcRM) + capacityLeft -= rlSize + removedSize += rlSize srcRM.MoveTo(destMetrics.ResourceMetrics().AppendEmpty()) - return !needToExtract + return true }) - return destMetrics + return destMetrics, removedSize } // extractResourceMetrics extracts resource metrics and returns a new resource metrics with the specified number of data points. -func extractResourceMetrics(srcRM pmetric.ResourceMetrics, count int) pmetric.ResourceMetrics { +func extractResourceMetrics(srcRM pmetric.ResourceMetrics, capacity int, sz sizer.MetricsSizer) (pmetric.ResourceMetrics, int) { destRM := pmetric.NewResourceMetrics() destRM.SetSchemaUrl(srcRM.SchemaUrl()) srcRM.Resource().CopyTo(destRM.Resource()) + // Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size. + capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ResourceMetricsSize(destRM) + removedSize := 0 srcRM.ScopeMetrics().RemoveIf(func(srcSM pmetric.ScopeMetrics) bool { - if count == 0 { + // If the no more capacity left just return. + if capacityLeft == 0 { return false } - needToExtract := scopeDataPointsCount(srcSM) > count - if needToExtract { - srcSM = extractScopeMetrics(srcSM, count) + rawSmSize := sz.ScopeMetricsSize(srcSM) + smSize := sz.DeltaSize(rawSmSize) + if smSize > capacityLeft { + extSrcSM, extSmSize := extractScopeMetrics(srcSM, capacityLeft, sz) + // This cannot make it to exactly 0 for the bytes, + // force it to be 0 since that is the stopping condition. + capacityLeft = 0 + removedSize += extSmSize + // There represents the delta between the delta sizes. + removedSize += smSize - rawSmSize - (sz.DeltaSize(rawSmSize-extSmSize) - (rawSmSize - extSmSize)) + // It is possible that for the bytes scenario, the extracted field contains no scope metrics. + // Do not add it to the destination if that is the case. + if extSrcSM.Metrics().Len() > 0 { + extSrcSM.MoveTo(destRM.ScopeMetrics().AppendEmpty()) + } + return extSrcSM.Metrics().Len() != 0 } - count -= scopeDataPointsCount(srcSM) + capacityLeft -= smSize + removedSize += smSize srcSM.MoveTo(destRM.ScopeMetrics().AppendEmpty()) - return !needToExtract + return true }) - return destRM + return destRM, removedSize } // extractScopeMetrics extracts scope metrics and returns a new scope metrics with the specified number of data points. -func extractScopeMetrics(srcSM pmetric.ScopeMetrics, count int) pmetric.ScopeMetrics { +func extractScopeMetrics(srcSM pmetric.ScopeMetrics, capacity int, sz sizer.MetricsSizer) (pmetric.ScopeMetrics, int) { destSM := pmetric.NewScopeMetrics() destSM.SetSchemaUrl(srcSM.SchemaUrl()) srcSM.Scope().CopyTo(destSM.Scope()) - srcSM.Metrics().RemoveIf(func(srcMetric pmetric.Metric) bool { - if count == 0 { + // Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size. + capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ScopeMetricsSize(destSM) + removedSize := 0 + srcSM.Metrics().RemoveIf(func(srcSM pmetric.Metric) bool { + // If the no more capacity left just return. + if capacityLeft == 0 { return false } - needToExtract := metricDataPointCount(srcMetric) > count - if needToExtract { - srcMetric = extractMetricDataPoints(srcMetric, count) + rawRmSize := sz.MetricSize(srcSM) + rmSize := sz.DeltaSize(rawRmSize) + if rmSize > capacityLeft { + extSrcSM, extRmSize := extractMetricDataPoints(srcSM, capacityLeft, sz) + // This cannot make it to exactly 0 for the bytes, + // force it to be 0 since that is the stopping condition. + capacityLeft = 0 + removedSize += extRmSize + // There represents the delta between the delta sizes. + removedSize += rmSize - rawRmSize - (sz.DeltaSize(rawRmSize-extRmSize) - (rawRmSize - extRmSize)) + // It is possible that for the bytes scenario, the extracted field contains no datapoints. + // Do not add it to the destination if that is the case. + if sz.MetricSize(extSrcSM) > 0 { + extSrcSM.MoveTo(destSM.Metrics().AppendEmpty()) + } + return sz.MetricSize(extSrcSM) != 0 } - count -= metricDataPointCount(srcMetric) - srcMetric.MoveTo(destSM.Metrics().AppendEmpty()) - return !needToExtract + capacityLeft -= rmSize + removedSize += rmSize + srcSM.MoveTo(destSM.Metrics().AppendEmpty()) + return true }) - return destSM + return destSM, removedSize } -func extractMetricDataPoints(srcMetric pmetric.Metric, count int) pmetric.Metric { - destMetric := pmetric.NewMetric() +func extractMetricDataPoints(srcMetric pmetric.Metric, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) { + var destMetric pmetric.Metric + var removedSize int switch srcMetric.Type() { case pmetric.MetricTypeGauge: - extractGaugeDataPoints(srcMetric.Gauge(), count, destMetric.SetEmptyGauge()) + destMetric, removedSize = extractGaugeDataPoints(srcMetric.Gauge(), capacity, sz) case pmetric.MetricTypeSum: - extractSumDataPoints(srcMetric.Sum(), count, destMetric.SetEmptySum()) + destMetric, removedSize = extractSumDataPoints(srcMetric.Sum(), capacity, sz) case pmetric.MetricTypeHistogram: - extractHistogramDataPoints(srcMetric.Histogram(), count, destMetric.SetEmptyHistogram()) + destMetric, removedSize = extractHistogramDataPoints(srcMetric.Histogram(), capacity, sz) case pmetric.MetricTypeExponentialHistogram: - extractExponentialHistogramDataPoints(srcMetric.ExponentialHistogram(), count, - destMetric.SetEmptyExponentialHistogram()) + destMetric, removedSize = extractExponentialHistogramDataPoints(srcMetric.ExponentialHistogram(), capacity, sz) case pmetric.MetricTypeSummary: - extractSummaryDataPoints(srcMetric.Summary(), count, destMetric.SetEmptySummary()) + destMetric, removedSize = extractSummaryDataPoints(srcMetric.Summary(), capacity, sz) } - return destMetric + return destMetric, removedSize } -func extractGaugeDataPoints(srcGauge pmetric.Gauge, count int, destGauge pmetric.Gauge) { +func extractGaugeDataPoints(srcGauge pmetric.Gauge, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) { + m := pmetric.NewMetric() + destGauge := m.SetEmptyGauge() + + // Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size. + capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m) + removedSize := 0 + srcGauge.DataPoints().RemoveIf(func(srcDP pmetric.NumberDataPoint) bool { - if count == 0 { + // If the no more capacity left just return. + if capacityLeft == 0 { return false } + + rdSize := sz.DeltaSize(sz.NumberDataPointSize(srcDP)) + if rdSize > capacityLeft { + // This cannot make it to exactly 0 for the bytes, + // force it to be 0 since that is the stopping condition. + capacityLeft = 0 + return false + } + capacityLeft -= rdSize + removedSize += rdSize srcDP.MoveTo(destGauge.DataPoints().AppendEmpty()) - count-- return true }) + return m, removedSize } -func extractSumDataPoints(srcSum pmetric.Sum, count int, destSum pmetric.Sum) { +func extractSumDataPoints(srcSum pmetric.Sum, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) { + m := pmetric.NewMetric() + destSum := m.SetEmptySum() + // Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size. + capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m) + removedSize := 0 srcSum.DataPoints().RemoveIf(func(srcDP pmetric.NumberDataPoint) bool { - if count == 0 { + // If the no more capacity left just return. + if capacityLeft == 0 { return false } + + rdSize := sz.DeltaSize(sz.NumberDataPointSize(srcDP)) + if rdSize > capacityLeft { + // This cannot make it to exactly 0 for the bytes, + // force it to be 0 since that is the stopping condition. + capacityLeft = 0 + return false + } + capacityLeft -= rdSize + removedSize += rdSize srcDP.MoveTo(destSum.DataPoints().AppendEmpty()) - count-- return true }) + return m, removedSize } -func extractHistogramDataPoints(srcHistogram pmetric.Histogram, count int, destHistogram pmetric.Histogram) { +func extractHistogramDataPoints(srcHistogram pmetric.Histogram, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) { + m := pmetric.NewMetric() + destHistogram := m.SetEmptyHistogram() + // Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size. + capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m) + removedSize := 0 srcHistogram.DataPoints().RemoveIf(func(srcDP pmetric.HistogramDataPoint) bool { - if count == 0 { + // If the no more capacity left just return. + if capacityLeft == 0 { return false } + + rdSize := sz.DeltaSize(sz.HistogramDataPointSize(srcDP)) + if rdSize > capacityLeft { + // This cannot make it to exactly 0 for the bytes, + // force it to be 0 since that is the stopping condition. + capacityLeft = 0 + return false + } + capacityLeft -= rdSize + removedSize += rdSize srcDP.MoveTo(destHistogram.DataPoints().AppendEmpty()) - count-- return true }) + return m, removedSize } -func extractExponentialHistogramDataPoints(srcExponentialHistogram pmetric.ExponentialHistogram, count int, destExponentialHistogram pmetric.ExponentialHistogram) { +func extractExponentialHistogramDataPoints(srcExponentialHistogram pmetric.ExponentialHistogram, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) { + m := pmetric.NewMetric() + destExponentialHistogram := m.SetEmptyExponentialHistogram() + // Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size. + capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m) + removedSize := 0 srcExponentialHistogram.DataPoints().RemoveIf(func(srcDP pmetric.ExponentialHistogramDataPoint) bool { - if count == 0 { + // If the no more capacity left just return. + if capacityLeft == 0 { return false } + + rdSize := sz.DeltaSize(sz.ExponentialHistogramDataPointSize(srcDP)) + if rdSize > capacityLeft { + // This cannot make it to exactly 0 for the bytes, + // force it to be 0 since that is the stopping condition. + capacityLeft = 0 + return false + } + capacityLeft -= rdSize + removedSize += rdSize srcDP.MoveTo(destExponentialHistogram.DataPoints().AppendEmpty()) - count-- return true }) + return m, removedSize } -func extractSummaryDataPoints(srcSummary pmetric.Summary, count int, destSummary pmetric.Summary) { +func extractSummaryDataPoints(srcSummary pmetric.Summary, capacity int, sz sizer.MetricsSizer) (pmetric.Metric, int) { + m := pmetric.NewMetric() + destSummary := m.SetEmptySummary() + // Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size. + capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.MetricSize(m) + removedSize := 0 srcSummary.DataPoints().RemoveIf(func(srcDP pmetric.SummaryDataPoint) bool { - if count == 0 { + // If the no more capacity left just return. + if capacityLeft == 0 { return false } + + rdSize := sz.DeltaSize(sz.SummaryDataPointSize(srcDP)) + if rdSize > capacityLeft { + // This cannot make it to exactly 0 for the bytes, + // force it to be 0 since that is the stopping condition. + capacityLeft = 0 + return false + } + capacityLeft -= rdSize + removedSize += rdSize srcDP.MoveTo(destSummary.DataPoints().AppendEmpty()) - count-- return true }) -} - -func resourceDataPointsCount(rm pmetric.ResourceMetrics) (count int) { - for i := 0; i < rm.ScopeMetrics().Len(); i++ { - count += scopeDataPointsCount(rm.ScopeMetrics().At(i)) - } - return count -} - -func scopeDataPointsCount(sm pmetric.ScopeMetrics) (count int) { - for i := 0; i < sm.Metrics().Len(); i++ { - count += metricDataPointCount(sm.Metrics().At(i)) - } - return count -} - -func metricDataPointCount(m pmetric.Metric) int { - switch m.Type() { - case pmetric.MetricTypeGauge: - return m.Gauge().DataPoints().Len() - case pmetric.MetricTypeSum: - return m.Sum().DataPoints().Len() - case pmetric.MetricTypeHistogram: - return m.Histogram().DataPoints().Len() - case pmetric.MetricTypeExponentialHistogram: - return m.ExponentialHistogram().DataPoints().Len() - case pmetric.MetricTypeSummary: - return m.Summary().DataPoints().Len() - } - return 0 + return m, removedSize } diff --git a/exporter/exporterhelper/metrics_batch_test.go b/exporter/exporterhelper/metrics_batch_test.go index 385d125d98..77f689c191 100644 --- a/exporter/exporterhelper/metrics_batch_test.go +++ b/exporter/exporterhelper/metrics_batch_test.go @@ -11,10 +11,13 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/testdata" ) +var metricsBytesSizer = &sizer.MetricsBytesSizer{} + func TestMergeMetrics(t *testing.T) { mr1 := newMetricsRequest(testdata.GenerateMetrics(2), nil) mr2 := newMetricsRequest(testdata.GenerateMetrics(3), nil) @@ -32,6 +35,7 @@ func TestMergeMetricsInvalidInput(t *testing.T) { } func TestMergeSplitMetrics(t *testing.T) { + s := sizer.MetricsCountSizer{} tests := []struct { name string cfg exporterbatcher.SizeConfig @@ -124,7 +128,9 @@ func TestMergeSplitMetrics(t *testing.T) { require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i := range res { - assert.Equal(t, tt.expected[i], res[i]) + expected := tt.expected[i].(*metricsRequest) + actual := res[i].(*metricsRequest) + assert.Equal(t, expected.size(&s), actual.size(&s)) } }) } @@ -148,7 +154,7 @@ func TestMergeSplitMetricsInvalidInput(t *testing.T) { func TestExtractMetrics(t *testing.T) { for i := 0; i < 20; i++ { md := testdata.GenerateMetrics(10) - extractedMetrics := extractMetrics(md, i) + extractedMetrics, _ := extractMetrics(md, i, &sizer.MetricsCountSizer{}) assert.Equal(t, i, extractedMetrics.DataPointCount()) assert.Equal(t, 20-i, md.DataPointCount()) } @@ -156,7 +162,7 @@ func TestExtractMetrics(t *testing.T) { func TestExtractMetricsInvalidMetric(t *testing.T) { md := testdata.GenerateMetricsMetricTypeInvalid() - extractedMetrics := extractMetrics(md, 10) + extractedMetrics, _ := extractMetrics(md, 10, &sizer.MetricsCountSizer{}) assert.Equal(t, testdata.GenerateMetricsMetricTypeInvalid(), extractedMetrics) assert.Equal(t, 0, md.ResourceMetrics().Len()) } @@ -215,3 +221,382 @@ func BenchmarkSplittingBasedOnItemCountHugeMetrics(b *testing.B) { assert.Len(b, merged, 10) } } + +func TestMergeSplitMetricsBasedOnByteSize(t *testing.T) { + s := sizer.MetricsBytesSizer{} + tests := []struct { + name string + cfg exporterbatcher.SizeConfig + mr1 Request + mr2 Request + expectedSizes []int + }{ + { + name: "both_requests_empty", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(10))}, + mr1: newMetricsRequest(pmetric.NewMetrics(), nil), + mr2: newMetricsRequest(pmetric.NewMetrics(), nil), + expectedSizes: []int{0}, + }, + { + name: "first_request_empty", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(10))}, + mr1: newMetricsRequest(pmetric.NewMetrics(), nil), + mr2: newMetricsRequest(testdata.GenerateMetrics(5), nil), + expectedSizes: []int{1035}, + }, + { + name: "first_empty_second_nil", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(10))}, + mr1: newMetricsRequest(pmetric.NewMetrics(), nil), + mr2: nil, + expectedSizes: []int{0}, + }, + { + name: "merge_only", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(11))}, + mr1: newMetricsRequest(testdata.GenerateMetrics(4), nil), + mr2: newMetricsRequest(testdata.GenerateMetrics(6), nil), + expectedSizes: []int{2102}, + }, + { + name: "split_only", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: s.MetricsSize(testdata.GenerateMetrics(4))}, + mr1: newMetricsRequest(pmetric.NewMetrics(), nil), + mr2: newMetricsRequest(testdata.GenerateMetrics(10), nil), + expectedSizes: []int{706, 504, 625, 378}, + }, + { + name: "merge_and_split", + cfg: exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerTypeBytes, + MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(10))/2 + metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(11))/2, + }, + mr1: newMetricsRequest(testdata.GenerateMetrics(8), nil), + mr2: newMetricsRequest(testdata.GenerateMetrics(20), nil), + expectedSizes: []int{2107, 2022, 1954, 290}, + }, + { + name: "scope_metrics_split", + cfg: exporterbatcher.SizeConfig{Sizer: exporterbatcher.SizerTypeBytes, MaxSize: metricsBytesSizer.MetricsSize(testdata.GenerateMetrics(4))}, + mr1: newMetricsRequest(func() pmetric.Metrics { + md := testdata.GenerateMetrics(4) + extraScopeMetrics := md.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() + testdata.GenerateMetrics(4).ResourceMetrics().At(0).ScopeMetrics().At(0).MoveTo(extraScopeMetrics) + extraScopeMetrics.Scope().SetName("extra scope") + return md + }(), nil), + mr2: nil, + expectedSizes: []int{706, 700, 85}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := tt.mr1.MergeSplit(context.Background(), tt.cfg, tt.mr2) + require.NoError(t, err) + assert.Equal(t, len(tt.expectedSizes), len(res)) + for i := range res { + assert.Equal(t, tt.expectedSizes[i], res[i].(*metricsRequest).size(&s)) + } + }) + } +} + +func TestMetricsRequest_MergeSplit_UnknownSizerType(t *testing.T) { + // Create a logs request + req := newMetricsRequest(pmetric.NewMetrics(), nil) + + // Create config with invalid sizer type by using zero value + cfg := exporterbatcher.SizeConfig{ + Sizer: exporterbatcher.SizerType{}, // Empty struct will have empty string as val + } + + // Call MergeSplit with invalid sizer + result, err := req.MergeSplit(context.Background(), cfg, nil) + + // Verify results + assert.Nil(t, result) + assert.EqualError(t, err, "unknown sizer type") +} + +func TestExtractGaugeDataPoints(t *testing.T) { + tests := []struct { + name string + capacity int + numDataPoints int + expectedPoints int + }{ + { + name: "extract_all_points", + capacity: 100, + numDataPoints: 2, + expectedPoints: 2, + }, + { + name: "extract_partial_points", + capacity: 1, + numDataPoints: 2, + expectedPoints: 1, + }, + { + name: "no_capacity", + capacity: 0, + numDataPoints: 2, + expectedPoints: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srcMetric := pmetric.NewMetric() + gauge := srcMetric.SetEmptyGauge() + for i := 0; i < tt.numDataPoints; i++ { + dp := gauge.DataPoints().AppendEmpty() + dp.SetIntValue(int64(i)) + } + + sz := &mockMetricsSizer{dpSize: 1} + + destMetric, removedSize := extractGaugeDataPoints(gauge, tt.capacity, sz) + + assert.Equal(t, tt.expectedPoints, destMetric.Gauge().DataPoints().Len()) + if tt.expectedPoints > 0 { + assert.Equal(t, tt.expectedPoints, removedSize) + } + }) + } +} + +func TestExtractSumDataPoints(t *testing.T) { + tests := []struct { + name string + capacity int + numDataPoints int + expectedPoints int + }{ + { + name: "extract_all_points", + capacity: 100, + numDataPoints: 2, + expectedPoints: 2, + }, + { + name: "extract_partial_points", + capacity: 1, + numDataPoints: 2, + expectedPoints: 1, + }, + { + name: "no_capacity", + capacity: 0, + numDataPoints: 2, + expectedPoints: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srcMetric := pmetric.NewMetric() + sum := srcMetric.SetEmptySum() + for i := 0; i < tt.numDataPoints; i++ { + dp := sum.DataPoints().AppendEmpty() + dp.SetIntValue(int64(i)) + } + + sz := &mockMetricsSizer{dpSize: 1} + + destMetric, removedSize := extractSumDataPoints(sum, tt.capacity, sz) + + assert.Equal(t, tt.expectedPoints, destMetric.Sum().DataPoints().Len()) + if tt.expectedPoints > 0 { + assert.Equal(t, tt.expectedPoints, removedSize) + } + }) + } +} + +func TestExtractHistogramDataPoints(t *testing.T) { + tests := []struct { + name string + capacity int + numDataPoints int + expectedPoints int + }{ + { + name: "extract_all_points", + capacity: 100, + numDataPoints: 2, + expectedPoints: 2, + }, + { + name: "extract_partial_points", + capacity: 1, + numDataPoints: 2, + expectedPoints: 1, + }, + { + name: "no_capacity", + capacity: 0, + numDataPoints: 2, + expectedPoints: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srcMetric := pmetric.NewMetric() + histogram := srcMetric.SetEmptyHistogram() + + for i := 0; i < tt.numDataPoints; i++ { + dp := histogram.DataPoints().AppendEmpty() + dp.SetCount(uint64(i)) //nolint:gosec // disable G115 + } + + sz := &mockMetricsSizer{dpSize: 1} + + destMetric, removedSize := extractHistogramDataPoints(histogram, tt.capacity, sz) + + assert.Equal(t, tt.expectedPoints, destMetric.Histogram().DataPoints().Len()) + if tt.expectedPoints > 0 { + assert.Equal(t, tt.expectedPoints, removedSize) + } + }) + } +} + +func TestExtractExponentialHistogramDataPoints(t *testing.T) { + tests := []struct { + name string + capacity int + numDataPoints int + expectedPoints int + }{ + { + name: "extract_all_points", + capacity: 100, + numDataPoints: 2, + expectedPoints: 2, + }, + { + name: "extract_partial_points", + capacity: 1, + numDataPoints: 2, + expectedPoints: 1, + }, + { + name: "no_capacity", + capacity: 0, + numDataPoints: 2, + expectedPoints: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srcMetric := pmetric.NewMetric() + expHistogram := srcMetric.SetEmptyExponentialHistogram() + for i := 0; i < tt.numDataPoints; i++ { + dp := expHistogram.DataPoints().AppendEmpty() + dp.SetCount(uint64(i)) //nolint:gosec // disable G115 + } + + sz := &mockMetricsSizer{dpSize: 1} + + destMetric, removedSize := extractExponentialHistogramDataPoints(expHistogram, tt.capacity, sz) + + assert.Equal(t, tt.expectedPoints, destMetric.ExponentialHistogram().DataPoints().Len()) + if tt.expectedPoints > 0 { + assert.Equal(t, tt.expectedPoints, removedSize) + } + }) + } +} + +func TestExtractSummaryDataPoints(t *testing.T) { + tests := []struct { + name string + capacity int + numDataPoints int + expectedPoints int + }{ + { + name: "extract_all_points", + capacity: 100, + numDataPoints: 2, + expectedPoints: 2, + }, + { + name: "extract_partial_points", + capacity: 1, + numDataPoints: 2, + expectedPoints: 1, + }, + { + name: "no_capacity", + capacity: 0, + numDataPoints: 2, + expectedPoints: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srcMetric := pmetric.NewMetric() + summary := srcMetric.SetEmptySummary() + for i := 0; i < tt.numDataPoints; i++ { + dp := summary.DataPoints().AppendEmpty() + dp.SetCount(uint64(i)) //nolint:gosec // disable G115 + } + + sz := &mockMetricsSizer{dpSize: 1} + + destMetric, removedSize := extractSummaryDataPoints(summary, tt.capacity, sz) + + assert.Equal(t, tt.expectedPoints, destMetric.Summary().DataPoints().Len()) + if tt.expectedPoints > 0 { + assert.Equal(t, tt.expectedPoints, removedSize) + } + }) + } +} + +// mockMetricsSizer implements sizer.MetricsSizer interface for testing +type mockMetricsSizer struct { + dpSize int +} + +func (m *mockMetricsSizer) MetricsSize(_ pmetric.Metrics) int { + return 0 +} + +func (m *mockMetricsSizer) MetricSize(_ pmetric.Metric) int { + return 0 +} + +func (m *mockMetricsSizer) NumberDataPointSize(_ pmetric.NumberDataPoint) int { + return m.dpSize +} + +func (m *mockMetricsSizer) HistogramDataPointSize(_ pmetric.HistogramDataPoint) int { + return m.dpSize +} + +func (m *mockMetricsSizer) ExponentialHistogramDataPointSize(_ pmetric.ExponentialHistogramDataPoint) int { + return m.dpSize +} + +func (m *mockMetricsSizer) SummaryDataPointSize(_ pmetric.SummaryDataPoint) int { + return m.dpSize +} + +func (m *mockMetricsSizer) ResourceMetricsSize(_ pmetric.ResourceMetrics) int { + return 0 +} + +func (m *mockMetricsSizer) ScopeMetricsSize(_ pmetric.ScopeMetrics) int { + return 0 +} + +func (m *mockMetricsSizer) DeltaSize(size int) int { + return size +} diff --git a/pdata/pmetric/pb.go b/pdata/pmetric/pb.go index 580f555d7a..775a96f6a7 100644 --- a/pdata/pmetric/pb.go +++ b/pdata/pmetric/pb.go @@ -22,6 +22,34 @@ func (e *ProtoMarshaler) MetricsSize(md Metrics) int { return pb.Size() } +func (e *ProtoMarshaler) ResourceMetricsSize(rm ResourceMetrics) int { + return rm.orig.Size() +} + +func (e *ProtoMarshaler) ScopeMetricsSize(sm ScopeMetrics) int { + return sm.orig.Size() +} + +func (e *ProtoMarshaler) MetricSize(m Metric) int { + return m.orig.Size() +} + +func (e *ProtoMarshaler) NumberDataPointSize(ndp NumberDataPoint) int { + return ndp.orig.Size() +} + +func (e *ProtoMarshaler) SummaryDataPointSize(sdps SummaryDataPoint) int { + return sdps.orig.Size() +} + +func (e *ProtoMarshaler) HistogramDataPointSize(hdp HistogramDataPoint) int { + return hdp.orig.Size() +} + +func (e *ProtoMarshaler) ExponentialHistogramDataPointSize(ehdp ExponentialHistogramDataPoint) int { + return ehdp.orig.Size() +} + type ProtoUnmarshaler struct{} func (d *ProtoUnmarshaler) UnmarshalMetrics(buf []byte) (Metrics, error) {