Conformance tests & bugfixes for SQLServer (#1078)
* enable sqlserver conformance test * Update SQL Server conformance test setup * Fix sqlserver bug introduced in Go 1.16 * sqlserver almost conformant * Fix conformance tests and stored procedure * all sqlserver tests passing * Minor touchup * lint * Remove unnecessary condition * Add conformance test to GitHub workflow * remove env variable * Update stored procedure * Simplify error handling * Update state/sqlserver/sqlserver.go Co-authored-by: Simon Leet <31784195+CodeMonkeyLeet@users.noreply.github.com> * Update state/sqlserver/sqlserver.go Co-authored-by: Simon Leet <31784195+CodeMonkeyLeet@users.noreply.github.com> Co-authored-by: Bernd Verst <me@bernd.dev> Co-authored-by: Simon Leet <31784195+CodeMonkeyLeet@users.noreply.github.com> Co-authored-by: Artur Souza <artursouza.ms@outlook.com> Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
This commit is contained in:
parent
71f4748460
commit
059dc8c0d2
|
@ -0,0 +1,9 @@
|
||||||
|
version: '2'
|
||||||
|
services:
|
||||||
|
sqlserver:
|
||||||
|
image: mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04
|
||||||
|
ports:
|
||||||
|
- "1433:1433"
|
||||||
|
environment:
|
||||||
|
ACCEPT_EULA: Y
|
||||||
|
SA_PASSWORD: "Pass@Word1"
|
|
@ -62,6 +62,7 @@ jobs:
|
||||||
- secretstores.localfile
|
- secretstores.localfile
|
||||||
- state.mongodb
|
- state.mongodb
|
||||||
- state.redis
|
- state.redis
|
||||||
|
- state.sqlserver
|
||||||
EOF
|
EOF
|
||||||
)
|
)
|
||||||
echo "::set-output name=pr-components::$PR_COMPONENTS"
|
echo "::set-output name=pr-components::$PR_COMPONENTS"
|
||||||
|
@ -191,6 +192,10 @@ jobs:
|
||||||
mongodb-replica-set: test-rs
|
mongodb-replica-set: test-rs
|
||||||
if: contains(matrix.component, 'mongodb')
|
if: contains(matrix.component, 'mongodb')
|
||||||
|
|
||||||
|
- name: Start sqlserver
|
||||||
|
run: docker-compose -f ./.github/infrastructure/docker-compose-sqlserver.yml -p sqlserver up -d
|
||||||
|
if: contains(matrix.component, 'sqlserver')
|
||||||
|
|
||||||
- name: Start kafka
|
- name: Start kafka
|
||||||
run: docker-compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d
|
run: docker-compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d
|
||||||
if: contains(matrix.component, 'kafka')
|
if: contains(matrix.component, 'kafka')
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -44,7 +44,7 @@ require (
|
||||||
github.com/dancannon/gorethink v4.0.0+incompatible
|
github.com/dancannon/gorethink v4.0.0+incompatible
|
||||||
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
|
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
|
||||||
github.com/deepmap/oapi-codegen v1.8.1 // indirect
|
github.com/deepmap/oapi-codegen v1.8.1 // indirect
|
||||||
github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73
|
github.com/denisenkom/go-mssqldb v0.0.0-20210411162248-d9abbec934ba
|
||||||
github.com/dghubble/go-twitter v0.0.0-20190719072343-39e5462e111f
|
github.com/dghubble/go-twitter v0.0.0-20190719072343-39e5462e111f
|
||||||
github.com/dghubble/oauth1 v0.6.0
|
github.com/dghubble/oauth1 v0.6.0
|
||||||
github.com/didip/tollbooth v4.0.2+incompatible
|
github.com/didip/tollbooth v4.0.2+incompatible
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -273,8 +273,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
|
||||||
github.com/deepmap/oapi-codegen v1.3.6/go.mod h1:aBozjEveG+33xPiP55Iw/XbVkhtZHEGLq3nxlX0+hfU=
|
github.com/deepmap/oapi-codegen v1.3.6/go.mod h1:aBozjEveG+33xPiP55Iw/XbVkhtZHEGLq3nxlX0+hfU=
|
||||||
github.com/deepmap/oapi-codegen v1.8.1 h1:gSKgzu1DvWfRctnr0UVwieWkg1LEecP0C2htZyBwDTA=
|
github.com/deepmap/oapi-codegen v1.8.1 h1:gSKgzu1DvWfRctnr0UVwieWkg1LEecP0C2htZyBwDTA=
|
||||||
github.com/deepmap/oapi-codegen v1.8.1/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw=
|
github.com/deepmap/oapi-codegen v1.8.1/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw=
|
||||||
github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73 h1:OGNva6WhsKst5OZf7eZOklDztV3hwtTHovdrLHV+MsA=
|
github.com/denisenkom/go-mssqldb v0.0.0-20210411162248-d9abbec934ba h1:HuzamveGKQH9cN1TrsZgEoG0sHvTa5j3LKquWaHR3sY=
|
||||||
github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
|
github.com/denisenkom/go-mssqldb v0.0.0-20210411162248-d9abbec934ba/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
|
||||||
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
|
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
|
||||||
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
|
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
|
||||||
github.com/dghubble/go-twitter v0.0.0-20190719072343-39e5462e111f h1:M2wB039zeS1/LZtN/3A7tWyfctiOBL4ty5PURBmDdWU=
|
github.com/dghubble/go-twitter v0.0.0-20190719072343-39e5462e111f h1:M2wB039zeS1/LZtN/3A7tWyfctiOBL4ty5PURBmDdWU=
|
||||||
|
|
|
@ -126,8 +126,7 @@ func (m *migration) executeMigrations() (migrationResult, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func runCommand(tsql string, db *sql.DB) error {
|
func runCommand(tsql string, db *sql.DB) error {
|
||||||
_, err := db.Exec(tsql)
|
if _, err := db.Exec(tsql); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -272,35 +271,85 @@ func (m *migration) createStoredProcedureIfNotExists(db *sql.DB, name string, es
|
||||||
/* #nosec */
|
/* #nosec */
|
||||||
func (m *migration) ensureUpsertStoredProcedureExists(db *sql.DB, mr migrationResult) error {
|
func (m *migration) ensureUpsertStoredProcedureExists(db *sql.DB, mr migrationResult) error {
|
||||||
tsql := fmt.Sprintf(`
|
tsql := fmt.Sprintf(`
|
||||||
CREATE PROCEDURE %s (
|
CREATE PROCEDURE %s (
|
||||||
@Key %s,
|
@Key %s,
|
||||||
@Data NVARCHAR(MAX),
|
@Data NVARCHAR(MAX),
|
||||||
@RowVersion BINARY(8))
|
@RowVersion BINARY(8),
|
||||||
AS
|
@FirstWrite BIT)
|
||||||
IF (@RowVersion IS NOT NULL)
|
AS
|
||||||
BEGIN
|
IF (@FirstWrite=1)
|
||||||
UPDATE [%s]
|
BEGIN
|
||||||
SET [Data]=@Data, UpdateDate=GETDATE()
|
IF (@RowVersion IS NOT NULL)
|
||||||
WHERE [Key]=@Key AND RowVersion = @RowVersion
|
BEGIN
|
||||||
|
BEGIN TRANSACTION;
|
||||||
RETURN
|
IF NOT EXISTS (SELECT * FROM [%s] WHERE [KEY]=@KEY AND RowVersion = @RowVersion)
|
||||||
END
|
BEGIN
|
||||||
|
THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1
|
||||||
BEGIN TRY
|
END
|
||||||
INSERT INTO [%s] ([Key], [Data]) VALUES (@Key, @Data);
|
BEGIN
|
||||||
END TRY
|
UPDATE [%s]
|
||||||
|
SET [Data]=@Data, UpdateDate=GETDATE()
|
||||||
BEGIN CATCH
|
WHERE [Key]=@Key AND RowVersion = @RowVersion
|
||||||
IF ERROR_NUMBER() IN (2601, 2627)
|
END
|
||||||
UPDATE [%s]
|
COMMIT;
|
||||||
SET [Data]=@Data, UpdateDate=GETDATE()
|
END
|
||||||
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
|
ELSE
|
||||||
END CATCH`,
|
BEGIN
|
||||||
|
BEGIN TRANSACTION;
|
||||||
|
IF EXISTS (SELECT * FROM [%s] WHERE [KEY]=@KEY)
|
||||||
|
BEGIN
|
||||||
|
THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1
|
||||||
|
END
|
||||||
|
BEGIN
|
||||||
|
BEGIN TRY
|
||||||
|
INSERT INTO [%s] ([Key], [Data]) VALUES (@Key, @Data);
|
||||||
|
END TRY
|
||||||
|
|
||||||
|
BEGIN CATCH
|
||||||
|
IF ERROR_NUMBER() IN (2601, 2627)
|
||||||
|
UPDATE [%s]
|
||||||
|
SET [Data]=@Data, UpdateDate=GETDATE()
|
||||||
|
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
|
||||||
|
END CATCH
|
||||||
|
END
|
||||||
|
COMMIT;
|
||||||
|
END
|
||||||
|
END
|
||||||
|
ELSE
|
||||||
|
BEGIN
|
||||||
|
IF (@RowVersion IS NOT NULL)
|
||||||
|
BEGIN
|
||||||
|
UPDATE [%s]
|
||||||
|
SET [Data]=@Data, UpdateDate=GETDATE()
|
||||||
|
WHERE [Key]=@Key AND RowVersion = @RowVersion
|
||||||
|
RETURN
|
||||||
|
END
|
||||||
|
ELSE
|
||||||
|
BEGIN
|
||||||
|
BEGIN TRY
|
||||||
|
INSERT INTO [%s] ([Key], [Data]) VALUES (@Key, @Data);
|
||||||
|
END TRY
|
||||||
|
|
||||||
|
BEGIN CATCH
|
||||||
|
IF ERROR_NUMBER() IN (2601, 2627)
|
||||||
|
UPDATE [%s]
|
||||||
|
SET [Data]=@Data, UpdateDate=GETDATE()
|
||||||
|
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
|
||||||
|
END CATCH
|
||||||
|
END
|
||||||
|
END
|
||||||
|
`,
|
||||||
mr.upsertProcFullName,
|
mr.upsertProcFullName,
|
||||||
mr.pkColumnType,
|
mr.pkColumnType,
|
||||||
m.store.tableName,
|
m.store.tableName,
|
||||||
m.store.tableName,
|
m.store.tableName,
|
||||||
m.store.tableName)
|
m.store.tableName,
|
||||||
|
m.store.tableName,
|
||||||
|
m.store.tableName,
|
||||||
|
m.store.tableName,
|
||||||
|
m.store.tableName,
|
||||||
|
m.store.tableName,
|
||||||
|
)
|
||||||
|
|
||||||
return m.createStoredProcedureIfNotExists(db, mr.upsertProcName, tsql)
|
return m.createStoredProcedureIfNotExists(db, mr.upsertProcName, tsql)
|
||||||
}
|
}
|
||||||
|
|
|
@ -440,23 +440,23 @@ func (s *SQLServer) Delete(req *state.DeleteRequest) error {
|
||||||
res, err = s.db.Exec(s.deleteWithoutETagCommand, sql.Named(keyColumnName, req.Key))
|
res, err = s.db.Exec(s.deleteWithoutETagCommand, sql.Named(keyColumnName, req.Key))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// err represents errors thrown by the stored procedure or the database itself
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if req.ETag != nil {
|
|
||||||
return state.NewETagError(state.ETagMismatch, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if the row with matching key (and ETag if specified) is not found, then the stored procedure returns 0 rows affected
|
||||||
rows, err := res.RowsAffected()
|
rows, err := res.RowsAffected()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if rows != 1 {
|
// When an ETAG is specified, a row must have been deleted or else we return an ETag mismatch error
|
||||||
return fmt.Errorf("items was not updated")
|
if rows != 1 && req.ETag != nil && *req.ETag != "" {
|
||||||
|
return state.NewETagError(state.ETagMismatch, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// successful deletion, or noop if no ETAG specified
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,15 +578,22 @@ func (s *SQLServer) executeSet(db dbExecutor, req *state.SetRequest) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
etag := sql.Named(rowVersionColumnName, nil)
|
etag := sql.Named(rowVersionColumnName, nil)
|
||||||
if req.ETag != nil {
|
if req.ETag != nil && *req.ETag != "" {
|
||||||
var b []byte
|
var b []byte
|
||||||
b, err = hex.DecodeString(*req.ETag)
|
b, err = hex.DecodeString(*req.ETag)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return state.NewETagError(state.ETagInvalid, err)
|
return state.NewETagError(state.ETagInvalid, err)
|
||||||
}
|
}
|
||||||
etag.Value = b
|
etag = sql.Named(rowVersionColumnName, b)
|
||||||
}
|
}
|
||||||
res, err := db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag)
|
|
||||||
|
var res sql.Result
|
||||||
|
if req.Options.Concurrency == state.FirstWrite {
|
||||||
|
res, err = db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag, sql.Named("FirstWrite", 1))
|
||||||
|
} else {
|
||||||
|
res, err = db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag, sql.Named("FirstWrite", 0))
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if req.ETag != nil && *req.ETag != "" {
|
if req.ETag != nil && *req.ETag != "" {
|
||||||
return state.NewETagError(state.ETagMismatch, err)
|
return state.NewETagError(state.ETagMismatch, err)
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
apiVersion: dapr.io/v1alpha1
|
||||||
|
kind: Component
|
||||||
|
metadata:
|
||||||
|
name: statestore
|
||||||
|
spec:
|
||||||
|
type: state.sqlserver
|
||||||
|
metadata:
|
||||||
|
- name: connectionString
|
||||||
|
value: "server=localhost;user id=sa;password=Pass@Word1;port=1433;"
|
||||||
|
- name: tableName
|
||||||
|
value: mytable
|
|
@ -7,3 +7,5 @@ components:
|
||||||
allOperations: true
|
allOperations: true
|
||||||
- component: cosmosdb
|
- component: cosmosdb
|
||||||
allOperations: true
|
allOperations: true
|
||||||
|
- component: sqlserver
|
||||||
|
allOperations: true
|
||||||
|
|
|
@ -51,6 +51,7 @@ import (
|
||||||
s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb"
|
s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb"
|
||||||
s_mongodb "github.com/dapr/components-contrib/state/mongodb"
|
s_mongodb "github.com/dapr/components-contrib/state/mongodb"
|
||||||
s_redis "github.com/dapr/components-contrib/state/redis"
|
s_redis "github.com/dapr/components-contrib/state/redis"
|
||||||
|
s_sqlserver "github.com/dapr/components-contrib/state/sqlserver"
|
||||||
conf_bindings "github.com/dapr/components-contrib/tests/conformance/bindings"
|
conf_bindings "github.com/dapr/components-contrib/tests/conformance/bindings"
|
||||||
conf_pubsub "github.com/dapr/components-contrib/tests/conformance/pubsub"
|
conf_pubsub "github.com/dapr/components-contrib/tests/conformance/pubsub"
|
||||||
conf_secret "github.com/dapr/components-contrib/tests/conformance/secretstores"
|
conf_secret "github.com/dapr/components-contrib/tests/conformance/secretstores"
|
||||||
|
@ -363,6 +364,8 @@ func loadStateStore(tc TestComponent) state.Store {
|
||||||
store = s_cosmosdb.NewCosmosDBStateStore(testLogger)
|
store = s_cosmosdb.NewCosmosDBStateStore(testLogger)
|
||||||
case "mongodb":
|
case "mongodb":
|
||||||
store = s_mongodb.NewMongoDB(testLogger)
|
store = s_mongodb.NewMongoDB(testLogger)
|
||||||
|
case "sqlserver":
|
||||||
|
store = s_sqlserver.NewSQLServerStateStore(testLogger)
|
||||||
default:
|
default:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -235,6 +235,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
|
||||||
t.Run("delete", func(t *testing.T) {
|
t.Run("delete", func(t *testing.T) {
|
||||||
for _, scenario := range scenarios {
|
for _, scenario := range scenarios {
|
||||||
if !scenario.bulkOnly && scenario.toBeDeleted {
|
if !scenario.bulkOnly && scenario.toBeDeleted {
|
||||||
|
// this also deletes two keys that were not inserted in the set operation
|
||||||
t.Logf("Deleting %s", scenario.key)
|
t.Logf("Deleting %s", scenario.key)
|
||||||
err := statestore.Delete(&state.DeleteRequest{
|
err := statestore.Delete(&state.DeleteRequest{
|
||||||
Key: scenario.key,
|
Key: scenario.key,
|
||||||
|
|
Loading…
Reference in New Issue