Receiving events

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2023-01-19 21:36:20 +00:00
parent 093669e8a8
commit 445f2e4940
3 changed files with 155 additions and 5 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/components-contrib/internal/utils"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/components-contrib/pubsub/azure/eventhubs/conn"
"github.com/dapr/kit/logger"
@ -168,7 +169,7 @@ func (aeh *AzureEventHubs) BulkPublish(ctx context.Context, req *pubsub.BulkPubl
}
// Subscribe receives data from Azure Event Hubs.
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) (err error) {
if aeh.metadata.ConsumerGroup == "" {
return errors.New("property consumerID is required to subscribe to an Event Hub topic")
}
@ -176,12 +177,17 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.Su
return errors.New("parameter 'topic' is required")
}
// Check if requireAllProperties is set and is truthy
getAllProperties := utils.IsTruthy(req.Metadata["requireAllProperties"])
// Get the processor client
processor, err := aeh.getProcessorForTopic(subscribeCtx, req.Topic)
if err != nil {
return fmt.Errorf("error trying to establish a connection: %w", err)
}
eventHandler := subscribeHandler(subscribeCtx, req.Topic, getAllProperties, handler)
// Process all partition clients as they come in
go func() {
for {
@ -194,7 +200,7 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.Su
// Once we get a partition client, process the events in a separate goroutine
go func() {
processErr := aeh.processEvents(subscribeCtx, partitionClient)
processErr := aeh.processEvents(subscribeCtx, partitionClient, eventHandler)
if processErr != nil {
aeh.logger.Errorf("Error processing events from partition client: %v", processErr)
}
@ -214,7 +220,7 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.Su
return nil
}
func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient) error {
func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient, eventHandler func(e *azeventhubs.ReceivedEventData) error) error {
// At the end of the method we need to do some cleanup and close the partition client
defer func() {
closeCtx, closeCancel := context.WithTimeout(context.Background(), resourceGetTimeout)
@ -257,8 +263,10 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, partition
if len(events) != 0 {
for _, event := range events {
// process the event in some way
fmt.Printf("Event received with body %v\n", event.Body)
// TODO: Handle errors
// Maybe consider exiting this method (without updating the checkpoint) so another app will re-try it?
err := eventHandler(event)
fmt.Printf("Event processed - err: %v", err)
}
// Update the checkpoint with the last event received. If we lose ownership of this partition or have to restart the next owner will start from this point.

View File

@ -0,0 +1,108 @@
package eventhubs
import (
"context"
"strconv"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/spf13/cast"
"golang.org/x/exp/maps"
"github.com/dapr/components-contrib/pubsub"
)
const (
// Event Hubs SystemProperties names for metadata passthrough.
sysPropSequenceNumber = "x-opt-sequence-number"
sysPropEnqueuedTime = "x-opt-enqueued-time"
sysPropOffset = "x-opt-offset"
sysPropPartitionID = "x-opt-partition-id"
sysPropPartitionKey = "x-opt-partition-key"
sysPropIotHubDeviceConnectionID = "iothub-connection-device-id"
sysPropIotHubAuthGenerationID = "iothub-connection-auth-generation-id"
sysPropIotHubConnectionAuthMethod = "iothub-connection-auth-method"
sysPropIotHubConnectionModuleID = "iothub-connection-module-id"
sysPropIotHubEnqueuedTime = "iothub-enqueuedtime"
sysPropMessageID = "message-id"
)
// Pool of map[string]string that we can use to optimize memory usage
var metadataPool = sync.Pool{
New: func() any {
// Initial capacity is 5 which is the number of properties we normally find in a message
// The map can expand as needed, and when it's added back to the pool, we'll keep the larger size
m := make(map[string]string, 5)
return &m
},
}
func subscribeHandler(ctx context.Context, topic string, getAllProperties bool, handler pubsub.Handler) func(e *azeventhubs.ReceivedEventData) error {
return func(e *azeventhubs.ReceivedEventData) error {
md := metadataPool.Get().(*map[string]string)
maps.Clear(*md)
defer metadataPool.Put(md)
res := pubsub.NewMessage{
Data: e.Body,
Topic: topic,
Metadata: *md,
}
res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(e.SequenceNumber, 10)
if e.EnqueuedTime != nil {
res.Metadata[sysPropEnqueuedTime] = e.EnqueuedTime.Format(time.RFC3339)
}
if e.Offset != nil {
res.Metadata[sysPropOffset] = strconv.FormatInt(*e.Offset, 10)
}
if e.PartitionKey != nil {
res.Metadata[sysPropPartitionKey] = *e.PartitionKey
}
if e.MessageID != nil && *e.MessageID != "" {
res.Metadata[sysPropMessageID] = *e.MessageID
}
// Iterate through the system properties looking for those coming from IoT Hub
for k, v := range e.SystemProperties {
switch k {
// The following metadata properties are only present if event was generated by Azure IoT Hub.
case sysPropIotHubDeviceConnectionID,
sysPropIotHubAuthGenerationID,
sysPropIotHubConnectionAuthMethod,
sysPropIotHubConnectionModuleID,
sysPropIotHubEnqueuedTime:
addPropertyToMetadata(k, v, res.Metadata)
default:
// nop
}
}
// Added properties if any (includes application properties from Azure IoT Hub)
if getAllProperties && len(e.Properties) > 0 {
for k, v := range e.Properties {
addPropertyToMetadata(k, v, res.Metadata)
}
}
return handler(ctx, &res)
}
}
// Adds a property to the response metadata
func addPropertyToMetadata(key string, value any, md map[string]string) {
switch v := value.(type) {
case *time.Time:
if v != nil {
md[key] = v.Format(time.RFC3339)
}
case time.Time:
md[key] = v.Format(time.RFC3339)
default:
str, err := cast.ToStringE(value)
if err == nil {
md[key] = str
}
}
}

View File

@ -13,6 +13,12 @@ limitations under the License.
package pubsub
import (
"encoding/json"
"fmt"
"strings"
)
// PublishRequest is the request to publish a message.
type PublishRequest struct {
Data []byte `json:"data"`
@ -45,6 +51,16 @@ type NewMessage struct {
ContentType *string `json:"contentType,omitempty"`
}
// String implements fmt.Stringer and it's useful for debugging.
func (m NewMessage) String() string {
ct := "(nil)"
if m.ContentType != nil {
ct = *m.ContentType
}
md, _ := json.Marshal(m.Metadata)
return fmt.Sprintf("[NewMessage] topic='%s' data='%s' content-type='%s' metadata=%s", m.Topic, string(m.Data), ct, md)
}
// BulkMessage represents bulk message arriving from a message bus instance.
type BulkMessage struct {
Entries []BulkMessageEntry `json:"entries"`
@ -52,6 +68,18 @@ type BulkMessage struct {
Metadata map[string]string `json:"metadata"`
}
// String implements fmt.Stringer and it's useful for debugging.
func (m BulkMessage) String() string {
md, _ := json.Marshal(m.Metadata)
b := strings.Builder{}
b.WriteString(fmt.Sprintf("[BulkMessage] topic='%s' metadata=%s entries=%d", m.Topic, md, len(m.Entries)))
for i, e := range m.Entries {
b.WriteString(fmt.Sprintf("\n%d: ", i))
b.WriteString(e.String())
}
return b.String()
}
// BulkMessageEntry represents a single message inside a bulk request.
type BulkMessageEntry struct {
EntryId string `json:"entryId"` //nolint:stylecheck
@ -60,6 +88,12 @@ type BulkMessageEntry struct {
Metadata map[string]string `json:"metadata"`
}
// String implements fmt.Stringer and it's useful for debugging.
func (m BulkMessageEntry) String() string {
md, _ := json.Marshal(m.Metadata)
return fmt.Sprintf("[BulkMessageEntry] entryId='%s' data='%s' content-type='%s' metadata=%s", m.EntryId, string(m.Event), m.ContentType, md)
}
// BulkSubscribeConfig represents the configuration for bulk subscribe.
// It depends on specific componets to support these.
type BulkSubscribeConfig struct {