diff --git a/pubsub/rabbitmq/rabbitmq.go b/pubsub/rabbitmq/rabbitmq.go index e332b58ad..2b0ef631c 100644 --- a/pubsub/rabbitmq/rabbitmq.go +++ b/pubsub/rabbitmq/rabbitmq.go @@ -39,19 +39,23 @@ const ( errorMessagePrefix = "rabbitmq pub/sub error:" errorChannelNotInitialized = "channel not initialized" errorChannelConnection = "channel/connection is not open" + errorInvalidQueueType = "invalid queue type" defaultDeadLetterExchangeFormat = "dlx-%s" defaultDeadLetterQueueFormat = "dlq-%s" publishMaxRetries = 3 publishRetryWaitSeconds = 2 - argQueueMode = "x-queue-mode" - argMaxLength = "x-max-length" - argMaxLengthBytes = "x-max-length-bytes" - argDeadLetterExchange = "x-dead-letter-exchange" - argMaxPriority = "x-max-priority" - queueModeLazy = "lazy" - reqMetadataRoutingKey = "routingKey" + argQueueMode = "x-queue-mode" + argMaxLength = "x-max-length" + argMaxLengthBytes = "x-max-length-bytes" + argDeadLetterExchange = "x-dead-letter-exchange" + argMaxPriority = "x-max-priority" + queueModeLazy = "lazy" + reqMetadataRoutingKey = "routingKey" + reqMetadataQueueTypeKey = "queueType" // at the moment, only supporting classic and quorum queues + reqMetadataMaxLenKey = "maxLen" + reqMetadataMaxLenBytesKey = "maxLenBytes" ) // RabbitMQ allows sending/receiving messages in pub/sub format. @@ -319,7 +323,7 @@ func (r *rabbitMQ) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, h r.logger.Infof("%s subscribe to topic/queue '%s/%s'", logMessagePrefix, req.Topic, queueName) // Do not set a timeout on the context, as we're just waiting for the first ack; we're using a semaphore instead - ackCh := make(chan struct{}, 1) + ackCh := make(chan bool, 1) defer close(ackCh) subctx, cancel := context.WithCancel(ctx) @@ -341,8 +345,12 @@ func (r *rabbitMQ) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, h select { case <-time.After(time.Minute): return fmt.Errorf("failed to subscribe to %s", queueName) - case <-ackCh: - return nil + case failed := <-ackCh: + if failed { + return fmt.Errorf("error not retriable for %s", queueName) + } else { + return nil + } } } @@ -405,6 +413,37 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub args[argMaxPriority] = mp } + // queue type is classic by default, but we allow user to create quorum queues if desired + if val := req.Metadata[reqMetadataQueueTypeKey]; val != "" { + if !queueTypeValid(val) { + return nil, fmt.Errorf("invalid queue type %s. Valid types are %s and %s", val, amqp.QueueTypeClassic, amqp.QueueTypeQuorum) + } else { + args[amqp.QueueTypeArg] = val + } + } else { + args[amqp.QueueTypeArg] = amqp.QueueTypeClassic + } + + // Applying x-max-length-bytes if defined at subscription level + if val, ok := req.Metadata[reqMetadataMaxLenBytesKey]; ok && val != "" { + parsedVal, pErr := strconv.ParseUint(val, 10, 0) + if pErr != nil { + r.logger.Errorf("%s prepareSubscription error: can't parse %s value on subscription metadata for topic/queue `%s/%s`: %s", logMessagePrefix, argMaxLengthBytes, req.Topic, queueName, pErr) + return nil, pErr + } + args[argMaxLengthBytes] = parsedVal + } + + // Applying x-max-length if defined at subscription level + if val, ok := req.Metadata[reqMetadataMaxLenKey]; ok && val != "" { + parsedVal, pErr := strconv.ParseUint(val, 10, 0) + if pErr != nil { + r.logger.Errorf("%s prepareSubscription error: can't parse %s value on subscription metadata for topic/queue `%s/%s`: %s", logMessagePrefix, argMaxLength, req.Topic, queueName, pErr) + return nil, pErr + } + args[argMaxLength] = parsedVal + } + q, err := channel.QueueDeclare(queueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, false, false, args) if err != nil { r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueDeclare: %v", logMessagePrefix, req.Topic, queueName, err) @@ -454,7 +493,7 @@ func (r *rabbitMQ) ensureSubscription(req pubsub.SubscribeRequest, queueName str return r.channel, r.connectionCount, q, err } -func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeRequest, queueName string, handler pubsub.Handler, ackCh chan struct{}) { +func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeRequest, queueName string, handler pubsub.Handler, ackCh chan bool) { for { var ( err error @@ -466,6 +505,7 @@ func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeReq ) for { channel, connectionCount, q, err = r.ensureSubscription(req, queueName) + if err != nil { errFuncName = "ensureSubscription" break @@ -487,7 +527,7 @@ func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeReq // one-time notification on successful subscribe if ackCh != nil { - ackCh <- struct{}{} + ackCh <- false ackCh = nil } @@ -498,6 +538,11 @@ func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeReq } } + if strings.Contains(err.Error(), errorInvalidQueueType) { + ackCh <- true + return + } + if err == context.Canceled || err == context.DeadlineExceeded { // Subscription context was canceled r.logger.Infof("%s subscription for %s has context canceled", logMessagePrefix, queueName) @@ -682,3 +727,7 @@ func (r *rabbitMQ) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.PubSubType) return } + +func queueTypeValid(qType string) bool { + return qType == amqp.QueueTypeClassic || qType == amqp.QueueTypeQuorum +} diff --git a/pubsub/rabbitmq/rabbitmq_test.go b/pubsub/rabbitmq/rabbitmq_test.go index f9248df87..fc431cf44 100644 --- a/pubsub/rabbitmq/rabbitmq_test.go +++ b/pubsub/rabbitmq/rabbitmq_test.go @@ -98,10 +98,36 @@ func TestPublishAndSubscribeWithPriorityQueue(t *testing.T) { assert.Equal(t, 1, messageCount) assert.Equal(t, "hello world", lastMessage) - err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("foo bar")}) - assert.Nil(t, err) + // subscribe using classic queue type + err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{reqMetadataQueueTypeKey: "classic"}}, handler) + assert.NoError(t, err) + + // publish using classic queue type + err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("hey there"), Metadata: map[string]string{reqMetadataQueueTypeKey: "classic"}}) + assert.NoError(t, err) <-processed assert.Equal(t, 2, messageCount) + assert.Equal(t, "hey there", lastMessage) + + // subscribe using quorum queue type + err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{reqMetadataQueueTypeKey: "quorum"}}, handler) + assert.NoError(t, err) + + // publish using quorum queue type + err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("hello friends"), Metadata: map[string]string{reqMetadataQueueTypeKey: "quorum"}}) + assert.NoError(t, err) + <-processed + assert.Equal(t, 3, messageCount) + assert.Equal(t, "hello friends", lastMessage) + + // trying to subscribe using invalid queue type + err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{reqMetadataQueueTypeKey: "invalid"}}, handler) + assert.Error(t, err) + + err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("foo bar")}) + assert.NoError(t, err) + <-processed + assert.Equal(t, 4, messageCount) assert.Equal(t, "foo bar", lastMessage) } diff --git a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go index ae4f6ae77..5f87438a0 100644 --- a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go +++ b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go @@ -226,7 +226,7 @@ func TestRabbitMQ(t *testing.T) { } // Application logic that tracks messages from a topic. - application := func(consumer *Consumer, routeIndex int) app.SetupFn { + application := func(consumer *Consumer, routeIndex int, queueType string) app.SetupFn { return func(ctx flow.Context, s common.Service) (err error) { // Simulate periodic errors. sim := simulate.PeriodicError(ctx, errFrequency) @@ -239,6 +239,7 @@ func TestRabbitMQ(t *testing.T) { PubsubName: consumer.pubsub, Topic: topic, Route: fmt.Sprintf("/%s-%d", topic, routeIndex), + Metadata: map[string]string{"queueType": queueType}, }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { if err := sim(); err != nil { log.Debugf("Simulated error - consumer: %s, pubsub: %s, topic: %s, id: %s, data: %s", consumer.pubsub, e.PubsubName, e.Topic, e.ID, e.Data) @@ -318,7 +319,7 @@ func TestRabbitMQ(t *testing.T) { retry.Do(time.Second, 30, amqpReady(rabbitMQURL))). // Run the application1 logic above. Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), - application(alpha, 1))). + application(alpha, 1, "quorum"))). // Run the Dapr sidecar with the RabbitMQ component. Step(sidecar.Run(sidecarName1, embedded.WithComponentsPath("./components/alpha"), @@ -331,7 +332,7 @@ func TestRabbitMQ(t *testing.T) { )). // Run the application2 logic above. Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+2), - application(beta, 2))). + application(beta, 2, "classic"))). // Run the Dapr sidecar with the RabbitMQ component. Step(sidecar.Run(sidecarName2, embedded.WithComponentsPath("./components/beta"), @@ -344,7 +345,7 @@ func TestRabbitMQ(t *testing.T) { )). // Run the application3 logic above. Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+4), - application(beta, 3))). + application(beta, 3, "classic"))). // Run the Dapr sidecar with the RabbitMQ component. Step(sidecar.Run(sidecarName3, embedded.WithComponentsPath("./components/beta"),