[chore] Add manual AnyValue unmarshal to allow using memory pools (#13687)

Updates
https://github.com/open-telemetry/opentelemetry-collector/issues/13631

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2025-08-22 11:57:14 -07:00 committed by GitHub
parent d09cfe5180
commit 1a001d98c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 171 additions and 9 deletions

View File

@ -5,10 +5,12 @@ package internal // import "go.opentelemetry.io/collector/pdata/internal"
import (
"fmt"
"math"
"sync"
otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1"
"go.opentelemetry.io/collector/pdata/internal/json"
"go.opentelemetry.io/collector/pdata/internal/proto"
)
type Value struct {
@ -108,7 +110,7 @@ func NewOrigAnyValueArrayValue() *otlpcommon.AnyValue_ArrayValue {
return protoPoolAnyValueArrayValue.Get().(*otlpcommon.AnyValue_ArrayValue)
}
func NewOrigAnyValueKeyValueList() *otlpcommon.AnyValue_KvlistValue {
func NewOrigAnyValueKvlistValue() *otlpcommon.AnyValue_KvlistValue {
if !UseProtoPooling.IsEnabled() {
return &otlpcommon.AnyValue_KvlistValue{}
}
@ -196,7 +198,7 @@ func CopyOrigAnyValue(dest, src *otlpcommon.AnyValue) {
dv.BytesValue = make([]byte, len(sv.BytesValue))
copy(dv.BytesValue, sv.BytesValue)
case *otlpcommon.AnyValue_KvlistValue:
dv := NewOrigAnyValueKeyValueList()
dv := NewOrigAnyValueKvlistValue()
dest.Value = dv
if sv.KvlistValue == nil {
return
@ -277,7 +279,7 @@ func UnmarshalJSONOrigAnyValue(orig *otlpcommon.AnyValue, iter *json.Iterator) {
UnmarshalJSONOrigArrayValue(ov.ArrayValue, iter)
orig.Value = ov
case "kvlistValue", "kvlist_value":
ov := NewOrigAnyValueKeyValueList()
ov := NewOrigAnyValueKvlistValue()
ov.KvlistValue = NewOrigKeyValueList()
UnmarshalJSONOrigKeyValueList(ov.KvlistValue, iter)
orig.Value = ov
@ -305,5 +307,129 @@ func MarshalProtoOrigAnyValue(orig *otlpcommon.AnyValue, buf []byte) int {
}
func UnmarshalProtoOrigAnyValue(orig *otlpcommon.AnyValue, buf []byte) error {
return orig.Unmarshal(buf)
var err error
var fieldNum int32
var wireType proto.WireType
l := len(buf)
pos := 0
for pos < l {
// If in a group parsing, move to the next tag.
fieldNum, wireType, pos, err = proto.ConsumeTag(buf, pos)
if err != nil {
return err
}
switch fieldNum {
case 1:
if wireType != proto.WireTypeLen {
return fmt.Errorf("proto: wrong wireType = %d for field StringValue", wireType)
}
var length int
length, pos, err = proto.ConsumeLen(buf, pos)
if err != nil {
return err
}
startPos := pos - length
ov := NewOrigAnyValueStringValue()
ov.StringValue = string(buf[startPos:pos])
orig.Value = ov
case 2:
if wireType != proto.WireTypeVarint {
return fmt.Errorf("proto: wrong wireType = %d for field BoolValue", wireType)
}
var num uint64
num, pos, err = proto.ConsumeVarint(buf, pos)
if err != nil {
return err
}
ov := NewOrigAnyValueBoolValue()
ov.BoolValue = num != 0
orig.Value = ov
case 3:
if wireType != proto.WireTypeVarint {
return fmt.Errorf("proto: wrong wireType = %d for field IntValue", wireType)
}
var num uint64
num, pos, err = proto.ConsumeVarint(buf, pos)
if err != nil {
return err
}
ov := NewOrigAnyValueIntValue()
ov.IntValue = int64(num) //nolint:gosec // G115
orig.Value = ov
case 4:
if wireType != proto.WireTypeI64 {
return fmt.Errorf("proto: wrong wireType = %d for field DoubleValue", wireType)
}
var num uint64
num, pos, err = proto.ConsumeI64(buf, pos)
if err != nil {
return err
}
ov := NewOrigAnyValueDoubleValue()
ov.DoubleValue = math.Float64frombits(num)
orig.Value = ov
case 5:
if wireType != proto.WireTypeLen {
return fmt.Errorf("proto: wrong wireType = %d for field ArrayValue", wireType)
}
var length int
length, pos, err = proto.ConsumeLen(buf, pos)
if err != nil {
return err
}
startPos := pos - length
ov := NewOrigAnyValueArrayValue()
ov.ArrayValue = NewOrigArrayValue()
err = UnmarshalProtoOrigArrayValue(ov.ArrayValue, buf[startPos:pos])
if err != nil {
return err
}
orig.Value = ov
case 6:
if wireType != proto.WireTypeLen {
return fmt.Errorf("proto: wrong wireType = %d for field KvlistValue", wireType)
}
var length int
length, pos, err = proto.ConsumeLen(buf, pos)
if err != nil {
return err
}
startPos := pos - length
ov := NewOrigAnyValueKvlistValue()
ov.KvlistValue = NewOrigKeyValueList()
err = UnmarshalProtoOrigKeyValueList(ov.KvlistValue, buf[startPos:pos])
if err != nil {
return err
}
orig.Value = ov
case 7:
if wireType != proto.WireTypeLen {
return fmt.Errorf("proto: wrong wireType = %d for field BytesValue", wireType)
}
var length int
length, pos, err = proto.ConsumeLen(buf, pos)
if err != nil {
return err
}
startPos := pos - length
ov := NewOrigAnyValueBytesValue()
ov.BytesValue = make([]byte, length)
copy(ov.BytesValue, buf[startPos:pos])
orig.Value = ov
default:
pos, err = proto.ConsumeUnknown(buf, pos, wireType)
if err != nil {
return err
}
}
}
return nil
}

View File

@ -84,7 +84,7 @@ func TestCopyOrigAnyValueAllTypes(t *testing.T) {
}
}
func TestMarshalAndUnmarshalJSONOrigAnyValueAllTypes(t *testing.T) {
func TestMarshalAndUnmarshalJSONOrigAnyValue(t *testing.T) {
for name, src := range allAnyValues() {
for _, pooling := range []bool{true, false} {
t.Run(name+"pooling_"+strconv.FormatBool(pooling), func(t *testing.T) {
@ -112,7 +112,23 @@ func TestMarshalAndUnmarshalJSONOrigAnyValueAllTypes(t *testing.T) {
}
}
func TestMarshalAndUnmarshalProtoAnyValueAllTypes(t *testing.T) {
func TestMarshalAndUnmarshalProtoAnyValueUnknown(t *testing.T) {
dest := NewOrigAnyValue()
// message Test { required int64 field = 1313; } encoding { "field": "1234" }
require.NoError(t, UnmarshalProtoOrigAnyValue(dest, []byte{0x88, 0x52, 0xD2, 0x09}))
assert.Equal(t, NewOrigAnyValue(), dest)
}
func TestMarshalAndUnmarshalProtoOrigAnyValueFailing(t *testing.T) {
for name, buf := range genTestFailingUnmarshalProtoValuesAnyValue() {
t.Run(name, func(t *testing.T) {
dest := NewOrigAnyValue()
require.Error(t, UnmarshalProtoOrigAnyValue(dest, buf))
})
}
}
func TestMarshalAndUnmarshalProtoAnyValue(t *testing.T) {
for name, src := range allAnyValues() {
for _, pooling := range []bool{true, false} {
t.Run(name+"pooling_"+strconv.FormatBool(pooling), func(t *testing.T) {
@ -136,6 +152,26 @@ func TestMarshalAndUnmarshalProtoAnyValueAllTypes(t *testing.T) {
}
}
func genTestFailingUnmarshalProtoValuesAnyValue() map[string][]byte {
return map[string][]byte{
"invalid_field": {0x02},
"StringValue/wrong_wire_type": {0xc},
"StringValue/missing_value": {0xa},
"BoolValue/wrong_wire_type": {0x14},
"BoolValue/missing_value": {0x10},
"IntValue/wrong_wire_type": {0x1c},
"IntValue/missing_value": {0x18},
"DoubleValue/wrong_wire_type": {0x24},
"DoubleValue/missing_value": {0x21},
"ArrayValue/wrong_wire_type": {0x2c},
"ArrayValue/missing_value": {0x2a},
"KvlistValue/wrong_wire_type": {0x34},
"KvlistValue/missing_value": {0x30},
"BytesValue/wrong_wire_type": {0x3c},
"BytesValue/missing_value": {0x3a},
}
}
func allAnyValues() map[string]*otlpcommon.AnyValue {
return map[string]*otlpcommon.AnyValue{
"empty": {Value: nil},

View File

@ -194,7 +194,7 @@ func (m Map) PutEmptyMap(k string) Map {
if av, existing := m.Get(k); existing {
return av.SetEmptyMap()
}
ov := internal.NewOrigAnyValueKeyValueList()
ov := internal.NewOrigAnyValueKvlistValue()
ov.KvlistValue = internal.NewOrigKeyValueList()
*m.getOrig() = append(*m.getOrig(), otlpcommon.KeyValue{Key: k, Value: otlpcommon.AnyValue{Value: ov}})
return Map(internal.NewMap(&ov.KvlistValue.Values, m.getState()))

View File

@ -112,7 +112,7 @@ func NewValueBool(v bool) Value {
// NewValueMap creates a new Value of map type.
func NewValueMap() Value {
ov := internal.NewOrigAnyValueKeyValueList()
ov := internal.NewOrigAnyValueKvlistValue()
ov.KvlistValue = internal.NewOrigKeyValueList()
orig := internal.NewOrigAnyValue()
orig.Value = ov
@ -343,7 +343,7 @@ func (v Value) SetEmptyMap() Map {
v.getState().AssertMutable()
// Delete everything but the AnyValue object itself.
internal.DeleteOrigAnyValue(v.getOrig(), false)
ov := internal.NewOrigAnyValueKeyValueList()
ov := internal.NewOrigAnyValueKvlistValue()
ov.KvlistValue = internal.NewOrigKeyValueList()
v.getOrig().Value = ov
return newMap(&ov.KvlistValue.Values, v.getState())