172 lines
6.6 KiB
Go
172 lines
6.6 KiB
Go
package snssqs
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/dapr/components-contrib/pubsub"
|
|
"github.com/dapr/kit/metadata"
|
|
|
|
"github.com/aws/aws-sdk-go/aws/endpoints"
|
|
)
|
|
|
|
type snsSqsMetadata struct {
|
|
// Ignored by metadata parser because included in built-in authentication profile
|
|
// access key to use for accessing sqs/sns.
|
|
AccessKey string `json:"accessKey" mapstructure:"accessKey" mdignore:"true"`
|
|
// secret key to use for accessing sqs/sns.
|
|
SecretKey string `json:"secretKey" mapstructure:"secretKey" mdignore:"true"`
|
|
// aws session token to use.
|
|
SessionToken string `mapstructure:"sessionToken" mdignore:"true"`
|
|
|
|
// aws endpoint for the component to use.
|
|
Endpoint string `mapstructure:"endpoint"`
|
|
// aws region in which SNS/SQS should create resources.
|
|
Region string `mapstructure:"region"`
|
|
// aws partition in which SNS/SQS should create resources.
|
|
internalPartition string `mapstructure:"-"`
|
|
// name of the queue for this application. The is provided by the runtime as "consumerID".
|
|
SqsQueueName string `mapstructure:"consumerID" mdignore:"true"`
|
|
// name of the dead letter queue for this application.
|
|
SqsDeadLettersQueueName string `mapstructure:"sqsDeadLettersQueueName"`
|
|
// flag to SNS and SQS FIFO.
|
|
Fifo bool `mapstructure:"fifo"`
|
|
// a namespace for SNS SQS FIFO to order messages within that group. limits consumer concurrency if set but guarantees that all
|
|
// published messages would be ordered by their arrival time to SQS.
|
|
// see: https://aws.amazon.com/blogs/compute/solving-complex-ordering-challenges-with-amazon-sqs-fifo-queues/
|
|
FifoMessageGroupID string `mapstructure:"fifoMessageGroupID"`
|
|
// amount of time in seconds that a message is hidden from receive requests after it is sent to a subscriber. Default: 10.
|
|
MessageVisibilityTimeout int64 `mapstructure:"messageVisibilityTimeout"`
|
|
// number of times to resend a message after processing of that message fails before removing that message from the queue. Default: 10.
|
|
MessageRetryLimit int64 `mapstructure:"messageRetryLimit"`
|
|
// upon reaching the messageRetryLimit, disables the default deletion behaviour of the message from the SQS queue, and resetting the message visibilty on SQS
|
|
// so that other consumers can try consuming that message.
|
|
DisableDeleteOnRetryLimit bool `mapstructure:"disableDeleteOnRetryLimit"`
|
|
// if sqsDeadLettersQueueName is set to a value, then the MessageReceiveLimit defines the number of times a message is received
|
|
// before it is moved to the dead-letters queue. This value must be smaller than messageRetryLimit.
|
|
MessageReceiveLimit int64 `mapstructure:"messageReceiveLimit"`
|
|
// amount of time to await receipt of a message before making another request. Default: 2.
|
|
MessageWaitTimeSeconds int64 `mapstructure:"messageWaitTimeSeconds"`
|
|
// maximum number of messages to receive from the queue at a time. Default: 10, Maximum: 10.
|
|
MessageMaxNumber int64 `mapstructure:"messageMaxNumber"`
|
|
// disable resource provisioning of SNS and SQS.
|
|
DisableEntityManagement bool `mapstructure:"disableEntityManagement"`
|
|
// assets creation timeout.
|
|
AssetsManagementTimeoutSeconds float64 `mapstructure:"assetsManagementTimeoutSeconds"`
|
|
// aws account ID. internally resolved if not given.
|
|
AccountID string `mapstructure:"accountID"`
|
|
// processing concurrency mode
|
|
ConcurrencyMode pubsub.ConcurrencyMode `mapstructure:"concurrencyMode"`
|
|
}
|
|
|
|
func maskLeft(s string) string {
|
|
rs := []rune(s)
|
|
for i := 0; i < len(rs)-4; i++ {
|
|
rs[i] = 'X'
|
|
}
|
|
return string(rs)
|
|
}
|
|
|
|
func (s *snsSqs) getSnsSqsMetatdata(meta pubsub.Metadata) (*snsSqsMetadata, error) {
|
|
md := &snsSqsMetadata{
|
|
AssetsManagementTimeoutSeconds: assetsManagementDefaultTimeoutSeconds,
|
|
MessageVisibilityTimeout: 10,
|
|
MessageRetryLimit: 10,
|
|
MessageWaitTimeSeconds: 2,
|
|
MessageMaxNumber: 10,
|
|
}
|
|
upgradeMetadata(&meta)
|
|
err := metadata.DecodeMetadata(meta.Properties, md)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if md.Region != "" {
|
|
if partition, ok := endpoints.PartitionForRegion(endpoints.DefaultPartitions(), md.Region); ok {
|
|
md.internalPartition = partition.ID()
|
|
} else {
|
|
md.internalPartition = "aws"
|
|
}
|
|
}
|
|
|
|
if md.SqsQueueName == "" {
|
|
return nil, errors.New("consumerID must be set")
|
|
}
|
|
|
|
if md.MessageVisibilityTimeout < 1 {
|
|
return nil, errors.New("messageVisibilityTimeout must be greater than 0")
|
|
}
|
|
|
|
if md.MessageRetryLimit < 2 {
|
|
return nil, errors.New("messageRetryLimit must be greater than 1")
|
|
}
|
|
|
|
// XOR on having either a valid messageReceiveLimit and invalid sqsDeadLettersQueueName, and vice versa.
|
|
if (md.MessageReceiveLimit > 0 || len(md.SqsDeadLettersQueueName) > 0) && !(md.MessageReceiveLimit > 0 && len(md.SqsDeadLettersQueueName) > 0) {
|
|
return nil, errors.New("to use SQS dead letters queue, messageReceiveLimit and sqsDeadLettersQueueName must both be set to a value")
|
|
}
|
|
|
|
if len(md.SqsDeadLettersQueueName) > 0 && md.DisableDeleteOnRetryLimit {
|
|
return nil, errors.New("configuration conflict: 'disableDeleteOnRetryLimit' cannot be set to 'true' when 'sqsDeadLettersQueueName' is set to a value. either remove this configuration or set 'disableDeleteOnRetryLimit' to 'false'")
|
|
}
|
|
|
|
if md.MessageWaitTimeSeconds < 1 {
|
|
return nil, errors.New("messageWaitTimeSeconds must be greater than 0")
|
|
}
|
|
|
|
// fifo settings: assign user provided Message Group ID
|
|
// for more details, see: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
|
|
if md.FifoMessageGroupID == "" {
|
|
md.FifoMessageGroupID = meta.Properties[pubsub.RuntimeConsumerIDKey]
|
|
}
|
|
|
|
if md.MessageMaxNumber < 1 {
|
|
return nil, errors.New("messageMaxNumber must be greater than 0")
|
|
} else if md.MessageMaxNumber > 10 {
|
|
return nil, errors.New("messageMaxNumber must be less than or equal to 10")
|
|
}
|
|
|
|
if err := md.setConcurrencyMode(meta.Properties); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.logger.Debug(md.hideDebugPrintedCredentials())
|
|
|
|
return md, nil
|
|
}
|
|
|
|
func (md *snsSqsMetadata) setConcurrencyMode(props map[string]string) error {
|
|
c, err := pubsub.Concurrency(props)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
md.ConcurrencyMode = c
|
|
|
|
return nil
|
|
}
|
|
|
|
func (md *snsSqsMetadata) hideDebugPrintedCredentials() string {
|
|
mdCopy := *md
|
|
mdCopy.AccessKey = maskLeft(md.AccessKey)
|
|
mdCopy.SecretKey = maskLeft(md.SecretKey)
|
|
mdCopy.SessionToken = maskLeft(md.SessionToken)
|
|
|
|
return fmt.Sprintf("%#v\n", mdCopy)
|
|
}
|
|
|
|
func upgradeMetadata(m *pubsub.Metadata) {
|
|
upgradeMap := map[string]string{
|
|
"Endpoint": "endpoint",
|
|
"awsAccountID": "accessKey",
|
|
"awsSecret": "secretKey",
|
|
"awsRegion": "region",
|
|
}
|
|
|
|
for oldKey, newKey := range upgradeMap {
|
|
if val, ok := m.Properties[oldKey]; ok {
|
|
m.Properties[newKey] = val
|
|
delete(m.Properties, oldKey)
|
|
}
|
|
}
|
|
}
|