357 lines
10 KiB
Go
357 lines
10 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package sqlserver
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
)
|
|
|
|
type migrator interface {
|
|
executeMigrations(context.Context) (migrationResult, error)
|
|
}
|
|
|
|
type migration struct {
|
|
metadata *sqlServerMetadata
|
|
}
|
|
|
|
type migrationResult struct {
|
|
itemRefTableTypeName string
|
|
upsertProcName string
|
|
upsertProcFullName string
|
|
pkColumnType string
|
|
getCommand string
|
|
deleteWithETagCommand string
|
|
deleteWithoutETagCommand string
|
|
}
|
|
|
|
func newMigration(metadata *sqlServerMetadata) migrator {
|
|
return &migration{
|
|
metadata: metadata,
|
|
}
|
|
}
|
|
|
|
func (m *migration) newMigrationResult() migrationResult {
|
|
r := migrationResult{
|
|
itemRefTableTypeName: fmt.Sprintf("[%s].%s_Table", m.metadata.SchemaName, m.metadata.TableName),
|
|
upsertProcName: fmt.Sprintf("sp_Upsert_v5_%s", m.metadata.TableName),
|
|
getCommand: fmt.Sprintf("SELECT [Data], [RowVersion], [ExpireDate] FROM [%s].[%s] WHERE [Key] = @Key AND ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())", m.metadata.SchemaName, m.metadata.TableName),
|
|
deleteWithETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key AND [RowVersion]=@RowVersion`, m.metadata.SchemaName, m.metadata.TableName),
|
|
deleteWithoutETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key`, m.metadata.SchemaName, m.metadata.TableName),
|
|
}
|
|
|
|
r.upsertProcFullName = fmt.Sprintf("[%s].%s", m.metadata.SchemaName, r.upsertProcName)
|
|
|
|
//nolint:exhaustive
|
|
switch m.metadata.keyTypeParsed {
|
|
case StringKeyType:
|
|
r.pkColumnType = fmt.Sprintf("NVARCHAR(%d)", m.metadata.keyLengthParsed)
|
|
|
|
case UUIDKeyType:
|
|
r.pkColumnType = "uniqueidentifier"
|
|
|
|
case IntegerKeyType:
|
|
r.pkColumnType = "int"
|
|
}
|
|
|
|
return r
|
|
}
|
|
|
|
/* #nosec. */
|
|
func (m *migration) executeMigrations(ctx context.Context) (migrationResult, error) {
|
|
r := m.newMigrationResult()
|
|
|
|
conn, hasDatabase, err := m.metadata.GetConnector(false)
|
|
if err != nil {
|
|
return r, err
|
|
}
|
|
db := sql.OpenDB(conn)
|
|
|
|
// If the user provides a database in the connection string do not attempt
|
|
// to create the database. This work as the component did before adding the
|
|
// support to create the db.
|
|
if hasDatabase {
|
|
// Schedule close of connection
|
|
defer db.Close()
|
|
} else {
|
|
err = m.ensureDatabaseExists(ctx, db)
|
|
if err != nil {
|
|
return r, fmt.Errorf("failed to create database: %w", err)
|
|
}
|
|
|
|
// Close the existing connection
|
|
db.Close()
|
|
|
|
// Re connect with a database-specific connection
|
|
conn, _, err = m.metadata.GetConnector(true)
|
|
if err != nil {
|
|
return r, err
|
|
}
|
|
db = sql.OpenDB(conn)
|
|
|
|
// Schedule close of new connection
|
|
defer db.Close()
|
|
}
|
|
|
|
err = m.ensureSchemaExists(ctx, db)
|
|
if err != nil {
|
|
return r, fmt.Errorf("failed to create db schema: %w", err)
|
|
}
|
|
|
|
err = m.ensureTableExists(ctx, db, r)
|
|
if err != nil {
|
|
return r, fmt.Errorf("failed to create db table: %w", err)
|
|
}
|
|
|
|
err = m.ensureStoredProcedureExists(ctx, db, r)
|
|
if err != nil {
|
|
return r, fmt.Errorf("failed to create stored procedures: %w", err)
|
|
}
|
|
|
|
for _, ix := range m.metadata.indexedPropertiesParsed {
|
|
err = m.ensureIndexedPropertyExists(ctx, db, ix)
|
|
if err != nil {
|
|
return r, err
|
|
}
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
func runCommand(ctx context.Context, db *sql.DB, tsql string) error {
|
|
if _, err := db.ExecContext(ctx, tsql); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/* #nosec. */
|
|
func (m *migration) ensureIndexedPropertyExists(ctx context.Context, db *sql.DB, ix IndexedProperty) error {
|
|
indexName := "IX_" + ix.ColumnName
|
|
|
|
tsql := fmt.Sprintf(`
|
|
IF (NOT EXISTS(SELECT object_id
|
|
FROM sys.indexes
|
|
WHERE object_id = OBJECT_ID('[%s].%s')
|
|
AND name='%s'))
|
|
CREATE INDEX %s ON [%s].[%s]([%s])`,
|
|
m.metadata.SchemaName,
|
|
m.metadata.TableName,
|
|
indexName,
|
|
indexName,
|
|
m.metadata.SchemaName,
|
|
m.metadata.TableName,
|
|
ix.ColumnName)
|
|
|
|
return runCommand(ctx, db, tsql)
|
|
}
|
|
|
|
/* #nosec. */
|
|
func (m *migration) ensureDatabaseExists(ctx context.Context, db *sql.DB) error {
|
|
tsql := fmt.Sprintf(`
|
|
IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = N'%s')
|
|
CREATE DATABASE [%s]`,
|
|
m.metadata.DatabaseName, m.metadata.DatabaseName)
|
|
|
|
return runCommand(ctx, db, tsql)
|
|
}
|
|
|
|
/* #nosec. */
|
|
func (m *migration) ensureSchemaExists(ctx context.Context, db *sql.DB) error {
|
|
tsql := fmt.Sprintf(`
|
|
IF NOT EXISTS(SELECT * FROM sys.schemas WHERE name = N'%s')
|
|
EXEC('CREATE SCHEMA [%s]')`,
|
|
m.metadata.SchemaName, m.metadata.SchemaName)
|
|
|
|
return runCommand(ctx, db, tsql)
|
|
}
|
|
|
|
/* #nosec. */
|
|
func (m *migration) ensureTableExists(ctx context.Context, db *sql.DB, r migrationResult) error {
|
|
tsql := fmt.Sprintf(`
|
|
IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s')
|
|
CREATE TABLE [%s].[%s] (
|
|
[Key] %s CONSTRAINT PK_%s PRIMARY KEY,
|
|
[Data] NVARCHAR(MAX) NOT NULL,
|
|
[InsertDate] DateTime2 NOT NULL DEFAULT(GETDATE()),
|
|
[UpdateDate] DateTime2 NULL,
|
|
[ExpireDate] DateTime2 NULL,`,
|
|
m.metadata.SchemaName, m.metadata.TableName, m.metadata.SchemaName, m.metadata.TableName, r.pkColumnType, m.metadata.TableName)
|
|
|
|
for _, prop := range m.metadata.indexedPropertiesParsed {
|
|
if prop.Type != "" {
|
|
tsql += fmt.Sprintf("\n [%s] AS CONVERT(%s, JSON_VALUE(Data, '$.%s')) PERSISTED,", prop.ColumnName, prop.Type, prop.Property)
|
|
} else {
|
|
tsql += fmt.Sprintf("\n [%s] AS JSON_VALUE(Data, '$.%s') PERSISTED,", prop.ColumnName, prop.Property)
|
|
}
|
|
}
|
|
|
|
tsql += `
|
|
[RowVersion] ROWVERSION NOT NULL)
|
|
`
|
|
|
|
if err := runCommand(ctx, db, tsql); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If table was created before v1.11
|
|
tsql = fmt.Sprintf(`IF NOT EXISTS (SELECT column_name
|
|
FROM INFORMATION_SCHEMA.COLUMNS
|
|
WHERE TABLE_SCHEMA = '%[1]s' AND TABLE_NAME = '%[2]s'
|
|
AND COLUMN_NAME = 'ExpireDate')
|
|
ALTER TABLE [%[1]s].[%[2]s] ADD [ExpireDate] DateTime2 NULL`, m.metadata.SchemaName, m.metadata.TableName)
|
|
if err := runCommand(ctx, db, tsql); err != nil {
|
|
return fmt.Errorf("failed to ensure ExpireDate column: %w", err)
|
|
}
|
|
|
|
tsql = fmt.Sprintf(`
|
|
IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%[1]s' AND TABLE_NAME = '%[2]s')
|
|
CREATE TABLE [%[1]s].[%[2]s] (
|
|
[Key] %[3]s CONSTRAINT PK_%[4]s PRIMARY KEY,
|
|
[Value] NVARCHAR(MAX) NOT NULL
|
|
)`, m.metadata.SchemaName, m.metadata.MetadataTableName, r.pkColumnType, m.metadata.MetadataTableName)
|
|
if err := runCommand(ctx, db, tsql); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/* #nosec. */
|
|
func (m *migration) ensureTypeExists(ctx context.Context, db *sql.DB, mr migrationResult) error {
|
|
tsql := fmt.Sprintf(`
|
|
IF type_id('[%s].%s_Table') IS NULL
|
|
CREATE TYPE [%s].%s_Table AS TABLE
|
|
(
|
|
[Key] %s NOT NULL,
|
|
[RowVersion] BINARY(8)
|
|
)
|
|
`, m.metadata.SchemaName, m.metadata.TableName, m.metadata.SchemaName, m.metadata.TableName, mr.pkColumnType)
|
|
|
|
return runCommand(ctx, db, tsql)
|
|
}
|
|
|
|
func (m *migration) ensureStoredProcedureExists(ctx context.Context, db *sql.DB, mr migrationResult) error {
|
|
err := m.ensureTypeExists(ctx, db, mr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = m.ensureUpsertStoredProcedureExists(ctx, db, mr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/* #nosec. */
|
|
func (m *migration) createStoredProcedureIfNotExists(ctx context.Context, db *sql.DB, name string, escapedDefinition string) error {
|
|
tsql := fmt.Sprintf(`
|
|
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[%s].[%s]') AND type in (N'P', N'PC'))
|
|
BEGIN
|
|
execute ('%s')
|
|
END`,
|
|
m.metadata.SchemaName,
|
|
name,
|
|
escapedDefinition)
|
|
|
|
return runCommand(ctx, db, tsql)
|
|
}
|
|
|
|
/* #nosec. */
|
|
//nolint:dupword
|
|
func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *sql.DB, mr migrationResult) error {
|
|
tsql := fmt.Sprintf(`
|
|
CREATE PROCEDURE %[1]s (
|
|
@Key %[2]s,
|
|
@Data NVARCHAR(MAX),
|
|
@TTL INT,
|
|
@RowVersion BINARY(8),
|
|
@FirstWrite BIT
|
|
) AS
|
|
BEGIN
|
|
IF (@FirstWrite=1)
|
|
BEGIN
|
|
IF (@RowVersion IS NOT NULL)
|
|
BEGIN
|
|
BEGIN TRANSACTION;
|
|
IF NOT EXISTS (SELECT * FROM [%[3]s] WHERE [Key]=@Key AND RowVersion = @RowVersion)
|
|
BEGIN
|
|
THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1
|
|
END
|
|
BEGIN
|
|
UPDATE [%[3]s]
|
|
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
|
|
WHERE [Key]=@Key AND RowVersion = @RowVersion
|
|
END
|
|
COMMIT;
|
|
END
|
|
ELSE
|
|
BEGIN
|
|
BEGIN TRANSACTION;
|
|
IF EXISTS (SELECT * FROM [%[3]s] WHERE [Key]=@Key)
|
|
BEGIN
|
|
THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1
|
|
END
|
|
BEGIN
|
|
BEGIN TRY
|
|
INSERT INTO [%[3]s] ([Key], [Data], ExpireDate) VALUES (@Key, @Data, CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END)
|
|
END TRY
|
|
|
|
BEGIN CATCH
|
|
IF ERROR_NUMBER() IN (2601, 2627)
|
|
UPDATE [%[3]s]
|
|
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
|
|
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
|
|
END CATCH
|
|
END
|
|
COMMIT;
|
|
END
|
|
END
|
|
ELSE
|
|
BEGIN
|
|
IF (@RowVersion IS NOT NULL)
|
|
BEGIN
|
|
UPDATE [%[3]s]
|
|
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
|
|
WHERE [Key]=@Key AND RowVersion = @RowVersion
|
|
RETURN
|
|
END
|
|
ELSE
|
|
BEGIN
|
|
BEGIN TRY
|
|
INSERT INTO [%[3]s] ([Key], [Data], ExpireDate) VALUES (@Key, @Data, CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END)
|
|
END TRY
|
|
|
|
BEGIN CATCH
|
|
IF ERROR_NUMBER() IN (2601, 2627)
|
|
UPDATE [%[3]s]
|
|
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
|
|
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
|
|
END CATCH
|
|
END
|
|
END
|
|
END
|
|
`,
|
|
mr.upsertProcFullName,
|
|
mr.pkColumnType,
|
|
m.metadata.TableName,
|
|
)
|
|
|
|
return m.createStoredProcedureIfNotExists(ctx, db, mr.upsertProcName, tsql)
|
|
}
|