Use struct for NewSubscription to simplify parameters

Signed-off-by: Joni Collinge <jonathancollinge@live.com>
This commit is contained in:
Joni Collinge 2022-12-20 09:23:30 +00:00
parent 86df9a07b3
commit b6911b67fd
No known key found for this signature in database
GPG Key ID: BF9B59005264DD95
7 changed files with 97 additions and 90 deletions

View File

@ -110,18 +110,16 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi
go func() {
// Reconnect loop.
for {
sub := impl.NewSubscription(
subscribeCtx,
a.metadata.MaxActiveMessages,
a.metadata.TimeoutInSec,
nil,
a.metadata.MaxRetriableErrorsPerSec,
a.metadata.MaxConcurrentHandlers,
"queue "+a.metadata.QueueName,
a.metadata.LockRenewalInSec,
false, // Sessions not supported for queues yet.
a.logger,
)
sub := impl.NewSubscription(subscribeCtx, impl.SubsriptionOptions{
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: nil,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "queue " + a.metadata.QueueName,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: false, // Sessions not supported for queues yet.
}, a.logger)
// Blocks until a successful connection (or until context is canceled)
receiver, err := sub.Connect(func() (impl.Receiver, error) {

View File

@ -193,14 +193,14 @@ func (c *Client) EnsureTopic(ctx context.Context, topic string) error {
return nil
}
type SubscriptionOpts struct {
type SubscribeOptions struct {
RequireSessions bool
MaxConcurrentSesions int
}
// EnsureSubscription creates the topic subscription if it doesn't exist.
// Returns with nil error if the admin client doesn't exist.
func (c *Client) EnsureSubscription(ctx context.Context, name string, topic string, opts SubscriptionOpts) error {
func (c *Client) EnsureSubscription(ctx context.Context, name string, topic string, opts SubscribeOptions) error {
if c.adminClient == nil {
return nil
}
@ -273,7 +273,7 @@ func (c *Client) createTopic(parentCtx context.Context, topic string) error {
return nil
}
func (c *Client) shouldCreateSubscription(parentCtx context.Context, topic, subscription string, opts SubscriptionOpts) (bool, error) {
func (c *Client) shouldCreateSubscription(parentCtx context.Context, topic, subscription string, opts SubscribeOptions) (bool, error) {
ctx, cancel := context.WithTimeout(parentCtx, time.Second*time.Duration(c.metadata.TimeoutInSec))
defer cancel()
@ -300,7 +300,7 @@ func (c *Client) shouldCreateSubscription(parentCtx context.Context, topic, subs
return false, nil
}
func (c *Client) createSubscription(parentCtx context.Context, topic, subscription string, opts SubscriptionOpts) error {
func (c *Client) createSubscription(parentCtx context.Context, topic, subscription string, opts SubscribeOptions) error {
ctx, cancel := context.WithTimeout(parentCtx, time.Second*time.Duration(c.metadata.TimeoutInSec))
defer cancel()

View File

@ -310,7 +310,7 @@ func ParseMetadata(md map[string]string, logger logger.Logger, mode byte) (m *Me
}
// CreateSubscriptionProperties returns the SubscriptionProperties object to create new Subscriptions to Service Bus topics.
func (a Metadata) CreateSubscriptionProperties(opts SubscriptionOpts) *sbadmin.SubscriptionProperties {
func (a Metadata) CreateSubscriptionProperties(opts SubscribeOptions) *sbadmin.SubscriptionProperties {
properties := &sbadmin.SubscriptionProperties{}
if a.MaxDeliveryCount != nil {

View File

@ -53,60 +53,64 @@ type Subscription struct {
cancel context.CancelFunc
}
type SubsriptionOptions struct {
MaxActiveMessages int
TimeoutInSec int
MaxBulkSubCount *int
MaxRetriableEPS int
MaxConcurrentHandlers int
Entity string
LockRenewalInSec int
RequireSessions bool
}
// NewBulkSubscription returns a new Subscription object.
// Parameter "entity" is usually in the format "topic <topicname>" or "queue <queuename>" and it's only used for logging.
func NewSubscription(
parentCtx context.Context,
maxActiveMessages int,
timeoutInSec int,
maxBulkSubCount *int,
maxRetriableEPS int,
maxConcurrentHandlers int,
entity string,
lockRenewalInSec int,
requireSessions bool,
opts SubsriptionOptions,
logger logger.Logger,
) *Subscription {
ctx, cancel := context.WithCancel(parentCtx)
if maxBulkSubCount != nil {
if *maxBulkSubCount < 1 {
if opts.MaxBulkSubCount != nil {
if *opts.MaxBulkSubCount < 1 {
logger.Warnf("maxBulkSubCount must be greater than 0, setting it to 1")
maxBulkSubCount = ptr.Of(1)
opts.MaxBulkSubCount = ptr.Of(1)
}
} else {
// for non-bulk subscriptions, we only get one message at a time
maxBulkSubCount = ptr.Of(1)
opts.MaxBulkSubCount = ptr.Of(1)
}
if *maxBulkSubCount > maxActiveMessages {
logger.Warnf("maxBulkSubCount must not be greater than maxActiveMessages, setting it to %d", maxActiveMessages)
maxBulkSubCount = &maxActiveMessages
if *opts.MaxBulkSubCount > opts.MaxActiveMessages {
logger.Warnf("maxBulkSubCount must not be greater than maxActiveMessages, setting it to %d", opts.MaxActiveMessages)
opts.MaxBulkSubCount = &opts.MaxActiveMessages
}
s := &Subscription{
entity: entity,
entity: opts.Entity,
activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
timeout: time.Duration(timeoutInSec) * time.Second,
maxBulkSubCount: *maxBulkSubCount,
requireSessions: requireSessions,
timeout: time.Duration(opts.TimeoutInSec) * time.Second,
maxBulkSubCount: *opts.MaxBulkSubCount,
requireSessions: opts.RequireSessions,
logger: logger,
ctx: ctx,
cancel: cancel,
// This is a pessimistic estimate of the number of total operations that can be active at any given time.
// In case of a non-bulk subscription, one operation is one message.
activeOperationsChan: make(chan struct{}, maxActiveMessages/(*maxBulkSubCount)),
activeOperationsChan: make(chan struct{}, opts.MaxActiveMessages/(*opts.MaxBulkSubCount)),
}
if maxRetriableEPS > 0 {
s.retriableErrLimit = ratelimit.New(maxRetriableEPS)
if opts.MaxRetriableEPS > 0 {
s.retriableErrLimit = ratelimit.New(opts.MaxRetriableEPS)
} else {
s.retriableErrLimit = ratelimit.NewUnlimited()
}
if maxConcurrentHandlers > 0 {
s.logger.Debugf("Subscription to %s is limited to %d message handler(s)", entity, maxConcurrentHandlers)
s.handleChan = make(chan struct{}, maxConcurrentHandlers)
if opts.MaxConcurrentHandlers > 0 {
s.logger.Debugf("Subscription to %s is limited to %d message handler(s)", opts.Entity, opts.MaxConcurrentHandlers)
s.handleChan = make(chan struct{}, opts.MaxConcurrentHandlers)
}
return s

View File

@ -68,15 +68,16 @@ func TestNewSubscription(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
sub := NewSubscription(
context.Background(),
1000,
1,
tc.maxBulkSubCountParam,
10,
100,
"test",
30,
tc.requireSessionsParam,
context.Background(), SubsriptionOptions{
MaxActiveMessages: 1000,
TimeoutInSec: 1,
MaxBulkSubCount: tc.maxBulkSubCountParam,
MaxRetriableEPS: 10,
MaxConcurrentHandlers: 100,
Entity: "test",
LockRenewalInSec: 30,
RequireSessions: tc.requireSessionsParam,
},
logger.NewLogger("test"),
)
if sub.maxBulkSubCount != tc.maxBulkSubCountExpected {

View File

@ -179,15 +179,16 @@ func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPubli
func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
sub := impl.NewSubscription(
subscribeCtx,
a.metadata.MaxActiveMessages,
a.metadata.TimeoutInSec,
nil,
a.metadata.MaxRetriableErrorsPerSec,
a.metadata.MaxConcurrentHandlers,
"queue "+req.Topic,
a.metadata.LockRenewalInSec,
false,
subscribeCtx, impl.SubsriptionOptions{
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: nil,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "queue " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: false,
},
a.logger,
)
@ -208,15 +209,16 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.BulkHandler) error {
maxBulkSubCount := utils.GetElemOrDefaultFromMap(req.Metadata, contribMetadata.MaxBulkSubCountKey, defaultMaxBulkSubCount)
sub := impl.NewSubscription(
subscribeCtx,
a.metadata.MaxActiveMessages,
a.metadata.TimeoutInSec,
&maxBulkSubCount,
a.metadata.MaxRetriableErrorsPerSec,
a.metadata.MaxConcurrentHandlers,
"queue "+req.Topic,
a.metadata.LockRenewalInSec,
false,
subscribeCtx, impl.SubsriptionOptions{
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: &maxBulkSubCount,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "queue " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: false,
},
a.logger,
)

View File

@ -190,15 +190,16 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
maxConcurrentSessions := utils.GetElemOrDefaultFromMap(req.Metadata, maxConcurrentSessionsMetadataKey, defaultMaxConcurrentSessions)
sub := impl.NewSubscription(
subscribeCtx,
a.metadata.MaxActiveMessages,
a.metadata.TimeoutInSec,
nil,
a.metadata.MaxRetriableErrorsPerSec,
a.metadata.MaxConcurrentHandlers,
"topic "+req.Topic,
a.metadata.LockRenewalInSec,
requireSessions,
subscribeCtx, impl.SubsriptionOptions{
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: nil,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "topic " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: requireSessions,
},
a.logger,
)
@ -214,7 +215,7 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
)
}
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn, impl.SubscriptionOpts{
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn, impl.SubscribeOptions{
RequireSessions: requireSessions,
MaxConcurrentSesions: maxConcurrentSessions,
})
@ -230,15 +231,16 @@ func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub
maxBulkSubCount := utils.GetElemOrDefaultFromMap(req.Metadata, contribMetadata.MaxBulkSubCountKey, defaultMaxBulkSubCount)
sub := impl.NewSubscription(
subscribeCtx,
a.metadata.MaxActiveMessages,
a.metadata.TimeoutInSec,
&maxBulkSubCount,
a.metadata.MaxRetriableErrorsPerSec,
a.metadata.MaxConcurrentHandlers,
"topic "+req.Topic,
a.metadata.LockRenewalInSec,
requireSessions,
subscribeCtx, impl.SubsriptionOptions{
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: &maxBulkSubCount,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "topic " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: requireSessions,
},
a.logger,
)
@ -254,7 +256,7 @@ func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub
)
}
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn, impl.SubscriptionOpts{
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn, impl.SubscribeOptions{
RequireSessions: requireSessions,
MaxConcurrentSesions: maxConcurrentSessions,
})
@ -263,7 +265,7 @@ func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub
// doSubscribe is a helper function that handles the common logic for both Subscribe and BulkSubscribe.
// The receiveAndBlockFn is a function should invoke a blocking call to receive messages from the topic.
func (a *azureServiceBus) doSubscribe(subscribeCtx context.Context,
req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(impl.Receiver, func()) error, opts impl.SubscriptionOpts,
req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(impl.Receiver, func()) error, opts impl.SubscribeOptions,
) error {
// Does nothing if DisableEntityManagement is true
err := a.client.EnsureSubscription(subscribeCtx, a.metadata.ConsumerID, req.Topic, opts)