From 9870c5e33beb8706a6e76e5c95d55a39b40d7fa3 Mon Sep 17 00:00:00 2001 From: Taction Date: Sat, 12 Nov 2022 03:34:33 +0800 Subject: [PATCH] Add pubsub rabbitmq TTL support (#2093) * Add pubsub rabbitmq TTL support Signed-off-by: zhangchao * fix review: remove queue level ttl, both ttl is handled when publishing, to make message ttl has priority over queue level ttl Signed-off-by: zhangchao * add rabbitmq ttl certificate test Signed-off-by: zhangchao * fix lint Signed-off-by: zhangchao * fix ttl certificate test precreate topic Signed-off-by: zhangchao * tiny fix tests Signed-off-by: zhangchao Signed-off-by: zhangchao Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> Co-authored-by: Yaron Schneider Co-authored-by: Artur Souza Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> --- pubsub/rabbitmq/metadata.go | 14 +- pubsub/rabbitmq/rabbitmq.go | 19 +- tests/certification/pubsub/rabbitmq/README.md | 8 + .../components/ttl/rabbitmq_msg_ttl.yaml | 19 ++ .../ttl/rabbitmq_overwrite_ttl.yaml | 21 ++ .../components/ttl/rabbitmq_queue_ttl.yaml | 21 ++ .../pubsub/rabbitmq/rabbitmq_test.go | 245 +++++++++++++++++- 7 files changed, 330 insertions(+), 17 deletions(-) create mode 100644 tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_msg_ttl.yaml create mode 100644 tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_overwrite_ttl.yaml create mode 100644 tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_queue_ttl.yaml diff --git a/pubsub/rabbitmq/metadata.go b/pubsub/rabbitmq/metadata.go index 5c477f91b..fd531024c 100644 --- a/pubsub/rabbitmq/metadata.go +++ b/pubsub/rabbitmq/metadata.go @@ -21,8 +21,10 @@ import ( amqp "github.com/rabbitmq/amqp091-go" - "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" + + contribMetadata "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/pubsub" ) type metadata struct { @@ -45,6 +47,7 @@ type metadata struct { exchangeKind string publisherConfirm bool concurrency pubsub.ConcurrencyMode + defaultQueueTTL *time.Duration } const ( @@ -191,6 +194,15 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*metadat } } + ttl, ok, err := contribMetadata.TryGetTTL(pubSubMetadata.Properties) + if err != nil { + return &result, fmt.Errorf("%s parse RabbitMQ ttl metadata with error: %s", errorMessagePrefix, err) + } + + if ok { + result.defaultQueueTTL = &ttl + } + c, err := pubsub.Concurrency(pubSubMetadata.Properties) if err != nil { return &result, err diff --git a/pubsub/rabbitmq/rabbitmq.go b/pubsub/rabbitmq/rabbitmq.go index 852af79db..821e00c8c 100644 --- a/pubsub/rabbitmq/rabbitmq.go +++ b/pubsub/rabbitmq/rabbitmq.go @@ -24,8 +24,10 @@ import ( amqp "github.com/rabbitmq/amqp091-go" - "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" + + contribMetadata "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/pubsub" ) const ( @@ -190,10 +192,23 @@ func (r *rabbitMQ) publishSync(req *pubsub.PublishRequest) (rabbitMQChannelBroke routingKey = val } + ttl, ok, err := contribMetadata.TryGetTTL(req.Metadata) + if err != nil { + r.logger.Warnf("%s publishing to %s failed parse TryGetTTL: %v, it is ignored.", logMessagePrefix, req.Topic, err) + } + var expiration string + if ok { + // RabbitMQ expects the duration in ms + expiration = strconv.FormatInt(ttl.Milliseconds(), 10) + } else if r.metadata.defaultQueueTTL != nil { + expiration = strconv.FormatInt(r.metadata.defaultQueueTTL.Milliseconds(), 10) + } + confirm, err := r.channel.PublishWithDeferredConfirmWithContext(r.ctx, req.Topic, routingKey, false, false, amqp.Publishing{ ContentType: "text/plain", Body: req.Data, DeliveryMode: r.metadata.deliveryMode, + Expiration: expiration, }) if err != nil { r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err) @@ -545,7 +560,7 @@ func (r *rabbitMQ) Close() error { } func (r *rabbitMQ) Features() []pubsub.Feature { - return nil + return []pubsub.Feature{pubsub.FeatureMessageTTL} } func mustReconnect(channel rabbitMQChannelBroker, err error) bool { diff --git a/tests/certification/pubsub/rabbitmq/README.md b/tests/certification/pubsub/rabbitmq/README.md index f07d26962..bc4699365 100644 --- a/tests/certification/pubsub/rabbitmq/README.md +++ b/tests/certification/pubsub/rabbitmq/README.md @@ -27,3 +27,11 @@ This project aims to test the RabbitMQ Pub/Sub component under various condition * the total number of the messages received by subscribers "B" * App: Simulates periodic errors * Component: Retries on error +* Test TTL is regarded. + * Setting a large TTL only at the message level but not component level, wait for a small period, and verify that the message is received. + * Setting a TTL only at the message level but not component level expires messages correctly + * Setting a default TTL at the component level expires messages correctly + * Create component spec with the field `ttlInSeconds`. + * Run dapr application with component. + * Send a message, wait TTL seconds, and verify the message is deleted/expired. + * Setting a TTL at the component level and message level ignores the default component level TTL and always uses the message level TTL specified diff --git a/tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_msg_ttl.yaml b/tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_msg_ttl.yaml new file mode 100644 index 000000000..121bbe192 --- /dev/null +++ b/tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_msg_ttl.yaml @@ -0,0 +1,19 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: msg-ttl-pubsub + namespace: default +spec: + type: pubsub.rabbitmq + version: v1 + metadata: + - name: consumerID + value: msg + - name: host + value: "amqp://test:test@localhost:5672" + - name: durable + value: true + - name: deletedWhenUnused + value: false + - name: requeueInFailure + value: true diff --git a/tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_overwrite_ttl.yaml b/tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_overwrite_ttl.yaml new file mode 100644 index 000000000..9b7ebd2ff --- /dev/null +++ b/tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_overwrite_ttl.yaml @@ -0,0 +1,21 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: overwrite-ttl-pubsub + namespace: default +spec: + type: pubsub.rabbitmq + version: v1 + metadata: + - name: consumerID + value: overwrite + - name: host + value: "amqp://test:test@localhost:5672" + - name: durable + value: true + - name: deletedWhenUnused + value: false + - name: requeueInFailure + value: true + - name: ttlInSeconds + value: 30 # Short TTL for easier testing \ No newline at end of file diff --git a/tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_queue_ttl.yaml b/tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_queue_ttl.yaml new file mode 100644 index 000000000..06a0a223d --- /dev/null +++ b/tests/certification/pubsub/rabbitmq/components/ttl/rabbitmq_queue_ttl.yaml @@ -0,0 +1,21 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: queue-ttl-pubsub + namespace: default +spec: + type: pubsub.rabbitmq + version: v1 + metadata: + - name: consumerID + value: queue + - name: host + value: "amqp://test:test@localhost:5672" + - name: durable + value: true + - name: deletedWhenUnused + value: false + - name: requeueInFailure + value: true + - name: ttlInSeconds + value: 10 # Short TTL for easier testing \ No newline at end of file diff --git a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go index 3d14ff465..146325d81 100644 --- a/tests/certification/pubsub/rabbitmq/rabbitmq_test.go +++ b/tests/certification/pubsub/rabbitmq/rabbitmq_test.go @@ -17,24 +17,27 @@ import ( "context" "fmt" "math/rand" + "strconv" "sync" "testing" "time" "github.com/cenkalti/backoff/v4" + daprClient "github.com/dapr/go-sdk/client" amqp "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/require" "go.uber.org/multierr" // Pub/Sub. - pubsub_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq" pubsub_loader "github.com/dapr/dapr/pkg/components/pubsub" "github.com/dapr/dapr/pkg/runtime" "github.com/dapr/go-sdk/service/common" "github.com/dapr/kit/logger" kit_retry "github.com/dapr/kit/retry" + pubsub_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq" + "github.com/dapr/components-contrib/tests/certification/embedded" "github.com/dapr/components-contrib/tests/certification/flow" "github.com/dapr/components-contrib/tests/certification/flow/app" @@ -47,26 +50,34 @@ import ( ) const ( - sidecarName1 = "dapr-1" - sidecarName2 = "dapr-2" - sidecarName3 = "dapr-3" - appID1 = "app-1" - appID2 = "app-2" - appID3 = "app-3" - clusterName = "rabbitmqcertification" - dockerComposeYAML = "docker-compose.yml" - numMessages = 1000 - errFrequency = 100 - appPort = 8000 + sidecarName1 = "dapr-1" + sidecarName2 = "dapr-2" + sidecarName3 = "dapr-3" + sidecarNameTTLClient = "dapr-ttl-client" + appID1 = "app-1" + appID2 = "app-2" + appID3 = "app-3" + clusterName = "rabbitmqcertification" + dockerComposeYAML = "docker-compose.yml" + numMessages = 1000 + errFrequency = 100 + appPort = 8000 rabbitMQURL = "amqp://test:test@localhost:5672" - pubsubAlpha = "mq-alpha" - pubsubBeta = "mq-beta" + pubsubAlpha = "mq-alpha" + pubsubBeta = "mq-beta" + pubsubMessageOnlyTTL = "msg-ttl-pubsub" + pubsubQueueOnlyTTL = "overwrite-ttl-pubsub" + pubsubOverwriteTTL = "queue-ttl-pubsub" topicRed = "red" topicBlue = "blue" topicGreen = "green" + + topicTTL1 = "ttl1" + topicTTL2 = "ttl2" + topicTTL3 = "ttl3" ) type Consumer struct { @@ -325,6 +336,212 @@ func TestRabbitMQ(t *testing.T) { Run() } +func TestRabbitMQTTL(t *testing.T) { + rand.Seed(time.Now().UTC().UnixNano()) + log := logger.NewLogger("dapr.components") + // log.SetOutputLevel(logger.DebugLevel) + + MessageOnlyMessages, QueueOnlyMessages, OverwriteMessages := watcher.NewUnordered(), watcher.NewUnordered(), watcher.NewUnordered() + fullMessages := watcher.NewUnordered() + // Application logic that tracks messages from a topic. + application := func(pubsubName, topic string, w *watcher.Watcher) app.SetupFn { + return func(ctx flow.Context, s common.Service) (err error) { + // Simulate periodic errors. + sim := simulate.PeriodicError(ctx, errFrequency) + err = multierr.Combine( + err, + s.AddTopicEventHandler(&common.Subscription{ + PubsubName: pubsubName, + Topic: topic, + Route: fmt.Sprintf("/%s", topic), + }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { + if err := sim(); err != nil { + log.Debugf("Simulated error - pubsub: %s, topic: %s, id: %s, data: %s", e.PubsubName, e.Topic, e.ID, e.Data) + return true, err + } + + // Track/Observe the data of the event. + w.Observe(e.Data) + log.Debugf("Event - pubsub: %s, topic: %s, id: %s, data: %s", e.PubsubName, e.Topic, e.ID, e.Data) + return false, nil + }), + ) + + return err + } + } + + sendMessage := func(sidecarName, pubsubName, topic string, ttlInSeconds int) func(ctx flow.Context) error { + fullMessages.Reset() + return func(ctx flow.Context) error { + // Declare what is expected BEFORE performing any steps + // that will satisfy the test. + msgs := make([]string, numMessages) + for i := range msgs { + msgs[i] = fmt.Sprintf("Hello, Messages %03d", i) + } + + sc := sidecar.GetClient(ctx, sidecarName) + + // Send events that the application above will observe. + log.Infof("Sending messages on topic '%s'", topic) + + var err error + for _, msg := range msgs { + log.Debugf("Sending: '%s' on topic '%s'", msg, topic) + if ttlInSeconds > 0 { + err = sc.PublishEvent(ctx, pubsubName, topic, msg, daprClient.PublishEventWithMetadata(map[string]string{"ttlInSeconds": strconv.Itoa(ttlInSeconds)})) + } else { + err = sc.PublishEvent(ctx, pubsubName, topic, msg) + } + require.NoError(ctx, err, "error publishing message") + fullMessages.Add(msg) + fullMessages.Prepare(msg) + } + + return nil + } + } + + flow.New(t, "rabbitmq ttl certification"). + // Run RabbitMQ using Docker Compose. + Step(dockercompose.Run(clusterName, dockerComposeYAML)). + Step("wait for rabbitmq readiness", + retry.Do(time.Second, 30, amqpReady(rabbitMQURL))). + // Start dapr and app to precreate all 3 queue in rabbitmq, + // if topic is not subscribed, then the message will be lost. + // Sidecar will block to wait app, so we need to start app first. + Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+1), + application(pubsubMessageOnlyTTL, topicTTL1, fullMessages))). + Step(sidecar.Run(sidecarName1, + embedded.WithComponentsPath("./components/ttl"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+1), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+1), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+1), + embedded.WithProfilePort(runtime.DefaultProfilePort+1), + componentRuntimeOptions(), + )). + Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+2), + application(pubsubQueueOnlyTTL, topicTTL2, QueueOnlyMessages))). + Step(sidecar.Run(sidecarName2, + embedded.WithComponentsPath("./components/ttl"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+2), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+2), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+2), + embedded.WithProfilePort(runtime.DefaultProfilePort+2), + componentRuntimeOptions(), + )). + Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+4), + application(pubsubOverwriteTTL, topicTTL3, OverwriteMessages))). + Step(sidecar.Run(sidecarName3, + embedded.WithComponentsPath("./components/ttl"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+4), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+4), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+4), + embedded.WithProfilePort(runtime.DefaultProfilePort+4), + componentRuntimeOptions(), + )). + // Wait for all queue crated and then stop. + Step("wait", flow.Sleep(10*time.Second)). + Step("stop sidecar1", sidecar.Stop(sidecarName1)). + Step("stop app1", app.Stop(appID1)). + Step("stop sidecar2", sidecar.Stop(sidecarName2)). + Step("stop app2", app.Stop(appID2)). + Step("stop sidecar3", sidecar.Stop(sidecarName3)). + Step("stop app3", app.Stop(appID3)). + // Run publishing sidecars and send to RabbitMQ. + Step(sidecar.Run(sidecarNameTTLClient, + embedded.WithComponentsPath("./components/ttl"), + embedded.WithAppProtocol(runtime.HTTPProtocol, 0), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort), + embedded.WithProfilePort(runtime.DefaultProfilePort), + componentRuntimeOptions(), + )). + Step("wait", flow.Sleep(5*time.Second)). + // base test case, send message with large ttl and check messages are received. + Step("send message only ttl messages", sendMessage(sidecarNameTTLClient, pubsubMessageOnlyTTL, topicTTL1, 100)). + Step("wait", flow.Sleep(5*time.Second)). + // Run the application1 logic above. + Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+1), + application(pubsubMessageOnlyTTL, topicTTL1, fullMessages))). + // Run the Dapr sidecar with the RabbitMQ component. + Step(sidecar.Run(sidecarName1, + embedded.WithComponentsPath("./components/ttl"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+1), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+1), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+1), + embedded.WithProfilePort(runtime.DefaultProfilePort+1), + componentRuntimeOptions(), + )). + Step("verify full messages", func(ctx flow.Context) error { + // Assertion on the data. + fullMessages.Assert(t, 3*time.Minute) + return nil + }). + Step("stop sidecar", sidecar.Stop(sidecarName1)). + Step("stop app", app.Stop(appID1)). + // test case 1, send message with ttl and check messages are expired. + Step("send message only ttl messages", sendMessage(sidecarNameTTLClient, pubsubMessageOnlyTTL, topicTTL1, 10)). + Step("wait", flow.Sleep(10*time.Second)). + Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+1), + application(pubsubMessageOnlyTTL, topicTTL1, MessageOnlyMessages))). + Step(sidecar.Run(sidecarName1, + embedded.WithComponentsPath("./components/ttl"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+1), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+1), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+1), + embedded.WithProfilePort(runtime.DefaultProfilePort+1), + componentRuntimeOptions(), + )). + Step("verify messages only ttl", func(ctx flow.Context) error { + // Assertion on the data. + MessageOnlyMessages.Assert(t, 3*time.Minute) + return nil + }). + // test case 2, send message without ttl to a queue with ttl, check messages are expired. + Step("send message without ttl to queue with ttl", sendMessage(sidecarNameTTLClient, pubsubQueueOnlyTTL, topicTTL2, 0)). + Step("wait", flow.Sleep(10*time.Second)). + // Run the application2 logic above. + Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+2), + application(pubsubQueueOnlyTTL, topicTTL2, QueueOnlyMessages))). + // Run the Dapr sidecar with the RabbitMQ component. + Step(sidecar.Run(sidecarName2, + embedded.WithComponentsPath("./components/ttl"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+2), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+2), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+2), + embedded.WithProfilePort(runtime.DefaultProfilePort+2), + componentRuntimeOptions(), + )). + Step("verify messages only ttl", func(ctx flow.Context) error { + // Assertion on the data. + QueueOnlyMessages.Assert(t, 3*time.Minute) + return nil + }). + Step("send message with ttl 10 to queue with ttl 30", + sendMessage(sidecarNameTTLClient, pubsubOverwriteTTL, topicTTL3, 10)). + Step("wait", flow.Sleep(10*time.Second)). + // test case 3, send message with ttl 10s to a queue with ttl 30s, wait for 10s, check messages are expired. + Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+4), + application(pubsubOverwriteTTL, topicTTL3, OverwriteMessages))). + // Run the Dapr sidecar with the RabbitMQ component. + Step(sidecar.Run(sidecarName3, + embedded.WithComponentsPath("./components/ttl"), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+4), + embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+4), + embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+4), + embedded.WithProfilePort(runtime.DefaultProfilePort+4), + componentRuntimeOptions(), + )). + Step("verify messages only ttl", func(ctx flow.Context) error { + // Assertion on the data. + OverwriteMessages.Assert(t, 3*time.Minute) + return nil + }). + Run() +} + func componentRuntimeOptions() []runtime.Option { log := logger.NewLogger("dapr.components")