Updating eventshub.go, to ensure receiving custom properties from IoT Hub

Currently, we can receive only the system properties from iot hub. Done changes should allow the retrieval of all custom properties.
It is based on metadata key : requireAllProperties.

By default it is false, if true , looks into the , Properties of Event struct to retrieve the custom properties.

Signed-off-by: mecoding1 <118708378+mecoding1@users.noreply.github.com>
This commit is contained in:
mecoding1 2022-11-21 10:16:23 +05:30
parent 0bbc8c42d6
commit a669c4733b
1 changed files with 25 additions and 4 deletions

View File

@ -73,6 +73,9 @@ const (
sysPropIotHubEnqueuedTime = "iothub-enqueuedtime" sysPropIotHubEnqueuedTime = "iothub-enqueuedtime"
sysPropMessageID = "message-id" sysPropMessageID = "message-id"
// Metadata field to ensure all Event Hub properties pass through
requireAllProperties = "requireAllProperties"
defaultMessageRetentionInDays = 1 defaultMessageRetentionInDays = 1
defaultPartitionCount = 1 defaultPartitionCount = 1
@ -86,7 +89,7 @@ const (
maxPartitionCount = int32(1024) maxPartitionCount = int32(1024)
) )
func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, handler pubsub.Handler) error { func subscribeHandler(ctx context.Context, topic string, getAllProperties bool , e *eventhub.Event, handler pubsub.Handler) error {
res := pubsub.NewMessage{Data: e.Data, Topic: topic, Metadata: map[string]string{}} res := pubsub.NewMessage{Data: e.Data, Topic: topic, Metadata: map[string]string{}}
if e.SystemProperties.SequenceNumber != nil { if e.SystemProperties.SequenceNumber != nil {
res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(*e.SystemProperties.SequenceNumber, 10) res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(*e.SystemProperties.SequenceNumber, 10)
@ -124,6 +127,16 @@ func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, hand
if e.ID != "" { if e.ID != "" {
res.Metadata[sysPropMessageID] = e.ID res.Metadata[sysPropMessageID] = e.ID
} }
// added properties if any ( includes application properties from iot-hub)
if getAllProperties{
if e.Properties != nil && len(e.Properties) > 0{
for key, value := range e.Properties {
if str, ok := value.(string); ok {
res.Metadata[key] = str
}
}
}
}
return handler(ctx, &res) return handler(ctx, &res)
} }
@ -626,16 +639,24 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.Su
return err return err
} }
var getAllProperties bool = false
if req.Metadata[requireAllProperties] != ""{
getAllProperties, err = strconv.ParseBool(req.Metadata[requireAllProperties])
if err!=nil{
aeh.logger.Errorf("invalid value for metadata : %s . Error: %v.", requireAllProperties, err)
}
}
aeh.logger.Debugf("registering handler for topic %s", req.Topic) aeh.logger.Debugf("registering handler for topic %s", req.Topic)
_, err = processor.RegisterHandler(subscribeCtx, _, err = processor.RegisterHandler(subscribeCtx,
func(_ context.Context, e *eventhub.Event) error { func(_ context.Context, e *eventhub.Event) error {
// This component has built-in retries because Event Hubs doesn't support N/ACK for messages // This component has built-in retries because Event Hubs doesn't support N/ACK for messages
b := aeh.backOffConfig.NewBackOffWithContext(subscribeCtx) b := aeh.backOffConfig.NewBackOffWithContext(subscribeCtx)
retryerr := retry.NotifyRecover(func() error { retryerr := retry.NotifyRecover(func() error {
aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID) aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID)
return subscribeHandler(subscribeCtx, req.Topic, e, handler) return subscribeHandler(subscribeCtx, req.Topic, getAllProperties, e, handler)
}, b, func(_ error, _ time.Duration) { }, b, func(_ error, _ time.Duration) {
aeh.logger.Warnf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID) aeh.logger.Warnf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID)
}, func() { }, func() {