Addressed issue in Kafka-pubsub for avro null messages (#3531)

Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
This commit is contained in:
Patrick Assuied 2024-09-04 21:16:20 -07:00 committed by GitHub
parent b6a5e80315
commit e5322262f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 4 deletions

View File

@ -14,6 +14,7 @@ limitations under the License.
package kafka
import (
"bytes"
"context"
"encoding/binary"
"errors"
@ -263,7 +264,9 @@ func getSchemaSubject(topic string) string {
}
func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) {
// Null Data is valid and a tombstone record. It shouldn't be serialized
// Null Data is valid and a tombstone record.
// It shouldn't be going through schema validation and decoding
// Instead directly convert to JSON `null`
if message.Value == nil {
return []byte("null"), nil
}
@ -354,8 +357,9 @@ func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error)
}
func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
// Null Data is valid and a tombstone record. It shouldn't be serialized
if data == nil {
// Null Data is valid and a tombstone record.
// It should be converted to NULL and not go through schema validation & encoding
if bytes.Equal(data, []byte("null")) || data == nil {
return nil, nil
}

View File

@ -219,7 +219,14 @@ func TestSerializeValueCachingDisabled(t *testing.T) {
require.NoError(t, err)
})
t.Run("value published null, no error", func(t *testing.T) {
t.Run("value published 'null', no error", func(t *testing.T) {
act, err := k.SerializeValue("my-topic", []byte("null"), map[string]string{"valueSchemaType": "Avro"})
require.Nil(t, act)
require.NoError(t, err)
})
t.Run("value published nil, no error", func(t *testing.T) {
act, err := k.SerializeValue("my-topic", nil, map[string]string{"valueSchemaType": "Avro"})
require.Nil(t, act)