Fixes for SQL Server state store (#2912)

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: joshvanl <me@joshvanl.dev>
Co-authored-by: joshvanl <me@joshvanl.dev>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Alessandro (Ale) Segala 2023-06-19 11:04:46 -07:00 committed by GitHub
parent 36f09695b6
commit d9ea9c69c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 74 additions and 283 deletions

View File

@ -1,7 +1,7 @@
version: '2'
services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04
image: mcr.microsoft.com/mssql/server:2019-latest
ports:
- "1433:1433"
environment:

View File

@ -46,7 +46,7 @@ func newMigration(metadata *sqlServerMetadata) migrator {
func (m *migration) newMigrationResult() migrationResult {
r := migrationResult{
itemRefTableTypeName: fmt.Sprintf("[%s].%s_Table", m.metadata.Schema, m.metadata.TableName),
upsertProcName: fmt.Sprintf("sp_Upsert_v3_%s", m.metadata.TableName),
upsertProcName: fmt.Sprintf("sp_Upsert_v4_%s", m.metadata.TableName),
getCommand: fmt.Sprintf("SELECT [Data], [RowVersion], [ExpireDate] FROM [%s].[%s] WHERE [Key] = @Key AND ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())", m.metadata.Schema, m.metadata.TableName),
deleteWithETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key AND [RowVersion]=@RowVersion`, m.metadata.Schema, m.metadata.TableName),
deleteWithoutETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key`, m.metadata.Schema, m.metadata.TableName),
@ -296,7 +296,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s
BEGIN
UPDATE [%[3]s]
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
WHERE [Key]=@Key AND RowVersion = @RowVersion AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE()))
WHERE [Key]=@Key AND RowVersion = @RowVersion
END
COMMIT;
END
@ -316,7 +316,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s
IF ERROR_NUMBER() IN (2601, 2627)
UPDATE [%[3]s]
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE()))
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
END CATCH
END
COMMIT;
@ -328,7 +328,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s
BEGIN
UPDATE [%[3]s]
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
WHERE [Key]=@Key AND RowVersion = @RowVersion AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE()))
WHERE [Key]=@Key AND RowVersion = @RowVersion
RETURN
END
ELSE
@ -341,7 +341,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s
IF ERROR_NUMBER() IN (2601, 2627)
UPDATE [%[3]s]
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE()))
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
END CATCH
END
END

View File

@ -78,7 +78,7 @@ type IndexedProperty struct {
Type string `json:"type"`
}
// SQLServer defines a Ms SQL Server based state store.
// SQLServer defines a MS SQL Server based state store.
type SQLServer struct {
state.BulkStore

View File

@ -64,12 +64,8 @@ type userWithEtag struct {
etag string
}
func getMasterConnectionString() string {
return os.Getenv(connectionStringEnvKey)
}
func TestIntegrationCases(t *testing.T) {
connectionString := getMasterConnectionString()
connectionString := os.Getenv(connectionStringEnvKey)
if connectionString == "" {
t.Skipf("SQLServer state integration tests skipped. To enable define the connection string using environment variable '%s' (example 'export %s=\"server=localhost;user id=sa;password=Pass@Word1;port=1433;\")", connectionStringEnvKey, connectionStringEnvKey)
}
@ -78,8 +74,6 @@ func TestIntegrationCases(t *testing.T) {
t.Run("Set New Record With Invalid Etag Should Fail", testSetNewRecordWithInvalidEtagShouldFail)
t.Run("Indexed Properties", testIndexedProperties)
t.Run("Multi operations", testMultiOperations)
t.Run("Bulk sets", testBulkSet)
t.Run("Bulk delete", testBulkDelete)
t.Run("Insert and Update Set Record Dates", testInsertAndUpdateSetRecordDates)
t.Run("Multiple initializations", testMultipleInitializations)
@ -100,7 +94,7 @@ func getUniqueDBSchema() string {
func createMetadata(schema string, kt KeyType, indexedProperties string) state.Metadata {
metadata := state.Metadata{Base: metadata.Base{
Properties: map[string]string{
connectionStringKey: getMasterConnectionString(),
connectionStringKey: os.Getenv(connectionStringEnvKey),
schemaKey: schema,
tableNameKey: usersTableName,
keyTypeKey: string(kt),
@ -126,7 +120,9 @@ func getTestStoreWithKeyType(t *testing.T, kt KeyType, indexedProperties string)
metadata := createMetadata(schema, kt, indexedProperties)
store := &SQLServer{
logger: logger.NewLogger("test"),
migratorFactory: newMigration,
}
store.BulkStore = state.NewDefaultBulkStore(store)
err := store.Init(context.Background(), metadata)
require.NoError(t, err)
@ -162,13 +158,9 @@ func assertUserDoesNotExist(t *testing.T, store *SQLServer, key string) {
}
func assertDBQuery(t *testing.T, store *SQLServer, query string, assertReader func(t *testing.T, rows *sql.Rows)) {
db, err := sql.Open("sqlserver", store.metadata.ConnectionString)
rows, err := store.db.Query(query)
require.NoError(t, err)
defer db.Close()
rows, err := db.Query(query)
require.NoError(t, err)
assert.Nil(t, rows.Err())
require.NoError(t, rows.Err())
defer rows.Close()
assertReader(t, rows)
@ -493,214 +485,6 @@ func testMultiOperations(t *testing.T) {
}
}
func testBulkSet(t *testing.T) {
tests := []struct {
name string
kt KeyType
keyGen userKeyGenerator
}{
{"Bulk set string key type", StringKeyType, &numbericKeyGenerator{}},
{"Bulk set integer key type", IntegerKeyType, &numbericKeyGenerator{}},
{"Bulk set uuid key type", UUIDKeyType, &uuidKeyGenerator{}},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := getTestStoreWithKeyType(t, test.kt, "")
keyGen := test.keyGen
initialUsers := []user{
{keyGen.NextKey(), "John", "Coffee"},
{keyGen.NextKey(), "Laura", "Water"},
{keyGen.NextKey(), "Carl", "Beer"},
}
totalUsers := 0
userIndex := 0
t.Run("Add initial users", func(t *testing.T) {
sets := make([]state.SetRequest, len(initialUsers))
for i, u := range initialUsers {
sets[i] = state.SetRequest{Key: u.ID, Value: u}
}
err := store.BulkSet(context.Background(), sets, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers = len(sets)
assertUserCountIsEqualTo(t, store, totalUsers)
})
t.Run("Add 1, update 1 with valid etag", func(t *testing.T) {
toModify, toModifyETag := assertUserExists(t, store, initialUsers[userIndex].ID)
modified := toModify
modified.FavoriteBeverage = beverageTea
toInsert := user{keyGen.NextKey(), "Maria", "Wine"}
err := store.BulkSet(context.Background(), []state.SetRequest{
{Key: modified.ID, Value: modified, ETag: &toModifyETag},
{Key: toInsert.ID, Value: toInsert},
}, state.BulkStoreOpts{})
require.NoError(t, err)
assertLoadedUserIsEqual(t, store, modified.ID, modified)
assertLoadedUserIsEqual(t, store, toInsert.ID, toInsert)
totalUsers++
assertUserCountIsEqualTo(t, store, totalUsers)
userIndex++
})
t.Run("Add 1, update 1 without etag", func(t *testing.T) {
toModify := initialUsers[userIndex]
modified := toModify
modified.FavoriteBeverage = beverageTea
toInsert := user{keyGen.NextKey(), "Tony", "Milk"}
err := store.BulkSet(context.Background(), []state.SetRequest{
{Key: modified.ID, Value: modified},
{Key: toInsert.ID, Value: toInsert},
}, state.BulkStoreOpts{})
require.NoError(t, err)
assertLoadedUserIsEqual(t, store, modified.ID, modified)
assertLoadedUserIsEqual(t, store, toInsert.ID, toInsert)
totalUsers++
assertUserCountIsEqualTo(t, store, totalUsers)
userIndex++
})
t.Run("Failed upsert due to etag should be aborted", func(t *testing.T) {
toInsert1 := user{keyGen.NextKey(), "Ted1", "Beer"}
toInsert2 := user{keyGen.NextKey(), "Ted2", "Beer"}
toModify := initialUsers[userIndex]
modified := toModify
modified.FavoriteBeverage = beverageTea
invEtag := invalidEtag
sets := []state.SetRequest{
{Key: toInsert1.ID, Value: toInsert1},
{Key: toInsert2.ID, Value: toInsert2},
{Key: modified.ID, Value: modified, ETag: &invEtag},
}
err := store.BulkSet(context.Background(), sets, state.BulkStoreOpts{})
assert.NotNil(t, err)
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserDoesNotExist(t, store, toInsert1.ID)
assertUserDoesNotExist(t, store, toInsert2.ID)
assertLoadedUserIsEqual(t, store, modified.ID, toModify)
assertUserCountIsEqualTo(t, store, totalUsers)
})
})
}
}
func testBulkDelete(t *testing.T) {
tests := []struct {
name string
kt KeyType
keyGen userKeyGenerator
}{
{"Bulk delete string key type", StringKeyType, &numbericKeyGenerator{}},
{"Bulk delete integer key type", IntegerKeyType, &numbericKeyGenerator{}},
{"Bulk delete uuid key type", UUIDKeyType, &uuidKeyGenerator{}},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := getTestStoreWithKeyType(t, test.kt, "")
keyGen := test.keyGen
initialUsers := []user{
{keyGen.NextKey(), "John", "Coffee"},
{keyGen.NextKey(), "Laura", "Water"},
{keyGen.NextKey(), "Carl", "Beer"},
{keyGen.NextKey(), "Maria", "Wine"},
{keyGen.NextKey(), "Mark", "Juice"},
{keyGen.NextKey(), "Sara", "Soda"},
{keyGen.NextKey(), "Tony", "Milk"},
{keyGen.NextKey(), "Hugo", "Juice"},
}
sets := make([]state.SetRequest, len(initialUsers))
for i, u := range initialUsers {
sets[i] = state.SetRequest{Key: u.ID, Value: u}
}
err := store.BulkSet(context.Background(), sets, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers := len(initialUsers)
assertUserCountIsEqualTo(t, store, totalUsers)
userIndex := 0
t.Run("Delete 2 items without etag should work", func(t *testing.T) {
deleted1 := initialUsers[userIndex].ID
deleted2 := initialUsers[userIndex+1].ID
err := store.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: deleted1},
{Key: deleted2},
}, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers -= 2
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserDoesNotExist(t, store, deleted1)
assertUserDoesNotExist(t, store, deleted2)
userIndex += 2
})
t.Run("Delete 2 items with etag should work", func(t *testing.T) {
deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID)
deleted2, deleted2Etag := assertUserExists(t, store, initialUsers[userIndex+1].ID)
err := store.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: deleted1.ID, ETag: &deleted1Etag},
{Key: deleted2.ID, ETag: &deleted2Etag},
}, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers -= 2
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserDoesNotExist(t, store, deleted1.ID)
assertUserDoesNotExist(t, store, deleted2.ID)
userIndex += 2
})
t.Run("Delete with/without etag should work", func(t *testing.T) {
deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID)
deleted2 := initialUsers[userIndex+1]
err := store.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: deleted1.ID, ETag: &deleted1Etag},
{Key: deleted2.ID},
}, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers -= 2
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserDoesNotExist(t, store, deleted1.ID)
assertUserDoesNotExist(t, store, deleted2.ID)
userIndex += 2
})
t.Run("Failed delete due to etag should be aborted", func(t *testing.T) {
deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID)
deleted2 := initialUsers[userIndex+1]
invEtag := invalidEtag
err := store.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: deleted1.ID, ETag: &deleted1Etag},
{Key: deleted2.ID, ETag: &invEtag},
}, state.BulkStoreOpts{})
assert.NotNil(t, err)
assert.NotNil(t, err)
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserExists(t, store, deleted1.ID)
assertUserExists(t, store, deleted2.ID)
})
})
}
}
/* #nosec. */
func testInsertAndUpdateSetRecordDates(t *testing.T) {
const maxDiffInMs = float64(500)
@ -804,7 +588,9 @@ func testMultipleInitializations(t *testing.T) {
store2 := &SQLServer{
logger: logger.NewLogger("test"),
migratorFactory: newMigration,
}
store2.BulkStore = state.NewDefaultBulkStore(store2)
err := store2.Init(context.Background(), createMetadata(store.metadata.Schema, test.kt, test.indexedProperties))
assert.NoError(t, err)
})

View File

@ -1,7 +1,7 @@
version: "3.7"
services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04
image: mcr.microsoft.com/mssql/server:2019-latest
ports:
- "1433:1433"
environment:

View File

@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"os"
"strconv"
"testing"
"time"
@ -66,7 +67,8 @@ func TestSqlServer(t *testing.T) {
currentHTTPPort := ports[1]
basicTest := func(ctx flow.Context) error {
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
ctx.T.Run("basic test", func(t *testing.T) {
client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort))
if err != nil {
panic(err)
}
@ -74,51 +76,54 @@ func TestSqlServer(t *testing.T) {
// save state, default options: strong, last-write
err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("certificationdata"), nil)
require.NoError(ctx.T, err)
require.NoError(t, err)
// get state
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
require.NoError(ctx.T, err)
assert.Equal(ctx.T, "certificationdata", string(item.Value))
require.NoError(t, err)
assert.Equal(t, "certificationdata", string(item.Value))
// delete state
err = client.DeleteState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
require.NoError(ctx.T, err)
require.NoError(t, err)
})
return nil
}
basicTTLTest := func(ctx flow.Context) error {
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
ctx.T.Run("basic TTL test", func(t *testing.T) {
client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort))
if err != nil {
panic(err)
}
defer client.Close()
err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("certificationdata"), map[string]string{
err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key2", []byte("certificationdata"), map[string]string{
"ttlInSeconds": "86400",
})
require.NoError(ctx.T, err)
require.NoError(t, err)
// get state
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
require.NoError(ctx.T, err)
assert.Equal(ctx.T, "certificationdata", string(item.Value))
assert.Contains(ctx.T, item.Metadata, "ttlExpireTime")
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key2", nil)
require.NoError(t, err)
assert.Equal(t, "certificationdata", string(item.Value))
assert.Contains(t, item.Metadata, "ttlExpireTime")
expireTime, err := time.Parse(time.RFC3339, item.Metadata["ttlExpireTime"])
_ = assert.NoError(ctx.T, err) && assert.InDelta(ctx.T, time.Now().Add(24*time.Hour).Unix(), expireTime.Unix(), 10)
_ = assert.NoError(t, err) &&
assert.InDelta(t, time.Now().Add(24*time.Hour).Unix(), expireTime.Unix(), 10)
err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("certificationdata"), map[string]string{
err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key2", []byte("certificationdata"), map[string]string{
"ttlInSeconds": "1",
})
require.NoError(ctx.T, err)
require.NoError(t, err)
time.Sleep(2 * time.Second)
item, err = client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
require.NoError(ctx.T, err)
assert.Nil(ctx.T, item.Value)
assert.Nil(ctx.T, item.Metadata)
item, err = client.GetState(ctx, stateStoreName, certificationTestPrefix+"key2", nil)
require.NoError(t, err)
assert.Nil(t, item.Value)
assert.Nil(t, item.Metadata)
})
return nil
}
@ -126,7 +131,7 @@ func TestSqlServer(t *testing.T) {
// this test function heavily depends on the values defined in ./components/docker/customschemawithindex
verifyIndexedPopertiesTest := func(ctx flow.Context) error {
// verify indices were created by Dapr as specified in the component metadata
db, err := sql.Open("sqlserver", fmt.Sprintf("%sdatabase=certificationtest;", dockerConnectionString))
db, err := sql.Open("mssql", fmt.Sprintf("%sdatabase=certificationtest;", dockerConnectionString))
require.NoError(ctx.T, err)
defer db.Close()
@ -152,7 +157,7 @@ func TestSqlServer(t *testing.T) {
assert.Equal(ctx.T, 3, indexFoundCount)
// write JSON data to the state store (which will automatically be indexed in separate columns)
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort))
if err != nil {
panic(err)
}
@ -201,7 +206,7 @@ func TestSqlServer(t *testing.T) {
// helper function for testing the use of an existing custom schema
createCustomSchema := func(ctx flow.Context) error {
db, err := sql.Open("sqlserver", dockerConnectionString)
db, err := sql.Open("mssql", dockerConnectionString)
assert.NoError(ctx.T, err)
_, err = db.Exec("CREATE SCHEMA customschema;")
assert.NoError(ctx.T, err)
@ -211,7 +216,7 @@ func TestSqlServer(t *testing.T) {
// helper function to insure the SQL Server Docker Container is truly ready
checkSQLServerAvailability := func(ctx flow.Context) error {
db, err := sql.Open("sqlserver", dockerConnectionString)
db, err := sql.Open("mssql", dockerConnectionString)
if err != nil {
return err
}
@ -224,7 +229,7 @@ func TestSqlServer(t *testing.T) {
// checks the state store component is not vulnerable to SQL injection
verifySQLInjectionTest := func(ctx flow.Context) error {
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort))
if err != nil {
panic(err)
}
@ -315,7 +320,7 @@ func TestSqlServer(t *testing.T) {
})
ctx.T.Run("cleanup", func(t *testing.T) {
dbClient, err := sql.Open("sqlserver", connString)
dbClient, err := sql.Open("mssql", connString)
require.NoError(t, err)
t.Run("automatically delete expiredate records", func(t *testing.T) {
@ -436,7 +441,7 @@ func TestSqlServer(t *testing.T) {
)).
Step("Run basic test", basicTest).
Step("Run basic TTL test", basicTTLTest).
// Introduce network interruption of 15 seconds
// Introduce network interruption of 10 seconds
// Note: the connection timeout is set to 5 seconds via the component metadata connection string.
Step("interrupt network",
network.InterruptNetwork(10*time.Second, nil, nil, "1433", "1434")).