Add priority queue support to rabbitmq pub/sub (#2680)

Signed-off-by: yaron2 <schneider.yaron@live.com>
This commit is contained in:
Yaron Schneider 2023-03-17 06:00:49 -07:00 committed by GitHub
parent a8415d82b8
commit 0e83c4adc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 229 additions and 4 deletions

View File

@ -78,6 +78,7 @@ const (
metadataExchangeKindKey = "exchangeKind"
metadataPublisherConfirmKey = "publisherConfirm"
metadataSaslExternal = "saslExternal"
metadataMaxPriority = "maxPriority"
defaultReconnectWaitSeconds = 3

View File

@ -18,6 +18,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
@ -47,6 +48,7 @@ const (
argMaxLength = "x-max-length"
argMaxLengthBytes = "x-max-length-bytes"
argDeadLetterExchange = "x-dead-letter-exchange"
argMaxPriority = "x-max-priority"
queueModeLazy = "lazy"
reqMetadataRoutingKey = "routingKey"
)
@ -216,7 +218,7 @@ func (r *rabbitMQ) publishSync(ctx context.Context, req *pubsub.PublishRequest)
ttl, ok, err := contribMetadata.TryGetTTL(req.Metadata)
if err != nil {
r.logger.Warnf("%s publishing to %s failed parse TryGetTTL: %v, it is ignored.", logMessagePrefix, req.Topic, err)
r.logger.Warnf("%s publishing to %s failed to parse TryGetTTL: %v, it is ignored.", logMessagePrefix, req.Topic, err)
}
var expiration string
if ok {
@ -226,12 +228,23 @@ func (r *rabbitMQ) publishSync(ctx context.Context, req *pubsub.PublishRequest)
expiration = strconv.FormatInt(r.metadata.defaultQueueTTL.Milliseconds(), 10)
}
confirm, err := r.channel.PublishWithDeferredConfirmWithContext(ctx, req.Topic, routingKey, false, false, amqp.Publishing{
p := amqp.Publishing{
ContentType: "text/plain",
Body: req.Data,
DeliveryMode: r.metadata.deliveryMode,
Expiration: expiration,
})
}
priority, ok, err := contribMetadata.TryGetPriority(req.Metadata)
if err != nil {
r.logger.Warnf("%s publishing to %s failed to parse priority: %v, it is ignored.", logMessagePrefix, req.Topic, err)
}
if ok {
p.Priority = priority
}
confirm, err := r.channel.PublishWithDeferredConfirmWithContext(ctx, req.Topic, routingKey, false, false, p)
if err != nil {
r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err)
@ -370,6 +383,23 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub
args = amqp.Table{argDeadLetterExchange: dlxName}
}
args = r.metadata.formatQueueDeclareArgs(args)
// use priority queue if configured on subscription
if val, ok := req.Metadata[metadataMaxPriority]; ok && val != "" {
parsedVal, pErr := strconv.ParseUint(val, 10, 0)
if pErr != nil {
r.logger.Errorf("%s prepareSubscription error: can't parse maxPriority %s value on subscription metadata for topic/queue `%s/%s`: %s", logMessagePrefix, val, req.Topic, queueName, pErr)
return nil, pErr
}
mp := uint8(parsedVal)
if parsedVal > 255 {
mp = math.MaxUint8
}
args[argMaxPriority] = mp
}
q, err := channel.QueueDeclare(queueName, r.metadata.durable, r.metadata.deleteWhenUnused, false, false, args)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueDeclare: %v", logMessagePrefix, req.Topic, queueName, err)

View File

@ -62,6 +62,49 @@ func TestNoConsumer(t *testing.T) {
assert.Contains(t, err.Error(), "consumerID is required for subscriptions")
}
func TestPublishAndSubscribeWithPriorityQueue(t *testing.T) {
broker := newBroker()
pubsubRabbitMQ := newRabbitMQTest(broker)
metadata := pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{
metadataHostnameKey: "anyhost",
metadataConsumerIDKey: "consumer",
},
}}
err := pubsubRabbitMQ.Init(context.Background(), metadata)
assert.Nil(t, err)
assert.Equal(t, int32(1), broker.connectCount.Load())
assert.Equal(t, int32(0), broker.closeCount.Load())
topic := "mytopic"
messageCount := 0
lastMessage := ""
processed := make(chan bool)
handler := func(ctx context.Context, msg *pubsub.NewMessage) error {
messageCount++
lastMessage = string(msg.Data)
processed <- true
return nil
}
err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{metadataMaxPriority: "5"}}, handler)
assert.Nil(t, err)
err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("hello world"), Metadata: map[string]string{metadataMaxPriority: "5"}})
assert.Nil(t, err)
<-processed
assert.Equal(t, 1, messageCount)
assert.Equal(t, "hello world", lastMessage)
err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("foo bar")})
assert.Nil(t, err)
<-processed
assert.Equal(t, 2, messageCount)
assert.Equal(t, "foo bar", lastMessage)
}
func TestConcurrencyMode(t *testing.T) {
t.Run("parallel", func(t *testing.T) {
broker := newBroker()

View File

@ -36,4 +36,6 @@ This project aims to test the RabbitMQ Pub/Sub component under various condition
* Send a message, wait TTL seconds, and verify the message is deleted/expired.
* Setting a TTL at the component level and message level ignores the default component level TTL and always uses the message level TTL specified
* Test mTLS External Authentication
* Being able to connect to the RabbitMQ
* Being able to connect to the RabbitMQ
* Test priority queues
* Being able to publish and subscribe to priority messages/topics

View File

@ -0,0 +1,18 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mq-priority
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: consumerID
value: priority
- name: host
value: "amqp://test:test@localhost:5672"
- name: durable
value: true
- name: deletedWhenUnused
value: false
- name: requeueInFailure
value: true

View File

@ -52,10 +52,12 @@ const (
sidecarName1 = "dapr-1"
sidecarName2 = "dapr-2"
sidecarName3 = "dapr-3"
sidecarName4 = "dapr-4"
sidecarNameTTLClient = "dapr-ttl-client"
appID1 = "app-1"
appID2 = "app-2"
appID3 = "app-3"
appID4 = "app-4"
clusterName = "rabbitmqcertification"
dockerComposeYAML = "docker-compose.yml"
extSaslDockerComposeYAML = "mtls_sasl_external/docker-compose.yml"
@ -72,6 +74,7 @@ const (
pubsubMessageOnlyTTL = "msg-ttl-pubsub"
pubsubQueueOnlyTTL = "overwrite-ttl-pubsub"
pubsubOverwriteTTL = "queue-ttl-pubsub"
pubsubPriority = "mq-priority"
topicRed = "red"
topicBlue = "blue"
@ -707,6 +710,134 @@ func TestRabbitMQExtAuth(t *testing.T) {
Run()
}
func TestRabbitMQPriority(t *testing.T) {
rand.Seed(time.Now().UTC().UnixNano())
log := logger.NewLogger("dapr.components")
// log.SetOutputLevel(logger.DebugLevel)
pubTopics := []string{topicRed}
subTopics := []string{topicRed}
priorityClient := &Consumer{pubsub: pubsubPriority, messages: make(map[string]*watcher.Watcher)}
consumers := []*Consumer{priorityClient}
for _, consumer := range consumers {
for _, topic := range pubTopics {
// In RabbitMQ, messages might not come in order.
consumer.messages[topic] = watcher.NewUnordered()
}
}
// subscribed is used to synchronize between publisher and subscriber
subscribed := make(chan struct{}, 1)
// Test logic that sends messages to topics and
// verifies the two consumers with different IDs have received them.
test := func(ctx flow.Context) error {
// Declare what is expected BEFORE performing any steps
// that will satisfy the test.
msgs := make([]string, numMessages)
for i := range msgs {
msgs[i] = fmt.Sprintf("Hello, Messages %03d", i)
}
for _, consumer := range consumers {
for _, topic := range subTopics {
consumer.messages[topic].ExpectStrings(msgs...)
}
}
<-subscribed
// sidecar client array []{sidecar client, pubsub component name}
sidecars := []struct {
client *sidecar.Client
pubsub string
}{
{sidecar.GetClient(ctx, sidecarName4), pubsubPriority},
}
var wg sync.WaitGroup
wg.Add(len(pubTopics))
for _, topic := range pubTopics {
go func(topic string) {
defer wg.Done()
// Send events that the application above will observe.
log.Infof("Sending messages on topic '%s'", topic)
for _, msg := range msgs {
// randomize publishers
indx := rand.Intn(len(sidecars))
log.Debugf("Sending: '%s' on topic '%s'", msg, topic)
err := sidecars[indx].client.PublishEvent(ctx, sidecars[indx].pubsub, topic, msg, daprClient.PublishEventWithMetadata(map[string]string{"priority": "1"}))
require.NoError(ctx, err, "error publishing message")
}
}(topic)
}
wg.Wait()
return nil
}
// Application logic that tracks messages from a topic.
application := func(consumer *Consumer, routeIndex int) app.SetupFn {
return func(ctx flow.Context, s common.Service) (err error) {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, errFrequency)
for _, topic := range subTopics {
// Setup the /orders event handler.
err = multierr.Combine(
err,
s.AddTopicEventHandler(&common.Subscription{
PubsubName: consumer.pubsub,
Topic: topic,
Route: fmt.Sprintf("/%s-%d", topic, routeIndex),
Metadata: map[string]string{"maxPriority": "1"},
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
log.Debugf("Simulated error - consumer: %s, pubsub: %s, topic: %s, id: %s, data: %s", consumer.pubsub, e.PubsubName, e.Topic, e.ID, e.Data)
return true, err
}
// Track/Observe the data of the event.
consumer.messages[e.Topic].Observe(e.Data)
log.Debugf("Event - consumer: %s, pubsub: %s, topic: %s, id: %s, data: %s", consumer.pubsub, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
return err
}
}
flow.New(t, "rabbitmq priority certification").
// Run RabbitMQ using Docker Compose.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
// Start dapr and app to precreate all queues in rabbitmq,
// if topic is not subscribed, then the message will be lost.
// Sidecar will block to wait app, so we need to start app first.
Step(app.Run(appID4, fmt.Sprintf(":%d", appPort+1),
application(priorityClient, 1))).
Step(sidecar.Run(sidecarName4,
embedded.WithComponentsPath("./components/priority"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+1),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+10),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+1),
embedded.WithProfilePort(runtime.DefaultProfilePort+1),
embedded.WithGracefulShutdownDuration(2*time.Second),
componentRuntimeOptions(),
)).
Step("signal subscribed", flow.MustDo(func() {
close(subscribed)
})).
Step("send and wait", test).
Run()
}
func componentRuntimeOptions() []runtime.Option {
log := logger.NewLogger("dapr.components")