Support configuring broker version in Kafka bindings component (#1459)

* Update kafka.go

Support configuring broker version in Kafka bindings component

Signed-off-by: panxiangyue <panxiangyue@corp-ci.com>
Signed-off-by: light-pan <373465009@qq.com>

* Update kafka_test.go

kafka binding add version test

Signed-off-by: panxiangyue <panxiangyue@corp-ci.com>
Signed-off-by: light-pan <373465009@qq.com>

* Update kafka.go

gofumpt check

Co-Authored-By: light-pan <21969903+light-pan@users.noreply.github.com>
Signed-off-by: light-pan <373465009@qq.com>

Co-authored-by: light-pan <21969903+light-pan@users.noreply.github.com>
Co-authored-by: Taction <zchao9100@gmail.com>
This commit is contained in:
light-pan 2022-02-01 01:08:34 +08:00 committed by GitHub
parent 8fe02469ba
commit ea55aea8c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 3 deletions

View File

@ -47,6 +47,7 @@ type Kafka struct {
saslPassword string saslPassword string
initialOffset int64 initialOffset int64
logger logger.Logger logger logger.Logger
Version sarama.KafkaVersion
} }
type kafkaMetadata struct { type kafkaMetadata struct {
@ -59,6 +60,7 @@ type kafkaMetadata struct {
SaslPassword string `json:"saslPassword"` SaslPassword string `json:"saslPassword"`
InitialOffset int64 `json:"initialOffset"` InitialOffset int64 `json:"initialOffset"`
MaxMessageBytes int MaxMessageBytes int
Version sarama.KafkaVersion
} }
type consumer struct { type consumer struct {
@ -111,6 +113,7 @@ func (k *Kafka) Init(metadata bindings.Metadata) error {
k.consumerGroup = meta.ConsumerGroup k.consumerGroup = meta.ConsumerGroup
k.authRequired = meta.AuthRequired k.authRequired = meta.AuthRequired
k.initialOffset = meta.InitialOffset k.initialOffset = meta.InitialOffset
k.Version = meta.Version
// ignore SASL properties if authRequired is false // ignore SASL properties if authRequired is false
if meta.AuthRequired { if meta.AuthRequired {
@ -198,6 +201,16 @@ func (k *Kafka) getKafkaMetadata(metadata bindings.Metadata) (*kafkaMetadata, er
meta.MaxMessageBytes = maxBytes meta.MaxMessageBytes = maxBytes
} }
if val, ok := metadata.Properties["version"]; ok && val != "" {
version, err := sarama.ParseKafkaVersion(val)
if err != nil {
return nil, fmt.Errorf("kafka error: invalid 'version' attribute: %w", err)
}
meta.Version = version
} else {
meta.Version = sarama.V1_0_0_0
}
return &meta, nil return &meta, nil
} }
@ -206,7 +219,7 @@ func (k *Kafka) getSyncProducer(meta *kafkaMetadata) (sarama.SyncProducer, error
config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5 config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true config.Producer.Return.Successes = true
config.Version = sarama.V1_0_0_0 config.Version = meta.Version
// ignore SASL properties if authRequired is false // ignore SASL properties if authRequired is false
if meta.AuthRequired { if meta.AuthRequired {
@ -232,7 +245,7 @@ func (k *Kafka) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error
} }
config := sarama.NewConfig() config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0 config.Version = k.Version
config.Consumer.Offsets.Initial = k.initialOffset config.Consumer.Offsets.Initial = k.initialOffset
// ignore SASL properties if authRequired is false // ignore SASL properties if authRequired is false
if k.authRequired { if k.authRequired {

View File

@ -31,7 +31,7 @@ func TestParseMetadata(t *testing.T) {
t.Run("correct metadata (authRequired false)", func(t *testing.T) { t.Run("correct metadata (authRequired false)", func(t *testing.T) {
m := bindings.Metadata{} m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "false"} m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "false", "version": "1.1.0"}
k := Kafka{logger: logger} k := Kafka{logger: logger}
meta, err := k.getKafkaMetadata(m) meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err) assert.Nil(t, err)
@ -40,6 +40,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0]) assert.Equal(t, "a", meta.Topics[0])
assert.False(t, meta.AuthRequired) assert.False(t, meta.AuthRequired)
assert.Equal(t, "1.1.0", meta.Version.String())
}) })
t.Run("correct metadata (authRequired FALSE)", func(t *testing.T) { t.Run("correct metadata (authRequired FALSE)", func(t *testing.T) {
@ -52,6 +53,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0]) assert.Equal(t, "a", meta.Topics[0])
assert.Equal(t, "1.0.0", meta.Version.String())
assert.False(t, meta.AuthRequired) assert.False(t, meta.AuthRequired)
}) })
@ -65,6 +67,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0]) assert.Equal(t, "a", meta.Topics[0])
assert.Equal(t, "1.0.0", meta.Version.String())
assert.False(t, meta.AuthRequired) assert.False(t, meta.AuthRequired)
}) })
@ -78,6 +81,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0]) assert.Equal(t, "a", meta.Topics[0])
assert.Equal(t, "1.0.0", meta.Version.String())
assert.False(t, meta.AuthRequired) assert.False(t, meta.AuthRequired)
}) })
@ -91,6 +95,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0]) assert.Equal(t, "a", meta.Topics[0])
assert.Equal(t, "1.0.0", meta.Version.String())
assert.False(t, meta.AuthRequired) assert.False(t, meta.AuthRequired)
}) })
@ -104,6 +109,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0]) assert.Equal(t, "a", meta.Topics[0])
assert.Equal(t, "1.0.0", meta.Version.String())
assert.False(t, meta.AuthRequired) assert.False(t, meta.AuthRequired)
}) })
@ -117,6 +123,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0]) assert.Equal(t, "a", meta.Topics[0])
assert.Equal(t, "1.0.0", meta.Version.String())
assert.False(t, meta.AuthRequired) assert.False(t, meta.AuthRequired)
}) })
@ -133,6 +140,7 @@ func TestParseMetadata(t *testing.T) {
assert.True(t, meta.AuthRequired) assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword) assert.Equal(t, "bar", meta.SaslPassword)
assert.Equal(t, "1.0.0", meta.Version.String())
}) })
t.Run("correct metadata (authRequired TRUE)", func(t *testing.T) { t.Run("correct metadata (authRequired TRUE)", func(t *testing.T) {
@ -148,6 +156,7 @@ func TestParseMetadata(t *testing.T) {
assert.True(t, meta.AuthRequired) assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword) assert.Equal(t, "bar", meta.SaslPassword)
assert.Equal(t, "1.0.0", meta.Version.String())
}) })
t.Run("correct metadata (authRequired True)", func(t *testing.T) { t.Run("correct metadata (authRequired True)", func(t *testing.T) {
@ -163,6 +172,7 @@ func TestParseMetadata(t *testing.T) {
assert.True(t, meta.AuthRequired) assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword) assert.Equal(t, "bar", meta.SaslPassword)
assert.Equal(t, "1.0.0", meta.Version.String())
}) })
t.Run("correct metadata (authRequired T)", func(t *testing.T) { t.Run("correct metadata (authRequired T)", func(t *testing.T) {
@ -178,6 +188,7 @@ func TestParseMetadata(t *testing.T) {
assert.True(t, meta.AuthRequired) assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword) assert.Equal(t, "bar", meta.SaslPassword)
assert.Equal(t, "1.0.0", meta.Version.String())
}) })
t.Run("correct metadata (authRequired t)", func(t *testing.T) { t.Run("correct metadata (authRequired t)", func(t *testing.T) {
@ -208,6 +219,7 @@ func TestParseMetadata(t *testing.T) {
assert.True(t, meta.AuthRequired) assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword) assert.Equal(t, "bar", meta.SaslPassword)
assert.Equal(t, "1.0.0", meta.Version.String())
}) })
t.Run("correct metadata (maxMessageBytes 2048)", func(t *testing.T) { t.Run("correct metadata (maxMessageBytes 2048)", func(t *testing.T) {
@ -224,6 +236,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword) assert.Equal(t, "bar", meta.SaslPassword)
assert.Equal(t, 2048, meta.MaxMessageBytes) assert.Equal(t, 2048, meta.MaxMessageBytes)
assert.Equal(t, "1.0.0", meta.Version.String())
}) })
t.Run("correct metadata (no maxMessageBytes)", func(t *testing.T) { t.Run("correct metadata (no maxMessageBytes)", func(t *testing.T) {
@ -240,6 +253,7 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword) assert.Equal(t, "bar", meta.SaslPassword)
assert.Equal(t, 0, meta.MaxMessageBytes) assert.Equal(t, 0, meta.MaxMessageBytes)
assert.Equal(t, "1.0.0", meta.Version.String())
}) })
t.Run("missing authRequired", func(t *testing.T) { t.Run("missing authRequired", func(t *testing.T) {