parent
3060cc7a42
commit
0e78291bf5
|
|
@ -1,8 +1,9 @@
|
|||
package servicebusqueues
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
|
@ -27,6 +28,18 @@ type serviceBusQueuesMetadata struct {
|
|||
}
|
||||
|
||||
const (
|
||||
// Keys
|
||||
connectionString = "connectionString"
|
||||
namespaceName = "namespaceName"
|
||||
queueName = "queueName"
|
||||
timeoutInSec = "timeoutInSec"
|
||||
maxConnectionRecoveryInSec = "maxConnectionRecoveryInSec"
|
||||
minConnectionRecoveryInSec = "minConnectionRecoveryInSec"
|
||||
maxRetriableErrorsPerSec = "maxRetriableErrorsPerSec"
|
||||
maxActiveMessages = "maxActiveMessages"
|
||||
lockRenewalInSec = "lockRenewalInSec"
|
||||
maxConcurrentHandlers = "maxConcurrentHandlers"
|
||||
|
||||
// Default time to live for queues, which is 14 days. The same way Azure Portal does.
|
||||
defaultMessageTimeToLive = time.Hour * 24 * 14
|
||||
|
||||
|
|
@ -50,18 +63,19 @@ const (
|
|||
|
||||
// Default rate of retriable errors per second
|
||||
defaultMaxRetriableErrorsPerSec = 10
|
||||
|
||||
errorMessagePrefix = "azure service bus error:"
|
||||
)
|
||||
|
||||
func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serviceBusQueuesMetadata, error) {
|
||||
b, err := json.Marshal(metadata.Properties)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
m := serviceBusQueuesMetadata{}
|
||||
|
||||
if val, ok := metadata.Properties[connectionString]; ok && val != "" {
|
||||
m.ConnectionString = val
|
||||
}
|
||||
|
||||
var m serviceBusQueuesMetadata
|
||||
err = json.Unmarshal(b, &m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if val, ok := metadata.Properties[namespaceName]; ok && val != "" {
|
||||
m.NamespaceName = val
|
||||
}
|
||||
|
||||
if m.ConnectionString != "" && m.NamespaceName != "" {
|
||||
|
|
@ -78,46 +92,74 @@ func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serv
|
|||
}
|
||||
m.ttl = ttl
|
||||
|
||||
// Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior.
|
||||
m.QueueName = strings.ToLower(m.QueueName)
|
||||
|
||||
if m.TimeoutInSec < 1 {
|
||||
m.TimeoutInSec = defaultTimeoutInSec
|
||||
if val, ok := metadata.Properties[queueName]; ok && val != "" {
|
||||
// Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior.
|
||||
m.QueueName = strings.ToLower(val)
|
||||
}
|
||||
|
||||
if m.MinConnectionRecoveryInSec < 1 {
|
||||
m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec
|
||||
/* Optional configuration settings - defaults will be set by the client. */
|
||||
m.TimeoutInSec = defaultTimeoutInSec
|
||||
if val, ok := metadata.Properties[timeoutInSec]; ok && val != "" {
|
||||
m.TimeoutInSec, err = strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid timeoutInSec %s, %s", errorMessagePrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.MaxConnectionRecoveryInSec < 1 {
|
||||
m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec
|
||||
m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec
|
||||
if val, ok := metadata.Properties[minConnectionRecoveryInSec]; ok && val != "" {
|
||||
m.MinConnectionRecoveryInSec, err = strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid minConnectionRecoveryInSec %s, %s", errorMessagePrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.MinConnectionRecoveryInSec > m.MaxConnectionRecoveryInSec {
|
||||
return nil, errors.New("maxConnectionRecoveryInSec must be greater than minConnectionRecoveryInSec")
|
||||
m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec
|
||||
if val, ok := metadata.Properties[maxConnectionRecoveryInSec]; ok && val != "" {
|
||||
m.MaxConnectionRecoveryInSec, err = strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid maxConnectionRecoveryInSec %s, %s", errorMessagePrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.MaxActiveMessages < 1 {
|
||||
m.MaxActiveMessages = defaultMaxActiveMessages
|
||||
m.MaxActiveMessages = defaultMaxActiveMessages
|
||||
if val, ok := metadata.Properties[maxActiveMessages]; ok && val != "" {
|
||||
m.MaxActiveMessages, err = strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid maxActiveMessages %s, %s", errorMessagePrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.MaxConcurrentHandlers < 1 {
|
||||
m.MaxConcurrentHandlers = defaultMaxConcurrentHandlers
|
||||
m.MaxConcurrentHandlers = defaultMaxConcurrentHandlers
|
||||
if val, ok := metadata.Properties[maxConcurrentHandlers]; ok && val != "" {
|
||||
m.MaxConcurrentHandlers, err = strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid maxConcurrentHandlers %s, %s", errorMessagePrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.MaxConcurrentHandlers > m.MaxActiveMessages {
|
||||
return nil, errors.New("maxConcurrentHandlers cannot be bigger than maxActiveMessages")
|
||||
return nil, fmt.Errorf("%s maxConcurrentHandlers cannot be bigger than maxActiveMessages, %s", errorMessagePrefix, err)
|
||||
}
|
||||
|
||||
if m.LockRenewalInSec < 1 {
|
||||
m.LockRenewalInSec = defaultLockRenewalInSec
|
||||
m.LockRenewalInSec = defaultLockRenewalInSec
|
||||
if val, ok := metadata.Properties[lockRenewalInSec]; ok && val != "" {
|
||||
m.LockRenewalInSec, err = strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid lockRenewalInSec %s, %s", errorMessagePrefix, val, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.MaxRetriableErrorsPerSec == nil {
|
||||
m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec)
|
||||
}
|
||||
if *m.MaxRetriableErrorsPerSec < 0 {
|
||||
return nil, errors.New("maxRetriableErrorsPerSec must be non-negative")
|
||||
m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec)
|
||||
if val, ok := metadata.Properties[maxRetriableErrorsPerSec]; ok && val != "" {
|
||||
mRetriableErrorsPerSec, err := strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return &m, fmt.Errorf("%s invalid lockRenewalInSec %s, %s", errorMessagePrefix, val, err)
|
||||
}
|
||||
if mRetriableErrorsPerSec < 0 {
|
||||
return nil, fmt.Errorf("%smaxRetriableErrorsPerSec must be non-negative, %s", errorMessagePrefix, err)
|
||||
}
|
||||
m.MaxRetriableErrorsPerSec = to.Ptr(mRetriableErrorsPerSec)
|
||||
}
|
||||
|
||||
return &m, nil
|
||||
|
|
|
|||
|
|
@ -28,32 +28,74 @@ func TestParseMetadata(t *testing.T) {
|
|||
oneSecondDuration := time.Second
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
properties map[string]string
|
||||
expectedConnectionString string
|
||||
expectedQueueName string
|
||||
expectedTTL time.Duration
|
||||
name string
|
||||
properties map[string]string
|
||||
expectedConnectionString string
|
||||
expectedQueueName string
|
||||
expectedTTL time.Duration
|
||||
expectedTimeoutInSec int
|
||||
expectedMaxConnectionRecoveryInSec int
|
||||
expectedMinConnectionRecoveryInSec int
|
||||
expectedMaxRetriableErrorsPerSec int
|
||||
expectedMaxActiveMessages int
|
||||
expectedLockRenewalInSec int
|
||||
expectedMaxConcurrentHandlers int
|
||||
}{
|
||||
{
|
||||
name: "ConnectionString and queue name",
|
||||
properties: map[string]string{"connectionString": "connString", "queueName": "queue1"},
|
||||
expectedConnectionString: "connString",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: defaultMessageTimeToLive,
|
||||
name: "ConnectionString and queue name",
|
||||
properties: map[string]string{"connectionString": "connString", "queueName": "queue1"},
|
||||
expectedConnectionString: "connString",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: defaultMessageTimeToLive,
|
||||
expectedTimeoutInSec: defaultTimeoutInSec,
|
||||
expectedMaxConnectionRecoveryInSec: defaultMaxConnectionRecoveryInSec,
|
||||
expectedMinConnectionRecoveryInSec: defaultMinConnectionRecoveryInSec,
|
||||
expectedMaxRetriableErrorsPerSec: defaultMaxRetriableErrorsPerSec,
|
||||
expectedMaxActiveMessages: defaultMaxActiveMessages,
|
||||
expectedLockRenewalInSec: defaultLockRenewalInSec,
|
||||
expectedMaxConcurrentHandlers: defaultMaxConcurrentHandlers,
|
||||
},
|
||||
{
|
||||
name: "Empty TTL",
|
||||
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: ""},
|
||||
expectedConnectionString: "connString",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: defaultMessageTimeToLive,
|
||||
name: "ConnectionString, queue name and all optional values",
|
||||
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", "timeoutInSec": "30", "minConnectionRecoveryInSec": "1", "maxConnectionRecoveryInSec": "200", "maxRetriableErrorsPerSec": "20", "maxActiveMessages": "10", "maxConcurrentHandlers": "2", "lockRenewalInSec": "30"},
|
||||
expectedConnectionString: "connString",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: defaultMessageTimeToLive,
|
||||
expectedTimeoutInSec: 30,
|
||||
expectedMaxConnectionRecoveryInSec: 200,
|
||||
expectedMinConnectionRecoveryInSec: 1,
|
||||
expectedMaxRetriableErrorsPerSec: 20,
|
||||
expectedMaxActiveMessages: 10,
|
||||
expectedMaxConcurrentHandlers: 2,
|
||||
expectedLockRenewalInSec: 30,
|
||||
},
|
||||
{
|
||||
name: "With TTL",
|
||||
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: "1"},
|
||||
expectedConnectionString: "connString",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: oneSecondDuration,
|
||||
name: "Empty TTL",
|
||||
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: ""},
|
||||
expectedConnectionString: "connString",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: defaultMessageTimeToLive,
|
||||
expectedTimeoutInSec: defaultTimeoutInSec,
|
||||
expectedMaxConnectionRecoveryInSec: defaultMaxConnectionRecoveryInSec,
|
||||
expectedMinConnectionRecoveryInSec: defaultMinConnectionRecoveryInSec,
|
||||
expectedMaxRetriableErrorsPerSec: defaultMaxRetriableErrorsPerSec,
|
||||
expectedMaxActiveMessages: defaultMaxActiveMessages,
|
||||
expectedLockRenewalInSec: defaultLockRenewalInSec,
|
||||
expectedMaxConcurrentHandlers: defaultMaxConcurrentHandlers,
|
||||
},
|
||||
{
|
||||
name: "With TTL",
|
||||
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: "1"},
|
||||
expectedConnectionString: "connString",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: oneSecondDuration,
|
||||
expectedTimeoutInSec: defaultTimeoutInSec,
|
||||
expectedMaxConnectionRecoveryInSec: defaultMaxConnectionRecoveryInSec,
|
||||
expectedMinConnectionRecoveryInSec: defaultMinConnectionRecoveryInSec,
|
||||
expectedMaxRetriableErrorsPerSec: defaultMaxRetriableErrorsPerSec,
|
||||
expectedMaxActiveMessages: defaultMaxActiveMessages,
|
||||
expectedLockRenewalInSec: defaultLockRenewalInSec,
|
||||
expectedMaxConcurrentHandlers: defaultMaxConcurrentHandlers,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue