845 lines
28 KiB
Go
845 lines
28 KiB
Go
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"database/sql"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
// Blank import for the underlying SQLite Driver.
|
|
_ "modernc.org/sqlite"
|
|
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/components-contrib/tests/certification/embedded"
|
|
"github.com/dapr/components-contrib/tests/certification/flow"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
|
|
|
|
"github.com/dapr/components-contrib/state"
|
|
state_sqlite "github.com/dapr/components-contrib/state/sqlite"
|
|
state_loader "github.com/dapr/dapr/pkg/components/state"
|
|
"github.com/dapr/dapr/pkg/runtime"
|
|
"github.com/dapr/go-sdk/client"
|
|
"github.com/dapr/kit/logger"
|
|
)
|
|
|
|
// General settings shared by the certification steps.
const (
	stateStoreName          = "statestore"
	certificationTestPrefix = "stable-certification-"
	portOffset              = 2
	readonlyDBPath          = "artifacts/readonly.db"
)

// Names of the component metadata properties set by the tests.
const (
	keyConnectionString  = "connectionString"
	keyTableName         = "tableName"
	keyMetadataTableName = "metadataTableName"
	keyCleanupInterval   = "cleanupInterval"
	keyBusyTimeout       = "busyTimeout"
)

// migrationLevel is the value the tests expect to find under the "migrations"
// key of the metadata table once Init has completed.
// Update this constant if you add more migrations.
const migrationLevel = "1"
|
|
|
|
func TestSQLite(t *testing.T) {
|
|
log := logger.NewLogger("dapr.components")
|
|
|
|
stateStore := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
|
|
stateRegistry := state_loader.NewRegistry()
|
|
stateRegistry.Logger = log
|
|
stateRegistry.RegisterComponent(func(l logger.Logger) state.Store {
|
|
return stateStore
|
|
}, "sqlite")
|
|
|
|
// Compute the hash of the read-only DB
|
|
readonlyDBHash, err := hashFile(readonlyDBPath)
|
|
require.NoError(t, err)
|
|
|
|
// Basic test validating CRUD operations
|
|
basicTest := func(port int) func(ctx flow.Context) error {
|
|
return func(ctx flow.Context) error {
|
|
ctx.T.Run("basic test", func(t *testing.T) {
|
|
client, err := client.NewClientWithPort(strconv.Itoa(port))
|
|
require.NoError(t, err)
|
|
defer client.Close()
|
|
|
|
// save state
|
|
err = client.SaveState(ctx, stateStoreName, "key1", []byte("la nebbia agli irti colli piovigginando sale"), nil)
|
|
require.NoError(t, err)
|
|
|
|
// get state
|
|
item, err := client.GetState(ctx, stateStoreName, "key1", nil)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "la nebbia agli irti colli piovigginando sale", string(item.Value))
|
|
|
|
// update state
|
|
errUpdate := client.SaveState(ctx, stateStoreName, "key1", []byte("e sotto il maestrale urla e biancheggia il mar"), nil)
|
|
require.NoError(t, errUpdate)
|
|
item, errUpdatedGet := client.GetState(ctx, stateStoreName, "key1", nil)
|
|
require.NoError(t, errUpdatedGet)
|
|
assert.Equal(t, "e sotto il maestrale urla e biancheggia il mar", string(item.Value))
|
|
|
|
// delete state
|
|
err = client.DeleteState(ctx, stateStoreName, "key1", nil)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// checks the state store component is not vulnerable to SQL injection
|
|
verifySQLInjectionTest := func(port int) func(ctx flow.Context) error {
|
|
return func(ctx flow.Context) error {
|
|
ctx.T.Run("sql injection test", func(t *testing.T) {
|
|
client, err := client.NewClientWithPort(strconv.Itoa(port))
|
|
require.NoError(t, err)
|
|
defer client.Close()
|
|
|
|
// common SQL injection techniques for PostgreSQL
|
|
sqlInjectionAttempts := []string{
|
|
"DROP TABLE dapr_user",
|
|
"dapr' OR '1'='1",
|
|
}
|
|
|
|
for _, sqlInjectionAttempt := range sqlInjectionAttempts {
|
|
// save state with sqlInjectionAttempt's value as key, default options: strong, last-write
|
|
err = client.SaveState(ctx, stateStoreName, sqlInjectionAttempt, []byte(sqlInjectionAttempt), nil)
|
|
assert.NoError(t, err)
|
|
|
|
// get state for key sqlInjectionAttempt's value
|
|
item, err := client.GetState(ctx, stateStoreName, sqlInjectionAttempt, nil)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, sqlInjectionAttempt, string(item.Value))
|
|
|
|
// delete state for key sqlInjectionAttempt's value
|
|
err = client.DeleteState(ctx, stateStoreName, sqlInjectionAttempt, nil)
|
|
assert.NoError(t, err)
|
|
}
|
|
})
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Checks that the read-only database cannot be written to
|
|
readonlyTest := func(port int) func(ctx flow.Context) error {
|
|
return func(ctx flow.Context) error {
|
|
ctx.T.Run("read-only test", func(t *testing.T) {
|
|
client, err := client.NewClientWithPort(strconv.Itoa(port))
|
|
require.NoError(t, err)
|
|
defer client.Close()
|
|
|
|
// Retrieving state should work
|
|
item, err := client.GetState(ctx, stateStoreName, "my_string", nil)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, `"hello world"`, string(item.Value))
|
|
|
|
// Saving state should fail
|
|
err = client.SaveState(ctx, stateStoreName, "my_string", []byte("updated!"), nil)
|
|
require.Error(t, err)
|
|
assert.ErrorContains(t, err, "attempt to write a readonly database")
|
|
|
|
// Value should not be updated
|
|
item, err = client.GetState(ctx, stateStoreName, "my_string", nil)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, `"hello world"`, string(item.Value))
|
|
|
|
// Deleting state should fail
|
|
err = client.DeleteState(ctx, stateStoreName, "my_string", nil)
|
|
require.Error(t, err)
|
|
assert.ErrorContains(t, err, "attempt to write a readonly database")
|
|
})
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Checks the hash of the readonly DB (after the sidecar has been stopped) to confirm it wasn't modified
|
|
readonlyConfirmTest := func(ctx flow.Context) error {
|
|
ctx.T.Run("confirm read-only test", func(t *testing.T) {
|
|
newHash, err := hashFile(readonlyDBPath)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, readonlyDBHash, newHash, "read-only datbaase has been modified on disk")
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// Validates TTLs and garbage collections
|
|
ttlTest := func(ctx flow.Context) error {
|
|
md := state.Metadata{
|
|
Base: metadata.Base{
|
|
Name: "ttltest",
|
|
Properties: map[string]string{
|
|
keyConnectionString: "file::memory:",
|
|
keyTableName: "ttl_state",
|
|
},
|
|
},
|
|
}
|
|
|
|
ctx.T.Run("parse cleanupIntervalInSeconds", func(t *testing.T) {
|
|
t.Run("default value", func(t *testing.T) {
|
|
// Default value is disabled
|
|
delete(md.Properties, keyCleanupInterval)
|
|
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
|
|
err := storeObj.Init(context.Background(), md)
|
|
require.NoError(t, err, "failed to init")
|
|
defer storeObj.Close()
|
|
|
|
dbAccess := storeObj.GetDBAccess()
|
|
require.NotNil(t, dbAccess)
|
|
|
|
cleanupInterval := dbAccess.GetCleanupInterval()
|
|
assert.Equal(t, time.Duration(0), cleanupInterval)
|
|
})
|
|
|
|
t.Run("positive value", func(t *testing.T) {
|
|
md.Properties[keyCleanupInterval] = "10s"
|
|
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
|
|
err := storeObj.Init(context.Background(), md)
|
|
require.NoError(t, err, "failed to init")
|
|
defer storeObj.Close()
|
|
|
|
dbAccess := storeObj.GetDBAccess()
|
|
require.NotNil(t, dbAccess)
|
|
|
|
cleanupInterval := dbAccess.GetCleanupInterval()
|
|
assert.Equal(t, time.Duration(10*time.Second), cleanupInterval)
|
|
})
|
|
|
|
t.Run("disabled", func(t *testing.T) {
|
|
// A value of 0 means that the cleanup is disabled
|
|
md.Properties[keyCleanupInterval] = "0"
|
|
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
|
|
err := storeObj.Init(context.Background(), md)
|
|
require.NoError(t, err, "failed to init")
|
|
defer storeObj.Close()
|
|
|
|
dbAccess := storeObj.GetDBAccess()
|
|
require.NotNil(t, dbAccess)
|
|
|
|
cleanupInterval := dbAccess.GetCleanupInterval()
|
|
assert.Equal(t, time.Duration(0), cleanupInterval)
|
|
})
|
|
|
|
})
|
|
|
|
ctx.T.Run("cleanup", func(t *testing.T) {
|
|
md := state.Metadata{
|
|
Base: metadata.Base{
|
|
Name: "ttltest",
|
|
Properties: map[string]string{
|
|
keyConnectionString: "file::memory:",
|
|
keyTableName: "ttl_state",
|
|
keyMetadataTableName: "ttl_metadata",
|
|
},
|
|
},
|
|
}
|
|
|
|
t.Run("automatically delete expired records", func(t *testing.T) {
|
|
// Run every second
|
|
md.Properties[keyCleanupInterval] = "1s"
|
|
|
|
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
err := storeObj.Init(context.Background(), md)
|
|
require.NoError(t, err, "failed to init")
|
|
defer storeObj.Close()
|
|
|
|
dbClient := storeObj.GetDBAccess().GetConnection()
|
|
require.NotNil(t, dbClient)
|
|
|
|
// Seed the database with some records
|
|
err = populateTTLRecords(ctx, dbClient)
|
|
require.NoError(t, err, "failed to seed records")
|
|
|
|
// Wait 2 seconds then verify we have only 10 rows left
|
|
time.Sleep(2 * time.Second)
|
|
count, err := countRowsInTable(ctx, dbClient, "ttl_state")
|
|
assert.NoError(t, err, "failed to run query to count rows")
|
|
assert.Equal(t, 10, count)
|
|
|
|
// The "last-cleanup" value should be <= 1 second (+ a bit of buffer)
|
|
lastCleanup, err := loadLastCleanupInterval(ctx, dbClient, "ttl_metadata")
|
|
require.NoError(t, err, "failed to load value for 'last-cleanup'")
|
|
assert.LessOrEqual(t, lastCleanup, int64(1200))
|
|
|
|
// Wait 6 more seconds and verify there are no more rows left
|
|
time.Sleep(6 * time.Second)
|
|
count, err = countRowsInTable(ctx, dbClient, "ttl_state")
|
|
require.NoError(t, err, "failed to run query to count rows")
|
|
assert.Equal(t, 0, count)
|
|
|
|
// The "last-cleanup" value should be <= 1 second (+ a bit of buffer)
|
|
lastCleanup, err = loadLastCleanupInterval(ctx, dbClient, "ttl_metadata")
|
|
require.NoError(t, err, "failed to load value for 'last-cleanup'")
|
|
assert.LessOrEqual(t, lastCleanup, int64(1200))
|
|
})
|
|
|
|
t.Run("cleanup concurrency", func(t *testing.T) {
|
|
// Set to run every hour
|
|
// (we'll manually trigger more frequent iterations)
|
|
md.Properties[keyCleanupInterval] = "1h"
|
|
|
|
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
err := storeObj.Init(context.Background(), md)
|
|
require.NoError(t, err, "failed to init")
|
|
defer storeObj.Close()
|
|
|
|
dbAccess := storeObj.GetDBAccess()
|
|
require.NotNil(t, dbAccess)
|
|
|
|
dbClient := dbAccess.GetConnection()
|
|
|
|
// Seed the database with some records
|
|
err = populateTTLRecords(ctx, dbClient)
|
|
require.NoError(t, err, "failed to seed records")
|
|
|
|
// Validate that 20 records are present
|
|
count, err := countRowsInTable(ctx, dbClient, "ttl_state")
|
|
require.NoError(t, err, "failed to run query to count rows")
|
|
assert.Equal(t, 20, count)
|
|
|
|
// Set last-cleanup to 1s ago (less than 3600s)
|
|
err = setValueInMetadataTable(ctx, dbClient, "ttl_metadata", "'last-cleanup'", "datetime('now', '-1 second')")
|
|
require.NoError(t, err, "failed to set last-cleanup")
|
|
|
|
// The "last-cleanup" value should be ~1 second (+ a bit of buffer)
|
|
lastCleanup, err := loadLastCleanupInterval(ctx, dbClient, "ttl_metadata")
|
|
require.NoError(t, err, "failed to load value for 'last-cleanup'")
|
|
assert.LessOrEqual(t, lastCleanup, int64(1200))
|
|
lastCleanupValueOrig, err := getValueFromMetadataTable(ctx, dbClient, "ttl_metadata", "last-cleanup")
|
|
require.NoError(t, err, "failed to load absolute value for 'last-cleanup'")
|
|
require.NotEmpty(t, lastCleanupValueOrig)
|
|
|
|
// Trigger the background cleanup, which should do nothing because the last cleanup was < 3600s
|
|
err = dbAccess.CleanupExpired()
|
|
require.NoError(t, err, "CleanupExpired returned an error")
|
|
|
|
// Validate that 20 records are still present
|
|
count, err = countRowsInTable(ctx, dbClient, "ttl_state")
|
|
require.NoError(t, err, "failed to run query to count rows")
|
|
assert.Equal(t, 20, count)
|
|
|
|
// The "last-cleanup" value should not have been changed
|
|
lastCleanupValue, err := getValueFromMetadataTable(ctx, dbClient, "ttl_metadata", "last-cleanup")
|
|
require.NoError(t, err, "failed to load absolute value for 'last-cleanup'")
|
|
assert.Equal(t, lastCleanupValueOrig, lastCleanupValue)
|
|
})
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// Tests the "Init" method and the database migrations
|
|
// It also tests the metadata properties "tableName" and "metadataTableName"
|
|
initTest := func(ctx flow.Context) error {
|
|
ctx.T.Run("init and migrations", func(t *testing.T) {
|
|
// Create a temporary database and create a connection to it
|
|
tmpDir := t.TempDir()
|
|
dbPath := filepath.Join(tmpDir, "init.db")
|
|
dbClient, err := sql.Open("sqlite", "file:"+dbPath+"?_pragma=busy_timeout%282000%29&_pragma=journal_mode%28WAL%29&_txlock=immediate")
|
|
require.NoError(t, err)
|
|
|
|
md := state.Metadata{
|
|
Base: metadata.Base{
|
|
Name: "inittest",
|
|
Properties: map[string]string{
|
|
keyConnectionString: dbPath,
|
|
},
|
|
},
|
|
}
|
|
|
|
t.Run("initial state clean", func(t *testing.T) {
|
|
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
md.Properties[keyTableName] = "clean_state"
|
|
md.Properties[keyMetadataTableName] = "clean_metadata"
|
|
|
|
// Init and perform the migrations
|
|
err := storeObj.Init(context.Background(), md)
|
|
require.NoError(t, err, "failed to init")
|
|
defer storeObj.Close()
|
|
|
|
// We should have the tables correctly created
|
|
err = tableExists(dbClient, "clean_state")
|
|
assert.NoError(t, err, "state table does not exist")
|
|
err = tableExists(dbClient, "clean_metadata")
|
|
assert.NoError(t, err, "metadata table does not exist")
|
|
|
|
// Ensure migration level is correct
|
|
level, err := getMigrationLevel(dbClient, "clean_metadata")
|
|
assert.NoError(t, err, "failed to get migration level")
|
|
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
|
})
|
|
|
|
t.Run("all migrations performed", func(t *testing.T) {
|
|
// Re-use "clean_state" and "clean_metadata"
|
|
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
md.Properties[keyTableName] = "clean_state"
|
|
md.Properties[keyMetadataTableName] = "clean_metadata"
|
|
|
|
// Should already have migration level 2
|
|
level, err := getMigrationLevel(dbClient, "clean_metadata")
|
|
assert.NoError(t, err, "failed to get migration level")
|
|
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
|
|
|
// Init and perform the migrations
|
|
err = storeObj.Init(context.Background(), md)
|
|
require.NoError(t, err, "failed to init")
|
|
defer storeObj.Close()
|
|
|
|
// Ensure migration level is correct
|
|
level, err = getMigrationLevel(dbClient, "clean_metadata")
|
|
assert.NoError(t, err, "failed to get migration level")
|
|
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
|
})
|
|
|
|
t.Run("migrate from implied level 1", func(t *testing.T) {
|
|
// Before we added the metadata table, the "implied" level 1 had only the state table
|
|
// Create that table to simulate the old state and validate the migration
|
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
_, err := dbClient.ExecContext(
|
|
ctx,
|
|
`CREATE TABLE pre_state (
|
|
key TEXT NOT NULL PRIMARY KEY,
|
|
value TEXT NOT NULL,
|
|
is_binary BOOLEAN NOT NULL,
|
|
etag TEXT NOT NULL,
|
|
expiration_time TIMESTAMP DEFAULT NULL,
|
|
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
|
)`,
|
|
)
|
|
require.NoError(t, err, "failed to create initial migration state")
|
|
|
|
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
md.Properties[keyTableName] = "pre_state"
|
|
md.Properties[keyMetadataTableName] = "pre_metadata"
|
|
|
|
// Init and perform the migrations
|
|
err = storeObj.Init(context.Background(), md)
|
|
require.NoError(t, err, "failed to init")
|
|
defer storeObj.Close()
|
|
|
|
// We should have the metadata table created
|
|
err = tableExists(dbClient, "pre_metadata")
|
|
assert.NoError(t, err, "metadata table does not exist")
|
|
|
|
// Ensure migration level is correct
|
|
level, err := getMigrationLevel(dbClient, "pre_metadata")
|
|
assert.NoError(t, err, "failed to get migration level")
|
|
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
|
})
|
|
|
|
t.Run("initialize components concurrently", func(t *testing.T) {
|
|
// Initializes 3 components concurrently using the same table names, and ensure that they perform migrations without conflicts and race conditions
|
|
md.Properties[keyTableName] = "mystate"
|
|
md.Properties[keyMetadataTableName] = "mymetadata"
|
|
|
|
errs := make(chan error, 3)
|
|
hasLogs := atomic.Int32{}
|
|
for i := 0; i < 3; i++ {
|
|
go func(i int) {
|
|
buf := &bytes.Buffer{}
|
|
l := logger.NewLogger("multi-init-" + strconv.Itoa(i))
|
|
l.SetOutput(io.MultiWriter(buf, os.Stdout))
|
|
|
|
// Init and perform the migrations
|
|
storeObj := state_sqlite.NewSQLiteStateStore(l).(*state_sqlite.SQLiteStore)
|
|
err := storeObj.Init(context.Background(), md)
|
|
if err != nil {
|
|
errs <- fmt.Errorf("%d failed to init: %w", i, err)
|
|
return
|
|
}
|
|
|
|
// One and only one of the loggers should have any message indicating "Performing migration 1"
|
|
if strings.Contains(buf.String(), "Performing migration 1") {
|
|
hasLogs.Add(1)
|
|
}
|
|
|
|
// Close the component right away
|
|
err = storeObj.Close()
|
|
if err != nil {
|
|
errs <- fmt.Errorf("%d failed to close: %w", i, err)
|
|
return
|
|
}
|
|
|
|
errs <- nil
|
|
}(i)
|
|
}
|
|
|
|
failed := false
|
|
for i := 0; i < 3; i++ {
|
|
select {
|
|
case err := <-errs:
|
|
failed = failed || !assert.NoError(t, err)
|
|
case <-time.After(time.Minute):
|
|
t.Fatal("timed out waiting for components to initialize")
|
|
}
|
|
}
|
|
if failed {
|
|
// Short-circuit
|
|
t.FailNow()
|
|
}
|
|
|
|
// Exactly one component should have written logs (which means generated any activity during migrations)
|
|
assert.Equal(t, int32(1), hasLogs.Load(), "expected 1 component to log anything to indicate migration activity, but got %d", hasLogs.Load())
|
|
|
|
// We should have the tables correctly created
|
|
err = tableExists(dbClient, "mystate")
|
|
assert.NoError(t, err, "state table does not exist")
|
|
err = tableExists(dbClient, "mymetadata")
|
|
assert.NoError(t, err, "metadata table does not exist")
|
|
|
|
// Ensure migration level is correct
|
|
level, err := getMigrationLevel(dbClient, "mymetadata")
|
|
assert.NoError(t, err, "failed to get migration level")
|
|
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
|
})
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// Tests many concurrent operations to ensure the database can operate successfully
|
|
concurrencyTest := func(ctx flow.Context) error {
|
|
ctx.T.Run("concurrency", func(t *testing.T) {
|
|
// Create a temporary database
|
|
tmpDir := t.TempDir()
|
|
dbPath := filepath.Join(tmpDir, "init.db")
|
|
|
|
md := state.Metadata{
|
|
Base: metadata.Base{
|
|
Name: "inittest",
|
|
Properties: map[string]string{
|
|
keyConnectionString: dbPath,
|
|
// Connect with a higher busy timeout
|
|
keyBusyTimeout: "10s",
|
|
},
|
|
},
|
|
}
|
|
|
|
// Maintain all store objects
|
|
var storeObjs []*state_sqlite.SQLiteStore
|
|
newStoreObj := func() error {
|
|
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
|
err := storeObj.Init(context.Background(), md)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
storeObjs = append(storeObjs, storeObj)
|
|
return nil
|
|
}
|
|
defer func() {
|
|
for _, storeObj := range storeObjs {
|
|
_ = storeObj.Close()
|
|
}
|
|
}()
|
|
|
|
testMultipleKeys := func(t *testing.T) {
|
|
const parallel = 10
|
|
const runs = 30
|
|
|
|
ctx := context.Background()
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(parallel)
|
|
for i := 0; i < parallel; i++ {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
|
|
var (
|
|
res *state.GetResponse
|
|
err error
|
|
)
|
|
|
|
storeObj := storeObjs[i%len(storeObjs)]
|
|
|
|
key := fmt.Sprintf("multiple_%d", i)
|
|
for j := 0; j < runs; j++ {
|
|
// Save state
|
|
err = storeObj.Set(ctx, &state.SetRequest{
|
|
Key: key,
|
|
Value: j,
|
|
})
|
|
assert.NoError(t, err)
|
|
|
|
// Retrieve state
|
|
res, err = storeObj.Get(ctx, &state.GetRequest{
|
|
Key: key,
|
|
})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, strconv.Itoa(j), string(res.Data))
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
testSameKey := func(t *testing.T) {
|
|
const parallel = 10
|
|
const runs = 30
|
|
|
|
ctx := context.Background()
|
|
|
|
// We have as many counters as we have number of parallel workers. The final value will be one of them.
|
|
// This is because we have a race condition between the time we call `save := counter.Add(1)` and when we set the value, causing tests to be flaky since the value stored in the database may not be the one stored in counter.
|
|
// This is not a bug in the SQLite component, as it's working as intended; the race condition is on the tests themselves.
|
|
// The real solution to the race condition (and what one should do in a real-world app) would be to wrap the call to increment counter and the store operation in a mutex. However, that would make it so only one query is hitting the database at the same time, while here we're precisely testing how the component handles multiple writes at the same time.
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(parallel)
|
|
counters := [parallel]atomic.Int32{}
|
|
key := "same"
|
|
for i := 0; i < parallel; i++ {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
|
|
var err error
|
|
|
|
storeObj := storeObjs[i%len(storeObjs)]
|
|
|
|
for j := 0; j < runs; j++ {
|
|
save := counters[i].Add(1)
|
|
// Save state
|
|
err = storeObj.Set(ctx, &state.SetRequest{
|
|
Key: key,
|
|
Value: int(save),
|
|
})
|
|
assert.NoError(t, err)
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Retrieve state
|
|
res, err := storeObjs[0].Get(ctx, &state.GetRequest{
|
|
Key: key,
|
|
})
|
|
assert.NoError(t, err)
|
|
|
|
expect := [parallel]string{}
|
|
for i := 0; i < parallel; i++ {
|
|
expect[i] = strconv.Itoa(int(counters[i].Load()))
|
|
}
|
|
assert.Contains(t, expect, string(res.Data))
|
|
}
|
|
|
|
// Init one store object
|
|
require.NoError(t, newStoreObj(), "failed to init")
|
|
|
|
t.Run("same connection, multiple keys", testMultipleKeys)
|
|
t.Run("same connection, same key", testSameKey)
|
|
|
|
// Init two more store objects
|
|
require.NoError(t, newStoreObj(), "failed to init")
|
|
require.NoError(t, newStoreObj(), "failed to init")
|
|
|
|
t.Run("multiple connections, multiple keys", testMultipleKeys)
|
|
t.Run("multiple connections, same key", testSameKey)
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// This makes it possible to comment-out individual tests
|
|
_ = basicTest
|
|
_ = verifySQLInjectionTest
|
|
_ = readonlyTest
|
|
_ = ttlTest
|
|
_ = initTest
|
|
_ = concurrencyTest
|
|
|
|
flow.New(t, "Run tests").
|
|
// Start the sidecar with the in-memory database
|
|
Step(sidecar.Run("sqlite-memory",
|
|
embedded.WithoutApp(),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
|
|
embedded.WithProfilingEnabled(false),
|
|
embedded.WithResourcesPath("resources/memory"),
|
|
embedded.WithStates(stateRegistry),
|
|
)).
|
|
|
|
// Run some basic certification tests with the in-memory database
|
|
Step("run basic test", basicTest(runtime.DefaultDaprAPIGRPCPort)).
|
|
Step("run SQL injection test", verifySQLInjectionTest(runtime.DefaultDaprAPIGRPCPort)).
|
|
Step("stop app", sidecar.Stop("sqlite-memory")).
|
|
|
|
// Start the sidecar with a read-only database
|
|
Step(sidecar.Run("sqlite-readonly",
|
|
embedded.WithoutApp(),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
|
|
embedded.WithProfilingEnabled(false),
|
|
embedded.WithResourcesPath("resources/readonly"),
|
|
embedded.WithStates(stateRegistry),
|
|
)).
|
|
Step("run read-only test", readonlyTest(runtime.DefaultDaprAPIGRPCPort+portOffset)).
|
|
Step("stop sqlite-readonly sidecar", sidecar.Stop("sqlite-readonly")).
|
|
Step("confirm read-only test", readonlyConfirmTest).
|
|
|
|
// Run TTL tests
|
|
Step("run TTL test", ttlTest).
|
|
|
|
// Run init and migrations tests
|
|
Step("run init and migrations test", initTest).
|
|
|
|
// Concurrency test
|
|
Step("run concurrency test", concurrencyTest).
|
|
|
|
// Start tests
|
|
Run()
|
|
}
|
|
|
|
// populateTTLRecords seeds the "ttl_state" table with 20 rows in a single
// INSERT statement: 10 rows whose expiration time is one minute in the past
// ("expired_N") and 10 rows that expire 4 seconds from now ("notexpired_N").
//
// It returns an error if the statement fails, if the number of affected rows
// cannot be determined, or if that number is not 20.
func populateTTLRecords(ctx context.Context, dbClient *sql.DB) error {
	const (
		rowsPerGroup = 10
		totalRows    = 2 * rowsPerGroup
	)

	// Note this uses fmt.Sprintf and not parametrized queries-on purpose, so we can pass multiple rows in the same INSERT query
	// Normally this would be a very bad idea, just don't do it outside of tests (and maybe not even in tests like I'm doing right now)...
	// String values are single-quoted: in SQLite double quotes denote identifiers and are
	// only accepted as string literals through a legacy fallback that can be disabled.
	rows := make([]string, 0, totalRows)
	for i := 0; i < rowsPerGroup; i++ {
		rows = append(rows, fmt.Sprintf(`('expired_%d', '"value_%d"', false, 'etag', datetime('now', '-1 minute'))`, i, i))
	}
	for i := 0; i < rowsPerGroup; i++ {
		rows = append(rows, fmt.Sprintf(`('notexpired_%d', '"value_%d"', false, 'etag', datetime('now', '+4 seconds'))`, i, i))
	}

	queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	q := "INSERT INTO ttl_state (key, value, is_binary, etag, expiration_time) VALUES " + strings.Join(rows, ", ")
	res, err := dbClient.ExecContext(queryCtx, q)
	if err != nil {
		return err
	}

	// Report a failure to read the row count as such, rather than as "only got 0" rows
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("reading the number of inserted rows: %w", err)
	}
	if n != totalRows {
		return fmt.Errorf("expected to insert %d rows, but only got %d", totalRows, n)
	}
	return nil
}
|
|
|
|
// countRowsInTable returns the result of "SELECT COUNT(key)" on the given
// table, using a 2-second timeout derived from ctx.
// The table name is concatenated into the query as-is.
func countRowsInTable(ctx context.Context, dbClient *sql.DB, table string) (count int, err error) {
	timeoutCtx, done := context.WithTimeout(ctx, 2*time.Second)
	defer done()

	row := dbClient.QueryRowContext(timeoutCtx, "SELECT COUNT(key) FROM "+table)
	if scanErr := row.Scan(&count); scanErr != nil {
		return count, scanErr
	}
	return count, nil
}
|
|
|
|
// loadLastCleanupInterval returns the difference between
// unixepoch(CURRENT_TIMESTAMP) and unixepoch(value) for the 'last-cleanup'
// row of the given metadata table, using a 2-second timeout derived from ctx.
// When the row does not exist, it returns 0 and no error.
func loadLastCleanupInterval(ctx context.Context, dbClient *sql.DB, table string) (lastCleanup int64, err error) {
	query := fmt.Sprintf("SELECT unixepoch(CURRENT_TIMESTAMP) - unixepoch(value) FROM %s WHERE key = 'last-cleanup'", table)

	timeoutCtx, done := context.WithTimeout(ctx, 2*time.Second)
	defer done()

	scanErr := dbClient.QueryRowContext(timeoutCtx, query).Scan(&lastCleanup)
	switch {
	case scanErr == nil, errors.Is(scanErr, sql.ErrNoRows):
		// A missing row is not treated as a failure
		return lastCleanup, nil
	default:
		return lastCleanup, scanErr
	}
}
|
|
|
|
// setValueInMetadataTable runs a "REPLACE INTO <table> (key, value)" statement
// with a 30-second timeout derived from ctx.
//
// table, key and value are interpolated into the SQL text verbatim (no
// parametrized query) on purpose, so that callers can pass SQLite expressions
// such as function calls; string values must therefore arrive already quoted.
// Normally this would be a very bad idea: do not copy this outside of tests.
func setValueInMetadataTable(ctx context.Context, dbClient *sql.DB, table, key, value string) error {
	//nolint:gosec
	stmt := fmt.Sprintf(`REPLACE INTO %s (key, value) VALUES (%s, %s)`, table, key, value)

	timeoutCtx, done := context.WithTimeout(ctx, 30*time.Second)
	defer done()

	_, err := dbClient.ExecContext(timeoutCtx, stmt)
	return err
}
|
|
|
|
// getValueFromMetadataTable reads the "value" column of the row with the given
// key from the given metadata table, using a 2-second timeout derived from ctx.
// The key is bound as a query parameter; the table name is interpolated.
// When the row does not exist, it returns an empty string and no error.
func getValueFromMetadataTable(ctx context.Context, dbClient *sql.DB, table, key string) (value string, err error) {
	query := fmt.Sprintf("SELECT value FROM %s WHERE key = ?", table)

	timeoutCtx, done := context.WithTimeout(ctx, 2*time.Second)
	defer done()

	scanErr := dbClient.QueryRowContext(timeoutCtx, query, key).Scan(&value)
	if scanErr != nil && !errors.Is(scanErr, sql.ErrNoRows) {
		return value, scanErr
	}
	// Either the value was read, or the row is missing (not treated as a failure)
	return value, nil
}
|
|
|
|
// tableExists looks up tableName among the entries of type 'table' in
// sqlite_master, using a 15-second timeout.
// It returns nil when the table is found, the query error when the lookup
// fails, and a "table not found" error otherwise.
func tableExists(dbClient *sql.DB, tableName string) error {
	// The single result column is scanned as a string: "1" when the table exists, "0" otherwise.
	const q = `SELECT EXISTS (
		SELECT name FROM sqlite_master WHERE type='table' AND name = ?
	) AS 'exists'`

	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()

	var found string
	if err := dbClient.QueryRowContext(ctx, q, tableName).Scan(&found); err != nil {
		return err
	}
	if found == "1" {
		return nil
	}
	return errors.New("table not found")
}
|
|
|
|
// getMigrationLevel reads the value stored under the 'migrations' key of the
// given metadata table, using a 15-second timeout.
// When the row does not exist, it returns an empty level and no error.
// The table name is interpolated into the query as-is.
func getMigrationLevel(dbClient *sql.DB, metadataTable string) (level string, err error) {
	query := fmt.Sprintf(`SELECT value FROM %s WHERE key = 'migrations'`, metadataTable)

	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()

	err = dbClient.QueryRowContext(ctx, query).Scan(&level)
	if errors.Is(err, sql.ErrNoRows) {
		// No migration has been recorded yet
		return "", nil
	}
	return level, err
}
|
|
|
|
// hashFile streams the file at path through SHA-256 and returns the digest as
// a lowercase hex string.
// If the file cannot be opened or read, it returns an empty string and the error.
func hashFile(path string) (string, error) {
	file, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer file.Close()

	hasher := sha256.New()
	if _, err := io.Copy(hasher, file); err != nil {
		return "", err
	}

	digest := hasher.Sum(nil)
	return hex.EncodeToString(digest), nil
}
|