Added redeliveryDelay to pubsub.pulsar

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2022-06-27 21:12:34 +00:00
parent 0c40dccd07
commit a942b05b4a
3 changed files with 20 additions and 5 deletions

View File

@ -27,4 +27,5 @@ type pulsarMetadata struct {
Namespace string `json:"namespace"` Namespace string `json:"namespace"`
Persistent bool `json:"persistent"` Persistent bool `json:"persistent"`
Token string `json:"token"` Token string `json:"token"`
RedeliveryDelay time.Duration `json:"redeliveryDelay"`
} }

View File

@ -41,6 +41,7 @@ const (
tenant = "tenant" tenant = "tenant"
namespace = "namespace" namespace = "namespace"
persistent = "persistent" persistent = "persistent"
redeliveryDelay = "redeliveryDelay"
defaultTenant = "public" defaultTenant = "public"
defaultNamespace = "default" defaultNamespace = "default"
@ -59,6 +60,8 @@ const (
defaultMaxMessages = 1000 defaultMaxMessages = 1000
// defaultMaxBatchSize init default for maximum number of bytes per batch. // defaultMaxBatchSize init default for maximum number of bytes per batch.
defaultMaxBatchSize = 128 * 1024 defaultMaxBatchSize = 128 * 1024
// defaultRedeliveryDelay init default for redelivery delay.
defaultRedeliveryDelay = 30 * time.Second
) )
type Pulsar struct { type Pulsar struct {
@ -124,6 +127,14 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
} }
m.BatchingMaxSize = uint(batchingMaxSize) m.BatchingMaxSize = uint(batchingMaxSize)
} }
m.RedeliveryDelay = defaultRedeliveryDelay
if val, ok := meta.Properties[redeliveryDelay]; ok {
redeliveryDelay, err := formatDuration(val)
if err != nil {
return nil, errors.New("pulsar error: invalid value for redeliveryDelay")
}
m.RedeliveryDelay = redeliveryDelay
}
if val, ok := meta.Properties[persistent]; ok && val != "" { if val, ok := meta.Properties[persistent]; ok && val != "" {
per, err := strconv.ParseBool(val) per, err := strconv.ParseBool(val)
if err != nil { if err != nil {
@ -255,10 +266,11 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
topic := p.formatTopic(req.Topic) topic := p.formatTopic(req.Topic)
options := pulsar.ConsumerOptions{ options := pulsar.ConsumerOptions{
Topic: topic, Topic: topic,
SubscriptionName: p.metadata.ConsumerID, SubscriptionName: p.metadata.ConsumerID,
Type: pulsar.Shared, Type: pulsar.Shared,
MessageChannel: channel, MessageChannel: channel,
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
} }
consumer, err := p.client.Subscribe(options) consumer, err := p.client.Subscribe(options)

View File

@ -9,4 +9,6 @@ spec:
- name: host - name: host
value: "localhost:6650" value: "localhost:6650"
- name: consumerID - name: consumerID
value: myConsumerID value: myConsumerID
- name: redeliveryDelay
value: 1s