Kafka: fix context use in AWS IAM auth (#3305)
Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: Yaron Schneider <schneider.yaron@live.com> Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
be9d23e769
commit
7d39c46425
|
|
@ -19,6 +19,7 @@ import (
|
|||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
|
||||
|
|
@ -96,11 +97,12 @@ func updateAWSIAMAuthInfo(ctx context.Context, config *sarama.Config, metadata *
|
|||
config.Net.SASL.Enable = true
|
||||
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
|
||||
config.Net.SASL.TokenProvider = &mskAccessTokenProvider{
|
||||
ctx: ctx,
|
||||
region: metadata.AWSRegion,
|
||||
accessKey: metadata.AWSAccessKey,
|
||||
secretKey: metadata.AWSSecretKey,
|
||||
sessionToken: metadata.AWSSessionToken,
|
||||
ctx: ctx,
|
||||
generateTokenTimeout: 10 * time.Second,
|
||||
region: metadata.AWSRegion,
|
||||
accessKey: metadata.AWSAccessKey,
|
||||
secretKey: metadata.AWSSecretKey,
|
||||
sessionToken: metadata.AWSSessionToken,
|
||||
}
|
||||
|
||||
_, err := config.Net.SASL.TokenProvider.Token()
|
||||
|
|
@ -111,16 +113,21 @@ func updateAWSIAMAuthInfo(ctx context.Context, config *sarama.Config, metadata *
|
|||
}
|
||||
|
||||
type mskAccessTokenProvider struct {
|
||||
ctx context.Context
|
||||
accessKey string
|
||||
secretKey string
|
||||
sessionToken string
|
||||
region string
|
||||
ctx context.Context
|
||||
generateTokenTimeout time.Duration
|
||||
accessKey string
|
||||
secretKey string
|
||||
sessionToken string
|
||||
region string
|
||||
}
|
||||
|
||||
func (m *mskAccessTokenProvider) Token() (*sarama.AccessToken, error) {
|
||||
// this function can't use the context passed on Init because that context would be cancelled right after Init
|
||||
ctx, cancel := context.WithTimeout(m.ctx, m.generateTokenTimeout)
|
||||
defer cancel()
|
||||
|
||||
if m.accessKey != "" && m.secretKey != "" {
|
||||
token, _, err := signer.GenerateAuthTokenFromCredentialsProvider(m.ctx, m.region, aws2.CredentialsProviderFunc(func(ctx context.Context) (aws2.Credentials, error) {
|
||||
token, _, err := signer.GenerateAuthTokenFromCredentialsProvider(ctx, m.region, aws2.CredentialsProviderFunc(func(ctx context.Context) (aws2.Credentials, error) {
|
||||
return aws2.Credentials{
|
||||
AccessKeyID: m.accessKey,
|
||||
SecretAccessKey: m.secretKey,
|
||||
|
|
@ -130,6 +137,6 @@ func (m *mskAccessTokenProvider) Token() (*sarama.AccessToken, error) {
|
|||
return &sarama.AccessToken{Token: token}, err
|
||||
}
|
||||
|
||||
token, _, err := signer.GenerateAuthToken(m.ctx, m.region)
|
||||
token, _, err := signer.GenerateAuthToken(ctx, m.region)
|
||||
return &sarama.AccessToken{Token: token}, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,6 +42,10 @@ type Kafka struct {
|
|||
subscribeTopics TopicHandlerConfig
|
||||
subscribeLock sync.Mutex
|
||||
|
||||
// used for background logic that cannot use the context passed to the Init function
|
||||
internalContext context.Context
|
||||
internalContextCancel func()
|
||||
|
||||
backOffConfig retry.Config
|
||||
|
||||
// The default value should be true for kafka pubsub component and false for kafka binding component
|
||||
|
|
@ -71,6 +75,9 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// this context can't use the context passed to Init because that context would be cancelled right after Init
|
||||
k.internalContext, k.internalContextCancel = context.WithCancel(context.Background())
|
||||
|
||||
k.brokers = meta.internalBrokers
|
||||
k.consumerGroup = meta.ConsumerGroup
|
||||
k.initialOffset = meta.internalInitialOffset
|
||||
|
|
@ -114,7 +121,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
|
|||
// already handled in updateTLSConfig
|
||||
case awsIAMAuthType:
|
||||
k.logger.Info("Configuring AWS IAM authentcation")
|
||||
err = updateAWSIAMAuthInfo(ctx, config, meta)
|
||||
err = updateAWSIAMAuthInfo(k.internalContext, config, meta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -152,6 +159,10 @@ func (k *Kafka) Close() (err error) {
|
|||
k.producer = nil
|
||||
}
|
||||
|
||||
if k.internalContext != nil {
|
||||
k.internalContextCancel()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue