Add concurrency to Azure ServiceBus pubsub (#179)

* added worker pool for concurrent receive

* fixed missing end quote

* added type alias

* removed shadow decl

* added cleanup logic for subscription receiver

* renamed metadata property to numConcurrentHandlers and added handler specific timeout

Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
Co-authored-by: Young Bu Park <youngp@microsoft.com>
This commit is contained in:
Joni Collinge 2020-01-24 18:26:11 +00:00 committed by Yaron Schneider
parent 3734491718
commit ca2c3821a2
3 changed files with 137 additions and 10 deletions

View File

@ -11,9 +11,11 @@ type metadata struct {
ConnectionString string `json:"connectionString"`
ConsumerID string `json:"consumerID"`
TimeoutInSec int `json:"timeoutInSec"`
HandlerTimeoutInSec int `json:"handlerTimeoutInSec"`
DisableEntityManagement bool `json:"disableEntityManagement"`
MaxDeliveryCount *int `json:"maxDeliveryCount"`
LockDurationInSec *int `json:"lockDurationInSec"`
DefaultMessageTimeToLiveInSec *int `json:"defaultMessageTimeToLiveInSec"`
AutoDeleteOnIdleInSec *int `json:"autoDeleteOnIdleInSec"`
NumConcurrentHandlers *int `json:"numConcurrentHandlers"`
}

View File

@ -26,13 +26,18 @@ const (
defaultMessageTimeToLiveInSec = "defaultMessageTimeToLiveInSec"
autoDeleteOnIdleInSec = "autoDeleteOnIdleInSec"
disableEntityManagement = "disableEntityManagement"
numConcurrentHandlers = "numConcurrentHandlers"
handlerTimeoutInSec = "handlerTimeoutInSec"
errorMessagePrefix = "azure service bus error:"
// Defaults
defaultTimeoutInSec = 60
defaultHandlerTimeoutInSec = 60
defaultDisableEntityManagement = false
)
type handler = struct{}
type azureServiceBus struct {
metadata metadata
namespace *azservicebus.Namespace
@ -79,6 +84,15 @@ func parseAzureServiceBusMetadata(meta pubsub.Metadata) (metadata, error) {
}
}
m.HandlerTimeoutInSec = defaultHandlerTimeoutInSec
if val, ok := meta.Properties[handlerTimeoutInSec]; ok && val != "" {
var err error
m.HandlerTimeoutInSec, err = strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("%s invalid handlerTimeoutInSec %s, %s", errorMessagePrefix, val, err)
}
}
/* Nullable configuration settings - defaults will be set by the server */
if val, ok := meta.Properties[maxDeliveryCount]; ok && val != "" {
valAsInt, err := strconv.Atoi(val)
@ -112,6 +126,15 @@ func parseAzureServiceBusMetadata(meta pubsub.Metadata) (metadata, error) {
m.AutoDeleteOnIdleInSec = &valAsInt
}
if val, ok := meta.Properties[numConcurrentHandlers]; ok && val != "" {
var err error
valAsInt, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("%s invalid numConcurrentHandlers %s, %s", errorMessagePrefix, val, err)
}
m.NumConcurrentHandlers = &valAsInt
}
return m, nil
}
@ -153,7 +176,7 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
return nil
}
func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {
func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, daprHandler func(msg *pubsub.NewMessage) error) error {
subID := a.metadata.ConsumerID
if !a.metadata.DisableEntityManagement {
err := a.ensureSubscription(subID, req.Topic)
@ -171,20 +194,19 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler func(ms
return fmt.Errorf("%s could not instantiate subscription %s for topic %s", errorMessagePrefix, subID, req.Topic)
}
sbHandlerFunc := azservicebus.HandlerFunc(a.getHandlerFunc(req.Topic, handler))
go a.handleSubscriptionMessages(req.Topic, sub, sbHandlerFunc)
asbHandler := azservicebus.HandlerFunc(a.getHandlerFunc(req.Topic, daprHandler))
go a.handleSubscriptionMessages(req.Topic, sub, asbHandler)
return nil
}
func (a *azureServiceBus) getHandlerFunc(topic string, handler func(msg *pubsub.NewMessage) error) func(ctx context.Context, message *azservicebus.Message) error {
func (a *azureServiceBus) getHandlerFunc(topic string, daprHandler func(msg *pubsub.NewMessage) error) func(ctx context.Context, message *azservicebus.Message) error {
return func(ctx context.Context, message *azservicebus.Message) error {
msg := &pubsub.NewMessage{
Data: message.Data,
Topic: topic,
}
err := handler(msg)
err := daprHandler(msg)
if err != nil {
return message.Abandon(ctx)
}
@ -192,9 +214,43 @@ func (a *azureServiceBus) getHandlerFunc(topic string, handler func(msg *pubsub.
}
}
func (a *azureServiceBus) handleSubscriptionMessages(topic string, sub *azservicebus.Subscription, handlerFunc azservicebus.HandlerFunc) {
func (a *azureServiceBus) handleSubscriptionMessages(topic string, sub *azservicebus.Subscription, asbHandler azservicebus.HandlerFunc) {
// Limiting the number of concurrent handlers will stop this
// components creating an unbounded amount of gorountines for
// each handle invocation.
limitNumConcurrentHandlers := a.metadata.NumConcurrentHandlers != nil
var handlers chan handler
if limitNumConcurrentHandlers {
handlers = make(chan handler, *a.metadata.NumConcurrentHandlers)
for i := 0; i < *a.metadata.NumConcurrentHandlers; i++ {
handlers <- handler{}
}
defer close(handlers)
}
var concurrentAsbHandler azservicebus.HandlerFunc = func(ctx context.Context, msg *azservicebus.Message) error {
go func() {
if limitNumConcurrentHandlers {
<-handlers // Take or wait on a free handler
defer func() {
handlers <- handler{} // Release a handler
} ()
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.HandlerTimeoutInSec))
defer cancel()
err := asbHandler(ctx, msg)
if err != nil {
log.Errorf("%s error handling message from topic '%s', %s", errorMessagePrefix, topic, err)
}
} ()
return nil
}
for {
if err := sub.Receive(context.Background(), handlerFunc); err != nil {
if err := sub.Receive(context.Background(), concurrentAsbHandler); err != nil {
log.Errorf("%s error receiving from topic %s, %s", errorMessagePrefix, topic, err)
// Must close to reset sub's receiver
if err := sub.Close(context.Background()); err != nil {

View File

@ -23,10 +23,12 @@ func getFakeProperties() map[string]string {
consumerID: "fakeConId",
disableEntityManagement: "true",
timeoutInSec: "90",
handlerTimeoutInSec: "30",
maxDeliveryCount: "10",
autoDeleteOnIdleInSec: "240",
defaultMessageTimeToLiveInSec: "2400",
lockDurationInSec: "120",
numConcurrentHandlers: "1",
}
}
@ -48,6 +50,7 @@ func TestParseServiceBusMetadata(t *testing.T) {
assert.Equal(t, 90, m.TimeoutInSec)
assert.Equal(t, true, m.DisableEntityManagement)
assert.Equal(t, 30, m.HandlerTimeoutInSec)
assert.NotNil(t, m.AutoDeleteOnIdleInSec)
assert.Equal(t, 240, *m.AutoDeleteOnIdleInSec)
@ -57,6 +60,8 @@ func TestParseServiceBusMetadata(t *testing.T) {
assert.Equal(t, 2400, *m.DefaultMessageTimeToLiveInSec)
assert.NotNil(t, m.LockDurationInSec)
assert.Equal(t, 120, *m.LockDurationInSec)
assert.NotNil(t, m.NumConcurrentHandlers)
assert.Equal(t, 1, *m.NumConcurrentHandlers)
})
t.Run("missing required connectionString", func(t *testing.T) {
@ -105,7 +110,7 @@ func TestParseServiceBusMetadata(t *testing.T) {
m, err := parseAzureServiceBusMetadata(fakeMetaData)
// assert
assert.Equal(t, m.TimeoutInSec, 60)
assert.Equal(t, 60, m.TimeoutInSec)
assert.Nil(t, err)
})
@ -137,7 +142,7 @@ func TestParseServiceBusMetadata(t *testing.T) {
m, err := parseAzureServiceBusMetadata(fakeMetaData)
// assert
assert.Equal(t, m.DisableEntityManagement, false)
assert.Equal(t, false, m.DisableEntityManagement)
assert.Nil(t, err)
})
@ -157,6 +162,38 @@ func TestParseServiceBusMetadata(t *testing.T) {
assertValidErrorMessage(t, err)
})
t.Run("missing optional handlerTimeoutInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}
fakeMetaData.Properties[handlerTimeoutInSec] = ""
// act
m, err := parseAzureServiceBusMetadata(fakeMetaData)
// assert
assert.Equal(t, 60, m.HandlerTimeoutInSec)
assert.Nil(t, err)
})
t.Run("invalid optional handlerTimeoutInSec", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}
fakeMetaData.Properties[handlerTimeoutInSec] = invalidNumber
// act
_, err := parseAzureServiceBusMetadata(fakeMetaData)
// assert
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing nullable maxDeliveryCount", func(t *testing.T) {
fakeProperties := getFakeProperties()
@ -284,6 +321,38 @@ func TestParseServiceBusMetadata(t *testing.T) {
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
t.Run("missing nullable numConcurrentHandlers", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}
fakeMetaData.Properties[numConcurrentHandlers] = ""
// act
m, err := parseAzureServiceBusMetadata(fakeMetaData)
// assert
assert.Nil(t, m.NumConcurrentHandlers)
assert.Nil(t, err)
})
t.Run("invalid nullable numConcurrentHandlers", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}
fakeMetaData.Properties[numConcurrentHandlers] = invalidNumber
// act
_, err := parseAzureServiceBusMetadata(fakeMetaData)
// assert
assert.Error(t, err)
assertValidErrorMessage(t, err)
})
}
func assertValidErrorMessage(t *testing.T, err error) {