fix: support multiple amqp data fields

Ref: https://github.com/cloudevents/spec/issues/1275
Signed-off-by: Michael Gasch <15986659+embano1@users.noreply.github.com>
This commit is contained in:
Michael Gasch 2024-04-14 16:35:45 +02:00
parent 20bfd0a38d
commit 4cafcc3533
2 changed files with 74 additions and 4 deletions

View File

@ -50,8 +50,10 @@ func NewMessage(message *amqp.Message, receiver *amqp.Receiver) *Message {
return &Message{AMQP: message, AMQPrcv: receiver, format: fmt, version: vn}
}
var _ binding.Message = (*Message)(nil)
var _ binding.MessageMetadataReader = (*Message)(nil)
var (
_ binding.Message = (*Message)(nil)
_ binding.MessageMetadataReader = (*Message)(nil)
)
func getSpecVersion(message *amqp.Message) spec.Version {
if sv, ok := message.ApplicationProperties[specs.PrefixedSpecVersionName()]; ok {
@ -74,7 +76,8 @@ func (m *Message) ReadEncoding() binding.Encoding {
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.format != nil {
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.AMQP.GetData()))
data := m.getAmqpData()
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(data))
}
return binding.ErrNotStructured
}
@ -106,7 +109,7 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)
}
}
data := m.AMQP.GetData()
data := m.getAmqpData()
if len(data) != 0 { // Some data
err = encoder.SetData(bytes.NewBuffer(data))
if err != nil {
@ -137,3 +140,15 @@ func (m *Message) Finish(err error) error {
}
return m.AMQPrcv.AcceptMessage(context.Background(), m.AMQP)
}
// fixes: github.com/cloudevents/spec/issues/1275
func (m *Message) getAmqpData() []byte {
var data []byte
amqpData := m.AMQP.Data
// TODO: replace with slices.Concat once go mod bumped to 1.22
for idx := range amqpData {
data = append(data, amqpData[idx]...)
}
return data
}

View File

@ -62,3 +62,58 @@ func TestNewMessage_message_unknown(t *testing.T) {
got := NewMessage(message, &rcv)
require.Equal(t, binding.EncodingUnknown, got.ReadEncoding())
}
func TestMessage_getAmqpData(t *testing.T) {
tests := []struct {
name string
message *amqp.Message
want []byte
}{
{
name: "nil data",
message: amqp.NewMessage(nil),
want: nil,
},
{
name: "empty string",
message: amqp.NewMessage([]byte(`""`)),
want: []byte(`""`),
},
{
name: "simple string",
message: amqp.NewMessage([]byte("hello world")),
want: []byte("hello world"),
},
{
name: "multiple data with simple strings",
message: &amqp.Message{Data: [][]byte{
[]byte("hello"),
[]byte(" "),
[]byte("world"),
}},
want: []byte("hello world"),
},
{
name: "multiple data to build JSON array",
message: &amqp.Message{Data: [][]byte{
[]byte("["),
[]byte("Foo"),
[]byte(","),
[]byte("Bar"),
[]byte(","),
[]byte("Baz"),
[]byte("]"),
}},
want: []byte("[Foo,Bar,Baz]"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Message{
AMQP: tt.message,
}
got := m.getAmqpData()
require.Equal(t, tt.want, got)
})
}
}