/* Copyright 2021 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package sqlserver import ( "context" "crypto/rand" "database/sql" "encoding/hex" "encoding/json" "fmt" "io" "math" "os" "strconv" "sync" "sync/atomic" "testing" "time" uuid "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/state" "github.com/dapr/kit/logger" ) const ( // connectionStringEnvKey defines the key containing the integration test connection string // To use docker, server=localhost;user id=sa;password=Pass@Word1;port=1433; // To use Azure SQL, server=.database.windows.net;user id=;port=1433;password=;database=dapr_test;. connectionStringEnvKey = "DAPR_TEST_SQL_CONNSTRING" usersTableName = "Users" beverageTea = "tea" invalidEtag = "FFFFFFFFFFFFFFFF" ) type user struct { ID string Name string FavoriteBeverage string } type userWithPets struct { user PetsCount int } type userWithEtag struct { user etag string } func TestIntegrationCases(t *testing.T) { connectionString := os.Getenv(connectionStringEnvKey) if connectionString == "" { t.Skipf(`SQLServer state integration tests skipped. To enable this test, define the connection string using environment variable '%[1]s' (example 'export %[1]s="server=localhost;user id=sa;password=Pass@Word1;port=1433;")'`, connectionStringEnvKey) } t.Run("Single operations", testSingleOperations) t.Run("Set New Record With Invalid Etag Should Fail", testSetNewRecordWithInvalidEtagShouldFail) t.Run("Indexed Properties", testIndexedProperties) t.Run("Multi operations", testMultiOperations) t.Run("Insert and Update Set Record Dates", testInsertAndUpdateSetRecordDates) t.Run("Multiple initializations", testMultipleInitializations) // Run concurrent set tests 10 times const executions = 10 for i := 0; i < executions; i++ { t.Run(fmt.Sprintf("Concurrent sets, try #%d", i+1), testConcurrentSets) } } func getUniqueDBSchema(t *testing.T) string { b := make([]byte, 4) _, err := io.ReadFull(rand.Reader, b) require.NoError(t, err) return fmt.Sprintf("v%s", hex.EncodeToString(b)) } func createMetadata(schema string, kt KeyType, indexedProperties string) state.Metadata { metadata := state.Metadata{Base: metadata.Base{ Properties: map[string]string{ "connectionString": os.Getenv(connectionStringEnvKey), "schema": schema, "tableName": usersTableName, "keyType": string(kt), "databaseName": "dapr_test", }, }} if indexedProperties != "" { metadata.Properties["indexedProperties"] = indexedProperties } return metadata } // Ensure the database is running // For docker, use: docker run --name sqlserver -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=Pass@Word1" -p 1433:1433 -d mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04. func getTestStore(t *testing.T, indexedProperties string) *SQLServer { return getTestStoreWithKeyType(t, StringKeyType, indexedProperties) } func getTestStoreWithKeyType(t *testing.T, kt KeyType, indexedProperties string) *SQLServer { schema := getUniqueDBSchema(t) metadata := createMetadata(schema, kt, indexedProperties) store := &SQLServer{ logger: logger.NewLogger("test"), migratorFactory: newMigration, } store.BulkStore = state.NewDefaultBulkStore(store) err := store.Init(context.Background(), metadata) require.NoError(t, err) return store } func assertUserExists(t *testing.T, store *SQLServer, key string) (user, string) { getRes, err := store.Get(context.Background(), &state.GetRequest{Key: key}) require.NoError(t, err) assert.NotNil(t, getRes) assert.NotNil(t, getRes.Data, "No data was returned") require.NotNil(t, getRes.ETag) var loaded user err = json.Unmarshal(getRes.Data, &loaded) require.NoError(t, err) return loaded, *getRes.ETag } func assertLoadedUserIsEqual(t *testing.T, store *SQLServer, key string, expected user) (user, string) { loaded, etag := assertUserExists(t, store, key) assert.Equal(t, expected.ID, loaded.ID) assert.Equal(t, expected.Name, loaded.Name) assert.Equal(t, expected.FavoriteBeverage, loaded.FavoriteBeverage) return loaded, etag } func assertUserDoesNotExist(t *testing.T, store *SQLServer, key string) { _, err := store.Get(context.Background(), &state.GetRequest{Key: key}) require.NoError(t, err) } func assertDBQuery(t *testing.T, store *SQLServer, query string, assertReader func(t *testing.T, rows *sql.Rows)) { rows, err := store.db.Query(query) require.NoError(t, err) require.NoError(t, rows.Err()) defer rows.Close() assertReader(t, rows) } /* #nosec. */ func assertUserCountIsEqualTo(t *testing.T, store *SQLServer, expected int) { tsql := fmt.Sprintf("SELECT count(*) FROM [%s].[%s]", store.metadata.SchemaName, store.metadata.TableName) assertDBQuery(t, store, tsql, func(t *testing.T, rows *sql.Rows) { assert.True(t, rows.Next()) var actual int err := rows.Scan(&actual) require.NoError(t, err) assert.Equal(t, expected, actual) }) } type userKeyGenerator interface { NextKey() string } type numbericKeyGenerator struct { seed int32 } func (n *numbericKeyGenerator) NextKey() string { val := atomic.AddInt32(&n.seed, 1) return strconv.Itoa(int(val)) } type uuidKeyGenerator struct{} func (n uuidKeyGenerator) NextKey() string { return uuid.New().String() } func testSingleOperations(t *testing.T) { invEtag := invalidEtag tests := []struct { name string kt KeyType keyGen userKeyGenerator }{ {"Single operation string key type", StringKeyType, &numbericKeyGenerator{}}, {"Single operation integer key type", IntegerKeyType, &numbericKeyGenerator{}}, {"Single operation uuid key type", UUIDKeyType, &uuidKeyGenerator{}}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { store := getTestStoreWithKeyType(t, test.kt, "") john := user{test.keyGen.NextKey(), "John", "Coffee"} // Get fails as the item does not exist assertUserDoesNotExist(t, store, john.ID) // Save and read err := store.Set(context.Background(), &state.SetRequest{Key: john.ID, Value: john}) require.NoError(t, err) johnV1, etagFromInsert := assertLoadedUserIsEqual(t, store, john.ID, john) // Update with ETAG waterJohn := johnV1 waterJohn.FavoriteBeverage = "Water" err = store.Set(context.Background(), &state.SetRequest{Key: waterJohn.ID, Value: waterJohn, ETag: &etagFromInsert}) require.NoError(t, err) // Get updated johnV2, _ := assertLoadedUserIsEqual(t, store, waterJohn.ID, waterJohn) // Update without ETAG noEtagJohn := johnV2 noEtagJohn.FavoriteBeverage = "No Etag John" err = store.Set(context.Background(), &state.SetRequest{Key: noEtagJohn.ID, Value: noEtagJohn}) require.NoError(t, err) // 7. Get updated johnV3, _ := assertLoadedUserIsEqual(t, store, noEtagJohn.ID, noEtagJohn) // 8. Update with invalid ETAG should fail failedJohn := johnV3 failedJohn.FavoriteBeverage = "Will not work" err = store.Set(context.Background(), &state.SetRequest{Key: failedJohn.ID, Value: failedJohn, ETag: &etagFromInsert}) require.Error(t, err) _, etag := assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3) // 9. Delete with invalid ETAG should fail err = store.Delete(context.Background(), &state.DeleteRequest{Key: johnV3.ID, ETag: &invEtag}) require.Error(t, err) assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3) // 10. Delete with valid ETAG err = store.Delete(context.Background(), &state.DeleteRequest{Key: johnV2.ID, ETag: &etag}) require.NoError(t, err) assertUserDoesNotExist(t, store, johnV2.ID) }) } } func testSetNewRecordWithInvalidEtagShouldFail(t *testing.T) { store := getTestStore(t, "") u := user{uuid.New().String(), "John", "Coffee"} invEtag := invalidEtag err := store.Set(context.Background(), &state.SetRequest{Key: u.ID, Value: u, ETag: &invEtag}) require.Error(t, err) } /* #nosec. */ func testIndexedProperties(t *testing.T) { store := getTestStore(t, `[{ "column":"FavoriteBeverage", "property":"FavoriteBeverage", "type":"nvarchar(100)"}, { "column":"PetsCount", "property":"PetsCount", "type": "INTEGER"}]`) err := store.BulkSet(context.Background(), []state.SetRequest{ {Key: "1", Value: userWithPets{user{"1", "John", "Coffee"}, 3}}, {Key: "2", Value: userWithPets{user{"2", "Laura", "Water"}, 1}}, {Key: "3", Value: userWithPets{user{"3", "Carl", "Beer"}, 0}}, {Key: "4", Value: userWithPets{user{"4", "Maria", "Wine"}, 100}}, }, state.BulkStoreOpts{}) require.NoError(t, err) // Check the database for computed columns assertDBQuery(t, store, fmt.Sprintf("SELECT count(*) from [%s].[%s] WHERE PetsCount < 3", store.metadata.SchemaName, usersTableName), func(t *testing.T, rows *sql.Rows) { assert.True(t, rows.Next()) var c int rows.Scan(&c) assert.Equal(t, 2, c) }) // Ensure we can get by beverage assertDBQuery(t, store, fmt.Sprintf("SELECT count(*) from [%s].[%s] WHERE FavoriteBeverage = '%s'", store.metadata.SchemaName, usersTableName, "Coffee"), func(t *testing.T, rows *sql.Rows) { assert.True(t, rows.Next()) var c int rows.Scan(&c) assert.Equal(t, 1, c) }) } func testMultiOperations(t *testing.T) { tests := []struct { name string kt KeyType keyGen userKeyGenerator }{ {"Multi operations string key type", StringKeyType, &numbericKeyGenerator{}}, {"Multi operations integer key type", IntegerKeyType, &numbericKeyGenerator{}}, {"Multi operations uuid key type", UUIDKeyType, &uuidKeyGenerator{}}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { store := getTestStoreWithKeyType(t, test.kt, `[{ "column":"FavoriteBeverage", "property":"FavoriteBeverage", "type":"nvarchar(100)"}]`) keyGen := test.keyGen initialUsers := []user{ {keyGen.NextKey(), "John", "Coffee"}, {keyGen.NextKey(), "Laura", "Water"}, {keyGen.NextKey(), "Carl", "Beer"}, {keyGen.NextKey(), "Maria", "Wine"}, {keyGen.NextKey(), "Mark", "Juice"}, {keyGen.NextKey(), "Sara", "Soda"}, {keyGen.NextKey(), "Tony", "Milk"}, {keyGen.NextKey(), "Hugo", "Juice"}, } // 1. add bulk users bulkSet := make([]state.SetRequest, len(initialUsers)) for i, u := range initialUsers { bulkSet[i] = state.SetRequest{Key: u.ID, Value: u} } err := store.BulkSet(context.Background(), bulkSet, state.BulkStoreOpts{}) require.NoError(t, err) assertUserCountIsEqualTo(t, store, len(initialUsers)) // Ensure initial users are correctly stored loadedUsers := make([]userWithEtag, len(initialUsers)) for i, u := range initialUsers { loaded, etag := assertLoadedUserIsEqual(t, store, u.ID, u) loadedWithEtag := userWithEtag{loaded, etag} loadedUsers[i] = loadedWithEtag } totalUsers := len(loadedUsers) userIndex := 0 t.Run("Update and delete without etag should work", func(t *testing.T) { toDelete := loadedUsers[userIndex].user original := loadedUsers[userIndex+1] modified := original.user modified.FavoriteBeverage = beverageTea localErr := store.Multi(context.Background(), &state.TransactionalStateRequest{ Operations: []state.TransactionalStateOperation{ state.DeleteRequest{Key: toDelete.ID}, state.SetRequest{Key: modified.ID, Value: modified}, }, }) require.NoError(t, localErr) assertLoadedUserIsEqual(t, store, modified.ID, modified) assertUserDoesNotExist(t, store, toDelete.ID) totalUsers-- assertUserCountIsEqualTo(t, store, totalUsers) userIndex += 2 }) t.Run("Update, delete and insert should work", func(t *testing.T) { toDelete := loadedUsers[userIndex] toModify := loadedUsers[userIndex+1] toInsert := user{keyGen.NextKey(), "Susan", "Soda"} modified := toModify.user modified.FavoriteBeverage = beverageTea err = store.Multi(context.Background(), &state.TransactionalStateRequest{ Operations: []state.TransactionalStateOperation{ state.DeleteRequest{Key: toDelete.ID, ETag: &toDelete.etag}, state.SetRequest{Key: modified.ID, Value: modified, ETag: &toModify.etag}, state.SetRequest{Key: toInsert.ID, Value: toInsert}, }, }) require.NoError(t, err) assertLoadedUserIsEqual(t, store, modified.ID, modified) assertLoadedUserIsEqual(t, store, toInsert.ID, toInsert) assertUserDoesNotExist(t, store, toDelete.ID) // we added 1 and deleted 1, so totalUsers should have no change assertUserCountIsEqualTo(t, store, totalUsers) userIndex += 2 }) t.Run("Update and delete with etag should work", func(t *testing.T) { toDelete := loadedUsers[userIndex] toModify := loadedUsers[userIndex+1] modified := toModify.user modified.FavoriteBeverage = beverageTea err = store.Multi(context.Background(), &state.TransactionalStateRequest{ Operations: []state.TransactionalStateOperation{ state.DeleteRequest{Key: toDelete.ID, ETag: &toDelete.etag}, state.SetRequest{Key: modified.ID, Value: modified, ETag: &toModify.etag}, }, }) require.NoError(t, err) assertLoadedUserIsEqual(t, store, modified.ID, modified) assertUserDoesNotExist(t, store, toDelete.ID) totalUsers-- assertUserCountIsEqualTo(t, store, totalUsers) userIndex += 2 }) t.Run("Delete fails, should abort insert", func(t *testing.T) { toDelete := loadedUsers[userIndex] toInsert := user{keyGen.NextKey(), "Wont-be-inserted", "Beer"} invEtag := invalidEtag err = store.Multi(context.Background(), &state.TransactionalStateRequest{ Operations: []state.TransactionalStateOperation{ state.DeleteRequest{Key: toDelete.ID, ETag: &invEtag}, state.SetRequest{Key: toInsert.ID, Value: toInsert}, }, }) require.Error(t, err) assertUserDoesNotExist(t, store, toInsert.ID) assertLoadedUserIsEqual(t, store, toDelete.ID, toDelete.user) assertUserCountIsEqualTo(t, store, totalUsers) }) t.Run("Delete fails, should abort update", func(t *testing.T) { toDelete := loadedUsers[userIndex] toModify := loadedUsers[userIndex+1] modified := toModify.user modified.FavoriteBeverage = beverageTea invEtag := invalidEtag err = store.Multi(context.Background(), &state.TransactionalStateRequest{ Operations: []state.TransactionalStateOperation{ state.DeleteRequest{Key: toDelete.ID, ETag: &invEtag}, state.SetRequest{Key: modified.ID, Value: modified}, }, }) require.Error(t, err) assertLoadedUserIsEqual(t, store, toDelete.ID, toDelete.user) assertLoadedUserIsEqual(t, store, toModify.ID, toModify.user) assertUserCountIsEqualTo(t, store, totalUsers) }) t.Run("Update fails, should abort delete", func(t *testing.T) { toDelete := loadedUsers[userIndex] toModify := loadedUsers[userIndex+1] modified := toModify.user modified.FavoriteBeverage = beverageTea invEtag := invalidEtag err = store.Multi(context.Background(), &state.TransactionalStateRequest{ Operations: []state.TransactionalStateOperation{ state.DeleteRequest{Key: toDelete.ID}, state.SetRequest{Key: modified.ID, Value: modified, ETag: &invEtag}, }, }) require.Error(t, err) assertLoadedUserIsEqual(t, store, toDelete.ID, toDelete.user) assertLoadedUserIsEqual(t, store, toModify.ID, toModify.user) assertUserCountIsEqualTo(t, store, totalUsers) }) }) } } /* #nosec. */ func testInsertAndUpdateSetRecordDates(t *testing.T) { const maxDiffInMs = float64(500) store := getTestStore(t, "") u := user{"1", "John", "Coffee"} err := store.Set(context.Background(), &state.SetRequest{Key: u.ID, Value: u}) require.NoError(t, err) var originalInsertTime time.Time getUserTsql := fmt.Sprintf("SELECT [InsertDate], [UpdateDate] from [%s].[%s] WHERE [Key]='%s'", store.metadata.SchemaName, store.metadata.TableName, u.ID) assertDBQuery(t, store, getUserTsql, func(t *testing.T, rows *sql.Rows) { assert.True(t, rows.Next()) var insertDate, updateDate sql.NullTime localErr := rows.Scan(&insertDate, &updateDate) require.NoError(t, localErr) assert.True(t, insertDate.Valid) insertDiff := float64(time.Now().UTC().Sub(insertDate.Time).Milliseconds()) assert.LessOrEqual(t, math.Abs(insertDiff), maxDiffInMs) assert.False(t, updateDate.Valid) originalInsertTime = insertDate.Time }) modified := u modified.FavoriteBeverage = beverageTea err = store.Set(context.Background(), &state.SetRequest{Key: modified.ID, Value: modified}) require.NoError(t, err) assertDBQuery(t, store, getUserTsql, func(t *testing.T, rows *sql.Rows) { assert.True(t, rows.Next()) var insertDate, updateDate sql.NullTime err := rows.Scan(&insertDate, &updateDate) require.NoError(t, err) assert.True(t, insertDate.Valid) assert.Equal(t, originalInsertTime, insertDate.Time) assert.True(t, updateDate.Valid) updateDiff := float64(time.Now().UTC().Sub(updateDate.Time).Milliseconds()) assert.LessOrEqual(t, math.Abs(updateDiff), maxDiffInMs) }) } func testConcurrentSets(t *testing.T) { const parallelism = 10 store := getTestStore(t, "") u := user{"1", "John", "Coffee"} err := store.Set(context.Background(), &state.SetRequest{Key: u.ID, Value: u}) require.NoError(t, err) _, etag := assertLoadedUserIsEqual(t, store, u.ID, u) var wc sync.WaitGroup start := make(chan bool, parallelism) totalErrors := int32(0) totalSucceeds := int32(0) for i := 0; i < parallelism; i++ { wc.Add(1) go func(id, etag string, start <-chan bool, wc *sync.WaitGroup, store *SQLServer) { <-start defer wc.Done() modified := user{"1", "John", beverageTea} err := store.Set(context.Background(), &state.SetRequest{Key: id, Value: modified, ETag: &etag}) if err != nil { atomic.AddInt32(&totalErrors, 1) } else { atomic.AddInt32(&totalSucceeds, 1) } }(u.ID, etag, start, &wc, store) } close(start) wc.Wait() assert.Equal(t, int32(parallelism-1), totalErrors) assert.Equal(t, int32(1), totalSucceeds) } func testMultipleInitializations(t *testing.T) { tests := []struct { name string kt KeyType indexedProperties string }{ {"No indexed properties", StringKeyType, ""}, {"With indexed properties", StringKeyType, `[{ "column":"FavoriteBeverage", "property":"FavoriteBeverage", "type":"nvarchar(100)"}, { "column":"PetsCount", "property":"PetsCount", "type": "INTEGER"}]`}, {"No indexed properties uuid key type", UUIDKeyType, ""}, {"No indexed properties integer key type", IntegerKeyType, ""}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { store := getTestStoreWithKeyType(t, test.kt, test.indexedProperties) store2 := &SQLServer{ logger: logger.NewLogger("test"), migratorFactory: newMigration, } store2.BulkStore = state.NewDefaultBulkStore(store2) err := store2.Init(context.Background(), createMetadata(store.metadata.SchemaName, test.kt, test.indexedProperties)) require.NoError(t, err) }) } }