components-contrib/pubsub/redis/redis.go

443 lines
14 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package redis
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
rediscomponent "github.com/dapr/components-contrib/common/component/redis"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
// Component metadata property names, listed alphabetically.
// NOTE(review): none of these are referenced in the code visible here;
// metadata parsing appears to be delegated to
// rediscomponent.ParseClientFromProperties in Init.
const (
	concurrency       = "concurrency"
	consumerID        = "consumerID"
	enableTLS         = "enableTLS"
	maxLenApprox      = "maxLenApprox"
	processingTimeout = "processingTimeout"
	queueDepth        = "queueDepth"
	redeliverInterval = "redeliverInterval"
)
// redisStreams is a pub/sub component backed by Redis Streams. New entries
// are read with `XREADGROUP`; entries whose processing failed are located
// with `XPENDING` and taken over again with `XCLAIM` so they can be
// redelivered.
//
// Background on stream mechanics: https://redis.io/topics/streams-intro
type redisStreams struct {
	// Redis connection and the settings parsed from component metadata in Init.
	client         rediscomponent.RedisClient
	clientSettings *rediscomponent.Settings

	logger logger.Logger

	// Shutdown coordination. wg counts the goroutines started by Init and
	// Subscribe, closed ensures closeCh is closed only once, and closing
	// closeCh tells every background goroutine to stop.
	wg      sync.WaitGroup
	closed  atomic.Bool
	closeCh chan struct{}

	// queue carries messages from the polling/reclaiming loops to the workers.
	queue chan redisMessageWrapper
}
// redisMessageWrapper is one unit of work on the queue channel: the pubsub
// message, the Redis stream entry ID used to acknowledge it, the handler that
// should process it, and the subscription context it was read under.
type redisMessageWrapper struct {
	ctx       context.Context // subscription context at enqueue time
	messageID string          // stream entry ID, passed to XACK on success
	message   pubsub.NewMessage
	handler   pubsub.Handler
}
// NewRedisStreams constructs a Redis Streams pub/sub component that logs
// through the given logger. The Redis client, settings and work queue are
// only set up later by Init.
func NewRedisStreams(logger logger.Logger) pubsub.PubSub {
	streams := &redisStreams{logger: logger}
	streams.closeCh = make(chan struct{})
	return streams
}
// Init parses the Redis client and settings from the component metadata,
// verifies connectivity with a PING, allocates the work queue (buffered to
// QueueDepth) and starts Concurrency worker goroutines that drain it.
func (r *redisStreams) Init(ctx context.Context, metadata pubsub.Metadata) error {
	var err error
	r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(metadata.Properties, contribMetadata.PubSubType)
	if err != nil {
		return err
	}
	if _, err = r.client.PingResult(ctx); err != nil {
		// %w keeps the underlying Redis error inspectable via errors.Is/As.
		return fmt.Errorf("redis streams: error connecting to redis at %s: %w", r.clientSettings.Host, err)
	}
	r.queue = make(chan redisMessageWrapper, int(r.clientSettings.QueueDepth))
	for i := uint(0); i < r.clientSettings.Concurrency; i++ {
		r.wg.Add(1)
		go func() {
			defer r.wg.Done()
			r.worker()
		}()
	}
	return nil
}
// Publish appends a message to the stream named by req.Topic with XADD,
// passing the MaxLenApprox setting (presumably an approximate MAXLEN cap —
// confirm in rediscomponent). The payload goes in the "data" field; when
// req.Metadata is non-nil it is JSON-encoded into the "metadata" field.
// Publishing after Close returns an error.
func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
	if r.closed.Load() {
		return errors.New("component is closed")
	}
	redisPayload := map[string]any{"data": req.Data}
	if req.Metadata != nil {
		serializedMetadata, err := json.Marshal(req.Metadata)
		if err != nil {
			return err
		}
		redisPayload["metadata"] = serializedMetadata
	}
	if _, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, redisPayload); err != nil {
		// %w (not %s) so callers can unwrap the Redis error.
		return fmt.Errorf("redis streams: error from publish: %w", err)
	}
	return nil
}
// CreateConsumerGroup creates the consumer group named by the ConsumerID
// setting on the given stream via XGroupCreateMkStream (presumably XGROUP
// CREATE ... MKSTREAM, so the stream is created if missing) with start ID
// "0". A group that already exists is not treated as an error; any other
// failure is logged and returned.
func (r *redisStreams) CreateConsumerGroup(ctx context.Context, stream string) error {
	err := r.client.XGroupCreateMkStream(ctx, stream, r.clientSettings.ConsumerID, "0")
	if err == nil {
		return nil
	}
	// Redis signals an existing group with a BUSYGROUP error. Match on the
	// error-code prefix rather than the full human-readable text, which is
	// not part of the protocol contract and may differ between servers.
	if strings.HasPrefix(err.Error(), "BUSYGROUP") {
		return nil
	}
	r.logger.Errorf("redis streams: %s", err)
	return err
}
// Subscribe makes sure the consumer group exists for req.Topic and then
// starts three goroutines tied to a subscription-scoped context:
//   - a watcher that cancels that context when the component is closed,
//   - a poller that reads new messages, and
//   - a reclaimer that redelivers pending messages.
//
// Subscribing after Close returns an error.
func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
	if r.closed.Load() {
		return errors.New("component is closed")
	}
	if err := r.CreateConsumerGroup(ctx, req.Topic); err != nil {
		return err
	}

	subCtx, stop := context.WithCancel(ctx)
	topic := req.Topic

	r.wg.Add(3)

	// Close may be called while ctx is still live, so turn the close signal
	// into a cancellation of the subscription context.
	go func() {
		defer r.wg.Done()
		defer stop()
		select {
		case <-r.closeCh:
		case <-subCtx.Done():
		}
	}()

	go func() {
		defer r.wg.Done()
		r.pollNewMessagesLoop(subCtx, topic, handler)
	}()

	go func() {
		defer r.wg.Done()
		r.reclaimPendingMessagesLoop(subCtx, topic, handler)
	}()

	return nil
}
// enqueueMessages wraps each stream entry and hands it to the shared work
// queue read by the workers. It serves both freshly polled and reclaimed
// (redelivered) entries. Sends block while the queue is full, so it gives up
// on the remaining entries as soon as ctx is cancelled.
func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handler pubsub.Handler, msgs []rediscomponent.RedisXMessage) {
	for i := range msgs {
		wrapped := r.createRedisMessageWrapper(ctx, stream, handler, msgs[i])
		select {
		case <-ctx.Done():
			return
		case r.queue <- wrapped:
		}
	}
}
// createRedisMessageWrapper converts a raw stream entry into a
// redisMessageWrapper carrying the pubsub message, the entry ID (used later
// for XACK), the handler and the subscription context.
//
// The entry's "data" field becomes the payload, and its optional "metadata"
// field (the JSON object written by Publish) becomes the message metadata.
// Both fields are accepted as string or []byte. Unparseable metadata is
// logged and the message is still delivered.
func (r *redisStreams) createRedisMessageWrapper(ctx context.Context, stream string, handler pubsub.Handler, msg rediscomponent.RedisXMessage) redisMessageWrapper {
	// A missing, nil or unexpectedly typed "data" field yields nil data.
	var data []byte
	switch v := msg.Values["data"].(type) {
	case string:
		data = []byte(v)
	case []byte:
		data = v
	}

	var metadata map[string]string
	if metadataValue, exists := msg.Values["metadata"]; exists && metadataValue != nil {
		var raw []byte
		decodable := true
		// Use a checked type switch: an unchecked assertion to string would
		// panic the consumer goroutine on a []byte or other value.
		switch v := metadataValue.(type) {
		case string:
			raw = []byte(v)
		case []byte:
			raw = v
		default:
			decodable = false
			r.logger.Warnf("Redis PubSub: Could not extract metadata for Redis message %s: unexpected type %T", msg.ID, metadataValue)
		}
		if decodable {
			if err := json.Unmarshal(raw, &metadata); err != nil {
				r.logger.Warnf("Redis PubSub: Could not extract metadata for Redis message %s: %v", msg.ID, err)
			}
		}
	}

	return redisMessageWrapper{
		ctx: ctx,
		message: pubsub.NewMessage{
			Topic:    stream,
			Data:     data,
			Metadata: metadata,
		},
		messageID: msg.ID,
		handler:   handler,
	}
}
// worker drains the shared queue, processing one message at a time, until
// the component is closed. Init starts `concurrency` of these. Processing
// errors are already logged by processMessage and are not acted on here.
func (r *redisStreams) worker() {
	for {
		select {
		case msg := <-r.queue:
			r.processMessage(msg)
		case <-r.closeCh:
			return
		}
	}
}
// processMessage runs the message's handler and, if it succeeds, acknowledges
// the entry with XACK so it leaves the group's pending list. A failed
// message stays pending and, when redelivery is enabled, is picked up again
// by reclaimPendingMessagesLoop. With redelivery enabled (ProcessingTimeout
// and RedeliverInterval both non-zero), the handler is bounded by
// ProcessingTimeout.
func (r *redisStreams) processMessage(msg redisMessageWrapper) error {
	r.logger.Debugf("Processing Redis message %s", msg.messageID)

	handlerCtx := msg.ctx
	redeliveryEnabled := r.clientSettings.ProcessingTimeout != 0 && r.clientSettings.RedeliverInterval != 0
	if redeliveryEnabled {
		var cancel context.CancelFunc
		handlerCtx, cancel = context.WithTimeout(handlerCtx, r.clientSettings.ProcessingTimeout)
		defer cancel()
	}

	err := msg.handler(handlerCtx, &msg.message)
	if err != nil {
		r.logger.Errorf("Error processing Redis message %s: %v", msg.messageID, err)
		return err
	}

	// Acknowledge with a fresh context because the subscription context may
	// already be cancelled by the time the handler returns.
	err = r.client.XAck(context.Background(), msg.message.Topic, r.clientSettings.ConsumerID, msg.messageID)
	if err != nil {
		r.logger.Errorf("Error acknowledging Redis message %s: %v", msg.messageID, err)
		return err
	}
	return nil
}
// pollNewMessagesLoop repeatedly calls XREADGROUP for entries not yet
// delivered to the group (ID ">"), passing QueueDepth and ReadTimeout
// (presumably as COUNT and BLOCK), and funnels the results to the workers
// via enqueueMessages. It runs until ctx is cancelled. If the consumer group
// has vanished (NOGROUP), it attempts to recreate it.
func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, handler pubsub.Handler) {
	for {
		// Return on cancellation.
		if ctx.Err() != nil {
			return
		}

		streams, err := r.client.XReadGroupResult(ctx, r.clientSettings.ConsumerID, r.clientSettings.ConsumerID, []string{stream, ">"}, int64(r.clientSettings.QueueDepth), time.Duration(r.clientSettings.ReadTimeout))
		if err != nil {
			// A Redis nil reply means no new entries arrived; an error caused by
			// our own cancellation is expected on shutdown. Neither is logged.
			// errors.Is also matches context errors wrapped by the client.
			if errors.Is(err, r.client.GetNilValueError()) || errors.Is(err, context.Canceled) || ctx.Err() != nil {
				continue
			}
			if strings.Contains(err.Error(), "NOGROUP") {
				r.logger.Warnf("redis streams: consumer group %s does not exist for stream %s. This could mean the server experienced data loss, or the group/stream was deleted.", r.clientSettings.ConsumerID, stream)
				r.logger.Warnf("redis streams: recreating group %s for stream %s", r.clientSettings.ConsumerID, stream)
				// CreateConsumerGroup logs its own failures; the next iteration
				// retries the read either way.
				_ = r.CreateConsumerGroup(ctx, stream)
			}
			r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err)
			continue
		}

		// Enqueue messages for the returned streams.
		for _, s := range streams {
			r.enqueueMessages(ctx, s.Stream, handler, s.Messages)
		}
	}
}
// reclaimPendingMessagesLoop periodically redelivers messages whose
// processing timed out. It performs one reclaim pass immediately and then
// one per RedeliverInterval until ctx is cancelled.
//
// Redelivery is disabled, and the function returns at once, when either
// ProcessingTimeout or RedeliverInterval is zero.
func (r *redisStreams) reclaimPendingMessagesLoop(ctx context.Context, stream string, handler pubsub.Handler) {
	if r.clientSettings.ProcessingTimeout == 0 || r.clientSettings.RedeliverInterval == 0 {
		return
	}

	// Do an initial reclaim call before waiting for the first tick.
	r.reclaimPendingMessages(ctx, stream, handler)

	reclaimTicker := time.NewTicker(r.clientSettings.RedeliverInterval)
	// Release the ticker when the subscription ends.
	defer reclaimTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-reclaimTicker.C:
			r.reclaimPendingMessages(ctx, stream, handler)
		}
	}
}
// reclaimPendingMessages redelivers messages that were delivered to this
// consumer group but never acknowledged and have been idle for at least
// ProcessingTimeout. Each pass lists up to QueueDepth pending entries with
// XPENDING, claims the timed-out ones with XCLAIM and funnels them to the
// workers via enqueueMessages. When XCLAIM replies with the Redis nil error,
// some of the requested IDs refer to entries that no longer exist; those are
// cleaned up by removeMessagesThatNoLongerExistFromPending.
//
// The loop stops when a pass finds nothing to claim or when a Redis call
// fails; failures are logged, not returned.
// NOTE(review): termination relies on XCLAIM resetting the idle time of the
// claimed entries so the next XPENDING pass filters them out. Also, each pass
// only looks at the first QueueDepth pending entries — confirm this window is
// intended.
func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string, handler pubsub.Handler) {
	for {
		// Retrieve pending messages for this stream and consumer group
		pendingResult, err := r.client.XPendingExtResult(ctx,
			stream,
			r.clientSettings.ConsumerID,
			"-",
			"+",
			int64(r.clientSettings.QueueDepth),
		)
		// A nil reply falls through with an empty result and ends the loop below.
		if err != nil && !errors.Is(err, r.client.GetNilValueError()) {
			r.logger.Errorf("error retrieving pending Redis messages: %v", err)
			break
		}
		// Filter out messages that have not timed out yet
		msgIDs := make([]string, 0, len(pendingResult))
		for _, msg := range pendingResult {
			if msg.Idle >= r.clientSettings.ProcessingTimeout {
				msgIDs = append(msgIDs, msg.ID)
			}
		}
		// Nothing to claim
		if len(msgIDs) == 0 {
			break
		}
		// Attempt to claim the messages for the filtered IDs. ProcessingTimeout
		// is passed as the minimum idle time, presumably so Redis skips entries
		// that became active again since XPENDING ran.
		claimResult, err := r.client.XClaimResult(ctx,
			stream,
			r.clientSettings.ConsumerID,
			r.clientSettings.ConsumerID,
			r.clientSettings.ProcessingTimeout,
			msgIDs,
		)
		if err != nil && !errors.Is(err, r.client.GetNilValueError()) {
			r.logger.Errorf("error claiming pending Redis messages: %v", err)
			break
		}
		// Enqueue claimed messages
		r.enqueueMessages(ctx, stream, handler, claimResult)
		// If the Redis nil error is returned, it means some messages in the
		// pending state no longer exist. We need to acknowledge these messages
		// to remove them from the pending list.
		if errors.Is(err, r.client.GetNilValueError()) {
			// Build a set of message IDs that were not returned
			// that potentially no longer exist.
			expectedMsgIDs := make(map[string]struct{}, len(msgIDs))
			for _, id := range msgIDs {
				expectedMsgIDs[id] = struct{}{}
			}
			for _, claimed := range claimResult {
				delete(expectedMsgIDs, claimed.ID)
			}
			r.removeMessagesThatNoLongerExistFromPending(ctx, stream, expectedMsgIDs, handler)
		}
	}
}
// removeMessagesThatNoLongerExistFromPending claims each of the given
// pending IDs individually with a minimum idle time of 0. A Redis nil reply
// for an ID means its stream entry is gone, so the ID is acknowledged with
// XACK to drop it from the pending list. A successful claim means the entry
// still exists, and it is enqueued for processing instead. Other claim
// errors are logged and the ID is skipped.
func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(ctx context.Context, stream string, messageIDs map[string]struct{}, handler pubsub.Handler) {
	group := r.clientSettings.ConsumerID
	for id := range messageIDs {
		claimed, err := r.client.XClaimResult(ctx, stream, group, group, 0, []string{id})
		switch {
		case err == nil:
			// Not expected, but the entry still exists, so process it.
			r.enqueueMessages(ctx, stream, handler, claimed)
		case errors.Is(err, r.client.GetNilValueError()):
			// Background context: the subscription context may already be closed.
			if ackErr := r.client.XAck(context.Background(), stream, group, id); ackErr != nil {
				r.logger.Errorf("error acknowledging Redis message %s after failed claim for %s: %v", id, stream, ackErr)
			}
		default:
			r.logger.Errorf("error claiming pending Redis message %s: %v", id, err)
		}
	}
}
// Close signals all background goroutines to stop, closes the Redis client
// and waits for the goroutines started by Init and Subscribe to exit.
// It is safe to call more than once. Only the first call closes the client;
// later calls return nil after waiting for the same goroutines.
func (r *redisStreams) Close() error {
	// Deferred so the wait happens after the client is closed. Closing the
	// client presumably unblocks goroutines stuck in blocking Redis reads.
	defer r.wg.Wait()
	if !r.closed.CompareAndSwap(false, true) {
		// Already closed: closing the client again would only yield an error.
		return nil
	}
	close(r.closeCh)
	if r.client == nil {
		return nil
	}
	return r.client.Close()
}
// Features reports the optional pub/sub capabilities of this component.
// Redis Streams advertises none, so the result is a nil slice.
func (r *redisStreams) Features() []pubsub.Feature {
	var none []pubsub.Feature
	return none
}
// Ping checks connectivity to Redis with a PING command. It returns an error
// when the component has no client yet (Init not run or failed early) or
// when Redis cannot be reached.
func (r *redisStreams) Ping(ctx context.Context) error {
	if r.client == nil {
		// Calling a method on the nil client interface would panic.
		return errors.New("redis pubsub: client is not initialized")
	}
	if _, err := r.client.PingResult(ctx); err != nil {
		return fmt.Errorf("redis pubsub: error connecting to redis at %s: %w", r.clientSettings.Host, err)
	}
	return nil
}
// GetComponentMetadata describes this component's metadata fields, derived
// by reflection from the shared rediscomponent.Settings struct for the
// pub/sub component type.
func (r *redisStreams) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMap) {
	settingsType := reflect.TypeOf(rediscomponent.Settings{})
	contribMetadata.GetMetadataInfoFromStructType(settingsType, &metadataInfo, contribMetadata.PubSubType)
	return metadataInfo
}