change topicArns to map safe for concurrent access (#3459)
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
43a89f3c2b
commit
d09ffe13e0
|
@ -234,8 +234,10 @@ func (s *snsSqs) getTopicArn(parentCtx context.Context, topic string) (string, e
|
||||||
return *getTopicOutput.Attributes["TopicArn"], nil
|
return *getTopicOutput.Attributes["TopicArn"], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the topic ARN from the topics map. If it doesn't exist in the map, try to fetch it from AWS, if it doesn't exist
|
// Get the topic ARN from the topics map. If it doesn't exist in the map, try to fetch it from AWS, if it doesn't exist
|
||||||
// at all, issue a request to create the topic.
|
// at all, issue a request to create the topic.
|
||||||
|
// NOTE: This method potentially reads and writes to the topicArns map, which may end up being accessed by multiple goroutines concurrently,
|
||||||
|
// therefore it is necessary for its caller to lock the topic.
|
||||||
func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (topicArn string, sanitizedTopic string, err error) {
|
func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (topicArn string, sanitizedTopic string, err error) {
|
||||||
sanitizedTopic = nameToAWSSanitizedName(topic, s.metadata.Fifo)
|
sanitizedTopic = nameToAWSSanitizedName(topic, s.metadata.Fifo)
|
||||||
|
|
||||||
|
@ -840,6 +842,9 @@ func (s *snsSqs) Publish(ctx context.Context, req *pubsub.PublishRequest) error
|
||||||
return errors.New("component is closed")
|
return errors.New("component is closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.topicsLocker.Lock(req.Topic)
|
||||||
|
defer s.topicsLocker.Unlock(req.Topic)
|
||||||
|
|
||||||
topicArn, _, err := s.getOrCreateTopic(ctx, req.Topic)
|
topicArn, _, err := s.getOrCreateTopic(ctx, req.Topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Errorf("error getting topic ARN for %s: %v", req.Topic, err)
|
s.logger.Errorf("error getting topic ARN for %s: %v", req.Topic, err)
|
||||||
|
|
Loading…
Reference in New Issue