Add consumer proto schema validation for pulsar (#3014)

Signed-off-by: yaron2 <schneider.yaron@live.com>
Signed-off-by: Yaron Schneider <schneider.yaron@live.com>
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
Yaron Schneider 2023-07-25 11:12:47 -07:00 committed by GitHub
parent c75005582b
commit 76b8480ff9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 8 deletions

View File

@ -52,6 +52,7 @@ const (
redeliveryDelay = "redeliveryDelay"
avroProtocol = "avro"
jsonProtocol = "json"
protoProtocol = "proto"
partitionKey = "partitionKey"
defaultTenant = "public"
@ -61,11 +62,12 @@ const (
pulsarToken = "token"
// topicFormat is the format for pulsar, which have a well-defined structure: {persistent|non-persistent}://tenant/namespace/topic,
// see https://pulsar.apache.org/docs/en/concepts-messaging/#topics for details.
topicFormat = "%s://%s/%s/%s"
persistentStr = "persistent"
nonPersistentStr = "non-persistent"
topicJSONSchemaIdentifier = ".jsonschema"
topicAvroSchemaIdentifier = ".avroschema"
topicFormat = "%s://%s/%s/%s"
persistentStr = "persistent"
nonPersistentStr = "non-persistent"
topicJSONSchemaIdentifier = ".jsonschema"
topicAvroSchemaIdentifier = ".avroschema"
topicProtoSchemaIdentifier = ".protoschema"
// defaultBatchingMaxPublishDelay init default for maximum delay to batch messages.
defaultBatchingMaxPublishDelay = 10 * time.Millisecond
@ -130,18 +132,25 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
}
for k, v := range meta.Properties {
if strings.HasSuffix(k, topicJSONSchemaIdentifier) {
switch {
case strings.HasSuffix(k, topicJSONSchemaIdentifier):
topic := k[:len(k)-len(topicJSONSchemaIdentifier)]
m.internalTopicSchemas[topic] = schemaMetadata{
protocol: jsonProtocol,
value: v,
}
} else if strings.HasSuffix(k, topicAvroSchemaIdentifier) {
topic := k[:len(k)-len(topicJSONSchemaIdentifier)]
case strings.HasSuffix(k, topicAvroSchemaIdentifier):
topic := k[:len(k)-len(topicAvroSchemaIdentifier)]
m.internalTopicSchemas[topic] = schemaMetadata{
protocol: avroProtocol,
value: v,
}
case strings.HasSuffix(k, topicProtoSchemaIdentifier):
topic := k[:len(k)-len(topicProtoSchemaIdentifier)]
m.internalTopicSchemas[topic] = schemaMetadata{
protocol: protoProtocol,
value: v,
}
}
}
@ -267,6 +276,8 @@ func getPulsarSchema(metadata schemaMetadata) pulsar.Schema {
return pulsar.NewJSONSchema(metadata.value, nil)
case avroProtocol:
return pulsar.NewAvroSchema(metadata.value, nil)
case protoProtocol:
return pulsar.NewProtoSchema(metadata.value, nil)
default:
return nil
}

View File

@ -80,6 +80,22 @@ func TestParsePulsarSchemaMetadata(t *testing.T) {
assert.Equal(t, "2", meta.internalTopicSchemas["kenobi.avroschema"].value)
})
t.Run("test proto", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"host": "a",
"obiwan.avroschema": "1",
"kenobi.protoschema.protoschema": "2",
}
meta, err := parsePulsarMetadata(m)
assert.NoError(t, err)
assert.Equal(t, "a", meta.Host)
assert.Len(t, meta.internalTopicSchemas, 2)
assert.Equal(t, "1", meta.internalTopicSchemas["obiwan"].value)
assert.Equal(t, "2", meta.internalTopicSchemas["kenobi.protoschema"].value)
})
t.Run("test combined avro/json", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
@ -98,6 +114,27 @@ func TestParsePulsarSchemaMetadata(t *testing.T) {
assert.Equal(t, jsonProtocol, meta.internalTopicSchemas["kenobi"].protocol)
})
t.Run("test combined avro/json/proto", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"host": "a",
"obiwan.avroschema": "1",
"kenobi.jsonschema": "2",
"darth.protoschema": "3",
}
meta, err := parsePulsarMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Host)
assert.Len(t, meta.internalTopicSchemas, 3)
assert.Equal(t, "1", meta.internalTopicSchemas["obiwan"].value)
assert.Equal(t, "2", meta.internalTopicSchemas["kenobi"].value)
assert.Equal(t, "3", meta.internalTopicSchemas["darth"].value)
assert.Equal(t, avroProtocol, meta.internalTopicSchemas["obiwan"].protocol)
assert.Equal(t, jsonProtocol, meta.internalTopicSchemas["kenobi"].protocol)
assert.Equal(t, protoProtocol, meta.internalTopicSchemas["darth"].protocol)
})
t.Run("test funky edge case", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{