diff --git a/.github/infrastructure/docker-compose-cockroachdb.yml b/.github/infrastructure/docker-compose-cockroachdb.yml index 2fa6bff9e..75582673b 100644 --- a/.github/infrastructure/docker-compose-cockroachdb.yml +++ b/.github/infrastructure/docker-compose-cockroachdb.yml @@ -1,9 +1,9 @@ version: '2' services: cockroachdb: - image: cockroachdb/cockroach:v21.2.3 + image: cockroachdb/cockroach:v23.1.13 hostname: cockroachdb - command: start-single-node --cluster-name=single-node --logtostderr=WARNING --log-file-verbosity=WARNING --insecure + command: start-single-node --cluster-name=single-node --insecure restart: always ports: - "26257:26257" diff --git a/common/component/postgresql/v1/postgresql.go b/common/component/postgresql/v1/postgresql.go index 141242c60..7c55b43f4 100644 --- a/common/component/postgresql/v1/postgresql.go +++ b/common/component/postgresql/v1/postgresql.go @@ -64,6 +64,7 @@ type Options struct { type MigrateOptions struct { Logger logger.Logger StateTableName string + KeyPrefixFuncName string MetadataTableName string } @@ -120,6 +121,7 @@ func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) error { err = p.migrateFn(ctx, p.db, MigrateOptions{ Logger: p.logger, StateTableName: p.metadata.TableName, + KeyPrefixFuncName: p.metadata.TableName + "_key_prefix", MetadataTableName: p.metadata.MetadataTableName, }) if err != nil { diff --git a/common/component/postgresql/v1/postgresql_query.go b/common/component/postgresql/v1/postgresql_query.go index bd3c63e35..3b551e6fe 100644 --- a/common/component/postgresql/v1/postgresql_query.go +++ b/common/component/postgresql/v1/postgresql_query.go @@ -54,6 +54,7 @@ func (p *PostgreSQLQuery) Features() []state.Feature { state.FeatureTransactional, state.FeatureQueryAPI, state.FeatureTTL, + state.FeatureDeleteWithPrefix, } } diff --git a/state/cockroachdb/cockroachdb.go b/state/cockroachdb/cockroachdb.go index 8e8bbdc91..df9f5d46d 100644 --- a/state/cockroachdb/cockroachdb.go +++ b/state/cockroachdb/cockroachdb.go @@ -72,12 +72,13 @@ WHERE func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgresql.MigrateOptions) error { exists, err := tableExists(ctx, db, opts.StateTableName) if err != nil { - return err + return fmt.Errorf("failed to check if table '%s' exists: %w", opts.StateTableName, err) } if !exists { opts.Logger.Info("Creating CockroachDB state table") - _, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s ( + _, err = db.Exec(ctx, fmt.Sprintf(` +CREATE TABLE %s ( key text NOT NULL PRIMARY KEY, value jsonb NOT NULL, isbinary boolean NOT NULL, @@ -86,9 +87,10 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre updatedate TIMESTAMP WITH TIME ZONE NULL, expiredate TIMESTAMP WITH TIME ZONE NULL, INDEX expiredate_idx (expiredate) -);`, opts.StateTableName)) +)`, + opts.StateTableName)) if err != nil { - return err + return fmt.Errorf("failed to create state table: %w", err) } } @@ -96,27 +98,29 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre _, err = db.Exec(ctx, fmt.Sprintf( `ALTER TABLE %s ADD COLUMN IF NOT EXISTS expiredate TIMESTAMP WITH TIME ZONE NULL;`, opts.StateTableName)) if err != nil { - return err + return fmt.Errorf("failed to add expiredate column to state table: %w", err) } _, err = db.Exec(ctx, fmt.Sprintf( `CREATE INDEX IF NOT EXISTS expiredate_idx ON %s (expiredate);`, opts.StateTableName)) if err != nil { - return err + return fmt.Errorf("failed to create expiredate index on state table: %w", err) } exists, err = tableExists(ctx, db, opts.MetadataTableName) if err != nil { - return err + return fmt.Errorf("failed to check if table '%s' exists: %w", opts.MetadataTableName, err) } if !exists { opts.Logger.Info("Creating CockroachDB metadata table") - _, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s ( - key text NOT NULL PRIMARY KEY, - value text NOT NULL -);`, opts.MetadataTableName)) + _, err = db.Exec(ctx, fmt.Sprintf(` +CREATE TABLE %s ( + key text NOT NULL PRIMARY KEY, + value text NOT NULL +);`, + opts.MetadataTableName)) if err != nil { - return err + return fmt.Errorf("failed to create metadata table: %w", err) } } @@ -125,6 +129,6 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre func tableExists(ctx context.Context, db pginterfaces.PGXPoolConn, tableName string) (bool, error) { exists := false - err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables where tablename = $1)", tableName).Scan(&exists) + err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables WHERE tablename = $1)", tableName).Scan(&exists) return exists, err } diff --git a/state/cockroachdb/metadata.yaml b/state/cockroachdb/metadata.yaml index 6efc793f7..2d24ad79f 100644 --- a/state/cockroachdb/metadata.yaml +++ b/state/cockroachdb/metadata.yaml @@ -9,11 +9,11 @@ urls: - title: Reference url: https://docs.dapr.io/reference/components-reference/supported-state-stores/setup-cockroachdb/ capabilities: + - actorStateStore - crud - transactional - etag - ttl - - actorStateStore authenticationProfiles: - title: "Connection string" description: "Authenticate using a Connection String" diff --git a/state/postgresql/v1/metadata.yaml b/state/postgresql/v1/metadata.yaml index fa3199e5c..b7dc5acda 100644 --- a/state/postgresql/v1/metadata.yaml +++ b/state/postgresql/v1/metadata.yaml @@ -16,6 +16,7 @@ capabilities: - etag - query - ttl + - deleteWithPrefix builtinAuthenticationProfiles: - name: "azuread" metadata: diff --git a/state/postgresql/v1/migrations.go b/state/postgresql/v1/migrations.go index b7034ab75..3f0631c85 100644 --- a/state/postgresql/v1/migrations.go +++ b/state/postgresql/v1/migrations.go @@ -39,14 +39,14 @@ func performMigrations(ctx context.Context, db pginterfaces.PGXPoolConn, opts po opts.Logger.Infof("Creating state table '%s'", opts.StateTableName) _, err := db.Exec( ctx, - fmt.Sprintf( - `CREATE TABLE IF NOT EXISTS %s ( - key text NOT NULL PRIMARY KEY, - value jsonb NOT NULL, - isbinary boolean NOT NULL, - insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), - updatedate TIMESTAMP WITH TIME ZONE NULL - )`, + fmt.Sprintf(` +CREATE TABLE IF NOT EXISTS %s ( + key text NOT NULL PRIMARY KEY, + value jsonb NOT NULL, + isbinary boolean NOT NULL, + insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updatedate TIMESTAMP WITH TIME ZONE NULL +)`, opts.StateTableName, ), ) @@ -68,6 +68,31 @@ func performMigrations(ctx context.Context, db pginterfaces.PGXPoolConn, opts po } return nil }, - }, - ) + // Migration 2: add the "key_prefix" function and "prefix" index to the state table + func(ctx context.Context) error { + // Create the "key_prefix" function + // Then add the "prefix" index to the state table that can be used by DeleteWithPrefix + opts.Logger.Infof("Creating function '%s' and adding 'prefix' index to table '%s'", opts.KeyPrefixFuncName, opts.StateTableName) + _, err := db.Exec( + ctx, + fmt.Sprintf(` +CREATE FUNCTION %[1]s(k text) RETURNS text +LANGUAGE SQL +IMMUTABLE +LEAKPROOF +RETURNS NULL ON NULL INPUT +RETURN + array_to_string(trim_array(string_to_array(k, '||'),1), '||'); + +CREATE INDEX %[2]s_prefix_idx ON %[2]s (%[1]s("key")) WHERE %[1]s("key") <> ''; +`, + opts.KeyPrefixFuncName, opts.StateTableName, + ), + ) + if err != nil { + return fmt.Errorf("failed to create virtual column: %w", err) + } + return nil + }, + }) } diff --git a/state/postgresql/v1/postgresql.go b/state/postgresql/v1/postgresql.go index d8d2be92d..2917924d0 100644 --- a/state/postgresql/v1/postgresql.go +++ b/state/postgresql/v1/postgresql.go @@ -14,6 +14,9 @@ limitations under the License. package postgresql import ( + "context" + "strings" + postgresql "github.com/dapr/components-contrib/common/component/postgresql/v1" "github.com/dapr/components-contrib/state" "github.com/dapr/kit/logger" @@ -61,3 +64,33 @@ func NewPostgreSQLStateStore(logger logger.Logger) state.Store { }, }) } + +// PostgreSQLStoreWithDeleteWithPrefix is a state store for PostgreSQL that implements the DeleteWithPrefix method +type PostgreSQLStoreWithDeleteWithPrefix struct { + state.Store +} + +// Features returns the features available in this state store. +func (p *PostgreSQLStoreWithDeleteWithPrefix) Features() []state.Feature { + return append(p.Store.Features(), state.FeatureDeleteWithPrefix) +} + +func (p *PostgreSQLStoreWithDeleteWithPrefix) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { + err := req.Validate() + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + ctx, cancel := context.WithTimeout(ctx, p.metadata.Timeout) + defer cancel() + + // Trim the trailing "||" from the prefix + result, err := p.db.Exec(ctx, "DELETE FROM "+p.metadata.TableName+" WHERE "+p.metadata.TableName+`_key_prefix("key") = $1`, strings.TrimSuffix(req.Prefix, "||")) + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + return state.DeleteWithPrefixResponse{ + Count: result.RowsAffected(), + }, nil +} diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index 8e141eacf..75848f545 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -35,12 +35,12 @@ components: # This component requires etags to be hex-encoded numbers badEtag: "FFFF" - component: postgresql.v1.docker - operations: [ "transaction", "etag", "first-write", "query", "ttl" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl", "delete-with-prefix" ] config: # This component requires etags to be numeric badEtag: "1" - component: postgresql.v1.azure - operations: [ "transaction", "etag", "first-write", "query", "ttl" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl", "delete-with-prefix" ] config: # This component requires etags to be numeric badEtag: "1" @@ -83,7 +83,7 @@ components: # This component requires etags to be numeric badEtag: "9999999" - component: cockroachdb.v2 - operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] + operations: [ "transaction", "etag", "first-write", "ttl" ] config: # This component requires etags to be UUIDs badEtag: "7b104dbd-1ae2-4772-bfa0-e29c7b89bc9b"