Add metadata for dequeued message in bindings.azure.storagequeues (#3028)
Signed-off-by: Yash Nisar <yashnisar@microsoft.com>
This commit is contained in:
parent
c43af14d31
commit
25656c175d
|
|
@ -38,6 +38,12 @@ const (
|
|||
defaultTTL = 10 * time.Minute
|
||||
defaultVisibilityTimeout = 30 * time.Second
|
||||
defaultPollingInterval = 10 * time.Second
|
||||
dequeueCount = "dequeueCount"
|
||||
insertionTime = "insertionTime"
|
||||
expirationTime = "expirationTime"
|
||||
nextVisibleTime = "nextVisibleTime"
|
||||
popReceipt = "popReceipt"
|
||||
messageID = "messageID"
|
||||
)
|
||||
|
||||
type consumer struct {
|
||||
|
|
@ -177,9 +183,30 @@ func (d *AzureQueueHelper) Read(ctx context.Context, consumer *consumer) error {
|
|||
}
|
||||
}
|
||||
|
||||
metadata := make(map[string]string, 6)
|
||||
|
||||
if res.Messages[0].MessageID != nil {
|
||||
metadata[messageID] = *res.Messages[0].MessageID
|
||||
}
|
||||
if res.Messages[0].PopReceipt != nil {
|
||||
metadata[popReceipt] = *res.Messages[0].PopReceipt
|
||||
}
|
||||
if res.Messages[0].InsertionTime != nil {
|
||||
metadata[insertionTime] = res.Messages[0].InsertionTime.Format(time.RFC3339)
|
||||
}
|
||||
if res.Messages[0].ExpirationTime != nil {
|
||||
metadata[expirationTime] = res.Messages[0].ExpirationTime.Format(time.RFC3339)
|
||||
}
|
||||
if res.Messages[0].TimeNextVisible != nil {
|
||||
metadata[nextVisibleTime] = res.Messages[0].TimeNextVisible.Format(time.RFC3339)
|
||||
}
|
||||
if res.Messages[0].DequeueCount != nil {
|
||||
metadata[dequeueCount] = strconv.FormatInt(*res.Messages[0].DequeueCount, 10)
|
||||
}
|
||||
|
||||
_, err = consumer.callback(ctx, &bindings.ReadResponse{
|
||||
Data: data,
|
||||
Metadata: map[string]string{},
|
||||
Metadata: metadata,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
|||
Loading…
Reference in New Issue