all: remove OtlpProtoSize in favor of Sizer interface (#3818)

* all: remove OtlpProtoSize in favor of Sizer interface

* add {Metrics|Traces|Logs}Sizer, fix test commenting

* move size tests to pb_test, fix metrics+logs parameter names

* uncommented batch_processor tests + adjusted them to use *Sizer

* I forgot BenchmarkTraceSizeBytes :(

* Add docs for NewProtobof*Sizer + update docs for NewProtobuf*Marshaler

* cast *Marshaler to *Sizer for now

* add casts to batch_processor_test
This commit is contained in:
Nathan Dias 2021-08-17 13:10:33 -05:00 committed by GitHub
parent 9cafb5dcda
commit aa450a0bf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 118 additions and 68 deletions

View File

@ -19,27 +19,32 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)
// NewProtobufTracesMarshaler returns a model.TracesMarshaler. Marshals to OTLP binary protobuf bytes.
// NewProtobufTracesMarshaler returns a pdata.TracesMarshaler. Marshals to OTLP binary protobuf bytes.
func NewProtobufTracesMarshaler() pdata.TracesMarshaler {
return newPbMarshaler()
}
// NewProtobufMetricsMarshaler returns a model.MetricsMarshaler. Marshals to OTLP binary protobuf bytes.
// NewProtobufMetricsMarshaler returns a pdata.MetricsMarshaler. Marshals to OTLP binary protobuf bytes.
func NewProtobufMetricsMarshaler() pdata.MetricsMarshaler {
return newPbMarshaler()
}
// NewProtobufLogsMarshaler returns a model.LogsMarshaler. Marshals to OTLP binary protobuf bytes.
// NewProtobufLogsMarshaler returns a pdata.LogsMarshaler. Marshals to OTLP binary protobuf bytes.
func NewProtobufLogsMarshaler() pdata.LogsMarshaler {
return newPbMarshaler()
}
// TODO(#3842): Figure out how we want to represent/return *Sizers.
type pbMarshaler struct{}
func newPbMarshaler() *pbMarshaler {
return &pbMarshaler{}
}
var _ pdata.TracesSizer = (*pbMarshaler)(nil)
var _ pdata.MetricsSizer = (*pbMarshaler)(nil)
var _ pdata.LogsSizer = (*pbMarshaler)(nil)
func (e *pbMarshaler) MarshalLogs(ld pdata.Logs) ([]byte, error) {
return internal.LogsToOtlp(ld.InternalRep()).Marshal()
}
@ -51,3 +56,15 @@ func (e *pbMarshaler) MarshalMetrics(md pdata.Metrics) ([]byte, error) {
func (e *pbMarshaler) MarshalTraces(td pdata.Traces) ([]byte, error) {
return internal.TracesToOtlp(td.InternalRep()).Marshal()
}
func (e *pbMarshaler) TracesSize(td pdata.Traces) int {
return internal.TracesToOtlp(td.InternalRep()).Size()
}
func (e *pbMarshaler) MetricsSize(md pdata.Metrics) int {
return internal.MetricsToOtlp(md.InternalRep()).Size()
}
func (e *pbMarshaler) LogsSize(ld pdata.Logs) int {
return internal.LogsToOtlp(ld.InternalRep()).Size()
}

View File

@ -42,6 +42,65 @@ func TestProtobufTracesUnmarshaler_error(t *testing.T) {
assert.Error(t, err)
}
func TestProtobufTracesSizer(t *testing.T) {
sizer := NewProtobufTracesMarshaler().(pdata.TracesSizer)
marshaler := NewProtobufTracesMarshaler()
td := pdata.NewTraces()
rms := td.ResourceSpans()
rms.AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty().SetName("foo")
size := sizer.TracesSize(td)
bytes, err := marshaler.MarshalTraces(td)
require.NoError(t, err)
assert.Equal(t, len(bytes), size)
}
func TestProtobufTracesSizer_withNil(t *testing.T) {
sizer := NewProtobufTracesMarshaler().(pdata.TracesSizer)
assert.Equal(t, 0, sizer.TracesSize(pdata.NewTraces()))
}
func TestProtobufMetricsSizer(t *testing.T) {
sizer := NewProtobufMetricsMarshaler().(pdata.MetricsSizer)
marshaler := NewProtobufMetricsMarshaler()
md := pdata.NewMetrics()
md.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("foo")
size := sizer.MetricsSize(md)
bytes, err := marshaler.MarshalMetrics(md)
require.NoError(t, err)
assert.Equal(t, len(bytes), size)
}
func TestProtobufMetricsSizer_withNil(t *testing.T) {
sizer := NewProtobufMetricsMarshaler().(pdata.MetricsSizer)
assert.Equal(t, 0, sizer.MetricsSize(pdata.NewMetrics()))
}
func TestProtobufLogsSizer(t *testing.T) {
sizer := NewProtobufLogsMarshaler().(pdata.LogsSizer)
marshaler := NewProtobufLogsMarshaler()
ld := pdata.NewLogs()
ld.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().Logs().AppendEmpty().SetName("foo")
size := sizer.LogsSize(ld)
bytes, err := marshaler.MarshalLogs(ld)
require.NoError(t, err)
assert.Equal(t, len(bytes), size)
}
func TestProtobufLogsSizer_withNil(t *testing.T) {
sizer := NewProtobufLogsMarshaler().(pdata.LogsSizer)
assert.Equal(t, 0, sizer.LogsSize(pdata.NewLogs()))
}
func BenchmarkLogsToProtobuf(b *testing.B) {
marshaler := NewProtobufLogsMarshaler()
logs := generateBenchmarkLogs(128)

View File

@ -24,7 +24,7 @@ import (
type LogsMarshaler interface {
// MarshalLogs the given pdata.Logs into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
MarshalLogs(td Logs) ([]byte, error)
MarshalLogs(ld Logs) ([]byte, error)
}
// LogsUnmarshaler unmarshalls bytes into pdata.Logs.
@ -34,6 +34,12 @@ type LogsUnmarshaler interface {
UnmarshalLogs(buf []byte) (Logs, error)
}
// LogsSizer returns the size of a Logs.
type LogsSizer interface {
// LogsSize returns the size in bytes of a Logs.
LogsSize(ld Logs) int
}
// Logs is the top-level struct that is propagated through the logs pipeline.
//
// This is a reference type (like builtin map).
@ -85,12 +91,6 @@ func (ld Logs) LogRecordCount() int {
return logCount
}
// OtlpProtoSize returns the size in bytes of this Logs encoded as OTLP Collector
// ExportLogsServiceRequest ProtoBuf bytes.
func (ld Logs) OtlpProtoSize() int {
return ld.orig.Size()
}
// ResourceLogs returns the ResourceLogsSlice associated with this Logs.
func (ld Logs) ResourceLogs() ResourceLogsSlice {
return newResourceLogsSlice(&ld.orig.ResourceLogs)

View File

@ -24,7 +24,7 @@ import (
type MetricsMarshaler interface {
// MarshalMetrics the given pdata.Metrics into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
MarshalMetrics(td Metrics) ([]byte, error)
MarshalMetrics(md Metrics) ([]byte, error)
}
// MetricsUnmarshaler unmarshalls bytes into pdata.Metrics.
@ -34,6 +34,12 @@ type MetricsUnmarshaler interface {
UnmarshalMetrics(buf []byte) (Metrics, error)
}
// MetricsSizer returns the size of a Metrics.
type MetricsSizer interface {
// LogsSize returns the size in bytes of a Metrics.
MetricsSize(md Metrics) int
}
// Metrics is an opaque interface that allows transition to the new internal Metrics data, but also facilitates the
// transition to the new components, especially for traces.
//
@ -87,12 +93,6 @@ func (md Metrics) MetricCount() int {
return metricCount
}
// OtlpProtoSize returns the size in bytes of this Metrics encoded as OTLP Collector
// ExportMetricsServiceRequest ProtoBuf bytes.
func (md Metrics) OtlpProtoSize() int {
return md.orig.Size()
}
// DataPointCount calculates the total number of data points.
func (md Metrics) DataPointCount() (dataPointCount int) {
rms := md.ResourceMetrics()

View File

@ -19,7 +19,6 @@ import (
gogoproto "github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
goproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
@ -148,22 +147,6 @@ func TestMetricCount(t *testing.T) {
assert.EqualValues(t, 6, md.MetricCount())
}
func TestMetricsSize(t *testing.T) {
assert.Equal(t, 0, NewMetrics().OtlpProtoSize())
md := generateMetricsEmptyDataPoints()
orig := md.orig
size := orig.Size()
bytes, err := orig.Marshal()
require.NoError(t, err)
assert.Equal(t, size, md.OtlpProtoSize())
assert.Equal(t, len(bytes), md.OtlpProtoSize())
}
func TestMetricsSizeWithNil(t *testing.T) {
assert.Equal(t, 0, NewMetrics().OtlpProtoSize())
}
func TestMetricCountWithEmpty(t *testing.T) {
assert.EqualValues(t, 0, generateMetricsEmptyResource().MetricCount())
assert.EqualValues(t, 0, generateMetricsEmptyInstrumentation().MetricCount())

View File

@ -34,6 +34,12 @@ type TracesUnmarshaler interface {
UnmarshalTraces(buf []byte) (Traces, error)
}
// TracesSizer returns the size of a Traces.
type TracesSizer interface {
// TracesSize returns the size in bytes of a Traces.
TracesSize(td Traces) int
}
// Traces is the top-level struct that is propagated through the traces pipeline.
type Traces struct {
orig *otlpcollectortrace.ExportTraceServiceRequest
@ -77,12 +83,6 @@ func (td Traces) SpanCount() int {
return spanCount
}
// OtlpProtoSize returns the size in bytes of this Traces encoded as OTLP Collector
// ExportTraceServiceRequest ProtoBuf bytes.
func (td Traces) OtlpProtoSize() int {
return td.orig.Size()
}
// ResourceSpans returns the ResourceSpansSlice associated with this Metrics.
func (td Traces) ResourceSpans() ResourceSpansSlice {
return newResourceSpansSlice(&td.orig.ResourceSpans)

View File

@ -19,7 +19,6 @@ import (
gogoproto "github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
goproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
@ -52,23 +51,6 @@ func TestSpanCount(t *testing.T) {
assert.EqualValues(t, 6, md.SpanCount())
}
func TestTracesSize(t *testing.T) {
assert.Equal(t, 0, NewTraces().OtlpProtoSize())
td := NewTraces()
rms := td.ResourceSpans()
rms.AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty().SetName("foo")
orig := td.orig
size := orig.Size()
bytes, err := orig.Marshal()
require.NoError(t, err)
assert.Equal(t, size, td.OtlpProtoSize())
assert.Equal(t, len(bytes), td.OtlpProtoSize())
}
func TestTracesSizeWithNil(t *testing.T) {
assert.Equal(t, 0, NewTraces().OtlpProtoSize())
}
func TestSpanCountWithEmpty(t *testing.T) {
assert.EqualValues(t, 0, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{{}},

View File

@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
)
@ -223,10 +224,11 @@ type batchTraces struct {
nextConsumer consumer.Traces
traceData pdata.Traces
spanCount int
sizer pdata.TracesSizer
}
func newBatchTraces(nextConsumer consumer.Traces) *batchTraces {
return &batchTraces{nextConsumer: nextConsumer, traceData: pdata.NewTraces()}
return &batchTraces{nextConsumer: nextConsumer, traceData: pdata.NewTraces(), sizer: otlp.NewProtobufTracesMarshaler().(pdata.TracesSizer)}
}
// add updates current batchTraces by adding new TraceData object
@ -259,17 +261,18 @@ func (bt *batchTraces) itemCount() int {
}
func (bt *batchTraces) size() int {
return bt.traceData.OtlpProtoSize()
return bt.sizer.TracesSize(bt.traceData)
}
type batchMetrics struct {
nextConsumer consumer.Metrics
metricData pdata.Metrics
dataPointCount int
sizer pdata.MetricsSizer
}
func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
return &batchMetrics{nextConsumer: nextConsumer, metricData: pdata.NewMetrics()}
return &batchMetrics{nextConsumer: nextConsumer, metricData: pdata.NewMetrics(), sizer: otlp.NewProtobufMetricsMarshaler().(pdata.MetricsSizer)}
}
func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
@ -290,7 +293,7 @@ func (bm *batchMetrics) itemCount() int {
}
func (bm *batchMetrics) size() int {
return bm.metricData.OtlpProtoSize()
return bm.sizer.MetricsSize(bm.metricData)
}
func (bm *batchMetrics) add(item interface{}) {
@ -308,10 +311,11 @@ type batchLogs struct {
nextConsumer consumer.Logs
logData pdata.Logs
logCount int
sizer pdata.LogsSizer
}
func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
return &batchLogs{nextConsumer: nextConsumer, logData: pdata.NewLogs()}
return &batchLogs{nextConsumer: nextConsumer, logData: pdata.NewLogs(), sizer: otlp.NewProtobufLogsMarshaler().(pdata.LogsSizer)}
}
func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int) error {
@ -332,7 +336,7 @@ func (bl *batchLogs) itemCount() int {
}
func (bl *batchLogs) size() int {
return bl.logData.OtlpProtoSize()
return bl.sizer.LogsSize(bl.logData)
}
func (bl *batchLogs) add(item interface{}) {

View File

@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
)
@ -119,6 +120,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
}
func TestBatchProcessorSentBySize(t *testing.T) {
sizer := otlp.NewProtobufTracesMarshaler().(pdata.TracesSizer)
views := MetricViews()
require.NoError(t, view.Register(views...))
defer view.Unregister(views...)
@ -140,7 +142,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
sizeSum := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTracesManySpansSameResource(spansPerRequest)
sizeSum += td.OtlpProtoSize()
sizeSum += sizer.TracesSize(td)
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
}
@ -306,6 +308,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
}
func TestBatchMetricProcessor_BatchSize(t *testing.T) {
sizer := otlp.NewProtobufMetricsMarshaler().(pdata.MetricsSizer)
views := MetricViews()
require.NoError(t, view.Register(views...))
defer view.Unregister(views...)
@ -333,7 +336,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
size := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
md := testdata.GenerateMetricsManyMetricsSameResource(metricsPerRequest)
size += md.OtlpProtoSize()
size += sizer.MetricsSize(md)
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
}
require.NoError(t, batcher.Shutdown(context.Background()))
@ -508,9 +511,10 @@ func getTestMetricName(requestNum, index int) string {
}
func BenchmarkTraceSizeBytes(b *testing.B) {
sizer := otlp.NewProtobufTracesMarshaler().(pdata.TracesSizer)
td := testdata.GenerateTracesManySpansSameResource(8192)
for n := 0; n < b.N; n++ {
fmt.Println(td.OtlpProtoSize())
fmt.Println(sizer.TracesSize(td))
}
}
@ -620,6 +624,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
}
func TestBatchLogProcessor_BatchSize(t *testing.T) {
sizer := otlp.NewProtobufLogsMarshaler().(pdata.LogsSizer)
views := MetricViews()
require.NoError(t, view.Register(views...))
defer view.Unregister(views...)
@ -645,7 +650,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) {
size := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogsManyLogRecordsSameResource(logsPerRequest)
size += ld.OtlpProtoSize()
size += sizer.LogsSize(ld)
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}
require.NoError(t, batcher.Shutdown(context.Background()))