424 lines
13 KiB
Go
424 lines
13 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package redis_test
|
|
|
|
import (
|
|
"encoding/json"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/dapr/components-contrib/configuration"
|
|
config_redis "github.com/dapr/components-contrib/configuration/redis"
|
|
"github.com/dapr/components-contrib/tests/certification/embedded"
|
|
"github.com/dapr/components-contrib/tests/certification/flow"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/network"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/retry"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
|
|
cu_redis "github.com/dapr/components-contrib/tests/utils/configupdater/redis"
|
|
configuration_loader "github.com/dapr/dapr/pkg/components/configuration"
|
|
dapr_testing "github.com/dapr/dapr/pkg/testing"
|
|
dapr "github.com/dapr/go-sdk/client"
|
|
"github.com/dapr/kit/logger"
|
|
redis "github.com/go-redis/redis/v8"
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
|
|
)
|
|
|
|
type testType int
|
|
|
|
const (
|
|
basicTest testType = iota
|
|
allOperationsTest
|
|
redisDBTest
|
|
)
|
|
|
|
const (
|
|
dockerComposeYAML = "docker-compose.yml"
|
|
storeName = "configstore"
|
|
key1 = "key1"
|
|
key2 = "key2"
|
|
subscribedMessageWaitDuration = 1000 * time.Millisecond
|
|
sidecarName1 = "dapr-1"
|
|
)
|
|
|
|
var subscribeID string
|
|
|
|
// Cast go-sdk ConfigurationItem to contrib ConfigurationItem
|
|
func castConfigurationItems(items map[string]*dapr.ConfigurationItem) map[string]*configuration.Item {
|
|
configItems := make(map[string]*configuration.Item)
|
|
for key, item := range items {
|
|
configItems[key] = &configuration.Item{
|
|
Value: item.Value,
|
|
Version: item.Version,
|
|
Metadata: item.Metadata,
|
|
}
|
|
}
|
|
return configItems
|
|
}
|
|
|
|
// Create UpdateEvent struct from given key, val pair
|
|
func getUpdateEvent(key string, val string) configuration.UpdateEvent {
|
|
expectedUpdateEvent := configuration.UpdateEvent{
|
|
Items: map[string]*configuration.Item{
|
|
key: {
|
|
Value: val,
|
|
},
|
|
},
|
|
}
|
|
return expectedUpdateEvent
|
|
}
|
|
|
|
// Run redis commands and and add them in watcher
|
|
func runRedisCommands(ctx flow.Context, updater *cu_redis.ConfigUpdater, messages *watcher.Watcher, test testType) error {
|
|
var scenarios []struct {
|
|
cmd []interface{}
|
|
want [][]string
|
|
waitDuration time.Duration
|
|
nowant bool
|
|
}
|
|
switch test {
|
|
case basicTest:
|
|
scenarios = []struct {
|
|
cmd []interface{}
|
|
want [][]string
|
|
waitDuration time.Duration
|
|
nowant bool
|
|
}{
|
|
{
|
|
cmd: []interface{}{"set", key1, "val1"},
|
|
want: [][]string{{key1, "val1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"mset", key1, "val1", key2, "val2"},
|
|
want: [][]string{{key1, "val1"}, {key2, "val2"}},
|
|
},
|
|
}
|
|
case allOperationsTest:
|
|
scenarios = []struct {
|
|
cmd []interface{}
|
|
want [][]string
|
|
waitDuration time.Duration
|
|
nowant bool
|
|
}{
|
|
{
|
|
cmd: []interface{}{"set", key1, "val1"},
|
|
want: [][]string{{key1, "val1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"mset", key1, "val1", key2, "val2"},
|
|
want: [][]string{{key1, "val1"}, {key2, "val2"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"copy", key1, key2, "REPLACE"},
|
|
want: [][]string{{key2, "val1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"append", key1, "-append"},
|
|
want: [][]string{{key1, "val1-append"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"setrange", key1, 4, "-offset"},
|
|
want: [][]string{{key1, "val1-offset"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"expire", key1, 3},
|
|
want: [][]string{{key1, "val1-offset"}, {key1, ""}},
|
|
// This wait duration is required because `expire` command would generate an `expired` event for the key
|
|
// after the expiration time (3 seconds here). Setting waitDuration to 5 seconds to wait for it.
|
|
waitDuration: 5 * time.Second,
|
|
},
|
|
{
|
|
cmd: []interface{}{"set", key1, "val1"},
|
|
want: [][]string{{key1, "val1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"expire", key1, 10},
|
|
want: [][]string{{key1, "val1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"persist", key1},
|
|
want: [][]string{{key1, "val1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"expire", key1, -2},
|
|
want: [][]string{{key1, ""}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"set", key1, "val1"},
|
|
want: [][]string{{key1, "val1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"set", key2, "val2"},
|
|
want: [][]string{{key2, "val2"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"rename", key2, key1},
|
|
want: [][]string{{key1, "val2"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"move", key1, 1},
|
|
want: [][]string{{key1, ""}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"set", key1, 1},
|
|
want: [][]string{{key1, "1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"incr", key1},
|
|
want: [][]string{{key1, "2"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"decr", key1},
|
|
want: [][]string{{key1, "1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"incrby", key1, 2},
|
|
want: [][]string{{key1, "3"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"decrby", key1, 2},
|
|
want: [][]string{{key1, "1"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"incrbyfloat", key1, 0.5},
|
|
want: [][]string{{key1, "1.5"}},
|
|
},
|
|
{
|
|
cmd: []interface{}{"del", key1},
|
|
want: [][]string{{key1, ""}},
|
|
},
|
|
}
|
|
case redisDBTest:
|
|
scenarios = []struct {
|
|
cmd []interface{}
|
|
want [][]string
|
|
waitDuration time.Duration
|
|
nowant bool
|
|
}{
|
|
{
|
|
cmd: []interface{}{"set", key1, "val1"},
|
|
// For this test, client connection is set to DB 1, while the updater connects to DB 0. So no update should be received in case of set
|
|
nowant: true,
|
|
},
|
|
{
|
|
cmd: []interface{}{"move", key1, 1},
|
|
// When key1 is moved to DB 1, update should be received to the subscriber.
|
|
want: [][]string{{key1, "val1"}},
|
|
},
|
|
}
|
|
}
|
|
|
|
for _, scenario := range scenarios {
|
|
if !scenario.nowant {
|
|
for _, keyValue := range scenario.want {
|
|
updateEvent := getUpdateEvent(keyValue[0], keyValue[1])
|
|
updateEventInJson, err := json.Marshal(updateEvent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
messages.Expect(string(updateEventInJson))
|
|
}
|
|
}
|
|
err := updater.Client.DoWrite(ctx, scenario.cmd...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Wait for the update to be received by the subscriber
|
|
if scenario.waitDuration == 0 {
|
|
time.Sleep(subscribedMessageWaitDuration)
|
|
} else {
|
|
time.Sleep(scenario.waitDuration)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func TestRedis(t *testing.T) {
|
|
log := logger.NewLogger("dapr.components")
|
|
|
|
configStore := config_redis.NewRedisConfigurationStore(log)
|
|
configurationRegistry := configuration_loader.NewRegistry()
|
|
configurationRegistry.Logger = log
|
|
configurationRegistry.RegisterComponent(func(l logger.Logger) configuration.Store {
|
|
return configStore
|
|
}, "redis")
|
|
|
|
updater := cu_redis.NewRedisConfigUpdater(log).(*cu_redis.ConfigUpdater)
|
|
updater.Init(map[string]string{
|
|
"redisHost": "localhost:6379",
|
|
"redisPassword": "",
|
|
"redisDB": "0",
|
|
})
|
|
|
|
ports, err := dapr_testing.GetFreePorts(2)
|
|
assert.NoError(t, err)
|
|
currentGrpcPort := ports[0]
|
|
currentHTTPPort := ports[1]
|
|
|
|
messageWatcher := watcher.NewUnordered()
|
|
|
|
checkRedisConnection := func(ctx flow.Context) error {
|
|
rdb := redis.NewClient(&redis.Options{
|
|
Addr: "localhost:6379", // host:port of the redis server
|
|
Password: "", // no password set
|
|
DB: 0, // use default DB
|
|
})
|
|
defer rdb.Close()
|
|
if err := rdb.Ping(ctx).Err(); err != nil {
|
|
return err
|
|
}
|
|
log.Info("Setup for Redis done")
|
|
return nil
|
|
}
|
|
|
|
subscribefn := func(keys []string, message *watcher.Watcher) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
client := sidecar.GetClient(ctx, sidecarName1)
|
|
message.Reset()
|
|
var errSubscribe error
|
|
subscribeID, errSubscribe = client.SubscribeConfigurationItems(ctx, storeName, keys, func(id string, items map[string]*dapr.ConfigurationItem) {
|
|
updateEvent := &configuration.UpdateEvent{
|
|
Items: castConfigurationItems(items),
|
|
}
|
|
updateEventInJson, err := json.Marshal(updateEvent)
|
|
assert.NoError(t, err)
|
|
message.Observe(string(updateEventInJson))
|
|
})
|
|
return errSubscribe
|
|
}
|
|
}
|
|
|
|
testSubscribe := func(messages *watcher.Watcher) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
messages.Reset()
|
|
errRun := runRedisCommands(ctx, updater, messages, allOperationsTest)
|
|
if errRun != nil {
|
|
return errRun
|
|
}
|
|
messages.Assert(t, 10*time.Second)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
testSubscribeBasic := func(messages *watcher.Watcher) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
messages.Reset()
|
|
errRun := runRedisCommands(ctx, updater, messages, basicTest)
|
|
if errRun != nil {
|
|
return errRun
|
|
}
|
|
messages.Assert(t, 10*time.Second)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
testredisDBMetadata := func(messages *watcher.Watcher) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
messages.Reset()
|
|
errRun := runRedisCommands(ctx, updater, messages, redisDBTest)
|
|
if errRun != nil {
|
|
return errRun
|
|
}
|
|
messages.Assert(t, 10*time.Second)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
saveBeforeRestart := func(ctx flow.Context) error {
|
|
items := map[string]*configuration.Item{
|
|
key1: {
|
|
Value: "val1",
|
|
},
|
|
}
|
|
err := updater.AddKey(items)
|
|
return err
|
|
}
|
|
|
|
getAfterRestart := func(ctx flow.Context) error {
|
|
client := sidecar.GetClient(ctx, sidecarName1)
|
|
|
|
item, err := client.GetConfigurationItem(ctx, storeName, key1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
assert.Equal(t, item.Value, "val1")
|
|
return nil
|
|
}
|
|
|
|
stopSubscriber := func(ctx flow.Context) error {
|
|
client := sidecar.GetClient(ctx, sidecarName1)
|
|
return client.UnsubscribeConfigurationItems(ctx, storeName, subscribeID)
|
|
}
|
|
|
|
flow.New(t, "redis certification test").
|
|
// Run redis server
|
|
Step(dockercompose.Run("redis", dockerComposeYAML)).
|
|
Step("wait for redis to be ready", retry.Do(time.Second*3, 10, checkRedisConnection)).
|
|
// Run dapr sidecar with redis configuration store component
|
|
Step(sidecar.Run(sidecarName1,
|
|
embedded.WithoutApp(),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
embedded.WithComponentsPath("components/default"),
|
|
embedded.WithConfigurations(configurationRegistry),
|
|
)).
|
|
//
|
|
// Start subscriber subscribing to keys {key1,key2}
|
|
Step("start subscriber", subscribefn([]string{key1, key2}, messageWatcher)).
|
|
Step("wait for subscriber to be ready", flow.Sleep(5*time.Second)).
|
|
//Run redis commands and test updates are received by the subscriber
|
|
Step("testSubscribe", testSubscribe(messageWatcher)).
|
|
Step("reset", flow.Reset(messageWatcher)).
|
|
//
|
|
// Simulate network interruptions and verify the connection still remains intact
|
|
Step("interrupt network",
|
|
network.InterruptNetwork(10*time.Second, nil, nil, "6379:6379")).
|
|
Step("testSubscribe", testSubscribeBasic(messageWatcher)).
|
|
// Stop the subscriber
|
|
Step("stop subscriber", stopSubscriber).
|
|
//
|
|
// Save a key before restarting redis server
|
|
Step("save before restart", saveBeforeRestart).
|
|
Step("stop redis server", dockercompose.Stop("redis", dockerComposeYAML, "redis")).
|
|
Step("start redis server", dockercompose.Start("redis", dockerComposeYAML, "redis")).
|
|
Step("wait for redis to be ready after restart", retry.Do(time.Second*3, 10, checkRedisConnection)).
|
|
// Get the key after restarting redis server
|
|
Step("get after restart", getAfterRestart).
|
|
Run()
|
|
|
|
flow.New(t, "Test connection to specified redisDB").
|
|
// Run redis server
|
|
Step(dockercompose.Run("redis", dockerComposeYAML)).
|
|
Step("wait for redis to be ready", retry.Do(time.Second*3, 10, checkRedisConnection)).
|
|
// Run dapr sidecar with redis configuration store component
|
|
Step(sidecar.Run(sidecarName1,
|
|
embedded.WithoutApp(),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
embedded.WithComponentsPath("components/redisDB1"),
|
|
embedded.WithConfigurations(configurationRegistry),
|
|
)).
|
|
Step("start subscriber", subscribefn([]string{key1, key2}, messageWatcher)).
|
|
Step("wait for subscriber to be ready", flow.Sleep(5*time.Second)).
|
|
Step("verify database connected", testredisDBMetadata(messageWatcher)).
|
|
// // Stop the subscriber
|
|
Step("stop subscriber", stopSubscriber).
|
|
Run()
|
|
|
|
}
|