Added tests for migrations
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
6bee9b6192
commit
c88cec451c
|
|
@ -34,24 +34,33 @@ type migrations struct {
|
|||
|
||||
// Perform the required migrations
|
||||
func (m *migrations) Perform(ctx context.Context) error {
|
||||
// Begin a transaction
|
||||
tx, err := m.Conn.Begin()
|
||||
// Begin an exclusive transaction
|
||||
// We can't use Begin because that doesn't allow us setting the level of transaction
|
||||
queryCtx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
_, err := m.Conn.ExecContext(queryCtx, "BEGIN EXCLUSIVE TRANSACTION")
|
||||
cancel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("faild to begin transaction: %w", err)
|
||||
}
|
||||
|
||||
// Rollback the transaction in a deferred statement to catch errors
|
||||
success := false
|
||||
defer func() {
|
||||
err = tx.Rollback()
|
||||
if err != nil && err != sql.ErrTxDone {
|
||||
if success {
|
||||
return
|
||||
}
|
||||
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
|
||||
_, err = m.Conn.ExecContext(queryCtx, "ROLLBACK TRANSACTION")
|
||||
cancel()
|
||||
if err != nil {
|
||||
// Panicking here, as this forcibly closes the session and thus ensures we are not leaving locks hanging around
|
||||
m.Logger.Fatalf("Failed to rollback transaction: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Check if the metadata table exists, which we also use to store the migration level
|
||||
queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
exists, err := m.tableExists(queryCtx, tx, m.MetadataTableName)
|
||||
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
|
||||
exists, err := m.tableExists(queryCtx, m.Conn, m.MetadataTableName)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if the metadata table exists: %w", err)
|
||||
|
|
@ -60,7 +69,7 @@ func (m *migrations) Perform(ctx context.Context) error {
|
|||
// If the table doesn't exist, create it
|
||||
if !exists {
|
||||
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
|
||||
err = m.createMetadataTable(queryCtx, tx)
|
||||
err = m.createMetadataTable(queryCtx, m.Conn)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create metadata table: %w", err)
|
||||
|
|
@ -73,7 +82,7 @@ func (m *migrations) Perform(ctx context.Context) error {
|
|||
migrationLevel int
|
||||
)
|
||||
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
|
||||
err = tx.QueryRowContext(queryCtx,
|
||||
err = m.Conn.QueryRowContext(queryCtx,
|
||||
fmt.Sprintf(`SELECT value FROM %s WHERE key = 'migrations'`, m.MetadataTableName),
|
||||
).Scan(&migrationLevelStr)
|
||||
cancel()
|
||||
|
|
@ -92,13 +101,13 @@ func (m *migrations) Perform(ctx context.Context) error {
|
|||
// Perform the migrations
|
||||
for i := migrationLevel; i < len(allMigrations); i++ {
|
||||
m.Logger.Infof("Performing migration %d", i+1)
|
||||
err = allMigrations[i](ctx, tx, m)
|
||||
err = allMigrations[i](ctx, m.Conn, m)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to perform migration %d: %w", i+1, err)
|
||||
}
|
||||
|
||||
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
|
||||
_, err = tx.ExecContext(queryCtx,
|
||||
_, err = m.Conn.ExecContext(queryCtx,
|
||||
fmt.Sprintf(`REPLACE INTO %s (key, value) VALUES ('migrations', ?)`, m.MetadataTableName),
|
||||
strconv.Itoa(i+1),
|
||||
)
|
||||
|
|
@ -109,11 +118,16 @@ func (m *migrations) Perform(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// Commit the transaction
|
||||
err = tx.Commit()
|
||||
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
|
||||
_, err = m.Conn.ExecContext(queryCtx, "COMMIT TRANSACTION")
|
||||
cancel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to commit transaction")
|
||||
}
|
||||
|
||||
// Set success to true so we don't also run a rollback
|
||||
success = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -157,7 +171,7 @@ var allMigrations = [1]func(ctx context.Context, db querier, m *migrations) erro
|
|||
_, err := db.ExecContext(
|
||||
ctx,
|
||||
fmt.Sprintf(
|
||||
`CREATE TABLE %s (
|
||||
`CREATE TABLE IF NOT EXISTS %s (
|
||||
key TEXT NOT NULL PRIMARY KEY,
|
||||
value TEXT NOT NULL,
|
||||
is_binary BOOLEAN NOT NULL,
|
||||
|
|
|
|||
|
|
@ -55,6 +55,9 @@ const (
|
|||
keyCleanupInterval = "cleanupIntervalInSeconds"
|
||||
keyTableName = "tableName"
|
||||
keyMetadataTableName = "metadataTableName"
|
||||
|
||||
// Update this constant if you add more migrations
|
||||
migrationLevel = "2"
|
||||
)
|
||||
|
||||
func TestPostgreSQL(t *testing.T) {
|
||||
|
|
@ -72,9 +75,6 @@ func TestPostgreSQL(t *testing.T) {
|
|||
|
||||
currentGrpcPort := ports[0]
|
||||
|
||||
// Update this constant if you add more migrations
|
||||
const migrationLevel = "2"
|
||||
|
||||
// Holds a DB client as the "postgres" (ie. "root") user which we'll use to validate migrations and other changes in state
|
||||
var dbClient *pgx.Conn
|
||||
connectStep := func(ctx flow.Context) error {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ Uses an in-memory, temporary database.
|
|||
|
||||
## TTLs and cleanups
|
||||
|
||||
Also test the `tableName` and `metadataTableName` metadata properties.
|
||||
Also tests the `tableName` and `metadataTableName` metadata properties.
|
||||
|
||||
1. Correctly parse the `cleanupIntervalInSeconds` metadata property:
|
||||
- No value uses the default value (disabled)
|
||||
|
|
@ -41,3 +41,12 @@ Also test the `tableName` and `metadataTableName` metadata properties.
|
|||
- A zero or negative value disables the cleanup
|
||||
2. The cleanup method deletes expired records and updates the metadata table with the last time it ran
|
||||
3. The cleanup method doesn't run if the last iteration was less than `cleanupIntervalInSeconds` or if another process is doing the cleanup
|
||||
|
||||
## Initialization and migrations
|
||||
|
||||
Also tests the `tableName` and `metadataTableName` metadata properties.
|
||||
|
||||
1. Initializes the component with names for tables that don't exist
|
||||
3. Initializes the component with all migrations performed (current level is "1")
|
||||
4. Initializes the component with only the state table, created before the metadata table was added (implied migration level "1")
|
||||
5. Initializes three components at the same time and ensure no race conditions exist in performing migrations
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ require (
|
|||
github.com/dapr/go-sdk v1.6.0
|
||||
github.com/dapr/kit v0.0.4
|
||||
github.com/stretchr/testify v1.8.1
|
||||
modernc.org/sqlite v1.20.3
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
@ -141,7 +142,6 @@ require (
|
|||
modernc.org/mathutil v1.5.0 // indirect
|
||||
modernc.org/memory v1.4.0 // indirect
|
||||
modernc.org/opt v0.1.3 // indirect
|
||||
modernc.org/sqlite v1.20.3 // indirect
|
||||
modernc.org/strutil v1.1.3 // indirect
|
||||
modernc.org/token v1.0.1 // indirect
|
||||
sigs.k8s.io/controller-runtime v0.14.1 // indirect
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ limitations under the License.
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
|
|
@ -22,14 +23,19 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
// Blank import for the underlying SQLite Driver.
|
||||
_ "modernc.org/sqlite"
|
||||
|
||||
"github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/components-contrib/tests/certification/embedded"
|
||||
"github.com/dapr/components-contrib/tests/certification/flow"
|
||||
|
|
@ -53,6 +59,9 @@ const (
|
|||
keyTableName = "tableName"
|
||||
keyMetadataTableName = "metadataTableName"
|
||||
keyCleanupInterval = "cleanupInterval"
|
||||
|
||||
// Update this constant if you add more migrations
|
||||
migrationLevel = "1"
|
||||
)
|
||||
|
||||
func TestSQLite(t *testing.T) {
|
||||
|
|
@ -264,7 +273,9 @@ func TestSQLite(t *testing.T) {
|
|||
err := storeObj.Init(md)
|
||||
require.NoError(t, err, "failed to init")
|
||||
defer storeObj.Close()
|
||||
|
||||
dbClient := storeObj.GetDBAccess().GetConnection()
|
||||
require.NotNil(t, dbClient)
|
||||
|
||||
// Seed the database with some records
|
||||
err = populateTTLRecords(ctx, dbClient)
|
||||
|
|
@ -348,6 +359,183 @@ func TestSQLite(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Tests the "Init" method and the database migrations
|
||||
// It also tests the metadata properties "tableName" and "metadataTableName"
|
||||
initTest := func(ctx flow.Context) error {
|
||||
ctx.T.Run("init and migrations", func(t *testing.T) {
|
||||
// Create a temporary database and create a connection to it
|
||||
tmpDir := t.TempDir()
|
||||
dbPath := filepath.Join(tmpDir, "init.db")
|
||||
dbClient, err := sql.Open("sqlite", "file:"+dbPath+"?_pragma=busy_timeout%282000%29&_pragma=journal_mode%28WAL%29&_txlock=immediate")
|
||||
require.NoError(t, err)
|
||||
|
||||
md := state.Metadata{
|
||||
Base: metadata.Base{
|
||||
Name: "inittest",
|
||||
Properties: map[string]string{
|
||||
keyConnectionString: dbPath,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("initial state clean", func(t *testing.T) {
|
||||
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
||||
md.Properties[keyTableName] = "clean_state"
|
||||
md.Properties[keyMetadataTableName] = "clean_metadata"
|
||||
|
||||
// Init and perform the migrations
|
||||
err := storeObj.Init(md)
|
||||
require.NoError(t, err, "failed to init")
|
||||
defer storeObj.Close()
|
||||
|
||||
// We should have the tables correctly created
|
||||
err = tableExists(dbClient, "clean_state")
|
||||
assert.NoError(t, err, "state table does not exist")
|
||||
err = tableExists(dbClient, "clean_metadata")
|
||||
assert.NoError(t, err, "metadata table does not exist")
|
||||
|
||||
// Ensure migration level is correct
|
||||
level, err := getMigrationLevel(dbClient, "clean_metadata")
|
||||
assert.NoError(t, err, "failed to get migration level")
|
||||
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
||||
})
|
||||
|
||||
t.Run("all migrations performed", func(t *testing.T) {
|
||||
// Re-use "clean_state" and "clean_metadata"
|
||||
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
||||
md.Properties[keyTableName] = "clean_state"
|
||||
md.Properties[keyMetadataTableName] = "clean_metadata"
|
||||
|
||||
// Should already have migration level 2
|
||||
level, err := getMigrationLevel(dbClient, "clean_metadata")
|
||||
assert.NoError(t, err, "failed to get migration level")
|
||||
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
||||
|
||||
// Init and perform the migrations
|
||||
err = storeObj.Init(md)
|
||||
require.NoError(t, err, "failed to init")
|
||||
defer storeObj.Close()
|
||||
|
||||
// Ensure migration level is correct
|
||||
level, err = getMigrationLevel(dbClient, "clean_metadata")
|
||||
assert.NoError(t, err, "failed to get migration level")
|
||||
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
||||
})
|
||||
|
||||
t.Run("migrate from implied level 1", func(t *testing.T) {
|
||||
// Before we added the metadata table, the "implied" level 1 had only the state table
|
||||
// Create that table to simulate the old state and validate the migration
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
_, err := dbClient.ExecContext(
|
||||
ctx,
|
||||
`CREATE TABLE pre_state (
|
||||
key TEXT NOT NULL PRIMARY KEY,
|
||||
value TEXT NOT NULL,
|
||||
is_binary BOOLEAN NOT NULL,
|
||||
etag TEXT NOT NULL,
|
||||
expiration_time TIMESTAMP DEFAULT NULL,
|
||||
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)`,
|
||||
)
|
||||
require.NoError(t, err, "failed to create initial migration state")
|
||||
|
||||
storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
|
||||
md.Properties[keyTableName] = "pre_state"
|
||||
md.Properties[keyMetadataTableName] = "pre_metadata"
|
||||
|
||||
// Init and perform the migrations
|
||||
err = storeObj.Init(md)
|
||||
require.NoError(t, err, "failed to init")
|
||||
defer storeObj.Close()
|
||||
|
||||
// We should have the metadata table created
|
||||
err = tableExists(dbClient, "pre_metadata")
|
||||
assert.NoError(t, err, "metadata table does not exist")
|
||||
|
||||
// Ensure migration level is correct
|
||||
level, err := getMigrationLevel(dbClient, "pre_metadata")
|
||||
assert.NoError(t, err, "failed to get migration level")
|
||||
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
||||
})
|
||||
|
||||
t.Run("initialize components concurrently", func(t *testing.T) {
|
||||
// Initializes 3 components concurrently using the same table names, and ensure that they perform migrations without conflicts and race conditions
|
||||
md.Properties[keyTableName] = "mystate"
|
||||
md.Properties[keyMetadataTableName] = "mymetadata"
|
||||
|
||||
errs := make(chan error, 3)
|
||||
hasLogs := atomic.Int32{}
|
||||
for i := 0; i < 3; i++ {
|
||||
go func(i int) {
|
||||
buf := &bytes.Buffer{}
|
||||
l := logger.NewLogger("multi-init-" + strconv.Itoa(i))
|
||||
l.SetOutput(io.MultiWriter(buf, os.Stdout))
|
||||
|
||||
// Init and perform the migrations
|
||||
storeObj := state_sqlite.NewSQLiteStateStore(l).(*state_sqlite.SQLiteStore)
|
||||
err := storeObj.Init(md)
|
||||
if err != nil {
|
||||
errs <- fmt.Errorf("%d failed to init: %w", i, err)
|
||||
return
|
||||
}
|
||||
|
||||
// One and only one of the loggers should have any message indicating "Performing migration 1"
|
||||
if strings.Contains(buf.String(), "Performing migration 1") {
|
||||
hasLogs.Add(1)
|
||||
}
|
||||
|
||||
// Close the component right away
|
||||
err = storeObj.Close()
|
||||
if err != nil {
|
||||
errs <- fmt.Errorf("%d failed to close: %w", i, err)
|
||||
return
|
||||
}
|
||||
|
||||
errs <- nil
|
||||
}(i)
|
||||
}
|
||||
|
||||
failed := false
|
||||
for i := 0; i < 3; i++ {
|
||||
select {
|
||||
case err := <-errs:
|
||||
failed = failed || !assert.NoError(t, err)
|
||||
case <-time.After(time.Minute):
|
||||
t.Fatal("timed out waiting for components to initialize")
|
||||
}
|
||||
}
|
||||
if failed {
|
||||
// Short-circuit
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// Exactly one component should have written logs (which means generated any activity during migrations)
|
||||
assert.Equal(t, int32(1), hasLogs.Load(), "expected 1 component to log anything to indicate migration activity, but got %d", hasLogs.Load())
|
||||
|
||||
// We should have the tables correctly created
|
||||
err = tableExists(dbClient, "mystate")
|
||||
assert.NoError(t, err, "state table does not exist")
|
||||
err = tableExists(dbClient, "mymetadata")
|
||||
assert.NoError(t, err, "metadata table does not exist")
|
||||
|
||||
// Ensure migration level is correct
|
||||
level, err := getMigrationLevel(dbClient, "mymetadata")
|
||||
assert.NoError(t, err, "failed to get migration level")
|
||||
assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel)
|
||||
})
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// This makes it possible to comment-out individual tests
|
||||
_ = basicTest
|
||||
_ = verifySQLInjectionTest
|
||||
_ = readonlyTest
|
||||
_ = ttlTest
|
||||
_ = initTest
|
||||
|
||||
flow.New(t, "Run tests").
|
||||
// Start the sidecar with the in-memory database
|
||||
Step(sidecar.Run("sqlite-memory",
|
||||
|
|
@ -377,7 +565,10 @@ func TestSQLite(t *testing.T) {
|
|||
Step("confirm read-only test", readonlyConfirmTest).
|
||||
|
||||
// Run TTL tests
|
||||
Step("run TTL test", ttlTest).
|
||||
//Step("run TTL test", ttlTest).
|
||||
|
||||
// Run init and migrations tests
|
||||
Step("run init and migrations test", initTest).
|
||||
|
||||
// Start tests
|
||||
Run()
|
||||
|
|
@ -453,6 +644,40 @@ func getValueFromMetadataTable(ctx context.Context, dbClient *sql.DB, table, key
|
|||
return
|
||||
}
|
||||
|
||||
func tableExists(dbClient *sql.DB, tableName string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var exists string
|
||||
// Returns 1 or 0 as a string if the table exists or not.
|
||||
const q = `SELECT EXISTS (
|
||||
SELECT name FROM sqlite_master WHERE type='table' AND name = ?
|
||||
) AS 'exists'`
|
||||
err := dbClient.QueryRowContext(ctx, q, tableName).
|
||||
Scan(&exists)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists != "1" {
|
||||
return errors.New("table not found")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMigrationLevel(dbClient *sql.DB, metadataTable string) (level string, err error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err = dbClient.
|
||||
QueryRowContext(ctx, fmt.Sprintf(`SELECT value FROM %s WHERE key = 'migrations'`, metadataTable)).
|
||||
Scan(&level)
|
||||
if err != nil && errors.Is(err, sql.ErrNoRows) {
|
||||
err = nil
|
||||
level = ""
|
||||
}
|
||||
return level, err
|
||||
}
|
||||
|
||||
// Calculates the SHA-256 hash of a file
|
||||
func hashFile(path string) (string, error) {
|
||||
f, err := os.Open(path)
|
||||
|
|
|
|||
Loading…
Reference in New Issue