diff --git a/.github/infrastructure/docker-compose-sqlserver.yml b/.github/infrastructure/docker-compose-sqlserver.yml new file mode 100644 index 000000000..5b041506a --- /dev/null +++ b/.github/infrastructure/docker-compose-sqlserver.yml @@ -0,0 +1,9 @@ +version: '2' +services: + sqlserver: + image: mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04 + ports: + - "1433:1433" + environment: + ACCEPT_EULA: Y + SA_PASSWORD: "Pass@Word1" \ No newline at end of file diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index a1a43be3d..6134b3923 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -62,6 +62,7 @@ jobs: - secretstores.localfile - state.mongodb - state.redis + - state.sqlserver EOF ) echo "::set-output name=pr-components::$PR_COMPONENTS" @@ -191,6 +192,10 @@ jobs: mongodb-replica-set: test-rs if: contains(matrix.component, 'mongodb') + - name: Start sqlserver + run: docker-compose -f ./.github/infrastructure/docker-compose-sqlserver.yml -p sqlserver up -d + if: contains(matrix.component, 'sqlserver') + - name: Start kafka run: docker-compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d if: contains(matrix.component, 'kafka') diff --git a/go.mod b/go.mod index 31dcd4432..37c9c2cac 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( github.com/dancannon/gorethink v4.0.0+incompatible github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233 github.com/deepmap/oapi-codegen v1.8.1 // indirect - github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73 + github.com/denisenkom/go-mssqldb v0.0.0-20210411162248-d9abbec934ba github.com/dghubble/go-twitter v0.0.0-20190719072343-39e5462e111f github.com/dghubble/oauth1 v0.6.0 github.com/didip/tollbooth v4.0.2+incompatible diff --git a/go.sum b/go.sum index c59644713..0a9f25ac4 100644 --- a/go.sum +++ b/go.sum @@ -273,8 +273,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/deepmap/oapi-codegen v1.3.6/go.mod h1:aBozjEveG+33xPiP55Iw/XbVkhtZHEGLq3nxlX0+hfU= github.com/deepmap/oapi-codegen v1.8.1 h1:gSKgzu1DvWfRctnr0UVwieWkg1LEecP0C2htZyBwDTA= github.com/deepmap/oapi-codegen v1.8.1/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw= -github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73 h1:OGNva6WhsKst5OZf7eZOklDztV3hwtTHovdrLHV+MsA= -github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= +github.com/denisenkom/go-mssqldb v0.0.0-20210411162248-d9abbec934ba h1:HuzamveGKQH9cN1TrsZgEoG0sHvTa5j3LKquWaHR3sY= +github.com/denisenkom/go-mssqldb v0.0.0-20210411162248-d9abbec934ba/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA= github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= github.com/dghubble/go-twitter v0.0.0-20190719072343-39e5462e111f h1:M2wB039zeS1/LZtN/3A7tWyfctiOBL4ty5PURBmDdWU= diff --git a/state/sqlserver/migration.go b/state/sqlserver/migration.go index 248a6e71a..848472377 100644 --- a/state/sqlserver/migration.go +++ b/state/sqlserver/migration.go @@ -126,8 +126,7 @@ func (m *migration) executeMigrations() (migrationResult, error) { } func runCommand(tsql string, db *sql.DB) error { - _, err := db.Exec(tsql) - if err != nil { + if _, err := db.Exec(tsql); err != nil { return err } @@ -272,35 +271,85 @@ func (m *migration) createStoredProcedureIfNotExists(db *sql.DB, name string, es /* #nosec */ func (m *migration) ensureUpsertStoredProcedureExists(db *sql.DB, mr migrationResult) error { tsql := fmt.Sprintf(` - CREATE PROCEDURE %s ( - @Key %s, - @Data NVARCHAR(MAX), - @RowVersion BINARY(8)) - AS - IF (@RowVersion IS NOT NULL) - BEGIN - UPDATE [%s] - SET [Data]=@Data, UpdateDate=GETDATE() - WHERE [Key]=@Key AND RowVersion = @RowVersion - - RETURN - END - - BEGIN TRY - INSERT INTO [%s] ([Key], [Data]) VALUES (@Key, @Data); - END TRY - - BEGIN CATCH - IF ERROR_NUMBER() IN (2601, 2627) - UPDATE [%s] - SET [Data]=@Data, UpdateDate=GETDATE() - WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) - END CATCH`, + CREATE PROCEDURE %s ( + @Key %s, + @Data NVARCHAR(MAX), + @RowVersion BINARY(8), + @FirstWrite BIT) + AS + IF (@FirstWrite=1) + BEGIN + IF (@RowVersion IS NOT NULL) + BEGIN + BEGIN TRANSACTION; + IF NOT EXISTS (SELECT * FROM [%s] WHERE [KEY]=@KEY AND RowVersion = @RowVersion) + BEGIN + THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1 + END + BEGIN + UPDATE [%s] + SET [Data]=@Data, UpdateDate=GETDATE() + WHERE [Key]=@Key AND RowVersion = @RowVersion + END + COMMIT; + END + ELSE + BEGIN + BEGIN TRANSACTION; + IF EXISTS (SELECT * FROM [%s] WHERE [KEY]=@KEY) + BEGIN + THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1 + END + BEGIN + BEGIN TRY + INSERT INTO [%s] ([Key], [Data]) VALUES (@Key, @Data); + END TRY + + BEGIN CATCH + IF ERROR_NUMBER() IN (2601, 2627) + UPDATE [%s] + SET [Data]=@Data, UpdateDate=GETDATE() + WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) + END CATCH + END + COMMIT; + END + END + ELSE + BEGIN + IF (@RowVersion IS NOT NULL) + BEGIN + UPDATE [%s] + SET [Data]=@Data, UpdateDate=GETDATE() + WHERE [Key]=@Key AND RowVersion = @RowVersion + RETURN + END + ELSE + BEGIN + BEGIN TRY + INSERT INTO [%s] ([Key], [Data]) VALUES (@Key, @Data); + END TRY + + BEGIN CATCH + IF ERROR_NUMBER() IN (2601, 2627) + UPDATE [%s] + SET [Data]=@Data, UpdateDate=GETDATE() + WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) + END CATCH + END + END + `, mr.upsertProcFullName, mr.pkColumnType, m.store.tableName, m.store.tableName, - m.store.tableName) + m.store.tableName, + m.store.tableName, + m.store.tableName, + m.store.tableName, + m.store.tableName, + m.store.tableName, + ) return m.createStoredProcedureIfNotExists(db, mr.upsertProcName, tsql) } diff --git a/state/sqlserver/sqlserver.go b/state/sqlserver/sqlserver.go index 08c0d48ff..53f827550 100644 --- a/state/sqlserver/sqlserver.go +++ b/state/sqlserver/sqlserver.go @@ -440,23 +440,23 @@ func (s *SQLServer) Delete(req *state.DeleteRequest) error { res, err = s.db.Exec(s.deleteWithoutETagCommand, sql.Named(keyColumnName, req.Key)) } + // err represents errors thrown by the stored procedure or the database itself if err != nil { - if req.ETag != nil { - return state.NewETagError(state.ETagMismatch, err) - } - return err } + // if the row with matching key (and ETag if specified) is not found, then the stored procedure returns 0 rows affected rows, err := res.RowsAffected() if err != nil { return err } - if rows != 1 { - return fmt.Errorf("items was not updated") + // When an ETAG is specified, a row must have been deleted or else we return an ETag mismatch error + if rows != 1 && req.ETag != nil && *req.ETag != "" { + return state.NewETagError(state.ETagMismatch, nil) } + // successful deletion, or noop if no ETAG specified return nil } @@ -578,15 +578,22 @@ func (s *SQLServer) executeSet(db dbExecutor, req *state.SetRequest) error { return err } etag := sql.Named(rowVersionColumnName, nil) - if req.ETag != nil { + if req.ETag != nil && *req.ETag != "" { var b []byte b, err = hex.DecodeString(*req.ETag) if err != nil { return state.NewETagError(state.ETagInvalid, err) } - etag.Value = b + etag = sql.Named(rowVersionColumnName, b) } - res, err := db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag) + + var res sql.Result + if req.Options.Concurrency == state.FirstWrite { + res, err = db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag, sql.Named("FirstWrite", 1)) + } else { + res, err = db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag, sql.Named("FirstWrite", 0)) + } + if err != nil { if req.ETag != nil && *req.ETag != "" { return state.NewETagError(state.ETagMismatch, err) diff --git a/tests/config/state/sqlserver/statestore.yml b/tests/config/state/sqlserver/statestore.yml new file mode 100644 index 000000000..98d1403d3 --- /dev/null +++ b/tests/config/state/sqlserver/statestore.yml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.sqlserver + metadata: + - name: connectionString + value: "server=localhost;user id=sa;password=Pass@Word1;port=1433;" + - name: tableName + value: mytable diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index 52a6895ff..5a721ea1a 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -7,3 +7,5 @@ components: allOperations: true - component: cosmosdb allOperations: true + - component: sqlserver + allOperations: true diff --git a/tests/conformance/common.go b/tests/conformance/common.go index 0ecd132ae..4e7e7acf0 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -51,6 +51,7 @@ import ( s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb" s_mongodb "github.com/dapr/components-contrib/state/mongodb" s_redis "github.com/dapr/components-contrib/state/redis" + s_sqlserver "github.com/dapr/components-contrib/state/sqlserver" conf_bindings "github.com/dapr/components-contrib/tests/conformance/bindings" conf_pubsub "github.com/dapr/components-contrib/tests/conformance/pubsub" conf_secret "github.com/dapr/components-contrib/tests/conformance/secretstores" @@ -363,6 +364,8 @@ func loadStateStore(tc TestComponent) state.Store { store = s_cosmosdb.NewCosmosDBStateStore(testLogger) case "mongodb": store = s_mongodb.NewMongoDB(testLogger) + case "sqlserver": + store = s_sqlserver.NewSQLServerStateStore(testLogger) default: return nil } diff --git a/tests/conformance/state/state.go b/tests/conformance/state/state.go index 0adc2008e..c17bab843 100644 --- a/tests/conformance/state/state.go +++ b/tests/conformance/state/state.go @@ -235,6 +235,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St t.Run("delete", func(t *testing.T) { for _, scenario := range scenarios { if !scenario.bulkOnly && scenario.toBeDeleted { + // this also deletes two keys that were not inserted in the set operation t.Logf("Deleting %s", scenario.key) err := statestore.Delete(&state.DeleteRequest{ Key: scenario.key,