Merge branch 'master' into release-1.9-merge

This commit is contained in:
Alessandro (Ale) Segala 2022-10-05 13:47:22 -07:00 committed by GitHub
commit 55a282caee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1584 additions and 264 deletions

View File

@ -22,7 +22,7 @@ on:
pull_request: pull_request:
branches: branches:
- master - master
- release-* - 'release-*'
jobs: jobs:
# Based on whether this is a PR or a scheduled run, we will run a different # Based on whether this is a PR or a scheduled run, we will run a different

View File

@ -16,6 +16,7 @@ package servicebusqueues
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"sync" "sync"
"time" "time"
@ -230,6 +231,7 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi
err = sub.ReceiveAndBlock( err = sub.ReceiveAndBlock(
a.getHandlerFunc(handler), a.getHandlerFunc(handler),
a.metadata.LockRenewalInSec, a.metadata.LockRenewalInSec,
false, // Bulk is not supported here.
func() { func() {
// Reset the backoff when the subscription is successful and we have received the first message // Reset the backoff when the subscription is successful and we have received the first message
bo.Reset() bo.Reset()
@ -268,7 +270,12 @@ func (a *AzureServiceBusQueues) Read(subscribeCtx context.Context, handler bindi
} }
func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.HandlerFunc { func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.HandlerFunc {
return func(ctx context.Context, msg *servicebus.ReceivedMessage) error { return func(ctx context.Context, asbMsgs []*servicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) {
if len(asbMsgs) != 1 {
return nil, fmt.Errorf("expected 1 message, got %d", len(asbMsgs))
}
msg := asbMsgs[0]
metadata := make(map[string]string) metadata := make(map[string]string)
metadata[id] = msg.MessageID metadata[id] = msg.MessageID
if msg.CorrelationID != nil { if msg.CorrelationID != nil {
@ -289,7 +296,7 @@ func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.Ha
Data: msg.Body, Data: msg.Body,
Metadata: metadata, Metadata: metadata,
}) })
return err return []impl.HandlerResponseItem{}, err
} }
} }

View File

@ -89,9 +89,12 @@ func (b *Binding) Read(ctx context.Context, handler bindings.Handler) error {
return nil return nil
} }
ah := adaptHandler(handler) handlerConfig := kafka.SubscriptionHandlerConfig{
IsBulkSubscribe: false,
Handler: adaptHandler(handler),
}
for _, t := range b.topics { for _, t := range b.topics {
b.kafka.AddTopicHandler(t, ah) b.kafka.AddTopicHandler(t, handlerConfig)
} }
go func() { go func() {

View File

@ -26,22 +26,29 @@ import (
"github.com/dapr/kit/retry" "github.com/dapr/kit/retry"
) )
// HandlerResponseItem represents a response from the handler for each message.
type HandlerResponseItem struct {
EntryId string //nolint:stylecheck
Error error
}
// HandlerFunc is the type for handlers that receive messages // HandlerFunc is the type for handlers that receive messages
type HandlerFunc func(ctx context.Context, msg *azservicebus.ReceivedMessage) error type HandlerFunc func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]HandlerResponseItem, error)
// Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue. // Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue.
type Subscription struct { type Subscription struct {
entity string entity string
mu sync.RWMutex mu sync.RWMutex
activeMessages map[int64]*azservicebus.ReceivedMessage activeMessages map[int64]*azservicebus.ReceivedMessage
activeMessagesChan chan struct{} activeOperationsChan chan struct{}
receiver *azservicebus.Receiver receiver *azservicebus.Receiver
timeout time.Duration timeout time.Duration
retriableErrLimit ratelimit.Limiter maxBulkSubCount int
handleChan chan struct{} retriableErrLimit ratelimit.Limiter
logger logger.Logger handleChan chan struct{}
ctx context.Context logger logger.Logger
cancel context.CancelFunc ctx context.Context
cancel context.CancelFunc
} }
// NewSubscription returns a new Subscription object. // NewSubscription returns a new Subscription object.
@ -57,13 +64,14 @@ func NewSubscription(
) *Subscription { ) *Subscription {
ctx, cancel := context.WithCancel(parentCtx) ctx, cancel := context.WithCancel(parentCtx)
s := &Subscription{ s := &Subscription{
entity: entity, entity: entity,
activeMessages: make(map[int64]*azservicebus.ReceivedMessage), activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
activeMessagesChan: make(chan struct{}, maxActiveMessages), activeOperationsChan: make(chan struct{}, maxActiveMessages), // In case of a non-bulk subscription, one operation is one message.
timeout: time.Duration(timeoutInSec) * time.Second, timeout: time.Duration(timeoutInSec) * time.Second,
logger: logger, maxBulkSubCount: 1, // for non-bulk subscriptions, we only get one message at a time
ctx: ctx, logger: logger,
cancel: cancel, ctx: ctx,
cancel: cancel,
} }
if maxRetriableEPS > 0 { if maxRetriableEPS > 0 {
@ -80,6 +88,56 @@ func NewSubscription(
return s return s
} }
// NewBulkSubscription returns a new Subscription object with bulk support.
// Parameter "entity" is usually in the format "topic <topicname>" or "queue <queuename>" and it's only used for logging.
func NewBulkSubscription(
parentCtx context.Context,
maxActiveMessages int,
timeoutInSec int,
maxBulkSubCount int,
maxRetriableEPS int,
maxConcurrentHandlers *int,
entity string,
logger logger.Logger,
) *Subscription {
ctx, cancel := context.WithCancel(parentCtx)
s := &Subscription{
entity: entity,
activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
timeout: time.Duration(timeoutInSec) * time.Second,
maxBulkSubCount: maxBulkSubCount,
logger: logger,
ctx: ctx,
cancel: cancel,
}
if maxRetriableEPS > 0 {
s.retriableErrLimit = ratelimit.New(maxRetriableEPS)
} else {
s.retriableErrLimit = ratelimit.NewUnlimited()
}
if maxBulkSubCount < 1 {
s.logger.Warnf("maxBulkSubCount must be greater than 0, setting it to 1")
s.maxBulkSubCount = 1
}
if maxBulkSubCount > maxActiveMessages {
s.logger.Warnf("maxBulkSubCount must not be greater than maxActiveMessages, setting it to %d", maxActiveMessages)
s.maxBulkSubCount = maxActiveMessages
}
// This is a pessimistic estimate of the number of total operations that can be active at any given time.
s.activeOperationsChan = make(chan struct{}, maxActiveMessages/s.maxBulkSubCount)
if maxConcurrentHandlers != nil {
s.logger.Debugf("Subscription to %s is limited to %d message handler(s)", entity, *maxConcurrentHandlers)
s.handleChan = make(chan struct{}, *maxConcurrentHandlers)
}
return s
}
// Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled). // Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled).
func (s *Subscription) Connect(newReceiverFunc func() (*azservicebus.Receiver, error)) error { func (s *Subscription) Connect(newReceiverFunc func() (*azservicebus.Receiver, error)) error {
// Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling. // Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling.
@ -111,7 +169,7 @@ func (s *Subscription) Connect(newReceiverFunc func() (*azservicebus.Receiver, e
} }
// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue. // ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue.
func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int, onFirstSuccess func()) error { func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int, bulkEnabled bool, onFirstSuccess func()) error {
ctx, cancel := context.WithCancel(s.ctx) ctx, cancel := context.WithCancel(s.ctx)
defer cancel() defer cancel()
@ -138,9 +196,9 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int
// Receiver loop // Receiver loop
for { for {
select { select {
// This blocks if there are too many active messages already // This blocks if there are too many active operations already
// This is released by the handler, but if the loop ends before it reaches the handler, make sure to release it with `<-s.activeMessagesChan` // This is released by the handler, but if the loop ends before it reaches the handler, make sure to release it with `<-s.activeOperationsChan`
case s.activeMessagesChan <- struct{}{}: case s.activeOperationsChan <- struct{}{}:
// Return if context is canceled // Return if context is canceled
case <-ctx.Done(): case <-ctx.Done():
s.logger.Debugf("Receive context for %s done", s.entity) s.logger.Debugf("Receive context for %s done", s.entity)
@ -148,12 +206,12 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int
} }
// This method blocks until we get a message or the context is canceled // This method blocks until we get a message or the context is canceled
msgs, err := s.receiver.ReceiveMessages(s.ctx, 1, nil) msgs, err := s.receiver.ReceiveMessages(s.ctx, s.maxBulkSubCount, nil)
if err != nil { if err != nil {
if err != context.Canceled { if err != context.Canceled {
s.logger.Errorf("Error reading from %s. %s", s.entity, err.Error()) s.logger.Errorf("Error reading from %s. %s", s.entity, err.Error())
} }
<-s.activeMessagesChan <-s.activeOperationsChan
// Return the error. This will cause the Service Bus component to try and reconnect. // Return the error. This will cause the Service Bus component to try and reconnect.
return err return err
} }
@ -168,28 +226,90 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int
if l == 0 { if l == 0 {
// We got no message, which is unusual too // We got no message, which is unusual too
s.logger.Warn("Received 0 messages from Service Bus") s.logger.Warn("Received 0 messages from Service Bus")
<-s.activeMessagesChan <-s.activeOperationsChan
continue continue
} else if l > 1 { } else if l > 1 && !bulkEnabled {
// We are requesting one message only; this should never happen // We are requesting one message only; this should never happen
s.logger.Errorf("Expected one message from Service Bus, but received %d", l) s.logger.Errorf("Expected one message from Service Bus, but received %d", l)
} }
msg := msgs[0] s.logger.Debugf("Received messages: %d; current active operations usage: %d/%d", l, len(s.activeOperationsChan), cap(s.activeOperationsChan))
s.logger.Debugf("Received message: %s; current active message usage: %d/%d", msg.MessageID, len(s.activeMessagesChan), cap(s.activeMessagesChan))
// s.logger.Debugf("Message body: %s", string(msg.Body))
if err = s.addActiveMessage(msg); err != nil { skipProcessing := false
// If we cannot add the message then sequence number is not set, this must for _, msg := range msgs {
// be a bug in the Azure Service Bus SDK so we will log the error and not if err = s.addActiveMessage(msg); err != nil {
// handle the message. The message will eventually be retried until fixed. // If we cannot add the message then sequence number is not set, this must
s.logger.Errorf("Error adding message: %s", err.Error()) // be a bug in the Azure Service Bus SDK so we will log the error and not
<-s.activeMessagesChan // handle the message. The message will eventually be retried until fixed.
s.logger.Errorf("Error adding message: %s", err.Error())
skipProcessing = true
break
}
s.logger.Debugf("Processing received message: %s", msg.MessageID)
}
if skipProcessing {
<-s.activeOperationsChan
continue continue
} }
s.logger.Debugf("Processing received message: %s", msg.MessageID) runHandlerFn := func(hctx context.Context) {
s.handleAsync(s.ctx, msg, handler) msg := msgs[0]
// Invoke the handler to process the message
_, err = handler(hctx, msgs)
// This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message.
// If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery).
// This uses a background context in case ctx has been canceled already.
finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
defer finalizeCancel()
if err != nil {
// Log the error only, as we're running asynchronously
s.logger.Errorf("App handler returned an error for message %s on %s: %s", msg.MessageID, s.entity, err)
s.AbandonMessage(finalizeCtx, msg)
return
}
s.CompleteMessage(finalizeCtx, msg)
}
bulkRunHandlerFunc := func(hctx context.Context) {
resps, err := handler(hctx, msgs)
// This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message.
// If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery).
// This uses a background context in case ctx has been canceled already.
finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
defer finalizeCancel()
if err != nil {
// Handle the error and mark messages accordingly.
// Note, the order of the responses match the order of the messages.
for i, resp := range resps {
if resp.Error != nil {
// Log the error only, as we're running asynchronously.
s.logger.Errorf("App handler returned an error for message %s on %s: %s", msgs[i].MessageID, s.entity, resp.Error)
s.AbandonMessage(finalizeCtx, msgs[i])
} else {
s.CompleteMessage(finalizeCtx, msgs[i])
}
}
return
}
// No error, so we can complete all messages.
for _, msg := range msgs {
s.CompleteMessage(finalizeCtx, msg)
}
}
if bulkEnabled {
s.handleAsync(s.ctx, msgs, bulkRunHandlerFunc)
} else {
s.handleAsync(s.ctx, msgs, runHandlerFn)
}
} }
} }
@ -204,69 +324,60 @@ func (s *Subscription) Close(closeCtx context.Context) {
s.cancel() s.cancel()
} }
func (s *Subscription) handleAsync(ctx context.Context, msg *azservicebus.ReceivedMessage, handler HandlerFunc) { // handleAsync handles messages from azure service bus asynchronously.
// runHandlerFn is responsible for calling the message handler function
// and marking messages as complete/abandon.
func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.ReceivedMessage, runHandlerFn func(ctx context.Context)) {
go func() { go func() {
var ( var (
consumeToken bool consumeToken bool
takenConcurrentHandler bool takenConcurrentHandler bool
err error
) )
defer func() { defer func() {
// Release a handler if needed for _, msg := range msgs {
if takenConcurrentHandler { // Release a handler if needed
<-s.handleChan if takenConcurrentHandler {
s.logger.Debugf("Released message handle for %s on %s", msg.MessageID, s.entity) <-s.handleChan
s.logger.Debugf("Released message handle for %s on %s", msg.MessageID, s.entity)
}
// If we got a retriable error (app handler returned a retriable error, or a network error while connecting to the app, etc) consume a retriable error token
// We do it here, after the handler has been released but before removing the active message (which would allow us to retrieve more messages)
if consumeToken {
s.logger.Debugf("Taking a retriable error token")
before := time.Now()
_ = s.retriableErrLimit.Take()
s.logger.Debugf("Resumed after pausing for %v", time.Since(before))
}
// Remove the message from the map of active ones
s.removeActiveMessage(msg.MessageID, *msg.SequenceNumber)
} }
// If we got a retriable error (app handler returned a retriable error, or a network error while connecting to the app, etc) consume a retriable error token // Remove an entry from activeOperationsChan to allow processing more messages
// We do it here, after the handler has been released but before removing the active message (which would allow us to retrieve more messages) <-s.activeOperationsChan
if consumeToken {
s.logger.Debugf("Taking a retriable error token")
before := time.Now()
_ = s.retriableErrLimit.Take()
s.logger.Debugf("Resumed after pausing for %v", time.Now().Sub(before))
}
// Remove the message from the map of active ones
s.removeActiveMessage(msg.MessageID, *msg.SequenceNumber)
// Remove an entry from activeMessageChan to allow processing more messages
<-s.activeMessagesChan
}() }()
// If handleChan is non-nil, we have a limit on how many handler we can process for _, msg := range msgs {
if cap(s.handleChan) > 0 { // If handleChan is non-nil, we have a limit on how many handler we can process
s.logger.Debugf("Taking message handle for %s on %s", msg.MessageID, s.entity) if cap(s.handleChan) > 0 {
select { s.logger.Debugf("Taking message handle for %s on %s", msg.MessageID, s.entity)
// Context is done, so we will stop waiting select {
case <-ctx.Done(): // Context is done, so we will stop waiting
s.logger.Debugf("Message context done for %s on %s", msg.MessageID, s.entity) case <-ctx.Done():
return s.logger.Debugf("Message context done for %s on %s", msg.MessageID, s.entity)
// Blocks until we have a handler available return
case s.handleChan <- struct{}{}: // Blocks until we have a handler available
takenConcurrentHandler = true case s.handleChan <- struct{}{}:
s.logger.Debugf("Taken message handle for %s on %s", msg.MessageID, s.entity) takenConcurrentHandler = true
s.logger.Debugf("Taken message handle for %s on %s", msg.MessageID, s.entity)
}
} }
} }
// Invoke the handler to process the message // Invoke the handler to process the message.
err = handler(ctx, msg) runHandlerFn(ctx)
// This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message.
// If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery).
// This uses a background context in case ctx has been canceled already.
finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
defer finalizeCancel()
if err != nil {
// Log the error only, as we're running asynchronously
s.logger.Errorf("App handler returned an error for message %s on %s: %s", msg.MessageID, s.entity, err)
s.AbandonMessage(finalizeCtx, msg)
return
}
s.CompleteMessage(finalizeCtx, msg)
}() }()
} }

View File

@ -0,0 +1,63 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servicebus
import (
"context"
"testing"
"github.com/dapr/kit/logger"
)
var maxConcurrentHandlers = 100
func TestNewBulkSubscription_MaxBulkSubCountShouldBeGreaterThanZero(t *testing.T) {
testcases := []struct {
name string
maxBulkSubCountParam int
maxBulkSubCountExpected int
}{
{
"maxBulkSubCount passed is 0",
0,
1,
},
{
"maxBulkSubCount passed is negative",
-100,
1,
},
{
"maxBulkSubCount passed is positive",
100,
100,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
bulkSubscription := NewBulkSubscription(
context.Background(),
1000,
1,
tc.maxBulkSubCountParam,
10,
&maxConcurrentHandlers,
"test",
logger.NewLogger("test"))
if bulkSubscription.maxBulkSubCount != tc.maxBulkSubCountExpected {
t.Errorf("Expected maxBulkSubCount to be %d but got %d", tc.maxBulkSubCountExpected, bulkSubscription.maxBulkSubCount)
}
})
}
}

View File

@ -17,6 +17,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strconv"
"sync" "sync"
"time" "time"
@ -31,65 +32,167 @@ type consumer struct {
ready chan bool ready chan bool
running chan struct{} running chan struct{}
once sync.Once once sync.Once
mutex sync.Mutex
} }
func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context()) b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context())
isBulkSubscribe := consumer.k.checkBulkSubscribe(claim.Topic())
for { handlerConfig, err := consumer.k.GetTopicHandlerConfig(claim.Topic())
select { if err != nil {
case message, ok := <-claim.Messages(): return fmt.Errorf("error getting bulk handler config for topic %s: %w", claim.Topic(), err)
if !ok { }
if isBulkSubscribe {
ticker := time.NewTicker(time.Duration(handlerConfig.SubscribeConfig.MaxBulkSubAwaitDurationMs) * time.Millisecond)
defer ticker.Stop()
messages := make([]*sarama.ConsumerMessage, 0, handlerConfig.SubscribeConfig.MaxBulkSubCount)
for {
select {
case <-session.Context().Done():
return consumer.flushBulkMessages(claim, messages, session, handlerConfig.BulkHandler, b)
case message := <-claim.Messages():
consumer.mutex.Lock()
if message != nil {
messages = append(messages, message)
if len(messages) >= handlerConfig.SubscribeConfig.MaxBulkSubCount {
consumer.flushBulkMessages(claim, messages, session, handlerConfig.BulkHandler, b)
messages = messages[:0]
}
}
consumer.mutex.Unlock()
case <-ticker.C:
consumer.mutex.Lock()
consumer.flushBulkMessages(claim, messages, session, handlerConfig.BulkHandler, b)
messages = messages[:0]
consumer.mutex.Unlock()
}
}
} else {
for {
select {
case message, ok := <-claim.Messages():
if !ok {
return nil
}
if consumer.k.consumeRetryEnabled {
if err := retry.NotifyRecover(func() error {
return consumer.doCallback(session, message)
}, b, func(err error, d time.Duration) {
consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}, func() {
consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
}); err != nil {
consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}
} else {
err := consumer.doCallback(session, message)
if err != nil {
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil return nil
} }
if consumer.k.consumeRetryEnabled {
if err := retry.NotifyRecover(func() error {
return consumer.doCallback(session, message)
}, b, func(err error, d time.Duration) {
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}, func() {
consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
}); err != nil {
consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}
} else {
err := consumer.doCallback(session, message)
if err != nil {
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
} }
} }
} }
func (consumer *consumer) flushBulkMessages(claim sarama.ConsumerGroupClaim,
messages []*sarama.ConsumerMessage, session sarama.ConsumerGroupSession,
handler BulkEventHandler, b backoff.BackOff,
) error {
if len(messages) > 0 {
if consumer.k.consumeRetryEnabled {
if err := retry.NotifyRecover(func() error {
return consumer.doBulkCallback(session, messages, handler, claim.Topic())
}, b, func(err error, d time.Duration) {
consumer.k.logger.Warnf("Error processing Kafka bulk messages: %s. Error: %v. Retrying...", claim.Topic(), err)
}, func() {
consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s", claim.Topic())
}); err != nil {
consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s. Error: %v.", claim.Topic(), err)
}
} else {
err := consumer.doBulkCallback(session, messages, handler, claim.Topic())
if err != nil {
consumer.k.logger.Errorf("Error processing Kafka message: %s. Error: %v.", claim.Topic(), err)
}
return err
}
}
return nil
}
func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
messages []*sarama.ConsumerMessage, handler BulkEventHandler, topic string,
) error {
consumer.k.logger.Debugf("Processing Kafka bulk message: %s", topic)
messageValues := make([]KafkaBulkMessageEntry, (len(messages)))
for i, message := range messages {
if message != nil {
metadata := make(map[string]string, len(message.Headers))
if message.Headers != nil {
for _, t := range message.Headers {
metadata[string(t.Key)] = string(t.Value)
}
}
childMessage := KafkaBulkMessageEntry{
EntryId: strconv.Itoa(i),
Event: message.Value,
Metadata: metadata,
}
messageValues[i] = childMessage
}
}
event := KafkaBulkMessage{
Topic: topic,
Entries: messageValues,
}
responses, err := handler(session.Context(), &event)
if err != nil {
for i, resp := range responses {
// An extra check to confirm that runtime returned responses are in order
if resp.EntryId != messageValues[i].EntryId {
return errors.New("entry id mismatch while processing bulk messages")
}
if resp.Error != nil {
break
}
session.MarkMessage(messages[i], "")
}
} else {
for _, message := range messages {
session.MarkMessage(message, "")
}
}
return err
}
func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage) error { func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage) error {
consumer.k.logger.Debugf("Processing Kafka message: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) consumer.k.logger.Debugf("Processing Kafka message: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
handler, err := consumer.k.GetTopicHandler(message.Topic) handlerConfig, err := consumer.k.GetTopicHandlerConfig(message.Topic)
if err != nil { if err != nil {
return err return err
} }
if !handlerConfig.IsBulkSubscribe && handlerConfig.Handler == nil {
return errors.New("invalid handler config for subscribe call")
}
event := NewEvent{ event := NewEvent{
Topic: message.Topic, Topic: message.Topic,
Data: message.Value, Data: message.Value,
} }
// This is true only when headers are set (Kafka > 0.11)
if message.Headers != nil && len(message.Headers) > 0 { err = handlerConfig.Handler(session.Context(), &event)
event.Metadata = make(map[string]string, len(message.Headers))
for _, header := range message.Headers {
event.Metadata[string(header.Key)] = string(header.Value)
}
}
err = handler(session.Context(), &event)
if err == nil { if err == nil {
session.MarkMessage(message, "") session.MarkMessage(message, "")
} }
return err return err
} }
@ -105,10 +208,10 @@ func (consumer *consumer) Setup(sarama.ConsumerGroupSession) error {
return nil return nil
} }
// AddTopicHandler adds a topic handler // AddTopicHandler adds a handler and configuration for a topic
func (k *Kafka) AddTopicHandler(topic string, handler EventHandler) { func (k *Kafka) AddTopicHandler(topic string, handlerConfig SubscriptionHandlerConfig) {
k.subscribeLock.Lock() k.subscribeLock.Lock()
k.subscribeTopics[topic] = handler k.subscribeTopics[topic] = handlerConfig
k.subscribeLock.Unlock() k.subscribeLock.Unlock()
} }
@ -119,14 +222,26 @@ func (k *Kafka) RemoveTopicHandler(topic string) {
k.subscribeLock.Unlock() k.subscribeLock.Unlock()
} }
// GetTopicHandler returns the handler for a topic // checkBulkSubscribe checks if a bulk handler and config are correctly registered for provided topic
func (k *Kafka) GetTopicHandler(topic string) (EventHandler, error) { func (k *Kafka) checkBulkSubscribe(topic string) bool {
handler, ok := k.subscribeTopics[topic] if bulkHandlerConfig, ok := k.subscribeTopics[topic]; ok &&
if !ok || handler == nil { bulkHandlerConfig.IsBulkSubscribe &&
return nil, fmt.Errorf("handler for messages of topic %s not found", topic) bulkHandlerConfig.BulkHandler != nil && (bulkHandlerConfig.SubscribeConfig.MaxBulkSubCount > 0) &&
bulkHandlerConfig.SubscribeConfig.MaxBulkSubAwaitDurationMs > 0 {
return true
} }
return false
}
return handler, nil // GetTopicBulkHandler returns the handlerConfig for a topic
func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig, error) {
handlerConfig, ok := k.subscribeTopics[topic]
if ok && ((handlerConfig.IsBulkSubscribe && handlerConfig.BulkHandler != nil) ||
(!handlerConfig.IsBulkSubscribe && handlerConfig.Handler != nil)) {
return handlerConfig, nil
}
return SubscriptionHandlerConfig{},
fmt.Errorf("any handler for messages of topic %s not found", topic)
} }
// Subscribe to topic in the Kafka cluster, in a background goroutine // Subscribe to topic in the Kafka cluster, in a background goroutine

View File

@ -20,6 +20,7 @@ import (
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger" "github.com/dapr/kit/logger"
"github.com/dapr/kit/retry" "github.com/dapr/kit/retry"
) )
@ -38,7 +39,7 @@ type Kafka struct {
cancel context.CancelFunc cancel context.CancelFunc
consumer consumer consumer consumer
config *sarama.Config config *sarama.Config
subscribeTopics TopicHandlers subscribeTopics TopicHandlerConfig
subscribeLock sync.Mutex subscribeLock sync.Mutex
backOffConfig retry.Config backOffConfig retry.Config
@ -53,7 +54,7 @@ type Kafka struct {
func NewKafka(logger logger.Logger) *Kafka { func NewKafka(logger logger.Logger) *Kafka {
return &Kafka{ return &Kafka{
logger: logger, logger: logger,
subscribeTopics: make(TopicHandlers), subscribeTopics: make(TopicHandlerConfig),
subscribeLock: sync.Mutex{}, subscribeLock: sync.Mutex{},
} }
} }
@ -146,6 +147,17 @@ func (k *Kafka) Close() (err error) {
// EventHandler is the handler used to handle the subscribed event. // EventHandler is the handler used to handle the subscribed event.
type EventHandler func(ctx context.Context, msg *NewEvent) error type EventHandler func(ctx context.Context, msg *NewEvent) error
// BulkEventHandler is the handler used to handle the subscribed bulk event.
type BulkEventHandler func(ctx context.Context, msg *KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error)
// SubscriptionHandlerConfig is the handler and configuration for subscription.
type SubscriptionHandlerConfig struct {
IsBulkSubscribe bool
SubscribeConfig pubsub.BulkSubscribeConfig
BulkHandler BulkEventHandler
Handler EventHandler
}
// NewEvent is an event arriving from a message bus instance. // NewEvent is an event arriving from a message bus instance.
type NewEvent struct { type NewEvent struct {
Data []byte `json:"data"` Data []byte `json:"data"`
@ -153,3 +165,18 @@ type NewEvent struct {
Metadata map[string]string `json:"metadata"` Metadata map[string]string `json:"metadata"`
ContentType *string `json:"contentType,omitempty"` ContentType *string `json:"contentType,omitempty"`
} }
// KafkaBulkMessage is a bulk event arriving from a message bus instance.
type KafkaBulkMessage struct {
Entries []KafkaBulkMessageEntry `json:"entries"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
}
// KafkaBulkMessageEntry is an item contained inside bulk event arriving from a message bus instance.
type KafkaBulkMessageEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Event []byte `json:"event"`
ContentType string `json:"contentType,omitempty"`
Metadata map[string]string `json:"metadata"`
}

View File

@ -14,9 +14,12 @@ limitations under the License.
package kafka package kafka
import ( import (
"context"
"errors" "errors"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/dapr/components-contrib/pubsub"
) )
func getSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int) (sarama.SyncProducer, error) { func getSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int) (sarama.SyncProducer, error) {
@ -74,3 +77,96 @@ func (k *Kafka) Publish(topic string, data []byte, metadata map[string]string) e
return nil return nil
} }
func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, metadata map[string]string) (pubsub.BulkPublishResponse, error) {
if k.producer == nil {
err := errors.New("component is closed")
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err), err
}
k.logger.Debugf("Bulk Publishing on topic %v", topic)
msgs := []*sarama.ProducerMessage{}
for _, entry := range entries {
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(entry.Event),
}
// From Sarama documentation
// This field is used to hold arbitrary data you wish to include so it
// will be available when receiving on the Successes and Errors channels.
// Sarama completely ignores this field and is only to be used for
// pass-through data.
// This pass thorugh field is used for mapping errors, as seen in the mapKafkaProducerErrors method
// The EntryId will be unique for this request and the ProducerMessage is returned on the Errros channel,
// the metadata in that field is compared to the entry metadata to generate the right response on partial failures
msg.Metadata = entry.EntryId
for name, value := range metadata {
if name == key {
msg.Key = sarama.StringEncoder(value)
} else {
if msg.Headers == nil {
msg.Headers = make([]sarama.RecordHeader, 0, len(metadata))
}
msg.Headers = append(msg.Headers, sarama.RecordHeader{
Key: []byte(name),
Value: []byte(value),
})
}
}
msgs = append(msgs, msg)
}
if err := k.producer.SendMessages(msgs); err != nil {
// map the returned error to different entries
return k.mapKafkaProducerErrors(err, entries), err
}
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishSucceeded, nil), nil
}
// mapKafkaProducerErrors to correct response statuses
func (k *Kafka) mapKafkaProducerErrors(err error, entries []pubsub.BulkMessageEntry) pubsub.BulkPublishResponse {
var pErrs sarama.ProducerErrors
if !errors.As(err, &pErrs) {
// Ideally this condition should not be executed, but in the scenario that the err is not of sarama.ProducerErrors type
// return a default error that all messages have failed
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err)
}
resp := pubsub.BulkPublishResponse{
Statuses: make([]pubsub.BulkPublishResponseEntry, 0, len(entries)),
}
// used in the case of the partial success scenario
alreadySeen := map[string]struct{}{}
for _, pErr := range pErrs {
if entryId, ok := pErr.Msg.Metadata.(string); ok { //nolint:stylecheck
alreadySeen[entryId] = struct{}{}
resp.Statuses = append(resp.Statuses, pubsub.BulkPublishResponseEntry{
Status: pubsub.PublishFailed,
EntryId: entryId,
Error: pErr.Err,
})
} else {
// Ideally this condition should not be executed, but in the scenario that the Metadata field
// is not of string type return a default error that all messages have failed
k.logger.Warnf("error parsing bulk errors from Kafka, returning default error response of all failed")
return pubsub.NewBulkPublishResponse(entries, pubsub.PublishFailed, err)
}
}
// Check if all the messages have failed
if len(pErrs) != len(entries) {
// This is a partial success scenario
for _, entry := range entries {
// Check if the entryId was not seen in the pErrs list
if _, ok := alreadySeen[entry.EntryId]; !ok {
// this is a message that has succeeded
resp.Statuses = append(resp.Statuses, pubsub.BulkPublishResponseEntry{
Status: pubsub.PublishSucceeded,
EntryId: entry.EntryId,
})
}
}
}
return resp
}

View File

@ -17,11 +17,21 @@ import (
"encoding/base64" "encoding/base64"
"encoding/pem" "encoding/pem"
"fmt" "fmt"
"strconv"
"strings" "strings"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
) )
const (
// DefaultMaxBulkSubCount is the default max bulk count for kafka pubsub component
// if the MaxBulkCountKey is not set in the metadata.
DefaultMaxBulkSubCount = 80
// DefaultMaxBulkSubAwaitDurationMs is the default max bulk await duration for kafka pubsub component
// if the MaxBulkAwaitDurationKey is not set in the metadata.
DefaultMaxBulkSubAwaitDurationMs = 10000
)
// asBase64String implements the `fmt.Stringer` interface in order to print // asBase64String implements the `fmt.Stringer` interface in order to print
// `[]byte` as a base 64 encoded string. // `[]byte` as a base 64 encoded string.
// It is used above to log the message key. The call to `EncodeToString` // It is used above to log the message key. The call to `EncodeToString`
@ -52,16 +62,27 @@ func isValidPEM(val string) bool {
return block != nil return block != nil
} }
// Map of topics and their handlers // TopicHandlerConfig is the map of topics and sruct containing handler and their config.
type TopicHandlers map[string]EventHandler type TopicHandlerConfig map[string]SubscriptionHandlerConfig
// TopicList returns the list of topics // // TopicList returns the list of topics
func (th TopicHandlers) TopicList() []string { func (tbh TopicHandlerConfig) TopicList() []string {
topics := make([]string, len(th)) topics := make([]string, len(tbh))
i := 0 i := 0
for topic := range th { for topic := range tbh {
topics[i] = topic topics[i] = topic
i++ i++
} }
return topics return topics
} }
// GetIntFromMetadata returns an int value from metadata OR default value if key not found or if its
// value not convertible to int.
func GetIntFromMetadata(metadata map[string]string, key string, defaultValue int) int {
if val, ok := metadata[key]; ok {
if intVal, err := strconv.Atoi(val); err == nil {
return intVal
}
}
return defaultValue
}

View File

@ -1,6 +1,20 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils package utils
import ( import (
"strconv"
"strings" "strings"
) )
@ -14,3 +28,21 @@ func IsTruthy(val string) bool {
return false return false
} }
} }
// GetElemOrDefaultFromMap returns the value of a key from a map, or a default value
// if the key does not exist or the value is not of the expected type.
func GetElemOrDefaultFromMap[T int | uint64](m map[string]string, key string, def T) T {
if val, ok := m[key]; ok {
switch any(def).(type) {
case int:
if ival, err := strconv.ParseInt(val, 10, 64); err == nil {
return T(ival)
}
case uint64:
if uval, err := strconv.ParseUint(val, 10, 64); err == nil {
return T(uval)
}
}
}
return def
}

View File

@ -0,0 +1,98 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import "testing"
func TestGetElemOrDefaultFromMap(t *testing.T) {
t.Run("test int", func(t *testing.T) {
testcases := []struct {
name string
m map[string]string
key string
def int
expected int
}{
{
name: "Get an int value from map that exists",
m: map[string]string{"key": "1"},
key: "key",
def: 0,
expected: 1,
},
{
name: "Get an int value from map that does not exist",
m: map[string]string{"key": "1"},
key: "key2",
def: 0,
expected: 0,
},
{
name: "Get an int value from map that exists but is not an int",
m: map[string]string{"key": "a"},
key: "key",
def: 0,
expected: 0,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
actual := GetElemOrDefaultFromMap(tc.m, tc.key, tc.def)
if actual != tc.expected {
t.Errorf("expected %v, got %v", tc.expected, actual)
}
})
}
})
t.Run("test uint64", func(t *testing.T) {
testcases := []struct {
name string
m map[string]string
key string
def uint64
expected uint64
}{
{
name: "Get an uint64 value from map that exists",
m: map[string]string{"key": "1"},
key: "key",
def: uint64(0),
expected: uint64(1),
},
{
name: "Get an uint64 value from map that does not exist",
m: map[string]string{"key": "1"},
key: "key2",
def: uint64(0),
expected: uint64(0),
},
{
name: "Get an int value from map that exists but is not an uint64",
m: map[string]string{"key": "-1"},
key: "key",
def: 0,
expected: 0,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
actual := GetElemOrDefaultFromMap(tc.m, tc.key, tc.def)
if actual != tc.expected {
t.Errorf("expected %v, got %v", tc.expected, actual)
}
})
}
})
}

View File

@ -38,6 +38,15 @@ const (
// QueryIndexName defines the metadata key for the name of query indexing schema (for redis). // QueryIndexName defines the metadata key for the name of query indexing schema (for redis).
QueryIndexName = "queryIndexName" QueryIndexName = "queryIndexName"
// MaxBulkCountSubKey defines the maximum number of messages to be sent in a single bulk subscribe request.
MaxBulkSubCountKey string = "maxBulkSubCount"
// MaxBulkAwaitDurationKey is the key for the max bulk await duration in the metadata.
MaxBulkSubAwaitDurationMsKey string = "maxBulkSubAwaitDurationMs"
// MaxBulkPubBytesKey defines the maximum bytes to publish in a bulk publish request metadata.
MaxBulkPubBytesKey string = "maxBulkPubBytes"
) )
// TryGetTTL tries to get the ttl as a time.Duration value for pubsub, binding and any other building block. // TryGetTTL tries to get the ttl as a time.Duration value for pubsub, binding and any other building block.

View File

@ -32,6 +32,8 @@ import (
"github.com/Azure/go-autorest/autorest/azure" "github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/dapr/components-contrib/internal/authentication/azure" azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/components-contrib/internal/utils"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger" "github.com/dapr/kit/logger"
"github.com/dapr/kit/retry" "github.com/dapr/kit/retry"
@ -564,6 +566,41 @@ func (aeh *AzureEventHubs) Publish(req *pubsub.PublishRequest) error {
return nil return nil
} }
// BulkPublish sends data to Azure Event Hubs in bulk.
func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) {
if _, ok := aeh.hubClients[req.Topic]; !ok {
if err := aeh.ensurePublisherClient(ctx, req.Topic); err != nil {
err = fmt.Errorf("error on establishing hub connection: %s", err)
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err
}
}
// Create a slice of events to send.
events := make([]*eventhub.Event, len(req.Entries))
for i, entry := range req.Entries {
events[i] = &eventhub.Event{Data: entry.Event}
if val, ok := entry.Metadata[partitionKeyMetadataKey]; ok {
events[i].PartitionKey = &val
}
}
// Configure options for sending events.
opts := []eventhub.BatchOption{
eventhub.BatchWithMaxSizeInBytes(utils.GetElemOrDefaultFromMap(
req.Metadata, contribMetadata.MaxBulkPubBytesKey, int(eventhub.DefaultMaxMessageSizeInBytes))),
}
// Send events.
err := aeh.hubClients[req.Topic].SendBatch(ctx, eventhub.NewEventBatchIterator(events...), opts...)
if err != nil {
// Partial success is not supported by Azure Event Hubs.
// If an error occurs, all events are considered failed.
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err
}
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishSucceeded, nil), nil
}
// Subscribe receives data from Azure Event Hubs. // Subscribe receives data from Azure Event Hubs.
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
err := aeh.validateSubscriptionAttributes() err := aeh.validateSubscriptionAttributes()

View File

@ -21,6 +21,7 @@ import (
"time" "time"
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/google/uuid"
contribMetadata "github.com/dapr/components-contrib/metadata" contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/pubsub"
@ -76,66 +77,87 @@ const (
func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.ReceivedMessage, topic string) (*pubsub.NewMessage, error) { func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.ReceivedMessage, topic string) (*pubsub.NewMessage, error) {
pubsubMsg := &pubsub.NewMessage{ pubsubMsg := &pubsub.NewMessage{
Topic: topic, Topic: topic,
Data: asbMsg.Body,
} }
pubsubMsg.Data = asbMsg.Body pubsubMsg.Metadata = addMessageAttributesToMetadata(pubsubMsg.Metadata, asbMsg)
addToMetadata := func(msg *pubsub.NewMessage, key, value string) { return pubsubMsg, nil
if msg.Metadata == nil { }
msg.Metadata = make(map[string]string)
}
msg.Metadata[fmt.Sprintf("metadata.%s", key)] = value func NewBulkMessageEntryFromASBMessage(asbMsg *azservicebus.ReceivedMessage) (pubsub.BulkMessageEntry, error) {
entryId, err := uuid.NewRandom() //nolint:stylecheck
if err != nil {
return pubsub.BulkMessageEntry{}, err
}
bulkMsgEntry := pubsub.BulkMessageEntry{
EntryId: entryId.String(),
Event: asbMsg.Body,
}
bulkMsgEntry.Metadata = addMessageAttributesToMetadata(bulkMsgEntry.Metadata, asbMsg)
return bulkMsgEntry, nil
}
func addMessageAttributesToMetadata(metadata map[string]string, asbMsg *azservicebus.ReceivedMessage) map[string]string {
if metadata == nil {
metadata = map[string]string{}
}
addToMetadata := func(metadata map[string]string, key, value string) {
metadata["metadata."+key] = value
} }
if asbMsg.MessageID != "" { if asbMsg.MessageID != "" {
addToMetadata(pubsubMsg, MessageIDMetadataKey, asbMsg.MessageID) addToMetadata(metadata, MessageIDMetadataKey, asbMsg.MessageID)
} }
if asbMsg.SessionID != nil { if asbMsg.SessionID != nil {
addToMetadata(pubsubMsg, SessionIDMetadataKey, *asbMsg.SessionID) addToMetadata(metadata, SessionIDMetadataKey, *asbMsg.SessionID)
} }
if asbMsg.CorrelationID != nil && *asbMsg.CorrelationID != "" { if asbMsg.CorrelationID != nil && *asbMsg.CorrelationID != "" {
addToMetadata(pubsubMsg, CorrelationIDMetadataKey, *asbMsg.CorrelationID) addToMetadata(metadata, CorrelationIDMetadataKey, *asbMsg.CorrelationID)
} }
if asbMsg.Subject != nil && *asbMsg.Subject != "" { if asbMsg.Subject != nil && *asbMsg.Subject != "" {
addToMetadata(pubsubMsg, LabelMetadataKey, *asbMsg.Subject) addToMetadata(metadata, LabelMetadataKey, *asbMsg.Subject)
} }
if asbMsg.ReplyTo != nil && *asbMsg.ReplyTo != "" { if asbMsg.ReplyTo != nil && *asbMsg.ReplyTo != "" {
addToMetadata(pubsubMsg, ReplyToMetadataKey, *asbMsg.ReplyTo) addToMetadata(metadata, ReplyToMetadataKey, *asbMsg.ReplyTo)
} }
if asbMsg.To != nil && *asbMsg.To != "" { if asbMsg.To != nil && *asbMsg.To != "" {
addToMetadata(pubsubMsg, ToMetadataKey, *asbMsg.To) addToMetadata(metadata, ToMetadataKey, *asbMsg.To)
} }
if asbMsg.ContentType != nil && *asbMsg.ContentType != "" { if asbMsg.ContentType != nil && *asbMsg.ContentType != "" {
addToMetadata(pubsubMsg, ContentTypeMetadataKey, *asbMsg.ContentType) addToMetadata(metadata, ContentTypeMetadataKey, *asbMsg.ContentType)
} }
if asbMsg.LockToken != [16]byte{} { if asbMsg.LockToken != [16]byte{} {
addToMetadata(pubsubMsg, LockTokenMetadataKey, base64.StdEncoding.EncodeToString(asbMsg.LockToken[:])) addToMetadata(metadata, LockTokenMetadataKey, base64.StdEncoding.EncodeToString(asbMsg.LockToken[:]))
} }
// Always set delivery count. // Always set delivery count.
addToMetadata(pubsubMsg, DeliveryCountMetadataKey, strconv.FormatInt(int64(asbMsg.DeliveryCount), 10)) addToMetadata(metadata, DeliveryCountMetadataKey, strconv.FormatInt(int64(asbMsg.DeliveryCount), 10))
if asbMsg.EnqueuedTime != nil { if asbMsg.EnqueuedTime != nil {
// Preserve RFC2616 time format. // Preserve RFC2616 time format.
addToMetadata(pubsubMsg, EnqueuedTimeUtcMetadataKey, asbMsg.EnqueuedTime.UTC().Format(http.TimeFormat)) addToMetadata(metadata, EnqueuedTimeUtcMetadataKey, asbMsg.EnqueuedTime.UTC().Format(http.TimeFormat))
} }
if asbMsg.SequenceNumber != nil { if asbMsg.SequenceNumber != nil {
addToMetadata(pubsubMsg, SequenceNumberMetadataKey, strconv.FormatInt(*asbMsg.SequenceNumber, 10)) addToMetadata(metadata, SequenceNumberMetadataKey, strconv.FormatInt(*asbMsg.SequenceNumber, 10))
} }
if asbMsg.ScheduledEnqueueTime != nil { if asbMsg.ScheduledEnqueueTime != nil {
// Preserve RFC2616 time format. // Preserve RFC2616 time format.
addToMetadata(pubsubMsg, ScheduledEnqueueTimeUtcMetadataKey, asbMsg.ScheduledEnqueueTime.UTC().Format(http.TimeFormat)) addToMetadata(metadata, ScheduledEnqueueTimeUtcMetadataKey, asbMsg.ScheduledEnqueueTime.UTC().Format(http.TimeFormat))
} }
if asbMsg.PartitionKey != nil { if asbMsg.PartitionKey != nil {
addToMetadata(pubsubMsg, PartitionKeyMetadataKey, *asbMsg.PartitionKey) addToMetadata(metadata, PartitionKeyMetadataKey, *asbMsg.PartitionKey)
} }
if asbMsg.LockedUntil != nil { if asbMsg.LockedUntil != nil {
// Preserve RFC2616 time format. // Preserve RFC2616 time format.
addToMetadata(pubsubMsg, LockedUntilUtcMetadataKey, asbMsg.LockedUntil.UTC().Format(http.TimeFormat)) addToMetadata(metadata, LockedUntilUtcMetadataKey, asbMsg.LockedUntil.UTC().Format(http.TimeFormat))
} }
return pubsubMsg, nil return metadata
} }
// NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest. // NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest.
@ -144,64 +166,98 @@ func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.M
Body: req.Data, Body: req.Data,
} }
err := addMetadataToMessage(asbMsg, req.Metadata)
return asbMsg, err
}
// NewASBMessageFromBulkMessageEntry builds a new Azure Service Bus message from a BulkMessageEntry.
func NewASBMessageFromBulkMessageEntry(entry pubsub.BulkMessageEntry) (*azservicebus.Message, error) {
asbMsg := &azservicebus.Message{
Body: entry.Event,
ContentType: &entry.ContentType,
}
err := addMetadataToMessage(asbMsg, entry.Metadata)
return asbMsg, err
}
func addMetadataToMessage(asbMsg *azservicebus.Message, metadata map[string]string) error {
// Common properties. // Common properties.
ttl, ok, _ := contribMetadata.TryGetTTL(req.Metadata) ttl, ok, _ := contribMetadata.TryGetTTL(metadata)
if ok { if ok {
asbMsg.TimeToLive = &ttl asbMsg.TimeToLive = &ttl
} }
// Azure Service Bus specific properties. // Azure Service Bus specific properties.
// reference: https://docs.microsoft.com/en-us/rest/api/servicebus/message-headers-and-properties#message-headers // reference: https://docs.microsoft.com/en-us/rest/api/servicebus/message-headers-and-properties#message-headers
msgID, ok, _ := tryGetString(req.Metadata, MessageIDMetadataKey) msgID, ok, _ := tryGetString(metadata, MessageIDMetadataKey)
if ok { if ok {
asbMsg.MessageID = &msgID asbMsg.MessageID = &msgID
} }
correlationID, ok, _ := tryGetString(req.Metadata, CorrelationIDMetadataKey) correlationID, ok, _ := tryGetString(metadata, CorrelationIDMetadataKey)
if ok { if ok {
asbMsg.CorrelationID = &correlationID asbMsg.CorrelationID = &correlationID
} }
sessionID, okSessionID, _ := tryGetString(req.Metadata, SessionIDMetadataKey) sessionID, okSessionID, _ := tryGetString(metadata, SessionIDMetadataKey)
if okSessionID { if okSessionID {
asbMsg.SessionID = &sessionID asbMsg.SessionID = &sessionID
} }
label, ok, _ := tryGetString(req.Metadata, LabelMetadataKey) label, ok, _ := tryGetString(metadata, LabelMetadataKey)
if ok { if ok {
asbMsg.Subject = &label asbMsg.Subject = &label
} }
replyTo, ok, _ := tryGetString(req.Metadata, ReplyToMetadataKey) replyTo, ok, _ := tryGetString(metadata, ReplyToMetadataKey)
if ok { if ok {
asbMsg.ReplyTo = &replyTo asbMsg.ReplyTo = &replyTo
} }
to, ok, _ := tryGetString(req.Metadata, ToMetadataKey) to, ok, _ := tryGetString(metadata, ToMetadataKey)
if ok { if ok {
asbMsg.To = &to asbMsg.To = &to
} }
partitionKey, ok, _ := tryGetString(req.Metadata, PartitionKeyMetadataKey) partitionKey, ok, _ := tryGetString(metadata, PartitionKeyMetadataKey)
if ok { if ok {
if okSessionID && partitionKey != sessionID { if okSessionID && partitionKey != sessionID {
return nil, fmt.Errorf("session id %s and partition key %s should be equal when both present", sessionID, partitionKey) return fmt.Errorf("session id %s and partition key %s should be equal when both present", sessionID, partitionKey)
} }
asbMsg.PartitionKey = &partitionKey asbMsg.PartitionKey = &partitionKey
} }
contentType, ok, _ := tryGetString(req.Metadata, ContentTypeMetadataKey) contentType, ok, _ := tryGetString(metadata, ContentTypeMetadataKey)
if ok { if ok {
asbMsg.ContentType = &contentType asbMsg.ContentType = &contentType
} }
scheduledEnqueueTime, ok, _ := tryGetScheduledEnqueueTime(req.Metadata) scheduledEnqueueTime, ok, _ := tryGetScheduledEnqueueTime(metadata)
if ok { if ok {
asbMsg.ScheduledEnqueueTime = scheduledEnqueueTime asbMsg.ScheduledEnqueueTime = scheduledEnqueueTime
} }
return asbMsg, nil return nil
}
// UpdateASBBatchMessageWithBulkPublishRequest updates the batch message with messages from the bulk publish request.
func UpdateASBBatchMessageWithBulkPublishRequest(asbMsgBatch *azservicebus.MessageBatch, req *pubsub.BulkPublishRequest) error {
// Add entries from bulk request to batch.
for _, entry := range req.Entries {
asbMsg, err := NewASBMessageFromBulkMessageEntry(entry)
if err != nil {
return err
}
err = asbMsgBatch.AddMessage(asbMsg, nil)
if err != nil {
return err
}
}
return nil
} }
func tryGetString(props map[string]string, key string) (string, bool, error) { func tryGetString(props map[string]string, key string) (string, bool, error) {

View File

@ -14,6 +14,7 @@ limitations under the License.
package servicebus package servicebus
import ( import (
"fmt"
"net/http" "net/http"
"testing" "testing"
"time" "time"
@ -21,48 +22,107 @@ import (
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/pubsub"
) )
func TestNewASBMessageFromPubsubRequest(t *testing.T) { var (
testMessageData := []byte("test message") testMessageID = "testMessageId"
testMessageID := "testMessageId" testCorrelationID = "testCorrelationId"
testCorrelationID := "testCorrelationId" testSessionID = "testSessionId"
testSessionID := "testSessionId" testLabel = "testLabel"
testLabel := "testLabel" testReplyTo = "testReplyTo"
testReplyTo := "testReplyTo" testTo = "testTo"
testTo := "testTo" testPartitionKey = testSessionID
testPartitionKey := testSessionID testPartitionKeyUnique = "testPartitionKey"
testPartitionKeyUnique := "testPartitionKey" testContentType = "testContentType"
testContentType := "testContentType" nowUtc = time.Now().UTC()
nowUtc := time.Now().UTC() testScheduledEnqueueTimeUtc = nowUtc.Format(http.TimeFormat)
testScheduledEnqueueTimeUtc := nowUtc.Format(http.TimeFormat) testLockTokenString = "bG9ja3Rva2VuAAAAAAAAAA==" //nolint:gosec
testLockTokenBytes = [16]byte{108, 111, 99, 107, 116, 111, 107, 101, 110}
testDeliveryCount uint32 = 1
testSampleTime = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
testSampleTimeHTTPFormat = "Thu, 01 Jan 1970 00:00:00 GMT"
testSequenceNumber int64 = 1
)
func TestAddMessageAttributesToMetadata(t *testing.T) {
testCases := []struct {
name string
ASBMessage azservicebus.ReceivedMessage
expectedMetadata map[string]string
}{
{
name: "Metadata must contain all attributes with the correct prefix",
ASBMessage: azservicebus.ReceivedMessage{
MessageID: testMessageID,
SessionID: &testSessionID,
CorrelationID: &testCorrelationID,
Subject: &testLabel,
ReplyTo: &testReplyTo,
To: &testTo,
ContentType: &testContentType,
LockToken: testLockTokenBytes,
DeliveryCount: testDeliveryCount,
EnqueuedTime: &testSampleTime,
SequenceNumber: &testSequenceNumber,
ScheduledEnqueueTime: &testSampleTime,
PartitionKey: &testPartitionKey,
LockedUntil: &testSampleTime,
},
expectedMetadata: map[string]string{
"metadata." + MessageIDMetadataKey: testMessageID,
"metadata." + SessionIDMetadataKey: testSessionID,
"metadata." + CorrelationIDMetadataKey: testCorrelationID,
"metadata." + LabelMetadataKey: testLabel, // Subject
"metadata." + ReplyToMetadataKey: testReplyTo,
"metadata." + ToMetadataKey: testTo,
"metadata." + ContentTypeMetadataKey: testContentType,
"metadata." + LockTokenMetadataKey: testLockTokenString,
"metadata." + DeliveryCountMetadataKey: "1",
"metadata." + EnqueuedTimeUtcMetadataKey: testSampleTimeHTTPFormat,
"metadata." + SequenceNumberMetadataKey: "1",
"metadata." + ScheduledEnqueueTimeUtcMetadataKey: testSampleTimeHTTPFormat,
"metadata." + PartitionKeyMetadataKey: testPartitionKey,
"metadata." + LockedUntilUtcMetadataKey: testSampleTimeHTTPFormat,
},
},
}
metadataMap := map[string]map[string]string{
"Nil": nil,
"Empty": {},
}
for _, tc := range testCases {
for mType, mMap := range metadataMap {
t.Run(fmt.Sprintf("%s, metadata is %s", tc.name, mType), func(t *testing.T) {
actual := addMessageAttributesToMetadata(mMap, &tc.ASBMessage)
assert.Equal(t, tc.expectedMetadata, actual)
})
}
}
}
func TestAddMetadataToMessage(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
pubsubRequest pubsub.PublishRequest metadata map[string]string
expectedAzServiceBusMessage azservicebus.Message expectedAzServiceBusMessage azservicebus.Message
expectError bool expectError bool
}{ }{
{ {
name: "Maps pubsub request to azure service bus message.", name: "Maps pubsub request to azure service bus message.",
pubsubRequest: pubsub.PublishRequest{ metadata: map[string]string{
Data: testMessageData, MessageIDMetadataKey: testMessageID,
Metadata: map[string]string{ CorrelationIDMetadataKey: testCorrelationID,
MessageIDMetadataKey: testMessageID, SessionIDMetadataKey: testSessionID,
CorrelationIDMetadataKey: testCorrelationID, LabelMetadataKey: testLabel,
SessionIDMetadataKey: testSessionID, ReplyToMetadataKey: testReplyTo,
LabelMetadataKey: testLabel, ToMetadataKey: testTo,
ReplyToMetadataKey: testReplyTo, PartitionKeyMetadataKey: testPartitionKey,
ToMetadataKey: testTo, ContentTypeMetadataKey: testContentType,
PartitionKeyMetadataKey: testPartitionKey, ScheduledEnqueueTimeUtcMetadataKey: testScheduledEnqueueTimeUtc,
ContentTypeMetadataKey: testContentType,
ScheduledEnqueueTimeUtcMetadataKey: testScheduledEnqueueTimeUtc,
},
}, },
expectedAzServiceBusMessage: azservicebus.Message{ expectedAzServiceBusMessage: azservicebus.Message{
Body: testMessageData,
MessageID: &testMessageID, MessageID: &testMessageID,
CorrelationID: &testCorrelationID, CorrelationID: &testCorrelationID,
SessionID: &testSessionID, SessionID: &testSessionID,
@ -77,21 +137,17 @@ func TestNewASBMessageFromPubsubRequest(t *testing.T) {
}, },
{ {
name: "Errors when partition key and session id set but not equal.", name: "Errors when partition key and session id set but not equal.",
pubsubRequest: pubsub.PublishRequest{ metadata: map[string]string{
Data: testMessageData, MessageIDMetadataKey: testMessageID,
Metadata: map[string]string{ CorrelationIDMetadataKey: testCorrelationID,
MessageIDMetadataKey: testMessageID, SessionIDMetadataKey: testSessionID,
CorrelationIDMetadataKey: testCorrelationID, LabelMetadataKey: testLabel,
SessionIDMetadataKey: testSessionID, ReplyToMetadataKey: testReplyTo,
LabelMetadataKey: testLabel, ToMetadataKey: testTo,
ReplyToMetadataKey: testReplyTo, PartitionKeyMetadataKey: testPartitionKeyUnique,
ToMetadataKey: testTo, ContentTypeMetadataKey: testContentType,
PartitionKeyMetadataKey: testPartitionKeyUnique,
ContentTypeMetadataKey: testContentType,
},
}, },
expectedAzServiceBusMessage: azservicebus.Message{ expectedAzServiceBusMessage: azservicebus.Message{
Body: testMessageData,
MessageID: &testMessageID, MessageID: &testMessageID,
CorrelationID: &testCorrelationID, CorrelationID: &testCorrelationID,
SessionID: &testSessionID, SessionID: &testSessionID,
@ -108,7 +164,8 @@ func TestNewASBMessageFromPubsubRequest(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// act. // act.
msg, err := NewASBMessageFromPubsubRequest(&tc.pubsubRequest) msg := &azservicebus.Message{}
err := addMetadataToMessage(msg, tc.metadata)
// assert. // assert.
if tc.expectError { if tc.expectError {

View File

@ -30,6 +30,7 @@ import (
azauth "github.com/dapr/components-contrib/internal/authentication/azure" azauth "github.com/dapr/components-contrib/internal/authentication/azure"
impl "github.com/dapr/components-contrib/internal/component/azure/servicebus" impl "github.com/dapr/components-contrib/internal/component/azure/servicebus"
"github.com/dapr/components-contrib/internal/utils"
contribMetadata "github.com/dapr/components-contrib/metadata" contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger" "github.com/dapr/kit/logger"
@ -37,7 +38,9 @@ import (
) )
const ( const (
errorMessagePrefix = "azure service bus error:" errorMessagePrefix = "azure service bus error:"
defaultMaxBulkSubCount = 100
defaultMaxBulkPubBytes uint64 = 1024 * 128 // 128 KiB
) )
var retriableSendingErrors = map[amqp.ErrorCondition]struct{}{ var retriableSendingErrors = map[amqp.ErrorCondition]struct{}{
@ -359,7 +362,97 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error {
) )
} }
func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) {
// If the request is empty, sender.SendMessageBatch will panic later.
// Return an empty response to avoid this.
if len(req.Entries) == 0 {
a.logger.Warnf("Empty bulk publish request, skipping")
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishSucceeded, nil), nil
}
sender, err := a.senderForTopic(ctx, req.Topic)
if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err
}
// Create a new batch of messages with batch options.
batchOpts := &servicebus.MessageBatchOptions{
MaxBytes: utils.GetElemOrDefaultFromMap(req.Metadata, contribMetadata.MaxBulkPubBytesKey, defaultMaxBulkPubBytes),
}
batchMsg, err := sender.NewMessageBatch(ctx, batchOpts)
if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err
}
// Add messages from the bulk publish request to the batch.
err = UpdateASBBatchMessageWithBulkPublishRequest(batchMsg, req)
if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err
}
// Azure Service Bus does not return individual status for each message in the request.
err = sender.SendMessageBatch(ctx, batchMsg, nil)
if err != nil {
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishFailed, err), err
}
return pubsub.NewBulkPublishResponse(req.Entries, pubsub.PublishSucceeded, nil), nil
}
func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
sub := impl.NewSubscription(
subscribeCtx,
a.metadata.MaxActiveMessages,
a.metadata.TimeoutInSec,
a.metadata.MaxRetriableErrorsPerSec,
a.metadata.MaxConcurrentHandlers,
"topic "+req.Topic,
a.logger,
)
receiveAndBlockFn := func(onFirstSuccess func()) error {
return sub.ReceiveAndBlock(
a.getHandlerFunc(req.Topic, handler),
a.metadata.LockRenewalInSec,
false, // Bulk is not supported in regular Subscribe.
onFirstSuccess,
)
}
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn)
}
func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.BulkHandler) error {
maxBulkSubCount := utils.GetElemOrDefaultFromMap(req.Metadata, contribMetadata.MaxBulkSubCountKey, defaultMaxBulkSubCount)
sub := impl.NewBulkSubscription(
subscribeCtx,
a.metadata.MaxActiveMessages,
a.metadata.TimeoutInSec,
maxBulkSubCount,
a.metadata.MaxRetriableErrorsPerSec,
a.metadata.MaxConcurrentHandlers,
"topic "+req.Topic,
a.logger,
)
receiveAndBlockFn := func(onFirstSuccess func()) error {
return sub.ReceiveAndBlock(
a.getBulkHandlerFunc(req.Topic, handler),
a.metadata.LockRenewalInSec,
true, // Bulk is supported in BulkSubscribe.
onFirstSuccess,
)
}
return a.doSubscribe(subscribeCtx, req, sub, receiveAndBlockFn)
}
// doSubscribe is a helper function that handles the common logic for both Subscribe and BulkSubscribe.
// The receiveAndBlockFn is a function should invoke a blocking call to receive messages from the topic.
func (a *azureServiceBus) doSubscribe(subscribeCtx context.Context,
req pubsub.SubscribeRequest, sub *impl.Subscription, receiveAndBlockFn func(func()) error,
) error {
subID := a.metadata.ConsumerID subID := a.metadata.ConsumerID
if !a.metadata.DisableEntityManagement { if !a.metadata.DisableEntityManagement {
err := a.ensureSubscription(subscribeCtx, subID, req.Topic) err := a.ensureSubscription(subscribeCtx, subID, req.Topic)
@ -374,19 +467,14 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
bo.InitialInterval = time.Duration(a.metadata.MinConnectionRecoveryInSec) * time.Second bo.InitialInterval = time.Duration(a.metadata.MinConnectionRecoveryInSec) * time.Second
bo.MaxInterval = time.Duration(a.metadata.MaxConnectionRecoveryInSec) * time.Second bo.MaxInterval = time.Duration(a.metadata.MaxConnectionRecoveryInSec) * time.Second
onFirstSuccess := func() {
// Reset the backoff when the subscription is successful and we have received the first message
bo.Reset()
}
go func() { go func() {
// Reconnect loop. // Reconnect loop.
for { for {
sub := impl.NewSubscription(
subscribeCtx,
a.metadata.MaxActiveMessages,
a.metadata.TimeoutInSec,
a.metadata.MaxRetriableErrorsPerSec,
a.metadata.MaxConcurrentHandlers,
"topic "+req.Topic,
a.logger,
)
// Blocks until a successful connection (or until context is canceled) // Blocks until a successful connection (or until context is canceled)
err := sub.Connect(func() (*servicebus.Receiver, error) { err := sub.Connect(func() (*servicebus.Receiver, error) {
return a.client.NewReceiverForSubscription(req.Topic, subID, nil) return a.client.NewReceiverForSubscription(req.Topic, subID, nil)
@ -399,16 +487,9 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
return return
} }
// ReceiveAndBlock will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns. // receiveAndBlockFn will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns.
// If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts. // If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts.
err = sub.ReceiveAndBlock( err = receiveAndBlockFn(onFirstSuccess)
a.getHandlerFunc(req.Topic, handler),
a.metadata.LockRenewalInSec,
func() {
// Reset the backoff when the subscription is successful and we have received the first message
bo.Reset()
},
)
if err != nil { if err != nil {
var detachError *amqp.DetachError var detachError *amqp.DetachError
var amqpError *amqp.Error var amqpError *amqp.Error
@ -442,16 +523,58 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
} }
func (a *azureServiceBus) getHandlerFunc(topic string, handler pubsub.Handler) impl.HandlerFunc { func (a *azureServiceBus) getHandlerFunc(topic string, handler pubsub.Handler) impl.HandlerFunc {
return func(ctx context.Context, asbMsg *servicebus.ReceivedMessage) error { emptyResponseItems := []impl.HandlerResponseItem{}
pubsubMsg, err := NewPubsubMessageFromASBMessage(asbMsg, topic) // Only the first ASB message is used in the actual handler invocation.
return func(ctx context.Context, asbMsgs []*servicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) {
if len(asbMsgs) != 1 {
return nil, fmt.Errorf("expected 1 message, got %d", len(asbMsgs))
}
pubsubMsg, err := NewPubsubMessageFromASBMessage(asbMsgs[0], topic)
if err != nil { if err != nil {
return fmt.Errorf("failed to get pubsub message from azure service bus message: %+v", err) return emptyResponseItems, fmt.Errorf("failed to get pubsub message from azure service bus message: %+v", err)
} }
handleCtx, handleCancel := context.WithTimeout(ctx, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second) handleCtx, handleCancel := context.WithTimeout(ctx, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second)
defer handleCancel() defer handleCancel()
a.logger.Debugf("Calling app's handler for message %s on topic %s", asbMsg.MessageID, topic) a.logger.Debugf("Calling app's handler for message %s on topic %s", asbMsgs[0].MessageID, topic)
return handler(handleCtx, pubsubMsg) return emptyResponseItems, handler(handleCtx, pubsubMsg)
}
}
func (a *azureServiceBus) getBulkHandlerFunc(topic string, handler pubsub.BulkHandler) impl.HandlerFunc {
return func(ctx context.Context, asbMsgs []*servicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) {
pubsubMsgs := make([]pubsub.BulkMessageEntry, len(asbMsgs))
for i, asbMsg := range asbMsgs {
pubsubMsg, err := NewBulkMessageEntryFromASBMessage(asbMsg)
if err != nil {
return nil, fmt.Errorf("failed to get pubsub message from azure service bus message: %+v", err)
}
pubsubMsgs[i] = pubsubMsg
}
// Note, no metadata is currently supported here.
// In the future, we could add propagate metadata to the handler if required.
bulkMessage := &pubsub.BulkMessage{
Entries: pubsubMsgs,
Metadata: map[string]string{},
Topic: topic,
}
handleCtx, handleCancel := context.WithTimeout(ctx, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second)
defer handleCancel()
a.logger.Debugf("Calling app's handler for %d messages on topic %s", len(asbMsgs), topic)
resps, err := handler(handleCtx, bulkMessage)
implResps := make([]impl.HandlerResponseItem, len(resps))
for i, resp := range resps {
implResps[i] = impl.HandlerResponseItem{
EntryId: resp.EntryId,
Error: resp.Error,
}
}
return implResps, err
} }
} }

View File

@ -29,6 +29,8 @@ import (
const ( const (
// DefaultCloudEventType is the default event type for an Dapr published event. // DefaultCloudEventType is the default event type for an Dapr published event.
DefaultCloudEventType = "com.dapr.event.sent" DefaultCloudEventType = "com.dapr.event.sent"
// DefaultBulkEventType is the default bulk event type for a Dapr published event.
DefaultBulkEventType = "com.dapr.event.sent.bulk"
// CloudEventsSpecVersion is the specversion used by Dapr for the cloud events implementation. // CloudEventsSpecVersion is the specversion used by Dapr for the cloud events implementation.
CloudEventsSpecVersion = "1.0" CloudEventsSpecVersion = "1.0"
// DefaultCloudEventSource is the default event source. // DefaultCloudEventSource is the default event source.

View File

@ -19,6 +19,8 @@ import (
"github.com/dapr/kit/logger" "github.com/dapr/kit/logger"
"github.com/dapr/components-contrib/internal/component/kafka" "github.com/dapr/components-contrib/internal/component/kafka"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/pubsub"
) )
@ -36,7 +38,32 @@ func (p *PubSub) Init(metadata pubsub.Metadata) error {
} }
func (p *PubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { func (p *PubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
p.kafka.AddTopicHandler(req.Topic, adaptHandler(handler)) handlerConfig := kafka.SubscriptionHandlerConfig{
IsBulkSubscribe: false,
Handler: adaptHandler(handler),
}
return p.subscribeUtil(ctx, req, handlerConfig)
}
func (p *PubSub) BulkSubscribe(ctx context.Context, req pubsub.SubscribeRequest,
handler pubsub.BulkHandler,
) error {
subConfig := pubsub.BulkSubscribeConfig{
MaxBulkSubCount: kafka.GetIntFromMetadata(req.Metadata, metadata.MaxBulkSubCountKey,
kafka.DefaultMaxBulkSubCount),
MaxBulkSubAwaitDurationMs: kafka.GetIntFromMetadata(req.Metadata,
metadata.MaxBulkSubAwaitDurationMsKey, kafka.DefaultMaxBulkSubAwaitDurationMs),
}
handlerConfig := kafka.SubscriptionHandlerConfig{
IsBulkSubscribe: true,
SubscribeConfig: subConfig,
BulkHandler: adaptBulkHandler(handler),
}
return p.subscribeUtil(ctx, req, handlerConfig)
}
func (p *PubSub) subscribeUtil(ctx context.Context, req pubsub.SubscribeRequest, handlerConfig kafka.SubscriptionHandlerConfig) error {
p.kafka.AddTopicHandler(req.Topic, handlerConfig)
go func() { go func() {
// Wait for context cancelation // Wait for context cancelation
@ -78,6 +105,11 @@ func (p *PubSub) Publish(req *pubsub.PublishRequest) error {
return p.kafka.Publish(req.Topic, req.Data, req.Metadata) return p.kafka.Publish(req.Topic, req.Data, req.Metadata)
} }
// BatchPublish messages to Kafka cluster.
func (p *PubSub) BulkPublish(ctx context.Context, req *pubsub.BulkPublishRequest) (pubsub.BulkPublishResponse, error) {
return p.kafka.BulkPublish(ctx, req.Topic, req.Entries, req.Metadata)
}
func (p *PubSub) Close() (err error) { func (p *PubSub) Close() (err error) {
p.subscribeCancel() p.subscribeCancel()
return p.kafka.Close() return p.kafka.Close()
@ -97,3 +129,24 @@ func adaptHandler(handler pubsub.Handler) kafka.EventHandler {
}) })
} }
} }
func adaptBulkHandler(handler pubsub.BulkHandler) kafka.BulkEventHandler {
return func(ctx context.Context, event *kafka.KafkaBulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error) {
messages := make([]pubsub.BulkMessageEntry, 0)
for _, leafEvent := range event.Entries {
message := pubsub.BulkMessageEntry{
EntryId: leafEvent.EntryId,
Event: leafEvent.Event,
Metadata: leafEvent.Metadata,
ContentType: leafEvent.ContentType,
}
messages = append(messages, message)
}
return handler(ctx, &pubsub.BulkMessage{
Topic: event.Topic,
Entries: messages,
Metadata: event.Metadata,
})
}
}

View File

@ -29,9 +29,40 @@ type PubSub interface {
Close() error Close() error
} }
// BulkPublisher is the interface that wraps the BulkPublish method.
// BulkPublish publishes a collection of entries/messages in a BulkPublishRequest to a
// message bus topic and returns a BulkPublishResponse with individual statuses for each message.
type BulkPublisher interface {
BulkPublish(ctx context.Context, req *BulkPublishRequest) (BulkPublishResponse, error)
}
// BulkSubscriber is the interface defining BulkSubscribe definition for message buses
type BulkSubscriber interface {
// BulkSubscribe is used to subscribe to a topic and receive collection of entries/ messages
// from a message bus topic.
// The bulkHandler will be called with a list of messages.
BulkSubscribe(ctx context.Context, req SubscribeRequest, bulkHandler BulkHandler) error
}
// Handler is the handler used to invoke the app handler. // Handler is the handler used to invoke the app handler.
type Handler func(ctx context.Context, msg *NewMessage) error type Handler func(ctx context.Context, msg *NewMessage) error
// BulkHandler is the handler used to invoke the app handler in a bulk fashion.
// If second return type error is not nil, and []BulkSubscribeResponseEntry is nil,
// it represents some issue and that none of the message could be sent.
// If second return type error is not nil, and []BulkSubscribeResponseEntry is also not nil,
// []BulkSubscribeResponseEntry can be checked for each message's response status.
// If second return type error is nil, that reflects all items were sent successfully
// and []BulkSubscribeResponseEntry doesn't matter
// []BulkSubscribeResponseEntry represents individual statuses for each message in an
// orderly fashion.
type BulkHandler func(ctx context.Context, msg *BulkMessage) ([]BulkSubscribeResponseEntry, error)
func Ping(pubsub PubSub) error { func Ping(pubsub PubSub) error {
// checks if this pubsub has the ping option then executes // checks if this pubsub has the ping option then executes
if pubsubWithPing, ok := pubsub.(health.Pinger); ok { if pubsubWithPing, ok := pubsub.(health.Pinger); ok {

View File

@ -22,6 +22,14 @@ type PublishRequest struct {
ContentType *string `json:"contentType,omitempty"` ContentType *string `json:"contentType,omitempty"`
} }
// BulkPublishRequest is the request to publish mutilple messages.
type BulkPublishRequest struct {
Entries []BulkMessageEntry `json:"entries"`
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
}
// SubscribeRequest is the request to subscribe to a topic. // SubscribeRequest is the request to subscribe to a topic.
type SubscribeRequest struct { type SubscribeRequest struct {
Topic string `json:"topic"` Topic string `json:"topic"`
@ -35,3 +43,26 @@ type NewMessage struct {
Metadata map[string]string `json:"metadata"` Metadata map[string]string `json:"metadata"`
ContentType *string `json:"contentType,omitempty"` ContentType *string `json:"contentType,omitempty"`
} }
// BulkMessage represents bulk message arriving from a message bus instance.
type BulkMessage struct {
Entries []BulkMessageEntry `json:"entries"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
}
// BulkMessageEntry represents a single message inside a bulk request.
type BulkMessageEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Event []byte `json:"event"`
ContentType string `json:"contentType,omitempty"`
Metadata map[string]string `json:"metadata"`
}
// BulkSubscribeConfig represents the configuration for bulk subscribe.
// It depends on specific componets to support these.
type BulkSubscribeConfig struct {
MaxBulkSubCount int `json:"maxBulkSubCount"`
MaxBulkSubAwaitDurationMs int `json:"maxBulkSubAwaitDurationMs"`
MaxBulkSizeBytes int `json:"maxBulkSizeBytes"`
}

View File

@ -16,6 +16,9 @@ package pubsub
// AppResponseStatus represents a status of a PubSub response. // AppResponseStatus represents a status of a PubSub response.
type AppResponseStatus string type AppResponseStatus string
// BulkPublishStatus represents a status of a Bulk Publish response.
type BulkPublishStatus string
const ( const (
// Success means the message is received and processed correctly. // Success means the message is received and processed correctly.
Success AppResponseStatus = "SUCCESS" Success AppResponseStatus = "SUCCESS"
@ -23,9 +26,68 @@ const (
Retry AppResponseStatus = "RETRY" Retry AppResponseStatus = "RETRY"
// Drop means the message is received but should not be processed. // Drop means the message is received but should not be processed.
Drop AppResponseStatus = "DROP" Drop AppResponseStatus = "DROP"
// PublishSucceeded represents that message was published successfully.
PublishSucceeded BulkPublishStatus = "SUCCESS"
// PublishFailed represents that message publishing failed.
PublishFailed BulkPublishStatus = "FAILED"
) )
// AppResponse is the object describing the response from user code after a pubsub event. // AppResponse is the object describing the response from user code after a pubsub event.
type AppResponse struct { type AppResponse struct {
Status AppResponseStatus `json:"status"` Status AppResponseStatus `json:"status"`
} }
// AppBulkResponseEntry Represents single response, as part of AppBulkResponse, to be
// sent by subscibed App for the corresponding single message during bulk subscribe
type AppBulkResponseEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Status AppResponseStatus `json:"status"`
}
// AppBulkResponse is the whole bulk subscribe response sent by App
type AppBulkResponse struct {
AppResponses []AppBulkResponseEntry `json:"statuses"`
}
// BulkPublishResponseEntry Represents single publish response, as part of BulkPublishResponse
// to be sent to publishing App for the corresponding single message during bulk publish
type BulkPublishResponseEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Status BulkPublishStatus `json:"status"`
Error error `json:"error"`
}
// BulkPublishResponse is the whole bulk publish response sent to App
type BulkPublishResponse struct {
Statuses []BulkPublishResponseEntry `json:"statuses"`
}
// BulkSubscribeResponseEntry Represents single subscribe response item, as part of BulkSubscribeResponse
// to be sent to building block for the corresponding single message during bulk subscribe
type BulkSubscribeResponseEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Error error `json:"error"`
}
// BulkSubscribeResponse is the whole bulk subscribe response sent to building block
type BulkSubscribeResponse struct {
Error error `json:"error"`
Statuses []BulkSubscribeResponseEntry `json:"statuses"`
}
// NewBulkPublishResponse returns a BulkPublishResponse with each entry having same status and error.
// This method is a helper method to map a single error/success response on BulkPublish to multiple events.
func NewBulkPublishResponse(messages []BulkMessageEntry, status BulkPublishStatus, err error) BulkPublishResponse {
response := BulkPublishResponse{}
response.Statuses = make([]BulkPublishResponseEntry, len(messages))
for i, msg := range messages {
st := BulkPublishResponseEntry{}
st.EntryId = msg.EntryId
st.Status = status
if err != nil {
st.Error = err
}
response.Statuses[i] = st
}
return response
}

79
pubsub/responses_test.go Normal file
View File

@ -0,0 +1,79 @@
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pubsub
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestNewBulkPublishResponse(t *testing.T) {
messages := []BulkMessageEntry{
{
EntryId: "1",
Event: []byte("event 1"),
Metadata: map[string]string{
"ttlInSeconds": "22",
},
ContentType: "text/plain",
},
{
EntryId: "2",
Event: []byte("event 2"),
Metadata: map[string]string{
"ttlInSeconds": "11",
},
ContentType: "text/plain",
},
}
t.Run("populate success", func(t *testing.T) {
res := NewBulkPublishResponse(messages, PublishSucceeded, nil)
assert.NotEmpty(t, res, "expected res to be populated")
assert.Equal(t, 2, len(res.Statuses), "expected two statuses")
expectedRes := BulkPublishResponse{
Statuses: []BulkPublishResponseEntry{
{
EntryId: "1",
Status: PublishSucceeded,
},
{
EntryId: "2",
Status: PublishSucceeded,
},
},
}
assert.ElementsMatch(t, expectedRes.Statuses, res.Statuses, "expected output to match")
})
t.Run("populate failure", func(t *testing.T) {
res := NewBulkPublishResponse(messages, PublishFailed, assert.AnError)
assert.NotEmpty(t, res, "expected res to be populated")
assert.Equal(t, 2, len(res.Statuses), "expected two statuses")
expectedRes := BulkPublishResponse{
Statuses: []BulkPublishResponseEntry{
{
EntryId: "1",
Status: PublishFailed,
Error: assert.AnError,
},
{
EntryId: "2",
Status: PublishFailed,
Error: assert.AnError,
},
},
}
assert.ElementsMatch(t, expectedRes.Statuses, res.Statuses, "expected output to match")
})
}

View File

@ -1,4 +1,6 @@
# Supported operation: publish, subscribe # Supported operation: publish, subscribe, multiplehandlers, bulkpublish, bulksubscribe
# bulkpublish should only be run for components that implement pubsub.BulkPublisher interface
# bulksubscribe should only be run for components that implement pubsub.BulkSubscriber interface
# Config map: # Config map:
## pubsubName : name of the pubsub ## pubsubName : name of the pubsub
## testTopicName: name of the test topic to use ## testTopicName: name of the test topic to use
@ -10,7 +12,7 @@
componentType: pubsub componentType: pubsub
components: components:
- component: azure.eventhubs - component: azure.eventhubs
allOperations: true operations: ["publish", "subscribe", "multiplehandlers", "bulkpublish"]
config: config:
pubsubName: azure-eventhubs pubsubName: azure-eventhubs
testTopicName: eventhubs-pubsub-topic testTopicName: eventhubs-pubsub-topic
@ -28,13 +30,13 @@ components:
testTopicName: dapr-conf-test testTopicName: dapr-conf-test
checkInOrderProcessing: false checkInOrderProcessing: false
- component: redis - component: redis
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
config: config:
checkInOrderProcessing: false checkInOrderProcessing: false
- component: natsstreaming - component: natsstreaming
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
- component: jetstream - component: jetstream
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
- component: kafka - component: kafka
allOperations: true allOperations: true
- component: kafka - component: kafka
@ -42,27 +44,27 @@ components:
allOperations: true allOperations: true
- component: kafka - component: kafka
profile: confluent profile: confluent
allOperations: true
- component: pulsar
allOperations: true allOperations: true
- component: pulsar
operations: ["publish", "subscribe", "multiplehandlers"]
- component: mqtt - component: mqtt
profile: mosquitto profile: mosquitto
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
- component: mqtt - component: mqtt
profile: emqx profile: emqx
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
- component: mqtt - component: mqtt
profile: vernemq profile: vernemq
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
- component: hazelcast - component: hazelcast
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
- component: rabbitmq - component: rabbitmq
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
config: config:
checkInOrderProcessing: false checkInOrderProcessing: false
- component: in-memory - component: in-memory
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
- component: aws.snssqs - component: aws.snssqs
allOperations: true operations: ["publish", "subscribe", "multiplehandlers"]
config: config:
checkInOrderProcessing: false checkInOrderProcessing: false

View File

@ -38,22 +38,28 @@ import (
const ( const (
defaultPubsubName = "pubusub" defaultPubsubName = "pubusub"
defaultTopicName = "testTopic" defaultTopicName = "testTopic"
defaultTopicNameBulk = "testTopicBulk"
defaultMultiTopic1Name = "multiTopic1" defaultMultiTopic1Name = "multiTopic1"
defaultMultiTopic2Name = "multiTopic2" defaultMultiTopic2Name = "multiTopic2"
defaultMessageCount = 10 defaultMessageCount = 10
defaultMaxReadDuration = 60 * time.Second defaultMaxReadDuration = 60 * time.Second
defaultWaitDurationToPublish = 5 * time.Second defaultWaitDurationToPublish = 5 * time.Second
defaultCheckInOrderProcessing = true defaultCheckInOrderProcessing = true
defaultMaxBulkCount = 5
defaultMaxBulkAwaitDurationMs = 500
bulkSubStartingKey = 1000
) )
type TestConfig struct { type TestConfig struct {
utils.CommonConfig utils.CommonConfig
PubsubName string `mapstructure:"pubsubName"` PubsubName string `mapstructure:"pubsubName"`
TestTopicName string `mapstructure:"testTopicName"` TestTopicName string `mapstructure:"testTopicName"`
TestTopicForBulkSub string `mapstructure:"testTopicForBulkSub"`
TestMultiTopic1Name string `mapstructure:"testMultiTopic1Name"` TestMultiTopic1Name string `mapstructure:"testMultiTopic1Name"`
TestMultiTopic2Name string `mapstructure:"testMultiTopic2Name"` TestMultiTopic2Name string `mapstructure:"testMultiTopic2Name"`
PublishMetadata map[string]string `mapstructure:"publishMetadata"` PublishMetadata map[string]string `mapstructure:"publishMetadata"`
SubscribeMetadata map[string]string `mapstructure:"subscribeMetadata"` SubscribeMetadata map[string]string `mapstructure:"subscribeMetadata"`
BulkSubscribeMetadata map[string]string `mapstructure:"bulkSubscribeMetadata"`
MessageCount int `mapstructure:"messageCount"` MessageCount int `mapstructure:"messageCount"`
MaxReadDuration time.Duration `mapstructure:"maxReadDuration"` MaxReadDuration time.Duration `mapstructure:"maxReadDuration"`
WaitDurationToPublish time.Duration `mapstructure:"waitDurationToPublish"` WaitDurationToPublish time.Duration `mapstructure:"waitDurationToPublish"`
@ -78,7 +84,9 @@ func NewTestConfig(componentName string, allOperations bool, operations []string
WaitDurationToPublish: defaultWaitDurationToPublish, WaitDurationToPublish: defaultWaitDurationToPublish,
PublishMetadata: map[string]string{}, PublishMetadata: map[string]string{},
SubscribeMetadata: map[string]string{}, SubscribeMetadata: map[string]string{},
BulkSubscribeMetadata: map[string]string{},
CheckInOrderProcessing: defaultCheckInOrderProcessing, CheckInOrderProcessing: defaultCheckInOrderProcessing,
TestTopicForBulkSub: defaultTopicNameBulk,
} }
err := config.Decode(configMap, &tc) err := config.Decode(configMap, &tc)
@ -124,6 +132,11 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
dataPrefix := "message-" + runID + "-" dataPrefix := "message-" + runID + "-"
var outOfOrder bool var outOfOrder bool
ctx := context.Background() ctx := context.Background()
awaitingMessagesBulk := make(map[string]struct{}, 20)
processedMessagesBulk := make(map[int]struct{}, 20)
processedCBulk := make(chan string, config.MessageCount*2)
errorCountBulk := 0
var muBulk sync.Mutex
// Subscribe // Subscribe
if config.HasOperation("subscribe") { //nolint:nestif if config.HasOperation("subscribe") { //nolint:nestif
@ -201,6 +214,102 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
}) })
} }
// Bulk Subscribe
if config.HasOperation("bulksubscribe") { //nolint:nestif
t.Run("bulkSubscribe", func(t *testing.T) {
bS, ok := ps.(pubsub.BulkSubscriber)
if !ok {
t.Fatalf("cannot run bulkSubscribe conformance, BulkSubscriber interface not implemented by the component %s", config.ComponentName)
}
var counter int
var lastSequence int
config.BulkSubscribeMetadata[metadata.MaxBulkSubCountKey] = strconv.Itoa(defaultMaxBulkCount)
config.BulkSubscribeMetadata[metadata.MaxBulkSubAwaitDurationMsKey] = strconv.Itoa(defaultMaxBulkAwaitDurationMs)
err := bS.BulkSubscribe(ctx, pubsub.SubscribeRequest{
Topic: config.TestTopicForBulkSub,
Metadata: config.BulkSubscribeMetadata,
}, func(ctx context.Context, bulkMsg *pubsub.BulkMessage) ([]pubsub.BulkSubscribeResponseEntry, error) {
bulkResponses := make([]pubsub.BulkSubscribeResponseEntry, len(bulkMsg.Entries))
hasAnyError := false
for i, msg := range bulkMsg.Entries {
dataString := string(msg.Event)
if !strings.HasPrefix(dataString, dataPrefix) {
t.Logf("Ignoring message without expected prefix")
bulkResponses[i].EntryId = msg.EntryId
bulkResponses[i].Error = nil
continue
}
sequence, err := strconv.Atoi(dataString[len(dataPrefix):])
if err != nil {
t.Logf("Message did not contain a sequence number")
assert.Fail(t, "message did not contain a sequence number")
bulkResponses[i].EntryId = msg.EntryId
bulkResponses[i].Error = err
hasAnyError = true
continue
}
// Ignore already processed messages
// in case we receive a redelivery from the broker
// during retries.
muBulk.Lock()
_, alreadyProcessed := processedMessagesBulk[sequence]
muBulk.Unlock()
if alreadyProcessed {
t.Logf("Message was already processed: %d", sequence)
bulkResponses[i].EntryId = msg.EntryId
bulkResponses[i].Error = nil
continue
}
counter++
// Only consider order when we receive a message for the first time
// Messages that fail and are re-queued will naturally come out of order
if errorCountBulk == 0 {
if sequence < lastSequence {
outOfOrder = true
t.Logf("Message received out of order: expected sequence >= %d, got %d", lastSequence, sequence)
}
lastSequence = sequence
}
// This behavior is standard to repro a failure of one message in a batch.
if errorCountBulk < 2 || counter%5 == 0 {
// First message errors just to give time for more messages to pile up.
// Second error is to force an error in a batch.
errorCountBulk++
// Sleep to allow messages to pile up and be delivered as a batch.
time.Sleep(1 * time.Second)
t.Logf("Simulating subscriber error")
bulkResponses[i].EntryId = msg.EntryId
bulkResponses[i].Error = errors.Errorf("conf test simulated error")
hasAnyError = true
continue
}
t.Logf("Simulating subscriber success")
actualReadCount++
muBulk.Lock()
processedMessagesBulk[sequence] = struct{}{}
muBulk.Unlock()
processedCBulk <- dataString
bulkResponses[i].EntryId = msg.EntryId
bulkResponses[i].Error = nil
}
if hasAnyError {
return bulkResponses, errors.Errorf("Few messages errorred out")
}
return bulkResponses, nil
})
assert.NoError(t, err, "expected no error on bulk subscribe")
})
}
// Publish // Publish
if config.HasOperation("publish") { if config.HasOperation("publish") {
// Some pubsub, like Kafka need to wait for Subscriber to be up before messages can be consumed. // Some pubsub, like Kafka need to wait for Subscriber to be up before messages can be consumed.
@ -220,11 +329,81 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
} }
assert.NoError(t, err, "expected no error on publishing data %s on topic %s", data, config.TestTopicName) assert.NoError(t, err, "expected no error on publishing data %s on topic %s", data, config.TestTopicName)
} }
if config.HasOperation("bulksubscribe") {
_, ok := ps.(pubsub.BulkSubscriber)
if !ok {
t.Fatalf("cannot run bulkSubscribe conformance, BulkSubscriber interface not implemented by the component %s", config.ComponentName)
}
for k := bulkSubStartingKey; k <= (bulkSubStartingKey + config.MessageCount); k++ {
data := []byte(fmt.Sprintf("%s%d", dataPrefix, k))
err := ps.Publish(&pubsub.PublishRequest{
Data: data,
PubsubName: config.PubsubName,
Topic: config.TestTopicForBulkSub,
Metadata: config.PublishMetadata,
})
if err == nil {
awaitingMessagesBulk[string(data)] = struct{}{}
}
assert.NoError(t, err, "expected no error on publishing data %s on topic %s", data, config.TestTopicForBulkSub)
}
}
})
}
// assumes that publish operation is run only once for publishing config.MessageCount number of events
// bulkpublish needs to be run after publish operation
if config.HasOperation("bulkpublish") {
t.Run("bulkPublish", func(t *testing.T) {
bP, ok := ps.(pubsub.BulkPublisher)
if !ok {
t.Fatalf("cannot run bulkPublish conformance, BulkPublisher interface not implemented by the component %s", config.ComponentName)
}
// only run the test if BulkPublish is implemented
// Some pubsub, like Kafka need to wait for Subscriber to be up before messages can be consumed.
// So, wait for some time here.
time.Sleep(config.WaitDurationToPublish)
req := pubsub.BulkPublishRequest{
PubsubName: config.PubsubName,
Topic: config.TestTopicName,
Metadata: config.PublishMetadata,
Entries: make([]pubsub.BulkMessageEntry, config.MessageCount),
}
entryMap := map[string][]byte{}
// setting k to one value more than the previously published list of events.
// assuming that publish test is run only once and bulkPublish is run right after that
for i, k := 0, config.MessageCount+1; i < config.MessageCount; {
data := []byte(fmt.Sprintf("%s%d", dataPrefix, k))
strK := strconv.Itoa(k)
req.Entries[i].EntryId = strK
req.Entries[i].ContentType = "text/plain"
req.Entries[i].Metadata = config.PublishMetadata
req.Entries[i].Event = data
entryMap[strK] = data
t.Logf("Adding message with ID %d for bulk publish", k)
k++
i++
}
t.Logf("Calling Bulk Publish on component %s", config.ComponentName)
res, err := bP.BulkPublish(context.Background(), &req)
if err == nil {
for _, status := range res.Statuses {
if status.Status == pubsub.PublishSucceeded {
data := entryMap[status.EntryId]
t.Logf("adding to awaited messages %s", data)
awaitingMessages[string(data)] = struct{}{}
}
}
}
// here only the success case is tested for bulkPublish similar to publish.
// For scenarios on partial failures, those will be tested as part of certification tests if possible.
assert.NoError(t, err, "expected no error on bulk publishing on topic %s", config.TestTopicName)
}) })
} }
// Verify read // Verify read
if config.HasOperation("publish") && config.HasOperation("subscribe") { if (config.HasOperation("publish") || config.HasOperation("bulkpublish")) && config.HasOperation("subscribe") {
t.Run("verify read", func(t *testing.T) { t.Run("verify read", func(t *testing.T) {
t.Logf("waiting for %v to complete read", config.MaxReadDuration) t.Logf("waiting for %v to complete read", config.MaxReadDuration)
timeout := time.After(config.MaxReadDuration) timeout := time.After(config.MaxReadDuration)
@ -232,6 +411,7 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
for waiting { for waiting {
select { select {
case processed := <-processedC: case processed := <-processedC:
t.Logf("deleting %s processed message", processed)
delete(awaitingMessages, processed) delete(awaitingMessages, processed)
waiting = len(awaitingMessages) > 0 waiting = len(awaitingMessages) > 0
case <-timeout: case <-timeout:
@ -244,6 +424,31 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
}) })
} }
// Verify read on bulk subscription
if config.HasOperation("publish") && config.HasOperation("bulksubscribe") {
t.Run("verify read on bulk subscription", func(t *testing.T) {
_, ok := ps.(pubsub.BulkSubscriber)
if !ok {
t.Fatalf("cannot run bulkSubscribe conformance, BulkSubscriber interface not implemented by the component %s", config.ComponentName)
}
t.Logf("waiting for %v to complete read for bulk subscription", config.MaxReadDuration)
timeout := time.After(config.MaxReadDuration)
waiting := true
for waiting {
select {
case processed := <-processedCBulk:
delete(awaitingMessagesBulk, processed)
waiting = len(awaitingMessagesBulk) > 0
case <-timeout:
// Break out after the mamimum read duration has elapsed
waiting = false
}
}
assert.False(t, config.CheckInOrderProcessing && outOfOrder, "received messages out of order")
assert.Empty(t, awaitingMessagesBulk, "expected to read %v messages", config.MessageCount)
})
}
// Multiple handlers // Multiple handlers
if config.HasOperation("multiplehandlers") { if config.HasOperation("multiplehandlers") {
received1Ch := make(chan string) received1Ch := make(chan string)
@ -309,7 +514,7 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
close(sent1Ch) close(sent1Ch)
sent1Ch = nil sent1Ch = nil
time.Sleep(config.WaitDurationToPublish) time.Sleep(config.WaitDurationToPublish)
case 2: // On iteration 1, close the second subscriber case 2: // On iteration 2, close the second subscriber
subscribe2Cancel() subscribe2Cancel()
close(sent2Ch) close(sent2Ch)
sent2Ch = nil sent2Ch = nil