[RabbitMQ pub/sub] Allow quorum queues (#2816)

Signed-off-by: Bernd Verst <github@bernd.dev>
Signed-off-by: Alvaro Aguilar <alvaro.aguilar@scrm.lidl>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Álvaro Aguilar <95039001@lidl.de>
Signed-off-by: Álvaro Aguilar <alvaroteleco@hotmail.com>
Co-authored-by: Bernd Verst <github@bernd.dev>
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Álvaro Aguilar <95039001@lidl.de>
This commit is contained in:
Álvaro Aguilar-Tablada Espinosa 2023-08-07 15:28:58 +02:00 committed by GitHub
parent c957420341
commit e10d5b7e86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 18 deletions

View File

@ -39,19 +39,23 @@ const (
errorMessagePrefix = "rabbitmq pub/sub error:"
errorChannelNotInitialized = "channel not initialized"
errorChannelConnection = "channel/connection is not open"
errorInvalidQueueType = "invalid queue type"
defaultDeadLetterExchangeFormat = "dlx-%s"
defaultDeadLetterQueueFormat = "dlq-%s"
publishMaxRetries = 3
publishRetryWaitSeconds = 2
argQueueMode = "x-queue-mode"
argMaxLength = "x-max-length"
argMaxLengthBytes = "x-max-length-bytes"
argDeadLetterExchange = "x-dead-letter-exchange"
argMaxPriority = "x-max-priority"
queueModeLazy = "lazy"
reqMetadataRoutingKey = "routingKey"
argQueueMode = "x-queue-mode"
argMaxLength = "x-max-length"
argMaxLengthBytes = "x-max-length-bytes"
argDeadLetterExchange = "x-dead-letter-exchange"
argMaxPriority = "x-max-priority"
queueModeLazy = "lazy"
reqMetadataRoutingKey = "routingKey"
reqMetadataQueueTypeKey = "queueType" // at the moment, only supporting classic and quorum queues
reqMetadataMaxLenKey = "maxLen"
reqMetadataMaxLenBytesKey = "maxLenBytes"
)
// RabbitMQ allows sending/receiving messages in pub/sub format.
@ -319,7 +323,7 @@ func (r *rabbitMQ) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, h
r.logger.Infof("%s subscribe to topic/queue '%s/%s'", logMessagePrefix, req.Topic, queueName)
// Do not set a timeout on the context, as we're just waiting for the first ack; we're using a semaphore instead
ackCh := make(chan struct{}, 1)
ackCh := make(chan bool, 1)
defer close(ackCh)
subctx, cancel := context.WithCancel(ctx)
@ -341,8 +345,12 @@ func (r *rabbitMQ) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, h
select {
case <-time.After(time.Minute):
return fmt.Errorf("failed to subscribe to %s", queueName)
case <-ackCh:
return nil
case failed := <-ackCh:
if failed {
return fmt.Errorf("error not retriable for %s", queueName)
} else {
return nil
}
}
}
@ -405,6 +413,37 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub
args[argMaxPriority] = mp
}
// queue type is classic by default, but we allow user to create quorum queues if desired
if val := req.Metadata[reqMetadataQueueTypeKey]; val != "" {
if !queueTypeValid(val) {
return nil, fmt.Errorf("invalid queue type %s. Valid types are %s and %s", val, amqp.QueueTypeClassic, amqp.QueueTypeQuorum)
} else {
args[amqp.QueueTypeArg] = val
}
} else {
args[amqp.QueueTypeArg] = amqp.QueueTypeClassic
}
// Applying x-max-length-bytes if defined at subscription level
if val, ok := req.Metadata[reqMetadataMaxLenBytesKey]; ok && val != "" {
parsedVal, pErr := strconv.ParseUint(val, 10, 0)
if pErr != nil {
r.logger.Errorf("%s prepareSubscription error: can't parse %s value on subscription metadata for topic/queue `%s/%s`: %s", logMessagePrefix, argMaxLengthBytes, req.Topic, queueName, pErr)
return nil, pErr
}
args[argMaxLengthBytes] = parsedVal
}
// Applying x-max-length if defined at subscription level
if val, ok := req.Metadata[reqMetadataMaxLenKey]; ok && val != "" {
parsedVal, pErr := strconv.ParseUint(val, 10, 0)
if pErr != nil {
r.logger.Errorf("%s prepareSubscription error: can't parse %s value on subscription metadata for topic/queue `%s/%s`: %s", logMessagePrefix, argMaxLength, req.Topic, queueName, pErr)
return nil, pErr
}
args[argMaxLength] = parsedVal
}
q, err := channel.QueueDeclare(queueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, false, false, args)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueDeclare: %v", logMessagePrefix, req.Topic, queueName, err)
@ -454,7 +493,7 @@ func (r *rabbitMQ) ensureSubscription(req pubsub.SubscribeRequest, queueName str
return r.channel, r.connectionCount, q, err
}
func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeRequest, queueName string, handler pubsub.Handler, ackCh chan struct{}) {
func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeRequest, queueName string, handler pubsub.Handler, ackCh chan bool) {
for {
var (
err error
@ -466,6 +505,7 @@ func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeReq
)
for {
channel, connectionCount, q, err = r.ensureSubscription(req, queueName)
if err != nil {
errFuncName = "ensureSubscription"
break
@ -487,7 +527,7 @@ func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeReq
// one-time notification on successful subscribe
if ackCh != nil {
ackCh <- struct{}{}
ackCh <- false
ackCh = nil
}
@ -498,6 +538,11 @@ func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeReq
}
}
if strings.Contains(err.Error(), errorInvalidQueueType) {
ackCh <- true
return
}
if err == context.Canceled || err == context.DeadlineExceeded {
// Subscription context was canceled
r.logger.Infof("%s subscription for %s has context canceled", logMessagePrefix, queueName)
@ -682,3 +727,7 @@ func (r *rabbitMQ) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.PubSubType)
return
}
func queueTypeValid(qType string) bool {
return qType == amqp.QueueTypeClassic || qType == amqp.QueueTypeQuorum
}

View File

@ -98,10 +98,36 @@ func TestPublishAndSubscribeWithPriorityQueue(t *testing.T) {
assert.Equal(t, 1, messageCount)
assert.Equal(t, "hello world", lastMessage)
err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("foo bar")})
assert.Nil(t, err)
// subscribe using classic queue type
err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{reqMetadataQueueTypeKey: "classic"}}, handler)
assert.NoError(t, err)
// publish using classic queue type
err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("hey there"), Metadata: map[string]string{reqMetadataQueueTypeKey: "classic"}})
assert.NoError(t, err)
<-processed
assert.Equal(t, 2, messageCount)
assert.Equal(t, "hey there", lastMessage)
// subscribe using quorum queue type
err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{reqMetadataQueueTypeKey: "quorum"}}, handler)
assert.NoError(t, err)
// publish using quorum queue type
err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("hello friends"), Metadata: map[string]string{reqMetadataQueueTypeKey: "quorum"}})
assert.NoError(t, err)
<-processed
assert.Equal(t, 3, messageCount)
assert.Equal(t, "hello friends", lastMessage)
// trying to subscribe using invalid queue type
err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{reqMetadataQueueTypeKey: "invalid"}}, handler)
assert.Error(t, err)
err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("foo bar")})
assert.NoError(t, err)
<-processed
assert.Equal(t, 4, messageCount)
assert.Equal(t, "foo bar", lastMessage)
}

View File

@ -226,7 +226,7 @@ func TestRabbitMQ(t *testing.T) {
}
// Application logic that tracks messages from a topic.
application := func(consumer *Consumer, routeIndex int) app.SetupFn {
application := func(consumer *Consumer, routeIndex int, queueType string) app.SetupFn {
return func(ctx flow.Context, s common.Service) (err error) {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, errFrequency)
@ -239,6 +239,7 @@ func TestRabbitMQ(t *testing.T) {
PubsubName: consumer.pubsub,
Topic: topic,
Route: fmt.Sprintf("/%s-%d", topic, routeIndex),
Metadata: map[string]string{"queueType": queueType},
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
log.Debugf("Simulated error - consumer: %s, pubsub: %s, topic: %s, id: %s, data: %s", consumer.pubsub, e.PubsubName, e.Topic, e.ID, e.Data)
@ -318,7 +319,7 @@ func TestRabbitMQ(t *testing.T) {
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
// Run the application1 logic above.
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
application(alpha, 1))).
application(alpha, 1, "quorum"))).
// Run the Dapr sidecar with the RabbitMQ component.
Step(sidecar.Run(sidecarName1,
embedded.WithComponentsPath("./components/alpha"),
@ -331,7 +332,7 @@ func TestRabbitMQ(t *testing.T) {
)).
// Run the application2 logic above.
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+2),
application(beta, 2))).
application(beta, 2, "classic"))).
// Run the Dapr sidecar with the RabbitMQ component.
Step(sidecar.Run(sidecarName2,
embedded.WithComponentsPath("./components/beta"),
@ -344,7 +345,7 @@ func TestRabbitMQ(t *testing.T) {
)).
// Run the application3 logic above.
Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+4),
application(beta, 3))).
application(beta, 3, "classic"))).
// Run the Dapr sidecar with the RabbitMQ component.
Step(sidecar.Run(sidecarName3,
embedded.WithComponentsPath("./components/beta"),