From 3e0873691113fb32dc745305174d1a9e64840711 Mon Sep 17 00:00:00 2001 From: Bernd Verst <4535280+berndverst@users.noreply.github.com> Date: Fri, 16 Dec 2022 14:12:11 -0800 Subject: [PATCH] Add support for Redis 7 in all Dapr Components (#2228) * Add support for Redis 7 Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Add Redis 7 to conformance tests Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * automatically detect redis version, make redis7 state conformant Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Additions for REDIS without JSON support Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Return helpful query API error when missing redis-json Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Change Redis 7 port for conformance test Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Fix nil reference Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Update Redis7 port Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Enable ContextTimeoutEnabled for Redis v9 client Co-Authored-By: Bernd Verst <4535280+berndverst@users.noreply.github.com> Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Add timeouts, address code review Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Add another timeout Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Redis: so much more context Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * modtidy Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * fix issue from latest merge Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * modtidy Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Add generic redis nil error Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * fix redis version check Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * upgrade redis v9 client Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * Remove flaky redis v7 pubsub conformance test Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> * modtidy Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- .../infrastructure/docker-compose-redis7.yml | 8 + .github/workflows/conformance.yml | 16 +- bindings/redis/redis.go | 16 +- bindings/redis/redis_test.go | 18 +- go.mod | 2 + go.sum | 6 +- internal/component/redis/redis.go | 207 +++++---- internal/component/redis/v8client.go | 414 ++++++++++++++++++ internal/component/redis/v9client.go | 410 +++++++++++++++++ lock/redis/standalone.go | 31 +- pubsub/redis/redis.go | 95 ++-- pubsub/redis/redis_test.go | 11 +- state/redis/redis.go | 56 ++- state/redis/redis_query.go | 7 +- state/redis/redis_test.go | 38 +- tests/certification/bindings/redis/go.mod | 1 + tests/certification/bindings/redis/go.sum | 4 +- tests/certification/state/redis/go.mod | 1 + tests/certification/state/redis/go.sum | 4 +- .../bindings/redis/{ => v6}/bindings.yml | 0 tests/config/bindings/redis/v7/bindings.yml | 17 + tests/config/bindings/tests.yml | 7 +- tests/config/pubsub/redis/{ => v6}/pubsub.yml | 0 tests/config/pubsub/redis/v7/pubsub.yml | 20 + tests/config/pubsub/tests.yml | 6 +- .../state/redis/{ => v6}/statestore.yaml | 0 tests/config/state/redis/v7/statestore.yaml | 13 + tests/config/state/tests.yml | 5 +- tests/conformance/common.go | 15 +- tests/conformance/pubsub/pubsub.go | 5 +- 30 files changed, 1184 insertions(+), 249 deletions(-) create mode 100644 .github/infrastructure/docker-compose-redis7.yml create mode 100644 internal/component/redis/v8client.go create mode 100644 internal/component/redis/v9client.go rename tests/config/bindings/redis/{ => v6}/bindings.yml (100%) create mode 100644 tests/config/bindings/redis/v7/bindings.yml rename tests/config/pubsub/redis/{ => v6}/pubsub.yml (100%) create mode 100644 tests/config/pubsub/redis/v7/pubsub.yml rename tests/config/state/redis/{ => v6}/statestore.yaml (100%) create mode 100644 tests/config/state/redis/v7/statestore.yaml diff --git a/.github/infrastructure/docker-compose-redis7.yml b/.github/infrastructure/docker-compose-redis7.yml new file mode 100644 index 000000000..a2090ab0e --- /dev/null +++ b/.github/infrastructure/docker-compose-redis7.yml @@ -0,0 +1,8 @@ +version: '2' +services: + redis: + image: redis:7 + ports: + - "6380:6379" + environment: + - REDIS_REPLICATION_MODE=master diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index 7a898acbb..7274717b0 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -57,7 +57,8 @@ jobs: - bindings.mqtt-mosquitto - bindings.mqtt-vernemq - bindings.postgres - - bindings.redis + - bindings.redis.v6 + - bindings.redis.v7 - bindings.kubemq - bindings.rabbitmq - pubsub.aws.snssqs @@ -69,7 +70,7 @@ jobs: - pubsub.natsstreaming - pubsub.pulsar - pubsub.rabbitmq - - pubsub.redis + - pubsub.redis.v6 - pubsub.kafka-wurstmeister - pubsub.kafka-confluent - pubsub.kubemq @@ -83,7 +84,8 @@ jobs: - state.mysql.mysql - state.mysql.mariadb - state.postgresql - - state.redis + - state.redis.v6 + - state.redis.v7 - state.sqlserver - state.in-memory - state.cockroachdb @@ -254,9 +256,13 @@ jobs: echo "$CERT_NAME=$CERT_FILE" >> $GITHUB_ENV done - - name: Start Redis + - name: Start Redis 6 with Redis JSON run: docker-compose -f ./.github/infrastructure/docker-compose-redisjson.yml -p redis up -d - if: contains(matrix.component, 'redis') + if: contains(matrix.component, 'redis.v6') + + - name: Start Redis 7 + run: docker-compose -f ./.github/infrastructure/docker-compose-redis7.yml -p redis up -d + if: contains(matrix.component, 'redis.v7') - name: Start Temporal run: docker-compose -f ./.github/infrastructure/docker-compose-temporal.yml -p temporal up -d diff --git a/bindings/redis/redis.go b/bindings/redis/redis.go index 41b947d50..305baeb44 100644 --- a/bindings/redis/redis.go +++ b/bindings/redis/redis.go @@ -15,11 +15,9 @@ package redis import ( "context" + "errors" "fmt" - "github.com/go-redis/redis/v8" - "github.com/pkg/errors" - "github.com/dapr/components-contrib/bindings" rediscomponent "github.com/dapr/components-contrib/internal/component/redis" "github.com/dapr/kit/logger" @@ -27,7 +25,7 @@ import ( // Redis is a redis output binding. type Redis struct { - client redis.UniversalClient + client rediscomponent.RedisClient clientSettings *rediscomponent.Settings logger logger.Logger @@ -49,7 +47,7 @@ func (r *Redis) Init(meta bindings.Metadata) (err error) { r.ctx, r.cancel = context.WithCancel(context.Background()) - _, err = r.client.Ping(r.ctx).Result() + _, err = r.client.PingResult(r.ctx) if err != nil { return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err) } @@ -58,7 +56,7 @@ func (r *Redis) Init(meta bindings.Metadata) (err error) { } func (r *Redis) Ping() error { - if _, err := r.client.Ping(r.ctx).Result(); err != nil { + if _, err := r.client.PingResult(r.ctx); err != nil { return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err) } @@ -77,12 +75,12 @@ func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi if key, ok := req.Metadata["key"]; ok && key != "" { switch req.Operation { case bindings.DeleteOperation: - err := r.client.Del(ctx, key).Err() + err := r.client.Del(ctx, key) if err != nil { return nil, err } case bindings.GetOperation: - data, err := r.client.Get(ctx, key).Result() + data, err := r.client.Get(ctx, key) if err != nil { return nil, err } @@ -90,7 +88,7 @@ func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi rep.Data = []byte(data) return rep, nil case bindings.CreateOperation: - _, err := r.client.Do(ctx, "SET", key, req.Data).Result() + err := r.client.DoWrite(ctx, "SET", key, req.Data) if err != nil { return nil, err } diff --git a/bindings/redis/redis_test.go b/bindings/redis/redis_test.go index 81fe68e92..be5bb6519 100644 --- a/bindings/redis/redis_test.go +++ b/bindings/redis/redis_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/dapr/components-contrib/bindings" + internalredis "github.com/dapr/components-contrib/internal/component/redis" "github.com/dapr/kit/logger" ) @@ -34,13 +35,14 @@ func TestInvokeCreate(t *testing.T) { s, c := setupMiniredis() defer s.Close() + // miniRedis is compatible with the existing v8 client bind := &Redis{ client: c, logger: logger.NewLogger("test"), } bind.ctx, bind.cancel = context.WithCancel(context.Background()) - _, err := c.Do(context.Background(), "GET", testKey).Result() + _, err := c.DoRead(context.Background(), "GET", testKey) assert.Equal(t, redis.Nil, err) bindingRes, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{ @@ -51,7 +53,7 @@ func TestInvokeCreate(t *testing.T) { assert.Equal(t, nil, err) assert.Equal(t, true, bindingRes == nil) - getRes, err := c.Do(context.Background(), "GET", testKey).Result() + getRes, err := c.DoRead(context.Background(), "GET", testKey) assert.Equal(t, nil, err) assert.Equal(t, true, getRes == testData) } @@ -66,7 +68,7 @@ func TestInvokeGet(t *testing.T) { } bind.ctx, bind.cancel = context.WithCancel(context.Background()) - _, err := c.Do(context.Background(), "SET", testKey, testData).Result() + err := c.DoWrite(context.Background(), "SET", testKey, testData) assert.Equal(t, nil, err) bindingRes, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{ @@ -87,10 +89,10 @@ func TestInvokeDelete(t *testing.T) { } bind.ctx, bind.cancel = context.WithCancel(context.Background()) - _, err := c.Do(context.Background(), "SET", testKey, testData).Result() + err := c.DoWrite(context.Background(), "SET", testKey, testData) assert.Equal(t, nil, err) - getRes, err := c.Do(context.Background(), "GET", testKey).Result() + getRes, err := c.DoRead(context.Background(), "GET", testKey) assert.Equal(t, nil, err) assert.Equal(t, true, getRes == testData) @@ -101,12 +103,12 @@ func TestInvokeDelete(t *testing.T) { assert.Equal(t, nil, err) - rgetRep, err := c.Do(context.Background(), "GET", testKey).Result() + rgetRep, err := c.DoRead(context.Background(), "GET", testKey) assert.Equal(t, redis.Nil, err) assert.Equal(t, nil, rgetRep) } -func setupMiniredis() (*miniredis.Miniredis, *redis.Client) { +func setupMiniredis() (*miniredis.Miniredis, internalredis.RedisClient) { s, err := miniredis.Run() if err != nil { panic(err) @@ -116,5 +118,5 @@ func setupMiniredis() (*miniredis.Miniredis, *redis.Client) { DB: 0, } - return s, redis.NewClient(opts) + return s, internalredis.ClientFromV8Client(redis.NewClient(opts)) } diff --git a/go.mod b/go.mod index e17023352..8cf79a66c 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis/v8 v8.11.5 + github.com/go-redis/redis/v9 v9.0.0-rc.2 github.com/go-sql-driver/mysql v1.7.0 github.com/gocql/gocql v1.3.1 github.com/golang-jwt/jwt/v4 v4.4.3 @@ -111,6 +112,7 @@ require ( go.uber.org/atomic v1.10.0 go.uber.org/ratelimit v0.2.0 golang.org/x/crypto v0.4.0 + golang.org/x/mod v0.6.0 golang.org/x/net v0.4.0 golang.org/x/oauth2 v0.3.0 google.golang.org/api v0.104.0 diff --git a/go.sum b/go.sum index 0a2666e42..ebfbbb7b9 100644 --- a/go.sum +++ b/go.sum @@ -771,6 +771,8 @@ github.com/go-playground/validator/v10 v10.11.0 h1:0W+xRM511GY47Yy3bZUbJVitCNg2B github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0= +github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY= github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -1361,7 +1363,7 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= -github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys= +github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/open-policy-agent/opa v0.47.3 h1:Uj8zw+q6Cvv1iiQFh704Q6sl3fKVvk35WZNJLsd6mgk= github.com/open-policy-agent/opa v0.47.3/go.mod h1:I5DbT677OGqfk9gvu5i54oIt0rrVf4B5pedpqDquAXo= @@ -1836,6 +1838,8 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I= +golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/internal/component/redis/redis.go b/internal/component/redis/redis.go index 733caed92..e7a664691 100644 --- a/internal/component/redis/redis.go +++ b/internal/component/redis/redis.go @@ -14,12 +14,14 @@ limitations under the License. package redis import ( - "crypto/tls" + "context" "fmt" "strings" "time" - "github.com/go-redis/redis/v8" + "golang.org/x/mod/semver" + + "github.com/dapr/kit/ptr" ) const ( @@ -27,7 +29,53 @@ const ( NodeType = "node" ) -func ParseClientFromProperties(properties map[string]string, defaultSettings *Settings) (client redis.UniversalClient, settings *Settings, err error) { +type RedisXMessage struct { + ID string + Values map[string]interface{} +} + +type RedisXStream struct { + Stream string + Messages []RedisXMessage +} + +type RedisXPendingExt struct { + ID string + Consumer string + Idle time.Duration + RetryCount int64 +} + +type RedisPipeliner interface { + Exec(ctx context.Context) error + Do(ctx context.Context, args ...interface{}) +} + +var clientHasJSONSupport *bool + +//nolint:interfacebloat +type RedisClient interface { + GetNilValueError() RedisError + Context() context.Context + DoRead(ctx context.Context, args ...interface{}) (interface{}, error) + DoWrite(ctx context.Context, args ...interface{}) error + Del(ctx context.Context, keys ...string) error + Get(ctx context.Context, key string) (string, error) + Close() error + PingResult(ctx context.Context) (string, error) + SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error) + EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error) + XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) + XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error + XAck(ctx context.Context, stream string, group string, messageID string) error + XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error) + XPendingExtResult(ctx context.Context, stream string, group string, start string, end string, count int64) ([]RedisXPendingExt, error) + XClaimResult(ctx context.Context, stream string, group string, consumer string, minIdleTime time.Duration, messageIDs []string) ([]RedisXMessage, error) + TxPipeline() RedisPipeliner + TTLResult(ctx context.Context, key string) (time.Duration, error) +} + +func ParseClientFromProperties(properties map[string]string, defaultSettings *Settings) (client RedisClient, settings *Settings, err error) { if defaultSettings == nil { settings = &Settings{} } else { @@ -37,110 +85,79 @@ func ParseClientFromProperties(properties map[string]string, defaultSettings *Se if err != nil { return nil, nil, fmt.Errorf("redis client configuration error: %w", err) } + + var c RedisClient if settings.Failover { - return newFailoverClient(settings), settings, nil + c = newV8FailoverClient(settings) + } else { + c = newV8Client(settings) } + version, versionErr := GetServerVersion(c) + c.Close() // close the client to avoid leaking connections - return newClient(settings), settings, nil + useNewClient := false + if versionErr != nil { + // we couldn't query the server version, so we will assume the v8 client is not supported + useNewClient = true + } else if semver.Compare("v"+version, "v7.0.0") > -1 { + // if the server version is >= 7, we will use the v9 client + useNewClient = true + } + if useNewClient { + if settings.Failover { + return newV9FailoverClient(settings), settings, nil + } + return newV9Client(settings), settings, nil + } else { + if settings.Failover { + return newV8FailoverClient(settings), settings, nil + } + return newV8Client(settings), settings, nil + } } -func newFailoverClient(s *Settings) redis.UniversalClient { - if s == nil { - return nil +func ClientHasJSONSupport(c RedisClient) bool { + if clientHasJSONSupport != nil { + return *clientHasJSONSupport } - opts := &redis.FailoverOptions{ - DB: s.DB, - MasterName: s.SentinelMasterName, - SentinelAddrs: []string{s.Host}, - Password: s.Password, - Username: s.Username, - MaxRetries: s.RedisMaxRetries, - MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), - MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), - DialTimeout: time.Duration(s.DialTimeout), - ReadTimeout: time.Duration(s.ReadTimeout), - WriteTimeout: time.Duration(s.WriteTimeout), - PoolSize: s.PoolSize, - MaxConnAge: time.Duration(s.MaxConnAge), - MinIdleConns: s.MinIdleConns, - PoolTimeout: time.Duration(s.PoolTimeout), - IdleCheckFrequency: time.Duration(s.IdleCheckFrequency), - IdleTimeout: time.Duration(s.IdleTimeout), + bgctx := context.Background() + ctx, cancel := context.WithTimeout(bgctx, 5*time.Second) + defer cancel() + err := c.DoWrite(ctx, "JSON.GET") + + if err == nil { + clientHasJSONSupport = ptr.Of(true) + return true } - /* #nosec */ - if s.EnableTLS { - opts.TLSConfig = &tls.Config{ - InsecureSkipVerify: s.EnableTLS, - } + if strings.HasPrefix(err.Error(), "ERR unknown command") { + clientHasJSONSupport = ptr.Of(false) + return false } - - if s.RedisType == ClusterType { - opts.SentinelAddrs = strings.Split(s.Host, ",") - - return redis.NewFailoverClusterClient(opts) - } - - return redis.NewFailoverClient(opts) + clientHasJSONSupport = ptr.Of(true) + return true } -func newClient(s *Settings) redis.UniversalClient { - if s == nil { - return nil +func GetServerVersion(c RedisClient) (string, error) { + bgctx := context.Background() + ctx, cancel := context.WithTimeout(bgctx, 5*time.Second) + defer cancel() + res, err := c.DoRead(ctx, "INFO", "server") + if err != nil { + return "", err } - if s.RedisType == ClusterType { - options := &redis.ClusterOptions{ - Addrs: strings.Split(s.Host, ","), - Password: s.Password, - Username: s.Username, - MaxRetries: s.RedisMaxRetries, - MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), - MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), - DialTimeout: time.Duration(s.DialTimeout), - ReadTimeout: time.Duration(s.ReadTimeout), - WriteTimeout: time.Duration(s.WriteTimeout), - PoolSize: s.PoolSize, - MaxConnAge: time.Duration(s.MaxConnAge), - MinIdleConns: s.MinIdleConns, - PoolTimeout: time.Duration(s.PoolTimeout), - IdleCheckFrequency: time.Duration(s.IdleCheckFrequency), - IdleTimeout: time.Duration(s.IdleTimeout), - } - /* #nosec */ - if s.EnableTLS { - options.TLSConfig = &tls.Config{ - InsecureSkipVerify: s.EnableTLS, - } - } - - return redis.NewClusterClient(options) - } - - options := &redis.Options{ - Addr: s.Host, - Password: s.Password, - Username: s.Username, - DB: s.DB, - MaxRetries: s.RedisMaxRetries, - MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), - MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), - DialTimeout: time.Duration(s.DialTimeout), - ReadTimeout: time.Duration(s.ReadTimeout), - WriteTimeout: time.Duration(s.WriteTimeout), - PoolSize: s.PoolSize, - MaxConnAge: time.Duration(s.MaxConnAge), - MinIdleConns: s.MinIdleConns, - PoolTimeout: time.Duration(s.PoolTimeout), - IdleCheckFrequency: time.Duration(s.IdleCheckFrequency), - IdleTimeout: time.Duration(s.IdleTimeout), - } - - /* #nosec */ - if s.EnableTLS { - options.TLSConfig = &tls.Config{ - InsecureSkipVerify: s.EnableTLS, + // get row in string res beginning with "redis_version" + rows := strings.Split(res.(string), "\n") + for _, row := range rows { + if strings.HasPrefix(row, "redis_version:") { + return strings.TrimSpace(strings.Split(row, ":")[1]), nil } } - - return redis.NewClient(options) + return "", fmt.Errorf("could not find redis_version in redis info response") } + +type RedisError string + +func (e RedisError) Error() string { return string(e) } + +func (RedisError) RedisError() {} diff --git a/internal/component/redis/v8client.go b/internal/component/redis/v8client.go new file mode 100644 index 000000000..cc879d15c --- /dev/null +++ b/internal/component/redis/v8client.go @@ -0,0 +1,414 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package redis + +import ( + "context" + "crypto/tls" + "strings" + "time" + + v8 "github.com/go-redis/redis/v8" +) + +type v8Pipeliner struct { + pipeliner v8.Pipeliner + writeTimeout Duration +} + +func (p v8Pipeliner) Exec(ctx context.Context) error { + _, err := p.pipeliner.Exec(ctx) + return err +} + +func (p v8Pipeliner) Do(ctx context.Context, args ...interface{}) { + if p.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(p.writeTimeout)) + defer cancel() + p.pipeliner.Do(timeoutCtx, args...) + } + p.pipeliner.Do(ctx, args...) +} + +// v8Client is an interface implementation of RedisClient + +type v8Client struct { + client v8.UniversalClient + readTimeout Duration + writeTimeout Duration + dialTimeout Duration +} + +func (c v8Client) DoWrite(ctx context.Context, args ...interface{}) error { + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + return c.client.Do(timeoutCtx, args...).Err() + } + return c.client.Do(ctx, args...).Err() +} + +func (c v8Client) DoRead(ctx context.Context, args ...interface{}) (interface{}, error) { + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + return c.client.Do(timeoutCtx, args...).Result() + } + return c.client.Do(ctx, args...).Result() +} + +func (c v8Client) Del(ctx context.Context, keys ...string) error { + err := c.client.Del(ctx, keys...).Err() + if err != nil { + return err + } + return nil +} + +func (c v8Client) Get(ctx context.Context, key string) (string, error) { + return c.client.Get(ctx, key).Result() +} + +func (c v8Client) GetNilValueError() RedisError { + return RedisError(v8.Nil.Error()) +} + +func (c v8Client) Context() context.Context { + return c.client.Context() +} + +func (c v8Client) Close() error { + return c.client.Close() +} + +func (c v8Client) PingResult(ctx context.Context) (string, error) { + if c.dialTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.dialTimeout)) + defer cancel() + return c.client.Ping(timeoutCtx).Result() + } + return c.client.Ping(ctx).Result() +} + +func (c v8Client) EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error) { + var evalCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + evalCtx = timeoutCtx + } else { + evalCtx = ctx + } + eval := c.client.Eval(evalCtx, script, keys, args...) + if eval == nil { + return nil, nil, nil + } + i, err := eval.Int() + return &i, err, eval.Err() +} + +func (c v8Client) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error) { + var writeCtx context.Context + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + writeCtx = timeoutCtx + } else { + writeCtx = ctx + } + nx := c.client.SetNX(writeCtx, key, value, expiration) + if nx == nil { + return nil, nil + } + val := nx.Val() + return &val, nx.Err() +} + +func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) { + var writeCtx context.Context + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + writeCtx = timeoutCtx + } else { + writeCtx = ctx + } + return c.client.XAdd(writeCtx, &v8.XAddArgs{ + Stream: stream, + Values: values, + MaxLenApprox: maxLenApprox, + }).Result() +} + +func (c v8Client) XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error { + var writeCtx context.Context + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + writeCtx = timeoutCtx + } else { + writeCtx = ctx + } + return c.client.XGroupCreateMkStream(writeCtx, stream, group, start).Err() +} + +func (c v8Client) XAck(ctx context.Context, stream string, group string, messageID string) error { + var readCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + readCtx = timeoutCtx + } else { + readCtx = ctx + } + ack := c.client.XAck(readCtx, stream, group, messageID) + return ack.Err() +} + +func (c v8Client) XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error) { + var readCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + readCtx = timeoutCtx + } else { + readCtx = ctx + } + res, err := c.client.XReadGroup(readCtx, + &v8.XReadGroupArgs{ + Group: group, + Consumer: consumer, + Streams: streams, + Count: count, + Block: block, + }, + ).Result() + if err != nil { + return nil, err + } + + // convert []v8.XStream to []RedisXStream + redisXStreams := make([]RedisXStream, len(res)) + for i, xStream := range res { + redisXStreams[i].Stream = xStream.Stream + redisXStreams[i].Messages = make([]RedisXMessage, len(xStream.Messages)) + for j, message := range xStream.Messages { + redisXStreams[i].Messages[j].ID = message.ID + redisXStreams[i].Messages[j].Values = message.Values + } + } + + return redisXStreams, nil +} + +func (c v8Client) XPendingExtResult(ctx context.Context, stream string, group string, start string, end string, count int64) ([]RedisXPendingExt, error) { + var readCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + readCtx = timeoutCtx + } else { + readCtx = ctx + } + res, err := c.client.XPendingExt(readCtx, &v8.XPendingExtArgs{ + Stream: stream, + Group: group, + Start: start, + End: end, + Count: count, + }).Result() + if err != nil { + return nil, err + } + + // convert []v8.XPendingExt to []RedisXPendingExt + redisXPendingExts := make([]RedisXPendingExt, len(res)) + for i, xPendingExt := range res { + redisXPendingExts[i] = RedisXPendingExt(xPendingExt) + } + return redisXPendingExts, nil +} + +func (c v8Client) XClaimResult(ctx context.Context, stream string, group string, consumer string, minIdleTime time.Duration, messageIDs []string) ([]RedisXMessage, error) { + var readCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + readCtx = timeoutCtx + } else { + readCtx = ctx + } + res, err := c.client.XClaim(readCtx, &v8.XClaimArgs{ + Stream: stream, + Group: group, + Consumer: consumer, + MinIdle: minIdleTime, + Messages: messageIDs, + }).Result() + if err != nil { + return nil, err + } + + // convert res to []RedisXMessage + redisXMessages := make([]RedisXMessage, len(res)) + for i, xMessage := range res { + redisXMessages[i] = RedisXMessage(xMessage) + } + + return redisXMessages, nil +} + +func (c v8Client) TxPipeline() RedisPipeliner { + return v8Pipeliner{ + pipeliner: c.client.TxPipeline(), + writeTimeout: c.writeTimeout, + } +} + +func (c v8Client) TTLResult(ctx context.Context, key string) (time.Duration, error) { + var writeCtx context.Context + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + writeCtx = timeoutCtx + } else { + writeCtx = ctx + } + return c.client.TTL(writeCtx, key).Result() +} + +func newV8FailoverClient(s *Settings) RedisClient { + if s == nil { + return nil + } + opts := &v8.FailoverOptions{ + DB: s.DB, + MasterName: s.SentinelMasterName, + SentinelAddrs: []string{s.Host}, + Password: s.Password, + Username: s.Username, + MaxRetries: s.RedisMaxRetries, + MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), + MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), + DialTimeout: time.Duration(s.DialTimeout), + ReadTimeout: time.Duration(s.ReadTimeout), + WriteTimeout: time.Duration(s.WriteTimeout), + PoolSize: s.PoolSize, + MaxConnAge: time.Duration(s.MaxConnAge), + MinIdleConns: s.MinIdleConns, + PoolTimeout: time.Duration(s.PoolTimeout), + IdleCheckFrequency: time.Duration(s.IdleCheckFrequency), + IdleTimeout: time.Duration(s.IdleTimeout), + } + + /* #nosec */ + if s.EnableTLS { + opts.TLSConfig = &tls.Config{ + InsecureSkipVerify: s.EnableTLS, + } + } + + if s.RedisType == ClusterType { + opts.SentinelAddrs = strings.Split(s.Host, ",") + + return v8Client{ + client: v8.NewFailoverClusterClient(opts), + readTimeout: s.ReadTimeout, + writeTimeout: s.WriteTimeout, + dialTimeout: s.DialTimeout, + } + } + + return v8Client{ + client: v8.NewFailoverClient(opts), + readTimeout: s.ReadTimeout, + writeTimeout: s.WriteTimeout, + dialTimeout: s.DialTimeout, + } +} + +func newV8Client(s *Settings) RedisClient { + if s == nil { + return nil + } + if s.RedisType == ClusterType { + options := &v8.ClusterOptions{ + Addrs: strings.Split(s.Host, ","), + Password: s.Password, + Username: s.Username, + MaxRetries: s.RedisMaxRetries, + MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), + MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), + DialTimeout: time.Duration(s.DialTimeout), + ReadTimeout: time.Duration(s.ReadTimeout), + WriteTimeout: time.Duration(s.WriteTimeout), + PoolSize: s.PoolSize, + MaxConnAge: time.Duration(s.MaxConnAge), + MinIdleConns: s.MinIdleConns, + PoolTimeout: time.Duration(s.PoolTimeout), + IdleCheckFrequency: time.Duration(s.IdleCheckFrequency), + IdleTimeout: time.Duration(s.IdleTimeout), + } + /* #nosec */ + if s.EnableTLS { + options.TLSConfig = &tls.Config{ + InsecureSkipVerify: s.EnableTLS, + } + } + + return v8Client{ + client: v8.NewClusterClient(options), + readTimeout: s.ReadTimeout, + writeTimeout: s.WriteTimeout, + dialTimeout: s.DialTimeout, + } + } + + options := &v8.Options{ + Addr: s.Host, + Password: s.Password, + Username: s.Username, + DB: s.DB, + MaxRetries: s.RedisMaxRetries, + MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), + MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), + DialTimeout: time.Duration(s.DialTimeout), + ReadTimeout: time.Duration(s.ReadTimeout), + WriteTimeout: time.Duration(s.WriteTimeout), + PoolSize: s.PoolSize, + MaxConnAge: time.Duration(s.MaxConnAge), + MinIdleConns: s.MinIdleConns, + PoolTimeout: time.Duration(s.PoolTimeout), + IdleCheckFrequency: time.Duration(s.IdleCheckFrequency), + IdleTimeout: time.Duration(s.IdleTimeout), + } + + /* #nosec */ + if s.EnableTLS { + options.TLSConfig = &tls.Config{ + InsecureSkipVerify: s.EnableTLS, + } + } + + return v8Client{ + client: v8.NewClient(options), + readTimeout: s.ReadTimeout, + writeTimeout: s.WriteTimeout, + dialTimeout: s.DialTimeout, + } +} + +func ClientFromV8Client(client v8.UniversalClient) RedisClient { + return v8Client{client: client} +} diff --git a/internal/component/redis/v9client.go b/internal/component/redis/v9client.go new file mode 100644 index 000000000..1f231b90f --- /dev/null +++ b/internal/component/redis/v9client.go @@ -0,0 +1,410 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package redis + +import ( + "context" + "crypto/tls" + "strings" + "time" + + v9 "github.com/go-redis/redis/v9" +) + +type v9Pipeliner struct { + pipeliner v9.Pipeliner + writeTimeout Duration +} + +func (p v9Pipeliner) Exec(ctx context.Context) error { + _, err := p.pipeliner.Exec(ctx) + return err +} + +func (p v9Pipeliner) Do(ctx context.Context, args ...interface{}) { + if p.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(p.writeTimeout)) + defer cancel() + p.pipeliner.Do(timeoutCtx, args...) + } + p.pipeliner.Do(ctx, args...) +} + +// v9Client is an interface implementation of RedisClient + +type v9Client struct { + client v9.UniversalClient + readTimeout Duration + writeTimeout Duration + dialTimeout Duration +} + +func (c v9Client) DoWrite(ctx context.Context, args ...interface{}) error { + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + return c.client.Do(timeoutCtx, args...).Err() + } + return c.client.Do(ctx, args...).Err() +} + +func (c v9Client) DoRead(ctx context.Context, args ...interface{}) (interface{}, error) { + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + return c.client.Do(timeoutCtx, args...).Result() + } + return c.client.Do(ctx, args...).Result() +} + +func (c v9Client) Del(ctx context.Context, keys ...string) error { + err := c.client.Del(ctx, keys...).Err() + if err != nil { + return err + } + return nil +} + +func (c v9Client) Get(ctx context.Context, key string) (string, error) { + return c.client.Get(ctx, key).Result() +} + +func (c v9Client) GetNilValueError() RedisError { + return RedisError(v9.Nil.Error()) +} + +func (c v9Client) Context() context.Context { + return context.Background() +} + +func (c v9Client) Close() error { + return c.client.Close() +} + +func (c v9Client) PingResult(ctx context.Context) (string, error) { + if c.dialTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.dialTimeout)) + defer cancel() + return c.client.Ping(timeoutCtx).Result() + } + return c.client.Ping(ctx).Result() +} + +func (c v9Client) EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error) { + var evalCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + evalCtx = timeoutCtx + } else { + evalCtx = ctx + } + eval := c.client.Eval(evalCtx, script, keys, args...) + if eval == nil { + return nil, nil, nil + } + i, err := eval.Int() + return &i, err, eval.Err() +} + +func (c v9Client) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error) { + var writeCtx context.Context + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + writeCtx = timeoutCtx + } else { + writeCtx = ctx + } + nx := c.client.SetNX(writeCtx, key, value, expiration) + if nx == nil { + return nil, nil + } + val := nx.Val() + return &val, nx.Err() +} + +func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) { + var writeCtx context.Context + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + writeCtx = timeoutCtx + } else { + writeCtx = ctx + } + return c.client.XAdd(writeCtx, &v9.XAddArgs{ + Stream: stream, + Values: values, + MaxLen: maxLenApprox, + }).Result() +} + +func (c v9Client) XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error { + var writeCtx context.Context + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + writeCtx = timeoutCtx + } else { + writeCtx = ctx + } + return c.client.XGroupCreateMkStream(writeCtx, stream, group, start).Err() +} + +func (c v9Client) XAck(ctx context.Context, stream string, group string, messageID string) error { + var readCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + readCtx = timeoutCtx + } else { + readCtx = ctx + } + ack := c.client.XAck(readCtx, stream, group, messageID) + return ack.Err() +} + +func (c v9Client) XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error) { + var readCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + readCtx = timeoutCtx + } else { + readCtx = ctx + } + res, err := c.client.XReadGroup(readCtx, + &v9.XReadGroupArgs{ + Group: group, + Consumer: consumer, + Streams: streams, + Count: count, + Block: block, + }, + ).Result() + if err != nil { + return nil, err + } + + // convert []v9.XStream to []RedisXStream + redisXStreams := make([]RedisXStream, len(res)) + for i, xStream := range res { + redisXStreams[i].Stream = xStream.Stream + redisXStreams[i].Messages = make([]RedisXMessage, len(xStream.Messages)) + for j, message := range xStream.Messages { + redisXStreams[i].Messages[j].ID = message.ID + redisXStreams[i].Messages[j].Values = message.Values + } + } + + return redisXStreams, nil +} + +func (c v9Client) XPendingExtResult(ctx context.Context, stream string, group string, start string, end string, count int64) ([]RedisXPendingExt, error) { + var readCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + readCtx = timeoutCtx + } else { + readCtx = ctx + } + res, err := c.client.XPendingExt(readCtx, &v9.XPendingExtArgs{ + Stream: stream, + Group: group, + Start: start, + End: end, + Count: count, + }).Result() + if err != nil { + return nil, err + } + + // convert []v9.XPendingExt to []RedisXPendingExt + redisXPendingExts := make([]RedisXPendingExt, len(res)) + for i, xPendingExt := range res { + redisXPendingExts[i] = RedisXPendingExt(xPendingExt) + } + return redisXPendingExts, nil +} + +func (c v9Client) XClaimResult(ctx context.Context, stream string, group string, consumer string, minIdleTime time.Duration, messageIDs []string) ([]RedisXMessage, error) { + var readCtx context.Context + if c.readTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout)) + defer cancel() + readCtx = timeoutCtx + } else { + readCtx = ctx + } + res, err := c.client.XClaim(readCtx, &v9.XClaimArgs{ + Stream: stream, + Group: group, + Consumer: consumer, + MinIdle: minIdleTime, + Messages: messageIDs, + }).Result() + if err != nil { + return nil, err + } + + // convert res to []RedisXMessage + redisXMessages := make([]RedisXMessage, len(res)) + for i, xMessage := range res { + redisXMessages[i] = RedisXMessage(xMessage) + } + + return redisXMessages, nil +} + +func (c v9Client) TxPipeline() RedisPipeliner { + return v9Pipeliner{ + pipeliner: c.client.TxPipeline(), + writeTimeout: c.writeTimeout, + } +} + +func (c v9Client) TTLResult(ctx context.Context, key string) (time.Duration, error) { + var writeCtx context.Context + if c.writeTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) + defer cancel() + writeCtx = timeoutCtx + } else { + writeCtx = ctx + } + return c.client.TTL(writeCtx, key).Result() +} + +func newV9FailoverClient(s *Settings) RedisClient { + if s == nil { + return nil + } + opts := &v9.FailoverOptions{ + DB: s.DB, + MasterName: s.SentinelMasterName, + SentinelAddrs: []string{s.Host}, + Password: s.Password, + Username: s.Username, + MaxRetries: s.RedisMaxRetries, + MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), + MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), + DialTimeout: time.Duration(s.DialTimeout), + ReadTimeout: time.Duration(s.ReadTimeout), + WriteTimeout: time.Duration(s.WriteTimeout), + PoolSize: s.PoolSize, + ConnMaxLifetime: time.Duration(s.MaxConnAge), + MinIdleConns: s.MinIdleConns, + PoolTimeout: time.Duration(s.PoolTimeout), + ConnMaxIdleTime: time.Duration(s.IdleTimeout), + ContextTimeoutEnabled: true, + } + + /* #nosec */ + if s.EnableTLS { + opts.TLSConfig = &tls.Config{ + InsecureSkipVerify: s.EnableTLS, + } + } + + if s.RedisType == ClusterType { + opts.SentinelAddrs = strings.Split(s.Host, ",") + + return v9Client{ + client: v9.NewFailoverClusterClient(opts), + readTimeout: s.ReadTimeout, + writeTimeout: s.WriteTimeout, + dialTimeout: s.DialTimeout, + } + } + + return v9Client{ + client: v9.NewFailoverClient(opts), + readTimeout: s.ReadTimeout, + writeTimeout: s.WriteTimeout, + dialTimeout: s.DialTimeout, + } +} + +func newV9Client(s *Settings) RedisClient { + if s == nil { + return nil + } + if s.RedisType == ClusterType { + options := &v9.ClusterOptions{ + Addrs: strings.Split(s.Host, ","), + Password: s.Password, + Username: s.Username, + MaxRetries: s.RedisMaxRetries, + MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), + MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), + DialTimeout: time.Duration(s.DialTimeout), + ReadTimeout: time.Duration(s.ReadTimeout), + WriteTimeout: time.Duration(s.WriteTimeout), + PoolSize: s.PoolSize, + ConnMaxLifetime: time.Duration(s.MaxConnAge), + MinIdleConns: s.MinIdleConns, + PoolTimeout: time.Duration(s.PoolTimeout), + ConnMaxIdleTime: time.Duration(s.IdleTimeout), + ContextTimeoutEnabled: true, + } + /* #nosec */ + if s.EnableTLS { + options.TLSConfig = &tls.Config{ + InsecureSkipVerify: s.EnableTLS, + } + } + + return v9Client{ + client: v9.NewClusterClient(options), + readTimeout: s.ReadTimeout, + writeTimeout: s.WriteTimeout, + dialTimeout: s.DialTimeout, + } + } + + options := &v9.Options{ + Addr: s.Host, + Password: s.Password, + Username: s.Username, + DB: s.DB, + MaxRetries: s.RedisMaxRetries, + MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), + MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), + DialTimeout: time.Duration(s.DialTimeout), + ReadTimeout: time.Duration(s.ReadTimeout), + WriteTimeout: time.Duration(s.WriteTimeout), + PoolSize: s.PoolSize, + ConnMaxLifetime: time.Duration(s.MaxConnAge), + MinIdleConns: s.MinIdleConns, + PoolTimeout: time.Duration(s.PoolTimeout), + ConnMaxIdleTime: time.Duration(s.IdleTimeout), + ContextTimeoutEnabled: true, + } + + /* #nosec */ + if s.EnableTLS { + options.TLSConfig = &tls.Config{ + InsecureSkipVerify: s.EnableTLS, + } + } + + return v9Client{ + client: v9.NewClient(options), + readTimeout: s.ReadTimeout, + writeTimeout: s.WriteTimeout, + dialTimeout: s.DialTimeout, + } +} diff --git a/lock/redis/standalone.go b/lock/redis/standalone.go index 8a4a0c68d..2f61f8bf2 100644 --- a/lock/redis/standalone.go +++ b/lock/redis/standalone.go @@ -20,8 +20,6 @@ import ( "strings" "time" - "github.com/go-redis/redis/v8" - rediscomponent "github.com/dapr/components-contrib/internal/component/redis" "github.com/dapr/components-contrib/lock" "github.com/dapr/kit/logger" @@ -35,7 +33,7 @@ const ( // Standalone Redis lock store.Any fail-over related features are not supported,such as Sentinel and Redis Cluster. type StandaloneRedisLock struct { - client redis.UniversalClient + client rediscomponent.RedisClient clientSettings *rediscomponent.Settings metadata rediscomponent.Metadata @@ -79,7 +77,7 @@ func (r *StandaloneRedisLock) InitLockStore(metadata lock.Metadata) error { } r.ctx, r.cancel = context.WithCancel(context.Background()) // 3. connect to redis - if _, err = r.client.Ping(r.ctx).Result(); err != nil { + if _, err = r.client.PingResult(r.ctx); err != nil { return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", r.clientSettings.Host, err) } // no replica @@ -104,7 +102,7 @@ func needFailover(properties map[string]string) bool { } func (r *StandaloneRedisLock) getConnectedSlaves() (int, error) { - res, err := r.client.Do(r.ctx, "INFO", "replication").Result() + res, err := r.client.DoRead(r.ctx, "INFO", "replication") if err != nil { return 0, err } @@ -135,37 +133,32 @@ func (r *StandaloneRedisLock) parseConnectedSlaves(res string) int { // Try to acquire a redis lock. func (r *StandaloneRedisLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) { // 1.Setting redis expiration time - nx := r.client.SetNX(r.ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds)) - if nx == nil { + nxval, err := r.client.SetNX(r.ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds)) + if nxval == nil { return &lock.TryLockResponse{}, fmt.Errorf("[standaloneRedisLock]: SetNX returned nil.ResourceID: %s", req.ResourceID) } // 2. check error - err := nx.Err() if err != nil { return &lock.TryLockResponse{}, err } return &lock.TryLockResponse{ - Success: nx.Val(), + Success: *nxval, }, nil } // Try to release a redis lock. func (r *StandaloneRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) { // 1. delegate to client.eval lua script - eval := r.client.Eval(r.ctx, unlockScript, []string{req.ResourceID}, req.LockOwner) + evalInt, parseErr, err := r.client.EvalInt(r.ctx, unlockScript, []string{req.ResourceID}, req.LockOwner) // 2. check error - if eval == nil { + if evalInt == nil { return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil.ResourceID: %s", req.ResourceID) } - err := eval.Err() - if err != nil { - return newInternalErrorUnlockResponse(), err - } // 3. parse result - i, err := eval.Int() + i := *evalInt status := lock.InternalError - if err != nil { + if parseErr != nil { return &lock.UnlockResponse{ Status: status, }, err @@ -194,7 +187,9 @@ func (r *StandaloneRedisLock) Close() error { r.cancel() } if r.client != nil { - return r.client.Close() + closeErr := r.client.Close() + r.client = nil + return closeErr } return nil } diff --git a/pubsub/redis/redis.go b/pubsub/redis/redis.go index 15ea43cc5..17817c75d 100644 --- a/pubsub/redis/redis.go +++ b/pubsub/redis/redis.go @@ -20,8 +20,6 @@ import ( "strconv" "time" - "github.com/go-redis/redis/v8" - rediscomponent "github.com/dapr/components-contrib/internal/component/redis" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" @@ -45,7 +43,7 @@ const ( // on the mechanics of Redis Streams. type redisStreams struct { metadata metadata - client redis.UniversalClient + client rediscomponent.RedisClient clientSettings *rediscomponent.Settings logger logger.Logger @@ -144,7 +142,7 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error { r.ctx, r.cancel = context.WithCancel(context.Background()) - if _, err = r.client.Ping(r.ctx).Result(); err != nil { + if _, err = r.client.PingResult(r.ctx); err != nil { return fmt.Errorf("redis streams: error connecting to redis at %s: %s", r.clientSettings.Host, err) } r.queue = make(chan redisMessageWrapper, int(r.metadata.queueDepth)) @@ -157,11 +155,7 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error { } func (r *redisStreams) Publish(req *pubsub.PublishRequest) error { - _, err := r.client.XAdd(r.ctx, &redis.XAddArgs{ - Stream: req.Topic, - MaxLenApprox: r.metadata.maxLenApprox, - Values: map[string]interface{}{"data": req.Data}, - }).Result() + _, err := r.client.XAdd(r.ctx, req.Topic, r.metadata.maxLenApprox, map[string]interface{}{"data": req.Data}) if err != nil { return fmt.Errorf("redis streams: error from publish: %s", err) } @@ -170,7 +164,7 @@ func (r *redisStreams) Publish(req *pubsub.PublishRequest) error { } func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { - err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.metadata.consumerID, "0").Err() + err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.metadata.consumerID, "0") // Ignore BUSYGROUP errors if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { r.logger.Errorf("redis streams: %s", err) @@ -186,14 +180,14 @@ func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeReques // enqueueMessages is a shared function that funnels new messages (via polling) // and redelivered messages (via reclaiming) to a channel where workers can // pick them up for processing. -func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handler pubsub.Handler, msgs []redis.XMessage) { +func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handler pubsub.Handler, msgs []rediscomponent.RedisXMessage) { for _, msg := range msgs { rmsg := createRedisMessageWrapper(ctx, stream, handler, msg) select { // Might block if the queue is full so we need the ctx.Done below. case r.queue <- rmsg: - + // Noop // Handle cancelation case <-ctx.Done(): return @@ -203,7 +197,7 @@ func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handl // createRedisMessageWrapper encapsulates the Redis message, message identifier, and handler // in `redisMessage` for processing. -func createRedisMessageWrapper(ctx context.Context, stream string, handler pubsub.Handler, msg redis.XMessage) redisMessageWrapper { +func createRedisMessageWrapper(ctx context.Context, stream string, handler pubsub.Handler, msg rediscomponent.RedisXMessage) redisMessageWrapper { var data []byte if dataValue, exists := msg.Values["data"]; exists && dataValue != nil { switch v := dataValue.(type) { @@ -259,7 +253,7 @@ func (r *redisStreams) processMessage(msg redisMessageWrapper) error { } // Use the background context in case subscriptionCtx is already closed - if err := r.client.XAck(context.Background(), msg.message.Topic, r.metadata.consumerID, msg.messageID).Err(); err != nil { + if err := r.client.XAck(context.Background(), msg.message.Topic, r.metadata.consumerID, msg.messageID); err != nil { r.logger.Errorf("Error acknowledging Redis message %s: %v", msg.messageID, err) return err @@ -278,15 +272,9 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h } // Read messages - streams, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: r.metadata.consumerID, - Consumer: r.metadata.consumerID, - Streams: []string{stream, ">"}, - Count: int64(r.metadata.queueDepth), - Block: time.Duration(r.clientSettings.ReadTimeout), - }).Result() + streams, err := r.client.XReadGroupResult(ctx, r.metadata.consumerID, r.metadata.consumerID, []string{stream, ">"}, int64(r.metadata.queueDepth), time.Duration(r.clientSettings.ReadTimeout)) if err != nil { - if !errors.Is(err, redis.Nil) && err != context.Canceled { + if !errors.Is(err, r.client.GetNilValueError()) && err != context.Canceled { r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err) } continue @@ -329,14 +317,14 @@ func (r *redisStreams) reclaimPendingMessagesLoop(ctx context.Context, stream st func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string, handler pubsub.Handler) { for { // Retrieve pending messages for this stream and consumer - pendingResult, err := r.client.XPendingExt(ctx, &redis.XPendingExtArgs{ - Stream: stream, - Group: r.metadata.consumerID, - Start: "-", - End: "+", - Count: int64(r.metadata.queueDepth), - }).Result() - if err != nil && !errors.Is(err, redis.Nil) { + pendingResult, err := r.client.XPendingExtResult(ctx, + stream, + r.metadata.consumerID, + "-", + "+", + int64(r.metadata.queueDepth), + ) + if err != nil && !errors.Is(err, r.client.GetNilValueError()) { r.logger.Errorf("error retrieving pending Redis messages: %v", err) break @@ -356,14 +344,14 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string } // Attempt to claim the messages for the filtered IDs - claimResult, err := r.client.XClaim(ctx, &redis.XClaimArgs{ - Stream: stream, - Group: r.metadata.consumerID, - Consumer: r.metadata.consumerID, - MinIdle: r.metadata.processingTimeout, - Messages: msgIDs, - }).Result() - if err != nil && !errors.Is(err, redis.Nil) { + claimResult, err := r.client.XClaimResult(ctx, + stream, + r.metadata.consumerID, + r.metadata.consumerID, + r.metadata.processingTimeout, + msgIDs, + ) + if err != nil && !errors.Is(err, r.client.GetNilValueError()) { r.logger.Errorf("error claiming pending Redis messages: %v", err) break @@ -375,7 +363,7 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string // If the Redis nil error is returned, it means somes message in the pending // state no longer exist. We need to acknowledge these messages to // remove them from the pending list. - if errors.Is(err, redis.Nil) { + if errors.Is(err, r.client.GetNilValueError()) { // Build a set of message IDs that were not returned // that potentially no longer exist. expectedMsgIDs := make(map[string]struct{}, len(msgIDs)) @@ -396,23 +384,23 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(ctx context.Context, stream string, messageIDs map[string]struct{}, handler pubsub.Handler) { // Check each message ID individually. for pendingID := range messageIDs { - claimResultSingleMsg, err := r.client.XClaim(ctx, &redis.XClaimArgs{ - Stream: stream, - Group: r.metadata.consumerID, - Consumer: r.metadata.consumerID, - MinIdle: 0, - Messages: []string{pendingID}, - }).Result() - if err != nil && !errors.Is(err, redis.Nil) { + claimResultSingleMsg, err := r.client.XClaimResult(ctx, + stream, + r.metadata.consumerID, + r.metadata.consumerID, + 0, + []string{pendingID}, + ) + if err != nil && !errors.Is(err, r.client.GetNilValueError()) { r.logger.Errorf("error claiming pending Redis message %s: %v", pendingID, err) continue } // Ack the message to remove it from the pending list. - if errors.Is(err, redis.Nil) { + if errors.Is(err, r.client.GetNilValueError()) { // Use the background context in case subscriptionCtx is already closed - if err = r.client.XAck(context.Background(), stream, r.metadata.consumerID, pendingID).Err(); err != nil { + if err = r.client.XAck(context.Background(), stream, r.metadata.consumerID, pendingID); err != nil { r.logger.Errorf("error acknowledging Redis message %s after failed claim for %s: %v", pendingID, stream, err) } } else { @@ -423,8 +411,13 @@ func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(ctx context.Co } func (r *redisStreams) Close() error { - r.cancel() + if r.cancel != nil { + r.cancel() + } + if r.client == nil { + return nil + } return r.client.Close() } @@ -433,7 +426,7 @@ func (r *redisStreams) Features() []pubsub.Feature { } func (r *redisStreams) Ping() error { - if _, err := r.client.Ping(context.Background()).Result(); err != nil { + if _, err := r.client.PingResult(context.Background()); err != nil { return fmt.Errorf("redis pubsub: error connecting to redis at %s: %s", r.clientSettings.Host, err) } diff --git a/pubsub/redis/redis_test.go b/pubsub/redis/redis_test.go index 2ad82cd0c..99a5f2533 100644 --- a/pubsub/redis/redis_test.go +++ b/pubsub/redis/redis_test.go @@ -20,12 +20,13 @@ import ( "sync" "testing" - "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" mdata "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" + + internalredis "github.com/dapr/components-contrib/internal/component/redis" ) func getFakeProperties() map[string]string { @@ -108,9 +109,9 @@ func TestProcessStreams(t *testing.T) { assert.Equal(t, 3, messageCount) } -func generateRedisStreamTestData(topicCount, messageCount int, data string) []redis.XMessage { - generateXMessage := func(id int) redis.XMessage { - return redis.XMessage{ +func generateRedisStreamTestData(topicCount, messageCount int, data string) []internalredis.RedisXMessage { + generateXMessage := func(id int) internalredis.RedisXMessage { + return internalredis.RedisXMessage{ ID: fmt.Sprintf("%d", id), Values: map[string]interface{}{ "data": data, @@ -118,7 +119,7 @@ func generateRedisStreamTestData(topicCount, messageCount int, data string) []re } } - xmessageArray := make([]redis.XMessage, messageCount) + xmessageArray := make([]internalredis.RedisXMessage, messageCount) for i := range xmessageArray { xmessageArray[i] = generateXMessage(i) } diff --git a/state/redis/redis.go b/state/redis/redis.go index 96d616c29..3ff79d4c3 100644 --- a/state/redis/redis.go +++ b/state/redis/redis.go @@ -20,7 +20,6 @@ import ( "strconv" "strings" - "github.com/go-redis/redis/v8" jsoniter "github.com/json-iterator/go" "github.com/dapr/components-contrib/contenttype" @@ -92,7 +91,7 @@ const ( // StateStore is a Redis state store. type StateStore struct { state.DefaultBulkStore - client redis.UniversalClient + client rediscomponent.RedisClient clientSettings *rediscomponent.Settings json jsoniter.API metadata rediscomponent.Metadata @@ -119,7 +118,7 @@ func NewRedisStateStore(logger logger.Logger) state.Store { } func (r *StateStore) Ping() error { - if _, err := r.client.Ping(context.Background()).Result(); err != nil { + if _, err := r.client.PingResult(context.Background()); err != nil { return fmt.Errorf("redis store: error connecting to redis at %s: %s", r.clientSettings.Host, err) } @@ -147,7 +146,7 @@ func (r *StateStore) Init(metadata state.Metadata) error { r.ctx, r.cancel = context.WithCancel(context.Background()) - if _, err = r.client.Ping(r.ctx).Result(); err != nil { + if _, err = r.client.PingResult(r.ctx); err != nil { return fmt.Errorf("redis store: error connecting to redis at %s: %v", r.clientSettings.Host, err) } @@ -168,7 +167,7 @@ func (r *StateStore) Features() []state.Feature { } func (r *StateStore) getConnectedSlaves() (int, error) { - res, err := r.client.Do(r.ctx, "INFO", "replication").Result() + res, err := r.client.DoRead(r.ctx, "INFO", "replication") if err != nil { return 0, err } @@ -209,12 +208,12 @@ func (r *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error } var delQuery string - if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType { + if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType && rediscomponent.ClientHasJSONSupport(r.client) { delQuery = delJSONQuery } else { delQuery = delDefaultQuery } - _, err = r.client.Do(ctx, "EVAL", delQuery, 1, req.Key, *req.ETag).Result() + err = r.client.DoWrite(ctx, "EVAL", delQuery, 1, req.Key, *req.ETag) if err != nil { return state.NewETagError(state.ETagMismatch, err) } @@ -222,9 +221,8 @@ func (r *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error return nil } -// Delete performs a delete operation. func (r *StateStore) directGet(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) { - res, err := r.client.Do(r.ctx, "GET", req.Key).Result() + res, err := r.client.DoRead(ctx, "GET", req.Key) if err != nil { return nil, err } @@ -241,14 +239,23 @@ func (r *StateStore) directGet(ctx context.Context, req *state.GetRequest) (*sta } func (r *StateStore) getDefault(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) { - res, err := r.client.Do(ctx, "HGETALL", req.Key).Result() // Prefer values with ETags + res, err := r.client.DoRead(ctx, "HGETALL", req.Key) // Prefer values with ETags if err != nil { return r.directGet(ctx, req) // Falls back to original get for backward compats. } if res == nil { return &state.GetResponse{}, nil } - vals := res.([]interface{}) + vals, ok := res.([]interface{}) + if !ok { + // we retrieved a JSON value from a non-JSON store + valMap := res.(map[interface{}]interface{}) + // convert valMap to []interface{} + vals = make([]interface{}, 0, len(valMap)) + for k, v := range valMap { + vals = append(vals, k, v) + } + } if len(vals) == 0 { return &state.GetResponse{}, nil } @@ -265,7 +272,7 @@ func (r *StateStore) getDefault(ctx context.Context, req *state.GetRequest) (*st } func (r *StateStore) getJSON(req *state.GetRequest) (*state.GetResponse, error) { - res, err := r.client.Do(r.ctx, "JSON.GET", req.Key).Result() + res, err := r.client.DoRead(r.ctx, "JSON.GET", req.Key) if err != nil { return nil, err } @@ -303,7 +310,7 @@ func (r *StateStore) getJSON(req *state.GetRequest) (*state.GetResponse, error) // Get retrieves state from redis with a key. func (r *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) { - if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType { + if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType && rediscomponent.ClientHasJSONSupport(r.client) { return r.getJSON(req) } @@ -341,7 +348,7 @@ func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error { var bt []byte var setQuery string - if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType { + if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType && rediscomponent.ClientHasJSONSupport(r.client) { setQuery = setJSONQuery bt, _ = utils.Marshal(&jsonEntry{Data: req.Value}, r.json.Marshal) } else { @@ -349,7 +356,7 @@ func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error { bt, _ = utils.Marshal(req.Value, r.json.Marshal) } - err = r.client.Do(ctx, "EVAL", setQuery, 1, req.Key, ver, bt, firstWrite).Err() + err = r.client.DoWrite(ctx, "EVAL", setQuery, 1, req.Key, ver, bt, firstWrite) if err != nil { if req.ETag != nil { return state.NewETagError(state.ETagMismatch, err) @@ -359,21 +366,21 @@ func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error { } if ttl != nil && *ttl > 0 { - _, err = r.client.Do(ctx, "EXPIRE", req.Key, *ttl).Result() + err = r.client.DoWrite(ctx, "EXPIRE", req.Key, *ttl) if err != nil { return fmt.Errorf("failed to set key %s ttl: %s", req.Key, err) } } if ttl != nil && *ttl <= 0 { - _, err = r.client.Do(ctx, "PERSIST", req.Key).Result() + err = r.client.DoWrite(ctx, "PERSIST", req.Key) if err != nil { return fmt.Errorf("failed to persist key %s: %s", req.Key, err) } } if req.Options.Consistency == state.Strong && r.replicas > 0 { - _, err = r.client.Do(ctx, "WAIT", r.replicas, 1000).Result() + err = r.client.DoWrite(ctx, "WAIT", r.replicas, 1000) if err != nil { return fmt.Errorf("redis waiting for %v replicas to acknowledge write, err: %s", r.replicas, err.Error()) } @@ -386,7 +393,7 @@ func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error { func (r *StateStore) Multi(ctx context.Context, request *state.TransactionalStateRequest) error { var setQuery, delQuery string var isJSON bool - if contentType, ok := request.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType { + if contentType, ok := request.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType && rediscomponent.ClientHasJSONSupport(r.client) { isJSON = true setQuery = setJSONQuery delQuery = delJSONQuery @@ -434,7 +441,7 @@ func (r *StateStore) Multi(ctx context.Context, request *state.TransactionalStat } } - _, err := pipe.Exec(ctx) + err := pipe.Exec(ctx) return err } @@ -442,15 +449,15 @@ func (r *StateStore) Multi(ctx context.Context, request *state.TransactionalStat func (r *StateStore) registerSchemas() error { for name, elem := range r.querySchemas { r.logger.Infof("redis: create query index %s", name) - if err := r.client.Do(r.ctx, elem.schema...).Err(); err != nil { + if err := r.client.DoWrite(r.ctx, elem.schema...); err != nil { if err.Error() != "Index already exists" { return err } r.logger.Infof("redis: drop stale query index %s", name) - if err = r.client.Do(r.ctx, "FT.DROPINDEX", name).Err(); err != nil { + if err = r.client.DoWrite(r.ctx, "FT.DROPINDEX", name); err != nil { return err } - if err = r.client.Do(r.ctx, elem.schema...).Err(); err != nil { + if err = r.client.DoWrite(r.ctx, elem.schema...); err != nil { return err } } @@ -509,6 +516,9 @@ func (r *StateStore) parseTTL(req *state.SetRequest) (*int, error) { // Query executes a query against store. func (r *StateStore) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) { + if !rediscomponent.ClientHasJSONSupport(r.client) { + return nil, fmt.Errorf("redis-json server support is required for query capability") + } indexName, ok := daprmetadata.TryGetQueryIndexName(req.Metadata) if !ok { return nil, fmt.Errorf("query index not found") diff --git a/state/redis/redis_query.go b/state/redis/redis_query.go index d539dfece..de88515cf 100644 --- a/state/redis/redis_query.go +++ b/state/redis/redis_query.go @@ -20,10 +20,9 @@ import ( "strconv" "strings" + rediscomponent "github.com/dapr/components-contrib/internal/component/redis" "github.com/dapr/components-contrib/state" "github.com/dapr/components-contrib/state/query" - - "github.com/go-redis/redis/v8" ) var ErrMultipleSortBy error = errors.New("multiple SORTBY steps are not allowed. Sort multiple fields in a single step") @@ -190,9 +189,9 @@ func (q *Query) Finalize(filters string, qq *query.Query) error { return nil } -func (q *Query) execute(ctx context.Context, client redis.UniversalClient) ([]state.QueryItem, string, error) { +func (q *Query) execute(ctx context.Context, client rediscomponent.RedisClient) ([]state.QueryItem, string, error) { query := append(append([]interface{}{"FT.SEARCH", q.schemaName}, q.query...), "RETURN", "2", "$.data", "$.version") - ret, err := client.Do(ctx, query...).Result() + ret, err := client.DoRead(ctx, query...) if err != nil { return nil, "", err } diff --git a/state/redis/redis_test.go b/state/redis/redis_test.go index dfa32f58e..d116ba284 100644 --- a/state/redis/redis_test.go +++ b/state/redis/redis_test.go @@ -239,7 +239,7 @@ func TestTransactionalUpsert(t *testing.T) { }) assert.Equal(t, nil, err) - res, err := c.Do(context.Background(), "HGETALL", "weapon").Result() + res, err := c.DoRead(context.Background(), "HGETALL", "weapon") assert.Equal(t, nil, err) vals := res.([]interface{}) @@ -248,15 +248,15 @@ func TestTransactionalUpsert(t *testing.T) { assert.Equal(t, ptr.Of("1"), version) assert.Equal(t, `"deathstar"`, data) - res, err = c.Do(context.Background(), "TTL", "weapon").Result() + res, err = c.DoRead(context.Background(), "TTL", "weapon") assert.Equal(t, nil, err) assert.Equal(t, int64(-1), res) - res, err = c.Do(context.Background(), "TTL", "weapon2").Result() + res, err = c.DoRead(context.Background(), "TTL", "weapon2") assert.Equal(t, nil, err) assert.Equal(t, int64(123), res) - res, err = c.Do(context.Background(), "TTL", "weapon3").Result() + res, err = c.DoRead(context.Background(), "TTL", "weapon3") assert.Equal(t, nil, err) assert.Equal(t, int64(-1), res) } @@ -290,7 +290,7 @@ func TestTransactionalDelete(t *testing.T) { }) assert.Equal(t, nil, err) - res, err := c.Do(context.Background(), "HGETALL", "weapon").Result() + res, err := c.DoRead(context.Background(), "HGETALL", "weapon") assert.Equal(t, nil, err) vals := res.([]interface{}) @@ -335,7 +335,7 @@ func TestRequestsWithGlobalTTL(t *testing.T) { Key: "weapon100", Value: "deathstar100", }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result() + ttl, _ := ss.client.TTLResult(ss.ctx, "weapon100") assert.Equal(t, time.Duration(globalTTLInSeconds)*time.Second, ttl) }) @@ -349,7 +349,7 @@ func TestRequestsWithGlobalTTL(t *testing.T) { "ttlInSeconds": strconv.Itoa(requestTTL), }, }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result() + ttl, _ := ss.client.TTLResult(ss.ctx, "weapon100") assert.Equal(t, time.Duration(requestTTL)*time.Second, ttl) }) @@ -388,7 +388,7 @@ func TestRequestsWithGlobalTTL(t *testing.T) { }) assert.Equal(t, nil, err) - res, err := c.Do(context.Background(), "HGETALL", "weapon").Result() + res, err := c.DoRead(context.Background(), "HGETALL", "weapon") assert.Equal(t, nil, err) vals := res.([]interface{}) @@ -397,15 +397,15 @@ func TestRequestsWithGlobalTTL(t *testing.T) { assert.Equal(t, ptr.Of("1"), version) assert.Equal(t, `"deathstar"`, data) - res, err = c.Do(context.Background(), "TTL", "weapon").Result() + res, err = c.DoRead(context.Background(), "TTL", "weapon") assert.Equal(t, nil, err) assert.Equal(t, int64(globalTTLInSeconds), res) - res, err = c.Do(context.Background(), "TTL", "weapon2").Result() + res, err = c.DoRead(context.Background(), "TTL", "weapon2") assert.Equal(t, nil, err) assert.Equal(t, int64(123), res) - res, err = c.Do(context.Background(), "TTL", "weapon3").Result() + res, err = c.DoRead(context.Background(), "TTL", "weapon3") assert.Equal(t, nil, err) assert.Equal(t, int64(-1), res) }) @@ -432,7 +432,7 @@ func TestSetRequestWithTTL(t *testing.T) { }, }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result() + ttl, _ := ss.client.TTLResult(ss.ctx, "weapon100") assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl) }) @@ -443,7 +443,7 @@ func TestSetRequestWithTTL(t *testing.T) { Value: "deathstar200", }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon200").Result() + ttl, _ := ss.client.TTLResult(ss.ctx, "weapon200") assert.Equal(t, time.Duration(-1), ttl) }) @@ -453,7 +453,7 @@ func TestSetRequestWithTTL(t *testing.T) { Key: "weapon300", Value: "deathstar300", }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon300").Result() + ttl, _ := ss.client.TTLResult(ss.ctx, "weapon300") assert.Equal(t, time.Duration(-1), ttl) // make the key no longer persistent @@ -465,7 +465,7 @@ func TestSetRequestWithTTL(t *testing.T) { "ttlInSeconds": strconv.Itoa(ttlInSeconds), }, }) - ttl, _ = ss.client.TTL(ss.ctx, "weapon300").Result() + ttl, _ = ss.client.TTLResult(ss.ctx, "weapon300") assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl) // make the key persistent again @@ -476,7 +476,7 @@ func TestSetRequestWithTTL(t *testing.T) { "ttlInSeconds": strconv.Itoa(-1), }, }) - ttl, _ = ss.client.TTL(ss.ctx, "weapon300").Result() + ttl, _ = ss.client.TTLResult(ss.ctx, "weapon300") assert.Equal(t, time.Duration(-1), ttl) }) } @@ -508,7 +508,7 @@ func TestTransactionalDeleteNoEtag(t *testing.T) { }) assert.Equal(t, nil, err) - res, err := c.Do(context.Background(), "HGETALL", "weapon100").Result() + res, err := c.DoRead(context.Background(), "HGETALL", "weapon100") assert.Equal(t, nil, err) vals := res.([]interface{}) @@ -532,7 +532,7 @@ func TestGetMetadata(t *testing.T) { assert.Equal(t, metadataInfo["idleCheckFrequency"], "redis.Duration") } -func setupMiniredis() (*miniredis.Miniredis, *redis.Client) { +func setupMiniredis() (*miniredis.Miniredis, rediscomponent.RedisClient) { s, err := miniredis.Run() if err != nil { panic(err) @@ -542,5 +542,5 @@ func setupMiniredis() (*miniredis.Miniredis, *redis.Client) { DB: defaultDB, } - return s, redis.NewClient(opts) + return s, rediscomponent.ClientFromV8Client(redis.NewClient(opts)) } diff --git a/tests/certification/bindings/redis/go.mod b/tests/certification/bindings/redis/go.mod index c6f405744..e899f3b84 100644 --- a/tests/certification/bindings/redis/go.mod +++ b/tests/certification/bindings/redis/go.mod @@ -39,6 +39,7 @@ require ( github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect github.com/go-openapi/swag v0.19.14 // indirect + github.com/go-redis/redis/v9 v9.0.0-rc.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 // indirect diff --git a/tests/certification/bindings/redis/go.sum b/tests/certification/bindings/redis/go.sum index 9e3a2f774..c7b20ed27 100644 --- a/tests/certification/bindings/redis/go.sum +++ b/tests/certification/bindings/redis/go.sum @@ -157,6 +157,8 @@ github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5F github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0= +github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -378,7 +380,7 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs= -github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys= +github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A= github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM= diff --git a/tests/certification/state/redis/go.mod b/tests/certification/state/redis/go.mod index 38de2109e..f93b0107f 100644 --- a/tests/certification/state/redis/go.mod +++ b/tests/certification/state/redis/go.mod @@ -40,6 +40,7 @@ require ( github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect github.com/go-openapi/swag v0.19.14 // indirect + github.com/go-redis/redis/v9 v9.0.0-rc.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 // indirect diff --git a/tests/certification/state/redis/go.sum b/tests/certification/state/redis/go.sum index 9e3a2f774..c7b20ed27 100644 --- a/tests/certification/state/redis/go.sum +++ b/tests/certification/state/redis/go.sum @@ -157,6 +157,8 @@ github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5F github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0= +github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -378,7 +380,7 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs= -github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys= +github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A= github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM= diff --git a/tests/config/bindings/redis/bindings.yml b/tests/config/bindings/redis/v6/bindings.yml similarity index 100% rename from tests/config/bindings/redis/bindings.yml rename to tests/config/bindings/redis/v6/bindings.yml diff --git a/tests/config/bindings/redis/v7/bindings.yml b/tests/config/bindings/redis/v7/bindings.yml new file mode 100644 index 000000000..2e614891d --- /dev/null +++ b/tests/config/bindings/redis/v7/bindings.yml @@ -0,0 +1,17 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: redis-binding + namespace: default +spec: + type: bindings.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6380 + - name: redisPassword + value: + - name: enableTLS + value: false + - name: redisVersion + value: "7" diff --git a/tests/config/bindings/tests.yml b/tests/config/bindings/tests.yml index 4898822ea..837ef7c9f 100644 --- a/tests/config/bindings/tests.yml +++ b/tests/config/bindings/tests.yml @@ -7,7 +7,12 @@ ## method: specific to http component, what method to use componentType: bindings components: - - component: redis + - component: redis.v6 + operations: ["create", "operations"] + config: + output: + key: $((uuid)) + - component: redis.v7 operations: ["create", "operations"] config: output: diff --git a/tests/config/pubsub/redis/pubsub.yml b/tests/config/pubsub/redis/v6/pubsub.yml similarity index 100% rename from tests/config/pubsub/redis/pubsub.yml rename to tests/config/pubsub/redis/v6/pubsub.yml diff --git a/tests/config/pubsub/redis/v7/pubsub.yml b/tests/config/pubsub/redis/v7/pubsub.yml new file mode 100644 index 000000000..ca4a1af5d --- /dev/null +++ b/tests/config/pubsub/redis/v7/pubsub.yml @@ -0,0 +1,20 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub +spec: + type: pubsub.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6380 + - name: redisPassword + value: "" + - name: consumerID + value: "testConsumer" + - name: processingTimeout + value: 5s + - name: redeliverInterval + value: 1s + - name: concurrency + value: 1 diff --git a/tests/config/pubsub/tests.yml b/tests/config/pubsub/tests.yml index 70ecceff8..3d19bbb5f 100644 --- a/tests/config/pubsub/tests.yml +++ b/tests/config/pubsub/tests.yml @@ -41,7 +41,11 @@ components: testMultiTopic1Name: dapr-conf-queue-multi1 testMultiTopic2Name: dapr-conf-queue-multi2 checkInOrderProcessing: false - - component: redis + - component: redis.v6 + operations: ["publish", "subscribe", "multiplehandlers"] + config: + checkInOrderProcessing: false + - component: redis.v7 operations: ["publish", "subscribe", "multiplehandlers"] config: checkInOrderProcessing: false diff --git a/tests/config/state/redis/statestore.yaml b/tests/config/state/redis/v6/statestore.yaml similarity index 100% rename from tests/config/state/redis/statestore.yaml rename to tests/config/state/redis/v6/statestore.yaml diff --git a/tests/config/state/redis/v7/statestore.yaml b/tests/config/state/redis/v7/statestore.yaml new file mode 100644 index 000000000..c92247bdd --- /dev/null +++ b/tests/config/state/redis/v7/statestore.yaml @@ -0,0 +1,13 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + metadata: + - name: redisHost + value: localhost:6380 + - name: redisPassword + value: "" + - name: redisVersion + value: "7" diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index d16efe044..d38650db1 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -1,8 +1,11 @@ # Supported operations: set, get, delete, bulkset, bulkdelete, transaction, etag, first-write, query, ttl componentType: state components: - - component: redis + - component: redis.v6 allOperations: true + - component: redis.v7 + allOperations: false + operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write" ] - component: mongodb operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query" ] - component: memcached diff --git a/tests/conformance/common.go b/tests/conformance/common.go index 4faff0db2..62f5f80bb 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -93,7 +93,8 @@ import ( const ( eventhubs = "azure.eventhubs" - redis = "redis" + redisv6 = "redis.v6" + redisv7 = "redis.v7" kafka = "kafka" mqtt = "mqtt" generateUUID = "$((uuid))" @@ -375,7 +376,9 @@ func (tc *TestConfiguration) Run(t *testing.T) { func loadPubSub(tc TestComponent) pubsub.PubSub { var pubsub pubsub.PubSub switch tc.Component { - case redis: + case redisv6: + pubsub = p_redis.NewRedisStreams(testLogger) + case redisv7: pubsub = p_redis.NewRedisStreams(testLogger) case eventhubs: pubsub = p_eventhubs.NewAzureEventHubs(testLogger) @@ -435,7 +438,9 @@ func loadSecretStore(tc TestComponent) secretstores.SecretStore { func loadStateStore(tc TestComponent) state.Store { var store state.Store switch tc.Component { - case redis: + case redisv6: + store = s_redis.NewRedisStateStore(testLogger) + case redisv7: store = s_redis.NewRedisStateStore(testLogger) case "azure.blobstorage": store = s_blobstorage.NewAzureBlobStorageStore(testLogger) @@ -478,7 +483,9 @@ func loadOutputBindings(tc TestComponent) bindings.OutputBinding { var binding bindings.OutputBinding switch tc.Component { - case redis: + case redisv6: + binding = b_redis.NewRedis(testLogger) + case redisv7: binding = b_redis.NewRedis(testLogger) case "azure.blobstorage": binding = b_azure_blobstorage.NewAzureBlobStorage(testLogger) diff --git a/tests/conformance/pubsub/pubsub.go b/tests/conformance/pubsub/pubsub.go index d04e17b79..a0193e838 100644 --- a/tests/conformance/pubsub/pubsub.go +++ b/tests/conformance/pubsub/pubsub.go @@ -508,6 +508,7 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c }() for i := 0; i < 3; i++ { + t.Logf("Starting iteration %d", i) switch i { case 1: // On iteration 1, close the first subscriber subscribe1Cancel() @@ -544,11 +545,11 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c Topic: topic, Metadata: config.PublishMetadata, }) - assert.NoError(t, err, "expected no error on publishing data %s on topic %s", data, topic) + assert.NoError(t, err, "expected no error on publishing data %s on topic %s", string(data), topic) } allSentCh <- true - t.Logf("waiting for %v to complete read", config.MaxReadDuration) + t.Logf("Waiting for %v to complete read", config.MaxReadDuration) <-wait } })