65 lines
1.5 KiB
Go
65 lines
1.5 KiB
Go
package kafka
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/IBM/sarama"
|
|
|
|
awsAuth "github.com/dapr/components-contrib/common/authentication/aws"
|
|
)
|
|
|
|
type clients struct {
|
|
consumerGroup sarama.ConsumerGroup
|
|
producer sarama.SyncProducer
|
|
}
|
|
|
|
func (k *Kafka) latestClients() (*clients, error) {
|
|
switch {
|
|
// case 0: use mock clients for testing
|
|
case k.mockProducer != nil || k.mockConsumerGroup != nil:
|
|
return &clients{
|
|
consumerGroup: k.mockConsumerGroup,
|
|
producer: k.mockProducer,
|
|
}, nil
|
|
|
|
// case 1: use aws clients with refreshable tokens in the cfg
|
|
case k.awsAuthProvider != nil:
|
|
awsKafkaOpts := awsAuth.KafkaOptions{
|
|
Config: k.config,
|
|
ConsumerGroup: k.consumerGroup,
|
|
Brokers: k.brokers,
|
|
MaxMessageBytes: k.maxMessageBytes,
|
|
}
|
|
awsKafkaClients, err := k.awsAuthProvider.Kafka(awsKafkaOpts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get AWS IAM Kafka clients: %w", err)
|
|
}
|
|
return &clients{
|
|
consumerGroup: awsKafkaClients.ConsumerGroup,
|
|
producer: awsKafkaClients.Producer,
|
|
}, nil
|
|
|
|
// case 2: normal static auth profile clients
|
|
default:
|
|
if k.clients != nil {
|
|
return k.clients, nil
|
|
}
|
|
cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
p, err := GetSyncProducer(*k.config, k.brokers, k.maxMessageBytes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
newStaticClients := clients{
|
|
consumerGroup: cg,
|
|
producer: p,
|
|
}
|
|
k.clients = &newStaticClients
|
|
return k.clients, nil
|
|
}
|
|
}
|