Enable eventhubs binding to read all message properties (#3615)

Signed-off-by: yaron2 <schneider.yaron@live.com>
This commit is contained in:
Yaron Schneider 2024-11-26 11:50:50 -08:00 committed by GitHub
parent 913ba4ce6f
commit 85cbbf123a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 32 additions and 1 deletions

View File

@ -161,3 +161,12 @@ metadata:
description: |
Storage container name.
example: '"myeventhubstoragecontainer"'
- name: getAllMessageProperties
required: false
default: "false"
example: "false"
binding:
input: true
output: false
description: |
When set to true, will retrieve all message properties and include them in the returned event metadata

View File

@ -127,6 +127,11 @@ func (aeh *AzureEventHubs) EventHubName() string {
return aeh.metadata.hubName
}
// GetAllMessageProperties returns a boolean to indicate whether to return all properties for an event hubs message.
func (aeh *AzureEventHubs) GetAllMessageProperties() bool {
return aeh.metadata.GetAllMessageProperties
}
// Publish a batch of messages.
func (aeh *AzureEventHubs) Publish(ctx context.Context, topic string, messages []*azeventhubs.EventData, batchOpts *azeventhubs.EventDataBatchOptions) error {
// Get the producer client
@ -165,7 +170,7 @@ func (aeh *AzureEventHubs) GetBindingsHandlerFunc(topic string, getAllProperties
return nil, fmt.Errorf("expected 1 message, got %d", len(messages))
}
bindingsMsg, err := NewBindingsReadResponseFromEventData(messages[0], topic, getAllProperties)
bindingsMsg, err := NewBindingsReadResponseFromEventData(messages[0], topic, aeh.GetAllMessageProperties())
if err != nil {
return nil, fmt.Errorf("failed to get bindings read response from azure eventhubs message: %w", err)
}

View File

@ -39,6 +39,7 @@ type AzureEventHubsMetadata struct {
SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"`
ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"`
EnableInOrderMessageDelivery bool `json:"enableInOrderMessageDelivery,string" mapstructure:"enableInOrderMessageDelivery"`
GetAllMessageProperties bool `json:"getAllMessageProperties,string" mapstructure:"getAllMessageProperties"`
// Binding only
EventHub string `json:"eventHub" mapstructure:"eventHub" mdonly:"bindings"`

View File

@ -130,6 +130,10 @@ func (aeh *AzureEventHubs) Subscribe(ctx context.Context, req pubsub.SubscribeRe
// Check if requireAllProperties is set and is truthy
getAllProperties := utils.IsTruthy(req.Metadata["requireAllProperties"])
if !getAllProperties {
getAllProperties = aeh.GetAllMessageProperties()
}
checkPointFrequencyPerPartition := commonutils.GetIntValFromString(req.Metadata["checkPointFrequencyPerPartition"], impl.DefaultCheckpointFrequencyPerPartition)
pubsubHandler := aeh.GetPubSubHandlerFunc(topic, getAllProperties, handler)
@ -155,6 +159,9 @@ func (aeh *AzureEventHubs) BulkSubscribe(ctx context.Context, req pubsub.Subscri
// Check if requireAllProperties is set and is truthy
getAllProperties := utils.IsTruthy(req.Metadata["requireAllProperties"])
if !getAllProperties {
getAllProperties = aeh.GetAllMessageProperties()
}
checkPointFrequencyPerPartition := commonutils.GetIntValFromString(req.Metadata["checkPointFrequencyPerPartition"], impl.DefaultCheckpointFrequencyPerPartition)
maxBulkSubCount := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxMessagesCount, impl.DefaultMaxBulkSubCount)
maxBulkSubAwaitDurationMs := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxAwaitDurationMs, impl.DefaultMaxBulkSubAwaitDurationMs)

View File

@ -110,3 +110,12 @@ metadata:
description: |
The name of the Event Hubs Consumer Group to listen on.
example: '"group1"'
- name: getAllMessageProperties
required: false
default: "false"
example: "false"
binding:
input: true
output: false
description: |
When set to true, will retrieve all message properties and include them in the returned event metadata