Add cert test for roundrobin

Signed-off-by: Joni Collinge <jonathancollinge@live.com>
This commit is contained in:
Joni Collinge 2022-12-16 16:45:17 +00:00
parent f57727f4b4
commit 28003fcf4f
No known key found for this signature in database
GPG Key ID: BF9B59005264DD95
3 changed files with 130 additions and 16 deletions

View File

@ -286,14 +286,14 @@ func (c *Client) shouldCreateSubscription(parentCtx context.Context, topic, subs
return true, nil
}
bothTrue := func(a, b *bool) bool {
neq := func(a, b *bool) bool {
if a == nil || b == nil {
return false
return true
}
return *a && *b
return *a != *b
}
if bothTrue(res.RequiresSession, &opts.RequireSessions) {
if neq(res.RequiresSession, &opts.RequireSessions) {
return false, fmt.Errorf("subscription %s already exists but session requirement doesn't match", subscription)
}

View File

@ -31,7 +31,7 @@ import (
const (
requireSessionsMetadataKey = "requireSessions"
sessionIdleTimeoutMetadataKey = "sessionIdleTimeout"
sessionIdleTimeoutMetadataKey = "sessionIdleTimeoutInSec"
maxConcurrentSessionsMetadataKey = "maxConcurrentSessions"
defaultMaxBulkSubCount = 100

View File

@ -1058,15 +1058,15 @@ func TestServicebusAuthentication(t *testing.T) {
Run()
}
// TestServicebusWithSessions tests that if we publish messages to the same
// TestServicebusWithSessionsFIFO tests that if we publish messages to the same
// topic but with 2 different session ids (session1 and session2), then the
// session messages are received in order e.g. session1 messages are not
// interleaved with session2 messages in the consumer.
func TestServicebusWithSessions(t *testing.T) {
// receiver only receives messages from a single session and in FIFO order.
func TestServicebusWithSessionsFIFO(t *testing.T) {
topic := "sessions-fifo"
session1 := "session1"
session2 := "session2"
session1Watcher := watcher.NewOrdered()
sessionWatcher := watcher.NewOrdered()
// subscriber of the given topic
subscriberApplicationWithSessions := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
@ -1215,7 +1215,7 @@ func TestServicebusWithSessions(t *testing.T) {
// Run subscriberApplicationWithSessions app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
subscriberApplicationWithSessions(appID1, topicActiveName, session1Watcher))).
subscriberApplicationWithSessions(appID1, topic, sessionWatcher))).
// Run the Dapr sidecar with the eventhubs component 1, with permission at namespace level
Step(sidecar.Run(sidecarName1,
@ -1227,15 +1227,129 @@ func TestServicebusWithSessions(t *testing.T) {
)).
Step("publish messages to topic1 on session 2", publishMessages(map[string]string{
"SessionId": session2,
}, sidecarName1, topicActiveName, session1Watcher)).
}, sidecarName1, topic, sessionWatcher)).
Step("publish messages to topic1 on session 1", publishMessages(map[string]string{
"SessionId": session1,
}, sidecarName1, topicActiveName, session1Watcher)).
}, sidecarName1, topic, sessionWatcher)).
Step("publish messages to topic1 on session 2", publishMessages(map[string]string{
"SessionId": session2,
}, sidecarName1, topicActiveName, session1Watcher)).
Step("verify if app1 has recevied messages published to only a single session", assertMessages(10*time.Second, session1Watcher)).
Step("reset", flow.Reset(session1Watcher)).
}, sidecarName1, topic, sessionWatcher)).
Step("verify if app1 has recevied messages published to only a single session", assertMessages(10*time.Second, sessionWatcher)).
Step("reset", flow.Reset(sessionWatcher)).
Run()
}
// TestServicebusWithSessionsRoundRobin tests that if we publish messages to the same
// topic but with 2 different session ids (session1 and session2), then eventually
// the receiver will receive messages from both the sessions.
func TestServicebusWithSessionsRoundRobin(t *testing.T) {
topic := "sessions-rr"
session1 := "session1"
session2 := "session2"
sessionWatcher := watcher.NewUnordered()
// subscriber of the given topic
subscriberApplicationWithSessions := func(appID string, topicName string, messageWatcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: pubsubName,
Topic: topicName,
Route: "/orders",
Metadata: map[string]string{
"requireSessions": "true",
"maxConcurrentSessions": "1",
"sessionIdleTimeoutInSec": "2", // timeout and try another session
},
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
// Track/Observe the data of the event.
messageWatcher.Observe(e.Data)
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}),
)
}
}
publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
var msgSuffix string
if metadata["SessionId"] != "" {
msgSuffix = fmt.Sprintf(", sessionId: %s", metadata["SessionId"])
}
messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s%s", metadata[messageKey], topicName, i, uuid.New().String(), msgSuffix)
}
// add the messages as expectations to the watchers
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
var publishOptions dapr.PublishEventOption
if metadata != nil {
publishOptions = dapr.PublishEventWithMetadata(metadata)
}
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
var err error
if publishOptions != nil {
err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
} else {
err = client.PublishEvent(ctx, pubsubName, topicName, message)
}
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// assert for messages
for _, m := range messageWatchers {
m.Assert(ctx, 25*timeout)
}
return nil
}
}
flow.New(t, "servicebus certification sessions test").
// Run subscriberApplicationWithSessions app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
subscriberApplicationWithSessions(appID1, topic, sessionWatcher))).
// Run the Dapr sidecar with the eventhubs component 1, with permission at namespace level
Step(sidecar.Run(sidecarName1,
embedded.WithComponentsPath("./components/consumer_one"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
componentRuntimeOptions(),
)).
Step("publish messages to topic1 on session 2", publishMessages(map[string]string{
"SessionId": session2,
}, sidecarName1, topic, sessionWatcher)).
Step("publish messages to topic1 on session 1", publishMessages(map[string]string{
"SessionId": session1,
}, sidecarName1, topic, sessionWatcher)).
Step("verify if app1 has recevied messages published to both sessions", assertMessages(1*time.Second, sessionWatcher)).
Step("reset", flow.Reset(sessionWatcher)).
Run()
}