Redis: recover subscription if group or stream deleted (#3221)
Signed-off-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
parent
934e86c4f7
commit
c10f59ea49
|
@ -18,6 +18,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -108,15 +109,22 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest)
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *redisStreams) CreateConsumerGroup(ctx context.Context, stream string) error {
|
||||
err := r.client.XGroupCreateMkStream(ctx, stream, r.clientSettings.ConsumerID, "0")
|
||||
// Ignore BUSYGROUP errors
|
||||
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
||||
r.logger.Errorf("redis streams: %s", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||
if r.closed.Load() {
|
||||
return errors.New("component is closed")
|
||||
}
|
||||
|
||||
err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.clientSettings.ConsumerID, "0")
|
||||
// Ignore BUSYGROUP errors
|
||||
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
||||
r.logger.Errorf("redis streams: %s", err)
|
||||
if err := r.CreateConsumerGroup(ctx, req.Topic); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -242,6 +250,11 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h
|
|||
streams, err := r.client.XReadGroupResult(ctx, r.clientSettings.ConsumerID, r.clientSettings.ConsumerID, []string{stream, ">"}, int64(r.clientSettings.QueueDepth), time.Duration(r.clientSettings.ReadTimeout))
|
||||
if err != nil {
|
||||
if !errors.Is(err, r.client.GetNilValueError()) && err != context.Canceled {
|
||||
if strings.Contains(err.Error(), "NOGROUP") {
|
||||
r.logger.Warnf("redis streams: consumer group %s does not exist for stream %s. This could mean the server experienced data loss, or the group/stream was deleted.", r.clientSettings.ConsumerID, stream)
|
||||
r.logger.Warnf("redis streams: recreating group %s for stream %s", r.clientSettings.ConsumerID, stream)
|
||||
r.CreateConsumerGroup(ctx, stream)
|
||||
}
|
||||
r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err)
|
||||
}
|
||||
continue
|
||||
|
|
Loading…
Reference in New Issue