Moved code to add metadata to ASB messages to shared impl too

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2022-10-19 20:08:07 +00:00
parent 8bce0d5200
commit 7ecfb6edd0
8 changed files with 342 additions and 986 deletions

View File

@ -27,7 +27,6 @@ import (
"github.com/dapr/components-contrib/bindings"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
impl "github.com/dapr/components-contrib/internal/component/azure/servicebus"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)
@ -138,32 +137,9 @@ func (a *AzureServiceBusQueues) Invoke(invokeCtx context.Context, req *bindings.
return nil, fmt.Errorf("failed to create a sender for the Service Bus queue: %w", err)
}
msg := &servicebus.Message{
Body: req.Data,
ApplicationProperties: make(map[string]interface{}),
}
if val, ok := req.Metadata[id]; ok && val != "" {
msg.MessageID = &val
}
if val, ok := req.Metadata[correlationID]; ok && val != "" {
msg.CorrelationID = &val
}
// Include incoming metadata in the message to be used when it is read.
for k, v := range req.Metadata {
// Don't include the values that are saved in MessageID or CorrelationID.
if k == id || k == correlationID {
continue
}
msg.ApplicationProperties[k] = v
}
ttl, ok, err := contribMetadata.TryGetTTL(req.Metadata)
msg, err := impl.NewASBMessageFromInvokeRequest(req)
if err != nil {
return nil, err
}
if ok {
msg.TimeToLive = &ttl
return nil, fmt.Errorf("failed to create message: %w", err)
}
// Send the message

View File

@ -0,0 +1,175 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servicebus
import (
"fmt"
"net/http"
"time"
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/dapr/components-contrib/bindings"
mdutils "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/ptr"
)
const (
// MessageKeyMessageID defines the metadata key for the message id.
MessageKeyMessageID = "MessageId" // read, write.
// MessageKeyMessageIDAlias is an alias for "MessageId" for write only, for backwards-compatibility
MessageKeyMessageIDAlias = "id"
// MessageKeyCorrelationID defines the metadata key for the correlation id.
MessageKeyCorrelationID = "CorrelationId" // read, write.
// MessageKeyCorrelationIDAlias is an alias for "CorrelationId" for write only, for backwards-compatibility
MessageKeyCorrelationIDAlias = "correlationID"
// MessageKeySessionID defines the metadata key for the session id.
MessageKeySessionID = "SessionId" // read, write.
// MessageKeyLabel defines the metadata key for the label.
MessageKeyLabel = "Label" // read, write.
// MessageKeyReplyTo defines the metadata key for the reply to value.
MessageKeyReplyTo = "ReplyTo" // read, write.
// MessageKeyTo defines the metadata key for the to value.
MessageKeyTo = "To" // read, write.
// MessageKeyPartitionKey defines the metadata key for the partition key.
MessageKeyPartitionKey = "PartitionKey" // read, write.
// MessageKeyContentType defines the metadata key for the content type.
MessageKeyContentType = "ContentType" // read, write.
// MessageKeyDeliveryCount defines the metadata key for the delivery count.
MessageKeyDeliveryCount = "DeliveryCount" // read.
// MessageKeyLockedUntilUtc defines the metadata key for the locked until utc value.
MessageKeyLockedUntilUtc = "LockedUntilUtc" // read.
// MessageKeyLockToken defines the metadata key for the lock token.
MessageKeyLockToken = "LockToken" // read.
// MessageKeyEnqueuedTimeUtc defines the metadata key for the enqueued time utc value.
MessageKeyEnqueuedTimeUtc = "EnqueuedTimeUtc" // read.
// MessageKeySequenceNumber defines the metadata key for the sequence number.
MessageKeySequenceNumber = "SequenceNumber" // read.
// MessageKeyScheduledEnqueueTimeUtc defines the metadata key for the scheduled enqueue time utc value.
MessageKeyScheduledEnqueueTimeUtc = "ScheduledEnqueueTimeUtc" // read, write.
// MessageKeyReplyToSessionID defines the metadata key for the reply to session id.
// Currently unused.
MessageKeyReplyToSessionID = "ReplyToSessionId" // read, write.
)
// NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest.
func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.Message, error) {
asbMsg := &azservicebus.Message{
Body: req.Data,
}
err := addMetadataToMessage(asbMsg, req.Metadata)
return asbMsg, err
}
// NewASBMessageFromBulkMessageEntry builds a new Azure Service Bus message from a BulkMessageEntry.
func NewASBMessageFromBulkMessageEntry(entry pubsub.BulkMessageEntry) (*azservicebus.Message, error) {
asbMsg := &azservicebus.Message{
Body: entry.Event,
ContentType: &entry.ContentType,
}
err := addMetadataToMessage(asbMsg, entry.Metadata)
return asbMsg, err
}
// NewASBMessageFromInvokeRequest builds a new Azure Service Bus message from a binding's Invoke request.
func NewASBMessageFromInvokeRequest(req *bindings.InvokeRequest) (*azservicebus.Message, error) {
asbMsg := &azservicebus.Message{
Body: req.Data,
}
err := addMetadataToMessage(asbMsg, req.Metadata)
return asbMsg, err
}
// Adds metadata to the message.
// Reference for Azure Service Bus specific properties: https://docs.microsoft.com/en-us/rest/api/servicebus/message-headers-and-properties#message-headers
func addMetadataToMessage(asbMsg *azservicebus.Message, metadata map[string]string) error {
asbMsg.ApplicationProperties = make(map[string]interface{}, len(metadata))
for k, v := range metadata {
// Note: do not just do &v because we're in a loop
if v == "" {
continue
}
switch k {
// Common keys
case mdutils.TTLMetadataKey:
// Ignore v here and use TryGetTTL for the validation it performs
ttl, ok, _ := mdutils.TryGetTTL(metadata)
if ok {
asbMsg.TimeToLive = &ttl
}
// Keys with aliases
case MessageKeyMessageID, MessageKeyMessageIDAlias:
if asbMsg.MessageID == nil {
asbMsg.MessageID = ptr.Of(v)
}
case MessageKeyCorrelationID, MessageKeyCorrelationIDAlias:
if asbMsg.CorrelationID == nil {
asbMsg.CorrelationID = ptr.Of(v)
}
// String types
case MessageKeySessionID:
asbMsg.SessionID = ptr.Of(v)
case MessageKeyLabel:
asbMsg.Subject = ptr.Of(v)
case MessageKeyReplyTo:
asbMsg.ReplyTo = ptr.Of(v)
case MessageKeyTo:
asbMsg.To = ptr.Of(v)
case MessageKeyPartitionKey:
asbMsg.PartitionKey = ptr.Of(v)
case MessageKeyContentType:
asbMsg.ContentType = ptr.Of(v)
// Time
case MessageKeyScheduledEnqueueTimeUtc:
timeVal, err := time.Parse(http.TimeFormat, v)
if err == nil {
asbMsg.ScheduledEnqueueTime = &timeVal
}
// Fallback: set as application property
default:
asbMsg.ApplicationProperties[k] = v
}
}
if asbMsg.PartitionKey != nil && asbMsg.SessionID != nil && *asbMsg.PartitionKey != *asbMsg.SessionID {
return fmt.Errorf("session id %s and partition key %s should be equal when both present", *asbMsg.SessionID, *asbMsg.PartitionKey)
}
return nil
}

View File

@ -0,0 +1,131 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servicebus
import (
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
var (
testMessageID = "testMessageId"
testCorrelationID = "testCorrelationId"
testSessionID = "testSessionId"
testLabel = "testLabel"
testReplyTo = "testReplyTo"
testTo = "testTo"
testPartitionKey = testSessionID
testPartitionKeyUnique = "testPartitionKey"
testContentType = "testContentType"
nowUtc = time.Now().UTC()
testScheduledEnqueueTimeUtc = nowUtc.Format(http.TimeFormat)
testLockTokenString = "bG9ja3Rva2VuAAAAAAAAAA==" //nolint:gosec
testLockTokenBytes = [16]byte{108, 111, 99, 107, 116, 111, 107, 101, 110}
testDeliveryCount = uint32(1)
testSampleTime = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
testSampleTimeHTTPFormat = "Thu, 01 Jan 1970 00:00:00 GMT"
testSequenceNumber = int64(1)
)
func TestAddMetadataToMessage(t *testing.T) {
testCases := []struct {
name string
metadata map[string]string
expectedAzServiceBusMessage azservicebus.Message
expectError bool
}{
{
name: "Maps pubsub request to azure service bus message.",
metadata: map[string]string{
MessageKeyMessageID: testMessageID,
MessageKeyCorrelationID: testCorrelationID,
MessageKeySessionID: testSessionID,
MessageKeyLabel: testLabel,
MessageKeyReplyTo: testReplyTo,
MessageKeyTo: testTo,
MessageKeyPartitionKey: testPartitionKey,
MessageKeyContentType: testContentType,
MessageKeyScheduledEnqueueTimeUtc: testScheduledEnqueueTimeUtc,
},
expectedAzServiceBusMessage: azservicebus.Message{
MessageID: &testMessageID,
CorrelationID: &testCorrelationID,
SessionID: &testSessionID,
Subject: &testLabel,
ReplyTo: &testReplyTo,
To: &testTo,
PartitionKey: &testPartitionKey,
ScheduledEnqueueTime: &nowUtc,
ContentType: &testContentType,
},
expectError: false,
},
{
name: "Errors when partition key and session id set but not equal.",
metadata: map[string]string{
MessageKeyMessageID: testMessageID,
MessageKeyCorrelationID: testCorrelationID,
MessageKeySessionID: testSessionID,
MessageKeyLabel: testLabel,
MessageKeyReplyTo: testReplyTo,
MessageKeyTo: testTo,
MessageKeyPartitionKey: testPartitionKeyUnique,
MessageKeyContentType: testContentType,
},
expectedAzServiceBusMessage: azservicebus.Message{
MessageID: &testMessageID,
CorrelationID: &testCorrelationID,
SessionID: &testSessionID,
Subject: &testLabel,
ReplyTo: &testReplyTo,
To: &testTo,
PartitionKey: &testPartitionKey,
ContentType: &testContentType,
},
expectError: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// act.
msg := &azservicebus.Message{}
err := addMetadataToMessage(msg, tc.metadata)
// assert.
if tc.expectError {
require.NotNil(t, err)
} else {
require.Nil(t, err)
assert.Equal(t, tc.expectedAzServiceBusMessage.Body, msg.Body)
assert.Equal(t, tc.expectedAzServiceBusMessage.MessageID, msg.MessageID)
assert.Equal(t, tc.expectedAzServiceBusMessage.CorrelationID, msg.CorrelationID)
assert.Equal(t, tc.expectedAzServiceBusMessage.SessionID, msg.SessionID)
assert.Equal(t, tc.expectedAzServiceBusMessage.ContentType, msg.ContentType)
assert.Equal(t, tc.expectedAzServiceBusMessage.ReplyTo, msg.ReplyTo)
assert.Equal(t, tc.expectedAzServiceBusMessage.TimeToLive, msg.TimeToLive)
assert.Equal(t, tc.expectedAzServiceBusMessage.To, msg.To)
assert.Equal(t, tc.expectedAzServiceBusMessage.Subject, msg.Subject)
assert.Equal(t, tc.expectedAzServiceBusMessage.PartitionKey, msg.PartitionKey)
assert.Equal(t, tc.expectedAzServiceBusMessage.ScheduledEnqueueTime.Unix(), msg.ScheduledEnqueueTime.Unix())
}
})
}
}

View File

@ -15,65 +15,16 @@ package servicebus
import (
"encoding/base64"
"fmt"
"net/http"
"strconv"
"time"
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/google/uuid"
contribMetadata "github.com/dapr/components-contrib/metadata"
impl "github.com/dapr/components-contrib/internal/component/azure/servicebus"
"github.com/dapr/components-contrib/pubsub"
)
const (
// MessageIDMetadataKey defines the metadata key for the message id.
MessageIDMetadataKey = "MessageId" // read, write.
// CorrelationIDMetadataKey defines the metadata key for the correlation id.
CorrelationIDMetadataKey = "CorrelationId" // read, write.
// SessionIDMetadataKey defines the metadata key for the session id.
SessionIDMetadataKey = "SessionId" // read, write.
// LabelMetadataKey defines the metadata key for the label.
LabelMetadataKey = "Label" // read, write.
// ReplyToMetadataKey defines the metadata key for the reply to value.
ReplyToMetadataKey = "ReplyTo" // read, write.
// ToMetadataKey defines the metadata key for the to value.
ToMetadataKey = "To" // read, write.
// PartitionKeyMetadataKey defines the metadata key for the partition key.
PartitionKeyMetadataKey = "PartitionKey" // read, write.
// ContentTypeMetadataKey defines the metadata key for the content type.
ContentTypeMetadataKey = "ContentType" // read, write.
// DeliveryCountMetadataKey defines the metadata key for the delivery count.
DeliveryCountMetadataKey = "DeliveryCount" // read.
// LockedUntilUtcMetadataKey defines the metadata key for the locked until utc value.
LockedUntilUtcMetadataKey = "LockedUntilUtc" // read.
// LockTokenMetadataKey defines the metadata key for the lock token.
LockTokenMetadataKey = "LockToken" // read.
// EnqueuedTimeUtcMetadataKey defines the metadata key for the enqueued time utc value.
EnqueuedTimeUtcMetadataKey = "EnqueuedTimeUtc" // read.
// SequenceNumberMetadataKey defines the metadata key for the sequence number.
SequenceNumberMetadataKey = "SequenceNumber" // read.
// ScheduledEnqueueTimeUtcMetadataKey defines the metadata key for the scheduled enqueue time utc value.
ScheduledEnqueueTimeUtcMetadataKey = "ScheduledEnqueueTimeUtc" // read, write.
// ReplyToSessionID defines the metadata key for the reply to session id.
ReplyToSessionID = "ReplyToSessionId" // read, write.
)
func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.ReceivedMessage, topic string) (*pubsub.NewMessage, error) {
pubsubMsg := &pubsub.NewMessage{
Topic: topic,
@ -111,142 +62,60 @@ func addMessageAttributesToMetadata(metadata map[string]string, asbMsg *azservic
}
if asbMsg.MessageID != "" {
addToMetadata(metadata, MessageIDMetadataKey, asbMsg.MessageID)
addToMetadata(metadata, impl.MessageKeyMessageID, asbMsg.MessageID)
}
if asbMsg.SessionID != nil {
addToMetadata(metadata, SessionIDMetadataKey, *asbMsg.SessionID)
addToMetadata(metadata, impl.MessageKeySessionID, *asbMsg.SessionID)
}
if asbMsg.CorrelationID != nil && *asbMsg.CorrelationID != "" {
addToMetadata(metadata, CorrelationIDMetadataKey, *asbMsg.CorrelationID)
addToMetadata(metadata, impl.MessageKeyCorrelationID, *asbMsg.CorrelationID)
}
if asbMsg.Subject != nil && *asbMsg.Subject != "" {
addToMetadata(metadata, LabelMetadataKey, *asbMsg.Subject)
addToMetadata(metadata, impl.MessageKeyLabel, *asbMsg.Subject)
}
if asbMsg.ReplyTo != nil && *asbMsg.ReplyTo != "" {
addToMetadata(metadata, ReplyToMetadataKey, *asbMsg.ReplyTo)
addToMetadata(metadata, impl.MessageKeyReplyTo, *asbMsg.ReplyTo)
}
if asbMsg.To != nil && *asbMsg.To != "" {
addToMetadata(metadata, ToMetadataKey, *asbMsg.To)
addToMetadata(metadata, impl.MessageKeyTo, *asbMsg.To)
}
if asbMsg.ContentType != nil && *asbMsg.ContentType != "" {
addToMetadata(metadata, ContentTypeMetadataKey, *asbMsg.ContentType)
addToMetadata(metadata, impl.MessageKeyContentType, *asbMsg.ContentType)
}
if asbMsg.LockToken != [16]byte{} {
addToMetadata(metadata, LockTokenMetadataKey, base64.StdEncoding.EncodeToString(asbMsg.LockToken[:]))
addToMetadata(metadata, impl.MessageKeyLockToken, base64.StdEncoding.EncodeToString(asbMsg.LockToken[:]))
}
// Always set delivery count.
addToMetadata(metadata, DeliveryCountMetadataKey, strconv.FormatInt(int64(asbMsg.DeliveryCount), 10))
addToMetadata(metadata, impl.MessageKeyDeliveryCount, strconv.FormatInt(int64(asbMsg.DeliveryCount), 10))
if asbMsg.EnqueuedTime != nil {
// Preserve RFC2616 time format.
addToMetadata(metadata, EnqueuedTimeUtcMetadataKey, asbMsg.EnqueuedTime.UTC().Format(http.TimeFormat))
addToMetadata(metadata, impl.MessageKeyEnqueuedTimeUtc, asbMsg.EnqueuedTime.UTC().Format(http.TimeFormat))
}
if asbMsg.SequenceNumber != nil {
addToMetadata(metadata, SequenceNumberMetadataKey, strconv.FormatInt(*asbMsg.SequenceNumber, 10))
addToMetadata(metadata, impl.MessageKeySequenceNumber, strconv.FormatInt(*asbMsg.SequenceNumber, 10))
}
if asbMsg.ScheduledEnqueueTime != nil {
// Preserve RFC2616 time format.
addToMetadata(metadata, ScheduledEnqueueTimeUtcMetadataKey, asbMsg.ScheduledEnqueueTime.UTC().Format(http.TimeFormat))
addToMetadata(metadata, impl.MessageKeyScheduledEnqueueTimeUtc, asbMsg.ScheduledEnqueueTime.UTC().Format(http.TimeFormat))
}
if asbMsg.PartitionKey != nil {
addToMetadata(metadata, PartitionKeyMetadataKey, *asbMsg.PartitionKey)
addToMetadata(metadata, impl.MessageKeyPartitionKey, *asbMsg.PartitionKey)
}
if asbMsg.LockedUntil != nil {
// Preserve RFC2616 time format.
addToMetadata(metadata, LockedUntilUtcMetadataKey, asbMsg.LockedUntil.UTC().Format(http.TimeFormat))
addToMetadata(metadata, impl.MessageKeyLockedUntilUtc, asbMsg.LockedUntil.UTC().Format(http.TimeFormat))
}
return metadata
}
// NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest.
func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.Message, error) {
asbMsg := &azservicebus.Message{
Body: req.Data,
}
err := addMetadataToMessage(asbMsg, req.Metadata)
return asbMsg, err
}
// NewASBMessageFromBulkMessageEntry builds a new Azure Service Bus message from a BulkMessageEntry.
func NewASBMessageFromBulkMessageEntry(entry pubsub.BulkMessageEntry) (*azservicebus.Message, error) {
asbMsg := &azservicebus.Message{
Body: entry.Event,
ContentType: &entry.ContentType,
}
err := addMetadataToMessage(asbMsg, entry.Metadata)
return asbMsg, err
}
func addMetadataToMessage(asbMsg *azservicebus.Message, metadata map[string]string) error {
// Common properties.
ttl, ok, _ := contribMetadata.TryGetTTL(metadata)
if ok {
asbMsg.TimeToLive = &ttl
}
// Azure Service Bus specific properties.
// reference: https://docs.microsoft.com/en-us/rest/api/servicebus/message-headers-and-properties#message-headers
msgID, ok, _ := tryGetString(metadata, MessageIDMetadataKey)
if ok {
asbMsg.MessageID = &msgID
}
correlationID, ok, _ := tryGetString(metadata, CorrelationIDMetadataKey)
if ok {
asbMsg.CorrelationID = &correlationID
}
sessionID, okSessionID, _ := tryGetString(metadata, SessionIDMetadataKey)
if okSessionID {
asbMsg.SessionID = &sessionID
}
label, ok, _ := tryGetString(metadata, LabelMetadataKey)
if ok {
asbMsg.Subject = &label
}
replyTo, ok, _ := tryGetString(metadata, ReplyToMetadataKey)
if ok {
asbMsg.ReplyTo = &replyTo
}
to, ok, _ := tryGetString(metadata, ToMetadataKey)
if ok {
asbMsg.To = &to
}
partitionKey, ok, _ := tryGetString(metadata, PartitionKeyMetadataKey)
if ok {
if okSessionID && partitionKey != sessionID {
return fmt.Errorf("session id %s and partition key %s should be equal when both present", sessionID, partitionKey)
}
asbMsg.PartitionKey = &partitionKey
}
contentType, ok, _ := tryGetString(metadata, ContentTypeMetadataKey)
if ok {
asbMsg.ContentType = &contentType
}
scheduledEnqueueTime, ok, _ := tryGetScheduledEnqueueTime(metadata)
if ok {
asbMsg.ScheduledEnqueueTime = scheduledEnqueueTime
}
return nil
}
// UpdateASBBatchMessageWithBulkPublishRequest updates the batch message with messages from the bulk publish request.
func UpdateASBBatchMessageWithBulkPublishRequest(asbMsgBatch *azservicebus.MessageBatch, req *pubsub.BulkPublishRequest) error {
// Add entries from bulk request to batch.
for _, entry := range req.Entries {
asbMsg, err := NewASBMessageFromBulkMessageEntry(entry)
asbMsg, err := impl.NewASBMessageFromBulkMessageEntry(entry)
if err != nil {
return err
}
@ -259,24 +128,3 @@ func UpdateASBBatchMessageWithBulkPublishRequest(asbMsgBatch *azservicebus.Messa
return nil
}
func tryGetString(props map[string]string, key string) (string, bool, error) {
if val, ok := props[key]; ok && val != "" {
return val, true, nil
}
return "", false, nil
}
func tryGetScheduledEnqueueTime(props map[string]string) (*time.Time, bool, error) {
if val, ok := props[ScheduledEnqueueTimeUtcMetadataKey]; ok && val != "" {
timeVal, err := time.Parse(http.TimeFormat, val)
if err != nil {
return nil, false, err
}
return &timeVal, true, nil
}
return nil, false, nil
}

View File

@ -19,9 +19,10 @@ import (
"testing"
"time"
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
impl "github.com/dapr/components-contrib/internal/component/azure/servicebus"
)
var (
@ -69,20 +70,20 @@ func TestAddMessageAttributesToMetadata(t *testing.T) {
LockedUntil: &testSampleTime,
},
expectedMetadata: map[string]string{
"metadata." + MessageIDMetadataKey: testMessageID,
"metadata." + SessionIDMetadataKey: testSessionID,
"metadata." + CorrelationIDMetadataKey: testCorrelationID,
"metadata." + LabelMetadataKey: testLabel, // Subject
"metadata." + ReplyToMetadataKey: testReplyTo,
"metadata." + ToMetadataKey: testTo,
"metadata." + ContentTypeMetadataKey: testContentType,
"metadata." + LockTokenMetadataKey: testLockTokenString,
"metadata." + DeliveryCountMetadataKey: "1",
"metadata." + EnqueuedTimeUtcMetadataKey: testSampleTimeHTTPFormat,
"metadata." + SequenceNumberMetadataKey: "1",
"metadata." + ScheduledEnqueueTimeUtcMetadataKey: testSampleTimeHTTPFormat,
"metadata." + PartitionKeyMetadataKey: testPartitionKey,
"metadata." + LockedUntilUtcMetadataKey: testSampleTimeHTTPFormat,
"metadata." + impl.MessageKeyMessageID: testMessageID,
"metadata." + impl.MessageKeySessionID: testSessionID,
"metadata." + impl.MessageKeyCorrelationID: testCorrelationID,
"metadata." + impl.MessageKeyLabel: testLabel, // Subject
"metadata." + impl.MessageKeyReplyTo: testReplyTo,
"metadata." + impl.MessageKeyTo: testTo,
"metadata." + impl.MessageKeyContentType: testContentType,
"metadata." + impl.MessageKeyLockToken: testLockTokenString,
"metadata." + impl.MessageKeyDeliveryCount: "1",
"metadata." + impl.MessageKeyEnqueuedTimeUtc: testSampleTimeHTTPFormat,
"metadata." + impl.MessageKeySequenceNumber: "1",
"metadata." + impl.MessageKeyScheduledEnqueueTimeUtc: testSampleTimeHTTPFormat,
"metadata." + impl.MessageKeyPartitionKey: testPartitionKey,
"metadata." + impl.MessageKeyLockedUntilUtc: testSampleTimeHTTPFormat,
},
},
}
@ -101,89 +102,3 @@ func TestAddMessageAttributesToMetadata(t *testing.T) {
}
}
}
func TestAddMetadataToMessage(t *testing.T) {
testCases := []struct {
name string
metadata map[string]string
expectedAzServiceBusMessage azservicebus.Message
expectError bool
}{
{
name: "Maps pubsub request to azure service bus message.",
metadata: map[string]string{
MessageIDMetadataKey: testMessageID,
CorrelationIDMetadataKey: testCorrelationID,
SessionIDMetadataKey: testSessionID,
LabelMetadataKey: testLabel,
ReplyToMetadataKey: testReplyTo,
ToMetadataKey: testTo,
PartitionKeyMetadataKey: testPartitionKey,
ContentTypeMetadataKey: testContentType,
ScheduledEnqueueTimeUtcMetadataKey: testScheduledEnqueueTimeUtc,
},
expectedAzServiceBusMessage: azservicebus.Message{
MessageID: &testMessageID,
CorrelationID: &testCorrelationID,
SessionID: &testSessionID,
Subject: &testLabel,
ReplyTo: &testReplyTo,
To: &testTo,
PartitionKey: &testPartitionKey,
ScheduledEnqueueTime: &nowUtc,
ContentType: &testContentType,
},
expectError: false,
},
{
name: "Errors when partition key and session id set but not equal.",
metadata: map[string]string{
MessageIDMetadataKey: testMessageID,
CorrelationIDMetadataKey: testCorrelationID,
SessionIDMetadataKey: testSessionID,
LabelMetadataKey: testLabel,
ReplyToMetadataKey: testReplyTo,
ToMetadataKey: testTo,
PartitionKeyMetadataKey: testPartitionKeyUnique,
ContentTypeMetadataKey: testContentType,
},
expectedAzServiceBusMessage: azservicebus.Message{
MessageID: &testMessageID,
CorrelationID: &testCorrelationID,
SessionID: &testSessionID,
Subject: &testLabel,
ReplyTo: &testReplyTo,
To: &testTo,
PartitionKey: &testPartitionKey,
ContentType: &testContentType,
},
expectError: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// act.
msg := &azservicebus.Message{}
err := addMetadataToMessage(msg, tc.metadata)
// assert.
if tc.expectError {
require.NotNil(t, err)
} else {
require.Nil(t, err)
assert.Equal(t, tc.expectedAzServiceBusMessage.Body, msg.Body)
assert.Equal(t, tc.expectedAzServiceBusMessage.MessageID, msg.MessageID)
assert.Equal(t, tc.expectedAzServiceBusMessage.CorrelationID, msg.CorrelationID)
assert.Equal(t, tc.expectedAzServiceBusMessage.SessionID, msg.SessionID)
assert.Equal(t, tc.expectedAzServiceBusMessage.ContentType, msg.ContentType)
assert.Equal(t, tc.expectedAzServiceBusMessage.ReplyTo, msg.ReplyTo)
assert.Equal(t, tc.expectedAzServiceBusMessage.TimeToLive, msg.TimeToLive)
assert.Equal(t, tc.expectedAzServiceBusMessage.To, msg.To)
assert.Equal(t, tc.expectedAzServiceBusMessage.Subject, msg.Subject)
assert.Equal(t, tc.expectedAzServiceBusMessage.PartitionKey, msg.PartitionKey)
assert.Equal(t, tc.expectedAzServiceBusMessage.ScheduledEnqueueTime.Unix(), msg.ScheduledEnqueueTime.Unix())
}
})
}
}

View File

@ -1,77 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servicebus
// Reference for settings:
// https://github.com/Azure/azure-service-bus-go/blob/54b2faa53e5216616e59725281be692acc120c34/subscription_manager.go#L101
type metadata struct {
ConnectionString string `json:"connectionString"`
ConsumerID string `json:"consumerID"`
TimeoutInSec int `json:"timeoutInSec"`
HandlerTimeoutInSec int `json:"handlerTimeoutInSec"`
LockRenewalInSec int `json:"lockRenewalInSec"`
MaxActiveMessages int `json:"maxActiveMessages"`
MaxConnectionRecoveryInSec int `json:"maxConnectionRecoveryInSec"`
MinConnectionRecoveryInSec int `json:"minConnectionRecoveryInSec"`
DisableEntityManagement bool `json:"disableEntityManagement"`
MaxRetriableErrorsPerSec int `json:"maxRetriableErrorsPerSec"`
MaxDeliveryCount *int `json:"maxDeliveryCount"`
LockDurationInSec *int `json:"lockDurationInSec"`
DefaultMessageTimeToLiveInSec *int `json:"defaultMessageTimeToLiveInSec"`
AutoDeleteOnIdleInSec *int `json:"autoDeleteOnIdleInSec"`
MaxConcurrentHandlers *int `json:"maxConcurrentHandlers"`
PublishMaxRetries int `json:"publishMaxRetries"`
PublishInitialRetryIntervalInMs int `json:"publishInitialRetryInternalInMs"`
NamespaceName string `json:"namespaceName,omitempty"`
}
const (
// Keys.
connectionString = "connectionString"
consumerID = "consumerID"
timeoutInSec = "timeoutInSec"
handlerTimeoutInSec = "handlerTimeoutInSec"
lockRenewalInSec = "lockRenewalInSec"
maxActiveMessages = "maxActiveMessages"
maxConnectionRecoveryInSec = "maxConnectionRecoveryInSec"
minConnectionRecoveryInSec = "minConnectionRecoveryInSec"
disableEntityManagement = "disableEntityManagement"
maxRetriableErrorsPerSec = "maxRetriableErrorsPerSec"
maxDeliveryCount = "maxDeliveryCount"
lockDurationInSec = "lockDurationInSec"
defaultMessageTimeToLiveInSec = "defaultMessageTimeToLiveInSec"
autoDeleteOnIdleInSec = "autoDeleteOnIdleInSec"
maxConcurrentHandlers = "maxConcurrentHandlers"
publishMaxRetries = "publishMaxRetries"
publishInitialRetryInternalInMs = "publishInitialRetryInternalInMs"
namespaceName = "namespaceName"
// Deprecated keys.
maxReconnectionAttempts = "maxReconnectionAttempts"
connectionRecoveryInSec = "connectionRecoveryInSec"
// Defaults.
defaultTimeoutInSec = 60
defaultHandlerTimeoutInSec = 60
defaultLockRenewalInSec = 20
defaultMaxRetriableErrorsPerSec = 10
// ASB Messages can be up to 256Kb. 10000 messages at this size would roughly use 2.56Gb.
// We should change this if performance testing suggests a more sensible default.
defaultMaxActiveMessages = 10000
defaultDisableEntityManagement = false
defaultMinConnectionRecoveryInSec = 2
defaultMaxConnectionRecoveryInSec = 300
defaultPublishMaxRetries = 5
defaultPublishInitialRetryInternalInMs = 500
)

View File

@ -116,8 +116,7 @@ func (a *azureServiceBus) Init(metadata pubsub.Metadata) (err error) {
}
func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
// a.logger.Debugf("Creating message with body: %s", string(req.Data))
msg, err := NewASBMessageFromPubsubRequest(req)
msg, err := impl.NewASBMessageFromPubsubRequest(req)
if err != nil {
return err
}

View File

@ -1,611 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servicebus
import (
"testing"
"github.com/stretchr/testify/assert"
mdata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
)
const (
invalidNumber = "invalid_number"
)
func getFakeProperties() map[string]string {
return map[string]string{
connectionString: "fakeConnectionString",
namespaceName: "",
consumerID: "fakeConId",
disableEntityManagement: "true",
timeoutInSec: "90",
handlerTimeoutInSec: "30",
maxDeliveryCount: "10",
autoDeleteOnIdleInSec: "240",
defaultMessageTimeToLiveInSec: "2400",
lockDurationInSec: "120",
lockRenewalInSec: "15",
maxConcurrentHandlers: "1",
maxActiveMessages: "100",
minConnectionRecoveryInSec: "5",
maxConnectionRecoveryInSec: "600",
maxRetriableErrorsPerSec: "50",
}
}
func TestParseServiceBusMetadata(t *testing.T) {
t.Run("metadata is correct", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.NoError(t, err)
assert.Equal(t, fakeProperties[connectionString], m.ConnectionString)
assert.Equal(t, fakeProperties[consumerID], m.ConsumerID)
assert.Equal(t, 90, m.TimeoutInSec)
assert.Equal(t, true, m.DisableEntityManagement)
assert.Equal(t, 30, m.HandlerTimeoutInSec)
assert.NotNil(t, m.LockRenewalInSec)
assert.Equal(t, 15, m.LockRenewalInSec)
assert.NotNil(t, m.MaxActiveMessages)
assert.Equal(t, 100, m.MaxActiveMessages)
assert.NotNil(t, m.MinConnectionRecoveryInSec)
assert.Equal(t, 5, m.MinConnectionRecoveryInSec)
assert.NotNil(t, m.MaxConnectionRecoveryInSec)
assert.Equal(t, 600, m.MaxConnectionRecoveryInSec)
assert.Equal(t, 50, m.MaxRetriableErrorsPerSec)
assert.NotNil(t, m.AutoDeleteOnIdleInSec)
assert.Equal(t, 240, *m.AutoDeleteOnIdleInSec)
assert.NotNil(t, m.MaxDeliveryCount)
assert.Equal(t, 10, *m.MaxDeliveryCount)
assert.NotNil(t, m.DefaultMessageTimeToLiveInSec)
assert.Equal(t, 2400, *m.DefaultMessageTimeToLiveInSec)
assert.NotNil(t, m.LockDurationInSec)
assert.Equal(t, 120, *m.LockDurationInSec)
assert.NotNil(t, m.MaxConcurrentHandlers)
assert.Equal(t, 1, *m.MaxConcurrentHandlers)
})
t.Run("missing required connectionString and namespaceName", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[connectionString] = ""
fakeMetaData.Properties[namespaceName] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
assert.Empty(t, m.ConnectionString)
})
t.Run("connectionString makes namespace optional", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[namespaceName] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.NoError(t, err)
assert.Equal(t, "fakeConnectionString", m.ConnectionString)
})
t.Run("namespace makes conectionString optional", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[namespaceName] = "fakeNamespace"
fakeMetaData.Properties[connectionString] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.NoError(t, err)
assert.Equal(t, "fakeNamespace", m.NamespaceName)
})
t.Run("connectionString and namespace are mutually exclusive", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[namespaceName] = "fakeNamespace"
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing required consumerID", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[consumerID] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
assert.Empty(t, m.ConsumerID)
})
t.Run("missing optional timeoutInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[timeoutInSec] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Equal(t, defaultTimeoutInSec, m.TimeoutInSec)
assert.Nil(t, err)
})
t.Run("invalid optional timeoutInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[timeoutInSec] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing optional disableEntityManagement", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[disableEntityManagement] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Equal(t, false, m.DisableEntityManagement)
assert.Nil(t, err)
})
t.Run("invalid optional disableEntityManagement", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[disableEntityManagement] = "invalid_bool"
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing optional handlerTimeoutInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[handlerTimeoutInSec] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Equal(t, defaultHandlerTimeoutInSec, m.HandlerTimeoutInSec)
assert.Nil(t, err)
})
t.Run("invalid optional handlerTimeoutInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[handlerTimeoutInSec] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing optional lockRenewalInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[lockRenewalInSec] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Equal(t, defaultLockRenewalInSec, m.LockRenewalInSec)
assert.Nil(t, err)
})
t.Run("invalid optional lockRenewalInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[lockRenewalInSec] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing optional maxRetriableErrorsPerSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxRetriableErrorsPerSec] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Equal(t, defaultMaxRetriableErrorsPerSec, m.MaxRetriableErrorsPerSec)
assert.Nil(t, err)
})
t.Run("invalid optional maxRetriableErrorsPerSec", func(t *testing.T) {
// NaN: Not a Number
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxRetriableErrorsPerSec] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
// Negative number
fakeProperties = getFakeProperties()
fakeMetaData = pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxRetriableErrorsPerSec] = "-1"
// act.
_, err = parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing optional maxActiveMessages", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxActiveMessages] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Equal(t, defaultMaxActiveMessages, m.MaxActiveMessages)
assert.Nil(t, err)
})
t.Run("invalid optional maxActiveMessages", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxActiveMessages] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing optional maxConnectionRecoveryInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxConnectionRecoveryInSec] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Equal(t, defaultMaxConnectionRecoveryInSec, m.MaxConnectionRecoveryInSec)
assert.Nil(t, err)
})
t.Run("invalid optional maxConnectionRecoveryInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxConnectionRecoveryInSec] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing optional minConnectionRecoveryInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[minConnectionRecoveryInSec] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Equal(t, defaultMinConnectionRecoveryInSec, m.MinConnectionRecoveryInSec)
assert.Nil(t, err)
})
t.Run("invalid optional minConnectionRecoveryInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[minConnectionRecoveryInSec] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing nullable maxDeliveryCount", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxDeliveryCount] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Nil(t, m.MaxDeliveryCount)
assert.Nil(t, err)
})
t.Run("invalid nullable maxDeliveryCount", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxDeliveryCount] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing nullable defaultMessageTimeToLiveInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[defaultMessageTimeToLiveInSec] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Nil(t, m.DefaultMessageTimeToLiveInSec)
assert.Nil(t, err)
})
t.Run("invalid nullable defaultMessageTimeToLiveInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[defaultMessageTimeToLiveInSec] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing nullable autoDeleteOnIdleInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[autoDeleteOnIdleInSec] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Nil(t, m.AutoDeleteOnIdleInSec)
assert.Nil(t, err)
})
t.Run("invalid nullable autoDeleteOnIdleInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[autoDeleteOnIdleInSec] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing nullable lockDurationInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[lockDurationInSec] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Nil(t, m.LockDurationInSec)
assert.Nil(t, err)
})
t.Run("invalid nullable lockDurationInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[lockDurationInSec] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing nullable maxConcurrentHandlers", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxConcurrentHandlers] = ""
// act.
m, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Nil(t, m.MaxConcurrentHandlers)
assert.Nil(t, err)
})
t.Run("invalid nullable maxConcurrentHandlers", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[maxConcurrentHandlers] = invalidNumber
// act.
_, err := parseAzureServiceBusMetadata(fakeMetaData, nil)
// assert.
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
}
func assertValidErrorMessage(t *testing.T, err error) {
assert.Contains(t, err.Error(), errorMessagePrefix)
}