Add RabbitMQ single active consumer argument (#3437)

Signed-off-by: Arthur Poiret <dropsnorz@gmail.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
This commit is contained in:
Arthur Poiret 2024-06-18 01:12:27 +02:00 committed by GitHub
parent 105dabb47a
commit 51e0c79dd4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 29 additions and 11 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/utils"
)
const (
@ -48,17 +49,19 @@ const (
defaultHeartbeat = 10 * time.Second
defaultLocale = "en_US"
argQueueMode = "x-queue-mode"
argMaxLength = "x-max-length"
argMaxLengthBytes = "x-max-length-bytes"
argDeadLetterExchange = "x-dead-letter-exchange"
argMaxPriority = "x-max-priority"
propertyClientName = "connection_name"
queueModeLazy = "lazy"
reqMetadataRoutingKey = "routingKey"
reqMetadataQueueTypeKey = "queueType" // at the moment, only supporting classic and quorum queues
reqMetadataMaxLenKey = "maxLen"
reqMetadataMaxLenBytesKey = "maxLenBytes"
argQueueMode = "x-queue-mode"
argMaxLength = "x-max-length"
argMaxLengthBytes = "x-max-length-bytes"
argDeadLetterExchange = "x-dead-letter-exchange"
argMaxPriority = "x-max-priority"
argSingleActiveConsumer = "x-single-active-consumer"
propertyClientName = "connection_name"
queueModeLazy = "lazy"
reqMetadataRoutingKey = "routingKey"
reqMetadataQueueTypeKey = "queueType" // at the moment, only supporting classic and quorum queues
reqMetadataSingleActiveConsumerKey = "singleActiveConsumer"
reqMetadataMaxLenKey = "maxLen"
reqMetadataMaxLenBytesKey = "maxLenBytes"
)
// RabbitMQ allows sending/receiving messages in pub/sub format.
@ -432,6 +435,11 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub
args[amqp.QueueTypeArg] = amqp.QueueTypeClassic
}
// Applying x-single-active-consumer if defined at subscription level
if val := req.Metadata[reqMetadataSingleActiveConsumerKey]; utils.IsTruthy(val) {
args[argSingleActiveConsumer] = true
}
// Applying x-max-length-bytes if defined at subscription level
if val, ok := req.Metadata[reqMetadataMaxLenBytesKey]; ok && val != "" {
parsedVal, pErr := strconv.ParseUint(val, 10, 0)

View File

@ -129,6 +129,16 @@ func TestPublishAndSubscribeWithPriorityQueue(t *testing.T) {
<-processed
assert.Equal(t, 4, messageCount)
assert.Equal(t, "foo bar", lastMessage)
// subscribe using single active consumer
err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{reqMetadataSingleActiveConsumerKey: "true"}}, handler)
require.NoError(t, err)
err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("dummy data"), Metadata: map[string]string{reqMetadataSingleActiveConsumerKey: "true"}})
require.NoError(t, err)
<-processed
assert.Equal(t, 5, messageCount)
assert.Equal(t, "dummy data", lastMessage)
}
func TestConcurrencyMode(t *testing.T) {