From d09ffe13e021044ca0cdcc8be6feef17252d811c Mon Sep 17 00:00:00 2001 From: Nikola Nedeljkovic Date: Mon, 24 Jun 2024 19:51:18 +0200 Subject: [PATCH] change topicArns to map safe for concurrent access (#3459) Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> --- pubsub/aws/snssqs/snssqs.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 269fbbcc7..e34d23ca7 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -234,8 +234,10 @@ func (s *snsSqs) getTopicArn(parentCtx context.Context, topic string) (string, e return *getTopicOutput.Attributes["TopicArn"], nil } -// get the topic ARN from the topics map. If it doesn't exist in the map, try to fetch it from AWS, if it doesn't exist +// Get the topic ARN from the topics map. If it doesn't exist in the map, try to fetch it from AWS, if it doesn't exist // at all, issue a request to create the topic. +// NOTE: This method potentially reads and writes to the topicArns map, which may end up being accessed by multiple goroutines concurrently, +// therefore it is necessary for its caller to lock the topic. func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (topicArn string, sanitizedTopic string, err error) { sanitizedTopic = nameToAWSSanitizedName(topic, s.metadata.Fifo) @@ -840,6 +842,9 @@ func (s *snsSqs) Publish(ctx context.Context, req *pubsub.PublishRequest) error return errors.New("component is closed") } + s.topicsLocker.Lock(req.Topic) + defer s.topicsLocker.Unlock(req.Topic) + topicArn, _, err := s.getOrCreateTopic(ctx, req.Topic) if err != nil { s.logger.Errorf("error getting topic ARN for %s: %v", req.Topic, err)