From c10f59ea49762f34b9f55b9571fa09d556ea1b3e Mon Sep 17 00:00:00 2001 From: Bernd Verst Date: Thu, 9 Nov 2023 19:23:31 -0800 Subject: [PATCH] Redis: recover subscription if group or stream deleted (#3221) Signed-off-by: Bernd Verst --- pubsub/redis/redis.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/pubsub/redis/redis.go b/pubsub/redis/redis.go index 899ba1360..7cc1297fc 100644 --- a/pubsub/redis/redis.go +++ b/pubsub/redis/redis.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "reflect" + "strings" "sync" "sync/atomic" "time" @@ -108,15 +109,22 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest) return nil } +func (r *redisStreams) CreateConsumerGroup(ctx context.Context, stream string) error { + err := r.client.XGroupCreateMkStream(ctx, stream, r.clientSettings.ConsumerID, "0") + // Ignore BUSYGROUP errors + if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { + r.logger.Errorf("redis streams: %s", err) + return err + } + return nil +} + func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { if r.closed.Load() { return errors.New("component is closed") } - err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.clientSettings.ConsumerID, "0") - // Ignore BUSYGROUP errors - if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { - r.logger.Errorf("redis streams: %s", err) + if err := r.CreateConsumerGroup(ctx, req.Topic); err != nil { return err } @@ -242,6 +250,11 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h streams, err := r.client.XReadGroupResult(ctx, r.clientSettings.ConsumerID, r.clientSettings.ConsumerID, []string{stream, ">"}, int64(r.clientSettings.QueueDepth), time.Duration(r.clientSettings.ReadTimeout)) if err != nil { if !errors.Is(err, r.client.GetNilValueError()) && err != context.Canceled { + if strings.Contains(err.Error(), "NOGROUP") { + r.logger.Warnf("redis streams: consumer group %s does not exist for stream %s. This could mean the server experienced data loss, or the group/stream was deleted.", r.clientSettings.ConsumerID, stream) + r.logger.Warnf("redis streams: recreating group %s for stream %s", r.clientSettings.ConsumerID, stream) + r.CreateConsumerGroup(ctx, stream) + } r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err) } continue