components-contrib/pubsub
Bruce Huang a6ad8cb4bf
RocketMQ fix metadata properties (#1817)
* Removes the useless metadata attributes, AccessProto and ConsumerBatchSize, rename GroupName to ProducerGroup

Signed-off-by: helbing <helbingxxx@gmail.com>

* RocketMQ SendTimeout unit is seconds

Signed-off-by: helbing <helbingxxx@gmail.com>

* avoid breaking change

Signed-off-by: helbing <helbingxxx@gmail.com>

* implement ConsumerBatchSize option

Signed-off-by: helbing <helbingxxx@gmail.com>

* avoid breaking change

Signed-off-by: helbing <helbingxxx@gmail.com>

* add deprecated log

Signed-off-by: helbing <helbingxxx@gmail.com>

* backwards-compatibility sendTimeOut

Signed-off-by: helbing <helbingxxx@gmail.com>

* fix lint

Signed-off-by: helbing <helbingxxx@gmail.com>

* add logger import

Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>

Signed-off-by: helbing <helbingxxx@gmail.com>
Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
Co-authored-by: addjuarez <addiajuarez@gmail.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
2022-09-14 14:45:47 -07:00
..
aws/snssqs refactored aws sqs policy inserting (#1807) 2022-09-13 17:11:58 -07:00
azure Update log level for successful retry (#2066) 2022-09-13 15:57:47 -07:00
gcp/pubsub Add contexts to pubsub.Subscribe to allow early cancelation (#1756) 2022-06-02 15:06:36 -07:00
hazelcast Define common metadata across components (#1994) 2022-08-25 10:10:53 -07:00
in-memory pubsub.in-memory: add support for wildcard topics (#1966) 2022-08-12 17:13:47 -07:00
jetstream Define common metadata across components (#1994) 2022-08-25 10:10:53 -07:00
kafka Add contexts to pubsub.Subscribe to allow early cancelation (#1756) 2022-06-02 15:06:36 -07:00
mqtt Define common metadata across components (#1994) 2022-08-25 10:10:53 -07:00
natsstreaming Define common metadata across components (#1994) 2022-08-25 10:10:53 -07:00
pulsar Pulsar: do not disconnect in case of error processing message 2022-06-27 21:29:54 +00:00
rabbitmq Define common metadata across components (#1994) 2022-08-25 10:10:53 -07:00
redis Merge branch 'master' into redis-pubsub-fix 2022-08-29 07:00:16 -07:00
rocketmq RocketMQ fix metadata properties (#1817) 2022-09-14 14:45:47 -07:00
README.md middleware: changes wasm basic to use waPC (#1833) 2022-09-13 17:12:59 -07:00
concurrency.go update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
concurrency_test.go update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
envelope.go Go 1.19 support and linter fixes (#1975) 2022-08-18 00:45:23 -07:00
envelope_test.go fix/typo: Change traceid to traceparent (#1604) 2022-03-21 10:21:39 -07:00
feature.go Added pubsub.FeatureSubscribeWildcards capability (#1887) 2022-07-18 18:04:59 -07:00
metadata.go Define common metadata across components (#1994) 2022-08-25 10:10:53 -07:00
pubsub.go Go 1.19 support and linter fixes (#1975) 2022-08-18 00:45:23 -07:00
requests.go Added ContentType to pubsub/binding/state request-response (#1376) 2022-01-28 10:17:04 -08:00
responses.go update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00

README.md

Pub Sub

Pub Sub components provide a common way to interact with different message bus implementations to achieve reliable, high-scale scenarios based on event-driven async communications, while allowing users to opt-in to advanced capabilities using defined metadata.

Implementing a new Pub Sub

A compliant pub sub needs to implement the PubSub inteface included in the pubsub.go file.

Message TTL (or Time To Live)

Message Time to live is implemented by default in Dapr. A publishing application can set the expiration of individual messages by publishing it with the ttlInSeconds metadata. Components that support message TTL should parse this metadata attribute. For components that do not implement this feature in Dapr, the runtime will automatically populate the expiration attribute in the CloudEvent object if ttlInSeconds is present - in this case, Dapr will expire the message when a Dapr subscriber is about to consume an expired message. The expiration attribute is handled by Dapr runtime as a convenience to subscribers, dropping expired messages without invoking subscribers' endpoint. Subscriber applications that don't use Dapr, need to handle this attribute and implement the expiration logic.

If the pub sub component implementation can handle message TTL natively without relying on Dapr, consume the ttlInSeconds metadata in the component implementation for the Publish function. Also, implement the Features() function so the Dapr runtime knows that it should not add the expiration attribute to events.

Example:

import contribMetadata "github.com/dapr/components-contrib/metadata"

//...

func (c *MyComponent) Publish(req *pubsub.PublishRequest) error {
	//...
	ttl, hasTTL, _ := contribMetadata.TryGetTTL(req.Metadata)
	if hasTTL {
		//... handle ttl for component.
	}
	//...
	return nil
}

func (c *MyComponent) Features() []pubsub.Feature {
	// Tip: cache this list into a private property.
	// Simply return nil if component does not implement any addition features.
	return []pubsub.Feature{pubsub.FeatureMessageTTL}
}

For pub sub components that support TTL per topic or queue but not per message, there are some design choices:

  • Configure the TTL for the topic or queue as usual. Optionally, implement topic or queue provisioning in the Init() method, using the component configuration's metadata to determine the topic or queue TTL.
  • Let Dapr runtime handle ttlInSeconds for messages that want to expire earlier than the topic's or queue's TTL. So, applications can still benefit from TTL per message via Dapr for this scenario.

Note: as per the CloudEvent spec, timestamps (like expiration) are formatted using RFC3339.