Support Increment Operation, TTL in Redis Binding (#2654)

Signed-off-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
Bernd Verst 2023-03-09 13:23:04 -08:00 committed by GitHub
parent 85feb7c3d7
commit 6feae775e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 240 additions and 14 deletions

View File

@ -20,6 +20,7 @@ import (
"github.com/dapr/components-contrib/bindings"
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)
@ -30,6 +31,11 @@ type Redis struct {
logger logger.Logger
}
const (
// IncrementOperation is the operation to increment a key.
IncrementOperation bindings.OperationKind = "increment"
)
// NewRedis returns a new redis bindings instance.
func NewRedis(logger logger.Logger) bindings.OutputBinding {
return &Redis{logger: logger}
@ -63,9 +69,25 @@ func (r *Redis) Operations() []bindings.OperationKind {
bindings.CreateOperation,
bindings.DeleteOperation,
bindings.GetOperation,
IncrementOperation,
}
}
func (r *Redis) expireKeyIfRequested(ctx context.Context, requestMetadata map[string]string, key string) error {
// get ttl from request metadata
ttl, ok, err := contribMetadata.TryGetTTL(requestMetadata)
if err != nil {
return err
}
if ok {
errExpire := r.client.DoWrite(ctx, "EXPIRE", key, int(ttl.Seconds()))
if errExpire != nil {
return errExpire
}
}
return nil
}
func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
if key, ok := req.Metadata["key"]; ok && key != "" {
switch req.Operation {
@ -77,6 +99,9 @@ func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi
case bindings.GetOperation:
data, err := r.client.Get(ctx, key)
if err != nil {
if err.Error() == "redis: nil" {
return &bindings.InvokeResponse{}, nil
}
return nil, err
}
rep := &bindings.InvokeResponse{}
@ -87,6 +112,19 @@ func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi
if err != nil {
return nil, err
}
err = r.expireKeyIfRequested(ctx, req.Metadata, key)
if err != nil {
return nil, err
}
case IncrementOperation:
err := r.client.DoWrite(ctx, "INCR", key)
if err != nil {
return nil, err
}
err = r.expireKeyIfRequested(ctx, req.Metadata, key)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("invalid operation type: %s", req.Operation)
}

View File

@ -16,6 +16,7 @@ package redis
import (
"context"
"testing"
"time"
miniredis "github.com/alicebob/miniredis/v2"
"github.com/go-redis/redis/v8"
@ -23,6 +24,7 @@ import (
"github.com/dapr/components-contrib/bindings"
internalredis "github.com/dapr/components-contrib/internal/component/redis"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)
@ -105,6 +107,104 @@ func TestInvokeDelete(t *testing.T) {
assert.Equal(t, nil, rgetRep)
}
func TestCreateExpire(t *testing.T) {
s, c := setupMiniredis()
defer s.Close()
bind := &Redis{
client: c,
logger: logger.NewLogger("test"),
}
_, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": testKey, metadata.TTLMetadataKey: "1"},
Operation: bindings.CreateOperation,
Data: []byte(testData),
})
assert.Equal(t, nil, err)
rgetRep, err := c.DoRead(context.Background(), "TTL", testKey)
assert.Nil(t, err)
assert.Equal(t, int64(1), rgetRep)
res, err2 := bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": testKey},
Operation: bindings.GetOperation,
})
assert.Equal(t, nil, err2)
assert.Equal(t, res.Data, []byte(testData))
// wait for ttl to expire
s.FastForward(2 * time.Second)
res, err2 = bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": testKey},
Operation: bindings.GetOperation,
})
assert.Nil(t, err2)
assert.Equal(t, []byte(nil), res.Data)
_, err = bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": testKey},
Operation: bindings.DeleteOperation,
})
assert.Equal(t, nil, err)
}
func TestIncrement(t *testing.T) {
s, c := setupMiniredis()
defer s.Close()
bind := &Redis{
client: c,
logger: logger.NewLogger("test"),
}
_, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": "incKey"},
Operation: IncrementOperation,
})
assert.Equal(t, nil, err)
res, err2 := bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": "incKey"},
Operation: bindings.GetOperation,
})
assert.Nil(t, nil, err2)
assert.Equal(t, res.Data, []byte("1"))
_, err = bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": "incKey", metadata.TTLMetadataKey: "5"},
Operation: IncrementOperation,
})
assert.Nil(t, err)
rgetRep, err := c.DoRead(context.Background(), "TTL", "incKey")
assert.Nil(t, err)
assert.Equal(t, int64(5), rgetRep)
res, err2 = bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": "incKey"},
Operation: bindings.GetOperation,
})
assert.Equal(t, nil, err2)
assert.Equal(t, []byte("2"), res.Data)
// wait for ttl to expire
s.FastForward(10 * time.Second)
res, err2 = bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": "incKey"},
Operation: bindings.GetOperation,
})
assert.Nil(t, err2)
assert.Equal(t, []byte(nil), res.Data)
_, err = bind.Invoke(context.TODO(), &bindings.InvokeRequest{
Metadata: map[string]string{"key": "incKey"},
Operation: bindings.DeleteOperation,
})
assert.Equal(t, nil, err)
}
func setupMiniredis() (*miniredis.Miniredis, internalredis.RedisClient) {
s, err := miniredis.Run()
if err != nil {

View File

@ -9,7 +9,7 @@ spec:
ignoreErrors: true
metadata:
- name: redisHost
value: "localhost:6379"
value: "localhost:6399"
- name: redisPassword
value: ""
- name: dialTimeout

View File

@ -7,6 +7,6 @@ spec:
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
value: "localhost:6399"
- name: redisPassword
value: ""

View File

@ -3,5 +3,5 @@ services:
redis:
image: 'redislabs/redisearch:latest'
ports:
- '6379:6379'
- '6399:6379'
command: redis-server

View File

@ -2,6 +2,9 @@ package redisbinding_test
import (
"fmt"
"testing"
"time"
"github.com/dapr/components-contrib/bindings"
bindingRedis "github.com/dapr/components-contrib/bindings/redis"
"github.com/dapr/components-contrib/tests/certification/embedded"
@ -17,8 +20,6 @@ import (
"github.com/dapr/kit/logger"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
const (
@ -53,7 +54,7 @@ func TestRedisBinding(t *testing.T) {
assert.NoError(t, err)
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // host:port of the redis server
Addr: "localhost:6399", // host:port of the redis server
Password: "", // no password set
DB: 0, // use default DB
})
@ -69,9 +70,82 @@ func TestRedisBinding(t *testing.T) {
return nil
}
testInvokeCreateIncr := func(ctx flow.Context) error {
client, clientErr := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
if clientErr != nil {
panic(clientErr)
}
defer client.Close()
invokeRequest := &daprClient.InvokeBindingRequest{
Name: bindingName,
Operation: string(bindings.CreateOperation),
Data: []byte("hello"),
Metadata: map[string]string{"key": "expireKey", "ttlInSeconds": "2"},
}
err := client.InvokeOutputBinding(ctx, invokeRequest)
assert.NoError(t, err)
invokeRequest = &daprClient.InvokeBindingRequest{
Name: bindingName,
Operation: string(bindings.CreateOperation),
Data: []byte("41"),
Metadata: map[string]string{"key": "incKey"},
}
err = client.InvokeOutputBinding(ctx, invokeRequest)
assert.NoError(t, err)
invokeRequest = &daprClient.InvokeBindingRequest{
Name: bindingName,
Operation: "increment",
Metadata: map[string]string{"key": "incKey", "ttlInSeconds": "2"},
}
err = client.InvokeOutputBinding(ctx, invokeRequest)
assert.NoError(t, err)
invokeRequest = &daprClient.InvokeBindingRequest{
Name: bindingName,
Operation: string(bindings.GetOperation),
Metadata: map[string]string{"key": "incKey"},
}
out, err2 := client.InvokeBinding(ctx, invokeRequest)
assert.NoError(t, err2)
assert.Equal(t, "42", string(out.Data))
time.Sleep(3 * time.Second)
// all keys should be expired now
invokeRequest = &daprClient.InvokeBindingRequest{
Name: bindingName,
Operation: string(bindings.GetOperation),
Metadata: map[string]string{"key": "incKey"},
}
out, err2 = client.InvokeBinding(ctx, invokeRequest)
assert.NoError(t, err2)
assert.Equal(t, []byte(nil), out.Data)
invokeRequest = &daprClient.InvokeBindingRequest{
Name: bindingName,
Operation: string(bindings.GetOperation),
Metadata: map[string]string{"key": "expireKey"},
}
out, err2 = client.InvokeBinding(ctx, invokeRequest)
assert.NoError(t, err2)
assert.Equal(t, []byte(nil), out.Data)
return nil
}
checkRedisConnection := func(ctx flow.Context) error {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // host:port of the redis server
Addr: "localhost:6399", // host:port of the redis server
Password: "", // no password set
DB: 0, // use default DB
})
@ -91,7 +165,7 @@ func TestRedisBinding(t *testing.T) {
testCheckInsertedData := func(ctx flow.Context) error {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // host:port of the redis server
Addr: "localhost:6399", // host:port of the redis server
Password: "", // no password set
DB: 0, // use default DB
})
@ -112,7 +186,7 @@ func TestRedisBinding(t *testing.T) {
Step("Waiting for Redis Readiness...", retry.Do(time.Second*3, 10, checkRedisConnection)).
Step(sidecar.Run(sidecarName,
embedded.WithoutApp(),
embedded.WithComponentsPath("components/standard"),
embedded.WithResourcesPath("components/standard"),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
componentRuntimeOptions(),
@ -126,7 +200,7 @@ func TestRedisBinding(t *testing.T) {
Step("Waiting for Redis Readiness...", retry.Do(time.Second*3, 10, checkRedisConnection)).
Step(sidecar.Run(sidecarName,
embedded.WithoutApp(),
embedded.WithComponentsPath("components/standard"),
embedded.WithResourcesPath("components/standard"),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
componentRuntimeOptions(),
@ -147,7 +221,7 @@ func TestRedisBinding(t *testing.T) {
Step("Waiting for Redis Readiness...", retry.Do(time.Second*3, 10, checkRedisConnection)).
Step(sidecar.Run(sidecarName,
embedded.WithoutApp(),
embedded.WithComponentsPath("components/retryOptions"),
embedded.WithResourcesPath("components/retryOptions"),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
componentRuntimeOptions(),
@ -155,12 +229,26 @@ func TestRedisBinding(t *testing.T) {
Step("Waiting for the component to start", flow.Sleep(10*time.Second)).
Step("Stop Redis server", dockercompose.Stop("redis", dockerComposeYAML)).
Step("Start Redis server", dockercompose.Start("redis", dockerComposeYAML)).
//After restarting Redis, it usually takes a couple of seconds for the container to start but
//since we have retry strategies and connection timeouts configured, the client will retry if it is
//not able to establish a connection to the Redis server
// After restarting Redis, it usually takes a couple of seconds for the container to start but
// since we have retry strategies and connection timeouts configured, the client will retry if it is
// not able to establish a connection to the Redis server
Step("Insert data into the redis data store during the server restart", testInvokeCreate).
Step("Check if the data is accessible after the server is up again", testCheckInsertedData).
Run()
flow.New(t, "Test Redis Output Binding CREATE and INCREMENT operation with EXPIRE").
Step(dockercompose.Run("redis", dockerComposeYAML)).
Step("Waiting for Redis Readiness...", retry.Do(time.Second*3, 10, checkRedisConnection)).
Step(sidecar.Run(sidecarName,
embedded.WithoutApp(),
embedded.WithResourcesPath("components/standard"),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
componentRuntimeOptions(),
)).
Step("Waiting for the component to start", flow.Sleep(10*time.Second)).
Step("Insert data and increment data, with expiration options.", testInvokeCreateIncr).
Run()
}
func componentRuntimeOptions() []runtime.Option {