* bugfix for sns topic deletion upon termination * removed upstream github workflow files * Update snssqs.go * dapr bot schedule * read and append queue attributes * unnecessary escaping in json tag * unexporting structs * bugfix in policy * bugfix in policy. merged from master * fifo suffix as const
This commit is contained in:
parent
cac03dbe87
commit
e97b1bf9f3
|
@ -0,0 +1,44 @@
|
||||||
|
package snssqs
|
||||||
|
|
||||||
|
type arnEquals struct {
|
||||||
|
AwsSourceArn string `json:"aws:SourceArn"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type condition struct {
|
||||||
|
ArnEquals arnEquals
|
||||||
|
}
|
||||||
|
|
||||||
|
type principal struct {
|
||||||
|
Service string
|
||||||
|
}
|
||||||
|
|
||||||
|
type statement struct {
|
||||||
|
Effect string
|
||||||
|
Principal principal
|
||||||
|
Action string
|
||||||
|
Resource string
|
||||||
|
Condition condition
|
||||||
|
}
|
||||||
|
|
||||||
|
type policy struct {
|
||||||
|
Version string
|
||||||
|
Statement []statement
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *policy) statementExists(other *statement) bool {
|
||||||
|
for _, s := range p.Statement {
|
||||||
|
if s.Effect == other.Effect &&
|
||||||
|
s.Principal.Service == other.Principal.Service &&
|
||||||
|
s.Action == other.Action &&
|
||||||
|
s.Resource == other.Resource &&
|
||||||
|
s.Condition.ArnEquals.AwsSourceArn == other.Condition.ArnEquals.AwsSourceArn {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *policy) addStatement(other *statement) {
|
||||||
|
p.Statement = append(p.Statement, *other)
|
||||||
|
}
|
|
@ -74,30 +74,10 @@ type snsSqsMetadata struct {
|
||||||
messageMaxNumber int64
|
messageMaxNumber int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type arnEquals struct {
|
|
||||||
AwsSourceArn string `json:"aws:SourceArn"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type condition struct {
|
|
||||||
ArnEquals arnEquals
|
|
||||||
}
|
|
||||||
|
|
||||||
type statement struct {
|
|
||||||
Effect string
|
|
||||||
Principal string
|
|
||||||
Action string
|
|
||||||
Resource string
|
|
||||||
Condition condition
|
|
||||||
}
|
|
||||||
|
|
||||||
type policy struct {
|
|
||||||
Version string
|
|
||||||
Statement []statement
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
awsSqsQueueNameKey = "dapr-queue-name"
|
awsSqsQueueNameKey = "dapr-queue-name"
|
||||||
awsSnsTopicNameKey = "dapr-topic-name"
|
awsSnsTopicNameKey = "dapr-topic-name"
|
||||||
|
awsSqsFifoSuffix = ".fifo"
|
||||||
maxAWSNameLength = 80
|
maxAWSNameLength = 80
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -146,13 +126,11 @@ func parseBool(input string, propertyName string) (bool, error) {
|
||||||
// sanitize topic/queue name to conform with:
|
// sanitize topic/queue name to conform with:
|
||||||
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html
|
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html
|
||||||
func nameToAWSSanitizedName(name string, isFifo bool) string {
|
func nameToAWSSanitizedName(name string, isFifo bool) string {
|
||||||
suffix := ".fifo"
|
|
||||||
|
|
||||||
// first remove suffix if exists, and user requested a FIFO name, then sanitize the passed in name.
|
// first remove suffix if exists, and user requested a FIFO name, then sanitize the passed in name.
|
||||||
hasFifoSuffix := false
|
hasFifoSuffix := false
|
||||||
if strings.HasSuffix(name, suffix) && isFifo {
|
if strings.HasSuffix(name, awsSqsFifoSuffix) && isFifo {
|
||||||
hasFifoSuffix = true
|
hasFifoSuffix = true
|
||||||
name = name[:len(name)-len(suffix)]
|
name = name[:len(name)-len(awsSqsFifoSuffix)]
|
||||||
}
|
}
|
||||||
|
|
||||||
s := []byte(name)
|
s := []byte(name)
|
||||||
|
@ -174,33 +152,16 @@ func nameToAWSSanitizedName(name string, isFifo bool) string {
|
||||||
|
|
||||||
// reattach/add the suffix to the sanitized name, trim more if adding the suffix would exceed the maxLength.
|
// reattach/add the suffix to the sanitized name, trim more if adding the suffix would exceed the maxLength.
|
||||||
if hasFifoSuffix || isFifo {
|
if hasFifoSuffix || isFifo {
|
||||||
delta := j + len(suffix) - maxAWSNameLength
|
delta := j + len(awsSqsFifoSuffix) - maxAWSNameLength
|
||||||
if delta > 0 {
|
if delta > 0 {
|
||||||
j -= delta
|
j -= delta
|
||||||
}
|
}
|
||||||
return string(s[:j]) + suffix
|
return string(s[:j]) + awsSqsFifoSuffix
|
||||||
}
|
}
|
||||||
|
|
||||||
return string(s[:j])
|
return string(s[:j])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *policy) statementExists(other *statement) bool {
|
|
||||||
for _, s := range p.Statement {
|
|
||||||
if s.Effect == other.Effect &&
|
|
||||||
s.Principal == other.Principal &&
|
|
||||||
s.Action == other.Action &&
|
|
||||||
s.Resource == other.Resource &&
|
|
||||||
s.Condition.ArnEquals.AwsSourceArn == other.Condition.ArnEquals.AwsSourceArn {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *policy) addStatement(other *statement) {
|
|
||||||
p.Statement = append(p.Statement, *other)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) {
|
func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) {
|
||||||
md := snsSqsMetadata{}
|
md := snsSqsMetadata{}
|
||||||
props := metadata.Properties
|
props := metadata.Properties
|
||||||
|
@ -648,7 +609,7 @@ func (s *snsSqs) restrictQueuePublishPolicyToOnlySNS(sqsQueueInfo *sqsQueueInfo,
|
||||||
|
|
||||||
newStatement := &statement{
|
newStatement := &statement{
|
||||||
Effect: "Allow",
|
Effect: "Allow",
|
||||||
Principal: `{"Service": "sns.amazonaws.com"}`,
|
Principal: principal{Service: "sns.amazonaws.com"},
|
||||||
Action: "sqs:SendMessage",
|
Action: "sqs:SendMessage",
|
||||||
Resource: sqsQueueInfo.arn,
|
Resource: sqsQueueInfo.arn,
|
||||||
Condition: condition{
|
Condition: condition{
|
||||||
|
@ -658,7 +619,7 @@ func (s *snsSqs) restrictQueuePublishPolicyToOnlySNS(sqsQueueInfo *sqsQueueInfo,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
policy := &policy{Version: "2012-11-05"}
|
policy := &policy{Version: "2012-10-17"}
|
||||||
if policyStr, ok := getQueueAttributesOutput.Attributes[sqs.QueueAttributeNamePolicy]; ok {
|
if policyStr, ok := getQueueAttributesOutput.Attributes[sqs.QueueAttributeNamePolicy]; ok {
|
||||||
// look for the current statement if exists, else add it and store.
|
// look for the current statement if exists, else add it and store.
|
||||||
if err = json.Unmarshal([]byte(*policyStr), policy); err != nil {
|
if err = json.Unmarshal([]byte(*policyStr), policy); err != nil {
|
||||||
|
|
Loading…
Reference in New Issue