Kafka: Support for AWS IAM role auth (#3310)

Signed-off-by: Eunice Compra <eunicecompra@gmail.com>
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Eunice Compra <eunice.compra@gmail.com>
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Bernd Verst <github@bernd.dev>
Co-authored-by: Fabian Martinez <46371672+famarting@users.noreply.github.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
This commit is contained in:
Eunice Compra 2024-01-10 15:11:32 +11:00 committed by GitHub
parent 419f03fc02
commit 85722a4e88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 63 additions and 2 deletions

View File

@ -140,7 +140,7 @@ authenticationProfiles:
allowedValues: allowedValues:
- "none" - "none"
- title: "AWS IAM" - title: "AWS IAM"
description: "Authenticate using AWS IAM, useful for Serverless AWS MSK" description: "Authenticate using AWS IAM credentials or role for AWS MSK"
metadata: metadata:
- name: authType - name: authType
type: string type: string
@ -176,6 +176,18 @@ authenticationProfiles:
description: | description: |
AWS session token to use. A session token is only required if you are using\ntemporary security credentials. AWS session token to use. A session token is only required if you are using\ntemporary security credentials.
example: '"TOKEN"' example: '"TOKEN"'
- name: awsIamRoleArn
type: string
required: true
description: |
IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
example: '"arn:aws:iam::123456789:role/mskRole"'
- name: awsStsSessionName
type: string
description: |
Represents the session name for assuming a role.
example: '"MyAppSession"'
default: '"MSKSASLDefaultSession"'
metadata: metadata:
- name: topics - name: topics
type: string type: string

View File

@ -103,6 +103,8 @@ func updateAWSIAMAuthInfo(ctx context.Context, config *sarama.Config, metadata *
accessKey: metadata.AWSAccessKey, accessKey: metadata.AWSAccessKey,
secretKey: metadata.AWSSecretKey, secretKey: metadata.AWSSecretKey,
sessionToken: metadata.AWSSessionToken, sessionToken: metadata.AWSSessionToken,
awsIamRoleArn: metadata.AWSIamRoleArn,
awsStsSessionName: metadata.AWSStsSessionName,
} }
_, err := config.Net.SASL.TokenProvider.Token() _, err := config.Net.SASL.TokenProvider.Token()
@ -118,6 +120,8 @@ type mskAccessTokenProvider struct {
accessKey string accessKey string
secretKey string secretKey string
sessionToken string sessionToken string
awsIamRoleArn string
awsStsSessionName string
region string region string
} }
@ -135,6 +139,9 @@ func (m *mskAccessTokenProvider) Token() (*sarama.AccessToken, error) {
}, nil }, nil
})) }))
return &sarama.AccessToken{Token: token}, err return &sarama.AccessToken{Token: token}, err
} else if m.awsIamRoleArn != "" {
token, _, err := signer.GenerateAuthTokenFromRole(ctx, m.region, m.awsIamRoleArn, m.awsStsSessionName)
return &sarama.AccessToken{Token: token}, err
} }
token, _, err := signer.GenerateAuthToken(ctx, m.region) token, _, err := signer.GenerateAuthToken(ctx, m.region)

View File

@ -84,6 +84,8 @@ type KafkaMetadata struct {
AWSAccessKey string `mapstructure:"awsAccessKey"` AWSAccessKey string `mapstructure:"awsAccessKey"`
AWSSecretKey string `mapstructure:"awsSecretKey"` AWSSecretKey string `mapstructure:"awsSecretKey"`
AWSSessionToken string `mapstructure:"awsSessionToken"` AWSSessionToken string `mapstructure:"awsSessionToken"`
AWSIamRoleArn string `mapstructure:"awsIamRoleArn"`
AWSStsSessionName string `mapstructure:"awsStsSessionName"`
AWSRegion string `mapstructure:"awsRegion"` AWSRegion string `mapstructure:"awsRegion"`
channelBufferSize int `mapstructure:"-"` channelBufferSize int `mapstructure:"-"`
consumerFetchMin int32 `mapstructure:"-"` consumerFetchMin int32 `mapstructure:"-"`
@ -244,6 +246,9 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
if m.AWSRegion == "" { if m.AWSRegion == "" {
return nil, errors.New("missing AWS region property 'awsRegion' for authType 'awsiam'") return nil, errors.New("missing AWS region property 'awsRegion' for authType 'awsiam'")
} }
if m.AWSIamRoleArn == "" && m.AWSSecretKey == "" && m.AWSAccessKey == "" {
return nil, errors.New("missing AWS credentials or IAM role properties for authType 'awsiam'")
}
k.logger.Debug("Configuring AWS IAM authentication.") k.logger.Debug("Configuring AWS IAM authentication.")
default: default:
return nil, errors.New("kafka error: invalid value for 'authType' attribute") return nil, errors.New("kafka error: invalid value for 'authType' attribute")

View File

@ -369,6 +369,31 @@ func TestTls(t *testing.T) {
}) })
} }
func TestAwsIam(t *testing.T) {
k := getKafka()
t.Run("missing aws region", func(t *testing.T) {
m := getBaseMetadata()
m[authType] = awsIAMAuthType
meta, err := k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, "missing AWS region property 'awsRegion' for authType 'awsiam'", err.Error())
})
t.Run("missing aws credentials", func(t *testing.T) {
m := getBaseMetadata()
m[authType] = awsIAMAuthType
m["awsRegion"] = "us-east-1"
meta, err := k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, "missing AWS credentials or IAM role properties for authType 'awsiam'", err.Error())
})
}
func TestMetadataConsumerFetchValues(t *testing.T) { func TestMetadataConsumerFetchValues(t *testing.T) {
k := getKafka() k := getKafka()
m := getCompleteMetadata() m := getCompleteMetadata()

View File

@ -134,7 +134,7 @@ authenticationProfiles:
allowedValues: allowedValues:
- "none" - "none"
- title: "AWS IAM" - title: "AWS IAM"
description: "Authenticate using AWS IAM, useful for Serverless AWS MSK" description: "Authenticate using AWS IAM credentials or role for AWS MSK"
metadata: metadata:
- name: authType - name: authType
type: string type: string
@ -170,6 +170,18 @@ authenticationProfiles:
description: | description: |
AWS session token to use. A session token is only required if you are using\ntemporary security credentials. AWS session token to use. A session token is only required if you are using\ntemporary security credentials.
example: '"TOKEN"' example: '"TOKEN"'
- name: awsIamRoleArn
type: string
required: true
description: |
IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
example: '"arn:aws:iam::123456789:role/mskRole"'
- name: awsStsSessionName
type: string
description: |
Represents the session name for assuming a role.
example: '"MyAppSession"'
default: '"MSKSASLDefaultSession"'
metadata: metadata:
- name: brokers - name: brokers
type: string type: string