97 lines
3.4 KiB
Go
97 lines
3.4 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package postgresql
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
|
|
postgresql "github.com/dapr/components-contrib/common/component/postgresql/v1"
|
|
"github.com/dapr/components-contrib/state"
|
|
"github.com/dapr/kit/logger"
|
|
)
|
|
|
|
// NewPostgreSQLStateStore creates a new instance of PostgreSQL state store.
|
|
func NewPostgreSQLStateStore(logger logger.Logger) state.Store {
|
|
return postgresql.NewPostgreSQLQueryStateStore(logger, postgresql.Options{
|
|
ETagColumn: "xmin",
|
|
EnableAzureAD: true,
|
|
MigrateFn: performMigrations,
|
|
SetQueryFn: func(req *state.SetRequest, opts postgresql.SetQueryOptions) string {
|
|
// Sprintf is required for table name because the driver does not substitute parameters for table names.
|
|
if !req.HasETag() {
|
|
// We do an upsert in both cases, even when concurrency is first-write, because the row may exist but be expired (and not yet garbage collected)
|
|
// The difference is that with concurrency as first-write, we'll update the row only if it's expired
|
|
var whereClause string
|
|
if req.Options.Concurrency == state.FirstWrite {
|
|
whereClause = " WHERE (t.expiredate IS NOT NULL AND t.expiredate < CURRENT_TIMESTAMP)"
|
|
}
|
|
|
|
return `INSERT INTO ` + opts.TableName + ` AS t
|
|
(key, value, isbinary, expiredate)
|
|
VALUES
|
|
($1, $2, $3, ` + opts.ExpireDateValue + `)
|
|
ON CONFLICT (key)
|
|
DO UPDATE SET
|
|
value = excluded.value,
|
|
isbinary = excluded.isBinary,
|
|
updatedate = CURRENT_TIMESTAMP,
|
|
expiredate = ` + opts.ExpireDateValue +
|
|
whereClause
|
|
}
|
|
|
|
return `UPDATE ` + opts.TableName + `
|
|
SET
|
|
value = $2,
|
|
isbinary = $3,
|
|
updatedate = CURRENT_TIMESTAMP,
|
|
expiredate = ` + opts.ExpireDateValue + `
|
|
WHERE
|
|
key = $1
|
|
AND xmin = $4
|
|
AND (expiredate IS NULL OR expiredate > CURRENT_TIMESTAMP)`
|
|
},
|
|
})
|
|
}
|
|
|
|
// PostgreSQLStoreWithDeleteWithPrefix is a state store for PostgreSQL that implements the DeleteWithPrefix method
|
|
type PostgreSQLStoreWithDeleteWithPrefix struct {
|
|
state.Store
|
|
}
|
|
|
|
// Features returns the features available in this state store.
|
|
func (p *PostgreSQLStoreWithDeleteWithPrefix) Features() []state.Feature {
|
|
return append(p.Store.Features(), state.FeatureDeleteWithPrefix)
|
|
}
|
|
|
|
func (p *PostgreSQLStoreWithDeleteWithPrefix) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
|
|
err := req.Validate()
|
|
if err != nil {
|
|
return state.DeleteWithPrefixResponse{}, err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, p.metadata.Timeout)
|
|
defer cancel()
|
|
|
|
// Trim the trailing "||" from the prefix
|
|
result, err := p.db.Exec(ctx, "DELETE FROM "+p.metadata.TableName+" WHERE "+p.metadata.TableName+`_key_prefix("key") = $1`, strings.TrimSuffix(req.Prefix, "||"))
|
|
if err != nil {
|
|
return state.DeleteWithPrefixResponse{}, err
|
|
}
|
|
|
|
return state.DeleteWithPrefixResponse{
|
|
Count: result.RowsAffected(),
|
|
}, nil
|
|
}
|