pubsub jetstream: set Nats-Msg-Id for message deduplication (#2200) (#2202)

Signed-off-by: NickLarsenNZ <nick@aptiv.co.nz>

Signed-off-by: NickLarsenNZ <nick@aptiv.co.nz>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
This commit is contained in:
Nick 2022-11-09 04:49:34 +01:00 committed by GitHub
parent 8256ac0a0a
commit ce87730a73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 20 additions and 2 deletions

View File

@ -88,8 +88,26 @@ func (js *jetstreamPubSub) Features() []pubsub.Feature {
}
func (js *jetstreamPubSub) Publish(req *pubsub.PublishRequest) error {
js.l.Debugf("Publishing topic %v with data: %v", req.Topic, req.Data)
_, err := js.jsc.Publish(req.Topic, req.Data)
var opts []nats.PubOpt
var msgID string
event, err := pubsub.FromCloudEvent(req.Data, "", "", "", "")
if err != nil {
js.l.Debugf("error unmarshalling cloudevent: %v", err)
} else {
// Use the cloudevent id as the Nats-MsgId for deduplication
if id, ok := event["id"].(string); ok {
msgID = id
opts = append(opts, nats.MsgId(msgID))
}
}
if msgID == "" {
js.l.Warn("empty message ID, Jetstream deduplication will not be possible")
}
js.l.Debugf("Publishing to topic %v id: %s", req.Topic, msgID)
_, err = js.jsc.Publish(req.Topic, req.Data, opts...)
return err
}