From 47a46673cda3a3523ef66a24a2b1a0ad67f2191d Mon Sep 17 00:00:00 2001 From: Bernd Verst <4535280+berndverst@users.noreply.github.com> Date: Thu, 27 Oct 2022 14:08:37 -0700 Subject: [PATCH] PubSub: Fall back to 'consumerID' when group not present Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> --- pubsub/aws/snssqs/metadata.go | 2 ++ pubsub/jetstream/metadata.go | 6 +++++- pubsub/metadata.go | 6 ++++++ pubsub/rocketmq/metadata.go | 4 ++++ 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pubsub/aws/snssqs/metadata.go b/pubsub/aws/snssqs/metadata.go index cbfd7d3ac..739ab58f9 100644 --- a/pubsub/aws/snssqs/metadata.go +++ b/pubsub/aws/snssqs/metadata.go @@ -281,6 +281,8 @@ func (md *snsSqsMetadata) setFifoConfig(props map[string]string) error { // for more details, see: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html if val, ok := props["fifoMessageGroupID"]; ok { md.fifoMessageGroupID = val + } else { + md.fifoMessageGroupID = props[pubsub.RuntimeConsumerIDKey] } return nil diff --git a/pubsub/jetstream/metadata.go b/pubsub/jetstream/metadata.go index 8bc1f737c..7f8203d24 100644 --- a/pubsub/jetstream/metadata.go +++ b/pubsub/jetstream/metadata.go @@ -84,7 +84,11 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) { } m.durableName = psm.Properties["durableName"] - m.queueGroupName = psm.Properties["queueGroupName"] + if val, ok := psm.Properties["queueGroupName"]; ok && val != "" { + m.queueGroupName = val + } else { + m.queueGroupName = psm.Properties[pubsub.RuntimeConsumerIDKey] + } if v, err := strconv.ParseUint(psm.Properties["startSequence"], 10, 64); err == nil { m.startSequence = v diff --git a/pubsub/metadata.go b/pubsub/metadata.go index 134284270..0da89baf9 100644 --- a/pubsub/metadata.go +++ b/pubsub/metadata.go @@ -19,3 +19,9 @@ import "github.com/dapr/components-contrib/metadata" type Metadata struct { metadata.Base `json:",inline"` } + +// When the Dapr component does not explictly specify a consumer group, +// this value provided by the runtime must be used. This value is specific to each Dapr App. +// As a result, by default, each Dapr App will receive all messages published to the topic at least once. +// See https://github.com/dapr/dapr/blob/21566de8d7fdc7d43ae627ffc0698cc073fa71b0/pkg/runtime/runtime.go#L1735-L1739 +const RuntimeConsumerIDKey = "consumerID" diff --git a/pubsub/rocketmq/metadata.go b/pubsub/rocketmq/metadata.go index bfb5b6700..2ee9547f3 100644 --- a/pubsub/rocketmq/metadata.go +++ b/pubsub/rocketmq/metadata.go @@ -98,6 +98,10 @@ func parseRocketMQMetaData(metadata pubsub.Metadata, logger logger.Logger) (*roc logger.Warn("pubsub.rocketmq: metadata property 'groupName' has been deprecated - use 'producerGroup' instead. See: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-rocketmq/") } + if rMetaData.ProducerGroup == "" { + rMetaData.ProducerGroup = metadata.Properties[pubsub.RuntimeConsumerIDKey] + } + if rMetaData.SendTimeOut != 0 { logger.Warn("pubsub.rocketmq: metadata property 'sendTimeOut' has been deprecated - use 'sendTimeOutSec' instead. See: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-rocketmq/") if rMetaData.SendTimeOutSec == 0 {