From 0e83c4adc528b906a3d024eff197b79a8a77728c Mon Sep 17 00:00:00 2001 From: Yaron Schneider Date: Fri, 17 Mar 2023 06:00:49 -0700 Subject: [PATCH] Add priority queue support to rabbitmq pub/sub (#2680) Signed-off-by: yaron2 --- pubsub/rabbitmq/metadata.go | 1 + pubsub/rabbitmq/rabbitmq.go | 36 ++++- pubsub/rabbitmq/rabbitmq_test.go | 43 ++++++ tests/certification/pubsub/rabbitmq/README.md | 4 +- .../priority/rabbitmq_priority.yaml | 18 +++ .../pubsub/rabbitmq/rabbitmq_test.go | 131 ++++++++++++++++++ 6 files changed, 229 insertions(+), 4 deletions(-) create mode 100644 tests/certification/pubsub/rabbitmq/components/priority/rabbitmq_priority.yaml diff --git a/pubsub/rabbitmq/metadata.go b/pubsub/rabbitmq/metadata.go index 44ee7ecd4..17da28b58 100644 --- a/pubsub/rabbitmq/metadata.go +++ b/pubsub/rabbitmq/metadata.go @@ -78,6 +78,7 @@ const ( metadataExchangeKindKey = "exchangeKind" metadataPublisherConfirmKey = "publisherConfirm" metadataSaslExternal = "saslExternal" + metadataMaxPriority = "maxPriority" defaultReconnectWaitSeconds = 3 diff --git a/pubsub/rabbitmq/rabbitmq.go b/pubsub/rabbitmq/rabbitmq.go index 3b8700f0a..34af3e7d2 100644 --- a/pubsub/rabbitmq/rabbitmq.go +++ b/pubsub/rabbitmq/rabbitmq.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "errors" "fmt" + "math" "strconv" "strings" "sync" @@ -47,6 +48,7 @@ const ( argMaxLength = "x-max-length" argMaxLengthBytes = "x-max-length-bytes" argDeadLetterExchange = "x-dead-letter-exchange" + argMaxPriority = "x-max-priority" queueModeLazy = "lazy" reqMetadataRoutingKey = "routingKey" ) @@ -216,7 +218,7 @@ func (r *rabbitMQ) publishSync(ctx context.Context, req *pubsub.PublishRequest) ttl, ok, err := contribMetadata.TryGetTTL(req.Metadata) if err != nil { - r.logger.Warnf("%s publishing to %s failed parse TryGetTTL: %v, it is ignored.", logMessagePrefix, req.Topic, err) + r.logger.Warnf("%s publishing to %s failed to parse TryGetTTL: %v, it is ignored.", logMessagePrefix, req.Topic, err) } var expiration string if ok { @@ -226,12 +228,23 @@ func (r *rabbitMQ) publishSync(ctx context.Context, req *pubsub.PublishRequest) expiration = strconv.FormatInt(r.metadata.defaultQueueTTL.Milliseconds(), 10) } - confirm, err := r.channel.PublishWithDeferredConfirmWithContext(ctx, req.Topic, routingKey, false, false, amqp.Publishing{ + p := amqp.Publishing{ ContentType: "text/plain", Body: req.Data, DeliveryMode: r.metadata.deliveryMode, Expiration: expiration, - }) + } + + priority, ok, err := contribMetadata.TryGetPriority(req.Metadata) + if err != nil { + r.logger.Warnf("%s publishing to %s failed to parse priority: %v, it is ignored.", logMessagePrefix, req.Topic, err) + } + + if ok { + p.Priority = priority + } + + confirm, err := r.channel.PublishWithDeferredConfirmWithContext(ctx, req.Topic, routingKey, false, false, p) if err != nil { r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err) @@ -370,6 +383,23 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub args = amqp.Table{argDeadLetterExchange: dlxName} } args = r.metadata.formatQueueDeclareArgs(args) + + // use priority queue if configured on subscription + if val, ok := req.Metadata[metadataMaxPriority]; ok && val != "" { + parsedVal, pErr := strconv.ParseUint(val, 10, 0) + if pErr != nil { + r.logger.Errorf("%s prepareSubscription error: can't parse maxPriority %s value on subscription metadata for topic/queue `%s/%s`: %s", logMessagePrefix, val, req.Topic, queueName, pErr) + return nil, pErr + } + + mp := uint8(parsedVal) + if parsedVal > 255 { + mp = math.MaxUint8 + } + + args[argMaxPriority] = mp + } + q, err := channel.QueueDeclare(queueName, r.metadata.durable, r.metadata.deleteWhenUnused, false, false, args) if err != nil { r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueDeclare: %v", logMessagePrefix, req.Topic, queueName, err) diff --git a/pubsub/rabbitmq/rabbitmq_test.go b/pubsub/rabbitmq/rabbitmq_test.go index 29e233173..c6a14b1ae 100644 --- a/pubsub/rabbitmq/rabbitmq_test.go +++ b/pubsub/rabbitmq/rabbitmq_test.go @@ -62,6 +62,49 @@ func TestNoConsumer(t *testing.T) { assert.Contains(t, err.Error(), "consumerID is required for subscriptions") } +func TestPublishAndSubscribeWithPriorityQueue(t *testing.T) { + broker := newBroker() + pubsubRabbitMQ := newRabbitMQTest(broker) + metadata := pubsub.Metadata{Base: mdata.Base{ + Properties: map[string]string{ + metadataHostnameKey: "anyhost", + metadataConsumerIDKey: "consumer", + }, + }} + err := pubsubRabbitMQ.Init(context.Background(), metadata) + assert.Nil(t, err) + assert.Equal(t, int32(1), broker.connectCount.Load()) + assert.Equal(t, int32(0), broker.closeCount.Load()) + + topic := "mytopic" + + messageCount := 0 + lastMessage := "" + processed := make(chan bool) + handler := func(ctx context.Context, msg *pubsub.NewMessage) error { + messageCount++ + lastMessage = string(msg.Data) + processed <- true + + return nil + } + + err = pubsubRabbitMQ.Subscribe(context.Background(), pubsub.SubscribeRequest{Topic: topic, Metadata: map[string]string{metadataMaxPriority: "5"}}, handler) + assert.Nil(t, err) + + err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("hello world"), Metadata: map[string]string{metadataMaxPriority: "5"}}) + assert.Nil(t, err) + <-processed + assert.Equal(t, 1, messageCount) + assert.Equal(t, "hello world", lastMessage) + + err = pubsubRabbitMQ.Publish(context.Background(), &pubsub.PublishRequest{Topic: topic, Data: []byte("foo bar")}) + assert.Nil(t, err) + <-processed + assert.Equal(t, 2, messageCount) + assert.Equal(t, "foo bar", lastMessage) +} + func TestConcurrencyMode(t *testing.T) { t.Run("parallel", func(t *testing.T) { broker := newBroker() diff --git a/tests/certification/pubsub/rabbitmq/README.md b/tests/certification/pubsub/rabbitmq/README.md index 2d3eb20c5..d112f3dec 100644 --- a/tests/certification/pubsub/rabbitmq/README.md +++ b/tests/certification/pubsub/rabbitmq/README.md @@ -36,4 +36,6 @@ This project aims to test the RabbitMQ Pub/Sub component under various condition * Send a message, wait TTL seconds, and verify the message is deleted/expired. * Setting a TTL at the component level and message level ignores the default component level TTL and always uses the message level TTL specified * Test mTLS External Authentication - * Being able to connect to the RabbitMQ \ No newline at end of file + * Being able to connect to the RabbitMQ +* Test priority queues + * Being able to publish and subscribe to priority messages/topics \ No newline at end of file diff --git a/tests/certification/pubsub/rabbitmq/components/priority/rabbitmq_priority.yaml b/tests/certification/pubsub/rabbitmq/components/priority/rabbitmq_priority.yaml new file mode 100644 index 000000000..a34b318bb --- /dev/null +++ b/tests/certification/pubsub/rabbitmq/components/priority/rabbitmq_priority.yaml @@ -0,0 +1,18 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: mq-priority +spec: + type: pubsub.rabbitmq + version: v1 + metadata: + - name: consumerID + value: priority + - name: host + value: "amqp://test:test@localhost:5672" + - name: durable + value: true + - name: deletedWhenUnused + value: false + - name: requeueInFailure + value: true diff --git a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go index 26c31b25f..ae4f6ae77 100644 --- a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go +++ b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go @@ -52,10 +52,12 @@ const ( sidecarName1 = "dapr-1" sidecarName2 = "dapr-2" sidecarName3 = "dapr-3" + sidecarName4 = "dapr-4" sidecarNameTTLClient = "dapr-ttl-client" appID1 = "app-1" appID2 = "app-2" appID3 = "app-3" + appID4 = "app-4" clusterName = "rabbitmqcertification" dockerComposeYAML = "docker-compose.yml" extSaslDockerComposeYAML = "mtls_sasl_external/docker-compose.yml" @@ -72,6 +74,7 @@ const ( pubsubMessageOnlyTTL = "msg-ttl-pubsub" pubsubQueueOnlyTTL = "overwrite-ttl-pubsub" pubsubOverwriteTTL = "queue-ttl-pubsub" + pubsubPriority = "mq-priority" topicRed = "red" topicBlue = "blue" @@ -707,6 +710,134 @@ func TestRabbitMQExtAuth(t *testing.T) { Run() } +func TestRabbitMQPriority(t *testing.T) { + rand.Seed(time.Now().UTC().UnixNano()) + log := logger.NewLogger("dapr.components") + // log.SetOutputLevel(logger.DebugLevel) + + pubTopics := []string{topicRed} + subTopics := []string{topicRed} + + priorityClient := &Consumer{pubsub: pubsubPriority, messages: make(map[string]*watcher.Watcher)} + consumers := []*Consumer{priorityClient} + + for _, consumer := range consumers { + for _, topic := range pubTopics { + // In RabbitMQ, messages might not come in order. + consumer.messages[topic] = watcher.NewUnordered() + } + } + + // subscribed is used to synchronize between publisher and subscriber + subscribed := make(chan struct{}, 1) + + // Test logic that sends messages to topics and + // verifies the two consumers with different IDs have received them. + test := func(ctx flow.Context) error { + // Declare what is expected BEFORE performing any steps + // that will satisfy the test. + msgs := make([]string, numMessages) + for i := range msgs { + msgs[i] = fmt.Sprintf("Hello, Messages %03d", i) + } + + for _, consumer := range consumers { + for _, topic := range subTopics { + consumer.messages[topic].ExpectStrings(msgs...) + } + } + + <-subscribed + + // sidecar client array []{sidecar client, pubsub component name} + sidecars := []struct { + client *sidecar.Client + pubsub string + }{ + {sidecar.GetClient(ctx, sidecarName4), pubsubPriority}, + } + + var wg sync.WaitGroup + wg.Add(len(pubTopics)) + for _, topic := range pubTopics { + go func(topic string) { + defer wg.Done() + + // Send events that the application above will observe. + log.Infof("Sending messages on topic '%s'", topic) + + for _, msg := range msgs { + // randomize publishers + indx := rand.Intn(len(sidecars)) + log.Debugf("Sending: '%s' on topic '%s'", msg, topic) + err := sidecars[indx].client.PublishEvent(ctx, sidecars[indx].pubsub, topic, msg, daprClient.PublishEventWithMetadata(map[string]string{"priority": "1"})) + require.NoError(ctx, err, "error publishing message") + } + }(topic) + } + wg.Wait() + + return nil + } + + // Application logic that tracks messages from a topic. + application := func(consumer *Consumer, routeIndex int) app.SetupFn { + return func(ctx flow.Context, s common.Service) (err error) { + // Simulate periodic errors. + sim := simulate.PeriodicError(ctx, errFrequency) + + for _, topic := range subTopics { + // Setup the /orders event handler. + err = multierr.Combine( + err, + s.AddTopicEventHandler(&common.Subscription{ + PubsubName: consumer.pubsub, + Topic: topic, + Route: fmt.Sprintf("/%s-%d", topic, routeIndex), + Metadata: map[string]string{"maxPriority": "1"}, + }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { + if err := sim(); err != nil { + log.Debugf("Simulated error - consumer: %s, pubsub: %s, topic: %s, id: %s, data: %s", consumer.pubsub, e.PubsubName, e.Topic, e.ID, e.Data) + return true, err + } + + // Track/Observe the data of the event. + consumer.messages[e.Topic].Observe(e.Data) + log.Debugf("Event - consumer: %s, pubsub: %s, topic: %s, id: %s, data: %s", consumer.pubsub, e.PubsubName, e.Topic, e.ID, e.Data) + return false, nil + }), + ) + } + return err + } + } + + flow.New(t, "rabbitmq priority certification"). + // Run RabbitMQ using Docker Compose. + Step(dockercompose.Run(clusterName, dockerComposeYAML)). + Step("wait for rabbitmq readiness", + retry.Do(time.Second, 30, amqpReady(rabbitMQURL))). + // Start dapr and app to precreate all queues in rabbitmq, + // if topic is not subscribed, then the message will be lost. + // Sidecar will block to wait app, so we need to start app first. + Step(app.Run(appID4, fmt.Sprintf(":%d", appPort+1), + application(priorityClient, 1))). + Step(sidecar.Run(sidecarName4, + embedded.WithComponentsPath("./components/priority"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+1), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+10), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+1), + embedded.WithProfilePort(runtime.DefaultProfilePort+1), + embedded.WithGracefulShutdownDuration(2*time.Second), + componentRuntimeOptions(), + )). + Step("signal subscribed", flow.MustDo(func() { + close(subscribed) + })). + Step("send and wait", test). + Run() +} + func componentRuntimeOptions() []runtime.Option { log := logger.NewLogger("dapr.components")