diff --git a/bindings/redis/redis.go b/bindings/redis/redis.go index 03551202c..794fc20d3 100644 --- a/bindings/redis/redis.go +++ b/bindings/redis/redis.go @@ -20,6 +20,7 @@ import ( "github.com/dapr/components-contrib/bindings" rediscomponent "github.com/dapr/components-contrib/internal/component/redis" + contribMetadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" ) @@ -30,6 +31,11 @@ type Redis struct { logger logger.Logger } +const ( + // IncrementOperation is the operation to increment a key. + IncrementOperation bindings.OperationKind = "increment" +) + // NewRedis returns a new redis bindings instance. func NewRedis(logger logger.Logger) bindings.OutputBinding { return &Redis{logger: logger} @@ -63,9 +69,25 @@ func (r *Redis) Operations() []bindings.OperationKind { bindings.CreateOperation, bindings.DeleteOperation, bindings.GetOperation, + IncrementOperation, } } +func (r *Redis) expireKeyIfRequested(ctx context.Context, requestMetadata map[string]string, key string) error { + // get ttl from request metadata + ttl, ok, err := contribMetadata.TryGetTTL(requestMetadata) + if err != nil { + return err + } + if ok { + errExpire := r.client.DoWrite(ctx, "EXPIRE", key, int(ttl.Seconds())) + if errExpire != nil { + return errExpire + } + } + return nil +} + func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { if key, ok := req.Metadata["key"]; ok && key != "" { switch req.Operation { @@ -77,6 +99,9 @@ func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi case bindings.GetOperation: data, err := r.client.Get(ctx, key) if err != nil { + if err.Error() == "redis: nil" { + return &bindings.InvokeResponse{}, nil + } return nil, err } rep := &bindings.InvokeResponse{} @@ -87,6 +112,19 @@ func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi if err != nil { return nil, err } + err = r.expireKeyIfRequested(ctx, req.Metadata, key) + if err != nil { + return nil, err + } + case IncrementOperation: + err := r.client.DoWrite(ctx, "INCR", key) + if err != nil { + return nil, err + } + err = r.expireKeyIfRequested(ctx, req.Metadata, key) + if err != nil { + return nil, err + } default: return nil, fmt.Errorf("invalid operation type: %s", req.Operation) } diff --git a/bindings/redis/redis_test.go b/bindings/redis/redis_test.go index 59d71924b..14dfe9e5b 100644 --- a/bindings/redis/redis_test.go +++ b/bindings/redis/redis_test.go @@ -16,6 +16,7 @@ package redis import ( "context" "testing" + "time" miniredis "github.com/alicebob/miniredis/v2" "github.com/go-redis/redis/v8" @@ -23,6 +24,7 @@ import ( "github.com/dapr/components-contrib/bindings" internalredis "github.com/dapr/components-contrib/internal/component/redis" + "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" ) @@ -105,6 +107,104 @@ func TestInvokeDelete(t *testing.T) { assert.Equal(t, nil, rgetRep) } +func TestCreateExpire(t *testing.T) { + s, c := setupMiniredis() + defer s.Close() + + bind := &Redis{ + client: c, + logger: logger.NewLogger("test"), + } + _, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": testKey, metadata.TTLMetadataKey: "1"}, + Operation: bindings.CreateOperation, + Data: []byte(testData), + }) + assert.Equal(t, nil, err) + + rgetRep, err := c.DoRead(context.Background(), "TTL", testKey) + assert.Nil(t, err) + assert.Equal(t, int64(1), rgetRep) + + res, err2 := bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": testKey}, + Operation: bindings.GetOperation, + }) + assert.Equal(t, nil, err2) + assert.Equal(t, res.Data, []byte(testData)) + + // wait for ttl to expire + s.FastForward(2 * time.Second) + + res, err2 = bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": testKey}, + Operation: bindings.GetOperation, + }) + assert.Nil(t, err2) + assert.Equal(t, []byte(nil), res.Data) + + _, err = bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": testKey}, + Operation: bindings.DeleteOperation, + }) + assert.Equal(t, nil, err) +} + +func TestIncrement(t *testing.T) { + s, c := setupMiniredis() + defer s.Close() + + bind := &Redis{ + client: c, + logger: logger.NewLogger("test"), + } + _, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": "incKey"}, + Operation: IncrementOperation, + }) + assert.Equal(t, nil, err) + + res, err2 := bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": "incKey"}, + Operation: bindings.GetOperation, + }) + assert.Nil(t, nil, err2) + assert.Equal(t, res.Data, []byte("1")) + + _, err = bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": "incKey", metadata.TTLMetadataKey: "5"}, + Operation: IncrementOperation, + }) + assert.Nil(t, err) + + rgetRep, err := c.DoRead(context.Background(), "TTL", "incKey") + assert.Nil(t, err) + assert.Equal(t, int64(5), rgetRep) + + res, err2 = bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": "incKey"}, + Operation: bindings.GetOperation, + }) + assert.Equal(t, nil, err2) + assert.Equal(t, []byte("2"), res.Data) + + // wait for ttl to expire + s.FastForward(10 * time.Second) + + res, err2 = bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": "incKey"}, + Operation: bindings.GetOperation, + }) + assert.Nil(t, err2) + assert.Equal(t, []byte(nil), res.Data) + + _, err = bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Metadata: map[string]string{"key": "incKey"}, + Operation: bindings.DeleteOperation, + }) + assert.Equal(t, nil, err) +} + func setupMiniredis() (*miniredis.Miniredis, internalredis.RedisClient) { s, err := miniredis.Run() if err != nil { diff --git a/tests/certification/bindings/redis/components/retryOptions/redis.yaml b/tests/certification/bindings/redis/components/retryOptions/redis.yaml index 95cfe3542..57a1a102d 100644 --- a/tests/certification/bindings/redis/components/retryOptions/redis.yaml +++ b/tests/certification/bindings/redis/components/retryOptions/redis.yaml @@ -9,7 +9,7 @@ spec: ignoreErrors: true metadata: - name: redisHost - value: "localhost:6379" + value: "localhost:6399" - name: redisPassword value: "" - name: dialTimeout diff --git a/tests/certification/bindings/redis/components/standard/redis.yaml b/tests/certification/bindings/redis/components/standard/redis.yaml index 650340452..9330d79b4 100644 --- a/tests/certification/bindings/redis/components/standard/redis.yaml +++ b/tests/certification/bindings/redis/components/standard/redis.yaml @@ -7,6 +7,6 @@ spec: version: v1 metadata: - name: redisHost - value: "localhost:6379" + value: "localhost:6399" - name: redisPassword value: "" diff --git a/tests/certification/bindings/redis/docker-compose.yml b/tests/certification/bindings/redis/docker-compose.yml index aa394dcca..54607c06e 100644 --- a/tests/certification/bindings/redis/docker-compose.yml +++ b/tests/certification/bindings/redis/docker-compose.yml @@ -3,5 +3,5 @@ services: redis: image: 'redislabs/redisearch:latest' ports: - - '6379:6379' + - '6399:6379' command: redis-server diff --git a/tests/certification/bindings/redis/redis_test.go b/tests/certification/bindings/redis/redis_test.go index 79899d6df..425b02fca 100644 --- a/tests/certification/bindings/redis/redis_test.go +++ b/tests/certification/bindings/redis/redis_test.go @@ -2,6 +2,9 @@ package redisbinding_test import ( "fmt" + "testing" + "time" + "github.com/dapr/components-contrib/bindings" bindingRedis "github.com/dapr/components-contrib/bindings/redis" "github.com/dapr/components-contrib/tests/certification/embedded" @@ -17,8 +20,6 @@ import ( "github.com/dapr/kit/logger" "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" - "testing" - "time" ) const ( @@ -53,7 +54,7 @@ func TestRedisBinding(t *testing.T) { assert.NoError(t, err) rdb := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", // host:port of the redis server + Addr: "localhost:6399", // host:port of the redis server Password: "", // no password set DB: 0, // use default DB }) @@ -69,9 +70,82 @@ func TestRedisBinding(t *testing.T) { return nil } + testInvokeCreateIncr := func(ctx flow.Context) error { + client, clientErr := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort)) + if clientErr != nil { + panic(clientErr) + } + defer client.Close() + + invokeRequest := &daprClient.InvokeBindingRequest{ + Name: bindingName, + Operation: string(bindings.CreateOperation), + Data: []byte("hello"), + Metadata: map[string]string{"key": "expireKey", "ttlInSeconds": "2"}, + } + + err := client.InvokeOutputBinding(ctx, invokeRequest) + assert.NoError(t, err) + + invokeRequest = &daprClient.InvokeBindingRequest{ + Name: bindingName, + Operation: string(bindings.CreateOperation), + Data: []byte("41"), + Metadata: map[string]string{"key": "incKey"}, + } + + err = client.InvokeOutputBinding(ctx, invokeRequest) + assert.NoError(t, err) + + invokeRequest = &daprClient.InvokeBindingRequest{ + Name: bindingName, + Operation: "increment", + Metadata: map[string]string{"key": "incKey", "ttlInSeconds": "2"}, + } + + err = client.InvokeOutputBinding(ctx, invokeRequest) + assert.NoError(t, err) + + invokeRequest = &daprClient.InvokeBindingRequest{ + Name: bindingName, + Operation: string(bindings.GetOperation), + Metadata: map[string]string{"key": "incKey"}, + } + + out, err2 := client.InvokeBinding(ctx, invokeRequest) + assert.NoError(t, err2) + assert.Equal(t, "42", string(out.Data)) + + time.Sleep(3 * time.Second) + + // all keys should be expired now + + invokeRequest = &daprClient.InvokeBindingRequest{ + Name: bindingName, + Operation: string(bindings.GetOperation), + Metadata: map[string]string{"key": "incKey"}, + } + + out, err2 = client.InvokeBinding(ctx, invokeRequest) + assert.NoError(t, err2) + assert.Equal(t, []byte(nil), out.Data) + + invokeRequest = &daprClient.InvokeBindingRequest{ + Name: bindingName, + Operation: string(bindings.GetOperation), + Metadata: map[string]string{"key": "expireKey"}, + } + + out, err2 = client.InvokeBinding(ctx, invokeRequest) + assert.NoError(t, err2) + assert.Equal(t, []byte(nil), out.Data) + + return nil + } + checkRedisConnection := func(ctx flow.Context) error { rdb := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", // host:port of the redis server + Addr: "localhost:6399", // host:port of the redis server Password: "", // no password set DB: 0, // use default DB }) @@ -91,7 +165,7 @@ func TestRedisBinding(t *testing.T) { testCheckInsertedData := func(ctx flow.Context) error { rdb := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", // host:port of the redis server + Addr: "localhost:6399", // host:port of the redis server Password: "", // no password set DB: 0, // use default DB }) @@ -112,7 +186,7 @@ func TestRedisBinding(t *testing.T) { Step("Waiting for Redis Readiness...", retry.Do(time.Second*3, 10, checkRedisConnection)). Step(sidecar.Run(sidecarName, embedded.WithoutApp(), - embedded.WithComponentsPath("components/standard"), + embedded.WithResourcesPath("components/standard"), embedded.WithDaprGRPCPort(grpcPort), embedded.WithDaprHTTPPort(httpPort), componentRuntimeOptions(), @@ -126,7 +200,7 @@ func TestRedisBinding(t *testing.T) { Step("Waiting for Redis Readiness...", retry.Do(time.Second*3, 10, checkRedisConnection)). Step(sidecar.Run(sidecarName, embedded.WithoutApp(), - embedded.WithComponentsPath("components/standard"), + embedded.WithResourcesPath("components/standard"), embedded.WithDaprGRPCPort(grpcPort), embedded.WithDaprHTTPPort(httpPort), componentRuntimeOptions(), @@ -147,7 +221,7 @@ func TestRedisBinding(t *testing.T) { Step("Waiting for Redis Readiness...", retry.Do(time.Second*3, 10, checkRedisConnection)). Step(sidecar.Run(sidecarName, embedded.WithoutApp(), - embedded.WithComponentsPath("components/retryOptions"), + embedded.WithResourcesPath("components/retryOptions"), embedded.WithDaprGRPCPort(grpcPort), embedded.WithDaprHTTPPort(httpPort), componentRuntimeOptions(), @@ -155,12 +229,26 @@ func TestRedisBinding(t *testing.T) { Step("Waiting for the component to start", flow.Sleep(10*time.Second)). Step("Stop Redis server", dockercompose.Stop("redis", dockerComposeYAML)). Step("Start Redis server", dockercompose.Start("redis", dockerComposeYAML)). - //After restarting Redis, it usually takes a couple of seconds for the container to start but - //since we have retry strategies and connection timeouts configured, the client will retry if it is - //not able to establish a connection to the Redis server + // After restarting Redis, it usually takes a couple of seconds for the container to start but + // since we have retry strategies and connection timeouts configured, the client will retry if it is + // not able to establish a connection to the Redis server Step("Insert data into the redis data store during the server restart", testInvokeCreate). Step("Check if the data is accessible after the server is up again", testCheckInsertedData). Run() + + flow.New(t, "Test Redis Output Binding CREATE and INCREMENT operation with EXPIRE"). + Step(dockercompose.Run("redis", dockerComposeYAML)). + Step("Waiting for Redis Readiness...", retry.Do(time.Second*3, 10, checkRedisConnection)). + Step(sidecar.Run(sidecarName, + embedded.WithoutApp(), + embedded.WithResourcesPath("components/standard"), + embedded.WithDaprGRPCPort(grpcPort), + embedded.WithDaprHTTPPort(httpPort), + componentRuntimeOptions(), + )). + Step("Waiting for the component to start", flow.Sleep(10*time.Second)). + Step("Insert data and increment data, with expiration options.", testInvokeCreateIncr). + Run() } func componentRuntimeOptions() []runtime.Option {