Merge branch 'master' into optimize-bulkpubres-struct

This commit is contained in:
Mukundan Sundararajan 2022-12-17 11:05:39 +05:30
commit b7817c0ee7
33 changed files with 1184 additions and 263 deletions

View File

@ -0,0 +1,8 @@
# Compose file used by CI to start a standalone Redis 7 server for
# component conformance/certification tests.
# NOTE(review): presumably this is .github/infrastructure/docker-compose-redis7.yml
# referenced by the workflow's "Start Redis 7" step — confirm filename.
version: '2'
services:
  redis:
    image: redis:7
    ports:
      # Host port 6380 -> container 6379 (avoids clashing with a default
      # Redis on 6379).
      - "6380:6379"
    environment:
      - REDIS_REPLICATION_MODE=master

View File

@ -57,7 +57,8 @@ jobs:
- bindings.mqtt-mosquitto
- bindings.mqtt-vernemq
- bindings.postgres
- bindings.redis
- bindings.redis.v6
- bindings.redis.v7
- bindings.kubemq
- bindings.rabbitmq
- pubsub.aws.snssqs
@ -69,7 +70,7 @@ jobs:
- pubsub.natsstreaming
- pubsub.pulsar
- pubsub.rabbitmq
- pubsub.redis
- pubsub.redis.v6
- pubsub.kafka-wurstmeister
- pubsub.kafka-confluent
- pubsub.kubemq
@ -83,7 +84,8 @@ jobs:
- state.mysql.mysql
- state.mysql.mariadb
- state.postgresql
- state.redis
- state.redis.v6
- state.redis.v7
- state.sqlserver
- state.in-memory
- state.cockroachdb
@ -254,9 +256,13 @@ jobs:
echo "$CERT_NAME=$CERT_FILE" >> $GITHUB_ENV
done
- name: Start Redis
- name: Start Redis 6 with Redis JSON
run: docker-compose -f ./.github/infrastructure/docker-compose-redisjson.yml -p redis up -d
if: contains(matrix.component, 'redis')
if: contains(matrix.component, 'redis.v6')
- name: Start Redis 7
run: docker-compose -f ./.github/infrastructure/docker-compose-redis7.yml -p redis up -d
if: contains(matrix.component, 'redis.v7')
- name: Start Temporal
run: docker-compose -f ./.github/infrastructure/docker-compose-temporal.yml -p temporal up -d

View File

@ -15,11 +15,9 @@ package redis
import (
"context"
"errors"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"github.com/dapr/components-contrib/bindings"
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
"github.com/dapr/kit/logger"
@ -27,7 +25,7 @@ import (
// Redis is a redis output binding.
type Redis struct {
client redis.UniversalClient
client rediscomponent.RedisClient
clientSettings *rediscomponent.Settings
logger logger.Logger
@ -49,7 +47,7 @@ func (r *Redis) Init(meta bindings.Metadata) (err error) {
r.ctx, r.cancel = context.WithCancel(context.Background())
_, err = r.client.Ping(r.ctx).Result()
_, err = r.client.PingResult(r.ctx)
if err != nil {
return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err)
}
@ -58,7 +56,7 @@ func (r *Redis) Init(meta bindings.Metadata) (err error) {
}
func (r *Redis) Ping() error {
if _, err := r.client.Ping(r.ctx).Result(); err != nil {
if _, err := r.client.PingResult(r.ctx); err != nil {
return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err)
}
@ -77,12 +75,12 @@ func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi
if key, ok := req.Metadata["key"]; ok && key != "" {
switch req.Operation {
case bindings.DeleteOperation:
err := r.client.Del(ctx, key).Err()
err := r.client.Del(ctx, key)
if err != nil {
return nil, err
}
case bindings.GetOperation:
data, err := r.client.Get(ctx, key).Result()
data, err := r.client.Get(ctx, key)
if err != nil {
return nil, err
}
@ -90,7 +88,7 @@ func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi
rep.Data = []byte(data)
return rep, nil
case bindings.CreateOperation:
_, err := r.client.Do(ctx, "SET", key, req.Data).Result()
err := r.client.DoWrite(ctx, "SET", key, req.Data)
if err != nil {
return nil, err
}

View File

@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/bindings"
internalredis "github.com/dapr/components-contrib/internal/component/redis"
"github.com/dapr/kit/logger"
)
@ -34,13 +35,14 @@ func TestInvokeCreate(t *testing.T) {
s, c := setupMiniredis()
defer s.Close()
// miniRedis is compatible with the existing v8 client
bind := &Redis{
client: c,
logger: logger.NewLogger("test"),
}
bind.ctx, bind.cancel = context.WithCancel(context.Background())
_, err := c.Do(context.Background(), "GET", testKey).Result()
_, err := c.DoRead(context.Background(), "GET", testKey)
assert.Equal(t, redis.Nil, err)
bindingRes, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{
@ -51,7 +53,7 @@ func TestInvokeCreate(t *testing.T) {
assert.Equal(t, nil, err)
assert.Equal(t, true, bindingRes == nil)
getRes, err := c.Do(context.Background(), "GET", testKey).Result()
getRes, err := c.DoRead(context.Background(), "GET", testKey)
assert.Equal(t, nil, err)
assert.Equal(t, true, getRes == testData)
}
@ -66,7 +68,7 @@ func TestInvokeGet(t *testing.T) {
}
bind.ctx, bind.cancel = context.WithCancel(context.Background())
_, err := c.Do(context.Background(), "SET", testKey, testData).Result()
err := c.DoWrite(context.Background(), "SET", testKey, testData)
assert.Equal(t, nil, err)
bindingRes, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{
@ -87,10 +89,10 @@ func TestInvokeDelete(t *testing.T) {
}
bind.ctx, bind.cancel = context.WithCancel(context.Background())
_, err := c.Do(context.Background(), "SET", testKey, testData).Result()
err := c.DoWrite(context.Background(), "SET", testKey, testData)
assert.Equal(t, nil, err)
getRes, err := c.Do(context.Background(), "GET", testKey).Result()
getRes, err := c.DoRead(context.Background(), "GET", testKey)
assert.Equal(t, nil, err)
assert.Equal(t, true, getRes == testData)
@ -101,12 +103,12 @@ func TestInvokeDelete(t *testing.T) {
assert.Equal(t, nil, err)
rgetRep, err := c.Do(context.Background(), "GET", testKey).Result()
rgetRep, err := c.DoRead(context.Background(), "GET", testKey)
assert.Equal(t, redis.Nil, err)
assert.Equal(t, nil, rgetRep)
}
func setupMiniredis() (*miniredis.Miniredis, *redis.Client) {
func setupMiniredis() (*miniredis.Miniredis, internalredis.RedisClient) {
s, err := miniredis.Run()
if err != nil {
panic(err)
@ -116,5 +118,5 @@ func setupMiniredis() (*miniredis.Miniredis, *redis.Client) {
DB: 0,
}
return s, redis.NewClient(opts)
return s, internalredis.ClientFromV8Client(redis.NewClient(opts))
}

2
go.mod
View File

@ -59,6 +59,7 @@ require (
github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis/v8 v8.11.5
github.com/go-redis/redis/v9 v9.0.0-rc.2
github.com/go-sql-driver/mysql v1.7.0
github.com/gocql/gocql v1.3.1
github.com/golang-jwt/jwt/v4 v4.4.3
@ -112,6 +113,7 @@ require (
go.uber.org/ratelimit v0.2.0
golang.org/x/crypto v0.4.0
golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f
golang.org/x/mod v0.6.0
golang.org/x/net v0.4.0
golang.org/x/oauth2 v0.3.0
google.golang.org/api v0.104.0

6
go.sum
View File

@ -771,6 +771,8 @@ github.com/go-playground/validator/v10 v10.11.0 h1:0W+xRM511GY47Yy3bZUbJVitCNg2B
github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0=
github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY=
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
@ -1361,7 +1363,7 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=
github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/open-policy-agent/opa v0.47.3 h1:Uj8zw+q6Cvv1iiQFh704Q6sl3fKVvk35WZNJLsd6mgk=
github.com/open-policy-agent/opa v0.47.3/go.mod h1:I5DbT677OGqfk9gvu5i54oIt0rrVf4B5pedpqDquAXo=
@ -1837,6 +1839,8 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I=
golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI=
golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=

View File

@ -14,12 +14,14 @@ limitations under the License.
package redis
import (
"crypto/tls"
"context"
"fmt"
"strings"
"time"
"github.com/go-redis/redis/v8"
"golang.org/x/mod/semver"
"github.com/dapr/kit/ptr"
)
const (
@ -27,7 +29,53 @@ const (
NodeType = "node"
)
func ParseClientFromProperties(properties map[string]string, defaultSettings *Settings) (client redis.UniversalClient, settings *Settings, err error) {
type RedisXMessage struct {
ID string
Values map[string]interface{}
}
type RedisXStream struct {
Stream string
Messages []RedisXMessage
}
type RedisXPendingExt struct {
ID string
Consumer string
Idle time.Duration
RetryCount int64
}
type RedisPipeliner interface {
Exec(ctx context.Context) error
Do(ctx context.Context, args ...interface{})
}
var clientHasJSONSupport *bool
//nolint:interfacebloat
type RedisClient interface {
GetNilValueError() RedisError
Context() context.Context
DoRead(ctx context.Context, args ...interface{}) (interface{}, error)
DoWrite(ctx context.Context, args ...interface{}) error
Del(ctx context.Context, keys ...string) error
Get(ctx context.Context, key string) (string, error)
Close() error
PingResult(ctx context.Context) (string, error)
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error)
EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error)
XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error)
XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error
XAck(ctx context.Context, stream string, group string, messageID string) error
XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error)
XPendingExtResult(ctx context.Context, stream string, group string, start string, end string, count int64) ([]RedisXPendingExt, error)
XClaimResult(ctx context.Context, stream string, group string, consumer string, minIdleTime time.Duration, messageIDs []string) ([]RedisXMessage, error)
TxPipeline() RedisPipeliner
TTLResult(ctx context.Context, key string) (time.Duration, error)
}
func ParseClientFromProperties(properties map[string]string, defaultSettings *Settings) (client RedisClient, settings *Settings, err error) {
if defaultSettings == nil {
settings = &Settings{}
} else {
@ -37,110 +85,79 @@ func ParseClientFromProperties(properties map[string]string, defaultSettings *Se
if err != nil {
return nil, nil, fmt.Errorf("redis client configuration error: %w", err)
}
var c RedisClient
if settings.Failover {
return newFailoverClient(settings), settings, nil
c = newV8FailoverClient(settings)
} else {
c = newV8Client(settings)
}
version, versionErr := GetServerVersion(c)
c.Close() // close the client to avoid leaking connections
return newClient(settings), settings, nil
useNewClient := false
if versionErr != nil {
// we couldn't query the server version, so we will assume the v8 client is not supported
useNewClient = true
} else if semver.Compare("v"+version, "v7.0.0") > -1 {
// if the server version is >= 7, we will use the v9 client
useNewClient = true
}
if useNewClient {
if settings.Failover {
return newV9FailoverClient(settings), settings, nil
}
return newV9Client(settings), settings, nil
} else {
if settings.Failover {
return newV8FailoverClient(settings), settings, nil
}
return newV8Client(settings), settings, nil
}
}
func newFailoverClient(s *Settings) redis.UniversalClient {
if s == nil {
return nil
func ClientHasJSONSupport(c RedisClient) bool {
if clientHasJSONSupport != nil {
return *clientHasJSONSupport
}
opts := &redis.FailoverOptions{
DB: s.DB,
MasterName: s.SentinelMasterName,
SentinelAddrs: []string{s.Host},
Password: s.Password,
Username: s.Username,
MaxRetries: s.RedisMaxRetries,
MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval),
MinRetryBackoff: time.Duration(s.RedisMinRetryInterval),
DialTimeout: time.Duration(s.DialTimeout),
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
PoolSize: s.PoolSize,
MaxConnAge: time.Duration(s.MaxConnAge),
MinIdleConns: s.MinIdleConns,
PoolTimeout: time.Duration(s.PoolTimeout),
IdleCheckFrequency: time.Duration(s.IdleCheckFrequency),
IdleTimeout: time.Duration(s.IdleTimeout),
bgctx := context.Background()
ctx, cancel := context.WithTimeout(bgctx, 5*time.Second)
defer cancel()
err := c.DoWrite(ctx, "JSON.GET")
if err == nil {
clientHasJSONSupport = ptr.Of(true)
return true
}
/* #nosec */
if s.EnableTLS {
opts.TLSConfig = &tls.Config{
InsecureSkipVerify: s.EnableTLS,
}
if strings.HasPrefix(err.Error(), "ERR unknown command") {
clientHasJSONSupport = ptr.Of(false)
return false
}
if s.RedisType == ClusterType {
opts.SentinelAddrs = strings.Split(s.Host, ",")
return redis.NewFailoverClusterClient(opts)
}
return redis.NewFailoverClient(opts)
clientHasJSONSupport = ptr.Of(true)
return true
}
func newClient(s *Settings) redis.UniversalClient {
if s == nil {
return nil
func GetServerVersion(c RedisClient) (string, error) {
bgctx := context.Background()
ctx, cancel := context.WithTimeout(bgctx, 5*time.Second)
defer cancel()
res, err := c.DoRead(ctx, "INFO", "server")
if err != nil {
return "", err
}
if s.RedisType == ClusterType {
options := &redis.ClusterOptions{
Addrs: strings.Split(s.Host, ","),
Password: s.Password,
Username: s.Username,
MaxRetries: s.RedisMaxRetries,
MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval),
MinRetryBackoff: time.Duration(s.RedisMinRetryInterval),
DialTimeout: time.Duration(s.DialTimeout),
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
PoolSize: s.PoolSize,
MaxConnAge: time.Duration(s.MaxConnAge),
MinIdleConns: s.MinIdleConns,
PoolTimeout: time.Duration(s.PoolTimeout),
IdleCheckFrequency: time.Duration(s.IdleCheckFrequency),
IdleTimeout: time.Duration(s.IdleTimeout),
}
/* #nosec */
if s.EnableTLS {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: s.EnableTLS,
}
}
return redis.NewClusterClient(options)
}
options := &redis.Options{
Addr: s.Host,
Password: s.Password,
Username: s.Username,
DB: s.DB,
MaxRetries: s.RedisMaxRetries,
MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval),
MinRetryBackoff: time.Duration(s.RedisMinRetryInterval),
DialTimeout: time.Duration(s.DialTimeout),
ReadTimeout: time.Duration(s.ReadTimeout),
WriteTimeout: time.Duration(s.WriteTimeout),
PoolSize: s.PoolSize,
MaxConnAge: time.Duration(s.MaxConnAge),
MinIdleConns: s.MinIdleConns,
PoolTimeout: time.Duration(s.PoolTimeout),
IdleCheckFrequency: time.Duration(s.IdleCheckFrequency),
IdleTimeout: time.Duration(s.IdleTimeout),
}
/* #nosec */
if s.EnableTLS {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: s.EnableTLS,
// get row in string res beginning with "redis_version"
rows := strings.Split(res.(string), "\n")
for _, row := range rows {
if strings.HasPrefix(row, "redis_version:") {
return strings.TrimSpace(strings.Split(row, ":")[1]), nil
}
}
return redis.NewClient(options)
return "", fmt.Errorf("could not find redis_version in redis info response")
}
type RedisError string
func (e RedisError) Error() string { return string(e) }
func (RedisError) RedisError() {}

View File

@ -0,0 +1,414 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package redis
import (
"context"
"crypto/tls"
"strings"
"time"
v8 "github.com/go-redis/redis/v8"
)
// v8Pipeliner adapts a go-redis v8 Pipeliner to the package-level
// RedisPipeliner interface, applying the configured write timeout to
// each queued command.
type v8Pipeliner struct {
	pipeliner    v8.Pipeliner
	writeTimeout Duration
}

// Exec runs all queued commands in one round-trip and returns only the
// first error (per-command results are discarded).
func (p v8Pipeliner) Exec(ctx context.Context) error {
	_, err := p.pipeliner.Exec(ctx)
	return err
}

// Do queues an arbitrary command on the pipeline, bounding it by the
// write timeout when one is configured.
func (p v8Pipeliner) Do(ctx context.Context, args ...interface{}) {
	if p.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(p.writeTimeout))
		defer cancel()
		p.pipeliner.Do(timeoutCtx, args...)
		// Bug fix: without this return the command fell through and was
		// queued a second time below, executing twice on Exec.
		return
	}
	p.pipeliner.Do(ctx, args...)
}
// v8Client is an interface implementation of RedisClient
type v8Client struct {
	client       v8.UniversalClient // underlying go-redis v8 client (plain, cluster, or failover)
	readTimeout  Duration           // per-call bound for read commands; 0 disables the extra timeout
	writeTimeout Duration           // per-call bound for write commands; 0 disables the extra timeout
	dialTimeout  Duration           // bound applied to PingResult; 0 disables the extra timeout
}

// DoWrite runs an arbitrary command, honoring the configured write
// timeout, and returns only the command error.
func (c v8Client) DoWrite(ctx context.Context, args ...interface{}) error {
	if c.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
		defer cancel()
		return c.client.Do(timeoutCtx, args...).Err()
	}
	return c.client.Do(ctx, args...).Err()
}

// DoRead runs an arbitrary command, honoring the configured read
// timeout, and returns the raw reply.
func (c v8Client) DoRead(ctx context.Context, args ...interface{}) (interface{}, error) {
	if c.readTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		return c.client.Do(timeoutCtx, args...).Result()
	}
	return c.client.Do(ctx, args...).Result()
}
// Del deletes the given keys and returns any command error.
// Idiom: the error is returned directly; the original err-check added
// no behavior (nil returned nil either way).
func (c v8Client) Del(ctx context.Context, keys ...string) error {
	return c.client.Del(ctx, keys...).Err()
}
// Get returns the string value stored at key.
func (c v8Client) Get(ctx context.Context, key string) (string, error) {
	return c.client.Get(ctx, key).Result()
}

// GetNilValueError returns the v8 "key does not exist" sentinel
// (redis.Nil) converted to the package's RedisError type.
func (c v8Client) GetNilValueError() RedisError {
	return RedisError(v8.Nil.Error())
}

// Context returns the context attached to the underlying v8 client.
func (c v8Client) Context() context.Context {
	return c.client.Context()
}

// Close releases the underlying client's connections.
func (c v8Client) Close() error {
	return c.client.Close()
}

// PingResult issues PING, bounded by the dial timeout when one is set.
func (c v8Client) PingResult(ctx context.Context) (string, error) {
	if c.dialTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.dialTimeout))
		defer cancel()
		return c.client.Ping(timeoutCtx).Result()
	}
	return c.client.Ping(ctx).Result()
}
// EvalInt evaluates a Lua script and returns its reply as an int.
// Returns (result, int-conversion error, command error); all three are
// nil only when the client yields no command object.
func (c v8Client) EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error) {
	var evalCtx context.Context
	if c.readTimeout > 0 {
		// cancel is deferred to function return, after Eval completes.
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		evalCtx = timeoutCtx
	} else {
		evalCtx = ctx
	}
	eval := c.client.Eval(evalCtx, script, keys, args...)
	if eval == nil {
		return nil, nil, nil
	}
	i, err := eval.Int()
	return &i, err, eval.Err()
}

// SetNX sets key to value only if it does not already exist, with the
// given expiration. The bool reports whether the set happened; it is
// nil only when the client yields no command object.
func (c v8Client) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error) {
	var writeCtx context.Context
	if c.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
		defer cancel()
		writeCtx = timeoutCtx
	} else {
		writeCtx = ctx
	}
	nx := c.client.SetNX(writeCtx, key, value, expiration)
	if nx == nil {
		return nil, nil
	}
	val := nx.Val()
	return &val, nx.Err()
}
// XAdd appends values to a stream, trimming it to approximately
// maxLenApprox entries (MAXLEN ~), and returns the new entry ID.
func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
	var writeCtx context.Context
	if c.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
		defer cancel()
		writeCtx = timeoutCtx
	} else {
		writeCtx = ctx
	}
	return c.client.XAdd(writeCtx, &v8.XAddArgs{
		Stream:       stream,
		Values:       values,
		MaxLenApprox: maxLenApprox,
	}).Result()
}

// XGroupCreateMkStream creates a consumer group on the stream, creating
// the stream itself first if needed (MKSTREAM).
func (c v8Client) XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error {
	var writeCtx context.Context
	if c.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
		defer cancel()
		writeCtx = timeoutCtx
	} else {
		writeCtx = ctx
	}
	return c.client.XGroupCreateMkStream(writeCtx, stream, group, start).Err()
}

// XAck acknowledges a message for a consumer group.
// NOTE(review): XACK mutates group state, yet the call is bounded by
// readTimeout rather than writeTimeout — confirm which was intended.
func (c v8Client) XAck(ctx context.Context, stream string, group string, messageID string) error {
	var readCtx context.Context
	if c.readTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		readCtx = timeoutCtx
	} else {
		readCtx = ctx
	}
	ack := c.client.XAck(readCtx, stream, group, messageID)
	return ack.Err()
}
// XReadGroupResult reads up to count pending entries for a consumer
// group (blocking up to block), converting go-redis v8 stream types to
// the version-neutral RedisXStream representation.
func (c v8Client) XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error) {
	var readCtx context.Context
	if c.readTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		readCtx = timeoutCtx
	} else {
		readCtx = ctx
	}
	res, err := c.client.XReadGroup(readCtx,
		&v8.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  streams,
			Count:    count,
			Block:    block,
		},
	).Result()
	if err != nil {
		return nil, err
	}
	// convert []v8.XStream to []RedisXStream
	redisXStreams := make([]RedisXStream, len(res))
	for i, xStream := range res {
		redisXStreams[i].Stream = xStream.Stream
		redisXStreams[i].Messages = make([]RedisXMessage, len(xStream.Messages))
		for j, message := range xStream.Messages {
			redisXStreams[i].Messages[j].ID = message.ID
			redisXStreams[i].Messages[j].Values = message.Values
		}
	}
	return redisXStreams, nil
}

// XPendingExtResult lists pending entries of a consumer group in the
// [start, end] ID range, as version-neutral RedisXPendingExt values.
func (c v8Client) XPendingExtResult(ctx context.Context, stream string, group string, start string, end string, count int64) ([]RedisXPendingExt, error) {
	var readCtx context.Context
	if c.readTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		readCtx = timeoutCtx
	} else {
		readCtx = ctx
	}
	res, err := c.client.XPendingExt(readCtx, &v8.XPendingExtArgs{
		Stream: stream,
		Group:  group,
		Start:  start,
		End:    end,
		Count:  count,
	}).Result()
	if err != nil {
		return nil, err
	}
	// convert []v8.XPendingExt to []RedisXPendingExt
	// (direct struct conversion: field names/types match)
	redisXPendingExts := make([]RedisXPendingExt, len(res))
	for i, xPendingExt := range res {
		redisXPendingExts[i] = RedisXPendingExt(xPendingExt)
	}
	return redisXPendingExts, nil
}

// XClaimResult claims messages idle longer than minIdleTime for the
// given consumer and returns them as RedisXMessage values.
func (c v8Client) XClaimResult(ctx context.Context, stream string, group string, consumer string, minIdleTime time.Duration, messageIDs []string) ([]RedisXMessage, error) {
	var readCtx context.Context
	if c.readTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		readCtx = timeoutCtx
	} else {
		readCtx = ctx
	}
	res, err := c.client.XClaim(readCtx, &v8.XClaimArgs{
		Stream:   stream,
		Group:    group,
		Consumer: consumer,
		MinIdle:  minIdleTime,
		Messages: messageIDs,
	}).Result()
	if err != nil {
		return nil, err
	}
	// convert res to []RedisXMessage
	redisXMessages := make([]RedisXMessage, len(res))
	for i, xMessage := range res {
		redisXMessages[i] = RedisXMessage(xMessage)
	}
	return redisXMessages, nil
}
// TxPipeline returns a transactional pipeline wrapped as a
// RedisPipeliner, inheriting this client's write timeout.
func (c v8Client) TxPipeline() RedisPipeliner {
	return v8Pipeliner{
		pipeliner:    c.client.TxPipeline(),
		writeTimeout: c.writeTimeout,
	}
}

// TTLResult returns the remaining time-to-live of key.
// NOTE(review): TTL is a read, but the call is bounded by writeTimeout —
// confirm whether readTimeout was intended.
func (c v8Client) TTLResult(ctx context.Context, key string) (time.Duration, error) {
	var writeCtx context.Context
	if c.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
		defer cancel()
		writeCtx = timeoutCtx
	} else {
		writeCtx = ctx
	}
	return c.client.TTL(writeCtx, key).Result()
}
// newV8FailoverClient builds a sentinel/failover-backed v8 RedisClient
// from the parsed Settings. Returns nil when s is nil. When RedisType is
// cluster, s.Host is treated as a comma-separated sentinel address list
// and a failover-cluster client is created instead.
func newV8FailoverClient(s *Settings) RedisClient {
	if s == nil {
		return nil
	}
	opts := &v8.FailoverOptions{
		DB:                 s.DB,
		MasterName:         s.SentinelMasterName,
		SentinelAddrs:      []string{s.Host},
		Password:           s.Password,
		Username:           s.Username,
		MaxRetries:         s.RedisMaxRetries,
		MaxRetryBackoff:    time.Duration(s.RedisMaxRetryInterval),
		MinRetryBackoff:    time.Duration(s.RedisMinRetryInterval),
		DialTimeout:        time.Duration(s.DialTimeout),
		ReadTimeout:        time.Duration(s.ReadTimeout),
		WriteTimeout:       time.Duration(s.WriteTimeout),
		PoolSize:           s.PoolSize,
		MaxConnAge:         time.Duration(s.MaxConnAge),
		MinIdleConns:       s.MinIdleConns,
		PoolTimeout:        time.Duration(s.PoolTimeout),
		IdleCheckFrequency: time.Duration(s.IdleCheckFrequency),
		IdleTimeout:        time.Duration(s.IdleTimeout),
	}
	/* #nosec */
	if s.EnableTLS {
		// NOTE(review): enabling TLS also disables certificate
		// verification — confirm this is the intended trade-off.
		opts.TLSConfig = &tls.Config{
			InsecureSkipVerify: s.EnableTLS,
		}
	}
	if s.RedisType == ClusterType {
		opts.SentinelAddrs = strings.Split(s.Host, ",")
		return v8Client{
			client:       v8.NewFailoverClusterClient(opts),
			readTimeout:  s.ReadTimeout,
			writeTimeout: s.WriteTimeout,
			dialTimeout:  s.DialTimeout,
		}
	}
	return v8Client{
		client:       v8.NewFailoverClient(opts),
		readTimeout:  s.ReadTimeout,
		writeTimeout: s.WriteTimeout,
		dialTimeout:  s.DialTimeout,
	}
}
// newV8Client builds a v8 RedisClient from the parsed Settings: a
// cluster client when RedisType is cluster (s.Host is a comma-separated
// node list), otherwise a single-node client. Returns nil when s is nil.
func newV8Client(s *Settings) RedisClient {
	if s == nil {
		return nil
	}
	if s.RedisType == ClusterType {
		options := &v8.ClusterOptions{
			Addrs:              strings.Split(s.Host, ","),
			Password:           s.Password,
			Username:           s.Username,
			MaxRetries:         s.RedisMaxRetries,
			MaxRetryBackoff:    time.Duration(s.RedisMaxRetryInterval),
			MinRetryBackoff:    time.Duration(s.RedisMinRetryInterval),
			DialTimeout:        time.Duration(s.DialTimeout),
			ReadTimeout:        time.Duration(s.ReadTimeout),
			WriteTimeout:       time.Duration(s.WriteTimeout),
			PoolSize:           s.PoolSize,
			MaxConnAge:         time.Duration(s.MaxConnAge),
			MinIdleConns:       s.MinIdleConns,
			PoolTimeout:        time.Duration(s.PoolTimeout),
			IdleCheckFrequency: time.Duration(s.IdleCheckFrequency),
			IdleTimeout:        time.Duration(s.IdleTimeout),
		}
		/* #nosec */
		if s.EnableTLS {
			// NOTE(review): enabling TLS also disables certificate
			// verification — confirm this is the intended trade-off.
			options.TLSConfig = &tls.Config{
				InsecureSkipVerify: s.EnableTLS,
			}
		}
		return v8Client{
			client:       v8.NewClusterClient(options),
			readTimeout:  s.ReadTimeout,
			writeTimeout: s.WriteTimeout,
			dialTimeout:  s.DialTimeout,
		}
	}
	options := &v8.Options{
		Addr:               s.Host,
		Password:           s.Password,
		Username:           s.Username,
		DB:                 s.DB,
		MaxRetries:         s.RedisMaxRetries,
		MaxRetryBackoff:    time.Duration(s.RedisMaxRetryInterval),
		MinRetryBackoff:    time.Duration(s.RedisMinRetryInterval),
		DialTimeout:        time.Duration(s.DialTimeout),
		ReadTimeout:        time.Duration(s.ReadTimeout),
		WriteTimeout:       time.Duration(s.WriteTimeout),
		PoolSize:           s.PoolSize,
		MaxConnAge:         time.Duration(s.MaxConnAge),
		MinIdleConns:       s.MinIdleConns,
		PoolTimeout:        time.Duration(s.PoolTimeout),
		IdleCheckFrequency: time.Duration(s.IdleCheckFrequency),
		IdleTimeout:        time.Duration(s.IdleTimeout),
	}
	/* #nosec */
	if s.EnableTLS {
		// NOTE(review): enabling TLS also disables certificate
		// verification — confirm this is the intended trade-off.
		options.TLSConfig = &tls.Config{
			InsecureSkipVerify: s.EnableTLS,
		}
	}
	return v8Client{
		client:       v8.NewClient(options),
		readTimeout:  s.ReadTimeout,
		writeTimeout: s.WriteTimeout,
		dialTimeout:  s.DialTimeout,
	}
}
// ClientFromV8Client wraps an existing go-redis v8 UniversalClient as a
// RedisClient. All per-call timeouts stay zero, so commands are bounded
// only by the caller's context (used by tests with miniredis).
func ClientFromV8Client(client v8.UniversalClient) RedisClient {
	return v8Client{client: client}
}

View File

@ -0,0 +1,410 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package redis
import (
"context"
"crypto/tls"
"strings"
"time"
v9 "github.com/go-redis/redis/v9"
)
// v9Pipeliner adapts a go-redis v9 Pipeliner to the package-level
// RedisPipeliner interface, applying the configured write timeout to
// each queued command.
type v9Pipeliner struct {
	pipeliner    v9.Pipeliner
	writeTimeout Duration
}

// Exec runs all queued commands in one round-trip and returns only the
// first error (per-command results are discarded).
func (p v9Pipeliner) Exec(ctx context.Context) error {
	_, err := p.pipeliner.Exec(ctx)
	return err
}

// Do queues an arbitrary command on the pipeline, bounding it by the
// write timeout when one is configured.
func (p v9Pipeliner) Do(ctx context.Context, args ...interface{}) {
	if p.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(p.writeTimeout))
		defer cancel()
		p.pipeliner.Do(timeoutCtx, args...)
		// Bug fix: without this return the command fell through and was
		// queued a second time below, executing twice on Exec.
		return
	}
	p.pipeliner.Do(ctx, args...)
}
// v9Client is an interface implementation of RedisClient
type v9Client struct {
	client       v9.UniversalClient // underlying go-redis v9 client (plain, cluster, or failover)
	readTimeout  Duration           // per-call bound for read commands; 0 disables the extra timeout
	writeTimeout Duration           // per-call bound for write commands; 0 disables the extra timeout
	dialTimeout  Duration           // bound applied to PingResult; 0 disables the extra timeout
}

// DoWrite runs an arbitrary command, honoring the configured write
// timeout, and returns only the command error.
func (c v9Client) DoWrite(ctx context.Context, args ...interface{}) error {
	if c.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
		defer cancel()
		return c.client.Do(timeoutCtx, args...).Err()
	}
	return c.client.Do(ctx, args...).Err()
}

// DoRead runs an arbitrary command, honoring the configured read
// timeout, and returns the raw reply.
func (c v9Client) DoRead(ctx context.Context, args ...interface{}) (interface{}, error) {
	if c.readTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		return c.client.Do(timeoutCtx, args...).Result()
	}
	return c.client.Do(ctx, args...).Result()
}
// Del deletes the given keys and returns any command error.
// Idiom: the error is returned directly; the original err-check added
// no behavior (nil returned nil either way).
func (c v9Client) Del(ctx context.Context, keys ...string) error {
	return c.client.Del(ctx, keys...).Err()
}
// Get returns the string value stored at key.
func (c v9Client) Get(ctx context.Context, key string) (string, error) {
	return c.client.Get(ctx, key).Result()
}

// GetNilValueError returns the v9 "key does not exist" sentinel
// (redis.Nil) converted to the package's RedisError type.
func (c v9Client) GetNilValueError() RedisError {
	return RedisError(v9.Nil.Error())
}

// Context returns a background context; the v9 client does not carry a
// context of its own (unlike v8's client.Context()).
func (c v9Client) Context() context.Context {
	return context.Background()
}

// Close releases the underlying client's connections.
func (c v9Client) Close() error {
	return c.client.Close()
}

// PingResult issues PING, bounded by the dial timeout when one is set.
func (c v9Client) PingResult(ctx context.Context) (string, error) {
	if c.dialTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.dialTimeout))
		defer cancel()
		return c.client.Ping(timeoutCtx).Result()
	}
	return c.client.Ping(ctx).Result()
}
// EvalInt evaluates a Lua script and returns its reply as an int.
// Returns (result, int-conversion error, command error); all three are
// nil only when the client yields no command object.
func (c v9Client) EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error) {
	var evalCtx context.Context
	if c.readTimeout > 0 {
		// cancel is deferred to function return, after Eval completes.
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		evalCtx = timeoutCtx
	} else {
		evalCtx = ctx
	}
	eval := c.client.Eval(evalCtx, script, keys, args...)
	if eval == nil {
		return nil, nil, nil
	}
	i, err := eval.Int()
	return &i, err, eval.Err()
}

// SetNX sets key to value only if it does not already exist, with the
// given expiration. The bool reports whether the set happened; it is
// nil only when the client yields no command object.
func (c v9Client) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error) {
	var writeCtx context.Context
	if c.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
		defer cancel()
		writeCtx = timeoutCtx
	} else {
		writeCtx = ctx
	}
	nx := c.client.SetNX(writeCtx, key, value, expiration)
	if nx == nil {
		return nil, nil
	}
	val := nx.Val()
	return &val, nx.Err()
}
// XAdd appends values to a stream and returns the new entry ID.
// NOTE(review): the v8 wrapper passes maxLenApprox as MaxLenApprox
// (approximate trim), while here it is passed as v9's MaxLen without the
// Approx flag, making the trim exact — confirm this difference is intended.
func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
	var writeCtx context.Context
	if c.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
		defer cancel()
		writeCtx = timeoutCtx
	} else {
		writeCtx = ctx
	}
	return c.client.XAdd(writeCtx, &v9.XAddArgs{
		Stream: stream,
		Values: values,
		MaxLen: maxLenApprox,
	}).Result()
}

// XGroupCreateMkStream creates a consumer group on the stream, creating
// the stream itself first if needed (MKSTREAM).
func (c v9Client) XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error {
	var writeCtx context.Context
	if c.writeTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
		defer cancel()
		writeCtx = timeoutCtx
	} else {
		writeCtx = ctx
	}
	return c.client.XGroupCreateMkStream(writeCtx, stream, group, start).Err()
}

// XAck acknowledges a message for a consumer group.
// NOTE(review): XACK mutates group state, yet the call is bounded by
// readTimeout rather than writeTimeout — confirm which was intended.
func (c v9Client) XAck(ctx context.Context, stream string, group string, messageID string) error {
	var readCtx context.Context
	if c.readTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		readCtx = timeoutCtx
	} else {
		readCtx = ctx
	}
	ack := c.client.XAck(readCtx, stream, group, messageID)
	return ack.Err()
}
// XReadGroupResult reads up to count new messages for the given consumer group
// from the listed streams, blocking for at most block, and converts the
// driver's result into the driver-agnostic RedisXStream representation. The
// client's read timeout bounds the call when configured.
func (c v9Client) XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error) {
	readCtx := ctx
	if c.readTimeout > 0 {
		var cancel context.CancelFunc
		readCtx, cancel = context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
	}
	args := &v9.XReadGroupArgs{
		Group:    group,
		Consumer: consumer,
		Streams:  streams,
		Count:    count,
		Block:    block,
	}
	res, err := c.client.XReadGroup(readCtx, args).Result()
	if err != nil {
		return nil, err
	}
	// Translate []v9.XStream into []RedisXStream so callers stay
	// independent of the underlying go-redis version.
	out := make([]RedisXStream, len(res))
	for i := range res {
		msgs := make([]RedisXMessage, len(res[i].Messages))
		for j, m := range res[i].Messages {
			msgs[j] = RedisXMessage{ID: m.ID, Values: m.Values}
		}
		out[i] = RedisXStream{Stream: res[i].Stream, Messages: msgs}
	}
	return out, nil
}
// XPendingExtResult fetches up to count pending entries for the consumer group
// on the given stream within the ID range [start, end], converted into the
// driver-agnostic RedisXPendingExt representation. The client's read timeout
// bounds the call when configured.
func (c v9Client) XPendingExtResult(ctx context.Context, stream string, group string, start string, end string, count int64) ([]RedisXPendingExt, error) {
	readCtx := ctx
	if c.readTimeout > 0 {
		var cancel context.CancelFunc
		readCtx, cancel = context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
	}
	args := &v9.XPendingExtArgs{
		Stream: stream,
		Group:  group,
		Start:  start,
		End:    end,
		Count:  count,
	}
	res, err := c.client.XPendingExt(readCtx, args).Result()
	if err != nil {
		return nil, err
	}
	// Translate []v9.XPendingExt into the version-neutral type.
	out := make([]RedisXPendingExt, len(res))
	for i := range res {
		out[i] = RedisXPendingExt(res[i])
	}
	return out, nil
}
// XClaimResult transfers ownership of the given message IDs — idle for at
// least minIdleTime — to consumer within the consumer group on stream, and
// returns the claimed messages converted into the driver-agnostic
// RedisXMessage representation. The client's read timeout bounds the call
// when configured.
func (c v9Client) XClaimResult(ctx context.Context, stream string, group string, consumer string, minIdleTime time.Duration, messageIDs []string) ([]RedisXMessage, error) {
	readCtx := ctx
	if c.readTimeout > 0 {
		var cancel context.CancelFunc
		readCtx, cancel = context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
	}
	args := &v9.XClaimArgs{
		Stream:   stream,
		Group:    group,
		Consumer: consumer,
		MinIdle:  minIdleTime,
		Messages: messageIDs,
	}
	res, err := c.client.XClaim(readCtx, args).Result()
	if err != nil {
		return nil, err
	}
	// Translate []v9.XMessage into the version-neutral type.
	out := make([]RedisXMessage, len(res))
	for i := range res {
		out[i] = RedisXMessage(res[i])
	}
	return out, nil
}
// TxPipeline returns a transactional pipeline wrapper that carries the
// client's configured write timeout for later Exec calls.
func (c v9Client) TxPipeline() RedisPipeliner {
	p := v9Pipeliner{pipeliner: c.client.TxPipeline()}
	p.writeTimeout = c.writeTimeout
	return p
}
// TTLResult returns the remaining time to live of the given key.
//
// TTL is a read-only command, so the client's read timeout (not the write
// timeout, which the previous implementation used inconsistently with every
// other read-path method here) bounds the call when configured.
func (c v9Client) TTLResult(ctx context.Context, key string) (time.Duration, error) {
	var readCtx context.Context
	if c.readTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.readTimeout))
		defer cancel()
		readCtx = timeoutCtx
	} else {
		readCtx = ctx
	}
	return c.client.TTL(readCtx, key).Result()
}
// newV9FailoverClient builds a go-redis v9 client configured for Redis
// Sentinel failover from the given settings; a nil settings value yields a
// nil client. When the configured Redis type is ClusterType, the host string
// is treated as a comma-separated list of sentinel addresses and a failover
// cluster client is returned instead.
func newV9FailoverClient(s *Settings) RedisClient {
	if s == nil {
		return nil
	}
	cfg := &v9.FailoverOptions{
		DB:                    s.DB,
		MasterName:            s.SentinelMasterName,
		SentinelAddrs:         []string{s.Host},
		Password:              s.Password,
		Username:              s.Username,
		MaxRetries:            s.RedisMaxRetries,
		MaxRetryBackoff:       time.Duration(s.RedisMaxRetryInterval),
		MinRetryBackoff:       time.Duration(s.RedisMinRetryInterval),
		DialTimeout:           time.Duration(s.DialTimeout),
		ReadTimeout:           time.Duration(s.ReadTimeout),
		WriteTimeout:          time.Duration(s.WriteTimeout),
		PoolSize:              s.PoolSize,
		ConnMaxLifetime:       time.Duration(s.MaxConnAge),
		MinIdleConns:          s.MinIdleConns,
		PoolTimeout:           time.Duration(s.PoolTimeout),
		ConnMaxIdleTime:       time.Duration(s.IdleTimeout),
		ContextTimeoutEnabled: true,
	}
	// Certificate verification is skipped whenever TLS is enabled, matching
	// the existing component behavior.
	/* #nosec */
	if s.EnableTLS {
		cfg.TLSConfig = &tls.Config{
			InsecureSkipVerify: s.EnableTLS,
		}
	}
	c := v9Client{
		readTimeout:  s.ReadTimeout,
		writeTimeout: s.WriteTimeout,
		dialTimeout:  s.DialTimeout,
	}
	if s.RedisType == ClusterType {
		cfg.SentinelAddrs = strings.Split(s.Host, ",")
		c.client = v9.NewFailoverClusterClient(cfg)
		return c
	}
	c.client = v9.NewFailoverClient(cfg)
	return c
}
// newV9Client builds a plain go-redis v9 client from the given settings; a
// nil settings value yields a nil client. When the configured Redis type is
// ClusterType, the host string is treated as a comma-separated address list
// and a cluster client is returned instead of a single-node client.
func newV9Client(s *Settings) RedisClient {
	if s == nil {
		return nil
	}
	c := v9Client{
		readTimeout:  s.ReadTimeout,
		writeTimeout: s.WriteTimeout,
		dialTimeout:  s.DialTimeout,
	}
	if s.RedisType == ClusterType {
		cfg := &v9.ClusterOptions{
			Addrs:                 strings.Split(s.Host, ","),
			Password:              s.Password,
			Username:              s.Username,
			MaxRetries:            s.RedisMaxRetries,
			MaxRetryBackoff:       time.Duration(s.RedisMaxRetryInterval),
			MinRetryBackoff:       time.Duration(s.RedisMinRetryInterval),
			DialTimeout:           time.Duration(s.DialTimeout),
			ReadTimeout:           time.Duration(s.ReadTimeout),
			WriteTimeout:          time.Duration(s.WriteTimeout),
			PoolSize:              s.PoolSize,
			ConnMaxLifetime:       time.Duration(s.MaxConnAge),
			MinIdleConns:          s.MinIdleConns,
			PoolTimeout:           time.Duration(s.PoolTimeout),
			ConnMaxIdleTime:       time.Duration(s.IdleTimeout),
			ContextTimeoutEnabled: true,
		}
		// Certificate verification is skipped whenever TLS is enabled,
		// matching the existing component behavior.
		/* #nosec */
		if s.EnableTLS {
			cfg.TLSConfig = &tls.Config{
				InsecureSkipVerify: s.EnableTLS,
			}
		}
		c.client = v9.NewClusterClient(cfg)
		return c
	}
	cfg := &v9.Options{
		Addr:                  s.Host,
		Password:              s.Password,
		Username:              s.Username,
		DB:                    s.DB,
		MaxRetries:            s.RedisMaxRetries,
		MaxRetryBackoff:       time.Duration(s.RedisMaxRetryInterval),
		MinRetryBackoff:       time.Duration(s.RedisMinRetryInterval),
		DialTimeout:           time.Duration(s.DialTimeout),
		ReadTimeout:           time.Duration(s.ReadTimeout),
		WriteTimeout:          time.Duration(s.WriteTimeout),
		PoolSize:              s.PoolSize,
		ConnMaxLifetime:       time.Duration(s.MaxConnAge),
		MinIdleConns:          s.MinIdleConns,
		PoolTimeout:           time.Duration(s.PoolTimeout),
		ConnMaxIdleTime:       time.Duration(s.IdleTimeout),
		ContextTimeoutEnabled: true,
	}
	// Certificate verification is skipped whenever TLS is enabled, matching
	// the existing component behavior.
	/* #nosec */
	if s.EnableTLS {
		cfg.TLSConfig = &tls.Config{
			InsecureSkipVerify: s.EnableTLS,
		}
	}
	c.client = v9.NewClient(cfg)
	return c
}

View File

@ -20,8 +20,6 @@ import (
"strings"
"time"
"github.com/go-redis/redis/v8"
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
"github.com/dapr/components-contrib/lock"
"github.com/dapr/kit/logger"
@ -35,7 +33,7 @@ const (
// Standalone Redis lock store.Any fail-over related features are not supported,such as Sentinel and Redis Cluster.
type StandaloneRedisLock struct {
client redis.UniversalClient
client rediscomponent.RedisClient
clientSettings *rediscomponent.Settings
metadata rediscomponent.Metadata
@ -79,7 +77,7 @@ func (r *StandaloneRedisLock) InitLockStore(metadata lock.Metadata) error {
}
r.ctx, r.cancel = context.WithCancel(context.Background())
// 3. connect to redis
if _, err = r.client.Ping(r.ctx).Result(); err != nil {
if _, err = r.client.PingResult(r.ctx); err != nil {
return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", r.clientSettings.Host, err)
}
// no replica
@ -104,7 +102,7 @@ func needFailover(properties map[string]string) bool {
}
func (r *StandaloneRedisLock) getConnectedSlaves() (int, error) {
res, err := r.client.Do(r.ctx, "INFO", "replication").Result()
res, err := r.client.DoRead(r.ctx, "INFO", "replication")
if err != nil {
return 0, err
}
@ -135,37 +133,32 @@ func (r *StandaloneRedisLock) parseConnectedSlaves(res string) int {
// Try to acquire a redis lock.
func (r *StandaloneRedisLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
// 1.Setting redis expiration time
nx := r.client.SetNX(r.ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds))
if nx == nil {
nxval, err := r.client.SetNX(r.ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds))
if nxval == nil {
return &lock.TryLockResponse{}, fmt.Errorf("[standaloneRedisLock]: SetNX returned nil.ResourceID: %s", req.ResourceID)
}
// 2. check error
err := nx.Err()
if err != nil {
return &lock.TryLockResponse{}, err
}
return &lock.TryLockResponse{
Success: nx.Val(),
Success: *nxval,
}, nil
}
// Try to release a redis lock.
func (r *StandaloneRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
// 1. delegate to client.eval lua script
eval := r.client.Eval(r.ctx, unlockScript, []string{req.ResourceID}, req.LockOwner)
evalInt, parseErr, err := r.client.EvalInt(r.ctx, unlockScript, []string{req.ResourceID}, req.LockOwner)
// 2. check error
if eval == nil {
if evalInt == nil {
return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil.ResourceID: %s", req.ResourceID)
}
err := eval.Err()
if err != nil {
return newInternalErrorUnlockResponse(), err
}
// 3. parse result
i, err := eval.Int()
i := *evalInt
status := lock.InternalError
if err != nil {
if parseErr != nil {
return &lock.UnlockResponse{
Status: status,
}, err
@ -194,7 +187,9 @@ func (r *StandaloneRedisLock) Close() error {
r.cancel()
}
if r.client != nil {
return r.client.Close()
closeErr := r.client.Close()
r.client = nil
return closeErr
}
return nil
}

View File

@ -20,8 +20,6 @@ import (
"strconv"
"time"
"github.com/go-redis/redis/v8"
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
@ -45,7 +43,7 @@ const (
// on the mechanics of Redis Streams.
type redisStreams struct {
metadata metadata
client redis.UniversalClient
client rediscomponent.RedisClient
clientSettings *rediscomponent.Settings
logger logger.Logger
@ -144,7 +142,7 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error {
r.ctx, r.cancel = context.WithCancel(context.Background())
if _, err = r.client.Ping(r.ctx).Result(); err != nil {
if _, err = r.client.PingResult(r.ctx); err != nil {
return fmt.Errorf("redis streams: error connecting to redis at %s: %s", r.clientSettings.Host, err)
}
r.queue = make(chan redisMessageWrapper, int(r.metadata.queueDepth))
@ -157,11 +155,7 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error {
}
func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
_, err := r.client.XAdd(r.ctx, &redis.XAddArgs{
Stream: req.Topic,
MaxLenApprox: r.metadata.maxLenApprox,
Values: map[string]interface{}{"data": req.Data},
}).Result()
_, err := r.client.XAdd(r.ctx, req.Topic, r.metadata.maxLenApprox, map[string]interface{}{"data": req.Data})
if err != nil {
return fmt.Errorf("redis streams: error from publish: %s", err)
}
@ -170,7 +164,7 @@ func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
}
func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.metadata.consumerID, "0").Err()
err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.metadata.consumerID, "0")
// Ignore BUSYGROUP errors
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
r.logger.Errorf("redis streams: %s", err)
@ -186,14 +180,14 @@ func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeReques
// enqueueMessages is a shared function that funnels new messages (via polling)
// and redelivered messages (via reclaiming) to a channel where workers can
// pick them up for processing.
func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handler pubsub.Handler, msgs []redis.XMessage) {
func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handler pubsub.Handler, msgs []rediscomponent.RedisXMessage) {
for _, msg := range msgs {
rmsg := createRedisMessageWrapper(ctx, stream, handler, msg)
select {
// Might block if the queue is full so we need the ctx.Done below.
case r.queue <- rmsg:
// Noop
// Handle cancelation
case <-ctx.Done():
return
@ -203,7 +197,7 @@ func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handl
// createRedisMessageWrapper encapsulates the Redis message, message identifier, and handler
// in `redisMessage` for processing.
func createRedisMessageWrapper(ctx context.Context, stream string, handler pubsub.Handler, msg redis.XMessage) redisMessageWrapper {
func createRedisMessageWrapper(ctx context.Context, stream string, handler pubsub.Handler, msg rediscomponent.RedisXMessage) redisMessageWrapper {
var data []byte
if dataValue, exists := msg.Values["data"]; exists && dataValue != nil {
switch v := dataValue.(type) {
@ -259,7 +253,7 @@ func (r *redisStreams) processMessage(msg redisMessageWrapper) error {
}
// Use the background context in case subscriptionCtx is already closed
if err := r.client.XAck(context.Background(), msg.message.Topic, r.metadata.consumerID, msg.messageID).Err(); err != nil {
if err := r.client.XAck(context.Background(), msg.message.Topic, r.metadata.consumerID, msg.messageID); err != nil {
r.logger.Errorf("Error acknowledging Redis message %s: %v", msg.messageID, err)
return err
@ -278,15 +272,9 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h
}
// Read messages
streams, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: r.metadata.consumerID,
Consumer: r.metadata.consumerID,
Streams: []string{stream, ">"},
Count: int64(r.metadata.queueDepth),
Block: time.Duration(r.clientSettings.ReadTimeout),
}).Result()
streams, err := r.client.XReadGroupResult(ctx, r.metadata.consumerID, r.metadata.consumerID, []string{stream, ">"}, int64(r.metadata.queueDepth), time.Duration(r.clientSettings.ReadTimeout))
if err != nil {
if !errors.Is(err, redis.Nil) && err != context.Canceled {
if !errors.Is(err, r.client.GetNilValueError()) && err != context.Canceled {
r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err)
}
continue
@ -329,14 +317,14 @@ func (r *redisStreams) reclaimPendingMessagesLoop(ctx context.Context, stream st
func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string, handler pubsub.Handler) {
for {
// Retrieve pending messages for this stream and consumer
pendingResult, err := r.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: stream,
Group: r.metadata.consumerID,
Start: "-",
End: "+",
Count: int64(r.metadata.queueDepth),
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
pendingResult, err := r.client.XPendingExtResult(ctx,
stream,
r.metadata.consumerID,
"-",
"+",
int64(r.metadata.queueDepth),
)
if err != nil && !errors.Is(err, r.client.GetNilValueError()) {
r.logger.Errorf("error retrieving pending Redis messages: %v", err)
break
@ -356,14 +344,14 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string
}
// Attempt to claim the messages for the filtered IDs
claimResult, err := r.client.XClaim(ctx, &redis.XClaimArgs{
Stream: stream,
Group: r.metadata.consumerID,
Consumer: r.metadata.consumerID,
MinIdle: r.metadata.processingTimeout,
Messages: msgIDs,
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
claimResult, err := r.client.XClaimResult(ctx,
stream,
r.metadata.consumerID,
r.metadata.consumerID,
r.metadata.processingTimeout,
msgIDs,
)
if err != nil && !errors.Is(err, r.client.GetNilValueError()) {
r.logger.Errorf("error claiming pending Redis messages: %v", err)
break
@ -375,7 +363,7 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string
// If the Redis nil error is returned, it means somes message in the pending
// state no longer exist. We need to acknowledge these messages to
// remove them from the pending list.
if errors.Is(err, redis.Nil) {
if errors.Is(err, r.client.GetNilValueError()) {
// Build a set of message IDs that were not returned
// that potentially no longer exist.
expectedMsgIDs := make(map[string]struct{}, len(msgIDs))
@ -396,23 +384,23 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string
func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(ctx context.Context, stream string, messageIDs map[string]struct{}, handler pubsub.Handler) {
// Check each message ID individually.
for pendingID := range messageIDs {
claimResultSingleMsg, err := r.client.XClaim(ctx, &redis.XClaimArgs{
Stream: stream,
Group: r.metadata.consumerID,
Consumer: r.metadata.consumerID,
MinIdle: 0,
Messages: []string{pendingID},
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
claimResultSingleMsg, err := r.client.XClaimResult(ctx,
stream,
r.metadata.consumerID,
r.metadata.consumerID,
0,
[]string{pendingID},
)
if err != nil && !errors.Is(err, r.client.GetNilValueError()) {
r.logger.Errorf("error claiming pending Redis message %s: %v", pendingID, err)
continue
}
// Ack the message to remove it from the pending list.
if errors.Is(err, redis.Nil) {
if errors.Is(err, r.client.GetNilValueError()) {
// Use the background context in case subscriptionCtx is already closed
if err = r.client.XAck(context.Background(), stream, r.metadata.consumerID, pendingID).Err(); err != nil {
if err = r.client.XAck(context.Background(), stream, r.metadata.consumerID, pendingID); err != nil {
r.logger.Errorf("error acknowledging Redis message %s after failed claim for %s: %v", pendingID, stream, err)
}
} else {
@ -423,8 +411,13 @@ func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(ctx context.Co
}
func (r *redisStreams) Close() error {
r.cancel()
if r.cancel != nil {
r.cancel()
}
if r.client == nil {
return nil
}
return r.client.Close()
}
@ -433,7 +426,7 @@ func (r *redisStreams) Features() []pubsub.Feature {
}
func (r *redisStreams) Ping() error {
if _, err := r.client.Ping(context.Background()).Result(); err != nil {
if _, err := r.client.PingResult(context.Background()); err != nil {
return fmt.Errorf("redis pubsub: error connecting to redis at %s: %s", r.clientSettings.Host, err)
}

View File

@ -20,12 +20,13 @@ import (
"sync"
"testing"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
mdata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
internalredis "github.com/dapr/components-contrib/internal/component/redis"
)
func getFakeProperties() map[string]string {
@ -108,9 +109,9 @@ func TestProcessStreams(t *testing.T) {
assert.Equal(t, 3, messageCount)
}
func generateRedisStreamTestData(topicCount, messageCount int, data string) []redis.XMessage {
generateXMessage := func(id int) redis.XMessage {
return redis.XMessage{
func generateRedisStreamTestData(topicCount, messageCount int, data string) []internalredis.RedisXMessage {
generateXMessage := func(id int) internalredis.RedisXMessage {
return internalredis.RedisXMessage{
ID: fmt.Sprintf("%d", id),
Values: map[string]interface{}{
"data": data,
@ -118,7 +119,7 @@ func generateRedisStreamTestData(topicCount, messageCount int, data string) []re
}
}
xmessageArray := make([]redis.XMessage, messageCount)
xmessageArray := make([]internalredis.RedisXMessage, messageCount)
for i := range xmessageArray {
xmessageArray[i] = generateXMessage(i)
}

View File

@ -20,7 +20,6 @@ import (
"strconv"
"strings"
"github.com/go-redis/redis/v8"
jsoniter "github.com/json-iterator/go"
"github.com/dapr/components-contrib/contenttype"
@ -92,7 +91,7 @@ const (
// StateStore is a Redis state store.
type StateStore struct {
state.DefaultBulkStore
client redis.UniversalClient
client rediscomponent.RedisClient
clientSettings *rediscomponent.Settings
json jsoniter.API
metadata rediscomponent.Metadata
@ -119,7 +118,7 @@ func NewRedisStateStore(logger logger.Logger) state.Store {
}
func (r *StateStore) Ping() error {
if _, err := r.client.Ping(context.Background()).Result(); err != nil {
if _, err := r.client.PingResult(context.Background()); err != nil {
return fmt.Errorf("redis store: error connecting to redis at %s: %s", r.clientSettings.Host, err)
}
@ -147,7 +146,7 @@ func (r *StateStore) Init(metadata state.Metadata) error {
r.ctx, r.cancel = context.WithCancel(context.Background())
if _, err = r.client.Ping(r.ctx).Result(); err != nil {
if _, err = r.client.PingResult(r.ctx); err != nil {
return fmt.Errorf("redis store: error connecting to redis at %s: %v", r.clientSettings.Host, err)
}
@ -168,7 +167,7 @@ func (r *StateStore) Features() []state.Feature {
}
func (r *StateStore) getConnectedSlaves() (int, error) {
res, err := r.client.Do(r.ctx, "INFO", "replication").Result()
res, err := r.client.DoRead(r.ctx, "INFO", "replication")
if err != nil {
return 0, err
}
@ -209,12 +208,12 @@ func (r *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error
}
var delQuery string
if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType {
if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType && rediscomponent.ClientHasJSONSupport(r.client) {
delQuery = delJSONQuery
} else {
delQuery = delDefaultQuery
}
_, err = r.client.Do(ctx, "EVAL", delQuery, 1, req.Key, *req.ETag).Result()
err = r.client.DoWrite(ctx, "EVAL", delQuery, 1, req.Key, *req.ETag)
if err != nil {
return state.NewETagError(state.ETagMismatch, err)
}
@ -222,9 +221,8 @@ func (r *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error
return nil
}
// Delete performs a delete operation.
func (r *StateStore) directGet(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
res, err := r.client.Do(r.ctx, "GET", req.Key).Result()
res, err := r.client.DoRead(ctx, "GET", req.Key)
if err != nil {
return nil, err
}
@ -241,14 +239,23 @@ func (r *StateStore) directGet(ctx context.Context, req *state.GetRequest) (*sta
}
func (r *StateStore) getDefault(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
res, err := r.client.Do(ctx, "HGETALL", req.Key).Result() // Prefer values with ETags
res, err := r.client.DoRead(ctx, "HGETALL", req.Key) // Prefer values with ETags
if err != nil {
return r.directGet(ctx, req) // Falls back to original get for backward compats.
}
if res == nil {
return &state.GetResponse{}, nil
}
vals := res.([]interface{})
vals, ok := res.([]interface{})
if !ok {
// we retrieved a JSON value from a non-JSON store
valMap := res.(map[interface{}]interface{})
// convert valMap to []interface{}
vals = make([]interface{}, 0, len(valMap))
for k, v := range valMap {
vals = append(vals, k, v)
}
}
if len(vals) == 0 {
return &state.GetResponse{}, nil
}
@ -265,7 +272,7 @@ func (r *StateStore) getDefault(ctx context.Context, req *state.GetRequest) (*st
}
func (r *StateStore) getJSON(req *state.GetRequest) (*state.GetResponse, error) {
res, err := r.client.Do(r.ctx, "JSON.GET", req.Key).Result()
res, err := r.client.DoRead(r.ctx, "JSON.GET", req.Key)
if err != nil {
return nil, err
}
@ -303,7 +310,7 @@ func (r *StateStore) getJSON(req *state.GetRequest) (*state.GetResponse, error)
// Get retrieves state from redis with a key.
func (r *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType {
if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType && rediscomponent.ClientHasJSONSupport(r.client) {
return r.getJSON(req)
}
@ -341,7 +348,7 @@ func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
var bt []byte
var setQuery string
if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType {
if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType && rediscomponent.ClientHasJSONSupport(r.client) {
setQuery = setJSONQuery
bt, _ = utils.Marshal(&jsonEntry{Data: req.Value}, r.json.Marshal)
} else {
@ -349,7 +356,7 @@ func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
bt, _ = utils.Marshal(req.Value, r.json.Marshal)
}
err = r.client.Do(ctx, "EVAL", setQuery, 1, req.Key, ver, bt, firstWrite).Err()
err = r.client.DoWrite(ctx, "EVAL", setQuery, 1, req.Key, ver, bt, firstWrite)
if err != nil {
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
@ -359,21 +366,21 @@ func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
}
if ttl != nil && *ttl > 0 {
_, err = r.client.Do(ctx, "EXPIRE", req.Key, *ttl).Result()
err = r.client.DoWrite(ctx, "EXPIRE", req.Key, *ttl)
if err != nil {
return fmt.Errorf("failed to set key %s ttl: %s", req.Key, err)
}
}
if ttl != nil && *ttl <= 0 {
_, err = r.client.Do(ctx, "PERSIST", req.Key).Result()
err = r.client.DoWrite(ctx, "PERSIST", req.Key)
if err != nil {
return fmt.Errorf("failed to persist key %s: %s", req.Key, err)
}
}
if req.Options.Consistency == state.Strong && r.replicas > 0 {
_, err = r.client.Do(ctx, "WAIT", r.replicas, 1000).Result()
err = r.client.DoWrite(ctx, "WAIT", r.replicas, 1000)
if err != nil {
return fmt.Errorf("redis waiting for %v replicas to acknowledge write, err: %s", r.replicas, err.Error())
}
@ -386,7 +393,7 @@ func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
func (r *StateStore) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
var setQuery, delQuery string
var isJSON bool
if contentType, ok := request.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType {
if contentType, ok := request.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType && rediscomponent.ClientHasJSONSupport(r.client) {
isJSON = true
setQuery = setJSONQuery
delQuery = delJSONQuery
@ -434,7 +441,7 @@ func (r *StateStore) Multi(ctx context.Context, request *state.TransactionalStat
}
}
_, err := pipe.Exec(ctx)
err := pipe.Exec(ctx)
return err
}
@ -442,15 +449,15 @@ func (r *StateStore) Multi(ctx context.Context, request *state.TransactionalStat
func (r *StateStore) registerSchemas() error {
for name, elem := range r.querySchemas {
r.logger.Infof("redis: create query index %s", name)
if err := r.client.Do(r.ctx, elem.schema...).Err(); err != nil {
if err := r.client.DoWrite(r.ctx, elem.schema...); err != nil {
if err.Error() != "Index already exists" {
return err
}
r.logger.Infof("redis: drop stale query index %s", name)
if err = r.client.Do(r.ctx, "FT.DROPINDEX", name).Err(); err != nil {
if err = r.client.DoWrite(r.ctx, "FT.DROPINDEX", name); err != nil {
return err
}
if err = r.client.Do(r.ctx, elem.schema...).Err(); err != nil {
if err = r.client.DoWrite(r.ctx, elem.schema...); err != nil {
return err
}
}
@ -509,6 +516,9 @@ func (r *StateStore) parseTTL(req *state.SetRequest) (*int, error) {
// Query executes a query against store.
func (r *StateStore) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
if !rediscomponent.ClientHasJSONSupport(r.client) {
return nil, fmt.Errorf("redis-json server support is required for query capability")
}
indexName, ok := daprmetadata.TryGetQueryIndexName(req.Metadata)
if !ok {
return nil, fmt.Errorf("query index not found")

View File

@ -20,10 +20,9 @@ import (
"strconv"
"strings"
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/state/query"
"github.com/go-redis/redis/v8"
)
var ErrMultipleSortBy error = errors.New("multiple SORTBY steps are not allowed. Sort multiple fields in a single step")
@ -190,9 +189,9 @@ func (q *Query) Finalize(filters string, qq *query.Query) error {
return nil
}
func (q *Query) execute(ctx context.Context, client redis.UniversalClient) ([]state.QueryItem, string, error) {
func (q *Query) execute(ctx context.Context, client rediscomponent.RedisClient) ([]state.QueryItem, string, error) {
query := append(append([]interface{}{"FT.SEARCH", q.schemaName}, q.query...), "RETURN", "2", "$.data", "$.version")
ret, err := client.Do(ctx, query...).Result()
ret, err := client.DoRead(ctx, query...)
if err != nil {
return nil, "", err
}

View File

@ -239,7 +239,7 @@ func TestTransactionalUpsert(t *testing.T) {
})
assert.Equal(t, nil, err)
res, err := c.Do(context.Background(), "HGETALL", "weapon").Result()
res, err := c.DoRead(context.Background(), "HGETALL", "weapon")
assert.Equal(t, nil, err)
vals := res.([]interface{})
@ -248,15 +248,15 @@ func TestTransactionalUpsert(t *testing.T) {
assert.Equal(t, ptr.Of("1"), version)
assert.Equal(t, `"deathstar"`, data)
res, err = c.Do(context.Background(), "TTL", "weapon").Result()
res, err = c.DoRead(context.Background(), "TTL", "weapon")
assert.Equal(t, nil, err)
assert.Equal(t, int64(-1), res)
res, err = c.Do(context.Background(), "TTL", "weapon2").Result()
res, err = c.DoRead(context.Background(), "TTL", "weapon2")
assert.Equal(t, nil, err)
assert.Equal(t, int64(123), res)
res, err = c.Do(context.Background(), "TTL", "weapon3").Result()
res, err = c.DoRead(context.Background(), "TTL", "weapon3")
assert.Equal(t, nil, err)
assert.Equal(t, int64(-1), res)
}
@ -290,7 +290,7 @@ func TestTransactionalDelete(t *testing.T) {
})
assert.Equal(t, nil, err)
res, err := c.Do(context.Background(), "HGETALL", "weapon").Result()
res, err := c.DoRead(context.Background(), "HGETALL", "weapon")
assert.Equal(t, nil, err)
vals := res.([]interface{})
@ -335,7 +335,7 @@ func TestRequestsWithGlobalTTL(t *testing.T) {
Key: "weapon100",
Value: "deathstar100",
})
ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result()
ttl, _ := ss.client.TTLResult(ss.ctx, "weapon100")
assert.Equal(t, time.Duration(globalTTLInSeconds)*time.Second, ttl)
})
@ -349,7 +349,7 @@ func TestRequestsWithGlobalTTL(t *testing.T) {
"ttlInSeconds": strconv.Itoa(requestTTL),
},
})
ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result()
ttl, _ := ss.client.TTLResult(ss.ctx, "weapon100")
assert.Equal(t, time.Duration(requestTTL)*time.Second, ttl)
})
@ -388,7 +388,7 @@ func TestRequestsWithGlobalTTL(t *testing.T) {
})
assert.Equal(t, nil, err)
res, err := c.Do(context.Background(), "HGETALL", "weapon").Result()
res, err := c.DoRead(context.Background(), "HGETALL", "weapon")
assert.Equal(t, nil, err)
vals := res.([]interface{})
@ -397,15 +397,15 @@ func TestRequestsWithGlobalTTL(t *testing.T) {
assert.Equal(t, ptr.Of("1"), version)
assert.Equal(t, `"deathstar"`, data)
res, err = c.Do(context.Background(), "TTL", "weapon").Result()
res, err = c.DoRead(context.Background(), "TTL", "weapon")
assert.Equal(t, nil, err)
assert.Equal(t, int64(globalTTLInSeconds), res)
res, err = c.Do(context.Background(), "TTL", "weapon2").Result()
res, err = c.DoRead(context.Background(), "TTL", "weapon2")
assert.Equal(t, nil, err)
assert.Equal(t, int64(123), res)
res, err = c.Do(context.Background(), "TTL", "weapon3").Result()
res, err = c.DoRead(context.Background(), "TTL", "weapon3")
assert.Equal(t, nil, err)
assert.Equal(t, int64(-1), res)
})
@ -432,7 +432,7 @@ func TestSetRequestWithTTL(t *testing.T) {
},
})
ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result()
ttl, _ := ss.client.TTLResult(ss.ctx, "weapon100")
assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl)
})
@ -443,7 +443,7 @@ func TestSetRequestWithTTL(t *testing.T) {
Value: "deathstar200",
})
ttl, _ := ss.client.TTL(ss.ctx, "weapon200").Result()
ttl, _ := ss.client.TTLResult(ss.ctx, "weapon200")
assert.Equal(t, time.Duration(-1), ttl)
})
@ -453,7 +453,7 @@ func TestSetRequestWithTTL(t *testing.T) {
Key: "weapon300",
Value: "deathstar300",
})
ttl, _ := ss.client.TTL(ss.ctx, "weapon300").Result()
ttl, _ := ss.client.TTLResult(ss.ctx, "weapon300")
assert.Equal(t, time.Duration(-1), ttl)
// make the key no longer persistent
@ -465,7 +465,7 @@ func TestSetRequestWithTTL(t *testing.T) {
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
},
})
ttl, _ = ss.client.TTL(ss.ctx, "weapon300").Result()
ttl, _ = ss.client.TTLResult(ss.ctx, "weapon300")
assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl)
// make the key persistent again
@ -476,7 +476,7 @@ func TestSetRequestWithTTL(t *testing.T) {
"ttlInSeconds": strconv.Itoa(-1),
},
})
ttl, _ = ss.client.TTL(ss.ctx, "weapon300").Result()
ttl, _ = ss.client.TTLResult(ss.ctx, "weapon300")
assert.Equal(t, time.Duration(-1), ttl)
})
}
@ -508,7 +508,7 @@ func TestTransactionalDeleteNoEtag(t *testing.T) {
})
assert.Equal(t, nil, err)
res, err := c.Do(context.Background(), "HGETALL", "weapon100").Result()
res, err := c.DoRead(context.Background(), "HGETALL", "weapon100")
assert.Equal(t, nil, err)
vals := res.([]interface{})
@ -532,7 +532,7 @@ func TestGetMetadata(t *testing.T) {
assert.Equal(t, metadataInfo["idleCheckFrequency"], "redis.Duration")
}
func setupMiniredis() (*miniredis.Miniredis, *redis.Client) {
func setupMiniredis() (*miniredis.Miniredis, rediscomponent.RedisClient) {
s, err := miniredis.Run()
if err != nil {
panic(err)
@ -542,5 +542,5 @@ func setupMiniredis() (*miniredis.Miniredis, *redis.Client) {
DB: defaultDB,
}
return s, redis.NewClient(opts)
return s, rediscomponent.ClientFromV8Client(redis.NewClient(opts))
}

View File

@ -4,13 +4,8 @@ go 1.19
require (
dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20220610080020-48691a404537
github.com/apache/dubbo-go-hessian2 v1.11.5
github.com/dapr/components-contrib v1.9.1-0.20221213185150-c5c985a68514
github.com/dapr/components-contrib/tests/certification v0.0.0-20211026011813-36b75e9ae272
github.com/dapr/dapr v1.9.5
github.com/dapr/go-sdk v1.6.0

View File

@ -3,11 +3,7 @@ module github.com/dapr/components-contrib/tests/certification/bindings/alicloud/
go 1.19
require (
github.com/dapr/components-contrib v1.9.1-0.20221213185150-c5c985a68514
github.com/dapr/components-contrib/tests/certification v0.0.0-20211026011813-36b75e9ae272
github.com/dapr/dapr v1.9.5
github.com/dapr/go-sdk v1.6.0

View File

@ -3,13 +3,8 @@ module github.com/dapr/components-contrib/tests/certification/bindings/azure/blo
go 1.19
require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1
github.com/dapr/components-contrib v1.9.1-0.20221213185150-c5c985a68514
github.com/dapr/components-contrib/tests/certification v0.0.0-20211130185200-4918900c09e1
github.com/dapr/dapr v1.9.5
github.com/dapr/go-sdk v1.6.0

View File

@ -39,6 +39,7 @@ require (
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
github.com/go-redis/redis/v9 v9.0.0-rc.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect

View File

@ -155,6 +155,8 @@ github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5F
github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0=
github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
@ -378,7 +380,7 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs=
github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=
github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A=
github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM=

View File

@ -40,6 +40,7 @@ require (
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
github.com/go-redis/redis/v9 v9.0.0-rc.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect

View File

@ -155,6 +155,8 @@ github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5F
github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0=
github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
@ -378,7 +380,7 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs=
github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=
github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A=
github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM=

View File

@ -0,0 +1,17 @@
# Conformance-test component: Redis output binding exercised against the
# Redis 7 container (exposed on host port 6380, so it does not clash with
# the Redis 6 container on the default 6379).
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-binding
  namespace: default
spec:
  type: bindings.redis
  version: v1
  metadata:
  # Redis 7 test instance; host port 6380 is mapped in the CI compose file.
  - name: redisHost
    value: localhost:6380
  # Local throwaway container: no password, no TLS.
  - name: redisPassword
    value:
  - name: enableTLS
    value: false
  # Selects the Redis 7 client path in the component.
  # NOTE(review): exact semantics inferred from the v6/v7 split introduced
  # by this change — confirm against the component's settings parsing.
  - name: redisVersion
    value: "7"

View File

@ -7,7 +7,12 @@
## method: specific to http component, what method to use
componentType: bindings
components:
- component: redis
- component: redis.v6
operations: ["create", "operations"]
config:
output:
key: $((uuid))
- component: redis.v7
operations: ["create", "operations"]
config:
output:

View File

@ -0,0 +1,20 @@
# Conformance-test component: Redis Streams pub/sub against the Redis 7
# container on host port 6380 (the Redis 6 container uses the default 6379).
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6380
  # Local throwaway container: no password.
  - name: redisPassword
    value: ""
  # Consumer group identity used by the subscriber side of the test.
  - name: consumerID
    value: "testConsumer"
  # Presumably the window before an unacked message is considered stuck,
  # and how often redelivery is attempted — confirm against the pubsub.redis
  # metadata documentation.
  - name: processingTimeout
    value: 5s
  - name: redeliverInterval
    value: 1s
  # Single concurrent handler keeps message ordering deterministic for the test.
  - name: concurrency
    value: 1

View File

@ -41,7 +41,11 @@ components:
testMultiTopic1Name: dapr-conf-queue-multi1
testMultiTopic2Name: dapr-conf-queue-multi2
checkInOrderProcessing: false
- component: redis
- component: redis.v6
operations: ["publish", "subscribe", "multiplehandlers"]
config:
checkInOrderProcessing: false
- component: redis.v7
operations: ["publish", "subscribe", "multiplehandlers"]
config:
checkInOrderProcessing: false

View File

@ -0,0 +1,13 @@
# Conformance-test component: Redis state store against the Redis 7
# container on host port 6380.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  # NOTE(review): unlike the bindings.redis v7 component in this change,
  # no "version: v1" is declared here — confirm the conformance loader
  # defaults the component version.
  metadata:
  - name: redisHost
    value: localhost:6380
  # Local throwaway container: no password.
  - name: redisPassword
    value: ""
  # Selects the Redis 7 client path in the component.
  # NOTE(review): semantics inferred from the v6/v7 split in this change —
  # confirm against the component's settings parsing.
  - name: redisVersion
    value: "7"

View File

@ -1,8 +1,11 @@
# Supported operations: set, get, delete, bulkset, bulkdelete, transaction, etag, first-write, query, ttl
componentType: state
components:
- component: redis
- component: redis.v6
allOperations: true
- component: redis.v7
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write" ]
- component: mongodb
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query" ]
- component: memcached

View File

@ -93,7 +93,8 @@ import (
const (
eventhubs = "azure.eventhubs"
redis = "redis"
redisv6 = "redis.v6"
redisv7 = "redis.v7"
kafka = "kafka"
mqtt = "mqtt"
generateUUID = "$((uuid))"
@ -375,7 +376,9 @@ func (tc *TestConfiguration) Run(t *testing.T) {
func loadPubSub(tc TestComponent) pubsub.PubSub {
var pubsub pubsub.PubSub
switch tc.Component {
case redis:
case redisv6:
pubsub = p_redis.NewRedisStreams(testLogger)
case redisv7:
pubsub = p_redis.NewRedisStreams(testLogger)
case eventhubs:
pubsub = p_eventhubs.NewAzureEventHubs(testLogger)
@ -435,7 +438,9 @@ func loadSecretStore(tc TestComponent) secretstores.SecretStore {
func loadStateStore(tc TestComponent) state.Store {
var store state.Store
switch tc.Component {
case redis:
case redisv6:
store = s_redis.NewRedisStateStore(testLogger)
case redisv7:
store = s_redis.NewRedisStateStore(testLogger)
case "azure.blobstorage":
store = s_blobstorage.NewAzureBlobStorageStore(testLogger)
@ -478,7 +483,9 @@ func loadOutputBindings(tc TestComponent) bindings.OutputBinding {
var binding bindings.OutputBinding
switch tc.Component {
case redis:
case redisv6:
binding = b_redis.NewRedis(testLogger)
case redisv7:
binding = b_redis.NewRedis(testLogger)
case "azure.blobstorage":
binding = b_azure_blobstorage.NewAzureBlobStorage(testLogger)

View File

@ -511,6 +511,7 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
}()
for i := 0; i < 3; i++ {
t.Logf("Starting iteration %d", i)
switch i {
case 1: // On iteration 1, close the first subscriber
subscribe1Cancel()
@ -547,11 +548,11 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
Topic: topic,
Metadata: config.PublishMetadata,
})
assert.NoError(t, err, "expected no error on publishing data %s on topic %s", data, topic)
assert.NoError(t, err, "expected no error on publishing data %s on topic %s", string(data), topic)
}
allSentCh <- true
t.Logf("waiting for %v to complete read", config.MaxReadDuration)
t.Logf("Waiting for %v to complete read", config.MaxReadDuration)
<-wait
}
})