components-contrib/tests/certification/state/cassandra/cassandra_test.go

322 lines
12 KiB
Go

/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cassandra_test
import (
"fmt"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/state"
state_cassandra "github.com/dapr/components-contrib/state/cassandra"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
state_loader "github.com/dapr/dapr/pkg/components/state"
"github.com/dapr/dapr/pkg/runtime"
dapr_testing "github.com/dapr/dapr/pkg/testing"
goclient "github.com/dapr/go-sdk/client"
"github.com/dapr/kit/logger"
)
const (
sidecarNamePrefix = "cassandra-sidecar-"
dockerComposeYAMLCLUSTER = "docker-compose-cluster.yml"
dockerComposeYAML = "docker-compose-single.yml"
stateStoreName = "statestore"
stateStoreCluster = "statestorecluster"
stateStoreClusterFail = "statestoreclusterfail"
stateStoreVersionFail = "statestoreversionfail"
stateStoreFactorFail = "statestorefactorfail"
certificationTestPrefix = "stable-certification-"
stateStoreNoConfigError = "error saving state: rpc error: code = FailedPrecondition desc = state store is not configured"
)
func TestCassandra(t *testing.T) {
log := logger.NewLogger("dapr.components")
stateStore := state_cassandra.NewCassandraStateStore(log).(*state_cassandra.Cassandra)
ports, err := dapr_testing.GetFreePorts(2)
assert.NoError(t, err)
stateRegistry := state_loader.NewRegistry()
stateRegistry.Logger = log
stateRegistry.RegisterComponent(func(l logger.Logger) state.Store {
return stateStore
}, "cassandra")
currentGrpcPort := ports[0]
currentHTTPPort := ports[1]
basicTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort))
if err != nil {
panic(err)
}
defer client.Close()
err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.NoError(t, err)
// get state
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
assert.NoError(t, err)
assert.Equal(t, "cassandraCert", string(item.Value))
assert.NotContains(t, item.Metadata, "ttlExpireTime")
errUpdate := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("cassandraCertUpdate"), nil)
assert.NoError(t, errUpdate)
item, errUpdatedGet := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
assert.NoError(t, errUpdatedGet)
assert.Equal(t, "cassandraCertUpdate", string(item.Value))
assert.NotContains(t, item.Metadata, "ttlExpireTime")
// delete state
err = client.DeleteState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
assert.NoError(t, err)
return nil
}
// Time-To-Live Test
timeToLiveTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort))
if err != nil {
panic(err)
}
defer client.Close()
ttlInSecondsWrongValue := "mock value"
mapOptionsWrongValue :=
map[string]string{
"ttlInSeconds": ttlInSecondsWrongValue,
}
ttlInSecondsNonExpiring := 0
mapOptionsNonExpiring :=
map[string]string{
"ttlInSeconds": strconv.Itoa(ttlInSecondsNonExpiring),
}
ttlInSeconds := 5
mapOptions :=
map[string]string{
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
}
err1 := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl1", []byte("cassandraCert"), mapOptionsWrongValue)
assert.Error(t, err1)
err2 := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl2", []byte("cassandraCert2"), mapOptionsNonExpiring)
assert.NoError(t, err2)
err3 := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl3", []byte("cassandraCert3"), mapOptions)
assert.NoError(t, err3)
// get state
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil)
assert.NoError(t, err)
assert.Equal(t, "cassandraCert3", string(item.Value))
require.Contains(t, item.Metadata, "ttlExpireTime")
expireTime, err := time.Parse(time.RFC3339, item.Metadata["ttlExpireTime"])
require.NoError(t, err)
assert.InDelta(t, time.Now().Add(time.Second*5).Unix(), expireTime.Unix(), 3)
time.Sleep(5 * time.Second)
//entry should be expired now
itemAgain, errAgain := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil)
assert.NoError(t, errAgain)
assert.Nil(t, nil, itemAgain)
return nil
}
testGetAfterCassandraRestart := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort))
if err != nil {
panic(err)
}
defer client.Close()
// get state
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl2", nil)
assert.NoError(t, err)
assert.Equal(t, "cassandraCert2", string(item.Value))
return nil
}
failTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort + 2))
if err != nil {
panic(err)
}
defer client.Close()
//should fail due to lack of replicas
err = client.SaveState(ctx, stateStoreFactorFail, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.Error(t, err)
return nil
}
failVerTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort + 4))
if err != nil {
panic(err)
}
defer client.Close()
// should fail due to unsupported version
err = client.SaveState(ctx, stateStoreVersionFail, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.Error(t, err)
return nil
}
flow.New(t, "Connecting cassandra And Ports and Verifying TTL and network tests and table creation").
Step(dockercompose.Run("cassandra", dockerComposeYAML)).
Step("wait", flow.Sleep(80*time.Second)).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
embedded.WithComponentsPath("components/docker/default"),
embedded.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run TTL related test", timeToLiveTest).
Step("interrupt network",
network.InterruptNetwork(10*time.Second, nil, nil, "9044:9042")).
//Component should recover at this point.
Step("wait", flow.Sleep(30*time.Second)).
Step("Run basic test again to verify reconnection occurred", basicTest).
Step("stop cassandra server", dockercompose.Stop("cassandra", dockerComposeYAML, "cassandra")).
Step("start cassandra server", dockercompose.Start("cassandra", dockerComposeYAML, "cassandra")).
Step("wait", flow.Sleep(60*time.Second)).
Step("Get Values Saved Earlier And Not Expired, after Cassandra restart", testGetAfterCassandraRestart).
Step("Run basic test", basicTest).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault2",
embedded.WithoutApp(),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+2)),
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort+2)),
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort+2)),
embedded.WithComponentsPath("components/docker/defaultfactorfail"),
embedded.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run replication factor fail test", failTest).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault3",
embedded.WithoutApp(),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+4)),
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort+4)),
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort+4)),
embedded.WithComponentsPath("components/docker/defaultverisonfail"),
embedded.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run replication factor fail test", failVerTest).
Run()
}
func TestCluster(t *testing.T) {
log := logger.NewLogger("dapr.components")
stateStore := state_cassandra.NewCassandraStateStore(log).(*state_cassandra.Cassandra)
ports, err := dapr_testing.GetFreePorts(2)
assert.NoError(t, err)
currentGrpcPort := ports[0]
currentHTTPPort := ports[1]
stateRegistry := state_loader.NewRegistry()
stateRegistry.Logger = log
stateRegistry.RegisterComponent(func(l logger.Logger) state.Store {
return stateStore
}, "cassandra")
basicTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort))
if err != nil {
panic(err)
}
defer client.Close()
err = client.SaveState(ctx, stateStoreCluster, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.NoError(t, err)
// get state
item, err := client.GetState(ctx, stateStoreCluster, certificationTestPrefix+"key1", nil)
assert.NoError(t, err)
assert.Equal(t, "cassandraCert", string(item.Value))
errUpdate := client.SaveState(ctx, stateStoreCluster, certificationTestPrefix+"key1", []byte("cassandraCertUpdate"), nil)
assert.NoError(t, errUpdate)
item, errUpdatedGet := client.GetState(ctx, stateStoreCluster, certificationTestPrefix+"key1", nil)
assert.NoError(t, errUpdatedGet)
assert.Equal(t, "cassandraCertUpdate", string(item.Value))
// delete state
err = client.DeleteState(ctx, stateStoreCluster, certificationTestPrefix+"key1", nil)
assert.NoError(t, err)
return nil
}
failTest := func(ctx flow.Context) error {
client, err := goclient.NewClientWithPort(fmt.Sprint(currentGrpcPort + 2))
if err != nil {
panic(err)
}
defer client.Close()
err = client.SaveState(ctx, stateStoreClusterFail, certificationTestPrefix+"key1", []byte("cassandraCert"), nil)
assert.NoError(t, err)
// get state
_, err = client.GetStateWithConsistency(ctx, stateStoreClusterFail, certificationTestPrefix+"key1", nil, goclient.StateConsistencyUndefined)
assert.Error(t, err)
return nil
}
flow.New(t, "Connecting cassandra And Verifying port/tables/keyspaces/consistency").
Step(dockercompose.Run("cassandra", dockerComposeYAMLCLUSTER)).
Step("wait", flow.Sleep(80*time.Second)).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
embedded.WithComponentsPath("components/docker/cluster"),
embedded.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run basic test", basicTest).
Step(sidecar.Run(sidecarNamePrefix+"dockerDefault2",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort+2)),
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort+2)),
embedded.WithComponentsPath("components/docker/cluster-fail"),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+2)),
embedded.WithStates(stateRegistry),
)).
Step("wait", flow.Sleep(30*time.Second)).
Step("Run consistency fail test", failTest).
Run()
}