Signed-off-by: addjuarez <6789375+addjuarez@users.noreply.github.com>
This commit is contained in:
addjuarez 2023-01-12 06:24:55 +00:00
parent 646aaa802c
commit 81a4375dfb
1 changed files with 83 additions and 573 deletions

View File

@ -66,35 +66,85 @@ const (
pulsarURL = "localhost:6650"
)
func subscriberApplication(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
func publishMessages(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
func assertMessages(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
func TestPulsar(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
consumerGroup2 := watcher.NewUnordered()
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
@ -134,17 +184,7 @@ func TestPulsar(t *testing.T) {
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
flow.New(t, "pulsar certification basic test").
// Run subscriberApplication app1
@ -212,82 +252,7 @@ func TestPulsarMultipleSubsSameConsumerIDs(t *testing.T) {
metadata1 := map[string]string{
messageKey: partition1,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
messagesWatcher.Observe(e.Data)
ctx.Logf("Message received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
flow.New(t, "pulsar certification - single publisher and multiple subscribers with same consumer IDs").
// Run subscriberApplication app1
@ -351,83 +316,7 @@ func TestPulsarMultipleSubsDifferentConsumerIDs(t *testing.T) {
metadata := map[string]string{
messageKey: partition0,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
flow.New(t, "pulsar certification - single publisher and multiple subscribers with different consumer IDs").
// Run subscriberApplication app1
@ -496,83 +385,7 @@ func TestPulsarMultiplePubSubsDifferentConsumerIDs(t *testing.T) {
metadata1 := map[string]string{
messageKey: partition1,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
flow.New(t, "pulsar certification - multiple publishers and multiple subscribers with different consumer IDs").
// Run subscriberApplication app1
@ -638,83 +451,7 @@ func TestPulsarNonexistingTopic(t *testing.T) {
metadata := map[string]string{
messageKey: partition0,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
flow.New(t, "pulsar certification - non-existing topic").
// Run subscriberApplication app1
@ -766,83 +503,7 @@ func TestPulsarNetworkInterruption(t *testing.T) {
metadata := map[string]string{
messageKey: partition0,
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
flow.New(t, "pulsar certification - network interruption").
// Run subscriberApplication app1
@ -891,82 +552,7 @@ func TestPulsarNetworkInterruption(t *testing.T) {
func TestPulsarPersitant(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
flow.New(t, "pulsar certification persistant test").
// Run subscriberApplication app1
@ -1019,83 +605,7 @@ func TestPulsarDelay(t *testing.T) {
metadata := map[string]string{
"deliverAfter": "30s",
}
// subscriber of the given topic
subscriberApplication := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 180)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messagesWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
assertMessagesNot := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages