diff --git a/pubsub/jetstream/jetstream.go b/pubsub/jetstream/jetstream.go index 970329ac8..5e5c5288f 100644 --- a/pubsub/jetstream/jetstream.go +++ b/pubsub/jetstream/jetstream.go @@ -88,8 +88,26 @@ func (js *jetstreamPubSub) Features() []pubsub.Feature { } func (js *jetstreamPubSub) Publish(req *pubsub.PublishRequest) error { - js.l.Debugf("Publishing topic %v with data: %v", req.Topic, req.Data) - _, err := js.jsc.Publish(req.Topic, req.Data) + var opts []nats.PubOpt + var msgID string + + event, err := pubsub.FromCloudEvent(req.Data, "", "", "", "") + if err != nil { + js.l.Debugf("error unmarshalling cloudevent: %v", err) + } else { + // Use the cloudevent id as the Nats-MsgId for deduplication + if id, ok := event["id"].(string); ok { + msgID = id + opts = append(opts, nats.MsgId(msgID)) + } + } + + if msgID == "" { + js.l.Warn("empty message ID, Jetstream deduplication will not be possible") + } + + js.l.Debugf("Publishing to topic %v id: %s", req.Topic, msgID) + _, err = js.jsc.Publish(req.Topic, req.Data, opts...) return err }