components-contrib/pubsub
Sky Ao 8b5554dc71
Refactor kafka binding to reuse the common kafka code extracted from the kafka pubsub component (#1696)
* refactor kafka pubsub code to extract common kafka code for reuse

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

* fix lint; add unit test for subscribeAdapter

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

* move topics field from the internal kafka struct to the pubsub kafka struct, since in the input binding the topics will be configured in metadata

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

* reuse internal kafka code for bindings

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

* add redis standalone_test back, which was deleted by mistake

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

* small code improvement to trigger test

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

* add license headers

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

* try to set disableTls to true to verify the kafka connection failure

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

* don't enable consumer retry in the kafka binding component; if authentication is disabled, there is no need to set TLSDisable at the same time

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

* fix lint

Signed-off-by: Sky Ao <aoxiaojian@gmail.com>

Co-authored-by: Loong Dai <long.dai@intel.com>
2022-05-10 20:07:23 -07:00
aws/snssqs conformance test runs in single mode 2022-04-11 11:41:52 +03:00
azure Use revive instead of golint (#1685) 2022-05-06 12:55:17 -07:00
gcp/pubsub Merge latest changes from upstream and resolve conflict 2022-03-10 15:47:11 +00:00
hazelcast update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
in-memory update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
jetstream Add support for nats jetstream wildcard subscriptions (#1465) 2022-02-15 09:46:33 -08:00
kafka Refactor kafka binding to reuse the common kafka code extracted from the kafka pubsub component (#1696) 2022-05-10 20:07:23 -07:00
mqtt MQTT Pubsub Certification Testing + AutAckOff Fix for MQTT (#1420) 2022-01-07 10:33:54 -08:00
natsstreaming Add context in bindings interface 2022-04-25 14:16:15 +08:00
pulsar Add metadata property to configure Batching in Pulsar (#1707) 2022-05-06 17:59:27 -07:00
rabbitmq supports rabbitmq pubsub set exchangekind (#1520) 2022-02-21 13:08:18 -08:00
redis update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
rocketmq update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
Readme.md TTL in PubSub. (#565) 2020-12-29 14:28:52 -08:00
concurrency.go update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
concurrency_test.go update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
envelope.go Add context in bindings interface 2022-04-25 14:16:15 +08:00
envelope_test.go fix/typo: Change traceid to traceparent (#1604) 2022-03-21 10:21:39 -07:00
feature.go update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
metadata.go update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
pubsub.go update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00
requests.go Added ContentType to pubsub/binding/state request-response (#1376) 2022-01-28 10:17:04 -08:00
responses.go update license to Apache v2.0 (#1406) 2022-01-04 19:53:31 -08:00

Readme.md

Pub Sub

Pub Subs provide a common way to interact with different message bus implementations to achieve reliable, high-scale scenarios based on event-driven async communications, while allowing users to opt in to advanced capabilities using defined metadata.

Currently supported pub-subs are:

  • Hazelcast
  • Redis Streams
  • NATS
  • Kafka
  • Azure Service Bus
  • RabbitMQ
  • Azure Event Hubs
  • GCP Pub/Sub
  • MQTT

Implementing a new Pub Sub

A compliant pub sub needs to implement the following interface:

type PubSub interface {
	Init(metadata Metadata) error
	Publish(req *PublishRequest) error
	Subscribe(req SubscribeRequest, handler func(msg *NewMessage) error) error
}
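
For illustration, a minimal skeleton of a compliant component might look like the following. This is a sketch only; the myPubSub type is a hypothetical name, and a real component would also hold its connection settings and broker client.

import "github.com/dapr/components-contrib/pubsub"

//...

// myPubSub is a hypothetical, minimal component satisfying the interface above.
type myPubSub struct {
	// connection settings and the broker client would live here
}

func (m *myPubSub) Init(metadata pubsub.Metadata) error {
	// parse metadata.Properties and connect to the message bus
	return nil
}

func (m *myPubSub) Publish(req *pubsub.PublishRequest) error {
	// send req.Data to req.Topic on the underlying message bus
	return nil
}

func (m *myPubSub) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {
	// consume from req.Topic and invoke handler for every received message
	return nil
}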

Message TTL (or Time To Live)

Message time to live is implemented by default in Dapr. A publishing application can set the expiration of individual messages by publishing them with the ttlInSeconds metadata attribute. Components that support message TTL should parse this metadata attribute. For components that do not implement this feature, the Dapr runtime automatically populates the expiration attribute in the CloudEvent object whenever ttlInSeconds is present; in this case, Dapr drops the message when a Dapr subscriber is about to consume a message that has already expired. The expiration attribute is handled by the Dapr runtime as a convenience to subscribers, dropping expired messages without invoking the subscribers' endpoints. Subscriber applications that don't use Dapr need to handle this attribute themselves and implement the expiration logic.
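
For a subscriber that does not go through Dapr, handling this attribute amounts to parsing the expiration timestamp and skipping the message if it is already in the past. The sketch below is illustrative only: the map-based event shape and the isExpired helper are assumptions, not part of the Dapr API; only the expiration attribute and its RFC3339 format come from the behavior described above.

import "time"

// isExpired reports whether a received CloudEvent carries an expiration
// attribute that lies in the past. Events without the attribute never expire.
func isExpired(event map[string]interface{}) bool {
	raw, ok := event["expiration"].(string)
	if !ok {
		return false
	}
	expiration, err := time.Parse(time.RFC3339, raw)
	if err != nil {
		// unparseable value: safer to deliver than to silently drop
		return false
	}
	return time.Now().After(expiration)
}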

If the pub sub component can handle message TTL natively without relying on Dapr, consume the ttlInSeconds metadata in the component's Publish implementation. Also, implement the Features() function so the Dapr runtime knows that it should not add the expiration attribute to events.

Example:

import contrib_metadata "github.com/dapr/components-contrib/metadata"

//...

func (c *MyComponent) Publish(req *pubsub.PublishRequest) error {
	//...
	ttl, hasTTL, _ := contrib_metadata.TryGetTTL(req.Metadata)
	if hasTTL {
		//... handle ttl for component.
	}
	//...
	return nil
}

func (c *MyComponent) Features() []pubsub.Feature {
	// Tip: cache this list into a private property.
	// Simply return nil if the component does not implement any additional features.
	return []pubsub.Feature{pubsub.FeatureMessageTTL}
}

For pub sub components that support TTL per topic or queue but not per message, there are some design choices:

  • Configure the TTL for the topic or queue as usual. Optionally, implement topic or queue provisioning in the Init() method, using the component configuration's metadata to determine the topic or queue TTL (a sketch of this approach follows the note below).
  • Let the Dapr runtime handle ttlInSeconds for messages that need to expire earlier than the topic's or queue's TTL. Applications can then still benefit from per-message TTL via Dapr in this scenario.

Note: as per the CloudEvent spec, timestamps (like expiration) are formatted using RFC3339.
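
For the first design choice above, a sketch of provisioning the topic or queue from the component configuration's metadata might look like the following. This is an assumption-laden illustration: MyComponent is reused from the earlier example, while createTopicWithTTL and createTopic are hypothetical placeholders for whatever provisioning calls the underlying broker's client actually offers.

import (
	contrib_metadata "github.com/dapr/components-contrib/metadata"
	"github.com/dapr/components-contrib/pubsub"
)

func (c *MyComponent) Init(metadata pubsub.Metadata) error {
	//...
	ttl, hasTTL, err := contrib_metadata.TryGetTTL(metadata.Properties)
	if err != nil {
		return err
	}
	if hasTTL {
		// provision the topic or queue with the TTL taken from the component configuration;
		// createTopicWithTTL stands in for the broker client's own provisioning API
		return c.createTopicWithTTL(ttl)
	}
	// no TTL configured: provision with the broker's defaults (createTopic is also a placeholder)
	return c.createTopic()
}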