ASB lock renewal per topic (#365)

* bump asb sdk version to 0.10.2

* add isolation per topic for lock renewal

* added cancellations on async handlers

* refactored out subscription logic from namespace

* updated error handling

* defer async context cancel

* fixed lint issues

* add connection recovery

* fix lint

* update comments

* update comments

Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
This commit is contained in:
Joni Collinge 2020-07-02 16:02:42 +01:00 committed by GitHub
parent a086a42f2b
commit 4b2252b017
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 337 additions and 162 deletions

12
go.mod
View File

@ -11,10 +11,11 @@ require (
contrib.go.opencensus.io/exporter/zipkin v0.1.1
github.com/Azure/azure-event-hubs-go v1.3.1
github.com/Azure/azure-sdk-for-go v42.0.0+incompatible
github.com/Azure/azure-service-bus-go v0.9.1
github.com/Azure/azure-service-bus-go v0.10.2
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-autorest/autorest v0.10.0
github.com/Azure/go-amqp v0.12.7 // indirect
github.com/Azure/go-autorest/autorest v0.10.2
github.com/Azure/go-autorest/autorest/adal v0.8.3
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
github.com/Shopify/sarama v1.23.1
@ -45,13 +46,14 @@ require (
github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a
github.com/json-iterator/go v1.1.8
github.com/kubernetes-client/go v0.0.0-20190625181339-cd8e39e789c7
github.com/mitchellh/mapstructure v1.3.2 // indirect
github.com/nats-io/gnatsd v1.4.1
github.com/nats-io/go-nats v1.7.2
github.com/nats-io/nats-streaming-server v0.17.0 // indirect
github.com/nats-io/nats.go v1.9.1
github.com/nats-io/stan.go v0.6.0
github.com/openzipkin/zipkin-go v0.1.6
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/satori/go.uuid v1.2.0
github.com/sendgrid/rest v2.4.1+incompatible // indirect
@ -66,8 +68,8 @@ require (
go.etcd.io/etcd v3.3.17+incompatible
go.mongodb.org/mongo-driver v1.1.2
go.opencensus.io v0.22.3
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9
golang.org/x/net v0.0.0-20200602114024-627f9648deb9
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150

21
go.sum
View File

@ -28,6 +28,8 @@ github.com/Azure/azure-amqp-common-go v1.1.4 h1:DmPXxmLZwi/71CgRTZIKR6yiKEW3eC42
github.com/Azure/azure-amqp-common-go v1.1.4/go.mod h1:FhZtXirFANw40UXI2ntweO+VOkfaw8s6vZxUiRhLYW8=
github.com/Azure/azure-amqp-common-go/v2 v2.1.0 h1:+QbFgmWCnPzdaRMfsI0Yb6GrRdBj5jVL8N3EXuEUcBQ=
github.com/Azure/azure-amqp-common-go/v2 v2.1.0/go.mod h1:R8rea+gJRuJR6QxTir/XuEd+YuKoUiazDC/N96FiDEU=
github.com/Azure/azure-amqp-common-go/v3 v3.0.0 h1:j9tjcwhypb/jek3raNrwlCIl7iKQYOug7CLpSyBBodc=
github.com/Azure/azure-amqp-common-go/v3 v3.0.0/go.mod h1:SY08giD/XbhTz07tJdpw1SoxQXHPN30+DI3Z04SYqyg=
github.com/Azure/azure-event-hubs-go v1.3.1 h1:vKw7tLOFJ8kVMkhNvOXZWz+3purRQ/WTe60+bavZ5qc=
github.com/Azure/azure-event-hubs-go v1.3.1/go.mod h1:me2m3+0WC7G7JRBTWI5SQ81s2TYyOqgV3JIpYg86jZA=
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
@ -39,15 +41,22 @@ github.com/Azure/azure-sdk-for-go v29.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9mo
github.com/Azure/azure-sdk-for-go v30.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v33.4.0+incompatible h1:yzJKzcKTX0WwDdZC8kAqxiGVZz66uqpajhgphstEUN0=
github.com/Azure/azure-sdk-for-go v33.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v42.0.0+incompatible h1:yz6sFf5bHZ+gEOQVuK5JhPqTTAmv+OvSLSaqgzqaCwY=
github.com/Azure/azure-sdk-for-go v42.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-service-bus-go v0.9.1 h1:G1qBLQvHCFDv9pcpgwgFkspzvnGknJRR0PYJ9ytY/JA=
github.com/Azure/azure-service-bus-go v0.9.1/go.mod h1:yzBx6/BUGfjfeqbRZny9AQIbIe3AcV9WZbAdpkoXOa0=
github.com/Azure/azure-service-bus-go v0.10.2 h1:yrGKscDQqNXnFXzYfukW34p8Jr4LmzaAOgYaQKEsipM=
github.com/Azure/azure-service-bus-go v0.10.2/go.mod h1:E/FOceuKAFUfpbIJDKWz/May6guE+eGibfGT6q+n1to=
github.com/Azure/azure-storage-blob-go v0.0.0-20181023070848-cf01652132cc/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/azure-storage-blob-go v0.8.0 h1:53qhf0Oxa0nOjgbDeeYPUeyiNmafAFEY95rZLK0Tj6o=
github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8=
github.com/Azure/go-amqp v0.12.6 h1:34yItuwhA/nusvq2sPSNPQxZLCf/CtaogYH8n578mnY=
github.com/Azure/go-amqp v0.12.6/go.mod h1:qApuH6OFTSKZFmCOxccvAv5rLizBQf4v8pRmG138DPo=
github.com/Azure/go-amqp v0.12.7 h1:/Uyqh30J5JrDFAOERQtEqP0qPWkrNXxr94vRnSa54Ac=
github.com/Azure/go-amqp v0.12.7/go.mod h1:qApuH6OFTSKZFmCOxccvAv5rLizBQf4v8pRmG138DPo=
github.com/Azure/go-autorest v13.3.0+incompatible h1:8Ix0VdeOllBx9jEcZ2Wb1uqWUpE1awmJiaHztwaJCPk=
github.com/Azure/go-autorest v13.3.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
@ -57,6 +66,8 @@ github.com/Azure/go-autorest/autorest v0.9.3 h1:OZEIaBbMdUE/Js+BQKlpO81XlISgipr6
github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0=
github.com/Azure/go-autorest/autorest v0.10.0 h1:mvdtztBqcL8se7MdrUweNieTNi4kfNG6GOJuurQJpuY=
github.com/Azure/go-autorest/autorest v0.10.0/go.mod h1:/FALq9T/kS7b5J5qsQ+RSTUdAmGFqi0vUdVNNx8q630=
github.com/Azure/go-autorest/autorest v0.10.2 h1:NuSF3gXetiHyUbVdneJMEVyPUYAe5wh+aN08JYAf1tI=
github.com/Azure/go-autorest/autorest v0.10.2/go.mod h1:/FALq9T/kS7b5J5qsQ+RSTUdAmGFqi0vUdVNNx8q630=
github.com/Azure/go-autorest/autorest/adal v0.5.0 h1:q2gDruN08/guU9vAjuPWff0+QIrpH6ediguzdAzXAUU=
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
github.com/Azure/go-autorest/autorest/adal v0.6.0 h1:UCTq22yE3RPgbU/8u4scfnnzuCW6pwQ9n+uBtV78ouo=
@ -201,6 +212,7 @@ github.com/dghubble/oauth1 v0.6.0/go.mod h1:8pFdfPkv/jr8mkChVbNVuJ0suiHe278BtWI4
github.com/dghubble/sling v1.3.0 h1:pZHjCJq4zJvc6qVQ5wN1jo5oNZlNE0+8T/h0XeXBUKU=
github.com/dghubble/sling v1.3.0/go.mod h1:XXShWaBWKzNLhu2OxikSNFrlsvowtz4kyRuXUG7oQKY=
github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go v1.0.2 h1:KPldsxuKGsS2FPWsNeg9ZO18aCrGKujPoWXn2yo+KQM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
@ -534,6 +546,8 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.3.2 h1:mRS76wmkOn3KkKAyXDu42V+6ebnXWIztFSYGN7GeoRg=
github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -614,6 +628,8 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@ -806,6 +822,8 @@ golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vK
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a h1:aczoJ0HPNE92XKa7DrIzkNN6esOKO2TBwiiYoKcINhA=
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 h1:vEg9joUBmeBcK9iSJftGNf3coIG4HqZElCPehJsfAYM=
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -860,6 +878,8 @@ golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCT
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200602114024-627f9648deb9 h1:pNX+40auqi2JqRfOP1akLGtYcn15TUbkhwuCO3foqqM=
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -910,6 +930,7 @@ golang.org/x/sys v0.0.0-20200113162924-86b910548bc1 h1:gZpLHxUX5BdYLA08Lj4YCJNN/
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@ -9,7 +9,6 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"
azservicebus "github.com/Azure/azure-service-bus-go"
@ -44,24 +43,24 @@ const (
defaultMaxActiveMessages = 10000
defaultMaxActiveMessagesRecoveryInSec = 2
defaultDisableEntityManagement = false
maxReconnAttempts = 10
connectionRecoveryInSec = 2
)
type handler = struct{}
type azureServiceBus struct {
metadata metadata
namespace *azservicebus.Namespace
topicManager *azservicebus.TopicManager
activeMessages map[string]*azservicebus.Message
mu sync.RWMutex
logger logger.Logger
metadata metadata
namespace *azservicebus.Namespace
topicManager *azservicebus.TopicManager
logger logger.Logger
}
// NewAzureServiceBus returns a new Azure ServiceBus pub-sub implementation
func NewAzureServiceBus(logger logger.Logger) pubsub.PubSub {
return &azureServiceBus{
activeMessages: make(map[string]*azservicebus.Message),
logger: logger,
logger: logger,
}
}
@ -230,7 +229,7 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
return nil
}
func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, daprHandler func(msg *pubsub.NewMessage) error) error {
func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, appHandler func(msg *pubsub.NewMessage) error) error {
subID := a.metadata.ConsumerID
if !a.metadata.DisableEntityManagement {
err := a.ensureSubscription(subID, req.Topic)
@ -238,167 +237,89 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, daprHandler fun
return err
}
}
topic, err := a.namespace.NewTopic(req.Topic)
if err != nil {
return fmt.Errorf("%s could not instantiate topic %s, %s", errorMessagePrefix, req.Topic, err)
}
var opts []azservicebus.SubscriptionOption
if a.metadata.PrefetchCount != nil {
opts = append(opts, azservicebus.SubscriptionWithPrefetchCount(uint32(*a.metadata.PrefetchCount)))
}
sub, err := topic.NewSubscription(subID, opts...)
if err != nil {
return fmt.Errorf("%s could not instantiate subscription %s for topic %s", errorMessagePrefix, subID, req.Topic)
}
asbHandler := azservicebus.HandlerFunc(a.getHandlerFunc(req.Topic, daprHandler))
go a.handleSubscriptionMessages(req.Topic, sub, asbHandler)
return nil
}
func (a *azureServiceBus) getHandlerFunc(topic string, daprHandler func(msg *pubsub.NewMessage) error) func(ctx context.Context, message *azservicebus.Message) error {
return func(ctx context.Context, message *azservicebus.Message) error {
msg := &pubsub.NewMessage{
Data: message.Data,
Topic: topic,
go func() {
// Limit the number of attempted reconnects we make
reconnAttempts := make(chan struct{}, maxReconnAttempts)
for i := 0; i < maxReconnAttempts; i++ {
reconnAttempts <- struct{}{}
}
a.logger.Debugf("Calling app's handler for message %s", message.ID)
err := daprHandler(msg)
if err != nil {
a.logger.Debugf("Error in app's handler: %+v", err)
return a.abandonMessage(ctx, message)
}
return a.completeMessage(ctx, message)
}
}
// len(reconnAttempts) should be considered stale but we can afford a little error here
readAttemptsStale := func() int { return len(reconnAttempts) }
func (a *azureServiceBus) handleSubscriptionMessages(topic string, sub *azservicebus.Subscription, asbHandler azservicebus.HandlerFunc) {
// Limiting the number of concurrent handlers will throttle
// how many messages are processed concurrently.
limitConcurrentHandlers := a.metadata.MaxConcurrentHandlers != nil
var handlers chan handler
if limitConcurrentHandlers {
a.logger.Debugf("Limited to %d message handler(s)", *a.metadata.MaxConcurrentHandlers)
handlers = make(chan handler, *a.metadata.MaxConcurrentHandlers)
for i := 0; i < *a.metadata.MaxConcurrentHandlers; i++ {
handlers <- handler{}
}
defer close(handlers)
}
// Async message handler
var asyncAsbHandler azservicebus.HandlerFunc = func(ctx context.Context, msg *azservicebus.Message) error {
a.addActiveMessage(msg)
// Process messages asynchronously
// Periodically refill the reconnect attempts channel to avoid
// exhausting all the refill attempts due to intermittent issues
occurring over a longer period of time.
reconnCtx, reconnCancel := context.WithCancel(context.TODO())
defer reconnCancel()
go func() {
if limitConcurrentHandlers {
a.logger.Debugf("Attempting to take message handler...")
<-handlers // Take or wait on a free handler before getting a new message
a.logger.Debugf("Taken message handler")
defer func() {
a.logger.Debugf("Releasing message handler...")
handlers <- handler{} // Release a handler
a.logger.Debugf("Released message handler")
}()
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.HandlerTimeoutInSec))
defer cancel()
a.logger.Debugf("Handling message %s", msg.ID)
err := asbHandler(ctx, msg)
if err != nil {
a.logger.Errorf("%s error handling message %s from topic '%s', %s", errorMessagePrefix, msg.ID, topic, err)
for {
select {
case <-reconnCtx.Done():
a.logger.Debugf("Reconnect context for topic %s is done", req.Topic)
return
case <-time.After(2 * time.Minute):
attempts := readAttemptsStale()
if attempts < maxReconnAttempts {
reconnAttempts <- struct{}{}
}
a.logger.Debugf("Number of reconnect attempts remaining for topic %s: %d", req.Topic, attempts)
}
}
}()
return nil
}
// Lock renewal loop
go func() {
if !a.lockRenewalEnabled() {
a.logger.Debugf("lock renewal disabled")
return
}
// Reconnect loop
for {
time.Sleep(time.Second * time.Duration(a.metadata.LockRenewalInSec))
if len(a.activeMessages) == 0 {
a.logger.Debugf("No active messages to renew lock for")
continue
}
msgs := make([]*azservicebus.Message, 0)
a.mu.RLock()
for _, m := range a.activeMessages {
msgs = append(msgs, m)
}
a.mu.RUnlock()
a.logger.Debugf("Renewing %d active message lock(s)", len(msgs))
err := sub.RenewLocks(context.Background(), msgs...)
topic, err := a.namespace.NewTopic(req.Topic)
if err != nil {
a.logger.Errorf("%s error renewing active message lock(s) for topic %s, ", errorMessagePrefix, topic, err)
a.logger.Errorf("%s could not instantiate topic %s, %s", errorMessagePrefix, req.Topic, err)
return
}
var opts []azservicebus.SubscriptionOption
if a.metadata.PrefetchCount != nil {
opts = append(opts, azservicebus.SubscriptionWithPrefetchCount(uint32(*a.metadata.PrefetchCount)))
}
subEntity, err := topic.NewSubscription(subID, opts...)
if err != nil {
a.logger.Errorf("%s could not instantiate subscription %s for topic %s", errorMessagePrefix, subID, req.Topic)
return
}
sub := newSubscription(req.Topic, subEntity, a.metadata.MaxConcurrentHandlers, a.logger)
// ReceiveAndBlock will only return with an error
// that it cannot handle internally. The subscription
// connection is closed when this method returns.
// If that occurs, we will log the error and attempt
// to re-establish the subscription connection until
// we exhaust the number of reconnect attempts.
ctx, cancel := context.WithCancel(context.Background())
innerErr := sub.ReceiveAndBlock(ctx,
appHandler,
a.metadata.LockRenewalInSec,
a.metadata.HandlerTimeoutInSec,
a.metadata.TimeoutInSec,
a.metadata.MaxActiveMessages,
a.metadata.MaxActiveMessagesRecoveryInSec)
if innerErr != nil {
a.logger.Error(innerErr)
}
cancel() // Cancel receive context
attempts := readAttemptsStale()
if attempts == 0 {
a.logger.Errorf("Subscription to topic %s lost connection, unable to recover after %d attempts", sub.topic, maxReconnAttempts)
return
}
a.logger.Warnf("Subscription to topic %s lost connection, attempting to reconnect... [%d/%d]", sub.topic, maxReconnAttempts-attempts, maxReconnAttempts)
time.Sleep(time.Second * connectionRecoveryInSec)
<-reconnAttempts
}
}()
// Receiver loop
for {
// If we have too many active messages don't receive any more for a while.
if len(a.activeMessages) < a.metadata.MaxActiveMessages {
a.logger.Debugf("Waiting to receive message from topic")
if err := sub.ReceiveOne(context.Background(), asyncAsbHandler); err != nil {
a.logger.Errorf("%s error receiving from topic %s, %s", errorMessagePrefix, topic, err)
// Must close to reset sub's receiver
if err := sub.Close(context.Background()); err != nil {
a.logger.Errorf("%s error closing subscription to topic %s, %s", errorMessagePrefix, topic, err)
return
}
}
} else {
// Sleep to allow the current active messages to be processed before getting more.
a.logger.Debugf("Max active messages %d reached, recovering for %d seconds", a.metadata.MaxActiveMessages, a.metadata.MaxActiveMessagesRecoveryInSec)
time.Sleep(time.Second * time.Duration(a.metadata.MaxActiveMessagesRecoveryInSec))
}
}
}
func (a *azureServiceBus) lockRenewalEnabled() bool {
return a.metadata.LockRenewalInSec > 0
}
func (a *azureServiceBus) abandonMessage(ctx context.Context, m *azservicebus.Message) error {
a.removeActiveMessage(m.ID)
a.logger.Debugf("Abandoning message %s", m.ID)
return m.Abandon(ctx)
}
func (a *azureServiceBus) completeMessage(ctx context.Context, m *azservicebus.Message) error {
a.removeActiveMessage(m.ID)
a.logger.Debugf("Completing message %s", m.ID)
return m.Complete(ctx)
}
func (a *azureServiceBus) addActiveMessage(m *azservicebus.Message) {
a.logger.Debugf("Adding message %s to active messages", m.ID)
a.mu.Lock()
a.activeMessages[m.ID] = m
a.mu.Unlock()
}
func (a *azureServiceBus) removeActiveMessage(messageID string) {
a.logger.Debugf("Removing message %s from active messages", messageID)
a.mu.Lock()
delete(a.activeMessages, messageID)
a.mu.Unlock()
return nil
}
func (a *azureServiceBus) ensureTopic(topic string) error {

View File

@ -0,0 +1,231 @@
package servicebus
import (
"context"
"fmt"
"sync"
"time"
azservicebus "github.com/Azure/azure-service-bus-go"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/dapr/pkg/logger"
)
// subscription manages the receive loop for a single Azure Service Bus
// topic subscription, tracking in-flight ("active") messages so their
// locks can be renewed while the app handler is processing them.
type subscription struct {
	// topic is the Service Bus topic this subscription receives from.
	topic string
	// mu guards activeMessages (read by the renewal/receiver loops,
	// written by the async message handlers).
	mu sync.RWMutex
	// activeMessages maps message ID -> message for lock renewal.
	activeMessages map[string]*azservicebus.Message
	// entity is the underlying SDK subscription connection.
	entity *azservicebus.Subscription
	// limitConcurrentHandlers, when true, caps concurrent handlers
	// via the handlerChan token channel.
	limitConcurrentHandlers bool
	handlerChan             chan handler
	logger                  logger.Logger
}
// newSubscription wraps an SDK subscription entity for the given topic.
// When maxConcurrentHandlers is non-nil, a buffered token channel is
// pre-filled so at most that many message handlers run concurrently.
func newSubscription(topic string, sub *azservicebus.Subscription, maxConcurrentHandlers *int, logger logger.Logger) *subscription {
	subscr := &subscription{
		topic:          topic,
		activeMessages: make(map[string]*azservicebus.Message),
		entity:         sub,
		logger:         logger,
	}

	if maxConcurrentHandlers == nil {
		return subscr
	}

	subscr.logger.Debugf("Subscription to topic %s is limited to %d message handler(s)", topic, *maxConcurrentHandlers)
	subscr.limitConcurrentHandlers = true
	subscr.handlerChan = make(chan handler, *maxConcurrentHandlers)
	for n := 0; n < *maxConcurrentHandlers; n++ {
		subscr.handlerChan <- handler{}
	}

	return subscr
}
// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic.
//
// It spawns a background lock-renewal loop (active only when
// lockRenewalInSec > 0) and then receives messages one at a time,
// dispatching each to an async wrapper around appHandler. It returns
// only when ctx is cancelled or a receive fails; on return the
// subscription entity is closed with a timeout of timeoutInSec seconds.
func (s *subscription) ReceiveAndBlock(ctx context.Context, appHandler func(msg *pubsub.NewMessage) error, lockRenewalInSec int, handlerTimeoutInSec int, timeoutInSec int, maxActiveMessages int, maxActiveMessagesRecoveryInSec int) error {
	// Close subscription
	defer func() {
		// Use a fresh context: ctx may already be cancelled by the
		// time we get here, which would abort the close immediately.
		closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeoutInSec))
		defer closeCancel()
		s.close(closeCtx)
	}()

	// Lock renewal loop
	go func() {
		shouldRenewLocks := lockRenewalInSec > 0
		if !shouldRenewLocks {
			s.logger.Debugf("Lock renewal for topic %s disabled", s.topic)
			return
		}
		// Renew locks on a fixed cadence until the receive context ends.
		for {
			select {
			case <-ctx.Done():
				s.logger.Debugf("Lock renewal context for topic %s done", s.topic)
				return
			case <-time.After(time.Second * time.Duration(lockRenewalInSec)):
				s.tryRenewLocks()
			}
		}
	}()

	asyncHandler := s.asyncWrapper(s.getHandlerFunc(appHandler, handlerTimeoutInSec, timeoutInSec))

	// Receiver loop
	for {
		s.mu.RLock()
		activeMessageLen := len(s.activeMessages)
		s.mu.RUnlock()
		if activeMessageLen >= maxActiveMessages {
			// Max active messages reached, sleep to allow the current active messages to be processed before getting more
			s.logger.Debugf("Max active messages %d reached for topic %s, recovering for %d seconds", maxActiveMessages, s.topic, maxActiveMessagesRecoveryInSec)

			select {
			case <-ctx.Done():
				s.logger.Debugf("Receive context for topic %s done", s.topic)
				return ctx.Err()
			case <-time.After(time.Second * time.Duration(maxActiveMessagesRecoveryInSec)):
				continue
			}
		}

		if err := s.receiveMessage(ctx, asyncHandler); err != nil {
			return err
		}
	}
}
// close shuts down the underlying Service Bus subscription entity.
// Errors are logged rather than returned; close is best effort.
func (s *subscription) close(ctx context.Context) {
	s.logger.Debugf("Closing subscription to topic %s", s.topic)

	// Ensure subscription entity is closed
	err := s.entity.Close(ctx)
	if err != nil {
		s.logger.Errorf("%s closing subscription entity for topic %s: %+v", errorMessagePrefix, s.topic, err)
	}
}
// getHandlerFunc wraps the app's handler in an azservicebus.HandlerFunc
// that finalizes the message according to the handler's result: abandon
// on error, complete on success. Finalization is bounded by
// timeoutInSec because the SDK finalize calls do not reliably honor
// ctx cancellation downstream.
func (s *subscription) getHandlerFunc(appHandler func(msg *pubsub.NewMessage) error, handlerTimeoutInSec int, timeoutInSec int) azservicebus.HandlerFunc {
	return func(ctx context.Context, message *azservicebus.Message) error {
		msg := &pubsub.NewMessage{
			Data:  message.Data,
			Topic: s.topic,
		}

		// TODO(#1721): Context should be propagated to the app handler call for timeout and cancellation.
		//
		// handleCtx, handleCancel := context.WithTimeout(ctx, time.Second*time.Duration(handlerTimeoutInSec))
		// defer handleCancel()
		s.logger.Debugf("Calling app's handler for message %s on topic %s", message.ID, s.topic)
		appErr := appHandler(msg)

		// The context isn't handled downstream so we time out these
		// operations here to avoid getting stuck waiting for a message
		// to finalize (abandon/complete) if the connection has dropped.
		errs := make(chan error, 1)
		if appErr != nil {
			s.logger.Warnf("Error in app's handler: %+v", appErr)
			go func() {
				errs <- s.abandonMessage(ctx, message)
			}()
		} else {
			// Only complete when the handler succeeded; previously the
			// complete ran unconditionally, finalizing an already
			// abandoned message a second time.
			go func() {
				errs <- s.completeMessage(ctx, message)
			}()
		}
		select {
		case err := <-errs:
			return err
		case <-time.After(time.Second * time.Duration(timeoutInSec)):
			return fmt.Errorf("%s call to finalize message %s has timed out", errorMessagePrefix, message.ID)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// asyncWrapper returns a HandlerFunc that immediately hands the message
// off to a goroutine and returns nil, so the receiver loop can fetch
// the next message without waiting for processing to finish. The
// goroutine registers the message as active (so its lock is renewed),
// optionally acquires a concurrency token, runs handlerFunc, and always
// deregisters the message on exit.
func (s *subscription) asyncWrapper(handlerFunc azservicebus.HandlerFunc) azservicebus.HandlerFunc {
	return func(ctx context.Context, msg *azservicebus.Message) error {
		go func() {
			s.addActiveMessage(msg)
			defer s.removeActiveMessage(msg.ID)

			if s.limitConcurrentHandlers {
				s.logger.Debugf("Taking message handler for %s on topic %s", msg.ID, s.topic)
				select {
				case <-ctx.Done():
					// Subscription is shutting down; drop the message
					// without handling it (it will be redelivered).
					s.logger.Debugf("Message context done for %s on topic %s", msg.ID, s.topic)

					return
				case <-s.handlerChan: // Take or wait on a free handler before getting a new message
					s.logger.Debugf("Taken message handler for %s on topic %s", msg.ID, s.topic)
				}

				defer func() {
					s.logger.Debugf("Releasing message handler for %s on topic %s", msg.ID, s.topic)
					s.handlerChan <- handler{} // Release a handler when complete
					s.logger.Debugf("Released message handler for %s on topic %s", msg.ID, s.topic)
				}()
			}

			err := handlerFunc(ctx, msg)
			if err != nil {
				s.logger.Errorf("%s error handling message %s on topic '%s', %s", errorMessagePrefix, msg.ID, s.topic, err)
			}
		}()

		return nil
	}
}
// tryRenewLocks renews the locks of all currently active messages on
// this subscription. Renewal is best effort: failures are logged as
// warnings and are expected occasionally (e.g. a message may finalize
// between the snapshot and the renewal call).
func (s *subscription) tryRenewLocks() {
	// Snapshot the active messages in one pass under the read lock so
	// RenewLocks runs without holding it.
	s.mu.RLock()
	msgs := make([]*azservicebus.Message, 0, len(s.activeMessages))
	for _, m := range s.activeMessages {
		msgs = append(msgs, m)
	}
	s.mu.RUnlock()

	if len(msgs) == 0 {
		s.logger.Debugf("No active messages require lock renewal for topic %s", s.topic)
		return
	}

	// Lock renewal is best effort and not guaranteed to succeed, warnings are expected.
	s.logger.Debugf("Trying to renew %d active message lock(s) for topic %s", len(msgs), s.topic)
	err := s.entity.RenewLocks(context.Background(), msgs...)
	if err != nil {
		// Previous format string had no verb for err ("%s, " with two
		// args), so the error was never printed; fixed to include it.
		s.logger.Warnf("Couldn't renew all active message lock(s) for topic %s: %v", s.topic, err)
	}
}
// receiveMessage blocks until one message is received from the
// subscription entity and dispatched to handler. The returned error
// wraps the SDK error (%w) so callers can unwrap it with errors.Is/As;
// the message text is unchanged.
func (s *subscription) receiveMessage(ctx context.Context, handler azservicebus.HandlerFunc) error {
	s.logger.Debugf("Waiting to receive message on topic %s", s.topic)
	if err := s.entity.ReceiveOne(ctx, handler); err != nil {
		return fmt.Errorf("%s error receiving message on topic %s, %w", errorMessagePrefix, s.topic, err)
	}
	return nil
}
// abandonMessage releases the message back to the subscription so it
// can be redelivered.
func (s *subscription) abandonMessage(ctx context.Context, msg *azservicebus.Message) error {
	s.logger.Debugf("Abandoning message %s on topic %s", msg.ID, s.topic)

	return msg.Abandon(ctx)
}
// completeMessage marks the message as successfully processed, removing
// it from the subscription.
func (s *subscription) completeMessage(ctx context.Context, msg *azservicebus.Message) error {
	s.logger.Debugf("Completing message %s on topic %s", msg.ID, s.topic)

	return msg.Complete(ctx)
}
// addActiveMessage records the message as in-flight so the renewal
// loop keeps its lock alive.
func (s *subscription) addActiveMessage(m *azservicebus.Message) {
	s.logger.Debugf("Adding message %s to active messages on topic %s", m.ID, s.topic)

	s.mu.Lock()
	defer s.mu.Unlock()
	s.activeMessages[m.ID] = m
}
// removeActiveMessage drops the message from the in-flight set once it
// has been finalized (or dropped), stopping further lock renewal.
func (s *subscription) removeActiveMessage(messageID string) {
	s.logger.Debugf("Removing message %s from active messages on topic %s", messageID, s.topic)

	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.activeMessages, messageID)
}