components-contrib/common/component/kafka/clients.go

65 lines
1.5 KiB
Go

package kafka
import (
"fmt"
"github.com/IBM/sarama"
awsAuth "github.com/dapr/components-contrib/common/authentication/aws"
)
type clients struct {
consumerGroup sarama.ConsumerGroup
producer sarama.SyncProducer
}
func (k *Kafka) latestClients() (*clients, error) {
switch {
// case 0: use mock clients for testing
case k.mockProducer != nil || k.mockConsumerGroup != nil:
return &clients{
consumerGroup: k.mockConsumerGroup,
producer: k.mockProducer,
}, nil
// case 1: use aws clients with refreshable tokens in the cfg
case k.awsAuthProvider != nil:
awsKafkaOpts := awsAuth.KafkaOptions{
Config: k.config,
ConsumerGroup: k.consumerGroup,
Brokers: k.brokers,
MaxMessageBytes: k.maxMessageBytes,
}
awsKafkaClients, err := k.awsAuthProvider.Kafka(awsKafkaOpts)
if err != nil {
return nil, fmt.Errorf("failed to get AWS IAM Kafka clients: %w", err)
}
return &clients{
consumerGroup: awsKafkaClients.ConsumerGroup,
producer: awsKafkaClients.Producer,
}, nil
// case 2: normal static auth profile clients
default:
if k.clients != nil {
return k.clients, nil
}
cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
if err != nil {
return nil, err
}
p, err := GetSyncProducer(*k.config, k.brokers, k.maxMessageBytes)
if err != nil {
return nil, err
}
newStaticClients := clients{
consumerGroup: cg,
producer: p,
}
k.clients = &newStaticClients
return k.clients, nil
}
}