PubSub: Fall back to 'consumerID' when group not present

Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
This commit is contained in:
Bernd Verst 2022-10-27 14:08:37 -07:00
parent 99b635eb20
commit 47a46673cd
4 changed files with 17 additions and 1 deletions

View File

@ -281,6 +281,8 @@ func (md *snsSqsMetadata) setFifoConfig(props map[string]string) error {
// for more details, see: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
if val, ok := props["fifoMessageGroupID"]; ok {
md.fifoMessageGroupID = val
} else {
md.fifoMessageGroupID = props[pubsub.RuntimeConsumerIDKey]
}
return nil

View File

@ -84,7 +84,11 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
}
m.durableName = psm.Properties["durableName"]
m.queueGroupName = psm.Properties["queueGroupName"]
if val, ok := psm.Properties["queueGroupName"]; ok && val != "" {
m.queueGroupName = val
} else {
m.queueGroupName = psm.Properties[pubsub.RuntimeConsumerIDKey]
}
if v, err := strconv.ParseUint(psm.Properties["startSequence"], 10, 64); err == nil {
m.startSequence = v

View File

@ -19,3 +19,9 @@ import "github.com/dapr/components-contrib/metadata"
type Metadata struct {
metadata.Base `json:",inline"`
}
// When the Dapr component does not explictly specify a consumer group,
// this value provided by the runtime must be used. This value is specific to each Dapr App.
// As a result, by default, each Dapr App will receive all messages published to the topic at least once.
// See https://github.com/dapr/dapr/blob/21566de8d7fdc7d43ae627ffc0698cc073fa71b0/pkg/runtime/runtime.go#L1735-L1739
const RuntimeConsumerIDKey = "consumerID"

View File

@ -98,6 +98,10 @@ func parseRocketMQMetaData(metadata pubsub.Metadata, logger logger.Logger) (*roc
logger.Warn("pubsub.rocketmq: metadata property 'groupName' has been deprecated - use 'producerGroup' instead. See: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-rocketmq/")
}
if rMetaData.ProducerGroup == "" {
rMetaData.ProducerGroup = metadata.Properties[pubsub.RuntimeConsumerIDKey]
}
if rMetaData.SendTimeOut != 0 {
logger.Warn("pubsub.rocketmq: metadata property 'sendTimeOut' has been deprecated - use 'sendTimeOutSec' instead. See: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-rocketmq/")
if rMetaData.SendTimeOutSec == 0 {