/* Copyright 2023 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package main import ( "bytes" "context" "crypto/sha256" "database/sql" "encoding/hex" "errors" "fmt" "io" "os" "path/filepath" "strconv" "strings" "sync" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" // Blank import for the underlying SQLite Driver. _ "modernc.org/sqlite" "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/tests/certification/embedded" "github.com/dapr/components-contrib/tests/certification/flow" "github.com/dapr/components-contrib/tests/certification/flow/sidecar" "github.com/dapr/components-contrib/state" state_sqlite "github.com/dapr/components-contrib/state/sqlite" state_loader "github.com/dapr/dapr/pkg/components/state" "github.com/dapr/dapr/pkg/runtime" "github.com/dapr/go-sdk/client" "github.com/dapr/kit/logger" ) const ( stateStoreName = "statestore" certificationTestPrefix = "stable-certification-" portOffset = 2 readonlyDBPath = "artifacts/readonly.db" keyConnectionString = "connectionString" keyTableName = "tableName" keyMetadataTableName = "metadataTableName" keyCleanupInterval = "cleanupInterval" keyBusyTimeout = "busyTimeout" // Update this constant if you add more migrations migrationLevel = "1" ) func TestSQLite(t *testing.T) { log := logger.NewLogger("dapr.components") stateStore := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) stateRegistry := state_loader.NewRegistry() stateRegistry.Logger = log stateRegistry.RegisterComponent(func(l logger.Logger) state.Store { return stateStore }, "sqlite") // Compute the hash of the read-only DB readonlyDBHash, err := hashFile(readonlyDBPath) require.NoError(t, err) // Basic test validating CRUD operations basicTest := func(port int) func(ctx flow.Context) error { return func(ctx flow.Context) error { ctx.T.Run("basic test", func(t *testing.T) { client, err := client.NewClientWithPort(strconv.Itoa(port)) require.NoError(t, err) defer client.Close() // save state err = client.SaveState(ctx, stateStoreName, "key1", []byte("la nebbia agli irti colli piovigginando sale"), nil) require.NoError(t, err) // get state item, err := client.GetState(ctx, stateStoreName, "key1", nil) require.NoError(t, err) assert.Equal(t, "la nebbia agli irti colli piovigginando sale", string(item.Value)) // update state errUpdate := client.SaveState(ctx, stateStoreName, "key1", []byte("e sotto il maestrale urla e biancheggia il mar"), nil) require.NoError(t, errUpdate) item, errUpdatedGet := client.GetState(ctx, stateStoreName, "key1", nil) require.NoError(t, errUpdatedGet) assert.Equal(t, "e sotto il maestrale urla e biancheggia il mar", string(item.Value)) // delete state err = client.DeleteState(ctx, stateStoreName, "key1", nil) require.NoError(t, err) }) return nil } } // checks the state store component is not vulnerable to SQL injection verifySQLInjectionTest := func(port int) func(ctx flow.Context) error { return func(ctx flow.Context) error { ctx.T.Run("sql injection test", func(t *testing.T) { client, err := client.NewClientWithPort(strconv.Itoa(port)) require.NoError(t, err) defer client.Close() // common SQL injection techniques for PostgreSQL sqlInjectionAttempts := []string{ "DROP TABLE dapr_user", "dapr' OR '1'='1", } for _, sqlInjectionAttempt := range sqlInjectionAttempts { // save state with sqlInjectionAttempt's value as key, default options: strong, last-write err = client.SaveState(ctx, stateStoreName, sqlInjectionAttempt, []byte(sqlInjectionAttempt), nil) assert.NoError(t, err) // get state for key sqlInjectionAttempt's value item, err := client.GetState(ctx, stateStoreName, sqlInjectionAttempt, nil) assert.NoError(t, err) assert.Equal(t, sqlInjectionAttempt, string(item.Value)) // delete state for key sqlInjectionAttempt's value err = client.DeleteState(ctx, stateStoreName, sqlInjectionAttempt, nil) assert.NoError(t, err) } }) return nil } } // Checks that the read-only database cannot be written to readonlyTest := func(port int) func(ctx flow.Context) error { return func(ctx flow.Context) error { ctx.T.Run("read-only test", func(t *testing.T) { client, err := client.NewClientWithPort(strconv.Itoa(port)) require.NoError(t, err) defer client.Close() // Retrieving state should work item, err := client.GetState(ctx, stateStoreName, "my_string", nil) require.NoError(t, err) assert.Equal(t, `"hello world"`, string(item.Value)) // Saving state should fail err = client.SaveState(ctx, stateStoreName, "my_string", []byte("updated!"), nil) require.Error(t, err) assert.ErrorContains(t, err, "attempt to write a readonly database") // Value should not be updated item, err = client.GetState(ctx, stateStoreName, "my_string", nil) require.NoError(t, err) assert.Equal(t, `"hello world"`, string(item.Value)) // Deleting state should fail err = client.DeleteState(ctx, stateStoreName, "my_string", nil) require.Error(t, err) assert.ErrorContains(t, err, "attempt to write a readonly database") }) return nil } } // Checks the hash of the readonly DB (after the sidecar has been stopped) to confirm it wasn't modified readonlyConfirmTest := func(ctx flow.Context) error { ctx.T.Run("confirm read-only test", func(t *testing.T) { newHash, err := hashFile(readonlyDBPath) require.NoError(t, err) assert.Equal(t, readonlyDBHash, newHash, "read-only datbaase has been modified on disk") }) return nil } // Validates TTLs and garbage collections ttlTest := func(ctx flow.Context) error { md := state.Metadata{ Base: metadata.Base{ Name: "ttltest", Properties: map[string]string{ keyConnectionString: "file::memory:", keyTableName: "ttl_state", }, }, } ctx.T.Run("parse cleanupIntervalInSeconds", func(t *testing.T) { t.Run("default value", func(t *testing.T) { // Default value is disabled delete(md.Properties, keyCleanupInterval) storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) err := storeObj.Init(context.Background(), md) require.NoError(t, err, "failed to init") defer storeObj.Close() dbAccess := storeObj.GetDBAccess() require.NotNil(t, dbAccess) cleanupInterval := dbAccess.GetCleanupInterval() assert.Equal(t, time.Duration(0), cleanupInterval) }) t.Run("positive value", func(t *testing.T) { md.Properties[keyCleanupInterval] = "10s" storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) err := storeObj.Init(context.Background(), md) require.NoError(t, err, "failed to init") defer storeObj.Close() dbAccess := storeObj.GetDBAccess() require.NotNil(t, dbAccess) cleanupInterval := dbAccess.GetCleanupInterval() assert.Equal(t, time.Duration(10*time.Second), cleanupInterval) }) t.Run("disabled", func(t *testing.T) { // A value of 0 means that the cleanup is disabled md.Properties[keyCleanupInterval] = "0" storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) err := storeObj.Init(context.Background(), md) require.NoError(t, err, "failed to init") defer storeObj.Close() dbAccess := storeObj.GetDBAccess() require.NotNil(t, dbAccess) cleanupInterval := dbAccess.GetCleanupInterval() assert.Equal(t, time.Duration(0), cleanupInterval) }) }) ctx.T.Run("cleanup", func(t *testing.T) { md := state.Metadata{ Base: metadata.Base{ Name: "ttltest", Properties: map[string]string{ keyConnectionString: "file::memory:", keyTableName: "ttl_state", keyMetadataTableName: "ttl_metadata", }, }, } t.Run("automatically delete expired records", func(t *testing.T) { // Run every second md.Properties[keyCleanupInterval] = "1s" storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) err := storeObj.Init(context.Background(), md) require.NoError(t, err, "failed to init") defer storeObj.Close() dbClient := storeObj.GetDBAccess().GetConnection() require.NotNil(t, dbClient) // Seed the database with some records err = populateTTLRecords(ctx, dbClient) require.NoError(t, err, "failed to seed records") // Wait 2 seconds then verify we have only 10 rows left time.Sleep(2 * time.Second) count, err := countRowsInTable(ctx, dbClient, "ttl_state") assert.NoError(t, err, "failed to run query to count rows") assert.Equal(t, 10, count) // The "last-cleanup" value should be <= 1 second (+ a bit of buffer) lastCleanup, err := loadLastCleanupInterval(ctx, dbClient, "ttl_metadata") require.NoError(t, err, "failed to load value for 'last-cleanup'") assert.LessOrEqual(t, lastCleanup, int64(1200)) // Wait 6 more seconds and verify there are no more rows left time.Sleep(6 * time.Second) count, err = countRowsInTable(ctx, dbClient, "ttl_state") require.NoError(t, err, "failed to run query to count rows") assert.Equal(t, 0, count) // The "last-cleanup" value should be <= 1 second (+ a bit of buffer) lastCleanup, err = loadLastCleanupInterval(ctx, dbClient, "ttl_metadata") require.NoError(t, err, "failed to load value for 'last-cleanup'") assert.LessOrEqual(t, lastCleanup, int64(1200)) }) t.Run("cleanup concurrency", func(t *testing.T) { // Set to run every hour // (we'll manually trigger more frequent iterations) md.Properties[keyCleanupInterval] = "1h" storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) err := storeObj.Init(context.Background(), md) require.NoError(t, err, "failed to init") defer storeObj.Close() dbAccess := storeObj.GetDBAccess() require.NotNil(t, dbAccess) dbClient := dbAccess.GetConnection() // Seed the database with some records err = populateTTLRecords(ctx, dbClient) require.NoError(t, err, "failed to seed records") // Validate that 20 records are present count, err := countRowsInTable(ctx, dbClient, "ttl_state") require.NoError(t, err, "failed to run query to count rows") assert.Equal(t, 20, count) // Set last-cleanup to 1s ago (less than 3600s) err = setValueInMetadataTable(ctx, dbClient, "ttl_metadata", "'last-cleanup'", "datetime('now', '-1 second')") require.NoError(t, err, "failed to set last-cleanup") // The "last-cleanup" value should be ~1 second (+ a bit of buffer) lastCleanup, err := loadLastCleanupInterval(ctx, dbClient, "ttl_metadata") require.NoError(t, err, "failed to load value for 'last-cleanup'") assert.LessOrEqual(t, lastCleanup, int64(1200)) lastCleanupValueOrig, err := getValueFromMetadataTable(ctx, dbClient, "ttl_metadata", "last-cleanup") require.NoError(t, err, "failed to load absolute value for 'last-cleanup'") require.NotEmpty(t, lastCleanupValueOrig) // Trigger the background cleanup, which should do nothing because the last cleanup was < 3600s err = dbAccess.CleanupExpired() require.NoError(t, err, "CleanupExpired returned an error") // Validate that 20 records are still present count, err = countRowsInTable(ctx, dbClient, "ttl_state") require.NoError(t, err, "failed to run query to count rows") assert.Equal(t, 20, count) // The "last-cleanup" value should not have been changed lastCleanupValue, err := getValueFromMetadataTable(ctx, dbClient, "ttl_metadata", "last-cleanup") require.NoError(t, err, "failed to load absolute value for 'last-cleanup'") assert.Equal(t, lastCleanupValueOrig, lastCleanupValue) }) }) return nil } // Tests the "Init" method and the database migrations // It also tests the metadata properties "tableName" and "metadataTableName" initTest := func(ctx flow.Context) error { ctx.T.Run("init and migrations", func(t *testing.T) { // Create a temporary database and create a connection to it tmpDir := t.TempDir() dbPath := filepath.Join(tmpDir, "init.db") dbClient, err := sql.Open("sqlite", "file:"+dbPath+"?_pragma=busy_timeout%282000%29&_pragma=journal_mode%28WAL%29&_txlock=immediate") require.NoError(t, err) md := state.Metadata{ Base: metadata.Base{ Name: "inittest", Properties: map[string]string{ keyConnectionString: dbPath, }, }, } t.Run("initial state clean", func(t *testing.T) { storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) md.Properties[keyTableName] = "clean_state" md.Properties[keyMetadataTableName] = "clean_metadata" // Init and perform the migrations err := storeObj.Init(context.Background(), md) require.NoError(t, err, "failed to init") defer storeObj.Close() // We should have the tables correctly created err = tableExists(dbClient, "clean_state") assert.NoError(t, err, "state table does not exist") err = tableExists(dbClient, "clean_metadata") assert.NoError(t, err, "metadata table does not exist") // Ensure migration level is correct level, err := getMigrationLevel(dbClient, "clean_metadata") assert.NoError(t, err, "failed to get migration level") assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel) }) t.Run("all migrations performed", func(t *testing.T) { // Re-use "clean_state" and "clean_metadata" storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) md.Properties[keyTableName] = "clean_state" md.Properties[keyMetadataTableName] = "clean_metadata" // Should already have migration level 2 level, err := getMigrationLevel(dbClient, "clean_metadata") assert.NoError(t, err, "failed to get migration level") assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel) // Init and perform the migrations err = storeObj.Init(context.Background(), md) require.NoError(t, err, "failed to init") defer storeObj.Close() // Ensure migration level is correct level, err = getMigrationLevel(dbClient, "clean_metadata") assert.NoError(t, err, "failed to get migration level") assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel) }) t.Run("migrate from implied level 1", func(t *testing.T) { // Before we added the metadata table, the "implied" level 1 had only the state table // Create that table to simulate the old state and validate the migration ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() _, err := dbClient.ExecContext( ctx, `CREATE TABLE pre_state ( key TEXT NOT NULL PRIMARY KEY, value TEXT NOT NULL, is_binary BOOLEAN NOT NULL, etag TEXT NOT NULL, expiration_time TIMESTAMP DEFAULT NULL, update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP )`, ) require.NoError(t, err, "failed to create initial migration state") storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) md.Properties[keyTableName] = "pre_state" md.Properties[keyMetadataTableName] = "pre_metadata" // Init and perform the migrations err = storeObj.Init(context.Background(), md) require.NoError(t, err, "failed to init") defer storeObj.Close() // We should have the metadata table created err = tableExists(dbClient, "pre_metadata") assert.NoError(t, err, "metadata table does not exist") // Ensure migration level is correct level, err := getMigrationLevel(dbClient, "pre_metadata") assert.NoError(t, err, "failed to get migration level") assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel) }) t.Run("initialize components concurrently", func(t *testing.T) { // Initializes 3 components concurrently using the same table names, and ensure that they perform migrations without conflicts and race conditions md.Properties[keyTableName] = "mystate" md.Properties[keyMetadataTableName] = "mymetadata" errs := make(chan error, 3) hasLogs := atomic.Int32{} for i := 0; i < 3; i++ { go func(i int) { buf := &bytes.Buffer{} l := logger.NewLogger("multi-init-" + strconv.Itoa(i)) l.SetOutput(io.MultiWriter(buf, os.Stdout)) // Init and perform the migrations storeObj := state_sqlite.NewSQLiteStateStore(l).(*state_sqlite.SQLiteStore) err := storeObj.Init(context.Background(), md) if err != nil { errs <- fmt.Errorf("%d failed to init: %w", i, err) return } // One and only one of the loggers should have any message indicating "Performing migration 1" if strings.Contains(buf.String(), "Performing migration 1") { hasLogs.Add(1) } // Close the component right away err = storeObj.Close() if err != nil { errs <- fmt.Errorf("%d failed to close: %w", i, err) return } errs <- nil }(i) } failed := false for i := 0; i < 3; i++ { select { case err := <-errs: failed = failed || !assert.NoError(t, err) case <-time.After(time.Minute): t.Fatal("timed out waiting for components to initialize") } } if failed { // Short-circuit t.FailNow() } // Exactly one component should have written logs (which means generated any activity during migrations) assert.Equal(t, int32(1), hasLogs.Load(), "expected 1 component to log anything to indicate migration activity, but got %d", hasLogs.Load()) // We should have the tables correctly created err = tableExists(dbClient, "mystate") assert.NoError(t, err, "state table does not exist") err = tableExists(dbClient, "mymetadata") assert.NoError(t, err, "metadata table does not exist") // Ensure migration level is correct level, err := getMigrationLevel(dbClient, "mymetadata") assert.NoError(t, err, "failed to get migration level") assert.Equal(t, migrationLevel, level, "migration level mismatch: found '%s' but expected '%s'", level, migrationLevel) }) }) return nil } // Tests many concurrent operations to ensure the database can operate successfully concurrencyTest := func(ctx flow.Context) error { ctx.T.Run("concurrency", func(t *testing.T) { // Create a temporary database tmpDir := t.TempDir() dbPath := filepath.Join(tmpDir, "init.db") md := state.Metadata{ Base: metadata.Base{ Name: "inittest", Properties: map[string]string{ keyConnectionString: dbPath, // Connect with a higher busy timeout keyBusyTimeout: "10s", }, }, } // Maintain all store objects var storeObjs []*state_sqlite.SQLiteStore newStoreObj := func() error { storeObj := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) err := storeObj.Init(context.Background(), md) if err != nil { return err } storeObjs = append(storeObjs, storeObj) return nil } defer func() { for _, storeObj := range storeObjs { _ = storeObj.Close() } }() testMultipleKeys := func(t *testing.T) { const parallel = 10 const runs = 30 ctx := context.Background() wg := sync.WaitGroup{} wg.Add(parallel) for i := 0; i < parallel; i++ { go func(i int) { defer wg.Done() var ( res *state.GetResponse err error ) storeObj := storeObjs[i%len(storeObjs)] key := fmt.Sprintf("multiple_%d", i) for j := 0; j < runs; j++ { // Save state err = storeObj.Set(ctx, &state.SetRequest{ Key: key, Value: j, }) assert.NoError(t, err) // Retrieve state res, err = storeObj.Get(ctx, &state.GetRequest{ Key: key, }) assert.NoError(t, err) assert.Equal(t, strconv.Itoa(j), string(res.Data)) } }(i) } wg.Wait() } testSameKey := func(t *testing.T) { const parallel = 10 const runs = 30 ctx := context.Background() // We have as many counters as we have number of parallel workers. The final value will be one of them. // This is because we have a race condition between the time we call `save := counter.Add(1)` and when we set the value, causing tests to be flaky since the value stored in the database may not be the one stored in counter. // This is not a bug in the SQLite component, as it's working as intended; the race condition is on the tests themselves. // The real solution to the race condition (and what one should do in a real-world app) would be to wrap the call to increment counter and the store operation in a mutex. However, that would make it so only one query is hitting the database at the same time, while here we're precisely testing how the component handles multiple writes at the same time. wg := sync.WaitGroup{} wg.Add(parallel) counters := [parallel]atomic.Int32{} key := "same" for i := 0; i < parallel; i++ { go func(i int) { defer wg.Done() var err error storeObj := storeObjs[i%len(storeObjs)] for j := 0; j < runs; j++ { save := counters[i].Add(1) // Save state err = storeObj.Set(ctx, &state.SetRequest{ Key: key, Value: int(save), }) assert.NoError(t, err) } }(i) } wg.Wait() // Retrieve state res, err := storeObjs[0].Get(ctx, &state.GetRequest{ Key: key, }) assert.NoError(t, err) expect := [parallel]string{} for i := 0; i < parallel; i++ { expect[i] = strconv.Itoa(int(counters[i].Load())) } assert.Contains(t, expect, string(res.Data)) } // Init one store object require.NoError(t, newStoreObj(), "failed to init") t.Run("same connection, multiple keys", testMultipleKeys) t.Run("same connection, same key", testSameKey) // Init two more store objects require.NoError(t, newStoreObj(), "failed to init") require.NoError(t, newStoreObj(), "failed to init") t.Run("multiple connections, multiple keys", testMultipleKeys) t.Run("multiple connections, same key", testSameKey) }) return nil } // This makes it possible to comment-out individual tests _ = basicTest _ = verifySQLInjectionTest _ = readonlyTest _ = ttlTest _ = initTest _ = concurrencyTest flow.New(t, "Run tests"). // Start the sidecar with the in-memory database Step(sidecar.Run("sqlite-memory", embedded.WithoutApp(), embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)), embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)), embedded.WithProfilingEnabled(false), embedded.WithResourcesPath("resources/memory"), embedded.WithStates(stateRegistry), )). // Run some basic certification tests with the in-memory database Step("run basic test", basicTest(runtime.DefaultDaprAPIGRPCPort)). Step("run SQL injection test", verifySQLInjectionTest(runtime.DefaultDaprAPIGRPCPort)). Step("stop app", sidecar.Stop("sqlite-memory")). // Start the sidecar with a read-only database Step(sidecar.Run("sqlite-readonly", embedded.WithoutApp(), embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)), embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)), embedded.WithProfilingEnabled(false), embedded.WithResourcesPath("resources/readonly"), embedded.WithStates(stateRegistry), )). Step("run read-only test", readonlyTest(runtime.DefaultDaprAPIGRPCPort+portOffset)). Step("stop sqlite-readonly sidecar", sidecar.Stop("sqlite-readonly")). Step("confirm read-only test", readonlyConfirmTest). // Run TTL tests Step("run TTL test", ttlTest). // Run init and migrations tests Step("run init and migrations test", initTest). // Concurrency test Step("run concurrency test", concurrencyTest). // Start tests Run() } func populateTTLRecords(ctx context.Context, dbClient *sql.DB) error { // Insert 10 records that have expired, and 10 that will expire in 4 seconds // Note this uses fmt.Sprintf and not parametrized queries-on purpose, so we can pass multiple rows in the same INSERT query // Normally this would be a very bad idea, just don't do it outside of tests (and maybe not even in tests like I'm doing right now)... rows := make([]string, 20) for i := 0; i < 10; i++ { rows[i] = fmt.Sprintf(`("expired_%d", '"value_%d"', false, "etag", datetime('now', '-1 minute'))`, i, i) } for i := 0; i < 10; i++ { rows[i+10] = fmt.Sprintf(`("notexpired_%d", '"value_%d"', false, "etag", datetime('now', '+4 seconds'))`, i, i) } queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() q := "INSERT INTO ttl_state (key, value, is_binary, etag, expiration_time) VALUES " + strings.Join(rows, ", ") res, err := dbClient.ExecContext(queryCtx, q) if err != nil { return err } n, _ := res.RowsAffected() if n != 20 { return fmt.Errorf("expected to insert 20 rows, but only got %d", n) } return nil } func countRowsInTable(ctx context.Context, dbClient *sql.DB, table string) (count int, err error) { queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) err = dbClient.QueryRowContext(queryCtx, "SELECT COUNT(key) FROM "+table).Scan(&count) cancel() return } func loadLastCleanupInterval(ctx context.Context, dbClient *sql.DB, table string) (lastCleanup int64, err error) { queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) err = dbClient. QueryRowContext(queryCtx, fmt.Sprintf("SELECT unixepoch(CURRENT_TIMESTAMP) - unixepoch(value) FROM %s WHERE key = 'last-cleanup'", table), ). Scan(&lastCleanup) cancel() if errors.Is(err, sql.ErrNoRows) { err = nil } return } // Note this uses fmt.Sprintf and not parametrized queries-on purpose, so we can pass SQLite functions. // Normally this would be a very bad idea, just don't do it... (do as I say don't do as I do :) ). func setValueInMetadataTable(ctx context.Context, dbClient *sql.DB, table, key, value string) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) _, err := dbClient.ExecContext(queryCtx, //nolint:gosec fmt.Sprintf(`REPLACE INTO %s (key, value) VALUES (%s, %s)`, table, key, value), ) cancel() return err } func getValueFromMetadataTable(ctx context.Context, dbClient *sql.DB, table, key string) (value string, err error) { queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) err = dbClient. QueryRowContext(queryCtx, fmt.Sprintf("SELECT value FROM %s WHERE key = ?", table), key). Scan(&value) cancel() if errors.Is(err, sql.ErrNoRows) { err = nil } return } func tableExists(dbClient *sql.DB, tableName string) error { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() var exists string // Returns 1 or 0 as a string if the table exists or not. const q = `SELECT EXISTS ( SELECT name FROM sqlite_master WHERE type='table' AND name = ? ) AS 'exists'` err := dbClient.QueryRowContext(ctx, q, tableName). Scan(&exists) if err != nil { return err } if exists != "1" { return errors.New("table not found") } return nil } func getMigrationLevel(dbClient *sql.DB, metadataTable string) (level string, err error) { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() err = dbClient. QueryRowContext(ctx, fmt.Sprintf(`SELECT value FROM %s WHERE key = 'migrations'`, metadataTable)). Scan(&level) if err != nil && errors.Is(err, sql.ErrNoRows) { err = nil level = "" } return level, err } // Calculates the SHA-256 hash of a file func hashFile(path string) (string, error) { f, err := os.Open(path) if err != nil { return "", err } defer f.Close() h := sha256.New() _, err = io.Copy(h, f) if err != nil { return "", err } return hex.EncodeToString(h.Sum(nil)), nil }