use rabbitmq message's header values as metadata values in the binding (#3031)
Signed-off-by: Ohlicher Robert <r.ohlicher@palfinger.com> Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: Ohlicher Robert <r.ohlicher@palfinger.com> Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
60322a1f1c
commit
566c7fd31a
|
@ -21,6 +21,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
@ -464,8 +465,20 @@ func (r *RabbitMQ) handleMessage(ctx context.Context, handler bindings.Handler,
|
|||
r.logger.Info("Input binding channel closed")
|
||||
return
|
||||
}
|
||||
|
||||
metadata := make(map[string]string, len(d.Headers))
|
||||
// Passthrough any custom metadata to the handler.
|
||||
for k, v := range d.Headers {
|
||||
if s, ok := v.(string); ok {
|
||||
// Escape the key and value to ensure they are valid URL query parameters.
|
||||
// This is necessary for them to be sent as HTTP Metadata.
|
||||
metadata[url.QueryEscape(k)] = url.QueryEscape(s)
|
||||
}
|
||||
}
|
||||
|
||||
_, err := handler(ctx, &bindings.ReadResponse{
|
||||
Data: d.Body,
|
||||
Data: d.Body,
|
||||
Metadata: metadata,
|
||||
})
|
||||
if err != nil {
|
||||
ch.Nack(d.DeliveryTag, false, true)
|
||||
|
|
Loading…
Reference in New Issue