fix: parse duration-based settings as a time.Duration like redeliverInterval and processingTimeout are and remove the InSec suffix

This commit is contained in:
zhangchao 2021-03-18 21:24:24 +08:00
parent f433307fb1
commit fd1a544877
3 changed files with 75 additions and 51 deletions

View File

@ -15,16 +15,16 @@ type metadata struct {
// The Redis password
password string
// Dial timeout for establishing new connections.
dialTimeoutInSec int
dialTimeout time.Duration
// Timeout for socket reads. If reached, commands will fail
// with a timeout instead of blocking. Use value -1 for no timeout and 0 for default.
readTimeoutInSec int
readTimeout time.Duration
// Timeout for socket writes. If reached, commands will fail
writeTimeoutInSec int
writeTimeout time.Duration
// Maximum number of socket connections.
poolSize int
// Connection age at which client retires (closes) the connection.
maxConnAgeInSec int
maxConnAge time.Duration
// The consumer identifier
consumerID string
// A flag to enables TLS by setting InsecureSkipVerify to true

View File

@ -22,11 +22,11 @@ import (
const (
host = "redisHost"
password = "redisPassword"
dialTimeoutInSec = "dialTimeoutInSec"
readTimeoutInSec = "readTimeoutInSec"
writeTimeoutInSec = "writeTimeoutInSec"
dialTimeout = "dialTimeout"
readTimeout = "readTimeout"
writeTimeout = "writeTimeout"
poolSize = "poolSize"
maxConnAgeInSec = "maxConnAgeInSec"
maxConnAge = "maxConnAge"
consumerID = "consumerID"
enableTLS = "enableTLS"
processingTimeout = "processingTimeout"
@ -134,27 +134,35 @@ func parseRedisMetadata(meta pubsub.Metadata) (metadata, error) {
m.concurrency = uint(concurrency)
}
if val, ok := meta.Properties[dialTimeoutInSec]; ok && val != "" {
var err error
m.dialTimeoutInSec, err = strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("redis streams error: invalid dialTimeoutInSec %s, %s", val, err)
if val, ok := meta.Properties[dialTimeout]; ok && val != "" {
if dialTimeoutMs, err := strconv.ParseUint(val, 10, 64); err == nil {
m.dialTimeout = time.Duration(dialTimeoutMs) * time.Millisecond
} else if d, err := time.ParseDuration(val); err == nil {
m.dialTimeout = d
} else {
return m, fmt.Errorf("redis streams error: invalid dialTimeout %s, %s", val, err)
}
}
if val, ok := meta.Properties[readTimeoutInSec]; ok && val != "" {
var err error
m.readTimeoutInSec, err = strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("redis streams error: invalid readTimeoutInSec %s, %s", val, err)
if val, ok := meta.Properties[readTimeout]; ok && val != "" {
if val == "-1" {
m.readTimeout = -1
} else if readTimeoutMs, err := strconv.ParseUint(val, 10, 64); err == nil {
m.readTimeout = time.Duration(readTimeoutMs) * time.Millisecond
} else if d, err := time.ParseDuration(val); err == nil {
m.readTimeout = d
} else {
return m, fmt.Errorf("redis streams error: invalid readTimeout %s, %s", val, err)
}
}
if val, ok := meta.Properties[writeTimeoutInSec]; ok && val != "" {
var err error
m.writeTimeoutInSec, err = strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("redis streams error: invalid writeTimeoutInSec %s, %s", val, err)
if val, ok := meta.Properties[writeTimeout]; ok && val != "" {
if writeTimeoutMs, err := strconv.ParseUint(val, 10, 64); err == nil {
m.writeTimeout = time.Duration(writeTimeoutMs) * time.Millisecond
} else if d, err := time.ParseDuration(val); err == nil {
m.writeTimeout = d
} else {
return m, fmt.Errorf("redis streams error: invalid writeTimeout %s, %s", val, err)
}
}
@ -166,11 +174,13 @@ func parseRedisMetadata(meta pubsub.Metadata) (metadata, error) {
}
}
if val, ok := meta.Properties[maxConnAgeInSec]; ok && val != "" {
var err error
m.maxConnAgeInSec, err = strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("redis streams error: invalid maxConnAgeInSec %s, %s", val, err)
if val, ok := meta.Properties[maxConnAge]; ok && val != "" {
if maxConnAgeMs, err := strconv.ParseUint(val, 10, 64); err == nil {
m.maxConnAge = time.Duration(maxConnAgeMs) * time.Millisecond
} else if d, err := time.ParseDuration(val); err == nil {
m.maxConnAge = d
} else {
return m, fmt.Errorf("redis streams error: invalid maxConnAge %s, %s", val, err)
}
}
@ -191,22 +201,20 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error {
MaxRetries: 3,
MaxRetryBackoff: time.Second * 2,
}
if m.dialTimeoutInSec > 0 {
options.DialTimeout = time.Duration(m.dialTimeoutInSec) * time.Second
if m.dialTimeout > 0 {
options.DialTimeout = m.dialTimeout
}
if m.readTimeoutInSec > 0 {
options.ReadTimeout = time.Duration(m.readTimeoutInSec) * time.Second
} else if m.readTimeoutInSec == -1 {
options.ReadTimeout = time.Duration(m.readTimeoutInSec)
if m.readTimeout >= -1 {
options.ReadTimeout = m.readTimeout
}
if m.writeTimeoutInSec > 0 {
options.WriteTimeout = time.Duration(m.writeTimeoutInSec) * time.Second
if m.writeTimeout > 0 {
options.WriteTimeout = m.writeTimeout
}
if m.poolSize > 0 {
options.PoolSize = m.poolSize
}
if m.maxConnAgeInSec > 0 {
options.MaxConnAge = time.Duration(m.maxConnAgeInSec) * time.Second
if m.maxConnAge > 0 {
options.MaxConnAge = m.maxConnAge
}
/* #nosec */

View File

@ -11,6 +11,7 @@ import (
"fmt"
"sync"
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/stretchr/testify/assert"
@ -21,15 +22,15 @@ import (
func getFakeProperties() map[string]string {
return map[string]string{
consumerID: "fakeConsumer",
host: "fake.redis.com",
password: "fakePassword",
enableTLS: "true",
dialTimeoutInSec: "5",
readTimeoutInSec: "5",
writeTimeoutInSec: "5",
poolSize: "20",
maxConnAgeInSec: "200",
consumerID: "fakeConsumer",
host: "fake.redis.com",
password: "fakePassword",
enableTLS: "true",
dialTimeout: "5s",
readTimeout: "5s",
writeTimeout: "50000",
poolSize: "20",
maxConnAge: "200s",
}
}
@ -50,11 +51,11 @@ func TestParseRedisMetadata(t *testing.T) {
assert.Equal(t, fakeProperties[password], m.password)
assert.Equal(t, fakeProperties[consumerID], m.consumerID)
assert.Equal(t, true, m.enableTLS)
assert.Equal(t, 5, m.dialTimeoutInSec)
assert.Equal(t, 5, m.readTimeoutInSec)
assert.Equal(t, 5, m.writeTimeoutInSec)
assert.Equal(t, 5 * time.Second, m.dialTimeout)
assert.Equal(t, 5 * time.Second, m.readTimeout)
assert.Equal(t, 50000 * time.Millisecond, m.writeTimeout)
assert.Equal(t, 20, m.poolSize)
assert.Equal(t, 200, m.maxConnAgeInSec)
assert.Equal(t, 200 * time.Second, m.maxConnAge)
})
t.Run("host is not given", func(t *testing.T) {
@ -91,6 +92,21 @@ func TestParseRedisMetadata(t *testing.T) {
assert.Equal(t, fakeProperties[password], m.password)
assert.Empty(t, m.consumerID)
})
t.Run("readTimeout can be set as -1", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}
fakeMetaData.Properties[readTimeout] = "-1"
// act
m, err := parseRedisMetadata(fakeMetaData)
// assert
assert.NoError(t, err)
assert.True(t, m.readTimeout == -1)
})
}
func TestProcessStreams(t *testing.T) {