Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
eb82293623
commit
93cf5cb2c9
|
@ -47,11 +47,20 @@ builtinAuthenticationProfiles:
|
|||
description: |
|
||||
Must be set to `true` to enable the component to retrieve access tokens from AWS IAM.
|
||||
This authentication method only works with AWS Relational Database Service for PostgreSQL databases.
|
||||
- name: connectionString
|
||||
required: true
|
||||
sensitive: true
|
||||
description: |
|
||||
The connection string for the PostgreSQL database
|
||||
This must contain the user, which corresponds to the name of the user created inside PostgreSQL that maps to the AWS IAM policy. This connection string should not contain any password. Note that the database name field is denoted by dbname with AWS.
|
||||
example: |
|
||||
"host=mydb.postgres.database.aws.com user=myapplication port=5432 dbname=dapr_test sslmode=require"
|
||||
type: string
|
||||
- name: awsRegion
|
||||
type: string
|
||||
required: true
|
||||
description: |
|
||||
The AWS Region where the MSK Kafka broker is deployed to.
|
||||
The AWS Region where the AWS Relational Database Service is deployed to.
|
||||
example: '"us-east-1"'
|
||||
- name: awsAccessKey
|
||||
type: string
|
||||
|
@ -66,12 +75,6 @@ builtinAuthenticationProfiles:
|
|||
description: |
|
||||
The secret key associated with the access key.
|
||||
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
|
||||
- name: awsSessionToken
|
||||
type: string
|
||||
sensitive: true
|
||||
description: |
|
||||
AWS session token to use. A session token is only required if you are using temporary security credentials.
|
||||
example: '"TOKEN"'
|
||||
authenticationProfiles:
|
||||
- title: "Connection string"
|
||||
description: "Authenticate using a Connection String"
|
||||
|
|
|
@ -66,7 +66,7 @@ func (p *Postgres) Init(ctx context.Context, meta bindings.Metadata) error {
|
|||
return err
|
||||
}
|
||||
|
||||
poolConfig, err := m.GetPgxPoolConfig(ctx)
|
||||
poolConfig, err := m.GetPgxPoolConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -80,14 +80,12 @@ func NewEnvironmentSettings(md map[string]string) (EnvironmentSettings, error) {
|
|||
|
||||
type AWSIAM struct {
|
||||
// Ignored by metadata parser because included in built-in authentication profile
|
||||
// access key to use for accessing postgresql.
|
||||
// Access key to use for accessing PostgreSQL.
|
||||
AWSAccessKey string `json:"awsAccessKey" mapstructure:"awsAccessKey"`
|
||||
// secret key to use for accessing postgresql.
|
||||
// Secret key to use for accessing PostgreSQL.
|
||||
AWSSecretKey string `json:"awsSecretKey" mapstructure:"awsSecretKey"`
|
||||
// aws session token to use.
|
||||
AWSSessionToken string `mapstructure:"awsSessionToken"`
|
||||
// aws region in which postgresql should create resources.
|
||||
AWSRegion string `mapstructure:"awsRegion"`
|
||||
// AWS region in which PostgreSQL is deployed.
|
||||
AWSRegion string `json:"awsRegion" mapstructure:"awsRegion"`
|
||||
}
|
||||
|
||||
type AWSIAMAuthOptions struct {
|
||||
|
@ -138,7 +136,7 @@ func (opts *AWSIAMAuthOptions) GetAccessToken(ctx context.Context) (string, erro
|
|||
return authenticationToken, nil
|
||||
}
|
||||
|
||||
func (opts *AWSIAMAuthOptions) InitiateAWSIAMAuth(ctx context.Context) error {
|
||||
func (opts *AWSIAMAuthOptions) InitiateAWSIAMAuth() error {
|
||||
// Set max connection lifetime to 8 minutes in postgres connection pool configuration.
|
||||
// Note: this will refresh connections before the 15 min expiration on the IAM AWS auth token,
|
||||
// while leveraging the BeforeConnect hook to recreate the token in time dynamically.
|
||||
|
|
|
@ -66,11 +66,13 @@ func (m *PostgresAuthMetadata) InitWithMetadata(meta map[string]string, opts Ini
|
|||
}
|
||||
switch {
|
||||
case opts.AzureADEnabled && m.UseAzureAD:
|
||||
// Populate the Azure environment if using Azure AD
|
||||
m.azureEnv, err = azure.NewEnvironmentSettings(meta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case opts.AWSIAMEnabled && m.UseAWSIAM:
|
||||
// Populate the AWS environment if using AWS IAM
|
||||
m.awsEnv, err = aws.NewEnvironmentSettings(meta)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -101,7 +103,7 @@ func (m *PostgresAuthMetadata) ValidateAwsIamFields() (string, string, string, e
|
|||
}
|
||||
|
||||
// GetPgxPoolConfig returns the pgxpool.Config object that contains the credentials for connecting to PostgreSQL.
|
||||
func (m *PostgresAuthMetadata) GetPgxPoolConfig(ctx context.Context) (*pgxpool.Config, error) {
|
||||
func (m *PostgresAuthMetadata) GetPgxPoolConfig() (*pgxpool.Config, error) {
|
||||
// Get the config from the connection string
|
||||
config, err := pgxpool.ParseConfig(m.ConnectionString)
|
||||
if err != nil {
|
||||
|
@ -129,8 +131,9 @@ func (m *PostgresAuthMetadata) GetPgxPoolConfig(ctx context.Context) (*pgxpool.C
|
|||
}
|
||||
}
|
||||
|
||||
// Check if we should use Azure AD
|
||||
if m.UseAzureAD {
|
||||
switch {
|
||||
case m.UseAzureAD:
|
||||
// Use Azure AD
|
||||
tokenCred, errToken := m.azureEnv.GetTokenCredential()
|
||||
if errToken != nil {
|
||||
return nil, errToken
|
||||
|
@ -155,11 +158,11 @@ func (m *PostgresAuthMetadata) GetPgxPoolConfig(ctx context.Context) (*pgxpool.C
|
|||
cc.Password = at.Token
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if m.UseAWSIAM {
|
||||
case m.UseAWSIAM:
|
||||
// We should use AWS IAM
|
||||
awsRegion, awsAccessKey, awsSecretKey, err := m.ValidateAwsIamFields()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to validate AWS IAM authentication fields: %v", err)
|
||||
err = fmt.Errorf("failed to validate AWS IAM authentication fields: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -171,11 +174,12 @@ func (m *PostgresAuthMetadata) GetPgxPoolConfig(ctx context.Context) (*pgxpool.C
|
|||
SecretKey: awsSecretKey,
|
||||
}
|
||||
|
||||
err = awsOpts.InitiateAWSIAMAuth(ctx)
|
||||
err = awsOpts.InitiateAWSIAMAuth()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to initiate AWS IAM authentication rotation dynamically: %v", err)
|
||||
err = fmt.Errorf("failed to initiate AWS IAM authentication rotation: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) error {
|
|||
return fmt.Errorf("failed to parse metadata: %w", err)
|
||||
}
|
||||
|
||||
config, err := p.metadata.GetPgxPoolConfig(ctx)
|
||||
config, err := p.metadata.GetPgxPoolConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ func (p *PostgreSQLQuery) Query(parentCtx context.Context, req *state.QueryReque
|
|||
if err := qbuilder.BuildQuery(&req.Query); err != nil {
|
||||
return &state.QueryResponse{}, err
|
||||
}
|
||||
data, token, err := q.execute(parentCtx, p.logger, p.db)
|
||||
data, token, err := q.execute(parentCtx, p.db)
|
||||
if err != nil {
|
||||
return &state.QueryResponse{}, err
|
||||
}
|
||||
|
@ -259,7 +259,7 @@ func (q *Query) Finalize(filters string, qq *query.Query) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (q *Query) execute(ctx context.Context, logger logger.Logger, db pginterfaces.DBQuerier) ([]state.QueryItem, string, error) {
|
||||
func (q *Query) execute(ctx context.Context, db pginterfaces.DBQuerier) ([]state.QueryItem, string, error) {
|
||||
rows, err := db.Query(ctx, q.query, q.params...)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
|
|
|
@ -37,11 +37,20 @@ builtinAuthenticationProfiles:
|
|||
description: |
|
||||
Must be set to `true` to enable the component to retrieve access tokens from AWS IAM.
|
||||
This authentication method only works with AWS Relational Database Service for PostgreSQL databases.
|
||||
- name: connectionString
|
||||
required: true
|
||||
sensitive: true
|
||||
description: |
|
||||
The connection string for the PostgreSQL database
|
||||
This must contain the user, which corresponds to the name of the user created inside PostgreSQL that maps to the AWS IAM policy. This connection string should not contain any password. Note that the database name field is denoted by dbname with AWS.
|
||||
example: |
|
||||
"host=mydb.postgres.database.aws.com user=myapplication port=5432 dbname=dapr_test sslmode=require"
|
||||
type: string
|
||||
- name: awsRegion
|
||||
type: string
|
||||
required: true
|
||||
description: |
|
||||
The AWS Region where the MSK Kafka broker is deployed to.
|
||||
The AWS Region where the AWS Relational Database Service is deployed to.
|
||||
example: '"us-east-1"'
|
||||
- name: awsAccessKey
|
||||
type: string
|
||||
|
@ -56,12 +65,6 @@ builtinAuthenticationProfiles:
|
|||
description: |
|
||||
The secret key associated with the access key.
|
||||
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
|
||||
- name: awsSessionToken
|
||||
type: string
|
||||
sensitive: true
|
||||
description: |
|
||||
AWS session token to use. A session token is only required if you are using temporary security credentials.
|
||||
example: '"TOKEN"'
|
||||
authenticationProfiles:
|
||||
- title: "Connection string"
|
||||
description: "Authenticate using a Connection String."
|
||||
|
|
|
@ -80,7 +80,7 @@ func (p *ConfigurationStore) Init(ctx context.Context, metadata configuration.Me
|
|||
}
|
||||
|
||||
p.ActiveSubscriptions = make(map[string]*subscription)
|
||||
p.client, err = p.connectDB(ctx, p.metadata.ConnectionString)
|
||||
p.client, err = p.connectDB(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error connecting to configuration store: '%w'", err)
|
||||
}
|
||||
|
@ -285,8 +285,8 @@ func (p *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler
|
|||
}
|
||||
}
|
||||
|
||||
func (p *ConfigurationStore) connectDB(ctx context.Context, connStr string) (*pgxpool.Pool, error) {
|
||||
config, err := p.metadata.GetPgxPoolConfig(ctx)
|
||||
func (p *ConfigurationStore) connectDB(ctx context.Context) (*pgxpool.Pool, error) {
|
||||
config, err := p.metadata.GetPgxPoolConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("PostgreSQL configuration store connection error: %s", err)
|
||||
}
|
||||
|
|
|
@ -72,12 +72,6 @@ builtinAuthenticationProfiles:
|
|||
description: |
|
||||
The secret key associated with the access key.
|
||||
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
|
||||
- name: awsSessionToken
|
||||
type: string
|
||||
sensitive: true
|
||||
description: |
|
||||
AWS session token to use. A session token is only required if you are using temporary security credentials.
|
||||
example: '"TOKEN"'
|
||||
authenticationProfiles:
|
||||
- title: "Connection string"
|
||||
description: "Authenticate using a Connection String"
|
||||
|
|
|
@ -71,12 +71,6 @@ builtinAuthenticationProfiles:
|
|||
description: |
|
||||
The secret key associated with the access key.
|
||||
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
|
||||
- name: awsSessionToken
|
||||
type: string
|
||||
sensitive: true
|
||||
description: |
|
||||
AWS session token to use. A session token is only required if you are using temporary security credentials.
|
||||
example: '"TOKEN"'
|
||||
authenticationProfiles:
|
||||
- title: "Connection string"
|
||||
description: "Authenticate using a Connection String"
|
||||
|
|
|
@ -36,7 +36,6 @@ import (
|
|||
"github.com/dapr/components-contrib/state"
|
||||
stateutils "github.com/dapr/components-contrib/state/utils"
|
||||
"github.com/dapr/kit/logger"
|
||||
"github.com/dapr/kit/utils"
|
||||
)
|
||||
|
||||
// PostgreSQL state store.
|
||||
|
@ -50,8 +49,7 @@ type PostgreSQL struct {
|
|||
gc sqlinternal.GarbageCollector
|
||||
|
||||
enableAzureAD bool
|
||||
|
||||
enableAWSIAM bool
|
||||
enableAWSIAM bool
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
|
@ -71,13 +69,6 @@ func NewPostgreSQLStateStore(logger logger.Logger) state.Store {
|
|||
return NewPostgreSQLStateStoreWithOptions(logger, Options{})
|
||||
}
|
||||
|
||||
// NewPostgreSQLStateStoreWithTrueOptions creates a new instance of PostgreSQL state store v2 with Azure AD authentication and AWS IAM authentication disabled.
|
||||
// The v2 of the component uses a different format for storing data, always in a BYTEA column, which is more efficient than the JSONB column used in v1.
|
||||
// Additionally, v2 uses random UUIDs for etags instead of the xmin column, expanding support to all Postgres-compatible databases such as CockroachDB, etc.
|
||||
func NewPostgreSQLStateStoreWithTrueOptions(logger logger.Logger) state.Store {
|
||||
return NewPostgreSQLStateStoreWithOptions(logger, Options{NoAWSIAM: true, NoAzureAD: true})
|
||||
}
|
||||
|
||||
// NewPostgreSQLStateStoreWithOptions creates a new instance of PostgreSQL state store with options.
|
||||
func NewPostgreSQLStateStoreWithOptions(logger logger.Logger, opts Options) state.Store {
|
||||
s := &PostgreSQL{
|
||||
|
@ -90,21 +81,10 @@ func NewPostgreSQLStateStoreWithOptions(logger logger.Logger, opts Options) stat
|
|||
}
|
||||
|
||||
// Init sets up Postgres connection and performs migrations
|
||||
func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) error {
|
||||
var (
|
||||
useAWS bool
|
||||
useAzure bool
|
||||
err error
|
||||
)
|
||||
|
||||
awsIam, _ := metadata.GetMetadataProperty(meta.Properties, "UseAWSIAM")
|
||||
useAWS = utils.IsTruthy(awsIam)
|
||||
azureAd, _ := metadata.GetMetadataProperty(meta.Properties, "UseAzureAD")
|
||||
useAzure = utils.IsTruthy(azureAd)
|
||||
|
||||
func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) (err error) {
|
||||
opts := pgauth.InitWithMetadataOpts{
|
||||
AzureADEnabled: useAzure,
|
||||
AWSIAMEnabled: useAWS,
|
||||
AzureADEnabled: p.enableAzureAD,
|
||||
AWSIAMEnabled: p.enableAWSIAM,
|
||||
}
|
||||
|
||||
err = p.metadata.InitWithMetadata(meta, opts)
|
||||
|
@ -112,14 +92,14 @@ func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) error {
|
|||
return err
|
||||
}
|
||||
|
||||
config, err := p.metadata.GetPgxPoolConfig(ctx)
|
||||
config, err := p.metadata.GetPgxPoolConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
connCtx, connCancel := context.WithTimeout(ctx, p.metadata.Timeout)
|
||||
p.db, err = pgxpool.NewWithConfig(connCtx, config)
|
||||
defer connCancel()
|
||||
connCancel()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to connect to the database: %w", err)
|
||||
return err
|
||||
|
|
|
@ -108,7 +108,7 @@ func (r *ConfigUpdater) Init(props map[string]string) error {
|
|||
return fmt.Errorf("missing postgreSQL configuration table name")
|
||||
}
|
||||
|
||||
config, err := md.GetPgxPoolConfig(ctx)
|
||||
config, err := md.GetPgxPoolConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("postgres configuration store connection error : %w", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue