GCP Pubsub: cache topics (#3241)

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: syedsadath-17 <90619459+sadath-12@users.noreply.github.com>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Bernd Verst <github@bernd.dev>
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Bernd Verst <github@bernd.dev>
syedsadath-17 2023-12-07 01:19:49 +05:30 committed by GitHub
parent 79adc565c1
commit 8dfa4b663e
1 changed file with 101 additions and 15 deletions


@@ -54,9 +54,15 @@ type GCPPubSub struct {
 	metadata *metadata
 	logger   logger.Logger
 
-	closed  atomic.Bool
-	closeCh chan struct{}
-	wg      sync.WaitGroup
+	closed     atomic.Bool
+	closeCh    chan struct{}
+	wg         sync.WaitGroup
+	topicCache map[string]cacheEntry
+	lock       *sync.RWMutex
 }
 
+type cacheEntry struct {
+	LastSync time.Time
+}
+
 type GCPAuthJSON struct {
@@ -76,9 +82,39 @@ type WhatNow struct {
 	Type string `json:"type"`
 }
 
+const topicCacheRefreshInterval = 5 * time.Hour
+
 // NewGCPPubSub returns a new GCPPubSub instance.
 func NewGCPPubSub(logger logger.Logger) pubsub.PubSub {
-	return &GCPPubSub{logger: logger, closeCh: make(chan struct{})}
+	client := &GCPPubSub{
+		logger:     logger,
+		closeCh:    make(chan struct{}),
+		topicCache: make(map[string]cacheEntry),
+		lock:       &sync.RWMutex{},
+	}
+
+	return client
 }
 
+func (g *GCPPubSub) periodicCacheRefresh() {
+	// Run this loop 5 times every topicCacheRefreshInterval, to be able to delete items that are stale
+	ticker := time.NewTicker(topicCacheRefreshInterval / 5)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-g.closeCh:
+			return
+		case <-ticker.C:
+			g.lock.Lock()
+			for key, entry := range g.topicCache {
+				// Delete from the cache if the last sync was longer than topicCacheRefreshInterval
+				if time.Since(entry.LastSync) > topicCacheRefreshInterval {
+					delete(g.topicCache, key)
+				}
+			}
+			g.lock.Unlock()
+		}
+	}
+}
+
 func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
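
Aside: the eviction loop added above is a generic TTL pattern. A minimal standalone sketch of the same idea (hypothetical ttlCache type and toy durations, not part of this commit), runnable on its own:

package main

import (
	"fmt"
	"sync"
	"time"
)

// ttlCache is a hypothetical, simplified stand-in for the topicCache above:
// entries record when they were last synced and are evicted once stale.
type ttlCache struct {
	mu      sync.RWMutex
	entries map[string]time.Time
	ttl     time.Duration
	closeCh chan struct{}
}

func newTTLCache(ttl time.Duration) *ttlCache {
	return &ttlCache{
		entries: make(map[string]time.Time),
		ttl:     ttl,
		closeCh: make(chan struct{}),
	}
}

// evictLoop mirrors periodicCacheRefresh: ticking at ttl/5 means a stale
// entry is removed at most ttl/5 after it expires.
func (c *ttlCache) evictLoop() {
	ticker := time.NewTicker(c.ttl / 5)
	defer ticker.Stop()
	for {
		select {
		case <-c.closeCh:
			return
		case <-ticker.C:
			c.mu.Lock()
			for key, last := range c.entries {
				if time.Since(last) > c.ttl {
					delete(c.entries, key)
				}
			}
			c.mu.Unlock()
		}
	}
}

func main() {
	c := newTTLCache(500 * time.Millisecond)
	go c.evictLoop()

	c.mu.Lock()
	c.entries["my-topic"] = time.Now()
	c.mu.Unlock()

	time.Sleep(time.Second)

	c.mu.RLock()
	fmt.Println("entries left after TTL:", len(c.entries)) // expected: 0
	c.mu.RUnlock()

	close(c.closeCh)
}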
@@ -110,6 +146,12 @@ func (g *GCPPubSub) Init(ctx context.Context, meta pubsub.Metadata) error {
 		return err
 	}
 
+	g.wg.Add(1)
+	go func() {
+		defer g.wg.Done()
+		g.periodicCacheRefresh()
+	}()
+
 	pubsubClient, err := g.getPubSubClient(ctx, metadata)
 	if err != nil {
 		return fmt.Errorf("%s error creating pubsub client: %w", errorMessagePrefix, err)
@@ -174,12 +216,22 @@ func (g *GCPPubSub) Publish(ctx context.Context, req *pubsub.PublishRequest) err
 	if g.closed.Load() {
 		return errors.New("component is closed")
 	}
 
-	if !g.metadata.DisableEntityManagement {
+	g.lock.RLock()
+	_, topicExists := g.topicCache[req.Topic]
+	g.lock.RUnlock()
+	// We are not acquiring a write lock before calling ensureTopic, so there's the chance that ensureTopic be called multiple time
+	// This is acceptable in our case, even is slightly wasteful, as ensureTopic is idempotent
+	if !g.metadata.DisableEntityManagement && !topicExists {
 		err := g.ensureTopic(ctx, req.Topic)
 		if err != nil {
-			return fmt.Errorf("%s could not get valid topic %s, %s", errorMessagePrefix, req.Topic, err)
+			return fmt.Errorf("%s could not get valid topic %s: %w", errorMessagePrefix, req.Topic, err)
 		}
+		g.lock.Lock()
+		g.topicCache[req.Topic] = cacheEntry{
+			LastSync: time.Now(),
+		}
+		g.lock.Unlock()
 	}
 
 	topic := g.getTopic(req.Topic)
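
The Publish path above boils down to: check the cache under a read lock, call the idempotent ensureTopic without holding any lock, then record success under a write lock, accepting that two racing publishers may both call ensureTopic. A standalone sketch of that pattern (hypothetical ensureResource and ensureCached names, not the component's API):

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	mu    sync.RWMutex
	cache = map[string]time.Time{}
)

// ensureResource stands in for an idempotent remote call such as ensureTopic.
func ensureResource(name string) error {
	fmt.Println("ensuring", name)
	return nil
}

// ensureCached mirrors the Publish path: a racing caller may also run
// ensureResource, which is tolerated because the call is idempotent.
func ensureCached(name string) error {
	mu.RLock()
	_, exists := cache[name]
	mu.RUnlock()
	if exists {
		return nil // fast path: no remote call
	}
	if err := ensureResource(name); err != nil {
		return err
	}
	mu.Lock()
	cache[name] = time.Now()
	mu.Unlock()
	return nil
}

func main() {
	_ = ensureCached("my-topic") // prints "ensuring my-topic"
	_ = ensureCached("my-topic") // cache hit, no remote call
}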
@@ -210,12 +262,22 @@ func (g *GCPPubSub) Subscribe(parentCtx context.Context, req pubsub.SubscribeReq
 	if g.closed.Load() {
 		return errors.New("component is closed")
 	}
 
-	if !g.metadata.DisableEntityManagement {
+	g.lock.RLock()
+	_, topicExists := g.topicCache[req.Topic]
+	g.lock.RUnlock()
+	// We are not acquiring a write lock before calling ensureTopic, so there's the chance that ensureTopic be called multiple times
+	// This is acceptable in our case, even is slightly wasteful, as ensureTopic is idempotent
+	if !g.metadata.DisableEntityManagement && !topicExists {
 		topicErr := g.ensureTopic(parentCtx, req.Topic)
 		if topicErr != nil {
-			return fmt.Errorf("%s could not get valid topic - topic:%q, error: %v", errorMessagePrefix, req.Topic, topicErr)
+			return fmt.Errorf("%s could not get valid topic - topic:%q, error: %w", errorMessagePrefix, req.Topic, topicErr)
 		}
+		g.lock.Lock()
+		g.topicCache[req.Topic] = cacheEntry{
+			LastSync: time.Now(),
+		}
+		g.lock.Unlock()
 
 		subError := g.ensureSubscription(parentCtx, g.metadata.ConsumerID, req.Topic)
 		if subError != nil {
@@ -354,9 +416,24 @@ func (g *GCPPubSub) getTopic(topic string) *gcppubsub.Topic {
 }
 
 func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription string, topic string) error {
-	err := g.ensureTopic(parentCtx, topic)
-	if err != nil {
-		return err
+	g.lock.RLock()
+	_, topicOK := g.topicCache[topic]
+	_, dlTopicOK := g.topicCache[g.metadata.DeadLetterTopic]
+	g.lock.RUnlock()
+	if !topicOK {
+		g.lock.Lock()
+		// Double-check if the topic still doesn't exist to avoid race condition
+		if _, ok := g.topicCache[topic]; !ok {
+			err := g.ensureTopic(parentCtx, topic)
+			if err != nil {
+				g.lock.Unlock()
+				return err
+			}
+			g.topicCache[topic] = cacheEntry{
+				LastSync: time.Now(),
+			}
+		}
+		g.lock.Unlock()
 	}
 
 	managedSubscription := subscription + "-" + topic
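
ensureSubscription goes one step further and re-checks the cache after taking the write lock, so racing callers do not duplicate the remote call once one of them has succeeded. The same double-checked pattern in isolation (hypothetical ensureOnce helper, not part of this commit):

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	mu    sync.RWMutex
	cache = map[string]time.Time{}
)

// ensureOnce performs createFn for a key only if no other caller has already
// recorded it, mirroring the write-lock + re-check in ensureSubscription.
func ensureOnce(key string, createFn func() error) error {
	mu.RLock()
	_, ok := cache[key]
	mu.RUnlock()
	if ok {
		return nil
	}

	mu.Lock()
	defer mu.Unlock()
	// Double-check: another goroutine may have populated the cache between
	// releasing the read lock and acquiring the write lock.
	if _, ok := cache[key]; ok {
		return nil
	}
	if err := createFn(); err != nil {
		return err
	}
	cache[key] = time.Now()
	return nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = ensureOnce("my-topic", func() error {
				fmt.Println("creating topic") // printed once despite 3 callers
				return nil
			})
		}()
	}
	wg.Wait()
}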
@@ -369,11 +446,20 @@ func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription s
 		EnableMessageOrdering: g.metadata.EnableMessageOrdering,
 	}
 
-	if g.metadata.DeadLetterTopic != "" {
-		subErr = g.ensureTopic(parentCtx, g.metadata.DeadLetterTopic)
-		if subErr != nil {
-			return subErr
+	if g.metadata.DeadLetterTopic != "" && !dlTopicOK {
+		g.lock.Lock()
+		// Double-check if the DeadLetterTopic still doesn't exist to avoid race condition
+		if _, ok := g.topicCache[g.metadata.DeadLetterTopic]; !ok {
+			subErr = g.ensureTopic(parentCtx, g.metadata.DeadLetterTopic)
+			if subErr != nil {
+				g.lock.Unlock()
+				return subErr
+			}
+			g.topicCache[g.metadata.DeadLetterTopic] = cacheEntry{
+				LastSync: time.Now(),
+			}
 		}
+		g.lock.Unlock()
 		dlTopic := fmt.Sprintf("projects/%s/topics/%s", g.metadata.ProjectID, g.metadata.DeadLetterTopic)
 		subConfig.DeadLetterPolicy = &gcppubsub.DeadLetterPolicy{
 			DeadLetterTopic: dlTopic,