From fd1a54487711232002d1c82b79085ba2c8f4f1bf Mon Sep 17 00:00:00 2001 From: zhangchao Date: Thu, 18 Mar 2021 21:24:24 +0800 Subject: [PATCH] fix: parse duration-based settings as a time.Duration like redeliverInterval and processingTimeout are and remove the InSec suffix --- pubsub/redis/metadata.go | 8 ++-- pubsub/redis/redis.go | 76 +++++++++++++++++++++----------------- pubsub/redis/redis_test.go | 42 ++++++++++++++------- 3 files changed, 75 insertions(+), 51 deletions(-) diff --git a/pubsub/redis/metadata.go b/pubsub/redis/metadata.go index 2a71a5980..5a2504f34 100644 --- a/pubsub/redis/metadata.go +++ b/pubsub/redis/metadata.go @@ -15,16 +15,16 @@ type metadata struct { // The Redis password password string // Dial timeout for establishing new connections. - dialTimeoutInSec int + dialTimeout time.Duration // Timeout for socket reads. If reached, commands will fail // with a timeout instead of blocking. Use value -1 for no timeout and 0 for default. - readTimeoutInSec int + readTimeout time.Duration // Timeout for socket writes. If reached, commands will fail - writeTimeoutInSec int + writeTimeout time.Duration // Maximum number of socket connections. poolSize int // Connection age at which client retires (closes) the connection. - maxConnAgeInSec int + maxConnAge time.Duration // The consumer identifier consumerID string // A flag to enables TLS by setting InsecureSkipVerify to true diff --git a/pubsub/redis/redis.go b/pubsub/redis/redis.go index 53e1e8fbd..7f0507ff5 100644 --- a/pubsub/redis/redis.go +++ b/pubsub/redis/redis.go @@ -22,11 +22,11 @@ import ( const ( host = "redisHost" password = "redisPassword" - dialTimeoutInSec = "dialTimeoutInSec" - readTimeoutInSec = "readTimeoutInSec" - writeTimeoutInSec = "writeTimeoutInSec" + dialTimeout = "dialTimeout" + readTimeout = "readTimeout" + writeTimeout = "writeTimeout" poolSize = "poolSize" - maxConnAgeInSec = "maxConnAgeInSec" + maxConnAge = "maxConnAge" consumerID = "consumerID" enableTLS = "enableTLS" processingTimeout = "processingTimeout" @@ -134,27 +134,35 @@ func parseRedisMetadata(meta pubsub.Metadata) (metadata, error) { m.concurrency = uint(concurrency) } - if val, ok := meta.Properties[dialTimeoutInSec]; ok && val != "" { - var err error - m.dialTimeoutInSec, err = strconv.Atoi(val) - if err != nil { - return m, fmt.Errorf("redis streams error: invalid dialTimeoutInSec %s, %s", val, err) + if val, ok := meta.Properties[dialTimeout]; ok && val != "" { + if dialTimeoutMs, err := strconv.ParseUint(val, 10, 64); err == nil { + m.dialTimeout = time.Duration(dialTimeoutMs) * time.Millisecond + } else if d, err := time.ParseDuration(val); err == nil { + m.dialTimeout = d + } else { + return m, fmt.Errorf("redis streams error: invalid dialTimeout %s, %s", val, err) } } - if val, ok := meta.Properties[readTimeoutInSec]; ok && val != "" { - var err error - m.readTimeoutInSec, err = strconv.Atoi(val) - if err != nil { - return m, fmt.Errorf("redis streams error: invalid readTimeoutInSec %s, %s", val, err) + if val, ok := meta.Properties[readTimeout]; ok && val != "" { + if val == "-1" { + m.readTimeout = -1 + } else if readTimeoutMs, err := strconv.ParseUint(val, 10, 64); err == nil { + m.readTimeout = time.Duration(readTimeoutMs) * time.Millisecond + } else if d, err := time.ParseDuration(val); err == nil { + m.readTimeout = d + } else { + return m, fmt.Errorf("redis streams error: invalid readTimeout %s, %s", val, err) } } - if val, ok := meta.Properties[writeTimeoutInSec]; ok && val != "" { - var err error - m.writeTimeoutInSec, err = strconv.Atoi(val) - if err != nil { - return m, fmt.Errorf("redis streams error: invalid writeTimeoutInSec %s, %s", val, err) + if val, ok := meta.Properties[writeTimeout]; ok && val != "" { + if writeTimeoutMs, err := strconv.ParseUint(val, 10, 64); err == nil { + m.writeTimeout = time.Duration(writeTimeoutMs) * time.Millisecond + } else if d, err := time.ParseDuration(val); err == nil { + m.writeTimeout = d + } else { + return m, fmt.Errorf("redis streams error: invalid writeTimeout %s, %s", val, err) } } @@ -166,11 +174,13 @@ func parseRedisMetadata(meta pubsub.Metadata) (metadata, error) { } } - if val, ok := meta.Properties[maxConnAgeInSec]; ok && val != "" { - var err error - m.maxConnAgeInSec, err = strconv.Atoi(val) - if err != nil { - return m, fmt.Errorf("redis streams error: invalid maxConnAgeInSec %s, %s", val, err) + if val, ok := meta.Properties[maxConnAge]; ok && val != "" { + if maxConnAgeMs, err := strconv.ParseUint(val, 10, 64); err == nil { + m.maxConnAge = time.Duration(maxConnAgeMs) * time.Millisecond + } else if d, err := time.ParseDuration(val); err == nil { + m.maxConnAge = d + } else { + return m, fmt.Errorf("redis streams error: invalid maxConnAge %s, %s", val, err) } } @@ -191,22 +201,20 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error { MaxRetries: 3, MaxRetryBackoff: time.Second * 2, } - if m.dialTimeoutInSec > 0 { - options.DialTimeout = time.Duration(m.dialTimeoutInSec) * time.Second + if m.dialTimeout > 0 { + options.DialTimeout = m.dialTimeout } - if m.readTimeoutInSec > 0 { - options.ReadTimeout = time.Duration(m.readTimeoutInSec) * time.Second - } else if m.readTimeoutInSec == -1 { - options.ReadTimeout = time.Duration(m.readTimeoutInSec) + if m.readTimeout >= -1 { + options.ReadTimeout = m.readTimeout } - if m.writeTimeoutInSec > 0 { - options.WriteTimeout = time.Duration(m.writeTimeoutInSec) * time.Second + if m.writeTimeout > 0 { + options.WriteTimeout = m.writeTimeout } if m.poolSize > 0 { options.PoolSize = m.poolSize } - if m.maxConnAgeInSec > 0 { - options.MaxConnAge = time.Duration(m.maxConnAgeInSec) * time.Second + if m.maxConnAge > 0 { + options.MaxConnAge = m.maxConnAge } /* #nosec */ diff --git a/pubsub/redis/redis_test.go b/pubsub/redis/redis_test.go index 430664dd6..358765fe4 100644 --- a/pubsub/redis/redis_test.go +++ b/pubsub/redis/redis_test.go @@ -11,6 +11,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/go-redis/redis/v7" "github.com/stretchr/testify/assert" @@ -21,15 +22,15 @@ import ( func getFakeProperties() map[string]string { return map[string]string{ - consumerID: "fakeConsumer", - host: "fake.redis.com", - password: "fakePassword", - enableTLS: "true", - dialTimeoutInSec: "5", - readTimeoutInSec: "5", - writeTimeoutInSec: "5", - poolSize: "20", - maxConnAgeInSec: "200", + consumerID: "fakeConsumer", + host: "fake.redis.com", + password: "fakePassword", + enableTLS: "true", + dialTimeout: "5s", + readTimeout: "5s", + writeTimeout: "50000", + poolSize: "20", + maxConnAge: "200s", } } @@ -50,11 +51,11 @@ func TestParseRedisMetadata(t *testing.T) { assert.Equal(t, fakeProperties[password], m.password) assert.Equal(t, fakeProperties[consumerID], m.consumerID) assert.Equal(t, true, m.enableTLS) - assert.Equal(t, 5, m.dialTimeoutInSec) - assert.Equal(t, 5, m.readTimeoutInSec) - assert.Equal(t, 5, m.writeTimeoutInSec) + assert.Equal(t, 5 * time.Second, m.dialTimeout) + assert.Equal(t, 5 * time.Second, m.readTimeout) + assert.Equal(t, 50000 * time.Millisecond, m.writeTimeout) assert.Equal(t, 20, m.poolSize) - assert.Equal(t, 200, m.maxConnAgeInSec) + assert.Equal(t, 200 * time.Second, m.maxConnAge) }) t.Run("host is not given", func(t *testing.T) { @@ -91,6 +92,21 @@ func TestParseRedisMetadata(t *testing.T) { assert.Equal(t, fakeProperties[password], m.password) assert.Empty(t, m.consumerID) }) + + t.Run("readTimeout can be set as -1", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + fakeMetaData.Properties[readTimeout] = "-1" + + // act + m, err := parseRedisMetadata(fakeMetaData) + // assert + assert.NoError(t, err) + assert.True(t, m.readTimeout == -1) + }) } func TestProcessStreams(t *testing.T) {