components-contrib/tests/certification/configuration/redis/redis_test.go

424 lines
13 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package redis_test
import (
"encoding/json"
"strconv"
"testing"
"time"
"github.com/dapr/components-contrib/configuration"
config_redis "github.com/dapr/components-contrib/configuration/redis"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/components-contrib/tests/certification/flow/retry"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
cu_redis "github.com/dapr/components-contrib/tests/utils/configupdater/redis"
configuration_loader "github.com/dapr/dapr/pkg/components/configuration"
dapr_testing "github.com/dapr/dapr/pkg/testing"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/kit/logger"
redis "github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
)
type testType int
const (
basicTest testType = iota
allOperationsTest
redisDBTest
)
const (
dockerComposeYAML = "docker-compose.yml"
storeName = "configstore"
key1 = "key1"
key2 = "key2"
subscribedMessageWaitDuration = 1000 * time.Millisecond
sidecarName1 = "dapr-1"
)
var subscribeID string
// Cast go-sdk ConfigurationItem to contrib ConfigurationItem
func castConfigurationItems(items map[string]*dapr.ConfigurationItem) map[string]*configuration.Item {
configItems := make(map[string]*configuration.Item)
for key, item := range items {
configItems[key] = &configuration.Item{
Value: item.Value,
Version: item.Version,
Metadata: item.Metadata,
}
}
return configItems
}
// Create UpdateEvent struct from given key, val pair
func getUpdateEvent(key string, val string) configuration.UpdateEvent {
expectedUpdateEvent := configuration.UpdateEvent{
Items: map[string]*configuration.Item{
key: {
Value: val,
},
},
}
return expectedUpdateEvent
}
// Run redis commands and and add them in watcher
func runRedisCommands(ctx flow.Context, updater *cu_redis.ConfigUpdater, messages *watcher.Watcher, test testType) error {
var scenarios []struct {
cmd []interface{}
want [][]string
waitDuration time.Duration
nowant bool
}
switch test {
case basicTest:
scenarios = []struct {
cmd []interface{}
want [][]string
waitDuration time.Duration
nowant bool
}{
{
cmd: []interface{}{"set", key1, "val1"},
want: [][]string{{key1, "val1"}},
},
{
cmd: []interface{}{"mset", key1, "val1", key2, "val2"},
want: [][]string{{key1, "val1"}, {key2, "val2"}},
},
}
case allOperationsTest:
scenarios = []struct {
cmd []interface{}
want [][]string
waitDuration time.Duration
nowant bool
}{
{
cmd: []interface{}{"set", key1, "val1"},
want: [][]string{{key1, "val1"}},
},
{
cmd: []interface{}{"mset", key1, "val1", key2, "val2"},
want: [][]string{{key1, "val1"}, {key2, "val2"}},
},
{
cmd: []interface{}{"copy", key1, key2, "REPLACE"},
want: [][]string{{key2, "val1"}},
},
{
cmd: []interface{}{"append", key1, "-append"},
want: [][]string{{key1, "val1-append"}},
},
{
cmd: []interface{}{"setrange", key1, 4, "-offset"},
want: [][]string{{key1, "val1-offset"}},
},
{
cmd: []interface{}{"expire", key1, 3},
want: [][]string{{key1, "val1-offset"}, {key1, ""}},
// This wait duration is required because `expire` command would generate an `expired` event for the key
// after the expiration time (3 seconds here). Setting waitDuration to 5 seconds to wait for it.
waitDuration: 5 * time.Second,
},
{
cmd: []interface{}{"set", key1, "val1"},
want: [][]string{{key1, "val1"}},
},
{
cmd: []interface{}{"expire", key1, 10},
want: [][]string{{key1, "val1"}},
},
{
cmd: []interface{}{"persist", key1},
want: [][]string{{key1, "val1"}},
},
{
cmd: []interface{}{"expire", key1, -2},
want: [][]string{{key1, ""}},
},
{
cmd: []interface{}{"set", key1, "val1"},
want: [][]string{{key1, "val1"}},
},
{
cmd: []interface{}{"set", key2, "val2"},
want: [][]string{{key2, "val2"}},
},
{
cmd: []interface{}{"rename", key2, key1},
want: [][]string{{key1, "val2"}},
},
{
cmd: []interface{}{"move", key1, 1},
want: [][]string{{key1, ""}},
},
{
cmd: []interface{}{"set", key1, 1},
want: [][]string{{key1, "1"}},
},
{
cmd: []interface{}{"incr", key1},
want: [][]string{{key1, "2"}},
},
{
cmd: []interface{}{"decr", key1},
want: [][]string{{key1, "1"}},
},
{
cmd: []interface{}{"incrby", key1, 2},
want: [][]string{{key1, "3"}},
},
{
cmd: []interface{}{"decrby", key1, 2},
want: [][]string{{key1, "1"}},
},
{
cmd: []interface{}{"incrbyfloat", key1, 0.5},
want: [][]string{{key1, "1.5"}},
},
{
cmd: []interface{}{"del", key1},
want: [][]string{{key1, ""}},
},
}
case redisDBTest:
scenarios = []struct {
cmd []interface{}
want [][]string
waitDuration time.Duration
nowant bool
}{
{
cmd: []interface{}{"set", key1, "val1"},
// For this test, client connection is set to DB 1, while the updater connects to DB 0. So no update should be received in case of set
nowant: true,
},
{
cmd: []interface{}{"move", key1, 1},
// When key1 is moved to DB 1, update should be received to the subscriber.
want: [][]string{{key1, "val1"}},
},
}
}
for _, scenario := range scenarios {
if !scenario.nowant {
for _, keyValue := range scenario.want {
updateEvent := getUpdateEvent(keyValue[0], keyValue[1])
updateEventInJson, err := json.Marshal(updateEvent)
if err != nil {
return err
}
messages.Expect(string(updateEventInJson))
}
}
err := updater.Client.DoWrite(ctx, scenario.cmd...)
if err != nil {
return err
}
// Wait for the update to be received by the subscriber
if scenario.waitDuration == 0 {
time.Sleep(subscribedMessageWaitDuration)
} else {
time.Sleep(scenario.waitDuration)
}
}
return nil
}
func TestRedis(t *testing.T) {
log := logger.NewLogger("dapr.components")
configStore := config_redis.NewRedisConfigurationStore(log)
configurationRegistry := configuration_loader.NewRegistry()
configurationRegistry.Logger = log
configurationRegistry.RegisterComponent(func(l logger.Logger) configuration.Store {
return configStore
}, "redis")
updater := cu_redis.NewRedisConfigUpdater(log).(*cu_redis.ConfigUpdater)
updater.Init(map[string]string{
"redisHost": "localhost:6379",
"redisPassword": "",
"redisDB": "0",
})
ports, err := dapr_testing.GetFreePorts(2)
assert.NoError(t, err)
currentGrpcPort := ports[0]
currentHTTPPort := ports[1]
messageWatcher := watcher.NewUnordered()
checkRedisConnection := func(ctx flow.Context) error {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // host:port of the redis server
Password: "", // no password set
DB: 0, // use default DB
})
defer rdb.Close()
if err := rdb.Ping(ctx).Err(); err != nil {
return err
}
log.Info("Setup for Redis done")
return nil
}
subscribefn := func(keys []string, message *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName1)
message.Reset()
var errSubscribe error
subscribeID, errSubscribe = client.SubscribeConfigurationItems(ctx, storeName, keys, func(id string, items map[string]*dapr.ConfigurationItem) {
updateEvent := &configuration.UpdateEvent{
Items: castConfigurationItems(items),
}
updateEventInJson, err := json.Marshal(updateEvent)
assert.NoError(t, err)
message.Observe(string(updateEventInJson))
})
return errSubscribe
}
}
testSubscribe := func(messages *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
messages.Reset()
errRun := runRedisCommands(ctx, updater, messages, allOperationsTest)
if errRun != nil {
return errRun
}
messages.Assert(t, 10*time.Second)
return nil
}
}
testSubscribeBasic := func(messages *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
messages.Reset()
errRun := runRedisCommands(ctx, updater, messages, basicTest)
if errRun != nil {
return errRun
}
messages.Assert(t, 10*time.Second)
return nil
}
}
testredisDBMetadata := func(messages *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
messages.Reset()
errRun := runRedisCommands(ctx, updater, messages, redisDBTest)
if errRun != nil {
return errRun
}
messages.Assert(t, 10*time.Second)
return nil
}
}
saveBeforeRestart := func(ctx flow.Context) error {
items := map[string]*configuration.Item{
key1: {
Value: "val1",
},
}
err := updater.AddKey(items)
return err
}
getAfterRestart := func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName1)
item, err := client.GetConfigurationItem(ctx, storeName, key1)
if err != nil {
return err
}
assert.Equal(t, item.Value, "val1")
return nil
}
stopSubscriber := func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName1)
return client.UnsubscribeConfigurationItems(ctx, storeName, subscribeID)
}
flow.New(t, "redis certification test").
// Run redis server
Step(dockercompose.Run("redis", dockerComposeYAML)).
Step("wait for redis to be ready", retry.Do(time.Second*3, 10, checkRedisConnection)).
// Run dapr sidecar with redis configuration store component
Step(sidecar.Run(sidecarName1,
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
embedded.WithComponentsPath("components/default"),
embedded.WithConfigurations(configurationRegistry),
)).
//
// Start subscriber subscribing to keys {key1,key2}
Step("start subscriber", subscribefn([]string{key1, key2}, messageWatcher)).
Step("wait for subscriber to be ready", flow.Sleep(5*time.Second)).
//Run redis commands and test updates are received by the subscriber
Step("testSubscribe", testSubscribe(messageWatcher)).
Step("reset", flow.Reset(messageWatcher)).
//
// Simulate network interruptions and verify the connection still remains intact
Step("interrupt network",
network.InterruptNetwork(10*time.Second, nil, nil, "6379:6379")).
Step("testSubscribe", testSubscribeBasic(messageWatcher)).
// Stop the subscriber
Step("stop subscriber", stopSubscriber).
//
// Save a key before restarting redis server
Step("save before restart", saveBeforeRestart).
Step("stop redis server", dockercompose.Stop("redis", dockerComposeYAML, "redis")).
Step("start redis server", dockercompose.Start("redis", dockerComposeYAML, "redis")).
Step("wait for redis to be ready after restart", retry.Do(time.Second*3, 10, checkRedisConnection)).
// Get the key after restarting redis server
Step("get after restart", getAfterRestart).
Run()
flow.New(t, "Test connection to specified redisDB").
// Run redis server
Step(dockercompose.Run("redis", dockerComposeYAML)).
Step("wait for redis to be ready", retry.Do(time.Second*3, 10, checkRedisConnection)).
// Run dapr sidecar with redis configuration store component
Step(sidecar.Run(sidecarName1,
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
embedded.WithComponentsPath("components/redisDB1"),
embedded.WithConfigurations(configurationRegistry),
)).
Step("start subscriber", subscribefn([]string{key1, key2}, messageWatcher)).
Step("wait for subscriber to be ready", flow.Sleep(5*time.Second)).
Step("verify database connected", testredisDBMetadata(messageWatcher)).
// // Stop the subscriber
Step("stop subscriber", stopSubscriber).
Run()
}