From ea55aea8c332c42e52b9a00d59f78545c7a1e76c Mon Sep 17 00:00:00 2001 From: light-pan <373465009@qq.com> Date: Tue, 1 Feb 2022 01:08:34 +0800 Subject: [PATCH] Support configuring broker version in Kafka bindings component (#1459) * Update kafka.go Support configuring broker version in Kafka bindings component Signed-off-by: panxiangyue Signed-off-by: light-pan <373465009@qq.com> * Update kafka_test.go kafka binding add version test Signed-off-by: panxiangyue Signed-off-by: light-pan <373465009@qq.com> * Update kafka.go gofumpt check Co-Authored-By: light-pan <21969903+light-pan@users.noreply.github.com> Signed-off-by: light-pan <373465009@qq.com> Co-authored-by: light-pan <21969903+light-pan@users.noreply.github.com> Co-authored-by: Taction --- bindings/kafka/kafka.go | 17 +++++++++++++++-- bindings/kafka/kafka_test.go | 16 +++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/bindings/kafka/kafka.go b/bindings/kafka/kafka.go index 35b19b18d..1e3424e64 100644 --- a/bindings/kafka/kafka.go +++ b/bindings/kafka/kafka.go @@ -47,6 +47,7 @@ type Kafka struct { saslPassword string initialOffset int64 logger logger.Logger + Version sarama.KafkaVersion } type kafkaMetadata struct { @@ -59,6 +60,7 @@ type kafkaMetadata struct { SaslPassword string `json:"saslPassword"` InitialOffset int64 `json:"initialOffset"` MaxMessageBytes int + Version sarama.KafkaVersion } type consumer struct { @@ -111,6 +113,7 @@ func (k *Kafka) Init(metadata bindings.Metadata) error { k.consumerGroup = meta.ConsumerGroup k.authRequired = meta.AuthRequired k.initialOffset = meta.InitialOffset + k.Version = meta.Version // ignore SASL properties if authRequired is false if meta.AuthRequired { @@ -198,6 +201,16 @@ func (k *Kafka) getKafkaMetadata(metadata bindings.Metadata) (*kafkaMetadata, er meta.MaxMessageBytes = maxBytes } + if val, ok := metadata.Properties["version"]; ok && val != "" { + version, err := sarama.ParseKafkaVersion(val) + if err != nil { + return nil, fmt.Errorf("kafka error: invalid 'version' attribute: %w", err) + } + meta.Version = version + } else { + meta.Version = sarama.V1_0_0_0 + } + return &meta, nil } @@ -206,7 +219,7 @@ func (k *Kafka) getSyncProducer(meta *kafkaMetadata) (sarama.SyncProducer, error config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true - config.Version = sarama.V1_0_0_0 + config.Version = meta.Version // ignore SASL properties if authRequired is false if meta.AuthRequired { @@ -232,7 +245,7 @@ func (k *Kafka) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error } config := sarama.NewConfig() - config.Version = sarama.V1_0_0_0 + config.Version = k.Version config.Consumer.Offsets.Initial = k.initialOffset // ignore SASL properties if authRequired is false if k.authRequired { diff --git a/bindings/kafka/kafka_test.go b/bindings/kafka/kafka_test.go index 8f67dff7b..de748214a 100644 --- a/bindings/kafka/kafka_test.go +++ b/bindings/kafka/kafka_test.go @@ -31,7 +31,7 @@ func TestParseMetadata(t *testing.T) { t.Run("correct metadata (authRequired false)", func(t *testing.T) { m := bindings.Metadata{} - m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "false"} + m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "false", "version": "1.1.0"} k := Kafka{logger: logger} meta, err := k.getKafkaMetadata(m) assert.Nil(t, err) @@ -40,6 +40,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.Topics[0]) assert.False(t, meta.AuthRequired) + assert.Equal(t, "1.1.0", meta.Version.String()) }) t.Run("correct metadata (authRequired FALSE)", func(t *testing.T) { @@ -52,6 +53,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.Topics[0]) + assert.Equal(t, "1.0.0", meta.Version.String()) assert.False(t, meta.AuthRequired) }) @@ -65,6 +67,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.Topics[0]) + assert.Equal(t, "1.0.0", meta.Version.String()) assert.False(t, meta.AuthRequired) }) @@ -78,6 +81,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.Topics[0]) + assert.Equal(t, "1.0.0", meta.Version.String()) assert.False(t, meta.AuthRequired) }) @@ -91,6 +95,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.Topics[0]) + assert.Equal(t, "1.0.0", meta.Version.String()) assert.False(t, meta.AuthRequired) }) @@ -104,6 +109,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.Topics[0]) + assert.Equal(t, "1.0.0", meta.Version.String()) assert.False(t, meta.AuthRequired) }) @@ -117,6 +123,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "a", meta.ConsumerGroup) assert.Equal(t, "a", meta.PublishTopic) assert.Equal(t, "a", meta.Topics[0]) + assert.Equal(t, "1.0.0", meta.Version.String()) assert.False(t, meta.AuthRequired) }) @@ -133,6 +140,7 @@ func TestParseMetadata(t *testing.T) { assert.True(t, meta.AuthRequired) assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "bar", meta.SaslPassword) + assert.Equal(t, "1.0.0", meta.Version.String()) }) t.Run("correct metadata (authRequired TRUE)", func(t *testing.T) { @@ -148,6 +156,7 @@ func TestParseMetadata(t *testing.T) { assert.True(t, meta.AuthRequired) assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "bar", meta.SaslPassword) + assert.Equal(t, "1.0.0", meta.Version.String()) }) t.Run("correct metadata (authRequired True)", func(t *testing.T) { @@ -163,6 +172,7 @@ func TestParseMetadata(t *testing.T) { assert.True(t, meta.AuthRequired) assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "bar", meta.SaslPassword) + assert.Equal(t, "1.0.0", meta.Version.String()) }) t.Run("correct metadata (authRequired T)", func(t *testing.T) { @@ -178,6 +188,7 @@ func TestParseMetadata(t *testing.T) { assert.True(t, meta.AuthRequired) assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "bar", meta.SaslPassword) + assert.Equal(t, "1.0.0", meta.Version.String()) }) t.Run("correct metadata (authRequired t)", func(t *testing.T) { @@ -208,6 +219,7 @@ func TestParseMetadata(t *testing.T) { assert.True(t, meta.AuthRequired) assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "bar", meta.SaslPassword) + assert.Equal(t, "1.0.0", meta.Version.String()) }) t.Run("correct metadata (maxMessageBytes 2048)", func(t *testing.T) { @@ -224,6 +236,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "bar", meta.SaslPassword) assert.Equal(t, 2048, meta.MaxMessageBytes) + assert.Equal(t, "1.0.0", meta.Version.String()) }) t.Run("correct metadata (no maxMessageBytes)", func(t *testing.T) { @@ -240,6 +253,7 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "foo", meta.SaslUsername) assert.Equal(t, "bar", meta.SaslPassword) assert.Equal(t, 0, meta.MaxMessageBytes) + assert.Equal(t, "1.0.0", meta.Version.String()) }) t.Run("missing authRequired", func(t *testing.T) {