Merge branch 'master' into master
This commit is contained in:
commit
ead1843aab
|
@ -21,8 +21,10 @@ import (
|
|||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/kit/logger"
|
||||
|
||||
contribMetadata "github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
type metadata struct {
|
||||
|
@ -45,6 +47,7 @@ type metadata struct {
|
|||
exchangeKind string
|
||||
publisherConfirm bool
|
||||
concurrency pubsub.ConcurrencyMode
|
||||
defaultQueueTTL *time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -191,6 +194,15 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*metadat
|
|||
}
|
||||
}
|
||||
|
||||
ttl, ok, err := contribMetadata.TryGetTTL(pubSubMetadata.Properties)
|
||||
if err != nil {
|
||||
return &result, fmt.Errorf("%s parse RabbitMQ ttl metadata with error: %s", errorMessagePrefix, err)
|
||||
}
|
||||
|
||||
if ok {
|
||||
result.defaultQueueTTL = &ttl
|
||||
}
|
||||
|
||||
c, err := pubsub.Concurrency(pubSubMetadata.Properties)
|
||||
if err != nil {
|
||||
return &result, err
|
||||
|
|
|
@ -24,8 +24,10 @@ import (
|
|||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/kit/logger"
|
||||
|
||||
contribMetadata "github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -190,10 +192,23 @@ func (r *rabbitMQ) publishSync(req *pubsub.PublishRequest) (rabbitMQChannelBroke
|
|||
routingKey = val
|
||||
}
|
||||
|
||||
ttl, ok, err := contribMetadata.TryGetTTL(req.Metadata)
|
||||
if err != nil {
|
||||
r.logger.Warnf("%s publishing to %s failed parse TryGetTTL: %v, it is ignored.", logMessagePrefix, req.Topic, err)
|
||||
}
|
||||
var expiration string
|
||||
if ok {
|
||||
// RabbitMQ expects the duration in ms
|
||||
expiration = strconv.FormatInt(ttl.Milliseconds(), 10)
|
||||
} else if r.metadata.defaultQueueTTL != nil {
|
||||
expiration = strconv.FormatInt(r.metadata.defaultQueueTTL.Milliseconds(), 10)
|
||||
}
|
||||
|
||||
confirm, err := r.channel.PublishWithDeferredConfirmWithContext(r.ctx, req.Topic, routingKey, false, false, amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: req.Data,
|
||||
DeliveryMode: r.metadata.deliveryMode,
|
||||
Expiration: expiration,
|
||||
})
|
||||
if err != nil {
|
||||
r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err)
|
||||
|
@ -545,7 +560,7 @@ func (r *rabbitMQ) Close() error {
|
|||
}
|
||||
|
||||
func (r *rabbitMQ) Features() []pubsub.Feature {
|
||||
return nil
|
||||
return []pubsub.Feature{pubsub.FeatureMessageTTL}
|
||||
}
|
||||
|
||||
func mustReconnect(channel rabbitMQChannelBroker, err error) bool {
|
||||
|
|
|
@ -27,3 +27,11 @@ This project aims to test the RabbitMQ Pub/Sub component under various condition
|
|||
* the total number of the messages received by subscribers "B"
|
||||
* App: Simulates periodic errors
|
||||
* Component: Retries on error
|
||||
* Test TTL is regarded.
|
||||
* Setting a large TTL only at the message level but not component level, wait for a small period, and verify that the message is received.
|
||||
* Setting a TTL only at the message level but not component level expires messages correctly
|
||||
* Setting a default TTL at the component level expires messages correctly
|
||||
* Create component spec with the field `ttlInSeconds`.
|
||||
* Run dapr application with component.
|
||||
* Send a message, wait TTL seconds, and verify the message is deleted/expired.
|
||||
* Setting a TTL at the component level and message level ignores the default component level TTL and always uses the message level TTL specified
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: msg-ttl-pubsub
|
||||
namespace: default
|
||||
spec:
|
||||
type: pubsub.rabbitmq
|
||||
version: v1
|
||||
metadata:
|
||||
- name: consumerID
|
||||
value: msg
|
||||
- name: host
|
||||
value: "amqp://test:test@localhost:5672"
|
||||
- name: durable
|
||||
value: true
|
||||
- name: deletedWhenUnused
|
||||
value: false
|
||||
- name: requeueInFailure
|
||||
value: true
|
|
@ -0,0 +1,21 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: overwrite-ttl-pubsub
|
||||
namespace: default
|
||||
spec:
|
||||
type: pubsub.rabbitmq
|
||||
version: v1
|
||||
metadata:
|
||||
- name: consumerID
|
||||
value: overwrite
|
||||
- name: host
|
||||
value: "amqp://test:test@localhost:5672"
|
||||
- name: durable
|
||||
value: true
|
||||
- name: deletedWhenUnused
|
||||
value: false
|
||||
- name: requeueInFailure
|
||||
value: true
|
||||
- name: ttlInSeconds
|
||||
value: 30 # Short TTL for easier testing
|
|
@ -0,0 +1,21 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: queue-ttl-pubsub
|
||||
namespace: default
|
||||
spec:
|
||||
type: pubsub.rabbitmq
|
||||
version: v1
|
||||
metadata:
|
||||
- name: consumerID
|
||||
value: queue
|
||||
- name: host
|
||||
value: "amqp://test:test@localhost:5672"
|
||||
- name: durable
|
||||
value: true
|
||||
- name: deletedWhenUnused
|
||||
value: false
|
||||
- name: requeueInFailure
|
||||
value: true
|
||||
- name: ttlInSeconds
|
||||
value: 10 # Short TTL for easier testing
|
|
@ -17,24 +17,27 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
daprClient "github.com/dapr/go-sdk/client"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/multierr"
|
||||
|
||||
// Pub/Sub.
|
||||
|
||||
pubsub_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq"
|
||||
pubsub_loader "github.com/dapr/dapr/pkg/components/pubsub"
|
||||
"github.com/dapr/dapr/pkg/runtime"
|
||||
"github.com/dapr/go-sdk/service/common"
|
||||
"github.com/dapr/kit/logger"
|
||||
kit_retry "github.com/dapr/kit/retry"
|
||||
|
||||
pubsub_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq"
|
||||
|
||||
"github.com/dapr/components-contrib/tests/certification/embedded"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow/app"
|
||||
|
@ -47,26 +50,34 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
sidecarName1 = "dapr-1"
|
||||
sidecarName2 = "dapr-2"
|
||||
sidecarName3 = "dapr-3"
|
||||
appID1 = "app-1"
|
||||
appID2 = "app-2"
|
||||
appID3 = "app-3"
|
||||
clusterName = "rabbitmqcertification"
|
||||
dockerComposeYAML = "docker-compose.yml"
|
||||
numMessages = 1000
|
||||
errFrequency = 100
|
||||
appPort = 8000
|
||||
sidecarName1 = "dapr-1"
|
||||
sidecarName2 = "dapr-2"
|
||||
sidecarName3 = "dapr-3"
|
||||
sidecarNameTTLClient = "dapr-ttl-client"
|
||||
appID1 = "app-1"
|
||||
appID2 = "app-2"
|
||||
appID3 = "app-3"
|
||||
clusterName = "rabbitmqcertification"
|
||||
dockerComposeYAML = "docker-compose.yml"
|
||||
numMessages = 1000
|
||||
errFrequency = 100
|
||||
appPort = 8000
|
||||
|
||||
rabbitMQURL = "amqp://test:test@localhost:5672"
|
||||
|
||||
pubsubAlpha = "mq-alpha"
|
||||
pubsubBeta = "mq-beta"
|
||||
pubsubAlpha = "mq-alpha"
|
||||
pubsubBeta = "mq-beta"
|
||||
pubsubMessageOnlyTTL = "msg-ttl-pubsub"
|
||||
pubsubQueueOnlyTTL = "overwrite-ttl-pubsub"
|
||||
pubsubOverwriteTTL = "queue-ttl-pubsub"
|
||||
|
||||
topicRed = "red"
|
||||
topicBlue = "blue"
|
||||
topicGreen = "green"
|
||||
|
||||
topicTTL1 = "ttl1"
|
||||
topicTTL2 = "ttl2"
|
||||
topicTTL3 = "ttl3"
|
||||
)
|
||||
|
||||
type Consumer struct {
|
||||
|
@ -325,6 +336,212 @@ func TestRabbitMQ(t *testing.T) {
|
|||
Run()
|
||||
}
|
||||
|
||||
func TestRabbitMQTTL(t *testing.T) {
|
||||
rand.Seed(time.Now().UTC().UnixNano())
|
||||
log := logger.NewLogger("dapr.components")
|
||||
// log.SetOutputLevel(logger.DebugLevel)
|
||||
|
||||
MessageOnlyMessages, QueueOnlyMessages, OverwriteMessages := watcher.NewUnordered(), watcher.NewUnordered(), watcher.NewUnordered()
|
||||
fullMessages := watcher.NewUnordered()
|
||||
// Application logic that tracks messages from a topic.
|
||||
application := func(pubsubName, topic string, w *watcher.Watcher) app.SetupFn {
|
||||
return func(ctx flow.Context, s common.Service) (err error) {
|
||||
// Simulate periodic errors.
|
||||
sim := simulate.PeriodicError(ctx, errFrequency)
|
||||
err = multierr.Combine(
|
||||
err,
|
||||
s.AddTopicEventHandler(&common.Subscription{
|
||||
PubsubName: pubsubName,
|
||||
Topic: topic,
|
||||
Route: fmt.Sprintf("/%s", topic),
|
||||
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
||||
if err := sim(); err != nil {
|
||||
log.Debugf("Simulated error - pubsub: %s, topic: %s, id: %s, data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
|
||||
return true, err
|
||||
}
|
||||
|
||||
// Track/Observe the data of the event.
|
||||
w.Observe(e.Data)
|
||||
log.Debugf("Event - pubsub: %s, topic: %s, id: %s, data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
|
||||
return false, nil
|
||||
}),
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
sendMessage := func(sidecarName, pubsubName, topic string, ttlInSeconds int) func(ctx flow.Context) error {
|
||||
fullMessages.Reset()
|
||||
return func(ctx flow.Context) error {
|
||||
// Declare what is expected BEFORE performing any steps
|
||||
// that will satisfy the test.
|
||||
msgs := make([]string, numMessages)
|
||||
for i := range msgs {
|
||||
msgs[i] = fmt.Sprintf("Hello, Messages %03d", i)
|
||||
}
|
||||
|
||||
sc := sidecar.GetClient(ctx, sidecarName)
|
||||
|
||||
// Send events that the application above will observe.
|
||||
log.Infof("Sending messages on topic '%s'", topic)
|
||||
|
||||
var err error
|
||||
for _, msg := range msgs {
|
||||
log.Debugf("Sending: '%s' on topic '%s'", msg, topic)
|
||||
if ttlInSeconds > 0 {
|
||||
err = sc.PublishEvent(ctx, pubsubName, topic, msg, daprClient.PublishEventWithMetadata(map[string]string{"ttlInSeconds": strconv.Itoa(ttlInSeconds)}))
|
||||
} else {
|
||||
err = sc.PublishEvent(ctx, pubsubName, topic, msg)
|
||||
}
|
||||
require.NoError(ctx, err, "error publishing message")
|
||||
fullMessages.Add(msg)
|
||||
fullMessages.Prepare(msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
flow.New(t, "rabbitmq ttl certification").
|
||||
// Run RabbitMQ using Docker Compose.
|
||||
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
|
||||
Step("wait for rabbitmq readiness",
|
||||
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
|
||||
// Start dapr and app to precreate all 3 queue in rabbitmq,
|
||||
// if topic is not subscribed, then the message will be lost.
|
||||
// Sidecar will block to wait app, so we need to start app first.
|
||||
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+1),
|
||||
application(pubsubMessageOnlyTTL, topicTTL1, fullMessages))).
|
||||
Step(sidecar.Run(sidecarName1,
|
||||
embedded.WithComponentsPath("./components/ttl"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+1),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+1),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+1),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+1),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+2),
|
||||
application(pubsubQueueOnlyTTL, topicTTL2, QueueOnlyMessages))).
|
||||
Step(sidecar.Run(sidecarName2,
|
||||
embedded.WithComponentsPath("./components/ttl"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+2),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+2),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+2),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+2),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+4),
|
||||
application(pubsubOverwriteTTL, topicTTL3, OverwriteMessages))).
|
||||
Step(sidecar.Run(sidecarName3,
|
||||
embedded.WithComponentsPath("./components/ttl"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+4),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+4),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+4),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+4),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
// Wait for all queue crated and then stop.
|
||||
Step("wait", flow.Sleep(10*time.Second)).
|
||||
Step("stop sidecar1", sidecar.Stop(sidecarName1)).
|
||||
Step("stop app1", app.Stop(appID1)).
|
||||
Step("stop sidecar2", sidecar.Stop(sidecarName2)).
|
||||
Step("stop app2", app.Stop(appID2)).
|
||||
Step("stop sidecar3", sidecar.Stop(sidecarName3)).
|
||||
Step("stop app3", app.Stop(appID3)).
|
||||
// Run publishing sidecars and send to RabbitMQ.
|
||||
Step(sidecar.Run(sidecarNameTTLClient,
|
||||
embedded.WithComponentsPath("./components/ttl"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, 0),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step("wait", flow.Sleep(5*time.Second)).
|
||||
// base test case, send message with large ttl and check messages are received.
|
||||
Step("send message only ttl messages", sendMessage(sidecarNameTTLClient, pubsubMessageOnlyTTL, topicTTL1, 100)).
|
||||
Step("wait", flow.Sleep(5*time.Second)).
|
||||
// Run the application1 logic above.
|
||||
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+1),
|
||||
application(pubsubMessageOnlyTTL, topicTTL1, fullMessages))).
|
||||
// Run the Dapr sidecar with the RabbitMQ component.
|
||||
Step(sidecar.Run(sidecarName1,
|
||||
embedded.WithComponentsPath("./components/ttl"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+1),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+1),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+1),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+1),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step("verify full messages", func(ctx flow.Context) error {
|
||||
// Assertion on the data.
|
||||
fullMessages.Assert(t, 3*time.Minute)
|
||||
return nil
|
||||
}).
|
||||
Step("stop sidecar", sidecar.Stop(sidecarName1)).
|
||||
Step("stop app", app.Stop(appID1)).
|
||||
// test case 1, send message with ttl and check messages are expired.
|
||||
Step("send message only ttl messages", sendMessage(sidecarNameTTLClient, pubsubMessageOnlyTTL, topicTTL1, 10)).
|
||||
Step("wait", flow.Sleep(10*time.Second)).
|
||||
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+1),
|
||||
application(pubsubMessageOnlyTTL, topicTTL1, MessageOnlyMessages))).
|
||||
Step(sidecar.Run(sidecarName1,
|
||||
embedded.WithComponentsPath("./components/ttl"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+1),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+1),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+1),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+1),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step("verify messages only ttl", func(ctx flow.Context) error {
|
||||
// Assertion on the data.
|
||||
MessageOnlyMessages.Assert(t, 3*time.Minute)
|
||||
return nil
|
||||
}).
|
||||
// test case 2, send message without ttl to a queue with ttl, check messages are expired.
|
||||
Step("send message without ttl to queue with ttl", sendMessage(sidecarNameTTLClient, pubsubQueueOnlyTTL, topicTTL2, 0)).
|
||||
Step("wait", flow.Sleep(10*time.Second)).
|
||||
// Run the application2 logic above.
|
||||
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+2),
|
||||
application(pubsubQueueOnlyTTL, topicTTL2, QueueOnlyMessages))).
|
||||
// Run the Dapr sidecar with the RabbitMQ component.
|
||||
Step(sidecar.Run(sidecarName2,
|
||||
embedded.WithComponentsPath("./components/ttl"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+2),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+2),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+2),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+2),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step("verify messages only ttl", func(ctx flow.Context) error {
|
||||
// Assertion on the data.
|
||||
QueueOnlyMessages.Assert(t, 3*time.Minute)
|
||||
return nil
|
||||
}).
|
||||
Step("send message with ttl 10 to queue with ttl 30",
|
||||
sendMessage(sidecarNameTTLClient, pubsubOverwriteTTL, topicTTL3, 10)).
|
||||
Step("wait", flow.Sleep(10*time.Second)).
|
||||
// test case 3, send message with ttl 10s to a queue with ttl 30s, wait for 10s, check messages are expired.
|
||||
Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+4),
|
||||
application(pubsubOverwriteTTL, topicTTL3, OverwriteMessages))).
|
||||
// Run the Dapr sidecar with the RabbitMQ component.
|
||||
Step(sidecar.Run(sidecarName3,
|
||||
embedded.WithComponentsPath("./components/ttl"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+4),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+4),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+4),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+4),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step("verify messages only ttl", func(ctx flow.Context) error {
|
||||
// Assertion on the data.
|
||||
OverwriteMessages.Assert(t, 3*time.Minute)
|
||||
return nil
|
||||
}).
|
||||
Run()
|
||||
}
|
||||
|
||||
func componentRuntimeOptions() []runtime.Option {
|
||||
log := logger.NewLogger("dapr.components")
|
||||
|
||||
|
|
Loading…
Reference in New Issue