package kafka import ( "encoding/binary" "encoding/json" "testing" "time" "github.com/IBM/sarama" gomock "github.com/golang/mock/gomock" "github.com/linkedin/goavro/v2" "github.com/riferrei/srclient" "github.com/stretchr/testify/require" mock_srclient "github.com/dapr/components-contrib/common/component/kafka/mocks" ) func TestGetValueSchemaType(t *testing.T) { t.Run("No Metadata, return None", func(t *testing.T) { act, _ := GetValueSchemaType(nil) require.Equal(t, None, act) }) t.Run("No valueSchemaType, return None", func(t *testing.T) { act, _ := GetValueSchemaType(make(map[string]string)) require.Equal(t, None, act) }) t.Run("valueSchemaType='AVRO', return AVRO", func(t *testing.T) { act, _ := GetValueSchemaType(map[string]string{"valueSchemaType": "AVRO"}) require.Equal(t, Avro, act) }) t.Run("valueSchemaType='None', return None", func(t *testing.T) { act, _ := GetValueSchemaType(map[string]string{"valueSchemaType": "None"}) require.Equal(t, None, act) }) t.Run("valueSchemaType='XXX', return Error", func(t *testing.T) { _, err := GetValueSchemaType(map[string]string{"valueSchemaType": "XXX"}) require.Error(t, err) }) } var ( testSchema1 = `{"type": "record", "name": "cupcake", "fields": [{"name": "flavor", "type": "string"}, {"name": "created_date", "type": ["null",{"type": "long","logicalType": "timestamp-millis"}],"default": null}]}` testValue1 = map[string]interface{}{"flavor": "chocolate", "created_date": float64(time.Now().UnixMilli())} invValue = map[string]string{"xxx": "chocolate"} ) func TestDeserializeValue(t *testing.T) { registry := srclient.CreateMockSchemaRegistryClient("http://localhost:8081") schema, _ := registry.CreateSchema("my-topic-value", testSchema1, srclient.Avro) handlerConfig := SubscriptionHandlerConfig{ IsBulkSubscribe: false, ValueSchemaType: Avro, } k := Kafka{ srClient: registry, schemaCachingEnabled: true, } schemaIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID())) valJSON, _ := json.Marshal(testValue1) codec, _ := goavro.NewCodecForStandardJSONFull(testSchema1) native, _, _ := codec.NativeFromTextual(valJSON) valueBytes, _ := codec.BinaryFromNative(nil, native) var recordValue []byte recordValue = append(recordValue, byte(0)) recordValue = append(recordValue, schemaIDBytes...) recordValue = append(recordValue, valueBytes...) t.Run("Schema found, return value", func(t *testing.T) { msg := sarama.ConsumerMessage{ Key: []byte("my_key"), Value: recordValue, Topic: "my-topic", } act, _ := k.DeserializeValue(&msg, handlerConfig) var actMap map[string]any json.Unmarshal(act, &actMap) require.Equal(t, testValue1, actMap) }) t.Run("Invalid too short data, return error", func(t *testing.T) { msg := sarama.ConsumerMessage{ Key: []byte("my_key"), Value: []byte("xxxx"), Topic: "my-topic", } _, err := k.DeserializeValue(&msg, handlerConfig) require.Error(t, err) }) t.Run("Invalid Schema ID, return error", func(t *testing.T) { msg := sarama.ConsumerMessage{ Key: []byte("my_key"), Value: []byte("xxxxx"), Topic: "my-topic", } _, err := k.DeserializeValue(&msg, handlerConfig) require.Error(t, err) }) t.Run("Invalid data, return error", func(t *testing.T) { var invalidVal []byte invalidVal = append(invalidVal, byte(0)) invalidVal = append(invalidVal, schemaIDBytes...) invalidVal = append(invalidVal, []byte("xxx")...) msg := sarama.ConsumerMessage{ Key: []byte("my_key"), Value: invalidVal, Topic: "my-topic", } _, err := k.DeserializeValue(&msg, handlerConfig) require.Error(t, err) }) t.Run("Missing Schema Registry settings, return error", func(t *testing.T) { kInv := Kafka{ srClient: nil, schemaCachingEnabled: true, } msg := sarama.ConsumerMessage{ Key: []byte("my_key"), Value: recordValue, Topic: "my-topic", } _, err := kInv.DeserializeValue(&msg, handlerConfig) require.Error(t, err, "schema registry details not set") }) } func assertValueSerialized(t *testing.T, act []byte, valJSON []byte, schema *srclient.Schema) { require.NotEqual(t, act, valJSON) actSchemaID := int(binary.BigEndian.Uint32(act[1:5])) codec, _ := goavro.NewCodecForStandardJSONFull(schema.Schema()) native, _, _ := codec.NativeFromBinary(act[5:]) actJSON, _ := codec.TextualFromNative(nil, native) var actMap map[string]any json.Unmarshal(actJSON, &actMap) require.Equal(t, schema.ID(), actSchemaID) require.Equal(t, testValue1, actMap) } func TestSerializeValueCachingDisabled(t *testing.T) { registry := srclient.CreateMockSchemaRegistryClient("http://localhost:8081") schema, _ := registry.CreateSchema("my-topic-value", testSchema1, srclient.Avro) k := Kafka{ srClient: registry, schemaCachingEnabled: false, } t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) { valJSON, _ := json.Marshal(testValue1) act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{}) require.Equal(t, valJSON, act) }) t.Run("valueSchemaType set to None, leave value as is", func(t *testing.T) { valJSON, _ := json.Marshal(testValue1) act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "None"}) require.Equal(t, valJSON, act) }) t.Run("valueSchemaType set to None, leave value as is", func(t *testing.T) { valJSON, _ := json.Marshal(testValue1) act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "NONE"}) require.Equal(t, valJSON, act) }) t.Run("valueSchemaType invalid, return error", func(t *testing.T) { valJSON, _ := json.Marshal(testValue1) _, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "xx"}) require.Error(t, err, "error parsing schema type. 'xx' is not a supported value") }) t.Run("schema found, serialize value as Avro binary", func(t *testing.T) { valJSON, _ := json.Marshal(testValue1) act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"}) assertValueSerialized(t, act, valJSON, schema) }) t.Run("invalid data, return error", func(t *testing.T) { valJSON, _ := json.Marshal(invValue) _, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"}) require.Error(t, err, "cannot decode textual record \"cupcake\": cannot decode textual map: cannot determine codec: \"xxx\"") }) } func TestSerializeValueCachingEnabled(t *testing.T) { registry := srclient.CreateMockSchemaRegistryClient("http://localhost:8081") schema, _ := registry.CreateSchema("my-topic-value", testSchema1, srclient.Avro) k := Kafka{ srClient: registry, schemaCachingEnabled: true, latestSchemaCache: make(map[string]SchemaCacheEntry), latestSchemaCacheTTL: time.Minute * 5, } t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) { valJSON, _ := json.Marshal(testValue1) act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{}) require.Equal(t, valJSON, act) }) t.Run("schema found, serialize value as Avro binary", func(t *testing.T) { valJSON, _ := json.Marshal(testValue1) act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"}) assertValueSerialized(t, act, valJSON, schema) }) } func TestLatestSchemaCaching(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() registry := srclient.CreateMockSchemaRegistryClient("http://locahost:8081") m := mock_srclient.NewMockISchemaRegistryClient(ctrl) schema, _ := registry.CreateSchema("my-topic-value", testSchema1, srclient.Avro) t.Run("Caching enabled, call GetLatestSchema() only once", func(t *testing.T) { k := Kafka{ srClient: m, schemaCachingEnabled: true, latestSchemaCache: make(map[string]SchemaCacheEntry), latestSchemaCacheTTL: time.Second * 10, } m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(1) valJSON, _ := json.Marshal(testValue1) act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"}) assertValueSerialized(t, act, valJSON, schema) // Call a 2nd time within TTL and make sure it's not called again act, _ = k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"}) assertValueSerialized(t, act, valJSON, schema) }) t.Run("Caching enabled, when cache entry expires, call GetLatestSchema() again", func(t *testing.T) { k := Kafka{ srClient: m, schemaCachingEnabled: true, latestSchemaCache: make(map[string]SchemaCacheEntry), latestSchemaCacheTTL: time.Second * 1, } m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2) valJSON, _ := json.Marshal(testValue1) act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"}) assertValueSerialized(t, act, valJSON, schema) time.Sleep(2 * time.Second) // Call a 2nd time within TTL and make sure it's not called again act, _ = k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"}) assertValueSerialized(t, act, valJSON, schema) }) t.Run("Caching disabled, call GetLatestSchema() twice", func(t *testing.T) { k := Kafka{ srClient: m, schemaCachingEnabled: false, latestSchemaCache: make(map[string]SchemaCacheEntry), latestSchemaCacheTTL: 0, } m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2) valJSON, _ := json.Marshal(testValue1) act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"}) assertValueSerialized(t, act, valJSON, schema) // Call a 2nd time within TTL and make sure it's not called again act, _ = k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"}) assertValueSerialized(t, act, valJSON, schema) }) }