components-contrib/state/sqlserver/sqlserver_integration_test.go

601 lines
19 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sqlserver
import (
"context"
"crypto/rand"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math"
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
uuid "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
const (
// connectionStringEnvKey defines the key containing the integration test connection string
// To use docker, server=localhost;user id=sa;password=Pass@Word1;port=1433;
// To use Azure SQL, server=<your-db-server-name>.database.windows.net;user id=<your-db-user>;port=1433;password=<your-password>;database=dapr_test;.
connectionStringEnvKey = "DAPR_TEST_SQL_CONNSTRING"
usersTableName = "Users"
beverageTea = "tea"
invalidEtag = "FFFFFFFFFFFFFFFF"
)
type user struct {
ID string
Name string
FavoriteBeverage string
}
type userWithPets struct {
user
PetsCount int
}
type userWithEtag struct {
user
etag string
}
func TestIntegrationCases(t *testing.T) {
connectionString := os.Getenv(connectionStringEnvKey)
if connectionString == "" {
t.Skipf(`SQLServer state integration tests skipped. To enable this test, define the connection string using environment variable '%[1]s' (example 'export %[1]s="server=localhost;user id=sa;password=Pass@Word1;port=1433;")'`, connectionStringEnvKey)
}
t.Run("Single operations", testSingleOperations)
t.Run("Set New Record With Invalid Etag Should Fail", testSetNewRecordWithInvalidEtagShouldFail)
t.Run("Indexed Properties", testIndexedProperties)
t.Run("Multi operations", testMultiOperations)
t.Run("Insert and Update Set Record Dates", testInsertAndUpdateSetRecordDates)
t.Run("Multiple initializations", testMultipleInitializations)
// Run concurrent set tests 10 times
const executions = 10
for i := 0; i < executions; i++ {
t.Run(fmt.Sprintf("Concurrent sets, try #%d", i+1), testConcurrentSets)
}
}
func getUniqueDBSchema(t *testing.T) string {
b := make([]byte, 4)
_, err := io.ReadFull(rand.Reader, b)
require.NoError(t, err)
return fmt.Sprintf("v%s", hex.EncodeToString(b))
}
func createMetadata(schema string, kt KeyType, indexedProperties string) state.Metadata {
metadata := state.Metadata{Base: metadata.Base{
Properties: map[string]string{
"connectionString": os.Getenv(connectionStringEnvKey),
"schema": schema,
"tableName": usersTableName,
"keyType": string(kt),
"databaseName": "dapr_test",
},
}}
if indexedProperties != "" {
metadata.Properties["indexedProperties"] = indexedProperties
}
return metadata
}
// Ensure the database is running
// For docker, use: docker run --name sqlserver -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=Pass@Word1" -p 1433:1433 -d mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04.
func getTestStore(t *testing.T, indexedProperties string) *SQLServer {
return getTestStoreWithKeyType(t, StringKeyType, indexedProperties)
}
func getTestStoreWithKeyType(t *testing.T, kt KeyType, indexedProperties string) *SQLServer {
schema := getUniqueDBSchema(t)
metadata := createMetadata(schema, kt, indexedProperties)
store := &SQLServer{
logger: logger.NewLogger("test"),
migratorFactory: newMigration,
}
store.BulkStore = state.NewDefaultBulkStore(store)
err := store.Init(context.Background(), metadata)
require.NoError(t, err)
return store
}
func assertUserExists(t *testing.T, store *SQLServer, key string) (user, string) {
getRes, err := store.Get(context.Background(), &state.GetRequest{Key: key})
require.NoError(t, err)
assert.NotNil(t, getRes)
assert.NotNil(t, getRes.Data, "No data was returned")
require.NotNil(t, getRes.ETag)
var loaded user
err = json.Unmarshal(getRes.Data, &loaded)
require.NoError(t, err)
return loaded, *getRes.ETag
}
func assertLoadedUserIsEqual(t *testing.T, store *SQLServer, key string, expected user) (user, string) {
loaded, etag := assertUserExists(t, store, key)
assert.Equal(t, expected.ID, loaded.ID)
assert.Equal(t, expected.Name, loaded.Name)
assert.Equal(t, expected.FavoriteBeverage, loaded.FavoriteBeverage)
return loaded, etag
}
func assertUserDoesNotExist(t *testing.T, store *SQLServer, key string) {
_, err := store.Get(context.Background(), &state.GetRequest{Key: key})
require.NoError(t, err)
}
func assertDBQuery(t *testing.T, store *SQLServer, query string, assertReader func(t *testing.T, rows *sql.Rows)) {
rows, err := store.db.Query(query)
require.NoError(t, err)
require.NoError(t, rows.Err())
defer rows.Close()
assertReader(t, rows)
}
/* #nosec. */
func assertUserCountIsEqualTo(t *testing.T, store *SQLServer, expected int) {
tsql := fmt.Sprintf("SELECT count(*) FROM [%s].[%s]", store.metadata.SchemaName, store.metadata.TableName)
assertDBQuery(t, store, tsql, func(t *testing.T, rows *sql.Rows) {
assert.True(t, rows.Next())
var actual int
err := rows.Scan(&actual)
require.NoError(t, err)
assert.Equal(t, expected, actual)
})
}
type userKeyGenerator interface {
NextKey() string
}
type numbericKeyGenerator struct {
seed int32
}
func (n *numbericKeyGenerator) NextKey() string {
val := atomic.AddInt32(&n.seed, 1)
return strconv.Itoa(int(val))
}
type uuidKeyGenerator struct{}
func (n uuidKeyGenerator) NextKey() string {
return uuid.New().String()
}
func testSingleOperations(t *testing.T) {
invEtag := invalidEtag
tests := []struct {
name string
kt KeyType
keyGen userKeyGenerator
}{
{"Single operation string key type", StringKeyType, &numbericKeyGenerator{}},
{"Single operation integer key type", IntegerKeyType, &numbericKeyGenerator{}},
{"Single operation uuid key type", UUIDKeyType, &uuidKeyGenerator{}},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := getTestStoreWithKeyType(t, test.kt, "")
john := user{test.keyGen.NextKey(), "John", "Coffee"}
// Get fails as the item does not exist
assertUserDoesNotExist(t, store, john.ID)
// Save and read
err := store.Set(context.Background(), &state.SetRequest{Key: john.ID, Value: john})
require.NoError(t, err)
johnV1, etagFromInsert := assertLoadedUserIsEqual(t, store, john.ID, john)
// Update with ETAG
waterJohn := johnV1
waterJohn.FavoriteBeverage = "Water"
err = store.Set(context.Background(), &state.SetRequest{Key: waterJohn.ID, Value: waterJohn, ETag: &etagFromInsert})
require.NoError(t, err)
// Get updated
johnV2, _ := assertLoadedUserIsEqual(t, store, waterJohn.ID, waterJohn)
// Update without ETAG
noEtagJohn := johnV2
noEtagJohn.FavoriteBeverage = "No Etag John"
err = store.Set(context.Background(), &state.SetRequest{Key: noEtagJohn.ID, Value: noEtagJohn})
require.NoError(t, err)
// 7. Get updated
johnV3, _ := assertLoadedUserIsEqual(t, store, noEtagJohn.ID, noEtagJohn)
// 8. Update with invalid ETAG should fail
failedJohn := johnV3
failedJohn.FavoriteBeverage = "Will not work"
err = store.Set(context.Background(), &state.SetRequest{Key: failedJohn.ID, Value: failedJohn, ETag: &etagFromInsert})
require.Error(t, err)
_, etag := assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3)
// 9. Delete with invalid ETAG should fail
err = store.Delete(context.Background(), &state.DeleteRequest{Key: johnV3.ID, ETag: &invEtag})
require.Error(t, err)
assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3)
// 10. Delete with valid ETAG
err = store.Delete(context.Background(), &state.DeleteRequest{Key: johnV2.ID, ETag: &etag})
require.NoError(t, err)
assertUserDoesNotExist(t, store, johnV2.ID)
})
}
}
func testSetNewRecordWithInvalidEtagShouldFail(t *testing.T) {
store := getTestStore(t, "")
u := user{uuid.New().String(), "John", "Coffee"}
invEtag := invalidEtag
err := store.Set(context.Background(), &state.SetRequest{Key: u.ID, Value: u, ETag: &invEtag})
require.Error(t, err)
}
/* #nosec. */
func testIndexedProperties(t *testing.T) {
store := getTestStore(t, `[{ "column":"FavoriteBeverage", "property":"FavoriteBeverage", "type":"nvarchar(100)"}, { "column":"PetsCount", "property":"PetsCount", "type": "INTEGER"}]`)
err := store.BulkSet(context.Background(), []state.SetRequest{
{Key: "1", Value: userWithPets{user{"1", "John", "Coffee"}, 3}},
{Key: "2", Value: userWithPets{user{"2", "Laura", "Water"}, 1}},
{Key: "3", Value: userWithPets{user{"3", "Carl", "Beer"}, 0}},
{Key: "4", Value: userWithPets{user{"4", "Maria", "Wine"}, 100}},
}, state.BulkStoreOpts{})
require.NoError(t, err)
// Check the database for computed columns
assertDBQuery(t, store, fmt.Sprintf("SELECT count(*) from [%s].[%s] WHERE PetsCount < 3", store.metadata.SchemaName, usersTableName), func(t *testing.T, rows *sql.Rows) {
assert.True(t, rows.Next())
var c int
rows.Scan(&c)
assert.Equal(t, 2, c)
})
// Ensure we can get by beverage
assertDBQuery(t, store, fmt.Sprintf("SELECT count(*) from [%s].[%s] WHERE FavoriteBeverage = '%s'", store.metadata.SchemaName, usersTableName, "Coffee"), func(t *testing.T, rows *sql.Rows) {
assert.True(t, rows.Next())
var c int
rows.Scan(&c)
assert.Equal(t, 1, c)
})
}
func testMultiOperations(t *testing.T) {
tests := []struct {
name string
kt KeyType
keyGen userKeyGenerator
}{
{"Multi operations string key type", StringKeyType, &numbericKeyGenerator{}},
{"Multi operations integer key type", IntegerKeyType, &numbericKeyGenerator{}},
{"Multi operations uuid key type", UUIDKeyType, &uuidKeyGenerator{}},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := getTestStoreWithKeyType(t, test.kt, `[{ "column":"FavoriteBeverage", "property":"FavoriteBeverage", "type":"nvarchar(100)"}]`)
keyGen := test.keyGen
initialUsers := []user{
{keyGen.NextKey(), "John", "Coffee"},
{keyGen.NextKey(), "Laura", "Water"},
{keyGen.NextKey(), "Carl", "Beer"},
{keyGen.NextKey(), "Maria", "Wine"},
{keyGen.NextKey(), "Mark", "Juice"},
{keyGen.NextKey(), "Sara", "Soda"},
{keyGen.NextKey(), "Tony", "Milk"},
{keyGen.NextKey(), "Hugo", "Juice"},
}
// 1. add bulk users
bulkSet := make([]state.SetRequest, len(initialUsers))
for i, u := range initialUsers {
bulkSet[i] = state.SetRequest{Key: u.ID, Value: u}
}
err := store.BulkSet(context.Background(), bulkSet, state.BulkStoreOpts{})
require.NoError(t, err)
assertUserCountIsEqualTo(t, store, len(initialUsers))
// Ensure initial users are correctly stored
loadedUsers := make([]userWithEtag, len(initialUsers))
for i, u := range initialUsers {
loaded, etag := assertLoadedUserIsEqual(t, store, u.ID, u)
loadedWithEtag := userWithEtag{loaded, etag}
loadedUsers[i] = loadedWithEtag
}
totalUsers := len(loadedUsers)
userIndex := 0
t.Run("Update and delete without etag should work", func(t *testing.T) {
toDelete := loadedUsers[userIndex].user
original := loadedUsers[userIndex+1]
modified := original.user
modified.FavoriteBeverage = beverageTea
localErr := store.Multi(context.Background(), &state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{
state.DeleteRequest{Key: toDelete.ID},
state.SetRequest{Key: modified.ID, Value: modified},
},
})
require.NoError(t, localErr)
assertLoadedUserIsEqual(t, store, modified.ID, modified)
assertUserDoesNotExist(t, store, toDelete.ID)
totalUsers--
assertUserCountIsEqualTo(t, store, totalUsers)
userIndex += 2
})
t.Run("Update, delete and insert should work", func(t *testing.T) {
toDelete := loadedUsers[userIndex]
toModify := loadedUsers[userIndex+1]
toInsert := user{keyGen.NextKey(), "Susan", "Soda"}
modified := toModify.user
modified.FavoriteBeverage = beverageTea
err = store.Multi(context.Background(), &state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{
state.DeleteRequest{Key: toDelete.ID, ETag: &toDelete.etag},
state.SetRequest{Key: modified.ID, Value: modified, ETag: &toModify.etag},
state.SetRequest{Key: toInsert.ID, Value: toInsert},
},
})
require.NoError(t, err)
assertLoadedUserIsEqual(t, store, modified.ID, modified)
assertLoadedUserIsEqual(t, store, toInsert.ID, toInsert)
assertUserDoesNotExist(t, store, toDelete.ID)
// we added 1 and deleted 1, so totalUsers should have no change
assertUserCountIsEqualTo(t, store, totalUsers)
userIndex += 2
})
t.Run("Update and delete with etag should work", func(t *testing.T) {
toDelete := loadedUsers[userIndex]
toModify := loadedUsers[userIndex+1]
modified := toModify.user
modified.FavoriteBeverage = beverageTea
err = store.Multi(context.Background(), &state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{
state.DeleteRequest{Key: toDelete.ID, ETag: &toDelete.etag},
state.SetRequest{Key: modified.ID, Value: modified, ETag: &toModify.etag},
},
})
require.NoError(t, err)
assertLoadedUserIsEqual(t, store, modified.ID, modified)
assertUserDoesNotExist(t, store, toDelete.ID)
totalUsers--
assertUserCountIsEqualTo(t, store, totalUsers)
userIndex += 2
})
t.Run("Delete fails, should abort insert", func(t *testing.T) {
toDelete := loadedUsers[userIndex]
toInsert := user{keyGen.NextKey(), "Wont-be-inserted", "Beer"}
invEtag := invalidEtag
err = store.Multi(context.Background(), &state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{
state.DeleteRequest{Key: toDelete.ID, ETag: &invEtag},
state.SetRequest{Key: toInsert.ID, Value: toInsert},
},
})
require.Error(t, err)
assertUserDoesNotExist(t, store, toInsert.ID)
assertLoadedUserIsEqual(t, store, toDelete.ID, toDelete.user)
assertUserCountIsEqualTo(t, store, totalUsers)
})
t.Run("Delete fails, should abort update", func(t *testing.T) {
toDelete := loadedUsers[userIndex]
toModify := loadedUsers[userIndex+1]
modified := toModify.user
modified.FavoriteBeverage = beverageTea
invEtag := invalidEtag
err = store.Multi(context.Background(), &state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{
state.DeleteRequest{Key: toDelete.ID, ETag: &invEtag},
state.SetRequest{Key: modified.ID, Value: modified},
},
})
require.Error(t, err)
assertLoadedUserIsEqual(t, store, toDelete.ID, toDelete.user)
assertLoadedUserIsEqual(t, store, toModify.ID, toModify.user)
assertUserCountIsEqualTo(t, store, totalUsers)
})
t.Run("Update fails, should abort delete", func(t *testing.T) {
toDelete := loadedUsers[userIndex]
toModify := loadedUsers[userIndex+1]
modified := toModify.user
modified.FavoriteBeverage = beverageTea
invEtag := invalidEtag
err = store.Multi(context.Background(), &state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{
state.DeleteRequest{Key: toDelete.ID},
state.SetRequest{Key: modified.ID, Value: modified, ETag: &invEtag},
},
})
require.Error(t, err)
assertLoadedUserIsEqual(t, store, toDelete.ID, toDelete.user)
assertLoadedUserIsEqual(t, store, toModify.ID, toModify.user)
assertUserCountIsEqualTo(t, store, totalUsers)
})
})
}
}
/* #nosec. */
func testInsertAndUpdateSetRecordDates(t *testing.T) {
const maxDiffInMs = float64(500)
store := getTestStore(t, "")
u := user{"1", "John", "Coffee"}
err := store.Set(context.Background(), &state.SetRequest{Key: u.ID, Value: u})
require.NoError(t, err)
var originalInsertTime time.Time
getUserTsql := fmt.Sprintf("SELECT [InsertDate], [UpdateDate] from [%s].[%s] WHERE [Key]='%s'", store.metadata.SchemaName, store.metadata.TableName, u.ID)
assertDBQuery(t, store, getUserTsql, func(t *testing.T, rows *sql.Rows) {
assert.True(t, rows.Next())
var insertDate, updateDate sql.NullTime
localErr := rows.Scan(&insertDate, &updateDate)
require.NoError(t, localErr)
assert.True(t, insertDate.Valid)
insertDiff := float64(time.Now().UTC().Sub(insertDate.Time).Milliseconds())
assert.LessOrEqual(t, math.Abs(insertDiff), maxDiffInMs)
assert.False(t, updateDate.Valid)
originalInsertTime = insertDate.Time
})
modified := u
modified.FavoriteBeverage = beverageTea
err = store.Set(context.Background(), &state.SetRequest{Key: modified.ID, Value: modified})
require.NoError(t, err)
assertDBQuery(t, store, getUserTsql, func(t *testing.T, rows *sql.Rows) {
assert.True(t, rows.Next())
var insertDate, updateDate sql.NullTime
err := rows.Scan(&insertDate, &updateDate)
require.NoError(t, err)
assert.True(t, insertDate.Valid)
assert.Equal(t, originalInsertTime, insertDate.Time)
assert.True(t, updateDate.Valid)
updateDiff := float64(time.Now().UTC().Sub(updateDate.Time).Milliseconds())
assert.LessOrEqual(t, math.Abs(updateDiff), maxDiffInMs)
})
}
func testConcurrentSets(t *testing.T) {
const parallelism = 10
store := getTestStore(t, "")
u := user{"1", "John", "Coffee"}
err := store.Set(context.Background(), &state.SetRequest{Key: u.ID, Value: u})
require.NoError(t, err)
_, etag := assertLoadedUserIsEqual(t, store, u.ID, u)
var wc sync.WaitGroup
start := make(chan bool, parallelism)
totalErrors := int32(0)
totalSucceeds := int32(0)
for i := 0; i < parallelism; i++ {
wc.Add(1)
go func(id, etag string, start <-chan bool, wc *sync.WaitGroup, store *SQLServer) {
<-start
defer wc.Done()
modified := user{"1", "John", beverageTea}
err := store.Set(context.Background(), &state.SetRequest{Key: id, Value: modified, ETag: &etag})
if err != nil {
atomic.AddInt32(&totalErrors, 1)
} else {
atomic.AddInt32(&totalSucceeds, 1)
}
}(u.ID, etag, start, &wc, store)
}
close(start)
wc.Wait()
assert.Equal(t, int32(parallelism-1), totalErrors)
assert.Equal(t, int32(1), totalSucceeds)
}
func testMultipleInitializations(t *testing.T) {
tests := []struct {
name string
kt KeyType
indexedProperties string
}{
{"No indexed properties", StringKeyType, ""},
{"With indexed properties", StringKeyType, `[{ "column":"FavoriteBeverage", "property":"FavoriteBeverage", "type":"nvarchar(100)"}, { "column":"PetsCount", "property":"PetsCount", "type": "INTEGER"}]`},
{"No indexed properties uuid key type", UUIDKeyType, ""},
{"No indexed properties integer key type", IntegerKeyType, ""},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := getTestStoreWithKeyType(t, test.kt, test.indexedProperties)
store2 := &SQLServer{
logger: logger.NewLogger("test"),
migratorFactory: newMigration,
}
store2.BulkStore = state.NewDefaultBulkStore(store2)
err := store2.Init(context.Background(), createMetadata(store.metadata.SchemaName, test.kt, test.indexedProperties))
require.NoError(t, err)
})
}
}