/* Copyright 2021 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package sqlserver import ( "context" "database/sql" "fmt" ) type migrator interface { executeMigrations(context.Context) (migrationResult, error) } type migration struct { metadata *sqlServerMetadata } type migrationResult struct { itemRefTableTypeName string upsertProcName string upsertProcFullName string pkColumnType string getCommand string deleteWithETagCommand string deleteWithoutETagCommand string } func newMigration(metadata *sqlServerMetadata) migrator { return &migration{ metadata: metadata, } } func (m *migration) newMigrationResult() migrationResult { r := migrationResult{ itemRefTableTypeName: fmt.Sprintf("[%s].%s_Table", m.metadata.Schema, m.metadata.TableName), upsertProcName: fmt.Sprintf("sp_Upsert_v4_%s", m.metadata.TableName), getCommand: fmt.Sprintf("SELECT [Data], [RowVersion], [ExpireDate] FROM [%s].[%s] WHERE [Key] = @Key AND ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())", m.metadata.Schema, m.metadata.TableName), deleteWithETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key AND [RowVersion]=@RowVersion`, m.metadata.Schema, m.metadata.TableName), deleteWithoutETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key`, m.metadata.Schema, m.metadata.TableName), } r.upsertProcFullName = fmt.Sprintf("[%s].%s", m.metadata.Schema, r.upsertProcName) //nolint:exhaustive switch m.metadata.keyTypeParsed { case StringKeyType: r.pkColumnType = fmt.Sprintf("NVARCHAR(%d)", m.metadata.keyLengthParsed) case UUIDKeyType: r.pkColumnType = "uniqueidentifier" case IntegerKeyType: r.pkColumnType = "int" } return r } /* #nosec. */ func (m *migration) executeMigrations(ctx context.Context) (migrationResult, error) { r := m.newMigrationResult() conn, hasDatabase, err := m.metadata.GetConnector(false) if err != nil { return r, err } db := sql.OpenDB(conn) // If the user provides a database in the connection string do not attempt // to create the database. This work as the component did before adding the // support to create the db. if hasDatabase { // Schedule close of connection defer db.Close() } else { err = m.ensureDatabaseExists(ctx, db) if err != nil { return r, fmt.Errorf("failed to create database: %w", err) } // Close the existing connection db.Close() // Re connect with a database-specific connection conn, _, err = m.metadata.GetConnector(true) if err != nil { return r, err } db = sql.OpenDB(conn) // Schedule close of new connection defer db.Close() } err = m.ensureSchemaExists(ctx, db) if err != nil { return r, fmt.Errorf("failed to create db schema: %w", err) } err = m.ensureTableExists(ctx, db, r) if err != nil { return r, fmt.Errorf("failed to create db table: %w", err) } err = m.ensureStoredProcedureExists(ctx, db, r) if err != nil { return r, fmt.Errorf("failed to create stored procedures: %w", err) } for _, ix := range m.metadata.indexedPropertiesParsed { err = m.ensureIndexedPropertyExists(ctx, db, ix) if err != nil { return r, err } } return r, nil } func runCommand(ctx context.Context, db *sql.DB, tsql string) error { if _, err := db.ExecContext(ctx, tsql); err != nil { return err } return nil } /* #nosec. */ func (m *migration) ensureIndexedPropertyExists(ctx context.Context, db *sql.DB, ix IndexedProperty) error { indexName := "IX_" + ix.ColumnName tsql := fmt.Sprintf(` IF (NOT EXISTS(SELECT object_id FROM sys.indexes WHERE object_id = OBJECT_ID('[%s].%s') AND name='%s')) CREATE INDEX %s ON [%s].[%s]([%s])`, m.metadata.Schema, m.metadata.TableName, indexName, indexName, m.metadata.Schema, m.metadata.TableName, ix.ColumnName) return runCommand(ctx, db, tsql) } /* #nosec. */ func (m *migration) ensureDatabaseExists(ctx context.Context, db *sql.DB) error { tsql := fmt.Sprintf(` IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = N'%s') CREATE DATABASE [%s]`, m.metadata.DatabaseName, m.metadata.DatabaseName) return runCommand(ctx, db, tsql) } /* #nosec. */ func (m *migration) ensureSchemaExists(ctx context.Context, db *sql.DB) error { tsql := fmt.Sprintf(` IF NOT EXISTS(SELECT * FROM sys.schemas WHERE name = N'%s') EXEC('CREATE SCHEMA [%s]')`, m.metadata.Schema, m.metadata.Schema) return runCommand(ctx, db, tsql) } /* #nosec. */ func (m *migration) ensureTableExists(ctx context.Context, db *sql.DB, r migrationResult) error { tsql := fmt.Sprintf(` IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s') CREATE TABLE [%s].[%s] ( [Key] %s CONSTRAINT PK_%s PRIMARY KEY, [Data] NVARCHAR(MAX) NOT NULL, [InsertDate] DateTime2 NOT NULL DEFAULT(GETDATE()), [UpdateDate] DateTime2 NULL, [ExpireDate] DateTime2 NULL,`, m.metadata.Schema, m.metadata.TableName, m.metadata.Schema, m.metadata.TableName, r.pkColumnType, m.metadata.TableName) for _, prop := range m.metadata.indexedPropertiesParsed { if prop.Type != "" { tsql += fmt.Sprintf("\n [%s] AS CONVERT(%s, JSON_VALUE(Data, '$.%s')) PERSISTED,", prop.ColumnName, prop.Type, prop.Property) } else { tsql += fmt.Sprintf("\n [%s] AS JSON_VALUE(Data, '$.%s') PERSISTED,", prop.ColumnName, prop.Property) } } tsql += ` [RowVersion] ROWVERSION NOT NULL) ` if err := runCommand(ctx, db, tsql); err != nil { return err } // If table was created before v1.11 tsql = fmt.Sprintf(`IF NOT EXISTS (SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%[1]s' AND TABLE_NAME = '%[2]s' AND COLUMN_NAME = 'ExpireDate') ALTER TABLE [%[1]s].[%[2]s] ADD [ExpireDate] DateTime2 NULL`, m.metadata.Schema, m.metadata.TableName) if err := runCommand(ctx, db, tsql); err != nil { return fmt.Errorf("failed to ensure ExpireDate column: %w", err) } tsql = fmt.Sprintf(` IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%[1]s' AND TABLE_NAME = '%[2]s') CREATE TABLE [%[1]s].[%[2]s] ( [Key] %[3]s CONSTRAINT PK_%[4]s PRIMARY KEY, [Value] NVARCHAR(MAX) NOT NULL )`, m.metadata.Schema, m.metadata.MetadataTableName, r.pkColumnType, m.metadata.MetadataTableName) if err := runCommand(ctx, db, tsql); err != nil { return err } return nil } /* #nosec. */ func (m *migration) ensureTypeExists(ctx context.Context, db *sql.DB, mr migrationResult) error { tsql := fmt.Sprintf(` IF type_id('[%s].%s_Table') IS NULL CREATE TYPE [%s].%s_Table AS TABLE ( [Key] %s NOT NULL, [RowVersion] BINARY(8) ) `, m.metadata.Schema, m.metadata.TableName, m.metadata.Schema, m.metadata.TableName, mr.pkColumnType) return runCommand(ctx, db, tsql) } func (m *migration) ensureStoredProcedureExists(ctx context.Context, db *sql.DB, mr migrationResult) error { err := m.ensureTypeExists(ctx, db, mr) if err != nil { return err } err = m.ensureUpsertStoredProcedureExists(ctx, db, mr) if err != nil { return err } return nil } /* #nosec. */ func (m *migration) createStoredProcedureIfNotExists(ctx context.Context, db *sql.DB, name string, escapedDefinition string) error { tsql := fmt.Sprintf(` IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[%s].[%s]') AND type in (N'P', N'PC')) BEGIN execute ('%s') END`, m.metadata.Schema, name, escapedDefinition) return runCommand(ctx, db, tsql) } /* #nosec. */ //nolint:dupword func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *sql.DB, mr migrationResult) error { tsql := fmt.Sprintf(` CREATE PROCEDURE %[1]s ( @Key %[2]s, @Data NVARCHAR(MAX), @TTL INT, @RowVersion BINARY(8), @FirstWrite BIT ) AS BEGIN IF (@FirstWrite=1) BEGIN IF (@RowVersion IS NOT NULL) BEGIN BEGIN TRANSACTION; IF NOT EXISTS (SELECT * FROM [%[3]s] WHERE [KEY]=@KEY AND RowVersion = @RowVersion) BEGIN THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1 END BEGIN UPDATE [%[3]s] SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END WHERE [Key]=@Key AND RowVersion = @RowVersion END COMMIT; END ELSE BEGIN BEGIN TRANSACTION; IF EXISTS (SELECT * FROM [%[3]s] WHERE [KEY]=@KEY) BEGIN THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1 END BEGIN BEGIN TRY INSERT INTO [%[3]s] ([Key], [Data], ExpireDate) VALUES (@Key, @Data, CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END) END TRY BEGIN CATCH IF ERROR_NUMBER() IN (2601, 2627) UPDATE [%[3]s] SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) END CATCH END COMMIT; END END ELSE BEGIN IF (@RowVersion IS NOT NULL) BEGIN UPDATE [%[3]s] SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END WHERE [Key]=@Key AND RowVersion = @RowVersion RETURN END ELSE BEGIN BEGIN TRY INSERT INTO [%[3]s] ([Key], [Data], ExpireDate) VALUES (@Key, @Data, CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END) END TRY BEGIN CATCH IF ERROR_NUMBER() IN (2601, 2627) UPDATE [%[3]s] SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) END CATCH END END END `, mr.upsertProcFullName, mr.pkColumnType, m.metadata.TableName, ) return m.createStoredProcedureIfNotExists(ctx, db, mr.upsertProcName, tsql) }