354 lines
13 KiB
Go
354 lines
13 KiB
Go
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package memcached_test
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/dapr/components-contrib/state"
|
|
"github.com/dapr/go-sdk/client"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
state_memcached "github.com/dapr/components-contrib/state/memcached"
|
|
"github.com/dapr/components-contrib/tests/certification/embedded"
|
|
"github.com/dapr/components-contrib/tests/certification/flow"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/network"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
|
|
state_loader "github.com/dapr/dapr/pkg/components/state"
|
|
dapr_testing "github.com/dapr/dapr/pkg/testing"
|
|
"github.com/dapr/kit/logger"
|
|
)
|
|
|
|
// Shared fixtures for the Memcached certification tests.
const (
	// sidecarNamePrefix is prepended to the name of every sidecar started by these tests.
	sidecarNamePrefix = "memcached-sidecar-"
	// dockerComposeClusterYAML is the docker-compose file used to start and stop Memcached.
	dockerComposeClusterYAML = "docker-compose.yml"
	// stateStoreName is the state store name used in every state API call.
	stateStoreName = "statestore"

	// certificationTestPrefix namespaces every key written by these tests.
	certificationTestPrefix = "stable-certification-"
	// testKey1 and testKey2 are the keys exercised by the basic CRUD test.
	testKey1 = certificationTestPrefix + "key1"
	testKey2 = certificationTestPrefix + "key2"
	// testKey1Value and testKey2Value are the initial values stored under testKey1 and testKey2.
	testKey1Value = "memcachedCert"
	testKey2Value = "memcachedCert2"
	// testUpdateValue overwrites testKey1Value to verify updates.
	testUpdateValue = "memcachedCertUpdate"
	// testNonexistentKey is a key that is never written, used to verify reads of missing keys.
	testNonexistentKey = "ThisKeyDoesNotExistInTheStateStore"

	// servicePortToInterrupt is the port handed to network.InterruptNetwork
	// (11211 is Memcached's default port).
	servicePortToInterrupt = "11211"
)
|
|
|
|
func TestMemcached(t *testing.T) {
|
|
log := logger.NewLogger("dapr.components")
|
|
stateStore := state_memcached.NewMemCacheStateStore(log)
|
|
|
|
ports, err := dapr_testing.GetFreePorts(2)
|
|
require.NoError(t, err)
|
|
|
|
// var rdb redis.Client
|
|
currentGrpcPort := ports[0]
|
|
currentHTTPPort := ports[1]
|
|
|
|
// Basic CRUD tests
|
|
basicTest := func(ctx flow.Context) error {
|
|
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
err = client.SaveState(ctx, stateStoreName, testKey1, []byte(testKey1Value), nil)
|
|
require.NoError(t, err)
|
|
|
|
err = client.SaveState(ctx, stateStoreName, testKey2, []byte(testKey2Value), nil)
|
|
require.NoError(t, err)
|
|
|
|
// get state
|
|
item, err := client.GetState(ctx, stateStoreName, testKey1, nil)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, testKey1Value, string(item.Value))
|
|
|
|
errUpdate := client.SaveState(ctx, stateStoreName, testKey1, []byte(testUpdateValue), nil)
|
|
require.NoError(t, errUpdate)
|
|
item, errUpdatedGet := client.GetState(ctx, stateStoreName, testKey1, nil)
|
|
require.NoError(t, errUpdatedGet)
|
|
assert.Equal(t, testUpdateValue, string(item.Value))
|
|
|
|
// delete state
|
|
err = client.DeleteState(ctx, stateStoreName, testKey1, nil)
|
|
require.NoError(t, err)
|
|
item, err = client.GetState(ctx, stateStoreName, testKey1, nil)
|
|
require.NoError(t, err)
|
|
assert.Nil(t, nil, item)
|
|
|
|
// nonexistent key
|
|
item, err = client.GetState(ctx, stateStoreName, testNonexistentKey, nil)
|
|
require.NoError(t, err)
|
|
assert.Nil(t, nil, item)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Time-To-Live Tests
|
|
timeToLiveTestWithInvalidTTLValue := func(ctx flow.Context) error {
|
|
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
// Invalid TTL value
|
|
|
|
key := certificationTestPrefix + "_InvalidTTLValueKey"
|
|
value := "with an invalid TTL this key should not be persisted."
|
|
|
|
// TTL has to be a number
|
|
ttlInSecondsNotNumeric := "mock value"
|
|
mapOptionsNotNumeric := map[string]string{
|
|
"ttlInSeconds": ttlInSecondsNotNumeric,
|
|
}
|
|
errNotNumeric := client.SaveState(ctx, stateStoreName, key, []byte(value), mapOptionsNotNumeric)
|
|
assert.Error(t, errNotNumeric)
|
|
|
|
return nil
|
|
}
|
|
|
|
timeToLiveTestWithNonExpiringTTL := func(ctx flow.Context) error {
|
|
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
key := certificationTestPrefix + "_timeToLiveTestWithNonExpiringTTLKey"
|
|
value := "This value does not expire and should be retrieved just fine"
|
|
|
|
// Notice: we are actively setting a TTL value here: an non-expiring one.
|
|
// This is different than the basic tests where no TTL is assigned.
|
|
//
|
|
// Notice that Memcached uses "0" as the non-expiring marker TTL.
|
|
// https://github.com/memcached/memcached/wiki/Commands#set
|
|
// OTOH Dapr uses -1 for that.
|
|
// https://docs.dapr.io/developing-applications/building-blocks/state-management/state-store-ttl/
|
|
// So we are using -1 here and expect the state store to translate this accordingly.
|
|
ttlInSecondsNonExpiring := -1
|
|
mapOptionsNonExpiring := map[string]string{
|
|
"ttlInSeconds": strconv.Itoa(ttlInSecondsNonExpiring),
|
|
}
|
|
|
|
// We can successfully save...
|
|
errSave := client.SaveState(ctx, stateStoreName, key, []byte(value), mapOptionsNonExpiring)
|
|
require.NoError(t, errSave)
|
|
// and retrieve this key.
|
|
item, errGet := client.GetState(ctx, stateStoreName, key, nil)
|
|
require.NoError(t, errGet)
|
|
assert.Equal(t, value, string(item.Value))
|
|
|
|
return nil
|
|
}
|
|
|
|
timeToLiveWithAOneSecondTTL := func(ctx flow.Context) error {
|
|
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
key := certificationTestPrefix + "_expiresInOneSecondKey"
|
|
value := "This key will self-destroy in 1 second"
|
|
|
|
ttlExpirationTime := 3 * time.Second
|
|
ttlInSeconds := int(ttlExpirationTime.Seconds())
|
|
mapOptionsExpiringKey := map[string]string{
|
|
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
|
|
}
|
|
|
|
errSave := client.SaveState(ctx, stateStoreName, key, []byte(value), mapOptionsExpiringKey)
|
|
require.NoError(t, errSave)
|
|
|
|
// get state
|
|
item, errGetBeforeTTLExpiration := client.GetState(ctx, stateStoreName, key, nil)
|
|
require.NoError(t, errGetBeforeTTLExpiration)
|
|
assert.Equal(t, value, string(item.Value))
|
|
// Let the key expire
|
|
time.Sleep(2 * ttlExpirationTime) // It should be safe to check in double TTL
|
|
itemAfterTTL, errGetAfterTTL := client.GetState(ctx, stateStoreName, key, nil)
|
|
require.NoError(t, errGetAfterTTL)
|
|
assert.Nil(t, nil, itemAfterTTL)
|
|
|
|
return nil
|
|
}
|
|
|
|
timeToLiveWithATwoMonthsTTL := func(ctx flow.Context) error {
|
|
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
key := certificationTestPrefix + "_expiresInTwoMonthsKey"
|
|
value := "This key will self-destroy in 2 months"
|
|
|
|
ttlExpirationTime := 2 * 30 * 24 * time.Hour
|
|
ttlInSeconds := int(ttlExpirationTime.Seconds())
|
|
mapOptionsExpiringKey := map[string]string{
|
|
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
|
|
}
|
|
|
|
errSave := client.SaveState(ctx, stateStoreName, key, []byte(value), mapOptionsExpiringKey)
|
|
require.NoError(t, errSave)
|
|
|
|
// get state
|
|
item, errGetBeforeTTLExpiration := client.GetState(ctx, stateStoreName, key, nil)
|
|
require.NoError(t, errGetBeforeTTLExpiration)
|
|
assert.Equal(t, value, string(item.Value))
|
|
|
|
return nil
|
|
}
|
|
|
|
flow.New(t, "Connecting Memcached And Test for CRUD operations").
|
|
Step(dockercompose.Run("memcached", dockerComposeClusterYAML)).
|
|
Step("Waiting for component to start...", flow.Sleep(5*time.Second)).
|
|
Step(sidecar.Run(sidecarNamePrefix+"dockerClusterDefault",
|
|
append(componentRuntimeOptions(stateStore, log, "memcached"),
|
|
embedded.WithoutApp(),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
embedded.WithComponentsPath("components/docker/default"),
|
|
)...,
|
|
)).
|
|
Step("Waiting for component to load...", flow.Sleep(5*time.Second)).
|
|
Step("Run basic test", basicTest).
|
|
Step("Stop Memcached server", dockercompose.Stop("memcached", dockerComposeClusterYAML)).
|
|
Run()
|
|
|
|
flow.New(t, "Connecting Memcached And verifying TTL tests").
|
|
Step(dockercompose.Run("memcached", dockerComposeClusterYAML)).
|
|
Step("Waiting for component to start...", flow.Sleep(5*time.Second)).
|
|
Step(sidecar.Run(sidecarNamePrefix+"dockerClusterDefault",
|
|
append(componentRuntimeOptions(stateStore, log, "memcached"),
|
|
embedded.WithoutApp(),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
embedded.WithComponentsPath("components/docker/default"),
|
|
)...,
|
|
)).
|
|
Step("Waiting for component to load...", flow.Sleep(5*time.Second)).
|
|
Step("Run basic test", basicTest).
|
|
Step("Run TTL related test: TTL not a valid number.", timeToLiveTestWithInvalidTTLValue).
|
|
Step("Run TTL related test: TTL not expiring.", timeToLiveTestWithNonExpiringTTL).
|
|
Step("Run TTL related test: TTL of 1 second.", timeToLiveWithAOneSecondTTL).
|
|
Step("Run TTL related test: TTL of 2 months.", timeToLiveWithATwoMonthsTTL).
|
|
Step("Stop Memcached server", dockercompose.Stop("memcached", dockerComposeClusterYAML)).
|
|
Run()
|
|
}
|
|
|
|
func TestMemcachedNetworkInstability(t *testing.T) {
|
|
log := logger.NewLogger("dapr.components")
|
|
stateStore := state_memcached.NewMemCacheStateStore(log)
|
|
|
|
ports, err := dapr_testing.GetFreePorts(2)
|
|
require.NoError(t, err)
|
|
|
|
// var rdb redis.Client
|
|
currentGrpcPort := ports[0]
|
|
currentHTTPPort := ports[1]
|
|
|
|
const (
|
|
targetKey = certificationTestPrefix + "_TestMemcachedNetworkInstabilityKey"
|
|
targetValue = "This key should still be there after the network returns"
|
|
componentsPathFor20sTimeout = "components/docker/20secondsTimeout"
|
|
memcachedTimeout = 20 * time.Second
|
|
keyTTL = memcachedTimeout * 4
|
|
networkInstabilityTime = memcachedTimeout * 2
|
|
waitAfterInstabilityTime = networkInstabilityTime / 2
|
|
)
|
|
|
|
assertKey := func(key string, value string) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
item, err := client.GetState(ctx, stateStoreName, key, nil)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, value, string(item.Value))
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
setKeyWithTTL := func(ttlExpirationTime time.Duration, key string, value string) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer client.Close()
|
|
|
|
ttlInSeconds := int(ttlExpirationTime.Seconds())
|
|
mapOptionsExpiringKey := map[string]string{
|
|
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
|
|
}
|
|
|
|
errSave := client.SaveState(ctx, stateStoreName, key, []byte(value), mapOptionsExpiringKey)
|
|
require.NoError(t, errSave)
|
|
// assert the key is there
|
|
item, errGetBeforeTTLExpiration := client.GetState(ctx, stateStoreName, key, nil)
|
|
require.NoError(t, errGetBeforeTTLExpiration)
|
|
assert.Equal(t, value, string(item.Value))
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
flow.New(t, "Connecting Memcached And Handling network instability").
|
|
Step(dockercompose.Run("memcached", dockerComposeClusterYAML)).
|
|
Step("Waiting for component to start...", flow.Sleep(5*time.Second)).
|
|
Step(sidecar.Run(sidecarNamePrefix+"dockerClusterDefault",
|
|
append(componentRuntimeOptions(stateStore, log, "memcached"),
|
|
embedded.WithoutApp(),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
embedded.WithComponentsPath(componentsPathFor20sTimeout),
|
|
)...,
|
|
)).
|
|
Step("Waiting for component to load...", flow.Sleep(5*time.Second)).
|
|
Step("Setup a key with a TTL of 4x memcached timeout ", setKeyWithTTL(keyTTL, targetKey, targetValue)).
|
|
Step("Wait 1s", flow.Sleep(1*time.Second)).
|
|
// Heads up, future developer friend: this will fail if running from WSL. :(
|
|
Step("Interrupt network for 2x memcached timeout",
|
|
network.InterruptNetwork(networkInstabilityTime, nil, nil, servicePortToInterrupt)).
|
|
// Component should recover at this point.
|
|
Step("Wait for component to recover", flow.Sleep(waitAfterInstabilityTime)).
|
|
Step("Run basic test again to verify reconnection occurred", assertKey(targetKey, targetValue)).
|
|
Step("Stop Memcached server", dockercompose.Stop("memcached", dockerComposeClusterYAML)).
|
|
Run()
|
|
}
|
|
|
|
func componentRuntimeOptions(stateStore state.Store, log logger.Logger, stateStoreName string) []embedded.Option {
|
|
stateRegistry := state_loader.NewRegistry()
|
|
stateRegistry.Logger = log
|
|
componentFactory := func(l logger.Logger) state.Store { return stateStore }
|
|
|
|
stateRegistry.RegisterComponent(componentFactory, stateStoreName)
|
|
|
|
return []embedded.Option{
|
|
embedded.WithStates(stateRegistry),
|
|
}
|
|
}
|