Fix Kafka pubsub use of AuthRequired metadata (#1015)

Kafka pubsub component was not setting `k.authRequired` property based
on parsed `meta.AuthRequired` value, so would not correctly configure
Kafka producer or consumer with appropriate credentials when specified
by the author.

- Fix assignment of `meta.AuthRequired` to `k.authRequired` consumed by
  Subscribe().
- Fix initialization of `SyncProducer` to use copy of config with the
  authorization credentials added.

Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Simon Leet 2021-07-28 10:24:35 -07:00 committed by GitHub
parent 9c84bd202c
commit f84a8b8914
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 20 deletions

View File

@ -117,19 +117,9 @@ func (k *Kafka) Init(metadata pubsub.Metadata) error {
return err
}
p, err := k.getSyncProducer(meta)
if err != nil {
return err
}
k.brokers = meta.Brokers
k.producer = p
k.consumerGroup = meta.ConsumerGroup
if meta.AuthRequired {
k.saslUsername = meta.SaslUsername
k.saslPassword = meta.SaslPassword
}
k.authRequired = meta.AuthRequired
config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0
@ -139,11 +129,18 @@ func (k *Kafka) Init(metadata pubsub.Metadata) error {
}
if k.authRequired {
k.saslUsername = meta.SaslUsername
k.saslPassword = meta.SaslPassword
updateAuthInfo(config, k.saslUsername, k.saslPassword)
}
k.config = config
k.producer, err = getSyncProducer(*k.config, k.brokers, meta.MaxMessageBytes)
if err != nil {
return err
}
k.topics = make(map[string]bool)
// Default retry configuration is used if no
@ -346,21 +343,17 @@ func (k *Kafka) getKafkaMetadata(metadata pubsub.Metadata) (*kafkaMetadata, erro
return &meta, nil
}
func (k *Kafka) getSyncProducer(meta *kafkaMetadata) (sarama.SyncProducer, error) {
config := sarama.NewConfig()
func getSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int) (sarama.SyncProducer, error) {
// Add SyncProducer specific properties to copy of base config
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
if k.authRequired {
updateAuthInfo(config, k.saslUsername, k.saslPassword)
if maxMessageBytes > 0 {
config.Producer.MaxMessageBytes = maxMessageBytes
}
if meta.MaxMessageBytes > 0 {
config.Producer.MaxMessageBytes = meta.MaxMessageBytes
}
producer, err := sarama.NewSyncProducer(meta.Brokers, config)
producer, err := sarama.NewSyncProducer(brokers, &config)
if err != nil {
return nil, err
}