From 7ecfb6edd0baa6007dbe95161a5a5a73cfcf25c6 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Wed, 19 Oct 2022 20:08:07 +0000 Subject: [PATCH] Moved code to add metadata to ASB messages to shared impl too Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- .../servicebusqueues/servicebusqueues.go | 28 +- .../component/azure/servicebus/message.go | 175 +++++ .../azure/servicebus/message_test.go | 131 ++++ pubsub/azure/servicebus/message.go | 184 +----- pubsub/azure/servicebus/message_test.go | 119 +--- pubsub/azure/servicebus/metadata.go | 77 --- pubsub/azure/servicebus/servicebus.go | 3 +- pubsub/azure/servicebus/servicebus_test.go | 611 ------------------ 8 files changed, 342 insertions(+), 986 deletions(-) create mode 100644 internal/component/azure/servicebus/message.go create mode 100644 internal/component/azure/servicebus/message_test.go delete mode 100644 pubsub/azure/servicebus/metadata.go delete mode 100644 pubsub/azure/servicebus/servicebus_test.go diff --git a/bindings/azure/servicebusqueues/servicebusqueues.go b/bindings/azure/servicebusqueues/servicebusqueues.go index dc9af0102..95f0066b6 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues.go +++ b/bindings/azure/servicebusqueues/servicebusqueues.go @@ -27,7 +27,6 @@ import ( "github.com/dapr/components-contrib/bindings" azauth "github.com/dapr/components-contrib/internal/authentication/azure" impl "github.com/dapr/components-contrib/internal/component/azure/servicebus" - contribMetadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" ) @@ -138,32 +137,9 @@ func (a *AzureServiceBusQueues) Invoke(invokeCtx context.Context, req *bindings. return nil, fmt.Errorf("failed to create a sender for the Service Bus queue: %w", err) } - msg := &servicebus.Message{ - Body: req.Data, - ApplicationProperties: make(map[string]interface{}), - } - if val, ok := req.Metadata[id]; ok && val != "" { - msg.MessageID = &val - } - if val, ok := req.Metadata[correlationID]; ok && val != "" { - msg.CorrelationID = &val - } - - // Include incoming metadata in the message to be used when it is read. - for k, v := range req.Metadata { - // Don't include the values that are saved in MessageID or CorrelationID. - if k == id || k == correlationID { - continue - } - msg.ApplicationProperties[k] = v - } - - ttl, ok, err := contribMetadata.TryGetTTL(req.Metadata) + msg, err := impl.NewASBMessageFromInvokeRequest(req) if err != nil { - return nil, err - } - if ok { - msg.TimeToLive = &ttl + return nil, fmt.Errorf("failed to create message: %w", err) } // Send the message diff --git a/internal/component/azure/servicebus/message.go b/internal/component/azure/servicebus/message.go new file mode 100644 index 000000000..0cd9e161b --- /dev/null +++ b/internal/component/azure/servicebus/message.go @@ -0,0 +1,175 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package servicebus + +import ( + "fmt" + "net/http" + "time" + + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + + "github.com/dapr/components-contrib/bindings" + mdutils "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/ptr" +) + +const ( + // MessageKeyMessageID defines the metadata key for the message id. + MessageKeyMessageID = "MessageId" // read, write. + // MessageKeyMessageIDAlias is an alias for "MessageId" for write only, for backwards-compatibility + MessageKeyMessageIDAlias = "id" + + // MessageKeyCorrelationID defines the metadata key for the correlation id. + MessageKeyCorrelationID = "CorrelationId" // read, write. + // MessageKeyCorrelationIDAlias is an alias for "CorrelationId" for write only, for backwards-compatibility + MessageKeyCorrelationIDAlias = "correlationID" + + // MessageKeySessionID defines the metadata key for the session id. + MessageKeySessionID = "SessionId" // read, write. + + // MessageKeyLabel defines the metadata key for the label. + MessageKeyLabel = "Label" // read, write. + + // MessageKeyReplyTo defines the metadata key for the reply to value. + MessageKeyReplyTo = "ReplyTo" // read, write. + + // MessageKeyTo defines the metadata key for the to value. + MessageKeyTo = "To" // read, write. + + // MessageKeyPartitionKey defines the metadata key for the partition key. + MessageKeyPartitionKey = "PartitionKey" // read, write. + + // MessageKeyContentType defines the metadata key for the content type. + MessageKeyContentType = "ContentType" // read, write. + + // MessageKeyDeliveryCount defines the metadata key for the delivery count. + MessageKeyDeliveryCount = "DeliveryCount" // read. + + // MessageKeyLockedUntilUtc defines the metadata key for the locked until utc value. + MessageKeyLockedUntilUtc = "LockedUntilUtc" // read. + + // MessageKeyLockToken defines the metadata key for the lock token. + MessageKeyLockToken = "LockToken" // read. + + // MessageKeyEnqueuedTimeUtc defines the metadata key for the enqueued time utc value. + MessageKeyEnqueuedTimeUtc = "EnqueuedTimeUtc" // read. + + // MessageKeySequenceNumber defines the metadata key for the sequence number. + MessageKeySequenceNumber = "SequenceNumber" // read. + + // MessageKeyScheduledEnqueueTimeUtc defines the metadata key for the scheduled enqueue time utc value. + MessageKeyScheduledEnqueueTimeUtc = "ScheduledEnqueueTimeUtc" // read, write. + + // MessageKeyReplyToSessionID defines the metadata key for the reply to session id. + // Currently unused. + MessageKeyReplyToSessionID = "ReplyToSessionId" // read, write. +) + +// NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest. +func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.Message, error) { + asbMsg := &azservicebus.Message{ + Body: req.Data, + } + + err := addMetadataToMessage(asbMsg, req.Metadata) + return asbMsg, err +} + +// NewASBMessageFromBulkMessageEntry builds a new Azure Service Bus message from a BulkMessageEntry. +func NewASBMessageFromBulkMessageEntry(entry pubsub.BulkMessageEntry) (*azservicebus.Message, error) { + asbMsg := &azservicebus.Message{ + Body: entry.Event, + ContentType: &entry.ContentType, + } + + err := addMetadataToMessage(asbMsg, entry.Metadata) + return asbMsg, err +} + +// NewASBMessageFromInvokeRequest builds a new Azure Service Bus message from a binding's Invoke request. +func NewASBMessageFromInvokeRequest(req *bindings.InvokeRequest) (*azservicebus.Message, error) { + asbMsg := &azservicebus.Message{ + Body: req.Data, + } + + err := addMetadataToMessage(asbMsg, req.Metadata) + return asbMsg, err +} + +// Adds metadata to the message. +// Reference for Azure Service Bus specific properties: https://docs.microsoft.com/en-us/rest/api/servicebus/message-headers-and-properties#message-headers +func addMetadataToMessage(asbMsg *azservicebus.Message, metadata map[string]string) error { + asbMsg.ApplicationProperties = make(map[string]interface{}, len(metadata)) + + for k, v := range metadata { + // Note: do not just do &v because we're in a loop + if v == "" { + continue + } + + switch k { + // Common keys + case mdutils.TTLMetadataKey: + // Ignore v here and use TryGetTTL for the validation it performs + ttl, ok, _ := mdutils.TryGetTTL(metadata) + if ok { + asbMsg.TimeToLive = &ttl + } + + // Keys with aliases + case MessageKeyMessageID, MessageKeyMessageIDAlias: + if asbMsg.MessageID == nil { + asbMsg.MessageID = ptr.Of(v) + } + + case MessageKeyCorrelationID, MessageKeyCorrelationIDAlias: + if asbMsg.CorrelationID == nil { + asbMsg.CorrelationID = ptr.Of(v) + } + + // String types + case MessageKeySessionID: + asbMsg.SessionID = ptr.Of(v) + case MessageKeyLabel: + asbMsg.Subject = ptr.Of(v) + case MessageKeyReplyTo: + asbMsg.ReplyTo = ptr.Of(v) + case MessageKeyTo: + asbMsg.To = ptr.Of(v) + case MessageKeyPartitionKey: + asbMsg.PartitionKey = ptr.Of(v) + case MessageKeyContentType: + asbMsg.ContentType = ptr.Of(v) + + // Time + case MessageKeyScheduledEnqueueTimeUtc: + timeVal, err := time.Parse(http.TimeFormat, v) + if err == nil { + asbMsg.ScheduledEnqueueTime = &timeVal + } + + // Fallback: set as application property + default: + asbMsg.ApplicationProperties[k] = v + } + } + + if asbMsg.PartitionKey != nil && asbMsg.SessionID != nil && *asbMsg.PartitionKey != *asbMsg.SessionID { + return fmt.Errorf("session id %s and partition key %s should be equal when both present", *asbMsg.SessionID, *asbMsg.PartitionKey) + } + + return nil +} diff --git a/internal/component/azure/servicebus/message_test.go b/internal/component/azure/servicebus/message_test.go new file mode 100644 index 000000000..63186078c --- /dev/null +++ b/internal/component/azure/servicebus/message_test.go @@ -0,0 +1,131 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package servicebus + +import ( + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +var ( + testMessageID = "testMessageId" + testCorrelationID = "testCorrelationId" + testSessionID = "testSessionId" + testLabel = "testLabel" + testReplyTo = "testReplyTo" + testTo = "testTo" + testPartitionKey = testSessionID + testPartitionKeyUnique = "testPartitionKey" + testContentType = "testContentType" + nowUtc = time.Now().UTC() + testScheduledEnqueueTimeUtc = nowUtc.Format(http.TimeFormat) + testLockTokenString = "bG9ja3Rva2VuAAAAAAAAAA==" //nolint:gosec + testLockTokenBytes = [16]byte{108, 111, 99, 107, 116, 111, 107, 101, 110} + testDeliveryCount = uint32(1) + testSampleTime = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + testSampleTimeHTTPFormat = "Thu, 01 Jan 1970 00:00:00 GMT" + testSequenceNumber = int64(1) +) + +func TestAddMetadataToMessage(t *testing.T) { + testCases := []struct { + name string + metadata map[string]string + expectedAzServiceBusMessage azservicebus.Message + expectError bool + }{ + { + name: "Maps pubsub request to azure service bus message.", + metadata: map[string]string{ + MessageKeyMessageID: testMessageID, + MessageKeyCorrelationID: testCorrelationID, + MessageKeySessionID: testSessionID, + MessageKeyLabel: testLabel, + MessageKeyReplyTo: testReplyTo, + MessageKeyTo: testTo, + MessageKeyPartitionKey: testPartitionKey, + MessageKeyContentType: testContentType, + MessageKeyScheduledEnqueueTimeUtc: testScheduledEnqueueTimeUtc, + }, + expectedAzServiceBusMessage: azservicebus.Message{ + MessageID: &testMessageID, + CorrelationID: &testCorrelationID, + SessionID: &testSessionID, + Subject: &testLabel, + ReplyTo: &testReplyTo, + To: &testTo, + PartitionKey: &testPartitionKey, + ScheduledEnqueueTime: &nowUtc, + ContentType: &testContentType, + }, + expectError: false, + }, + { + name: "Errors when partition key and session id set but not equal.", + metadata: map[string]string{ + MessageKeyMessageID: testMessageID, + MessageKeyCorrelationID: testCorrelationID, + MessageKeySessionID: testSessionID, + MessageKeyLabel: testLabel, + MessageKeyReplyTo: testReplyTo, + MessageKeyTo: testTo, + MessageKeyPartitionKey: testPartitionKeyUnique, + MessageKeyContentType: testContentType, + }, + expectedAzServiceBusMessage: azservicebus.Message{ + MessageID: &testMessageID, + CorrelationID: &testCorrelationID, + SessionID: &testSessionID, + Subject: &testLabel, + ReplyTo: &testReplyTo, + To: &testTo, + PartitionKey: &testPartitionKey, + ContentType: &testContentType, + }, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // act. + msg := &azservicebus.Message{} + err := addMetadataToMessage(msg, tc.metadata) + + // assert. + if tc.expectError { + require.NotNil(t, err) + } else { + require.Nil(t, err) + assert.Equal(t, tc.expectedAzServiceBusMessage.Body, msg.Body) + assert.Equal(t, tc.expectedAzServiceBusMessage.MessageID, msg.MessageID) + assert.Equal(t, tc.expectedAzServiceBusMessage.CorrelationID, msg.CorrelationID) + assert.Equal(t, tc.expectedAzServiceBusMessage.SessionID, msg.SessionID) + assert.Equal(t, tc.expectedAzServiceBusMessage.ContentType, msg.ContentType) + assert.Equal(t, tc.expectedAzServiceBusMessage.ReplyTo, msg.ReplyTo) + assert.Equal(t, tc.expectedAzServiceBusMessage.TimeToLive, msg.TimeToLive) + assert.Equal(t, tc.expectedAzServiceBusMessage.To, msg.To) + assert.Equal(t, tc.expectedAzServiceBusMessage.Subject, msg.Subject) + assert.Equal(t, tc.expectedAzServiceBusMessage.PartitionKey, msg.PartitionKey) + assert.Equal(t, tc.expectedAzServiceBusMessage.ScheduledEnqueueTime.Unix(), msg.ScheduledEnqueueTime.Unix()) + } + }) + } +} diff --git a/pubsub/azure/servicebus/message.go b/pubsub/azure/servicebus/message.go index 819805c5c..2a5f1a996 100644 --- a/pubsub/azure/servicebus/message.go +++ b/pubsub/azure/servicebus/message.go @@ -15,65 +15,16 @@ package servicebus import ( "encoding/base64" - "fmt" "net/http" "strconv" - "time" azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/google/uuid" - contribMetadata "github.com/dapr/components-contrib/metadata" + impl "github.com/dapr/components-contrib/internal/component/azure/servicebus" "github.com/dapr/components-contrib/pubsub" ) -const ( - // MessageIDMetadataKey defines the metadata key for the message id. - MessageIDMetadataKey = "MessageId" // read, write. - - // CorrelationIDMetadataKey defines the metadata key for the correlation id. - CorrelationIDMetadataKey = "CorrelationId" // read, write. - - // SessionIDMetadataKey defines the metadata key for the session id. - SessionIDMetadataKey = "SessionId" // read, write. - - // LabelMetadataKey defines the metadata key for the label. - LabelMetadataKey = "Label" // read, write. - - // ReplyToMetadataKey defines the metadata key for the reply to value. - ReplyToMetadataKey = "ReplyTo" // read, write. - - // ToMetadataKey defines the metadata key for the to value. - ToMetadataKey = "To" // read, write. - - // PartitionKeyMetadataKey defines the metadata key for the partition key. - PartitionKeyMetadataKey = "PartitionKey" // read, write. - - // ContentTypeMetadataKey defines the metadata key for the content type. - ContentTypeMetadataKey = "ContentType" // read, write. - - // DeliveryCountMetadataKey defines the metadata key for the delivery count. - DeliveryCountMetadataKey = "DeliveryCount" // read. - - // LockedUntilUtcMetadataKey defines the metadata key for the locked until utc value. - LockedUntilUtcMetadataKey = "LockedUntilUtc" // read. - - // LockTokenMetadataKey defines the metadata key for the lock token. - LockTokenMetadataKey = "LockToken" // read. - - // EnqueuedTimeUtcMetadataKey defines the metadata key for the enqueued time utc value. - EnqueuedTimeUtcMetadataKey = "EnqueuedTimeUtc" // read. - - // SequenceNumberMetadataKey defines the metadata key for the sequence number. - SequenceNumberMetadataKey = "SequenceNumber" // read. - - // ScheduledEnqueueTimeUtcMetadataKey defines the metadata key for the scheduled enqueue time utc value. - ScheduledEnqueueTimeUtcMetadataKey = "ScheduledEnqueueTimeUtc" // read, write. - - // ReplyToSessionID defines the metadata key for the reply to session id. - ReplyToSessionID = "ReplyToSessionId" // read, write. -) - func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.ReceivedMessage, topic string) (*pubsub.NewMessage, error) { pubsubMsg := &pubsub.NewMessage{ Topic: topic, @@ -111,142 +62,60 @@ func addMessageAttributesToMetadata(metadata map[string]string, asbMsg *azservic } if asbMsg.MessageID != "" { - addToMetadata(metadata, MessageIDMetadataKey, asbMsg.MessageID) + addToMetadata(metadata, impl.MessageKeyMessageID, asbMsg.MessageID) } if asbMsg.SessionID != nil { - addToMetadata(metadata, SessionIDMetadataKey, *asbMsg.SessionID) + addToMetadata(metadata, impl.MessageKeySessionID, *asbMsg.SessionID) } if asbMsg.CorrelationID != nil && *asbMsg.CorrelationID != "" { - addToMetadata(metadata, CorrelationIDMetadataKey, *asbMsg.CorrelationID) + addToMetadata(metadata, impl.MessageKeyCorrelationID, *asbMsg.CorrelationID) } if asbMsg.Subject != nil && *asbMsg.Subject != "" { - addToMetadata(metadata, LabelMetadataKey, *asbMsg.Subject) + addToMetadata(metadata, impl.MessageKeyLabel, *asbMsg.Subject) } if asbMsg.ReplyTo != nil && *asbMsg.ReplyTo != "" { - addToMetadata(metadata, ReplyToMetadataKey, *asbMsg.ReplyTo) + addToMetadata(metadata, impl.MessageKeyReplyTo, *asbMsg.ReplyTo) } if asbMsg.To != nil && *asbMsg.To != "" { - addToMetadata(metadata, ToMetadataKey, *asbMsg.To) + addToMetadata(metadata, impl.MessageKeyTo, *asbMsg.To) } if asbMsg.ContentType != nil && *asbMsg.ContentType != "" { - addToMetadata(metadata, ContentTypeMetadataKey, *asbMsg.ContentType) + addToMetadata(metadata, impl.MessageKeyContentType, *asbMsg.ContentType) } if asbMsg.LockToken != [16]byte{} { - addToMetadata(metadata, LockTokenMetadataKey, base64.StdEncoding.EncodeToString(asbMsg.LockToken[:])) + addToMetadata(metadata, impl.MessageKeyLockToken, base64.StdEncoding.EncodeToString(asbMsg.LockToken[:])) } // Always set delivery count. - addToMetadata(metadata, DeliveryCountMetadataKey, strconv.FormatInt(int64(asbMsg.DeliveryCount), 10)) + addToMetadata(metadata, impl.MessageKeyDeliveryCount, strconv.FormatInt(int64(asbMsg.DeliveryCount), 10)) if asbMsg.EnqueuedTime != nil { // Preserve RFC2616 time format. - addToMetadata(metadata, EnqueuedTimeUtcMetadataKey, asbMsg.EnqueuedTime.UTC().Format(http.TimeFormat)) + addToMetadata(metadata, impl.MessageKeyEnqueuedTimeUtc, asbMsg.EnqueuedTime.UTC().Format(http.TimeFormat)) } if asbMsg.SequenceNumber != nil { - addToMetadata(metadata, SequenceNumberMetadataKey, strconv.FormatInt(*asbMsg.SequenceNumber, 10)) + addToMetadata(metadata, impl.MessageKeySequenceNumber, strconv.FormatInt(*asbMsg.SequenceNumber, 10)) } if asbMsg.ScheduledEnqueueTime != nil { // Preserve RFC2616 time format. - addToMetadata(metadata, ScheduledEnqueueTimeUtcMetadataKey, asbMsg.ScheduledEnqueueTime.UTC().Format(http.TimeFormat)) + addToMetadata(metadata, impl.MessageKeyScheduledEnqueueTimeUtc, asbMsg.ScheduledEnqueueTime.UTC().Format(http.TimeFormat)) } if asbMsg.PartitionKey != nil { - addToMetadata(metadata, PartitionKeyMetadataKey, *asbMsg.PartitionKey) + addToMetadata(metadata, impl.MessageKeyPartitionKey, *asbMsg.PartitionKey) } if asbMsg.LockedUntil != nil { // Preserve RFC2616 time format. - addToMetadata(metadata, LockedUntilUtcMetadataKey, asbMsg.LockedUntil.UTC().Format(http.TimeFormat)) + addToMetadata(metadata, impl.MessageKeyLockedUntilUtc, asbMsg.LockedUntil.UTC().Format(http.TimeFormat)) } return metadata } -// NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest. -func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.Message, error) { - asbMsg := &azservicebus.Message{ - Body: req.Data, - } - - err := addMetadataToMessage(asbMsg, req.Metadata) - return asbMsg, err -} - -// NewASBMessageFromBulkMessageEntry builds a new Azure Service Bus message from a BulkMessageEntry. -func NewASBMessageFromBulkMessageEntry(entry pubsub.BulkMessageEntry) (*azservicebus.Message, error) { - asbMsg := &azservicebus.Message{ - Body: entry.Event, - ContentType: &entry.ContentType, - } - - err := addMetadataToMessage(asbMsg, entry.Metadata) - return asbMsg, err -} - -func addMetadataToMessage(asbMsg *azservicebus.Message, metadata map[string]string) error { - // Common properties. - ttl, ok, _ := contribMetadata.TryGetTTL(metadata) - if ok { - asbMsg.TimeToLive = &ttl - } - - // Azure Service Bus specific properties. - // reference: https://docs.microsoft.com/en-us/rest/api/servicebus/message-headers-and-properties#message-headers - msgID, ok, _ := tryGetString(metadata, MessageIDMetadataKey) - if ok { - asbMsg.MessageID = &msgID - } - - correlationID, ok, _ := tryGetString(metadata, CorrelationIDMetadataKey) - if ok { - asbMsg.CorrelationID = &correlationID - } - - sessionID, okSessionID, _ := tryGetString(metadata, SessionIDMetadataKey) - if okSessionID { - asbMsg.SessionID = &sessionID - } - - label, ok, _ := tryGetString(metadata, LabelMetadataKey) - if ok { - asbMsg.Subject = &label - } - - replyTo, ok, _ := tryGetString(metadata, ReplyToMetadataKey) - if ok { - asbMsg.ReplyTo = &replyTo - } - - to, ok, _ := tryGetString(metadata, ToMetadataKey) - if ok { - asbMsg.To = &to - } - - partitionKey, ok, _ := tryGetString(metadata, PartitionKeyMetadataKey) - if ok { - if okSessionID && partitionKey != sessionID { - return fmt.Errorf("session id %s and partition key %s should be equal when both present", sessionID, partitionKey) - } - - asbMsg.PartitionKey = &partitionKey - } - - contentType, ok, _ := tryGetString(metadata, ContentTypeMetadataKey) - if ok { - asbMsg.ContentType = &contentType - } - - scheduledEnqueueTime, ok, _ := tryGetScheduledEnqueueTime(metadata) - if ok { - asbMsg.ScheduledEnqueueTime = scheduledEnqueueTime - } - - return nil -} - // UpdateASBBatchMessageWithBulkPublishRequest updates the batch message with messages from the bulk publish request. func UpdateASBBatchMessageWithBulkPublishRequest(asbMsgBatch *azservicebus.MessageBatch, req *pubsub.BulkPublishRequest) error { // Add entries from bulk request to batch. for _, entry := range req.Entries { - asbMsg, err := NewASBMessageFromBulkMessageEntry(entry) + asbMsg, err := impl.NewASBMessageFromBulkMessageEntry(entry) if err != nil { return err } @@ -259,24 +128,3 @@ func UpdateASBBatchMessageWithBulkPublishRequest(asbMsgBatch *azservicebus.Messa return nil } - -func tryGetString(props map[string]string, key string) (string, bool, error) { - if val, ok := props[key]; ok && val != "" { - return val, true, nil - } - - return "", false, nil -} - -func tryGetScheduledEnqueueTime(props map[string]string) (*time.Time, bool, error) { - if val, ok := props[ScheduledEnqueueTimeUtcMetadataKey]; ok && val != "" { - timeVal, err := time.Parse(http.TimeFormat, val) - if err != nil { - return nil, false, err - } - - return &timeVal, true, nil - } - - return nil, false, nil -} diff --git a/pubsub/azure/servicebus/message_test.go b/pubsub/azure/servicebus/message_test.go index 1eb8b877e..6ba336440 100644 --- a/pubsub/azure/servicebus/message_test.go +++ b/pubsub/azure/servicebus/message_test.go @@ -19,9 +19,10 @@ import ( "testing" "time" - azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + impl "github.com/dapr/components-contrib/internal/component/azure/servicebus" ) var ( @@ -69,20 +70,20 @@ func TestAddMessageAttributesToMetadata(t *testing.T) { LockedUntil: &testSampleTime, }, expectedMetadata: map[string]string{ - "metadata." + MessageIDMetadataKey: testMessageID, - "metadata." + SessionIDMetadataKey: testSessionID, - "metadata." + CorrelationIDMetadataKey: testCorrelationID, - "metadata." + LabelMetadataKey: testLabel, // Subject - "metadata." + ReplyToMetadataKey: testReplyTo, - "metadata." + ToMetadataKey: testTo, - "metadata." + ContentTypeMetadataKey: testContentType, - "metadata." + LockTokenMetadataKey: testLockTokenString, - "metadata." + DeliveryCountMetadataKey: "1", - "metadata." + EnqueuedTimeUtcMetadataKey: testSampleTimeHTTPFormat, - "metadata." + SequenceNumberMetadataKey: "1", - "metadata." + ScheduledEnqueueTimeUtcMetadataKey: testSampleTimeHTTPFormat, - "metadata." + PartitionKeyMetadataKey: testPartitionKey, - "metadata." + LockedUntilUtcMetadataKey: testSampleTimeHTTPFormat, + "metadata." + impl.MessageKeyMessageID: testMessageID, + "metadata." + impl.MessageKeySessionID: testSessionID, + "metadata." + impl.MessageKeyCorrelationID: testCorrelationID, + "metadata." + impl.MessageKeyLabel: testLabel, // Subject + "metadata." + impl.MessageKeyReplyTo: testReplyTo, + "metadata." + impl.MessageKeyTo: testTo, + "metadata." + impl.MessageKeyContentType: testContentType, + "metadata." + impl.MessageKeyLockToken: testLockTokenString, + "metadata." + impl.MessageKeyDeliveryCount: "1", + "metadata." + impl.MessageKeyEnqueuedTimeUtc: testSampleTimeHTTPFormat, + "metadata." + impl.MessageKeySequenceNumber: "1", + "metadata." + impl.MessageKeyScheduledEnqueueTimeUtc: testSampleTimeHTTPFormat, + "metadata." + impl.MessageKeyPartitionKey: testPartitionKey, + "metadata." + impl.MessageKeyLockedUntilUtc: testSampleTimeHTTPFormat, }, }, } @@ -101,89 +102,3 @@ func TestAddMessageAttributesToMetadata(t *testing.T) { } } } - -func TestAddMetadataToMessage(t *testing.T) { - testCases := []struct { - name string - metadata map[string]string - expectedAzServiceBusMessage azservicebus.Message - expectError bool - }{ - { - name: "Maps pubsub request to azure service bus message.", - metadata: map[string]string{ - MessageIDMetadataKey: testMessageID, - CorrelationIDMetadataKey: testCorrelationID, - SessionIDMetadataKey: testSessionID, - LabelMetadataKey: testLabel, - ReplyToMetadataKey: testReplyTo, - ToMetadataKey: testTo, - PartitionKeyMetadataKey: testPartitionKey, - ContentTypeMetadataKey: testContentType, - ScheduledEnqueueTimeUtcMetadataKey: testScheduledEnqueueTimeUtc, - }, - expectedAzServiceBusMessage: azservicebus.Message{ - MessageID: &testMessageID, - CorrelationID: &testCorrelationID, - SessionID: &testSessionID, - Subject: &testLabel, - ReplyTo: &testReplyTo, - To: &testTo, - PartitionKey: &testPartitionKey, - ScheduledEnqueueTime: &nowUtc, - ContentType: &testContentType, - }, - expectError: false, - }, - { - name: "Errors when partition key and session id set but not equal.", - metadata: map[string]string{ - MessageIDMetadataKey: testMessageID, - CorrelationIDMetadataKey: testCorrelationID, - SessionIDMetadataKey: testSessionID, - LabelMetadataKey: testLabel, - ReplyToMetadataKey: testReplyTo, - ToMetadataKey: testTo, - PartitionKeyMetadataKey: testPartitionKeyUnique, - ContentTypeMetadataKey: testContentType, - }, - expectedAzServiceBusMessage: azservicebus.Message{ - MessageID: &testMessageID, - CorrelationID: &testCorrelationID, - SessionID: &testSessionID, - Subject: &testLabel, - ReplyTo: &testReplyTo, - To: &testTo, - PartitionKey: &testPartitionKey, - ContentType: &testContentType, - }, - expectError: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // act. - msg := &azservicebus.Message{} - err := addMetadataToMessage(msg, tc.metadata) - - // assert. - if tc.expectError { - require.NotNil(t, err) - } else { - require.Nil(t, err) - assert.Equal(t, tc.expectedAzServiceBusMessage.Body, msg.Body) - assert.Equal(t, tc.expectedAzServiceBusMessage.MessageID, msg.MessageID) - assert.Equal(t, tc.expectedAzServiceBusMessage.CorrelationID, msg.CorrelationID) - assert.Equal(t, tc.expectedAzServiceBusMessage.SessionID, msg.SessionID) - assert.Equal(t, tc.expectedAzServiceBusMessage.ContentType, msg.ContentType) - assert.Equal(t, tc.expectedAzServiceBusMessage.ReplyTo, msg.ReplyTo) - assert.Equal(t, tc.expectedAzServiceBusMessage.TimeToLive, msg.TimeToLive) - assert.Equal(t, tc.expectedAzServiceBusMessage.To, msg.To) - assert.Equal(t, tc.expectedAzServiceBusMessage.Subject, msg.Subject) - assert.Equal(t, tc.expectedAzServiceBusMessage.PartitionKey, msg.PartitionKey) - assert.Equal(t, tc.expectedAzServiceBusMessage.ScheduledEnqueueTime.Unix(), msg.ScheduledEnqueueTime.Unix()) - } - }) - } -} diff --git a/pubsub/azure/servicebus/metadata.go b/pubsub/azure/servicebus/metadata.go deleted file mode 100644 index 2756c5bc2..000000000 --- a/pubsub/azure/servicebus/metadata.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package servicebus - -// Reference for settings: -// https://github.com/Azure/azure-service-bus-go/blob/54b2faa53e5216616e59725281be692acc120c34/subscription_manager.go#L101 -type metadata struct { - ConnectionString string `json:"connectionString"` - ConsumerID string `json:"consumerID"` - TimeoutInSec int `json:"timeoutInSec"` - HandlerTimeoutInSec int `json:"handlerTimeoutInSec"` - LockRenewalInSec int `json:"lockRenewalInSec"` - MaxActiveMessages int `json:"maxActiveMessages"` - MaxConnectionRecoveryInSec int `json:"maxConnectionRecoveryInSec"` - MinConnectionRecoveryInSec int `json:"minConnectionRecoveryInSec"` - DisableEntityManagement bool `json:"disableEntityManagement"` - MaxRetriableErrorsPerSec int `json:"maxRetriableErrorsPerSec"` - MaxDeliveryCount *int `json:"maxDeliveryCount"` - LockDurationInSec *int `json:"lockDurationInSec"` - DefaultMessageTimeToLiveInSec *int `json:"defaultMessageTimeToLiveInSec"` - AutoDeleteOnIdleInSec *int `json:"autoDeleteOnIdleInSec"` - MaxConcurrentHandlers *int `json:"maxConcurrentHandlers"` - PublishMaxRetries int `json:"publishMaxRetries"` - PublishInitialRetryIntervalInMs int `json:"publishInitialRetryInternalInMs"` - NamespaceName string `json:"namespaceName,omitempty"` -} - -const ( - // Keys. - connectionString = "connectionString" - consumerID = "consumerID" - timeoutInSec = "timeoutInSec" - handlerTimeoutInSec = "handlerTimeoutInSec" - lockRenewalInSec = "lockRenewalInSec" - maxActiveMessages = "maxActiveMessages" - maxConnectionRecoveryInSec = "maxConnectionRecoveryInSec" - minConnectionRecoveryInSec = "minConnectionRecoveryInSec" - disableEntityManagement = "disableEntityManagement" - maxRetriableErrorsPerSec = "maxRetriableErrorsPerSec" - maxDeliveryCount = "maxDeliveryCount" - lockDurationInSec = "lockDurationInSec" - defaultMessageTimeToLiveInSec = "defaultMessageTimeToLiveInSec" - autoDeleteOnIdleInSec = "autoDeleteOnIdleInSec" - maxConcurrentHandlers = "maxConcurrentHandlers" - publishMaxRetries = "publishMaxRetries" - publishInitialRetryInternalInMs = "publishInitialRetryInternalInMs" - namespaceName = "namespaceName" - - // Deprecated keys. - maxReconnectionAttempts = "maxReconnectionAttempts" - connectionRecoveryInSec = "connectionRecoveryInSec" - - // Defaults. - defaultTimeoutInSec = 60 - defaultHandlerTimeoutInSec = 60 - defaultLockRenewalInSec = 20 - defaultMaxRetriableErrorsPerSec = 10 - // ASB Messages can be up to 256Kb. 10000 messages at this size would roughly use 2.56Gb. - // We should change this if performance testing suggests a more sensible default. - defaultMaxActiveMessages = 10000 - defaultDisableEntityManagement = false - defaultMinConnectionRecoveryInSec = 2 - defaultMaxConnectionRecoveryInSec = 300 - defaultPublishMaxRetries = 5 - defaultPublishInitialRetryInternalInMs = 500 -) diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index df7fcb5c7..12213647c 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -116,8 +116,7 @@ func (a *azureServiceBus) Init(metadata pubsub.Metadata) (err error) { } func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error { - // a.logger.Debugf("Creating message with body: %s", string(req.Data)) - msg, err := NewASBMessageFromPubsubRequest(req) + msg, err := impl.NewASBMessageFromPubsubRequest(req) if err != nil { return err } diff --git a/pubsub/azure/servicebus/servicebus_test.go b/pubsub/azure/servicebus/servicebus_test.go deleted file mode 100644 index 940d39c42..000000000 --- a/pubsub/azure/servicebus/servicebus_test.go +++ /dev/null @@ -1,611 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package servicebus - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - mdata "github.com/dapr/components-contrib/metadata" - "github.com/dapr/components-contrib/pubsub" -) - -const ( - invalidNumber = "invalid_number" -) - -func getFakeProperties() map[string]string { - return map[string]string{ - connectionString: "fakeConnectionString", - namespaceName: "", - consumerID: "fakeConId", - disableEntityManagement: "true", - timeoutInSec: "90", - handlerTimeoutInSec: "30", - maxDeliveryCount: "10", - autoDeleteOnIdleInSec: "240", - defaultMessageTimeToLiveInSec: "2400", - lockDurationInSec: "120", - lockRenewalInSec: "15", - maxConcurrentHandlers: "1", - maxActiveMessages: "100", - minConnectionRecoveryInSec: "5", - maxConnectionRecoveryInSec: "600", - maxRetriableErrorsPerSec: "50", - } -} - -func TestParseServiceBusMetadata(t *testing.T) { - t.Run("metadata is correct", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.NoError(t, err) - assert.Equal(t, fakeProperties[connectionString], m.ConnectionString) - assert.Equal(t, fakeProperties[consumerID], m.ConsumerID) - - assert.Equal(t, 90, m.TimeoutInSec) - assert.Equal(t, true, m.DisableEntityManagement) - assert.Equal(t, 30, m.HandlerTimeoutInSec) - assert.NotNil(t, m.LockRenewalInSec) - assert.Equal(t, 15, m.LockRenewalInSec) - assert.NotNil(t, m.MaxActiveMessages) - assert.Equal(t, 100, m.MaxActiveMessages) - assert.NotNil(t, m.MinConnectionRecoveryInSec) - assert.Equal(t, 5, m.MinConnectionRecoveryInSec) - assert.NotNil(t, m.MaxConnectionRecoveryInSec) - assert.Equal(t, 600, m.MaxConnectionRecoveryInSec) - assert.Equal(t, 50, m.MaxRetriableErrorsPerSec) - - assert.NotNil(t, m.AutoDeleteOnIdleInSec) - assert.Equal(t, 240, *m.AutoDeleteOnIdleInSec) - assert.NotNil(t, m.MaxDeliveryCount) - assert.Equal(t, 10, *m.MaxDeliveryCount) - assert.NotNil(t, m.DefaultMessageTimeToLiveInSec) - assert.Equal(t, 2400, *m.DefaultMessageTimeToLiveInSec) - assert.NotNil(t, m.LockDurationInSec) - assert.Equal(t, 120, *m.LockDurationInSec) - assert.NotNil(t, m.MaxConcurrentHandlers) - assert.Equal(t, 1, *m.MaxConcurrentHandlers) - }) - - t.Run("missing required connectionString and namespaceName", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[connectionString] = "" - fakeMetaData.Properties[namespaceName] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - assert.Empty(t, m.ConnectionString) - }) - - t.Run("connectionString makes namespace optional", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[namespaceName] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.NoError(t, err) - assert.Equal(t, "fakeConnectionString", m.ConnectionString) - }) - - t.Run("namespace makes conectionString optional", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[namespaceName] = "fakeNamespace" - fakeMetaData.Properties[connectionString] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.NoError(t, err) - assert.Equal(t, "fakeNamespace", m.NamespaceName) - }) - - t.Run("connectionString and namespace are mutually exclusive", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - - fakeMetaData.Properties[namespaceName] = "fakeNamespace" - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing required consumerID", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[consumerID] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - assert.Empty(t, m.ConsumerID) - }) - - t.Run("missing optional timeoutInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[timeoutInSec] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Equal(t, defaultTimeoutInSec, m.TimeoutInSec) - assert.Nil(t, err) - }) - - t.Run("invalid optional timeoutInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[timeoutInSec] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing optional disableEntityManagement", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[disableEntityManagement] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Equal(t, false, m.DisableEntityManagement) - assert.Nil(t, err) - }) - - t.Run("invalid optional disableEntityManagement", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[disableEntityManagement] = "invalid_bool" - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing optional handlerTimeoutInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[handlerTimeoutInSec] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Equal(t, defaultHandlerTimeoutInSec, m.HandlerTimeoutInSec) - assert.Nil(t, err) - }) - - t.Run("invalid optional handlerTimeoutInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[handlerTimeoutInSec] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing optional lockRenewalInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[lockRenewalInSec] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Equal(t, defaultLockRenewalInSec, m.LockRenewalInSec) - assert.Nil(t, err) - }) - - t.Run("invalid optional lockRenewalInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[lockRenewalInSec] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing optional maxRetriableErrorsPerSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxRetriableErrorsPerSec] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Equal(t, defaultMaxRetriableErrorsPerSec, m.MaxRetriableErrorsPerSec) - assert.Nil(t, err) - }) - - t.Run("invalid optional maxRetriableErrorsPerSec", func(t *testing.T) { - // NaN: Not a Number - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxRetriableErrorsPerSec] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - - // Negative number - fakeProperties = getFakeProperties() - - fakeMetaData = pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxRetriableErrorsPerSec] = "-1" - - // act. - _, err = parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing optional maxActiveMessages", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxActiveMessages] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Equal(t, defaultMaxActiveMessages, m.MaxActiveMessages) - assert.Nil(t, err) - }) - - t.Run("invalid optional maxActiveMessages", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxActiveMessages] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing optional maxConnectionRecoveryInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxConnectionRecoveryInSec] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Equal(t, defaultMaxConnectionRecoveryInSec, m.MaxConnectionRecoveryInSec) - assert.Nil(t, err) - }) - - t.Run("invalid optional maxConnectionRecoveryInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxConnectionRecoveryInSec] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing optional minConnectionRecoveryInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[minConnectionRecoveryInSec] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Equal(t, defaultMinConnectionRecoveryInSec, m.MinConnectionRecoveryInSec) - assert.Nil(t, err) - }) - - t.Run("invalid optional minConnectionRecoveryInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[minConnectionRecoveryInSec] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing nullable maxDeliveryCount", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxDeliveryCount] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Nil(t, m.MaxDeliveryCount) - assert.Nil(t, err) - }) - - t.Run("invalid nullable maxDeliveryCount", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxDeliveryCount] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing nullable defaultMessageTimeToLiveInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[defaultMessageTimeToLiveInSec] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Nil(t, m.DefaultMessageTimeToLiveInSec) - assert.Nil(t, err) - }) - - t.Run("invalid nullable defaultMessageTimeToLiveInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[defaultMessageTimeToLiveInSec] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing nullable autoDeleteOnIdleInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[autoDeleteOnIdleInSec] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Nil(t, m.AutoDeleteOnIdleInSec) - assert.Nil(t, err) - }) - - t.Run("invalid nullable autoDeleteOnIdleInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[autoDeleteOnIdleInSec] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing nullable lockDurationInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[lockDurationInSec] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Nil(t, m.LockDurationInSec) - assert.Nil(t, err) - }) - - t.Run("invalid nullable lockDurationInSec", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[lockDurationInSec] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) - - t.Run("missing nullable maxConcurrentHandlers", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxConcurrentHandlers] = "" - - // act. - m, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Nil(t, m.MaxConcurrentHandlers) - assert.Nil(t, err) - }) - - t.Run("invalid nullable maxConcurrentHandlers", func(t *testing.T) { - fakeProperties := getFakeProperties() - - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - fakeMetaData.Properties[maxConcurrentHandlers] = invalidNumber - - // act. - _, err := parseAzureServiceBusMetadata(fakeMetaData, nil) - - // assert. - assert.Error(t, err) - assertValidErrorMessage(t, err) - }) -} - -func assertValidErrorMessage(t *testing.T, err error) { - assert.Contains(t, err.Error(), errorMessagePrefix) -}