Improve conformance tests and add pulsar (#702)

* Improve conformance tests and add pulsar

* Fix linter error

* Fix linter errors

* Tweaks to pulsar pubsub per PR

* Switching uuid import

* Using Stringer interface to print message keys as base64 in logger

* Disabling some checks w/ Redis pubsub until the component can be enhanced

* Adding comment to eventually remove the simulateErrors option

* Add comment asBase64String to explain what it does

* Adding redelivery (reclaims) to the Redis pubsub component
Removed the temporary simulateErrors flag now that Redis is passing

* Fixing linter issues

* using a wait group instead of a sleep to wait for processing to complete

* More comments and refactoring

* comment tweaks

* Tweaks per PR

Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Phil Kedy 2021-02-23 16:31:52 -05:00 committed by GitHub
parent 7caa49f7c0
commit 4a0011bcbd
15 changed files with 646 additions and 165 deletions

View File

@ -0,0 +1,13 @@
version: '2'
services:
standalone:
image: apachepulsar/pulsar
ports:
- "6650:6650"
- "8080:8080"
environment:
- BOOKIE_MEM=" -Xms768m -Xmx768m -XX:MaxDirectMemorySize=1g"
command: >
/bin/bash -c
"bin/apply-config-from-env.py conf/standalone.conf
&& bin/pulsar standalone --advertised-address standalone"

View File

@ -34,6 +34,7 @@ jobs:
- pubsub.redis
- pubsub.natsstreaming
- pubsub.kafka
- pubsub.pulsar
- secretstores.kubernetes
- secretstores.localenv
- secretstores.localfile
@ -167,13 +168,17 @@ jobs:
if: contains(matrix.component, 'mongodb')
- name: Start kafka
run: docker-compose -f ./.github/infrastructure/docker-compose-kafka.yml up -d
run: docker-compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d
if: contains(matrix.component, 'kafka')
- name: Start natsstreaming
run: docker-compose -f ./.github/infrastructure/docker-compose-natsstreaming.yml up -d
run: docker-compose -f ./.github/infrastructure/docker-compose-natsstreaming.yml -p natsstreaming up -d
if: contains(matrix.component, 'natsstreaming')
- name: Start pulsar
run: docker-compose -f ./.github/infrastructure/docker-compose-pulsar.yml -p pulsar up -d
if: contains(matrix.component, 'pulsar')
- name: Start KinD
uses: helm/kind-action@v1.0.0
if: contains(matrix.component, 'kubernetes')

View File

@ -8,6 +8,7 @@ package kafka
import (
"context"
"crypto/tls"
"encoding/base64"
"errors"
"fmt"
"strconv"
@ -60,35 +61,29 @@ type consumer struct {
}
func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
if consumer.callback == nil {
return fmt.Errorf("nil consumer callback")
}
bo := backoff.WithContext(consumer.backOff, session.Context())
for message := range claim.Messages() {
if consumer.callback == nil {
return fmt.Errorf("nil consumer callback")
msg := pubsub.NewMessage{
Topic: message.Topic,
Data: message.Value,
}
var warningLogged bool
err := backoff.RetryNotify(func() error {
err := consumer.callback(&pubsub.NewMessage{
Topic: claim.Topic(),
Data: message.Value,
})
if err := pubsub.RetryNotifyRecover(func() error {
consumer.logger.Debugf("Processing Kafka message: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
err := consumer.callback(&msg)
if err == nil {
session.MarkMessage(message, "")
if warningLogged {
consumer.logger.Infof("Kafka message processed successfully after previously failing: %s/%d [key=%s]", message.Topic, message.Partition, message.Key)
warningLogged = false
}
}
return err
}, bo, func(err error, d time.Duration) {
if !warningLogged {
consumer.logger.Warnf("Encountered error processing Kafka message: %s/%d [key=%s]. Retrying...", message.Topic, message.Partition, message.Key)
warningLogged = true
}
})
if err != nil {
consumer.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
}, func() {
consumer.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
}); err != nil {
return err
}
}
@ -211,6 +206,10 @@ func (k *Kafka) closeSubscripionResources() {
// Subscribe to topic in the Kafka cluster
// This call cannot block like its sibling in bindings/kafka because of where this is invoked in runtime.go
func (k *Kafka) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {
if k.consumerGroup == "" {
return errors.New("kafka: consumerID must be set to subscribe")
}
topics := k.addTopic(req.Topic)
// Close resources and reset synchronization primitives
@ -368,3 +367,13 @@ func (k *Kafka) Close() error {
func (k *Kafka) Features() []pubsub.Feature {
return nil
}
// asBase64String implements the `fmt.Stringer` interface in order to print
// `[]byte` as a base 64 encoded string.
// It is used above to log the message key. The call to `EncodeToString`
// only occurs for logs that are written based on the logging level.
type asBase64String []byte
func (s asBase64String) String() string {
return base64.StdEncoding.EncodeToString(s)
}
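
As an aside, a minimal standalone sketch of the same pattern (the type name and key value here are illustrative, not part of this change); the base64 encoding only runs when the formatter actually evaluates the argument:

package main

import (
	"encoding/base64"
	"fmt"
)

// b64 mirrors the asBase64String pattern above (redeclared so the sketch is standalone).
type b64 []byte

func (s b64) String() string { return base64.StdEncoding.EncodeToString(s) }

func main() {
	key := []byte("order-42")
	// The %s verb invokes String(), printing the key as base64: b3JkZXItNDI=
	fmt.Printf("key=%s\n", b64(key))
}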

View File

@ -9,17 +9,20 @@ Package natsstreaming implements NATS Streaming pubsub component
package natsstreaming
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"time"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/dapr/pkg/logger"
"github.com/cenkalti/backoff/v4"
nats "github.com/nats-io/nats.go"
stan "github.com/nats-io/stan.go"
"github.com/nats-io/stan.go/pb"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/dapr/pkg/logger"
)
// compulsory options
@ -61,6 +64,10 @@ type natsStreamingPubSub struct {
natStreamingConn stan.Conn
logger logger.Logger
ctx context.Context
cancel context.CancelFunc
backOff backoff.BackOff
}
// NewNATSStreamingPubSub returns a new NATS Streaming pub-sub implementation
@ -186,6 +193,14 @@ func (n *natsStreamingPubSub) Init(metadata pubsub.Metadata) error {
}
n.logger.Debugf("connected to natsstreaming at %s", m.natsURL)
ctx, cancel := context.WithCancel(context.Background())
n.ctx = ctx
n.cancel = cancel
// TODO: Make the backoff configurable for constant or exponential
b := backoff.NewConstantBackOff(5 * time.Second)
n.backOff = backoff.WithContext(b, n.ctx)
n.natStreamingConn = natStreamingConn
return nil
@ -207,12 +222,24 @@ func (n *natsStreamingPubSub) Subscribe(req pubsub.SubscribeRequest, handler fun
}
natsMsgHandler := func(natsMsg *stan.Msg) {
herr := handler(&pubsub.NewMessage{Topic: req.Topic, Data: natsMsg.Data})
if herr != nil {
} else {
// we only send a successful ACK if there is no error from Dapr runtime
natsMsg.Ack()
msg := pubsub.NewMessage{
Topic: req.Topic,
Data: natsMsg.Data,
}
pubsub.RetryNotifyRecover(func() error {
n.logger.Debugf("Processing NATS Streaming message %s/%d", natsMsg.Subject, natsMsg.Sequence)
herr := handler(&msg)
if herr == nil {
// we only send a successful ACK if there is no error from Dapr runtime
natsMsg.Ack()
}
return herr
}, n.backOff, func(err error, d time.Duration) {
n.logger.Errorf("Error processing NATS Streaming message: %s/%d. Retrying...", natsMsg.Subject, natsMsg.Sequence)
}, func() {
n.logger.Infof("Successfully processed NATS Streaming message after it previously failed: %s/%d", natsMsg.Subject, natsMsg.Sequence)
})
}
if n.metadata.subscriptionType == subscriptionTypeTopic {
@ -290,6 +317,8 @@ func genRandomString(n int) string {
}
func (n *natsStreamingPubSub) Close() error {
n.cancel()
return n.natStreamingConn.Close()
}

View File

@ -8,6 +8,8 @@ import (
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/cenkalti/backoff/v4"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/dapr/pkg/logger"
)
@ -21,6 +23,10 @@ type Pulsar struct {
logger logger.Logger
client pulsar.Client
metadata pulsarMetadata
ctx context.Context
cancel context.CancelFunc
backOff backoff.BackOff
}
func NewPulsar(l logger.Logger) pubsub.PubSub {
@ -63,9 +69,15 @@ func (p *Pulsar) Init(metadata pubsub.Metadata) error {
}
defer client.Close()
p.ctx, p.cancel = context.WithCancel(context.Background())
p.client = client
p.metadata = *m
// TODO: Make the backoff configurable for constant or exponential
b := backoff.NewConstantBackOff(5 * time.Second)
p.backOff = backoff.WithContext(b, p.ctx)
return nil
}
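
As a side note on the TODO above, an exponential backoff from the same cenkalti/backoff/v4 package could be swapped in for the constant one. A standalone sketch under that assumption (the interval values are illustrative, not part of this change):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Illustrative alternative to backoff.NewConstantBackOff(5 * time.Second):
	// exponential retries bounded only by context cancelation.
	eb := backoff.NewExponentialBackOff()
	eb.InitialInterval = 1 * time.Second
	eb.MaxInterval = 30 * time.Second
	eb.MaxElapsedTime = 0 // never give up; rely on ctx for shutdown

	b := backoff.WithContext(eb, ctx)
	fmt.Println("first delay:", b.NextBackOff())
}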
@ -96,38 +108,64 @@ func (p *Pulsar) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub
Topic: req.Topic,
SubscriptionName: p.metadata.ConsumerID,
Type: pulsar.Failover,
MessageChannel: channel,
}
options.MessageChannel = channel
consumer, err := p.client.Subscribe(options)
if err != nil {
p.logger.Debugf("Could not subscribe %s", req.Topic)
return err
}
go p.ListenMessage(consumer, req.Topic, handler)
go p.listenMessage(consumer, handler)
return nil
}
func (p *Pulsar) ListenMessage(msgs pulsar.Consumer, topic string, handler func(msg *pubsub.NewMessage) error) {
for cm := range msgs.Chan() {
p.HandleMessage(cm, topic, handler)
func (p *Pulsar) listenMessage(consumer pulsar.Consumer, handler func(msg *pubsub.NewMessage) error) {
defer consumer.Close()
for {
select {
case msg := <-consumer.Chan():
if err := p.handleMessage(msg, handler); err != nil && !errors.Is(err, context.Canceled) {
p.logger.Errorf("Error processing message and retries are exhausted: %s/%#v [key=%s]. Closing consumer.", msg.Topic(), msg.ID(), msg.Key())
return
}
case <-p.ctx.Done():
// Handle the component being closed
return
}
}
}
func (p *Pulsar) HandleMessage(m pulsar.ConsumerMessage, topic string, handler func(msg *pubsub.NewMessage) error) {
err := handler(&pubsub.NewMessage{
Data: m.Payload(),
Topic: topic,
})
if err != nil {
p.logger.Debugf("Could not handle topic %s", topic)
} else {
m.Ack(m.Message)
func (p *Pulsar) handleMessage(msg pulsar.ConsumerMessage, handler func(msg *pubsub.NewMessage) error) error {
pubsubMsg := pubsub.NewMessage{
Data: msg.Payload(),
Topic: msg.Topic(),
Metadata: msg.Properties(),
}
return pubsub.RetryNotifyRecover(func() error {
p.logger.Debugf("Processing Pulsar message %s/%#v", msg.Topic(), msg.ID())
err := handler(&pubsubMsg)
if err == nil {
msg.Ack(msg.Message)
}
return err
}, p.backOff, func(err error, d time.Duration) {
p.logger.Errorf("Error processing Pulsar message: %s/%#v [key=%s]. Retrying...", msg.Topic(), msg.ID(), msg.Key())
}, func() {
p.logger.Infof("Successfully processed Pulsar message after it previously failed: %s/%#v [key=%s]", msg.Topic(), msg.ID(), msg.Key())
})
}
func (p *Pulsar) Close() error {
p.cancel()
p.client.Close()
return nil

View File

@ -5,9 +5,25 @@
package redis
import (
"time"
)
type metadata struct {
host string
password string
// The Redis host
host string
// The Redis password
password string
// The consumer identifier
consumerID string
enableTLS bool
// A flag that enables TLS by setting InsecureSkipVerify to true
enableTLS bool
// The interval between checking for pending messages to redeliver (0 disables redelivery)
redeliverInterval time.Duration
// The amount of time a message must be pending before attempting to redeliver it (0 disables redelivery)
processingTimeout time.Duration
// The size of the message queue for processing
queueDepth uint
// The number of concurrent workers that are processing messages
concurrency uint
}

View File

@ -6,29 +6,54 @@
package redis
import (
"context"
"crypto/tls"
"errors"
"fmt"
"strconv"
"time"
"github.com/go-redis/redis/v7"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/dapr/pkg/logger"
"github.com/go-redis/redis/v7"
)
const (
host = "redisHost"
password = "redisPassword"
consumerID = "consumerID"
enableTLS = "enableTLS"
host = "redisHost"
password = "redisPassword"
consumerID = "consumerID"
enableTLS = "enableTLS"
processingTimeout = "processingTimeout"
redeliverInterval = "redeliverInterval"
queueDepth = "queueDepth"
concurrency = "concurrency"
)
// redisStreams handles consuming from a Redis stream using
// `XREADGROUP` for reading new messages and `XPENDING` and
// `XCLAIM` for redelivering messages that previously failed.
//
// See https://redis.io/topics/streams-intro for more information
// on the mechanics of Redis Streams.
type redisStreams struct {
metadata metadata
client *redis.Client
logger logger.Logger
queue chan redisMessageWrapper
ctx context.Context
cancel context.CancelFunc
}
// redisMessageWrapper encapsulates the message identifier,
// pubsub message, and handler to send to the queue channel for processing.
type redisMessageWrapper struct {
messageID string
message pubsub.NewMessage
handler func(msg *pubsub.NewMessage) error
}
// NewRedisStreams returns a new redis streams pub-sub implementation
@ -37,7 +62,13 @@ func NewRedisStreams(logger logger.Logger) pubsub.PubSub {
}
func parseRedisMetadata(meta pubsub.Metadata) (metadata, error) {
m := metadata{}
// Default values
m := metadata{
processingTimeout: 60 * time.Second,
redeliverInterval: 15 * time.Second,
queueDepth: 100,
concurrency: 10,
}
if val, ok := meta.Properties[host]; ok && val != "" {
m.host = val
} else {
@ -62,6 +93,42 @@ func parseRedisMetadata(meta pubsub.Metadata) (metadata, error) {
return m, errors.New("redis streams error: missing consumerID")
}
if val, ok := meta.Properties[processingTimeout]; ok && val != "" {
if processingTimeoutMs, err := strconv.ParseUint(val, 10, 64); err == nil {
m.processingTimeout = time.Duration(processingTimeoutMs) * time.Millisecond
} else if d, err := time.ParseDuration(val); err == nil {
m.processingTimeout = d
} else {
return m, fmt.Errorf("redis streams error: can't parse processingTimeout field: %s", err)
}
}
if val, ok := meta.Properties[redeliverInterval]; ok && val != "" {
if redeliverIntervalMs, err := strconv.ParseUint(val, 10, 64); err == nil {
m.redeliverInterval = time.Duration(redeliverIntervalMs) * time.Millisecond
} else if d, err := time.ParseDuration(val); err == nil {
m.redeliverInterval = d
} else {
return m, fmt.Errorf("redis streams error: can't parse redeliverInterval field: %s", err)
}
}
if val, ok := meta.Properties[queueDepth]; ok && val != "" {
queueDepth, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return m, fmt.Errorf("redis streams error: can't parse queueDepth field: %s", err)
}
m.queueDepth = uint(queueDepth)
}
if val, ok := meta.Properties[concurrency]; ok && val != "" {
concurrency, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return m, fmt.Errorf("redis streams error: can't parse concurrency field: %s", err)
}
m.concurrency = uint(concurrency)
}
return m, nil
}
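
For clarity, the timing fields above accept either a bare integer (interpreted as milliseconds) or a Go duration string. A standalone sketch of that parsing strategy (the helper name is illustrative):

package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseMillisOrDuration mirrors the strategy used for processingTimeout and
// redeliverInterval above: a bare unsigned integer is treated as milliseconds,
// otherwise the value is parsed as a Go duration string such as "60s".
func parseMillisOrDuration(val string) (time.Duration, error) {
	if ms, err := strconv.ParseUint(val, 10, 64); err == nil {
		return time.Duration(ms) * time.Millisecond, nil
	}
	return time.ParseDuration(val)
}

func main() {
	for _, v := range []string{"60000", "60s", "1m"} {
		d, err := parseMillisOrDuration(v)
		fmt.Printf("%s => %v (err=%v)\n", v, d, err)
	}
}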
@ -88,14 +155,19 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error {
}
client := redis.NewClient(options)
_, err = client.Ping().Result()
if err != nil {
if _, err = client.Ping().Result(); err != nil {
return fmt.Errorf("redis streams: error connecting to redis at %s: %s", m.host, err)
}
r.ctx, r.cancel = context.WithCancel(context.Background())
r.queue = make(chan redisMessageWrapper, int(r.metadata.queueDepth))
r.client = client
for i := uint(0); i < r.metadata.concurrency; i++ {
go r.worker()
}
return nil
}
@ -113,68 +185,251 @@ func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
func (r *redisStreams) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {
err := r.client.XGroupCreateMkStream(req.Topic, r.metadata.consumerID, "0").Err()
if err != nil {
r.logger.Warnf("redis streams: %s", err)
// Ignore BUSYGROUP errors
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
r.logger.Errorf("redis streams: %s", err)
return err
}
go r.beginReadingFromStream(req.Topic, r.metadata.consumerID, handler)
go r.pollNewMessagesLoop(req.Topic, handler)
go r.reclaimPendingMessagesLoop(req.Topic, handler)
return nil
}
func (r *redisStreams) readFromStream(stream, consumerID, start string) ([]redis.XStream, error) {
res, err := r.client.XReadGroup(&redis.XReadGroupArgs{
Group: consumerID,
Consumer: consumerID,
Streams: []string{stream, start},
Block: 0,
}).Result()
if err != nil {
return nil, err
}
// enqueueMessages is a shared function that funnels new messages (via polling)
// and redelivered messages (via reclaiming) to a channel where workers can
// pick them up for processing.
func (r *redisStreams) enqueueMessages(stream string, handler func(msg *pubsub.NewMessage) error, msgs []redis.XMessage) {
for _, msg := range msgs {
rmsg := createRedisMessageWrapper(stream, handler, msg)
return res, nil
}
select {
// Might block if the queue is full so we need the r.ctx.Done below.
case r.queue <- rmsg:
func (r *redisStreams) processStreams(consumerID string, streams []redis.XStream, handler func(msg *pubsub.NewMessage) error) {
for _, s := range streams {
for _, m := range s.Messages {
go func(stream string, message redis.XMessage) {
msg := pubsub.NewMessage{
Topic: stream,
}
data, exists := message.Values["data"]
if exists && data != nil {
msg.Data = []byte(data.(string))
}
err := handler(&msg)
if err == nil {
r.client.XAck(stream, consumerID, message.ID).Result()
}
}(s.Stream, m)
// Handle cancelation
case <-r.ctx.Done():
return
}
}
}
func (r *redisStreams) beginReadingFromStream(stream, consumerID string, handler func(msg *pubsub.NewMessage) error) {
// first read pending items in case of recovering from crash
start := "0"
// createRedisMessageWrapper encapsulates the Redis message, message identifier, and handler
// in a `redisMessageWrapper` for processing.
func createRedisMessageWrapper(stream string, handler func(msg *pubsub.NewMessage) error, msg redis.XMessage) redisMessageWrapper {
var data []byte
if dataValue, exists := msg.Values["data"]; exists && dataValue != nil {
switch v := dataValue.(type) {
case string:
data = []byte(v)
case []byte:
data = v
}
}
return redisMessageWrapper{
message: pubsub.NewMessage{
Topic: stream,
Data: data,
},
messageID: msg.ID,
handler: handler,
}
}
// worker runs in separate goroutine(s) and pulls messages from a channel for processing.
// The number of workers is controlled by the `concurrency` setting.
func (r *redisStreams) worker() {
for {
streams, err := r.readFromStream(stream, consumerID, start)
select {
// Handle cancelation
case <-r.ctx.Done():
return
case msg := <-r.queue:
r.processMessage(msg)
}
}
}
// processMessage attempts to process a single Redis message by invoking
// its handler. If the message is processed successfully, then it is Ack'ed.
// Otherwise, it remains in the pending list and will be redelivered
// by `reclaimPendingMessagesLoop`.
func (r *redisStreams) processMessage(msg redisMessageWrapper) error {
r.logger.Debugf("Processing Redis message %s", msg.messageID)
if err := msg.handler(&msg.message); err != nil {
r.logger.Errorf("Error processing Redis message %s: %v", msg.messageID, err)
return err
}
if err := r.client.XAck(msg.message.Topic, r.metadata.consumerID, msg.messageID).Err(); err != nil {
r.logger.Errorf("Error acknowledging Redis message %s: %v", msg.messageID, err)
return err
}
return nil
}
// pollNewMessagesLoop calls `XReadGroup` for new messages and funnels them to the message channel
// by calling `enqueueMessages`.
func (r *redisStreams) pollNewMessagesLoop(stream string, handler func(msg *pubsub.NewMessage) error) {
for {
// Read messages
streams, err := r.client.XReadGroup(&redis.XReadGroupArgs{
Group: r.metadata.consumerID,
Consumer: r.metadata.consumerID,
Streams: []string{stream, ">"},
Count: int64(r.metadata.queueDepth),
Block: 0,
}).Result()
if err != nil {
r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err)
continue
}
// Enqueue messages for the returned streams
for _, s := range streams {
r.enqueueMessages(s.Stream, handler, s.Messages)
}
// Return on cancelation
if r.ctx.Err() != nil {
return
}
r.processStreams(consumerID, streams, handler)
}
}
// continue with new non received items
start = ">"
// reclaimPendingMessagesLoop periodically reclaims pending messages
// based on the `redeliverInterval` setting.
func (r *redisStreams) reclaimPendingMessagesLoop(stream string, handler func(msg *pubsub.NewMessage) error) {
// Having a `processingTimeout` or `redeliverInterval` of zero means that
// redelivery is disabled, so we just return out of the goroutine.
if r.metadata.processingTimeout == 0 || r.metadata.redeliverInterval == 0 {
return
}
// Do an initial reclaim call
r.reclaimPendingMessages(stream, handler)
reclaimTicker := time.NewTicker(r.metadata.redeliverInterval)
for {
select {
case <-r.ctx.Done():
return
case <-reclaimTicker.C:
r.reclaimPendingMessages(stream, handler)
}
}
}
// reclaimPendingMessages handles reclaiming messages that previously failed to process and
// funneling them to the message channel by calling `enqueueMessages`.
func (r *redisStreams) reclaimPendingMessages(stream string, handler func(msg *pubsub.NewMessage) error) {
for {
// Retrieve pending messages for this stream and consumer
pendingResult, err := r.client.XPendingExt(&redis.XPendingExtArgs{
Stream: stream,
Group: r.metadata.consumerID,
Start: "-",
End: "+",
Count: int64(r.metadata.queueDepth),
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
r.logger.Errorf("error retrieving pending Redis messages: %v", err)
break
}
// Filter out messages that have not timed out yet
msgIDs := make([]string, 0, len(pendingResult))
for _, msg := range pendingResult {
if msg.Idle >= r.metadata.processingTimeout {
msgIDs = append(msgIDs, msg.ID)
}
}
// Nothing to claim
if len(msgIDs) == 0 {
break
}
// Attempt to claim the messages for the filtered IDs
claimResult, err := r.client.XClaim(&redis.XClaimArgs{
Stream: stream,
Group: r.metadata.consumerID,
Consumer: r.metadata.consumerID,
MinIdle: r.metadata.processingTimeout,
Messages: msgIDs,
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
r.logger.Errorf("error claiming pending Redis messages: %v", err)
break
}
// Enqueue claimed messages
r.enqueueMessages(stream, handler, claimResult)
// If the Redis nil error is returned, it means some messages in the pending
// state no longer exist. We need to acknowledge these messages to
// remove them from the pending list.
if errors.Is(err, redis.Nil) {
// Build a set of message IDs that were not returned
// that potentially no longer exist.
expectedMsgIDs := make(map[string]struct{}, len(msgIDs))
for _, id := range msgIDs {
expectedMsgIDs[id] = struct{}{}
}
for _, claimed := range claimResult {
delete(expectedMsgIDs, claimed.ID)
}
r.removeMessagesThatNoLongerExistFromPending(stream, expectedMsgIDs, handler)
}
}
}
// removeMessagesThatNoLongerExistFromPending attempts to claim messages individually so that messages in the pending list
// that no longer exist can be removed from the pending list. This is done by calling `XACK`.
func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(stream string, messageIDs map[string]struct{}, handler func(msg *pubsub.NewMessage) error) {
// Check each message ID individually.
for pendingID := range messageIDs {
claimResultSingleMsg, err := r.client.XClaim(&redis.XClaimArgs{
Stream: stream,
Group: r.metadata.consumerID,
Consumer: r.metadata.consumerID,
MinIdle: r.metadata.processingTimeout,
Messages: []string{pendingID},
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
r.logger.Errorf("error claiming pending Redis message %s: %v", pendingID, err)
continue
}
// Ack the message to remove it from the pending list.
if errors.Is(err, redis.Nil) {
if err = r.client.XAck(stream, r.metadata.consumerID, pendingID).Err(); err != nil {
r.logger.Errorf("error acknowledging Redis message %s after failed claim for %s: %v", pendingID, stream, err)
}
} else {
// This should not happen but if it does the message should be processed.
r.enqueueMessages(stream, handler, claimResultSingleMsg)
}
}
}
func (r *redisStreams) Close() error {
r.cancel()
return r.client.Close()
}

View File

@ -6,15 +6,17 @@
package redis
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/dapr/pkg/logger"
"github.com/go-redis/redis/v7"
"github.com/stretchr/testify/assert"
)
func getFakeProperties() map[string]string {
@ -87,11 +89,15 @@ func TestProcessStreams(t *testing.T) {
messageCount := 0
expectedData := "testData"
var wg sync.WaitGroup
wg.Add(3)
fakeHandler := func(msg *pubsub.NewMessage) error {
defer wg.Done()
messageCount++
if topicCount == 0 && messageCount >= 3 {
if topicCount == 0 {
topicCount = 1
messageCount = 0
}
// assert
@ -103,17 +109,20 @@ func TestProcessStreams(t *testing.T) {
// act
testRedisStream := &redisStreams{logger: logger.NewLogger("test")}
testRedisStream.processStreams(fakeConsumerID, generateRedisStreamTestData(2, 3, expectedData), fakeHandler)
testRedisStream.ctx, testRedisStream.cancel = context.WithCancel(context.Background())
testRedisStream.queue = make(chan redisMessageWrapper, 10)
go testRedisStream.worker()
testRedisStream.enqueueMessages(fakeConsumerID, fakeHandler, generateRedisStreamTestData(2, 3, expectedData))
// sleep for 10ms to give time to finish processing
time.Sleep(time.Millisecond * 10)
// Wait for the handler to finish processing
wg.Wait()
// assert
assert.Equal(t, 1, topicCount)
assert.Equal(t, 3, messageCount)
}
func generateRedisStreamTestData(topicCount, messageCount int, data string) []redis.XStream {
func generateRedisStreamTestData(topicCount, messageCount int, data string) []redis.XMessage {
generateXMessage := func(id int) redis.XMessage {
return redis.XMessage{
ID: fmt.Sprintf("%d", id),
@ -128,13 +137,5 @@ func generateRedisStreamTestData(topicCount, messageCount int, data string) []re
xmessageArray[i] = generateXMessage(i)
}
redisStreams := make([]redis.XStream, topicCount)
for i := range redisStreams {
redisStreams[i] = redis.XStream{
Stream: fmt.Sprintf("Topic%d", i),
Messages: xmessageArray,
}
}
return redisStreams
return xmessageArray
}

pubsub/retry.go (new file, +31 lines)
View File

@ -0,0 +1,31 @@
package pubsub
import (
"time"
"github.com/cenkalti/backoff/v4"
)
// RetryNotifyRecover is a wrapper around backoff.RetryNotify that adds another callback for when an operation
// previously failed but has since recovered. The main purpose of this wrapper is to call `notify` only when
// the operations fails the first time and `recovered` when it finally succeeds. This can be helpful in limiting
// log messages to only the events that operators need to be alerted on.
func RetryNotifyRecover(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, recovered func()) error {
var notified bool
return backoff.RetryNotify(func() error {
err := operation()
if err == nil && notified {
notified = false
recovered()
}
return err
}, b, func(err error, d time.Duration) {
if !notified {
notify(err, d)
notified = true
}
})
}
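
For reference, a minimal standalone sketch of how the components above use this helper (the simulated handler and log wording are illustrative):

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/cenkalti/backoff/v4"

	"github.com/dapr/components-contrib/pubsub"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Constant backoff tied to a context, as the Kafka/NATS/Pulsar components do
	// (they use 5s; shortened here for the demo).
	b := backoff.WithContext(backoff.NewConstantBackOff(100*time.Millisecond), ctx)

	attempts := 0
	err := pubsub.RetryNotifyRecover(func() error {
		// Simulated handler: fail twice, then succeed.
		attempts++
		if attempts < 3 {
			return errors.New("simulated handler failure")
		}
		return nil
	}, b, func(err error, d time.Duration) {
		// Logged only once per run of consecutive failures.
		log.Printf("handler failed, retrying in %v: %v", d, err)
	}, func() {
		// Logged only after a failure has been followed by success.
		log.Print("handler recovered after previously failing")
	})
	if err != nil {
		log.Fatal(err)
	}
}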

View File

@ -15,4 +15,6 @@ spec:
- name: consumerID
value: myConsumerID
- name: ackWaitTime
value: 3s
value: 10s
- name: maxInFlight
value: 1

View File

@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: consumerID
value: myConsumerID

View File

@ -4,10 +4,17 @@ metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: consumerID
value: "testConsumer"
value: "testConsumer"
- name: processingTimeout
value: 5s
- name: redeliverInterval
value: 1s
- name: concurrency
value: 1

View File

@ -5,7 +5,8 @@
## pubsubName : name of the pubsub
## testTopicName: name of the test topic to use
## maxReadDuration: duration to wait for read to complete
## messageCount: no. of messages to publish
## messageCount: no. of messages to publish
## checkInOrderProcessing: false disables in-order message processing checking
componentType: pubsub
components:
- component: azure.servicebus
@ -15,8 +16,11 @@ components:
testTopicName: dapr-conf-test
- component: redis
allOperations: true
config:
checkInOrderProcessing: false
- component: natsstreaming
allOperations: true
- component: kafka
allOperations: true
- component: pulsar
allOperations: true

View File

@ -27,6 +27,7 @@ import (
p_servicebus "github.com/dapr/components-contrib/pubsub/azure/servicebus"
p_kafka "github.com/dapr/components-contrib/pubsub/kafka"
p_natsstreaming "github.com/dapr/components-contrib/pubsub/natsstreaming"
p_pulsar "github.com/dapr/components-contrib/pubsub/pulsar"
p_redis "github.com/dapr/components-contrib/pubsub/redis"
"github.com/dapr/components-contrib/secretstores"
ss_azure "github.com/dapr/components-contrib/secretstores/azure/keyvault"
@ -269,6 +270,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub {
pubsub = p_natsstreaming.NewNATSStreamingPubSub(testLogger)
case kafka:
pubsub = p_kafka.NewKafka(testLogger)
case "pulsar":
pubsub = p_pulsar.NewPulsar(testLogger)
default:
return nil
}

View File

@ -6,35 +6,40 @@
package pubsub
import (
"fmt"
"strconv"
"strings"
"testing"
"time"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/components-contrib/tests/conformance/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/components-contrib/tests/conformance/utils"
)
const (
defaultPubsubName = "pubusub"
defaultTopicName = "testTopic"
defaultMessageCount = 10
defaultMaxReadDuration = 20 * time.Second
defaultWaitDurationToPublish = 5 * time.Second
defaultPubsubName = "pubusub"
defaultTopicName = "testTopic"
defaultMessageCount = 10
defaultMaxReadDuration = 60 * time.Second
defaultWaitDurationToPublish = 5 * time.Second
defaultCheckInOrderProcessing = true
)
type TestConfig struct {
utils.CommonConfig
pubsubName string
testTopicName string
publishMetadata map[string]string
subscribeMetadata map[string]string
messageCount int
maxReadDuration time.Duration
waitDurationToPublish time.Duration
pubsubName string
testTopicName string
publishMetadata map[string]string
subscribeMetadata map[string]string
messageCount int
maxReadDuration time.Duration
waitDurationToPublish time.Duration
checkInOrderProcessing bool
}
func NewTestConfig(componentName string, allOperations bool, operations []string, config map[string]string) TestConfig {
@ -44,44 +49,44 @@ func NewTestConfig(componentName string, allOperations bool, operations []string
ComponentName: componentName,
AllOperations: allOperations,
Operations: sets.NewString(operations...)},
pubsubName: defaultPubsubName,
testTopicName: defaultTopicName,
messageCount: defaultMessageCount,
maxReadDuration: defaultMaxReadDuration,
waitDurationToPublish: defaultWaitDurationToPublish,
publishMetadata: map[string]string{},
subscribeMetadata: map[string]string{},
pubsubName: defaultPubsubName,
testTopicName: defaultTopicName,
messageCount: defaultMessageCount,
maxReadDuration: defaultMaxReadDuration,
waitDurationToPublish: defaultWaitDurationToPublish,
publishMetadata: map[string]string{},
subscribeMetadata: map[string]string{},
checkInOrderProcessing: defaultCheckInOrderProcessing,
}
for k, v := range config {
if k == "pubsubName" {
switch k {
case "pubsubName":
tc.pubsubName = v
}
if k == "testTopicName" {
case "testTopicName":
tc.testTopicName = v
}
if k == "messageCount" {
val, err := strconv.Atoi(v)
if err == nil {
case "messageCount":
if val, err := strconv.Atoi(v); err == nil {
tc.messageCount = val
}
}
if k == "maxReadDuration" {
val, err := strconv.Atoi(v)
if err == nil {
case "maxReadDuration":
if val, err := strconv.Atoi(v); err == nil {
tc.maxReadDuration = time.Duration(val) * time.Millisecond
}
}
if k == "waitDurationToPublish" {
val, err := strconv.Atoi(v)
if err == nil {
case "waitDurationToPublish":
if val, err := strconv.Atoi(v); err == nil {
tc.waitDurationToPublish = time.Duration(val) * time.Millisecond
}
}
if strings.HasPrefix(k, "publish_") {
tc.publishMetadata[strings.Replace(k, "publish_", "", 1)] = v
}
if strings.HasPrefix(k, "subscribe_") {
tc.subscribeMetadata[strings.Replace(k, "subscribe_", "", 1)] = v
case "checkInOrderProcessing":
if val, err := strconv.ParseBool(v); err == nil {
tc.checkInOrderProcessing = val
}
default:
if strings.HasPrefix(k, "publish_") {
tc.publishMetadata[strings.TrimPrefix(k, "publish_")] = v
} else if strings.HasPrefix(k, "subscribe_") {
tc.subscribeMetadata[strings.TrimPrefix(k, "subscribe_")] = v
}
}
}
@ -102,21 +107,56 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
assert.NoError(t, err, "expected no error on setting up pubsub")
})
// Generate a unique ID for this run to isolate messages to this test
// and prevent messages still stored in a locally running broker
// from being considered as part of this test.
runID := uuid.Must(uuid.NewRandom()).String()
awaitingMessages := make(map[string]struct{}, 20)
processedC := make(chan string, config.messageCount*2)
errorCount := 0
dataPrefix := "message-" + runID + "-"
var outOfOrder bool
// Subscribe
if config.HasOperation("subscribe") {
t.Run("subscribe", func(t *testing.T) {
var counter int
var lastSequence int
err := ps.Subscribe(pubsub.SubscribeRequest{
Topic: config.testTopicName,
Metadata: config.subscribeMetadata,
}, func(_ *pubsub.NewMessage) error {
}, func(msg *pubsub.NewMessage) error {
dataString := string(msg.Data)
if !strings.HasPrefix(dataString, dataPrefix) {
t.Logf("Ignoring message without expected prefix")
return nil
}
counter++
sequence, err := strconv.Atoi(dataString[len(dataPrefix):])
if err != nil {
t.Logf("Message did not contain a sequence number")
assert.Fail(t, "message did not contain a sequence number")
return err
}
if sequence < lastSequence {
outOfOrder = true
t.Logf("Message received out of order: expected sequence >= %d, got %d", lastSequence, sequence)
}
lastSequence = sequence
// This behavior is standard to repro a failure of one message in a batch.
if errorCount < 2 {
if errorCount < 2 || counter%5 == 0 {
// First message errors just to give time for more messages to pile up.
// Second error is to force an error in a batch.
errorCount++
// Sleep to allow messages to pile up and be delivered as a batch.
time.Sleep(2 * time.Second)
time.Sleep(1 * time.Second)
t.Logf("Simulating subscriber error")
return errors.Errorf("conf test simulated error")
@ -125,6 +165,8 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
t.Logf("Simulating subscriber success")
actualReadCount++
processedC <- dataString
return nil
})
assert.NoError(t, err, "expected no error on subscribe")
@ -137,14 +179,17 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
// So, wait for some time here.
time.Sleep(config.waitDurationToPublish)
t.Run("publish", func(t *testing.T) {
for k := 0; k < config.messageCount; k++ {
data := []byte("message-" + strconv.Itoa(k))
for k := 1; k <= config.messageCount; k++ {
data := []byte(fmt.Sprintf("%s%d", dataPrefix, k))
err := ps.Publish(&pubsub.PublishRequest{
Data: data,
PubsubName: config.pubsubName,
Topic: config.testTopicName,
Metadata: config.publishMetadata,
})
if err == nil {
awaitingMessages[string(data)] = struct{}{}
}
assert.NoError(t, err, "expected no error on publishing data %s", data)
}
})
@ -154,8 +199,19 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
if config.HasOperation("subscribe") {
t.Run("verify read", func(t *testing.T) {
t.Logf("waiting for %v to complete read", config.maxReadDuration)
time.Sleep(config.maxReadDuration)
assert.LessOrEqual(t, config.messageCount, actualReadCount, "expected to read %v messages", config.messageCount)
waiting := true
for waiting {
select {
case processed := <-processedC:
delete(awaitingMessages, processed)
waiting = len(awaitingMessages) > 0
case <-time.After(config.maxReadDuration):
// Break out after the maximum read duration has elapsed
waiting = false
}
}
assert.False(t, config.checkInOrderProcessing && outOfOrder, "received messages out of order")
assert.Empty(t, awaitingMessages, "expected to read %v messages", config.messageCount)
})
}
}