Add optional metadata param maxLen for redis stream PubSub component (#835)
* Add optional metadata param maxLen for redis stream PubSub component * fix fmt * fix error info Co-authored-by: Phil Kedy <phil.kedy@gmail.com> Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
This commit is contained in:
parent
83803f575d
commit
f376450d7c
|
@ -70,4 +70,7 @@ type metadata struct {
|
||||||
queueDepth uint
|
queueDepth uint
|
||||||
// The number of concurrent workers that are processing messages
|
// The number of concurrent workers that are processing messages
|
||||||
concurrency uint
|
concurrency uint
|
||||||
|
|
||||||
|
// the max len of stream
|
||||||
|
maxLenApprox int64
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ const (
|
||||||
redeliverInterval = "redeliverInterval"
|
redeliverInterval = "redeliverInterval"
|
||||||
queueDepth = "queueDepth"
|
queueDepth = "queueDepth"
|
||||||
concurrency = "concurrency"
|
concurrency = "concurrency"
|
||||||
|
maxLenApprox = "maxLenApprox"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -101,7 +102,7 @@ func parseRedisMetadata(meta pubsub.Metadata) (metadata, error) {
|
||||||
|
|
||||||
if val, ok := meta.Properties[redisType]; ok && val != "" {
|
if val, ok := meta.Properties[redisType]; ok && val != "" {
|
||||||
if val != NodeType && val != ClusterType {
|
if val != NodeType && val != ClusterType {
|
||||||
return m, errors.New("redis type error: unknown redis type")
|
return m, fmt.Errorf("redis type error: unknown redis type: %s", val)
|
||||||
}
|
}
|
||||||
m.redisType = val
|
m.redisType = val
|
||||||
}
|
}
|
||||||
|
@ -288,6 +289,14 @@ func parseRedisMetadata(meta pubsub.Metadata) (metadata, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if val, ok := meta.Properties[maxLenApprox]; ok && val != "" {
|
||||||
|
maxLenApprox, err := strconv.ParseInt(val, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return m, fmt.Errorf("redis streams error: invalid maxLenApprox %s, %s", val, err)
|
||||||
|
}
|
||||||
|
m.maxLenApprox = maxLenApprox
|
||||||
|
}
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,8 +378,9 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error {
|
||||||
|
|
||||||
func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
|
func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
|
||||||
_, err := r.client.XAdd(&redis.XAddArgs{
|
_, err := r.client.XAdd(&redis.XAddArgs{
|
||||||
Stream: req.Topic,
|
Stream: req.Topic,
|
||||||
Values: map[string]interface{}{"data": req.Data},
|
MaxLenApprox: r.metadata.maxLenApprox,
|
||||||
|
Values: map[string]interface{}{"data": req.Data},
|
||||||
}).Result()
|
}).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("redis streams: error from publish: %s", err)
|
return fmt.Errorf("redis streams: error from publish: %s", err)
|
||||||
|
|
|
@ -40,6 +40,7 @@ func getFakeProperties() map[string]string {
|
||||||
poolTimeout: "1s",
|
poolTimeout: "1s",
|
||||||
idleTimeout: "1s",
|
idleTimeout: "1s",
|
||||||
idleCheckFrequency: "1s",
|
idleCheckFrequency: "1s",
|
||||||
|
maxLenApprox: "1000",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,6 +75,7 @@ func TestParseRedisMetadata(t *testing.T) {
|
||||||
assert.Equal(t, 1*time.Second, m.poolTimeout)
|
assert.Equal(t, 1*time.Second, m.poolTimeout)
|
||||||
assert.Equal(t, 1*time.Second, m.idleTimeout)
|
assert.Equal(t, 1*time.Second, m.idleTimeout)
|
||||||
assert.Equal(t, 1*time.Second, m.idleCheckFrequency)
|
assert.Equal(t, 1*time.Second, m.idleCheckFrequency)
|
||||||
|
assert.Equal(t, int64(1000), m.maxLenApprox)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("host is not given", func(t *testing.T) {
|
t.Run("host is not given", func(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue