adds Verify with an optional parameter `disableEntityManagement` set to true test
Signed-off-by: Roberto Rojas <robertojrojas@gmail.com>
This commit is contained in:
parent
0bfca37e9f
commit
ecf18343c4
|
@ -32,4 +32,9 @@ The purpose of this module is to provide tests that certify the AWS SNS/SQS Pubs
|
|||
- Run dapr application with 1 publisher and 1 subscriber
|
||||
- Verify the creation of service bus
|
||||
- Send messages to the service created
|
||||
- Verify that subscriber received all the messages
|
||||
- Verify that subscriber received all the messages
|
||||
- Verify with an optional parameter `disableEntityManagement` set to true (TestSNSSQSEntityManagement)
|
||||
- Run dapr application with 1 publisher
|
||||
- Publisher tries to publish to 1 topic that is not present
|
||||
- Verify that the topic and subscriptions do not get created
|
||||
- Verify that the error is returned saying that the topic not present when publishing
|
|
@ -0,0 +1,40 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: snssqs-cert-tests
|
||||
spec:
|
||||
type: pubsub.snssqs
|
||||
version: v1
|
||||
metadata:
|
||||
- name: consumerID
|
||||
value: snssqscerttest1
|
||||
- name: timeoutInSec
|
||||
value: 60
|
||||
- name: handlerTimeoutInSec
|
||||
value: 60
|
||||
- name: disableEntityManagement
|
||||
value: "true"
|
||||
- name: maxDeliveryCount
|
||||
value: 3
|
||||
- name: lockDurationInSec
|
||||
value: 60
|
||||
- name: lockRenewalInSec
|
||||
value: 20
|
||||
- name: maxActiveMessages
|
||||
value: 10000
|
||||
- name: maxConcurrentHandlers
|
||||
value: 10
|
||||
- name: defaultMessageTimeToLiveInSec
|
||||
value: 10
|
||||
- name: autoDeleteOnIdleInSec
|
||||
value: 3600
|
||||
- name: minConnectionRecoveryInSec
|
||||
value: 2
|
||||
- name: maxConnectionRecoveryInSec
|
||||
value: 300
|
||||
- name: maxRetriableErrorsPerSec
|
||||
value: 10
|
||||
- name: publishMaxRetries
|
||||
value: 5
|
||||
- name: publishInitialRetryInternalInMs
|
||||
value: 500
|
|
@ -56,6 +56,7 @@ const (
|
|||
topicActiveName = "certification-pubsub-topic-active"
|
||||
topicPassiveName = "certification-pubsub-topic-passive"
|
||||
topicToBeCreated = "certification-topic-per-test-run"
|
||||
topicDefaultName = "certification-topic-default"
|
||||
partition0 = "partition-0"
|
||||
partition1 = "partition-1"
|
||||
)
|
||||
|
@ -664,6 +665,94 @@ func TestSNSSQSNonexistingTopic(t *testing.T) {
|
|||
Run()
|
||||
}
|
||||
|
||||
// Verify with an optional parameter `disableEntityManagement` set to true
|
||||
func TestSNSSQSEntityManagement(t *testing.T) {
|
||||
// TODO: Modify it to looks for component init error in the sidecar itself.
|
||||
consumerGroup1 := watcher.NewUnordered()
|
||||
|
||||
// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
|
||||
metadata := map[string]string{
|
||||
messageKey: partition0,
|
||||
}
|
||||
|
||||
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
|
||||
return func(ctx flow.Context, s common.Service) error {
|
||||
// Setup the /orders event handler.
|
||||
return multierr.Combine(
|
||||
s.AddTopicEventHandler(&common.Subscription{
|
||||
PubsubName: pubsubName,
|
||||
Topic: topicName,
|
||||
Route: "/orders",
|
||||
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
|
||||
// Track/Observe the data of the event.
|
||||
messagesWatcher.Observe(e.Data)
|
||||
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
|
||||
return false, nil
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
|
||||
return func(ctx flow.Context) error {
|
||||
// prepare the messages
|
||||
messages := make([]string, numMessages)
|
||||
for i := range messages {
|
||||
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
|
||||
}
|
||||
|
||||
// add the messages as expectations to the watchers
|
||||
for _, messageWatcher := range messageWatchers {
|
||||
messageWatcher.ExpectStrings(messages...)
|
||||
}
|
||||
|
||||
// get the sidecar (dapr) client
|
||||
client := sidecar.GetClient(ctx, sidecarName)
|
||||
|
||||
// publish messages
|
||||
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
|
||||
|
||||
var publishOptions dapr.PublishEventOption
|
||||
|
||||
if metadata != nil {
|
||||
publishOptions = dapr.PublishEventWithMetadata(metadata)
|
||||
}
|
||||
|
||||
for _, message := range messages {
|
||||
ctx.Logf("Publishing: %q", message)
|
||||
var err error
|
||||
|
||||
if publishOptions != nil {
|
||||
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
|
||||
} else {
|
||||
err = client.PublishEvent(ctx, pubsubName, topicName, message)
|
||||
}
|
||||
// Error is expected as the topic does not exist
|
||||
require.Error(ctx, err, "error publishing message")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
flow.New(t, "SNSSQS certification - entity management disabled").
|
||||
|
||||
// Run subscriberApplication app1
|
||||
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset),
|
||||
subscriberApplication(appID1, topicActiveName, consumerGroup1))).
|
||||
|
||||
// Run the Dapr sidecar
|
||||
Step(sidecar.Run(sidecarName1,
|
||||
embedded.WithComponentsPath("./components/entity_mgmt"),
|
||||
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
|
||||
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
|
||||
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
|
||||
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
|
||||
componentRuntimeOptions(),
|
||||
)).
|
||||
Step(fmt.Sprintf("publish messages to topicDefault: %s", topicDefaultName), publishMessages(metadata, sidecarName1, topicDefaultName, consumerGroup1)).
|
||||
Run()
|
||||
}
|
||||
|
||||
func componentRuntimeOptions() []runtime.Option {
|
||||
log := logger.NewLogger("dapr.components")
|
||||
|
||||
|
|
Loading…
Reference in New Issue